import asyncio
import logging
from asyncio.streams import StreamReader, StreamWriter
from contextlib import suppress
from typing import AsyncGenerator, Dict, Iterable, Optional
from .commands import Command, T
from .connections import ControlConnection, DataConnection
from .responses import Data
# Define the public API of this module
__all__ = ["AsyncioClient"]
class _StreamHelper:
_reader: Optional[StreamReader] = None
_writer: Optional[StreamWriter] = None
def reader(self) -> StreamReader:
assert self._reader, "connect() not called yet"
return self._reader
def writer(self) -> StreamWriter:
assert self._writer, "connect() not called yet"
return self._writer
async def write_and_drain(self, data: bytes, timeout: Optional[float] = None):
writer = self.writer
# Cannot simply await the drain, as if the remote end has disconnected
# then the drain will never complete as the OS cannot clear its send buffer.
write_task = asyncio.create_task(writer.drain())
_, pending = await asyncio.wait([write_task], timeout=timeout)
if len(pending):
for task in pending:
raise asyncio.TimeoutError("Timeout writing data")
async def connect(self, host: str, port: int):
self._reader, self._writer = await asyncio.open_connection(host, port)
async def close(self):
writer = self.writer
self._reader = None
self._writer = None
await writer.wait_closed()
class AsyncioClient:
"""Asyncio implementation of a PandABlocks client.
For example::
async with AsyncioClient("hostname-or-ip") as client:
# Control port is now connected
resp1, resp2 = await asyncio.gather(client.send(cmd1), client.send(cmd2))
resp3 = await client.send(cmd3)
async for data in
# Control and data ports are now disconnected
def __init__(self, host: str):
self._host = host
self._ctrl_connection = ControlConnection()
self._ctrl_task: Optional[asyncio.Task] = None
self._ctrl_queues: Dict[int, asyncio.Queue] = {}
self._ctrl_stream = _StreamHelper()
async def connect(self):
"""Connect to the control port, and be ready to handle commands"""
await self._ctrl_stream.connect(self._host, 8888)
self._ctrl_task = asyncio.create_task(
def is_connected(self):
"""True if there is a currently active connection.
NOTE: This does not indicate if the remote end is still connected."""
if self._ctrl_task and not self._ctrl_task.done():
return True
return False
async def close(self):
"""Close the control connection, and wait for completion"""
assert self._ctrl_task, "connect() not called yet"
await self._ctrl_stream.close()
async def __aenter__(self) -> "AsyncioClient":
await self.connect()
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
await self.close()
async def _ctrl_read_forever(self, reader: asyncio.StreamReader):
while True:
received = await
to_send = self._ctrl_connection.receive_bytes(received)
await self._ctrl_stream.write_and_drain(to_send)
for command, response in self._ctrl_connection.responses():
queue = self._ctrl_queues.pop(id(command))
except Exception:
logging.exception(f"Error handling '{received.decode()}'")
async def send(self, command: Command[T], timeout: Optional[float] = None) -> T:
"""Send a command to control port of the PandA, returning its response.
command: The `Command` to send
queue: asyncio.Queue[T] = asyncio.Queue()
# Need to use the id as non-frozen dataclasses don't hash
self._ctrl_queues[id(command)] = queue
to_send = self._ctrl_connection.send(command)
await self._ctrl_stream.write_and_drain(to_send, timeout)
response = await asyncio.wait_for(queue.get(), timeout)
if isinstance(response, Exception):
raise response
return response
async def data(
scaled: bool = True,
flush_period: Optional[float] = None,
frame_timeout: Optional[float] = None,
) -> AsyncGenerator[Data, None]:
"""Connect to data port and yield data frames
scaled: Whether to scale and average data frames, reduces throughput
flush_period: How often to flush partial data frames, None is on every
chunk of data from the server
frame_timeout: If no data is received for this amount of time, raise
stream = _StreamHelper()
connection = DataConnection()
queue: asyncio.Queue[Iterable[Data]] = asyncio.Queue()
def raise_timeouterror():
raise asyncio.TimeoutError(f"No data received for {frame_timeout}s")
async def periodic_flush():
if flush_period is not None:
while True:
# Every flush_period seconds flush and queue data
await asyncio.sleep(flush_period)
async def read_from_stream():
reader = stream.reader
# Should we flush every FrameData?
flush_every_frame = flush_period is None
while True:
recv = await asyncio.wait_for(, frame_timeout)
except asyncio.TimeoutError:
queue.put_nowait(connection.receive_bytes(recv, flush_every_frame))
await stream.connect(self._host, 8889)
await stream.write_and_drain(connection.connect(scaled))
fut = asyncio.gather(periodic_flush(), read_from_stream())
while True:
for data in await queue.get():
yield data
await stream.close()
with suppress(asyncio.CancelledError):
await fut