API¶
The top level pandablocks module contains a number of packages that can be used from code:
pandablocks.commands: The control commands that can be sent to a PandApandablocks.responses: The control and data responses that will be receivedpandablocks.connections: Control and data connections that implements the parsing logicpandablocks.asyncio: An asyncio client that uses the control and data connectionspandablocks.blocking: A blocking client that uses the control and data connectionspandablocks.hdf: Some helpers for efficiently writing data responses to disk
Commands¶
There is a Command subclass for every sort of command that can be sent to
the ControlConnection of a PandA. Many common actions can be accomplished
with a simple Get or Put, but some convenience commands like
GetBlockNumbers, GetFields, etc. are provided that parse output into
specific classes.
- 
exception 
pandablocks.commands.CommandException[source]¶ Raised if a
Commandreceives a mal-formed response
- 
class 
pandablocks.commands.Command[source]¶ Abstract baseclass for all
ControlConnectioncommands to be inherited from- 
lines() → Union[bytes, List[bytes]][source]¶ Return lines that should be sent to the PandA, with no newlines in them
- 
response(lines: Union[bytes, List[bytes]]) → T[source]¶ Create a response from the lines received from the PandA
- 
ok_if(ok, lines: Union[bytes, List[bytes]])[source]¶ If not ok then raise a suitable
CommandException
- 
 
- 
class 
pandablocks.commands.Get(field: str)[source]¶ Get the value of a field or star command.
- Parameters
 field – The field, attribute, or star command to get
For example:
Get("PCAP.ACTIVE") -> b"1" Get("SEQ1.TABLE") -> [b"1048576", b"0", b"1000", b"1000"] Get("*IDN") -> b"PandA 1.1..."
- 
class 
pandablocks.commands.Put(field: str, value: Union[str, List[str]] = '')[source]¶ Put the value of a field.
- Parameters
 field – The field, attribute, or star command to put
value – The value, possibly multiline, to put
For example:
Put("PCAP.TRIG", "PULSE1.OUT") Put("SEQ1.TABLE", ["1048576", "0", "1000", "1000"])
- 
class 
pandablocks.commands.GetBlockNumbers[source]¶ Get the descriptions and field lists of the requested Blocks.
For example:
GetBlockNumbers() -> {"LUT": 8, "PCAP": 1, ...}
- 
class 
pandablocks.commands.GetFields(block: str)[source]¶ Get the fields of a block, returning a
FieldTypefor each one.- Parameters
 block – The name of the block type
For example:
GetFields("LUT") -> {"INPA": FieldType("bit_mux"), ...}
- 
class 
pandablocks.commands.GetPcapBitsLabels[source]¶ Get the labels for the bit fields in PCAP.
For example:
GetPcapBitsLabels() -> PcapBitsLabels()
- 
class 
pandablocks.commands.GetChanges[source]¶ Get the changes since the last time this was called.
For example:
GetChanges() -> Changes()
Responses¶
Classes used in responses from both the ControlConnection and
DataConnection of a PandA live in this package.
- 
class 
pandablocks.responses.FieldType(type: str, subtype: Optional[str] = None)[source]¶ Field type and subtype as exposed by TCP server: https://pandablocks-server.readthedocs.io/en/latest/fields.html#field-types
- 
type¶ Field type, like “param”, “bit_out”, “pos_mux”, etc.
- 
subtype¶ Some types have subtype, like “uint”, “scalar”, “lut”, etc.
- 
 
- 
class 
pandablocks.responses.EndReason(value)[source]¶ The reason that a PCAP acquisition completed
- 
OK= 'Ok'¶ Experiment completed by falling edge of
PCAP.ENABLE`
- 
DISARMED= 'Disarmed'¶ Experiment manually completed by
*PCAP.DISARM=command
- 
EARLY_DISCONNECT= 'Early disconnect'¶ Client disconnect detected
- 
DATA_OVERRUN= 'Data overrun'¶ Client not taking data quickly or network congestion, internal buffer overflow
- 
FRAMING_ERROR= 'Framing error'¶ Triggers too fast for configured data capture
- 
DRIVER_DATA_OVERRUN= 'Driver data overrun'¶ Probable CPU overload on PandA, should not occur
- 
DMA_DATA_ERROR= 'DMA data error'¶ Data capture too fast for memory bandwidth
- 
 
- 
class 
pandablocks.responses.FieldCapture(name: str, type: numpy.dtype, capture: str, scale: float = 1.0, offset: float = 0.0, units: str = '')[source]¶ Information about a field that is being captured
- 
name¶ Name of captured field
- 
type¶ Numpy data type of the field as transmitted
- 
capture¶ Value of CAPTURE field used to enable this field
- 
scale¶ Scaling factor, default 1.0
- 
offset¶ Offset, default 0.0
- 
units¶ Units string, default “”
- 
 
- 
class 
pandablocks.responses.Data[source]¶ Baseclass for all responses yielded by a
DataConnection
- 
class 
pandablocks.responses.ReadyData[source]¶ Yielded once when the connection is established and ready to take data
- 
class 
pandablocks.responses.StartData(fields: List[pandablocks.responses.FieldCapture], missed: int, process: str, format: str, sample_bytes: int)[source]¶ Yielded when a new PCAP acquisition starts.
- 
fields¶ Information about each captured field as a
FieldCaptureobject
- 
missed¶ Number of samples missed by late data port connection
- 
process¶ Data processing option, only “Scaled” or “Raw” are requested
- 
format¶ Data delivery formatting, only “Framed” is requested
- 
sample_bytes¶ Number of bytes in one sample
- 
 
- 
class 
pandablocks.responses.FrameData(data: numpy.ndarray)[source]¶ Yielded when a new data frame is flushed.
- 
data¶ A numpy
Structured Array
Data is structured into complete columns. Each column name is
<name>.<capture>from the correspondingFieldType. Data can be accessed with these column names. For example:# Table view with 2 captured fields >>> fdata.data array([(0, 10), (1, 11), (2, 12)], dtype=[('COUNTER1.OUT.Value', '<f8'), ('COUNTER2.OUT.Value', '<f8')]) # Row view >>> fdata.data[0] (0, 10) # Column names >>> fdata.column_names ('COUNTER1.OUT.Value', 'COUNTER2.OUT.Value') # Column view >>> fdata.data['COUNTER1.OUT.Value'] (0, 1, 2)
- 
property 
column_names¶ Return all the column names
- 
 
Connections¶
Sans-IO connections for both the Control and Data ports of PandA TCP server.
- 
exception 
pandablocks.connections.NeedMoreData[source]¶ Raised if the
Bufferisn’t full enough to return the requested bytes
- 
class 
pandablocks.connections.Buffer[source]¶ Byte storage that provides line reader and bytes reader interfaces. For example:
buf = Buffer() buf += bytes_from_server line = buf.read_line() # raises NeedMoreData if no line for line in buf: pass bytes = buf.read_bytes(50) # raises NeedMoreData if not enough bytes
- 
read_bytes(num: int) → bytearray[source]¶ Read and pop num bytes from the beginning of the buffer, raising
NeedMoreDataif the buffer isn’t full enough to do so
- 
peek_bytes(num: int) → bytearray[source]¶ Read but do not pop num bytes from the beginning of the buffer, raising
NeedMoreDataif the buffer isn’t full enough to do so
- 
read_line()[source]¶ Read and pop a newline terminated line (without terminator) from the beginning of the buffer, raising
NeedMoreDataif the buffer isn’t full enough to do so
- 
 
- 
class 
pandablocks.connections.ControlConnection[source]¶ Sans-IO connection to control port of PandA TCP server, supporting a Command based interface. For example:
cc = ControlConnection() # Connection says what bytes should be sent to execute command to_send = cc.send(command) socket.sendall(to_send) while True: # Repeatedly process bytes from the server looking for responses from_server = socket.recv() for command, response in cc.receive_bytes(from_server): do_something_with(response)
- 
receive_bytes(received: bytes) → Iterator[Tuple[pandablocks.commands.Command, Any]][source]¶ Tell the connection that you have received some bytes off the network. Parse these into high level responses which are yielded back with the command that triggered this response
- 
send(command: pandablocks.commands.Command) → bytes[source]¶ Tell the connection you want to send an event, and it will return some bytes to send down the network
- 
 
- 
class 
pandablocks.connections.DataConnection[source]¶ Sans-IO connection to data port of PandA TCP server, supporting an flushable iterator interface. For example:
dc = ControlConnection() # Single connection string to send to_send = dc.connect() socket.sendall(to_send) while True: # Repeatedly process bytes from the server looking for data from_server = socket.recv() for data in dc.receive_bytes(from_server): do_something_with(data)
- 
receive_bytes(received: bytes, flush_every_frame=True) → Iterator[pandablocks.responses.Data][source]¶ Tell the connection that you have received some bytes off the network. Parse these into Data structures and yield them back.
- 
flush() → Iterator[pandablocks.responses.FrameData][source]¶ If there is a partial data frame, pop and yield it
- 
 
Asyncio Client¶
This is an asyncio wrapper to the ControlConnection and DataConnection
objects, allowing async calls to send(command) and iterate over
data().
- 
class 
pandablocks.asyncio.AsyncioClient(host: str)[source]¶ Asyncio implementation of a PandABlocks client. For example:
async with AsyncioClient("hostname-or-ip") as client: # Control and data ports are now connected resp1, resp2 = await asyncio.gather(client.send(cmd1), client.send(cmd2)) resp3 = await client.send(cmd3) async for data in client.data(): handle(data) # Control and data ports are now disconnected
- 
async 
send(command: pandablocks.commands.Command[T]) → T[source]¶ Send a command to control port of the PandA, returning its response.
- Parameters
 command – The
Commandto send
- 
data(scaled: bool = True, flush_period: float = None, frame_timeout: float = None) → AsyncGenerator[pandablocks.responses.Data, None][source]¶ Connect to data port and yield data frames
- Parameters
 scaled – Whether to scale and average data frames, reduces throughput
flush_period – How often to flush partial data frames, None is on every chunk of data from the server
frame_timeout – If no data is received for this amount of time, raise
asyncio.TimeoutError
- 
async 
 
Blocking Client¶
This is a blocking wrapper to the ControlConnection and DataConnection
objects, allowing blocking calls to send(commands) and iterate over
data().
- 
class 
pandablocks.blocking.BlockingClient(host: str)[source]¶ Blocking implementation of a PandABlocks client. For example:
with BlockingClient("hostname-or-ip") as client: # Control and data ports are now connected resp1, resp2 = client.send([cmd1, cmd2]) resp3 = client.send(cmd3) for data in client.data(): handle(data) # Control and data ports are now disconnected
- 
send(commands: pandablocks.commands.Command[T], timeout: int = 'None') → T[source]¶ - 
send(commands: Iterable[pandablocks.commands.Command], timeout: int = 'None') → List Send a command to control port of the PandA, returning its response.
- Parameters
 commands – If single
Command, return its response. If a list of commands return a list of reponsestimeout – If no reponse in this time, raise
socket.timeout
- 
data(scaled: bool = True, frame_timeout: int = None) → Iterator[pandablocks.responses.Data][source]¶ Connect to data port and yield data frames
- Parameters
 scaled – Whether to scale and average data frames, reduces throughput
frame_timeout – If no data is received for this amount of time, raise
socket.timeout
- 
 
HDF Writing¶
This package contains components needed to write PCAP data to and HDF file
in the most efficient way. The oneshot write_hdf_files is exposed in the
commandline interface. It assembles a short Pipeline of:
The FrameProcessor and HDFWriter run in their own threads as most of the heavy lifting is done by numpy and h5py, so running in their own threads gives multi-CPU benefits without hitting the limit of the GIL.
- 
class 
pandablocks.hdf.Pipeline[source]¶ Helper class that runs a pipeline consumer process in its own thread
- 
what_to_do: Dict[Type, Callable]¶ Subclasses should create this dictionary with handlers for each data type, returning transformed data that should be passed downstream
- 
 
- 
class 
pandablocks.hdf.HDFWriter(scheme: str)[source]¶ Write an HDF file per data collection. Each field will be written in a 1D dataset
/<field.name>.<field.capture>.- Parameters
 scheme – Filepath scheme, where %d will be replaced with the acquisition number, starting with 1
- 
class 
pandablocks.hdf.FrameProcessor[source]¶ Scale field data according to the information in the StartData
- 
pandablocks.hdf.create_pipeline(*elements: pandablocks.hdf.Pipeline) → List[pandablocks.hdf.Pipeline][source]¶ Create a pipeline of elements, wiring them and starting them before returning them
- 
pandablocks.hdf.stop_pipeline(pipeline: List[pandablocks.hdf.Pipeline])[source]¶ Stop and join each element of the pipeline
- 
async 
pandablocks.hdf.write_hdf_files(client: pandablocks.asyncio.AsyncioClient, scheme: str, num: int = 1, arm: bool = False)[source]¶ Connect to host PandA data port, and write num acquisitions to HDF file according to scheme
- Parameters
 client – The
AsyncioClientto use for communicationsscheme – Filenaming scheme for HDF files, with %d for scan number starting at 1
num – The number of acquisitions to store in separate files
arm – Whether to arm PCAP at the start, and after each successful acquisition