Source code for pandablocks.connections

import struct
import sys
import xml.etree.ElementTree as ET
from collections import deque
from dataclasses import dataclass
from typing import Any, Callable, Deque, Iterator, List, Optional, Tuple

import numpy as np

from ._exchange import Exchange, ExchangeGenerator, Exchanges
from .commands import Command, CommandException
from .responses import (
    Data,
    EndData,
    EndReason,
    FieldCapture,
    FrameData,
    ReadyData,
    StartData,
)

# Define the public API of this module
__all__ = [
    "NeedMoreData",
    "NoContextAvailable",
    "Buffer",
    "ControlConnection",
    "DataConnection",
]

# The name of the samples field used for averaging unscaled fields
SAMPLES_FIELD = "PCAP.SAMPLES.Value"


[docs] class NeedMoreData(Exception): """Raised if the `Buffer` isn't full enough to return the requested bytes"""
[docs] class NoContextAvailable(Exception): """Raised if there were no contexts available for this connection. This may result from calling `ControlConnection.receive_bytes()` without calling `ControlConnection.send()`, or if there were unmatched sends/receives"""
[docs] class Buffer: """Byte storage that provides line reader and bytes reader interfaces. For example:: buf = Buffer() buf += bytes_from_server line = buf.read_line() # raises NeedMoreData if no line for line in buf: pass bytes = buf.read_bytes(50) # raises NeedMoreData if not enough bytes """ def __init__(self): self._buf = bytearray() def __iadd__(self, byteslike: bytes): """Add some data from the server""" self._buf += byteslike return self def _extract_frame(self, num_to_extract, num_to_discard=0) -> bytearray: # extract num_to_extract bytes from the start of the buffer frame = self._buf[:num_to_extract] # Update the buffer in place, to take advantage of bytearray's # optimized delete-from-beginning feature. del self._buf[: num_to_extract + num_to_discard] return frame
[docs] def read_bytes(self, num: int) -> bytearray: """Read and pop num bytes from the beginning of the buffer, raising `NeedMoreData` if the buffer isn't full enough to do so""" if num > len(self._buf): raise NeedMoreData() else: return self._extract_frame(num)
[docs] def peek_bytes(self, num: int) -> bytearray: """Read but do not pop num bytes from the beginning of the buffer, raising `NeedMoreData` if the buffer isn't full enough to do so""" if num > len(self._buf): raise NeedMoreData() else: return self._buf[:num]
[docs] def read_line(self): """Read and pop a newline terminated line (without terminator) from the beginning of the buffer, raising `NeedMoreData` if the buffer isn't full enough to do so""" idx = self._buf.find(b"\n") if idx < 0: raise NeedMoreData() else: return self._extract_frame(idx, num_to_discard=1)
def __iter__(self): return self def __next__(self) -> bytes: try: return self.read_line() except NeedMoreData: raise StopIteration()
@dataclass class _ExchangeContext: #: The exchange we should be filling exchange: Exchange #: The command that produced it command: Command #: If this was the last in the list, the generator to call next generator: Optional[ExchangeGenerator[Any]] = None def exception(self, e: Exception) -> CommandException: """Return a `CommandException` with the sent and received strings in the text""" msg = f"{self.command} ->" if self.exchange.is_multiline: for line in self.exchange.multiline: msg += "\n " + line else: msg += " " + self.exchange.line if e.args: msg += f"\n{type(e).__name__}:{e}" return CommandException(msg).with_traceback(e.__traceback__)
[docs] class ControlConnection: """Sans-IO connection to control port of PandA TCP server, supporting a Command based interface. For example:: cc = ControlConnection() # Connection says what bytes should be sent to execute command to_send = cc.send(command) socket.sendall(to_send) while True: # Repeatedly process bytes from the PandA received = socket.recv() # Sending any subsequent bytes to be sent back to the PandA to_send = cc.receive_bytes(received) socket.sendall(to_send) # And processing the produced responses for command, response in cc.responses() do_something_with(response) """ def __init__(self) -> None: self._buf = Buffer() self._lines: List[str] = [] self._contexts: Deque[_ExchangeContext] = deque() self._responses: Deque[Tuple[Command, Any]] = deque() def _update_contexts(self, lines: List[str], is_multiline=False) -> bytes: to_send = b"" if len(self._contexts) == 0: raise NoContextAvailable() context = self._contexts.popleft() # Update the exchange with what we've got context.exchange.received = lines context.exchange.is_multiline = is_multiline # If we're given a generator to run then do so if context.generator: try: # Return the bytes from sending the next bit of the command exchanges = next(context.generator) except StopIteration as e: # Command complete, store the result self._responses.append((context.command, e.value)) except Exception as e: # Command failed, store an exception self._responses.append((context.command, context.exception(e))) else: to_send = b"".join( self._bytes_from_exchanges( exchanges, context.command, context.generator ) ) return to_send def _bytes_from_exchanges( self, exchanges: Exchanges, command: Command, generator: ExchangeGenerator[Any] ) -> Iterator[bytes]: if not isinstance(exchanges, list): exchanges = [exchanges] # No Exchanges when a Command's yield is empty e.g. unexpected/unparseable data # received from PandA if len(exchanges) == 0: return for ex in exchanges: context = _ExchangeContext(ex, command) self._contexts.append(context) text = "\n".join(ex.to_send) + "\n" yield text.encode() # The last exchange gets the generator so it triggers the next thing to send context.generator = generator
[docs] def receive_bytes(self, received: bytes) -> bytes: """Tell the connection that you have received some bytes off the network. Parse these into high level responses which are yielded back by `responses`. Return any bytes to send back""" self._buf += received is_multiline = bool(self._lines) to_send = b"" for line_b in self._buf: line = line_b.decode() if not is_multiline: # Check if we need to switch to multiline mode is_multiline = line.startswith("!") or line == "." if is_multiline: # Add a new line to the buffer self._lines.append(line) if line == ".": # End of multiline mode, return what we've got to_send += self._update_contexts(self._lines, is_multiline) self._lines = [] is_multiline = False else: # Check a correctly formatted response assert line.startswith("!"), ( "Multiline response %r doesn't start with !" % line ) else: # Single line mode assert not self._lines, ( "Multiline response %s not terminated" % self._lines ) to_send += self._update_contexts([line]) return to_send
[docs] def responses(self) -> Iterator[Tuple[Command, Any]]: """Get the (command, response) tuples generated as part of the last receive_bytes""" while self._responses: yield self._responses.popleft()
[docs] def send(self, command: Command) -> bytes: """Tell the connection you want to send an event, and it will return some bytes to send down the network """ # If not given a partially run generator, start one here generator = command.execute() exchanges = next(generator) to_send = b"".join(self._bytes_from_exchanges(exchanges, command, generator)) return to_send
[docs] class DataConnection: """Sans-IO connection to data port of PandA TCP server, supporting an flushable iterator interface. For example:: dc = DataConnection() # Single connection string to send to_send = dc.connect() socket.sendall(to_send) while True: # Repeatedly process bytes from the PandA looking for data received = socket.recv() for data in dc.receive_bytes(received): do_something_with(data) """ def __init__(self) -> None: # TODO: could support big endian, but are there any systems out there? assert sys.byteorder == "little", "PandA sends data little endian" # Store of bytes received so far to parse in the handlers self._buf = Buffer() # Header text from PandA with field info self._header = "" # The next parsing handler that should be called if there is data in buffer self._next_handler: Optional[Callable[[], Optional[Iterator[Data]]]] = None # numpy dtype of produced FrameData self._frame_dtype = None # frame data that has been received but not flushed yet self._partial_data = bytearray() # whether to flush after every frame self._flush_every_frame = False def _handle_connected(self): # Get the response from connect() line = self._buf.read_line() assert line == b"OK", f"Expected OK, got {line!r}" yield ReadyData() self._next_handler = self._handle_header_start def _handle_header_start(self): # Discard lines until we see header start tag line = self._buf.read_line() if line == b"<header>": self._header = line self._next_handler = self._handle_header_body def _handle_header_body(self): # Accumumlate header until the end tag, then parese and return line = self._buf.read_line() self._header += line if line == b"</header>": fields = [] root = ET.fromstring(self._header) for field in root.find("fields"): fields.append( FieldCapture( name=str(field.get("name")), type=np.dtype(field.get("type")), capture=str(field.get("capture")), scale=float(field.get("scale", 1)), offset=float(field.get("offset", 0)), units=str(field.get("units", "")), ) ) data = root.find("data") sample_bytes = int(data.get("sample_bytes")) if sample_bytes - sum(f.type.itemsize for f in fields) == 4: # In raw mode with panda-server < 2.1 samples wasn't # put in if not specifically requested, but was still # sent name, capture = SAMPLES_FIELD.rsplit(".", maxsplit=1) fields.insert( 0, FieldCapture(name, np.dtype("uint32"), capture), ) self._frame_dtype = np.dtype( [(f"{f.name}.{f.capture}", f.type) for f in fields] ) yield StartData( fields=fields, missed=int(data.get("missed")), process=str(data.get("process")), format=str(data.get("format")), sample_bytes=sample_bytes, ) self._next_handler = self._handle_header_end def _handle_header_end(self): # Discard the newline at the end of the header assert self._buf.read_bytes(1) == b"\n", "Expected newline at end of header" self._next_handler = self._handle_data_start def _handle_data_start(self): # Handle "BIN " or "END " bytes = self._buf.read_bytes(4) if bytes == b"BIN ": self._next_handler = self._handle_data_frame elif bytes == b"END ": self._next_handler = self._handle_data_end else: raise ValueError(f"Bad data '{bytes}'") def _handle_data_frame(self): # Handle a whole data frame # Peek message length as uint32 LE # length = len("BIN " + 4_bytes_encoded_length + data) length = struct.unpack("<I", self._buf.peek_bytes(4))[0] # we already read "BIN ", so read the rest data = self._buf.read_bytes(length - 4)[4:] self._partial_data += data # if told to flush now, then yield what we have if self._flush_every_frame: yield from self.flush() self._next_handler = self._handle_data_start def _handle_data_end(self): # Handle the end reason samples, reason = self._buf.read_line().split(maxsplit=1) reason_enum = EndReason(reason.decode()) # Flush whatever is not already flushed yield from self.flush() yield EndData(samples=int(samples), reason=reason_enum) self._next_handler = self._handle_header_start
[docs] def receive_bytes(self, received: bytes, flush_every_frame=True) -> Iterator[Data]: """Tell the connection that you have received some bytes off the network. Parse these into Data structures and yield them back. Args: received: the bytes you received from the socket flush_every_frame: Whether to flush `FrameData` as soon as received. If False then they will only be sent if `flush` is called or end of acquisition reached """ assert self._next_handler, "Connect not called" self._flush_every_frame = flush_every_frame self._buf += received while True: # Each of these handlers should call read at most once, so that # if we don't have enough data we don't lose partial data try: ret = self._next_handler() if ret: # This is an iterator of Data objects yield from ret except NeedMoreData: break
[docs] def flush(self) -> Iterator[FrameData]: """If there is a partial data frame, pop and yield it""" if self._partial_data: # Make a numpy array wrapper to the bytearray, no copying here data = np.frombuffer(self._partial_data, self._frame_dtype) # Make a new bytearray, numpy view will keep the reference # to the old one so can't clear it in place self._partial_data = bytearray() yield FrameData(data)
[docs] def connect(self, scaled: bool) -> bytes: """Return the bytes that need to be sent on connection""" assert not self._next_handler, "Can't connect while midway through collection" self._next_handler = self._handle_connected if scaled: return b"XML FRAMED SCALED\n" else: return b"XML FRAMED RAW\n"