# Source code for pandablocks.blocking

import socket
from typing import Iterable, Iterator, List, Optional, Union, overload

from .commands import Command, T
from .connections import ControlConnection, DataConnection
from .responses import Data

# Public API of this module: BlockingClient is the only name exported by
# `import *`; everything else defined here is an internal helper
__all__ = ["BlockingClient"]


class _SocketHelper:
    """Own one blocking TCP socket and manage its lifetime.

    The socket only exists between `connect` and `close`; reading the
    `socket` property outside that window fails an assertion.
    """

    # The connected socket, or None while disconnected
    _socket: Optional[socket.socket] = None

    def connect(self, host: str, port: int):
        """Open an IPv4 TCP connection to ``(host, port)`` with TCP_NODELAY set.

        Args:
            host: Hostname or IP address to connect to
            port: TCP port to connect to

        Raises:
            OSError: If connecting or setting the socket option fails. The
                partially set up socket is closed before the error propagates
        """
        s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            s.connect((host, port))
            s.setsockopt(socket.SOL_TCP, socket.TCP_NODELAY, 1)
        except BaseException:
            # Don't leak the file descriptor if setup fails or is interrupted
            s.close()
            raise
        self._socket = s

    # NOTE: inside the class body this property shadows the `socket` module,
    # so any annotation that needs the module must be evaluated above here.
    # Method bodies are unaffected as they resolve `socket` at module scope.
    @property
    def socket(self) -> socket.socket:
        """The connected socket; asserts that `connect` has been called."""
        assert self._socket, "connect() not called yet"
        return self._socket

    def close(self):
        """Shut down the write side of the socket, then close it.

        The helper is marked as disconnected first, and the socket is closed
        even if the shutdown fails.

        Raises:
            AssertionError: If there is no connected socket
            OSError: If `shutdown` fails, for instance because the peer has
                already dropped the connection. The socket is still closed
        """
        s = self.socket
        self._socket = None
        try:
            s.shutdown(socket.SHUT_WR)
        finally:
            # Always release the file descriptor, even if shutdown raised
            s.close()


class BlockingClient:
    """Blocking implementation of a PandABlocks client.

    For example::

        with BlockingClient("hostname-or-ip") as client:
            # Control port is now connected
            resp1, resp2 = client.send([cmd1, cmd2])
            resp3 = client.send(cmd3)
            for data in client.data():
                handle(data)
        # Control and data ports are now disconnected

    Args:
        host: Hostname or IP address used for both the control and data ports
    """

    def __init__(self, host: str):
        self._host = host
        # Turns commands into bytes to send, and received bytes into responses
        self._ctrl_connection = ControlConnection()
        self._ctrl_socket = _SocketHelper()

    def connect(self):
        """Connect to the control port, and be ready to handle commands"""
        self._ctrl_socket.connect(self._host, 8888)

    def close(self):
        """Close the control connection, and wait for completion"""
        self._ctrl_socket.close()

    def __enter__(self) -> "BlockingClient":
        self.connect()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.close()

    @overload
    def send(self, commands: Command[T], timeout: Optional[int] = None) -> T:
        ...

    @overload
    def send(
        self, commands: Iterable[Command], timeout: Optional[int] = None
    ) -> List:
        ...

    def send(
        self,
        commands: Union[Command[T], Iterable[Command]],
        timeout: Optional[int] = None,
    ):
        """Send a command to control port of the PandA, returning its response.

        Args:
            commands: If single `Command`, return its response. If a list of
                commands return a list of responses
            timeout: If no response in this time, raise `socket.timeout`

        Raises:
            ConnectionError: If the control connection is closed by the other
                end while responses are still outstanding
            Exception: The first response that is itself an exception instance
        """
        s = self._ctrl_socket.socket
        s.settimeout(timeout)
        if isinstance(commands, Command):
            commands = [commands]
        else:
            # Materialize so the iterable can be walked more than once
            commands = list(commands)
        for command in commands:
            to_send = self._ctrl_connection.send(command)
            s.sendall(to_send)
        # Rely on dicts being ordered, Ellipsis is shorthand for "no response yet"
        cr = {id(command): ... for command in commands}
        while ... in cr.values():
            received = s.recv(4096)
            if not received:
                # recv() returns b"" forever once the peer has closed, so
                # without this check the loop would spin and never finish
                raise ConnectionError(
                    "Control connection closed before all responses were received"
                )
            to_send = self._ctrl_connection.receive_bytes(received)
            s.sendall(to_send)
            for command, response in self._ctrl_connection.responses():
                assert cr[id(command)] is ..., f"Already got response for {command}"
                cr[id(command)] = response
        responses = list(cr.values())
        for response in responses:
            if isinstance(response, Exception):
                raise response
        # NOTE: this is decided on length alone, so an iterable holding exactly
        # one command also gets the bare response back rather than a list
        if len(responses) == 1:
            return responses[0]
        else:
            return responses

    def data(
        self, scaled: bool = True, frame_timeout: Optional[int] = None
    ) -> Iterator[Data]:
        """Connect to data port and yield data frames

        Args:
            scaled: Whether to scale and average data frames, reduces throughput
            frame_timeout: If no data is received for this amount of time, raise
                `socket.timeout`

        Raises:
            ConnectionError: If the data connection is closed by the other end
        """
        data_socket = _SocketHelper()
        data_socket.connect(self._host, 8889)
        connection = DataConnection()
        # Everything after a successful connect is inside the try so that the
        # data socket is closed even if the initial handshake fails
        try:
            s = data_socket.socket
            s.settimeout(frame_timeout)  # close enough
            s.sendall(connection.connect(scaled))
            while True:
                received = s.recv(4096)
                if not received:
                    # An empty read means the peer closed the connection;
                    # looping on it would spin forever without yielding
                    raise ConnectionError("Data connection closed by the other end")
                yield from connection.receive_bytes(received)
        finally:
            data_socket.close()