Source code for pandablocks.commands

from dataclasses import dataclass
from typing import Dict, Generic, List, Tuple, TypeVar, Union

from .responses import FieldType

T = TypeVar("T")
# One or more lines to send
Lines = Union[bytes, List[bytes]]


[docs]class CommandException(Exception): """Raised if a `Command` receives a mal-formed response"""
[docs]@dataclass class Command(Generic[T]): """Abstract baseclass for all `ControlConnection` commands to be inherited from"""
[docs] def lines(self) -> Lines: """Return lines that should be sent to the PandA, with no newlines in them""" raise NotImplementedError(self)
[docs] def response(self, lines: Lines) -> T: """Create a response from the lines received from the PandA""" raise NotImplementedError(self)
[docs] def ok_if(self, ok, lines: Lines): """If not ok then raise a suitable `CommandException`""" if not ok: msg = f"{self} ->" if isinstance(lines, list): for line in lines: msg += "\n " + line.decode() else: msg += " " + lines.decode() raise CommandException(msg)
[docs]@dataclass class Get(Command[Lines]): """Get the value of a field or star command. Args: field: The field, attribute, or star command to get For example:: Get("PCAP.ACTIVE") -> b"1" Get("SEQ1.TABLE") -> [b"1048576", b"0", b"1000", b"1000"] Get("*IDN") -> b"PandA 1.1..." """ field: str def lines(self) -> Lines: return f"{self.field}?".encode()
[docs] def response(self, lines: Lines) -> Lines: """The value that was requested as a byte string. If it is multiline then it will be a list of byte strings""" if not isinstance(lines, list): # We got OK =value self.ok_if(lines.startswith(b"OK ="), lines) return lines[4:] else: return lines
[docs]@dataclass class Put(Command[None]): """Put the value of a field. Args: field: The field, attribute, or star command to put value: The value, possibly multiline, to put For example:: Put("PCAP.TRIG", "PULSE1.OUT") Put("SEQ1.TABLE", ["1048576", "0", "1000", "1000"]) """ field: str value: Union[str, List[str]] = "" def lines(self) -> Lines: if isinstance(self.value, list): # Multiline table with blank line to terminate return ( [f"{self.field}<".encode()] + [v.encode() for v in self.value] + [b""] ) else: return f"{self.field}={self.value}".encode() def response(self, lines: Lines): self.ok_if(lines == b"OK", lines)
[docs]class Arm(Command[None]): """Arm PCAP for an acquisition by sending ``*PCAP.ARM=``""" def lines(self) -> Lines: return b"*PCAP.ARM=" def response(self, lines: Lines): self.ok_if(lines == b"OK", lines)
[docs]class GetBlockNumbers(Command[Dict[str, int]]): """Get the descriptions and field lists of the requested Blocks. For example:: GetBlockNumbers() -> {"LUT": 8, "PCAP": 1, ...} """ def lines(self) -> Lines: return b"*BLOCKS?"
[docs] def response(self, lines: Lines) -> Dict[str, int]: """The name and number of each block type in a dictionary, alphabetically ordered""" blocks = {} assert isinstance(lines, list), f"Expected list of Blocks, got {lines!r}" for line in lines: block, num = line.split() blocks[block.decode()] = int(num) return {block: num for block, num in sorted(blocks.items())}
[docs]@dataclass class GetFields(Command[Dict[str, FieldType]]): """Get the fields of a block, returning a `FieldType` for each one. Args: block: The name of the block type For example:: GetFields("LUT") -> {"INPA": FieldType("bit_mux"), ...} """ block: str def lines(self) -> Lines: return f"{self.block}.*?".encode()
[docs] def response(self, lines: Lines) -> Dict[str, FieldType]: """The name and `FieldType` of each field in a dictionary, ordered to match the definition order in the PandA""" unsorted: Dict[int, Tuple[str, FieldType]] = {} assert isinstance(lines, list), f"Expected list of Fields, got {lines!r}" for line in lines: name, index, type_subtype = line.decode().split(maxsplit=2) unsorted[int(index)] = (name, FieldType(*type_subtype.split())) # Dict keeps insertion order, so insert in the order the server said fields = {name: field for _, (name, field) in sorted(unsorted.items())} return fields
[docs]class GetPcapBitsLabels(Command): """Get the labels for the bit fields in PCAP. For example:: GetPcapBitsLabels() -> PcapBitsLabels() """
[docs]class GetChanges(Command): """Get the changes since the last time this was called. For example:: GetChanges() -> Changes() """
# Checks whether the server will interpret cmd as a table command: search for # first of '?', '=', '<', if '<' found first then it's a multiline command. def is_multiline_command(cmd: str): for ch in cmd: if ch in "?=": return False if ch == "<": return True return False
[docs]@dataclass class Raw(Command[List[str]]): """Send a raw command Args: inp: The input lines to send For example:: Raw(["PCAP.ACTIVE?"]) -> ["OK =1"] Raw(["SEQ1.TABLE>", "1", "1", "0", "0"]) -> ["OK"] Raw(["SEQ1.TABLE?"]) -> ["!1", "!1", "!0", "!0", "."]) """ inp: List[str] def lines(self) -> Lines: return [line.encode() for line in self.inp]
[docs] def response(self, lines: Lines) -> List[str]: """The lines that PandA responded, including the multiline markup""" if isinstance(lines, List): # Add the multiline markup back in... return [f"!{line.decode()}" for line in lines] + ["."] else: return [lines.decode()]