Source code for pandablocks.blocking
import socket
from typing import Iterable, Iterator, List, Optional, Union, overload
from .commands import Command, T
from .connections import ControlConnection, DataConnection
from .responses import Data
# Define the public API of this module
__all__ = ["BlockingClient"]
class _SocketHelper:
_socket: Optional[socket.socket] = None
def connect(self, host: str, port: int):
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.connect((host, port))
s.setsockopt(socket.SOL_TCP, socket.TCP_NODELAY, 1)
self._socket = s
@property
def socket(self) -> socket.socket:
assert self._socket, "connect() not called yet"
return self._socket
def close(self):
s = self.socket
self._socket = None
s.shutdown(socket.SHUT_WR)
s.close()
[docs]
class BlockingClient:
"""Blocking implementation of a PandABlocks client.
For example::
with BlockingClient("hostname-or-ip") as client:
# Control port is now connected
resp1, resp2 = client.send([cmd1, cmd2])
resp3 = client.send(cmd3)
for data in client.data():
handle(data)
# Control and data ports are now disconnected
"""
def __init__(self, host: str):
self._host = host
self._ctrl_connection = ControlConnection()
self._ctrl_socket = _SocketHelper()
[docs]
def connect(self):
"""Connect to the control port, and be ready to handle commands"""
self._ctrl_socket.connect(self._host, 8888)
[docs]
def close(self):
"""Close the control connection, and wait for completion"""
self._ctrl_socket.close()
def __enter__(self) -> "BlockingClient":
self.connect()
return self
def __exit__(self, exc_type, exc_val, exc_tb):
self.close()
@overload
def send(self, commands: Command[T], timeout: Optional[int] = None) -> T: ...
@overload
def send(
self, commands: Iterable[Command], timeout: Optional[int] = None
) -> List: ...
[docs]
def send(
self,
commands: Union[Command[T], Iterable[Command]],
timeout: Optional[int] = None,
):
"""Send a command to control port of the PandA, returning its response.
Args:
commands: If single `Command`, return its response. If a list of commands
return a list of reponses
timeout: If no reponse in this time, raise `socket.timeout`
"""
s = self._ctrl_socket.socket
s.settimeout(timeout)
if isinstance(commands, Command):
commands = [commands]
else:
commands = list(commands)
for command in commands:
to_send = self._ctrl_connection.send(command)
s.sendall(to_send)
# Rely on dicts being ordered, Ellipsis is shorthand for "no response yet"
cr = {id(command): ... for command in commands}
while ... in cr.values():
received = s.recv(4096)
to_send = self._ctrl_connection.receive_bytes(received)
s.sendall(to_send)
for command, response in self._ctrl_connection.responses():
assert cr[id(command)] is ..., "Already got response for {command}"
cr[id(command)] = response
responses = list(cr.values())
for response in responses:
if isinstance(response, Exception):
raise response
if len(responses) == 1:
return responses[0]
else:
return responses
[docs]
def data(
self, scaled: bool = True, frame_timeout: Optional[int] = None
) -> Iterator[Data]:
"""Connect to data port and yield data frames
Args:
scaled: Whether to scale and average data frames, reduces throughput
frame_timeout: If no data is received for this amount of time, raise
`socket.timeout`
"""
data_socket = _SocketHelper()
data_socket.connect(self._host, 8889)
connection = DataConnection()
s = data_socket.socket
s.settimeout(frame_timeout) # close enough
s.sendall(connection.connect(scaled))
try:
while True:
received = s.recv(4096)
yield from connection.receive_bytes(received)
finally:
data_socket.close()