Source code for fastcs_pandablocks.panda.panda_controller
import asyncio
from fastcs.attributes import Attribute, AttrR, AttrRW, AttrW
from fastcs.controller import Controller
from fastcs.cs_methods import Scan
from fastcs_pandablocks.panda.blocks import Blocks
from fastcs_pandablocks.panda.client_wrapper import RawPanda
from fastcs_pandablocks.types import PandaName
from .handlers import panda_value_to_attribute_value
[docs]
class PandaController(Controller):
"""Controller for polling data from the panda through pandablocks-client.
Changes are received at a given poll period and passed to sub-controllers.
"""
def __init__(self, hostname: str, poll_period: float) -> None:
# TODO https://github.com/DiamondLightSource/FastCS/issues/62
super().__init__()
self.attributes: dict[str, Attribute] = {}
self._raw_panda = RawPanda(hostname)
self._blocks: Blocks = Blocks(self._raw_panda)
self.update = Scan(self._update, poll_period)
self.connected = False
async def connect(self) -> None:
if self.connected:
# `connect` needs to be called in `initialise`,
# then FastCS will attempt to call it again.
return
await self._raw_panda.connect()
await self._blocks.parse_introspected_data()
await self._blocks.setup_post_introspection()
self.connected = True
async def initialise(self) -> None:
await self.connect()
for block_name, block in self._blocks.controllers():
self.register_sub_controller(block_name, block)
async def update_field_value(self, panda_name: PandaName, value: str):
attribute = self._blocks.get_attribute(panda_name)
attribute_value = panda_value_to_attribute_value(attribute.datatype, value)
if isinstance(attribute, AttrW) and not isinstance(attribute, AttrRW):
await attribute.process(attribute_value)
elif isinstance(attribute, AttrR):
await attribute.set(attribute_value)
else:
raise RuntimeError(f"Couldn't find panda field for {panda_name}.")
async def _update(self):
changes = await self._raw_panda.get_changes()
await asyncio.gather(
*[
self.update_field_value(PandaName.from_string(raw_panda_name), value)
for raw_panda_name, value in changes.items()
]
)