Source code for pandablocks_ioc.ioc

# Creating EPICS records directly from PandA Blocks and Fields
import asyncio
import inspect
import logging
import re
from collections.abc import Callable
from dataclasses import dataclass
from string import digits
from typing import Any, Optional

import numpy as np
from pandablocks.asyncio import AsyncioClient
from pandablocks.commands import (
    Arm,
    ChangeGroup,
    Disarm,
    Get,
    GetBlockInfo,
    GetChanges,
    GetFieldInfo,
    Put,
)
from pandablocks.responses import (
    BitMuxFieldInfo,
    BitOutFieldInfo,
    BlockInfo,
    Changes,
    EnumFieldInfo,
    ExtOutBitsFieldInfo,
    ExtOutFieldInfo,
    FieldInfo,
    PosMuxFieldInfo,
    PosOutFieldInfo,
    ScalarFieldInfo,
    SubtypeTimeFieldInfo,
    TableFieldInfo,
    TimeFieldInfo,
    UintFieldInfo,
)
from softioc import alarm, asyncio_dispatcher, builder, fields, softioc
from softioc.imports import db_put_field
from softioc.pythonSoftIoc import RecordWrapper

from ._connection_status import ConnectionStatus, Statuses
from ._hdf_ioc import Dataset, HDF5RecordController
from ._pvi import (
    Pvi,
    PviGroup,
    add_automatic_pvi_info,
    add_pcap_arm_pvi_info,
    add_positions_table_row,
)
from ._tables import TableRecordWrapper, TableUpdater
from ._types import (
    ONAM_STR,
    OUT_RECORD_FUNCTIONS,
    ZNAM_STR,
    EpicsName,
    InErrorException,
    PandAName,
    RecordInfo,
    RecordValue,
    ScalarRecordValue,
    check_num_labels,
    device_and_record_to_panda_name,
    panda_to_epics_name,
    trim_description,
    trim_string_value,
)
from ._version import __version__

# TODO: Try turning python.analysis.typeCheckingMode on, as it does highlight a couple
# of possible errors


@dataclass
class _BlockAndFieldInfo:
    """Contains all available information for a Block, including Fields and all the
    Values for `block_info.number` instances of the Fields."""

    block_info: BlockInfo
    fields: dict[str, FieldInfo]
    values: dict[EpicsName, RecordValue]


# Keep a reference to the task, as specified in documentation:
# https://docs.python.org/3/library/asyncio-task.html#asyncio.create_task
create_softioc_task: Optional[asyncio.Task] = None


def _when_finished(task):
    global create_softioc_task
    create_softioc_task = None


async def _create_softioc(
    client: AsyncioClient,
    record_prefix: str,
    connection_status: ConnectionStatus,
    dispatcher: asyncio_dispatcher.AsyncioDispatcher,
):
    """Asynchronous wrapper for IOC creation"""
    try:
        await client.connect()
    except OSError:
        logging.exception("Unable to connect to PandA")
        raise
    (all_records, all_values_dict, block_info_dict) = await create_records(
        client, dispatcher, record_prefix
    )

    global create_softioc_task
    if create_softioc_task:
        raise RuntimeError("Unexpected state - softioc task already exists")

    create_softioc_task = asyncio.create_task(
        update(
            client,
            connection_status,
            all_records,
            0.1,
            all_values_dict,
            block_info_dict,
        )
    )

    create_softioc_task.add_done_callback(_when_finished)


[docs] def create_softioc( client: AsyncioClient, record_prefix: str, screens_dir: Optional[str] = None, clear_bobfiles: bool = False, ) -> None: """Create a PythonSoftIOC from fields and attributes of a PandA. This function will introspect a PandA for all defined Blocks, Fields of each Block, and Attributes of each Field, and create appropriate EPICS records for each. Args: client: The asyncio client to be used to read/write to of the PandA record_prefix: The string prefix used for creation of all records. screens_dir: The directory to export bobfiles to. clear_bobfiles: Can only be true if screens_dir is provided. Clears the screens_dir of bobfiles before creating new ones. """ # TODO: This needs to read/take in a YAML configuration file, for various aspects # e.g. the update() wait time between calling GetChanges if clear_bobfiles and not screens_dir: raise ValueError("recieved clear_bobfiles=True with no screens_dir") if screens_dir: Pvi.configure_pvi(screens_dir, clear_bobfiles) try: dispatcher = asyncio_dispatcher.AsyncioDispatcher() connection_status = ConnectionStatus(record_prefix) connection_status.set_status(Statuses.CONNECTING) asyncio.run_coroutine_threadsafe( _create_softioc(client, record_prefix, connection_status, dispatcher), dispatcher.loop, ).result() connection_status.set_status(Statuses.CONNECTED) # Must leave this blocking line here, in the main thread, not in the # dispatcher's loop or it'll block every async process in this module softioc.interactive_ioc(globals()) except OSError as exc: logging.exception("Exception while initializing softioc") connection_status.set_status(Statuses.DISCONNECTED) raise exc finally: # Client was connected in the _create_softioc method if client.is_connected(): asyncio.run_coroutine_threadsafe(client.close(), dispatcher.loop).result()
def get_panda_versions(idn_repsonse: str) -> dict[EpicsName, str]: """Function that parses version info from the PandA's response to the IDN command See: https://pandablocks-server.readthedocs.io/en/latest/commands.html#system-commands Args: idn_response (str): Response from PandA to Get(*IDN) command Returns: dict[EpicsName, str]: Dictionary mapping firmware record name to version """ # Currently, IDN reports sw, fpga, and rootfs versions firmware_versions = {"PandA SW": "Unknown", "FPGA": "Unknown", "rootfs": "Unknown"} # If the *IDN response contains too many keys, break and leave versions as "Unknown" # Since spaces are used to deliminate versions and can also be in the keys and # values, if an additional key is present that we don't explicitly handle, # our approach of using regex matching will not work. if sum(name in idn_repsonse for name in firmware_versions) < idn_repsonse.count( ":" ): logging.error( f"Recieved unexpected version numbers in version string {idn_repsonse}!" ) else: for firmware_name in firmware_versions: pattern = re.compile( rf'{re.escape(firmware_name)}:\s*([^:]+?)(?=\s*\b(?: \ {"|".join(map(re.escape, firmware_versions))}):|$)' ) if match := pattern.search(idn_repsonse): firmware_versions[firmware_name] = match.group(1).strip() logging.info( f"{firmware_name} Version: {firmware_versions[firmware_name]}" ) else: logging.warning(f"Failed to get {firmware_name} version information!") return { EpicsName(firmware_name.upper().replace(" ", "_")): version for firmware_name, version in firmware_versions.items() } async def introspect_panda( client: AsyncioClient, ) -> tuple[dict[str, _BlockAndFieldInfo], dict[EpicsName, RecordValue]]: """Query the PandA for all its Blocks, Fields of each Block, and Values of each Field Args: client (AsyncioClient): Client used for commuication with the PandA Returns: Tuple of: Dict[str, BlockAndFieldInfo]: Dictionary containing all information on the block Dict[EpicsName, RecordValue]]: Dictionary containing all values from GetChanges for both scalar and multivalue fields """ block_dict = await client.send(GetBlockInfo()) for block in block_dict.keys(): if block[-1].isdigit(): raise ValueError(f"Block name '{block}' contains a trailing number") # Concurrently request info for all fields of all blocks # Note order of requests is important as it is unpacked by index below returned_infos = await asyncio.gather( *[client.send(GetFieldInfo(block)) for block in block_dict], client.send(GetChanges(ChangeGroup.ALL, True)), ) field_infos: list[dict[str, FieldInfo]] = returned_infos[0:-1] changes: Changes = returned_infos[-1] values, all_values_dict = _create_dicts_from_changes(changes, block_dict) panda_dict = {} for (block_name, block_info), field_info in zip( block_dict.items(), field_infos, strict=False ): panda_dict[block_name] = _BlockAndFieldInfo( block_info=block_info, fields=field_info, values=values[block_name] ) return (panda_dict, all_values_dict) def _create_dicts_from_changes( changes: Changes, block_info_dict: dict[str, BlockInfo] ) -> tuple[dict[str, dict[EpicsName, RecordValue]], dict[EpicsName, RecordValue]]: """Take the `Changes` object and convert it into two dictionaries. Args: changes: The `Changes` object as returned by `GetChanges` block_info_dict: Information from the initial `GetBlockInfo` request, used to check the `number` of blocks for parsing metadata Returns: Tuple of: Dict[str, Dict[EpicsName, RecordValue]]: Block-level dictionary, where each top-level key is a PandA Block, and the inner dictionary is all the fields and values associated with that Block. Dict[EpicsName, RecordValue]]: A flattened version of the above dictionary - a list of all Fields (across all Blocks) and their associated value. """ def _store_values( block_and_field_name: str, value: RecordValue, values: dict[str, dict[EpicsName, RecordValue]], ) -> None: """Parse the data given in `block_and_field_name` and `value` into a new entry in the `values` dictionary""" block_name_number, field_name = block_and_field_name.split(".", maxsplit=1) # Parse *METADATA.LABEL_<block><num> into "<block>" key and # "<block><num>:LABEL" value if block_name_number.startswith("*METADATA") and field_name.startswith( "LABEL_" ): _, block_name_number = field_name.split("_", maxsplit=1) # The block is fixed with metadata, it should end with a number # "*METADATA.LABEL_SEQ2": "NewSeqMetadataLabel", if not block_name_number[-1].isdigit(): raise ValueError( f"Recieved metadata for a block name {block_name_number} that " "didn't contain a number" ) block_name_no_number = re.sub(r"\d*$", "", block_name_number) number_of_blocks = block_info_dict[block_name_no_number].number if number_of_blocks == 1: if block_name_number[-1] != "1" or block_name_number[-2].isdigit(): raise ValueError( f"Recieved metadata '*METADATA.LABEL_{block_name_number}', " "this should have a single '1' on the end" ) block_and_field_name = EpicsName(block_name_number[:-1] + ":LABEL") else: block_and_field_name = EpicsName(block_name_number + ":LABEL") else: block_and_field_name = panda_to_epics_name(PandAName(block_and_field_name)) block_name = block_name_number.rstrip(digits) if block_name not in values: values[block_name] = {} if block_and_field_name in values[block_name]: logging.error( f"Duplicate values for {block_and_field_name} detected." " Overriding existing value." ) block_and_field_name = EpicsName(block_and_field_name) values[block_name][block_and_field_name] = value # Create a dict which maps block name to all values for all instances # of that block (e.g. {"TTLIN" : {"TTLIN1:VAL": "1", "TTLIN2:VAL" : "5", ...} }) values: dict[str, dict[EpicsName, RecordValue]] = {} for block_and_field_name, value in changes.values.items(): _store_values(block_and_field_name, value, values) # Parse the multiline data into the same values structure for block_and_field_name, multiline_value in changes.multiline_values.items(): _store_values(block_and_field_name, multiline_value, values) # Note any in_error fields so we can later set their records to a non-zero severity for block_and_field_name in changes.in_error: logging.error(f"PandA reports field in error: {block_and_field_name}") _store_values( block_and_field_name, InErrorException(block_and_field_name), values ) # Single dictionary that has all values for all types of field, as reported # from GetChanges all_values_dict = {k: v for item in values.values() for k, v in item.items()} return values, all_values_dict @dataclass class _RecordUpdater: """Handles Put'ing data back to the PandA when an EPICS record is updated. This should only be used to handle Out record types. Args: record_info: The RecordInfo structure for the record record_prefix: The prefix of the record name client: The client used to send data to PandA all_values_dict: The dictionary containing the most recent value of all records as returned from GetChanges. This dict will be dynamically updated by other methods. labels: If the record is an enum type, provide the list of labels """ record_info: RecordInfo record_prefix: str client: AsyncioClient all_values_dict: dict[EpicsName, RecordValue] labels: Optional[list[str]] # The incoming value's type depends on the record. Ensure you always cast it. async def update(self, new_val: Any): logging.debug( f"Updating record {self.record_info.record.name} with value {new_val}" ) try: # If this is an enum record, retrieve the string value val: Optional[str] if self.labels: assert int(new_val) < len( self.labels ), f"Invalid label index {new_val}, only {len(self.labels)} labels" val = self.labels[int(new_val)] elif new_val is not None: # Necessary to wrap the data_type_func call in str() as we must # differentiate between ints and floats - some PandA fields will not # accept the wrong number format. val = str(self.record_info.data_type_func(new_val)) else: # value is None - expected for action-write fields val = new_val record_name = self.record_info.record.name.removeprefix(self.record_prefix) panda_field = device_and_record_to_panda_name(record_name) await self.client.send(Put(panda_field, val)) self.record_info._pending_change = True # On success the new value will be polled by GetChanges and stored into # the all_values_dict except Exception: logging.exception( f"Unable to Put record {self.record_info.record.name}, " f"value {new_val}, to PandA", ) try: if self.record_info.record: record_name = self.record_info.record.name.removeprefix( self.record_prefix + ":" ) assert record_name in self.all_values_dict old_val = self.all_values_dict[record_name] if isinstance(old_val, InErrorException): # If PythonSoftIOC issue #53 is fixed we could put error state. logging.error( "Cannot restore previous value to record " f"{record_name}, PandA field is in error." ) return logging.warning( f"Restoring previous value {old_val} to record {record_name}" ) # Note that only Out records will be present here, due to how # _RecordUpdater instances are created. self.record_info.record.set(old_val, process=False) else: logging.error( f"No record found when updating {record_name}," " unable to roll back value" ) except Exception: logging.exception( f"Unable to roll back record {record_name} to previous value.", ) @dataclass class _WriteRecordUpdater(_RecordUpdater): """Special case record updater to send an empty value. This is necessary as some PandA fields are written using e.g. \"FOO1.BAR=\" with no explicit value at all.""" async def update(self, new_val): await super().update(None) @dataclass class _TimeRecordUpdater(_RecordUpdater): """Set the EGU values on a record when the UNITS sub-record is updated.""" base_record: RecordWrapper is_type_time: bool async def update(self, new_val: Any): # Must allow UNITS value change to be pushed to the PandA # as this triggers the MIN field to be recalculated await super().update(new_val) await self.update_parent_record(new_val) async def update_parent_record(self, new_val): self.update_egu(new_val) def update_egu(self, new_val) -> None: assert self.labels # This method gets called in two contexts: One is directly from an EPICS # on_update and the other is from *CHANGES?. # In the EPICS context, the value is an integer (from an mbbi/o record) # In the *CHANGES? context the value is a string if isinstance(new_val, str): assert new_val in self.labels new_egu = new_val else: new_egu = self.labels[new_val] array = np.require(new_egu, dtype=np.dtype("S40")) db_put_field( f"{self.base_record.name}.EGU", fields.DBF_STRING, array.ctypes.data, 1, ) @dataclass class StringRecordLabelValidator: """Validate that a given string is a valid label for a PandA enum field. This is necessary for several fields which have too many labels to fit in an EPICS mbbi/mbbo record, and so use string records instead.""" labels: list[str] def validate(self, record: RecordWrapper, new_val: str): if new_val in self.labels: return True logging.error(f"Value {new_val} not valid for record {record.name}") return False class IocRecordFactory: """Class to handle creating PythonSoftIOC records for a given field defined in a PandA""" _record_prefix: str _client: AsyncioClient _all_values_dict: dict[EpicsName, RecordValue] _pos_out_row_counter: int = 0 # List of methods in builder, used for parameter validation _builder_methods = [ method for _, method in inspect.getmembers(builder, predicate=inspect.isfunction) ] def __init__( self, client: AsyncioClient, record_prefix: str, all_values_dict: dict[EpicsName, RecordValue], ): """Initialise IocRecordFactory Args: client: AsyncioClient used when records update to Put values back to PandA record_prefix: The record prefix a.k.a. the device name all_values_dict: Dictionary of most recent values from PandA as reported by GetChanges. """ self._record_prefix = record_prefix self._client = client self._all_values_dict = all_values_dict # Set the record prefix builder.SetDeviceName(self._record_prefix) Pvi.record_prefix = self._record_prefix # All records should be blocking builder.SetBlocking(True) # A dataset cache for storing dataset names and capture modes for different # capture records self._dataset_cache: dict[EpicsName, Dataset] = {} def _process_labels( self, labels: list[str], record_value: ScalarRecordValue ) -> tuple[list[str], int]: """Find the index of `record_value` in the `labels` list, suitable for use in an `initial_value=` argument during record creation. Secondly, return a new list from the given labels that are all short enough to fit within EPICS 25 character label limit. Raises ValueError if `record_value` not found in `labels`.""" assert len(labels) > 0 if not all(len(label) < 25 for label in labels): logging.warning( "One or more labels do not fit EPICS maximum length of " f"25 characters. Long labels will be truncated. Labels: {labels}" ) # Most likely time we'll see an error is when PandA hardware has set invalid # enum value. No logging as already logged when seen from GetChanges. if isinstance(record_value, InErrorException): index = 0 else: index = labels.index(record_value) return ([label[:25] for label in labels], index) def _check_num_values( self, values: dict[EpicsName, ScalarRecordValue], num: int ) -> None: """Function to check that the number of values is at least the expected amount. Allow extra values for future-proofing, if PandA has new fields/attributes the client does not know about. Raises AssertionError if too few values.""" assert len(values) >= num, ( f"Incorrect number of values, {len(values)}, expected at least {num}.\n" + f"{values}" ) def _create_record_info( self, record_name: EpicsName, description: Optional[str], record_creation_func: Callable, data_type_func: Callable, group: PviGroup, labels: Optional[list[str]] = None, *args, **kwargs, ) -> RecordInfo: """Create the record, using the given function and passing all optional arguments and keyword arguments, and then set the description field for the record. If the record is an Out type, a default on_update mechanism will be added. This can be overriden by specifying a custom "on_update=..." keyword. Args: record_name: The name this record will be created with description: The description for this field. This will be truncated to 40 characters due to EPICS limitations. record_creation_func: The function that will be used to create this record. Must be one of the builder.* functions. data_type_func: The function to use to convert the value returned from GetChanges, which will always be a string, into a type appropriate for the record e.g. int, float. group: The group that this record will be displayed in for PVI. labels: If the record type being created is a mbbi or mbbo record, provide the list of valid labels here. *args: Additional arguments that will be passed through to the `record_creation_func` **kwargs: Additional keyword arguments that will be examined and possibly modified for various reasons, and then passed to the `record_creation_func` Returns: RecordInfo: Class containing the created record and anything needed for updating the record. """ if labels is None: labels = [] extra_kwargs: dict[str, Any] = {} assert ( record_creation_func in self._builder_methods ), "Unrecognised record creation function passed to _create_record_info" if ( record_creation_func == builder.mbbIn or record_creation_func == builder.mbbOut ): check_num_labels(labels, record_name) # Check the initial value is valid. If it is, apply data type conversion # otherwise mark the record as in error. # Note that many already have correct type, this mostly applies to values # that are being sent as strings to analog or long records as the values are # returned as strings from Changes command. if "initial_value" in kwargs: initial_value = kwargs["initial_value"] if isinstance(initial_value, InErrorException): logging.warning( f"Marking record {record_name} as invalid due to error from PandA" ) # See PythonSoftIOC issue #57 extra_kwargs.update({"STAT": "UDF", "SEVR": "INVALID"}) kwargs.pop("initial_value") elif record_creation_func in [builder.stringIn, builder.stringOut]: kwargs["initial_value"] = trim_string_value(initial_value, record_name) elif isinstance(initial_value, str): kwargs["initial_value"] = data_type_func(initial_value) record_info = RecordInfo( data_type_func=data_type_func, labels=labels if labels else None, is_in_record=record_creation_func not in OUT_RECORD_FUNCTIONS, ) # If there is no on_update, and the record type allows one, create it record_updater = None if ( "on_update" not in kwargs and "on_update_name" not in kwargs and record_creation_func in OUT_RECORD_FUNCTIONS ): record_updater = _RecordUpdater( record_info, self._record_prefix, self._client, self._all_values_dict, labels if labels else None, ) extra_kwargs["on_update"] = record_updater.update extra_kwargs["DESC"] = trim_description(description, record_name) record = record_creation_func( record_name, *labels, *args, **extra_kwargs, **kwargs ) add_automatic_pvi_info( group=group, record=record, record_name=record_name, record_creation_func=record_creation_func, ) # Annoyingly we have a circular dependency: The on_update kwarg must be provided # in order to create the record record_info.add_record(record) return record_info def _make_time( self, record_name: EpicsName, field_info: FieldInfo, values: dict[EpicsName, ScalarRecordValue], record_creation_func: Callable, **kwargs, ) -> dict[EpicsName, RecordInfo]: """Make one record for the timer itself, and a sub-record for its units""" assert isinstance(field_info, (TimeFieldInfo, SubtypeTimeFieldInfo)) record_dict: dict[EpicsName, RecordInfo] = {} time_record_info = self._create_record_info( record_name, field_info.description, record_creation_func, float, PviGroup.PARAMETERS, **kwargs, ) record_dict[record_name] = time_record_info units_record_name = EpicsName(record_name + ":UNITS") labels, initial_index = self._process_labels( field_info.units_labels, values[units_record_name] ) # Ensure initial EGU matches that of the :UNITS record time_record_info.record.EGU = labels[initial_index] updater: _TimeRecordUpdater record_dict[units_record_name] = self._create_record_info( units_record_name, "Units of time setting", builder.mbbOut, type(initial_index), PviGroup.PARAMETERS, labels=labels, initial_value=initial_index, on_update=lambda v: updater.update(v), ) updater = _TimeRecordUpdater( record_dict[units_record_name], self._record_prefix, self._client, self._all_values_dict, labels, time_record_info.record, isinstance(field_info, TimeFieldInfo), ) record_dict[units_record_name].on_changes_func = updater.update_parent_record return record_dict def _make_type_time( self, record_name: EpicsName, field_info: FieldInfo, values: dict[EpicsName, ScalarRecordValue], ) -> dict[EpicsName, RecordInfo]: """Make the records for a field of type "time" - one for the time itself, one for units, and one for the MIN value. """ # RAW attribute ignored - EPICS should never care about it self._check_num_values(values, 2) assert isinstance(field_info, TimeFieldInfo) record_dict = self._make_time( record_name, field_info, values, builder.aOut, initial_value=values[record_name], ) return record_dict def _make_subtype_time_param( self, record_name: EpicsName, field_info: FieldInfo, values: dict[EpicsName, ScalarRecordValue], ) -> dict[EpicsName, RecordInfo]: self._check_num_values(values, 2) return self._make_time( record_name, field_info, values, builder.aOut, initial_value=values[record_name], ) def _make_subtype_time_read( self, record_name: EpicsName, field_info: FieldInfo, values: dict[EpicsName, ScalarRecordValue], ) -> dict[EpicsName, RecordInfo]: self._check_num_values(values, 2) return self._make_time( record_name, field_info, values, builder.aIn, initial_value=values[record_name], ) def _make_subtype_time_write( self, record_name: EpicsName, field_info: FieldInfo, values: dict[EpicsName, ScalarRecordValue], ) -> dict[EpicsName, RecordInfo]: self._check_num_values(values, 1) return self._make_time(record_name, field_info, values, builder.aOut) def _make_bit_out( self, record_name: EpicsName, field_info: FieldInfo, values: dict[EpicsName, ScalarRecordValue], ) -> dict[EpicsName, RecordInfo]: self._check_num_values(values, 1) assert isinstance(field_info, BitOutFieldInfo) record_dict: dict[EpicsName, RecordInfo] = {} record_dict[record_name] = self._create_record_info( record_name, field_info.description, builder.boolIn, int, PviGroup.OUTPUTS, ZNAM=ZNAM_STR, ONAM=ONAM_STR, initial_value=values[record_name], ) # TODO: Add BITS table support here return record_dict def _make_pos_out( self, record_name: EpicsName, field_info: FieldInfo, values: dict[EpicsName, ScalarRecordValue], ) -> dict[EpicsName, RecordInfo]: self._check_num_values(values, 5) assert isinstance(field_info, PosOutFieldInfo) record_dict: dict[EpicsName, RecordInfo] = {} units_record_name = EpicsName(record_name + ":UNITS") record_dict[record_name] = self._create_record_info( record_name, field_info.description, builder.longIn, int, PviGroup.OUTPUTS, initial_value=values[record_name], EGU=values[units_record_name], ) capture_record_name = EpicsName(record_name + ":CAPTURE") dataset_record_name = EpicsName(record_name + ":DATASET") labels, capture_index = self._process_labels( field_info.capture_labels, values[capture_record_name] ) capture_record_updater: _RecordUpdater def capture_record_on_update(new_capture_mode): self._dataset_cache[record_name] = Dataset( record_dict[dataset_record_name].record.get(), labels[new_capture_mode], ) return capture_record_updater.update(new_capture_mode) record_dict[capture_record_name] = self._create_record_info( capture_record_name, "Capture options", builder.mbbOut, int, PviGroup.CAPTURE, labels=labels, initial_value=capture_index, on_update=capture_record_on_update, ) # For now we have to make a `_RecordUpdater`` here and # combine it with `on_update`. # https://github.com/PandABlocks/PandABlocks-ioc/issues/121 capture_record_updater = _RecordUpdater( record_dict[capture_record_name], self._record_prefix, self._client, self._all_values_dict, labels if labels else None, ) def dataset_record_on_update(new_dataset_name): self._dataset_cache[record_name] = Dataset( new_dataset_name, labels[record_dict[capture_record_name].record.get()], ) record_dict[dataset_record_name] = self._create_record_info( dataset_record_name, "Used to adjust the dataset name to one more scientifically relevant", builder.stringOut, str, PviGroup.CAPTURE, initial_value="", on_update=dataset_record_on_update, ) offset_record_name = EpicsName(record_name + ":OFFSET") record_dict[offset_record_name] = self._create_record_info( offset_record_name, "Offset", builder.aOut, float, PviGroup.CAPTURE, initial_value=values[offset_record_name], ) scale_record_name = EpicsName(record_name + ":SCALE") record_dict[scale_record_name] = self._create_record_info( scale_record_name, "Scale factor", builder.aOut, float, PviGroup.CAPTURE, initial_value=values[scale_record_name], ) record_dict[units_record_name] = self._create_record_info( units_record_name, "Units string", builder.stringOut, str, PviGroup.CAPTURE, initial_value=values[units_record_name], ) # SCALED attribute doesn't get returned from GetChanges. Instead # of trying to dynamically query for it we'll just recalculate it scaled_record_name = record_name + ":SCALED" scaled_calc_record = builder.records.calc( scaled_record_name, CALC="A*B + C", INPA=builder.CP(record_dict[record_name].record), INPB=builder.CP(record_dict[scale_record_name].record), INPC=builder.CP(record_dict[offset_record_name].record), DESC="Value with scaling applied", PREC=5, ) # Create the POSITIONS "table" of records. Most are aliases of the records # created above. positions_record_name = f"POSITIONS:{self._pos_out_row_counter}" builder.records.stringin( positions_record_name + ":NAME", VAL=record_name, DESC="Table of configured positional outputs", ) value_record_name = EpicsName(positions_record_name + ":VAL") scaled_calc_record.add_alias(self._record_prefix + ":" + value_record_name) record_dict[capture_record_name].record.add_alias( self._record_prefix + ":" + positions_record_name + ":" + capture_record_name.split(":")[-1] ) record_dict[offset_record_name].record.add_alias( self._record_prefix + ":" + positions_record_name + ":" + offset_record_name.split(":")[-1] ) record_dict[scale_record_name].record.add_alias( self._record_prefix + ":" + positions_record_name + ":" + scale_record_name.split(":")[-1] ) record_dict[units_record_name].record.add_alias( self._record_prefix + ":" + positions_record_name + ":" + units_record_name.split(":")[-1] ) record_dict[dataset_record_name].record.add_alias( self._record_prefix + ":" + positions_record_name + ":" + dataset_record_name.split(":")[-1] ) self._pos_out_row_counter += 1 add_positions_table_row( record_name, value_record_name, units_record_name, scale_record_name, offset_record_name, dataset_record_name, capture_record_name, ) return record_dict def _make_ext_out( self, record_name: EpicsName, field_info: FieldInfo, values: dict[EpicsName, ScalarRecordValue], ) -> dict[EpicsName, RecordInfo]: self._check_num_values(values, 1) assert isinstance(field_info, ExtOutFieldInfo) record_dict: dict[EpicsName, RecordInfo] = {} # There is no record for the ext_out field itself - the only thing # you do with them is to turn their Capture attribute on/off, and give it # an alternative dataset name capture_record_name = EpicsName(record_name + ":CAPTURE") dataset_record_name = EpicsName(record_name + ":DATASET") labels, capture_index = self._process_labels( field_info.capture_labels, values[capture_record_name] ) def dataset_record_on_update(new_dataset_name): self._dataset_cache[record_name] = Dataset( new_dataset_name, labels[record_dict[capture_record_name].record.get()], ) record_dict[dataset_record_name] = self._create_record_info( dataset_record_name, "Used to adjust the dataset name to one more scientifically relevant", builder.stringOut, str, PviGroup.OUTPUTS, initial_value="", on_update=dataset_record_on_update, ) capture_record_updater: _RecordUpdater def capture_record_on_update(new_capture_mode): self._dataset_cache[record_name] = Dataset( record_dict[dataset_record_name].record.get(), labels[new_capture_mode], ) return capture_record_updater.update(new_capture_mode) record_dict[capture_record_name] = self._create_record_info( capture_record_name, field_info.description, builder.mbbOut, int, PviGroup.OUTPUTS, labels=labels, initial_value=capture_index, on_update=capture_record_on_update, ) # For now we have to make a `_RecordUpdater`` here and # combine it with `on_update`. # https://github.com/PandABlocks/PandABlocks-ioc/issues/121 capture_record_updater = _RecordUpdater( record_dict[capture_record_name], self._record_prefix, self._client, self._all_values_dict, labels if labels else None, ) return record_dict def _make_ext_out_bits( self, record_name: EpicsName, field_info: FieldInfo, values: dict[EpicsName, ScalarRecordValue], ) -> dict[EpicsName, RecordInfo]: self._check_num_values(values, 1) assert isinstance(field_info, ExtOutBitsFieldInfo) record_dict = self._make_ext_out(record_name, field_info, values) # Create a "table" out of the items present in the list of bits # Identify which BITS field this is and calculate its offset - we want BITS0 # through BITS3 to look like one continuous table from the outside, indexed # 0 through 127 (each BITS holds 32 values) bits_index_str = record_name[-1] assert bits_index_str.isdigit() bits_index = int(bits_index_str) offset = bits_index * 32 capture_record_name = EpicsName(record_name + ":CAPTURE") capture_record_info = record_dict[capture_record_name] # There is a single CAPTURE record which is alias'd to appear in each row. # This is because you can only capture a whole field's worth of bits at a time, # and not bits individually. When one is captured, they all are. for i in range(offset, offset + 32): capture_record_info.record.add_alias( f"{self._record_prefix}:BITS:{i}:CAPTURE" ) # Each row of the table has a VAL and a NAME. for i, label in enumerate(field_info.bits): if label == "": # Some rows are empty. Do not create records. continue link = self._record_prefix + ":" + label.replace(".", ":") + " CP" enumerated_bits_prefix = f"BITS:{offset + i}" builder.records.bi( f"{enumerated_bits_prefix}:VAL", INP=link, DESC="Value of field connected to this BIT", ZNAM=ZNAM_STR, ONAM=ONAM_STR, ) builder.records.stringin( f"{enumerated_bits_prefix}:NAME", VAL=label, DESC="Name of field connected to this BIT", ) # PVI TODO: Not sure how to represent these ones return record_dict def _make_bit_mux( self, record_name: EpicsName, field_info: FieldInfo, values: dict[EpicsName, ScalarRecordValue], ) -> dict[EpicsName, RecordInfo]: self._check_num_values(values, 2) assert isinstance(field_info, BitMuxFieldInfo) record_dict: dict[EpicsName, RecordInfo] = {} # This should be an mbbOut record, but there are too many posssible labels # TODO: There will need to be some mechanism to retrieve the labels, # but there's a BITS table that can probably be used validator = StringRecordLabelValidator(field_info.labels) # Ensure we're putting a valid value to start with assert values[record_name] in field_info.labels record_dict[record_name] = self._create_record_info( record_name, field_info.description, builder.stringOut, str, PviGroup.INPUTS, initial_value=values[record_name], validate=validator.validate, ) delay_record_name = EpicsName(record_name + ":DELAY") record_dict[delay_record_name] = self._create_record_info( delay_record_name, "Clock delay on input", builder.longOut, int, PviGroup.INPUTS, initial_value=values[delay_record_name], ) record_dict[delay_record_name].record.DRVH = field_info.max_delay record_dict[delay_record_name].record.DRVL = 0 return record_dict def _make_pos_mux( self, record_name: EpicsName, field_info: FieldInfo, values: dict[EpicsName, ScalarRecordValue], ) -> dict[EpicsName, RecordInfo]: self._check_num_values(values, 1) assert isinstance(field_info, PosMuxFieldInfo) record_dict: dict[EpicsName, RecordInfo] = {} # This should be an mbbOut record, but there are too many posssible labels # TODO: There will need to be some mechanism to retrieve the labels, # but there's a POSITIONS table that can probably be used. # OR PVAccess somehow? validator = StringRecordLabelValidator(field_info.labels) # Ensure we're putting a valid value to start with assert values[record_name] in field_info.labels record_dict[record_name] = self._create_record_info( record_name, field_info.description, builder.stringOut, str, PviGroup.INPUTS, initial_value=values[record_name], validate=validator.validate, ) return record_dict def _make_table( self, record_name: EpicsName, field_info: FieldInfo, values: dict[EpicsName, list[str]], ) -> dict[EpicsName, RecordInfo]: assert isinstance(field_info, TableFieldInfo) table_updater = TableUpdater( self._client, record_name, field_info, self._all_values_dict, ) # Format the mode record name to remove namespace mode_record_name: str = table_updater.mode_record_info.record.name mode_record_name = EpicsName( mode_record_name.replace(self._record_prefix + ":", "") ) return {mode_record_name: table_updater.mode_record_info} def _make_uint( self, record_name: EpicsName, field_info: FieldInfo, record_creation_func: Callable, group: PviGroup, **kwargs, ) -> dict[EpicsName, RecordInfo]: assert isinstance(field_info, UintFieldInfo) record_dict: dict[EpicsName, RecordInfo] = {} record_dict[record_name] = self._create_record_info( record_name, field_info.description, record_creation_func, int, group, **kwargs, ) max_val = field_info.max_val if record_creation_func in OUT_RECORD_FUNCTIONS: record_dict[record_name].record.DRVL = 0 record_dict[record_name].record.DRVH = max_val record_dict[record_name].record.HOPR = max_val return record_dict def _make_uint_param( self, record_name: EpicsName, field_info: FieldInfo, values: dict[EpicsName, ScalarRecordValue], ) -> dict[EpicsName, RecordInfo]: self._check_num_values(values, 1) return self._make_uint( record_name, field_info, builder.aOut, PviGroup.PARAMETERS, initial_value=values[record_name], PREC=0, ) def _make_uint_read( self, record_name: EpicsName, field_info: FieldInfo, values: dict[EpicsName, ScalarRecordValue], ) -> dict[EpicsName, RecordInfo]: self._check_num_values(values, 1) return self._make_uint( record_name, field_info, builder.aIn, PviGroup.READBACKS, initial_value=values[record_name], PREC=0, ) def _make_uint_write( self, record_name: EpicsName, field_info: FieldInfo, values: dict[EpicsName, ScalarRecordValue], ) -> dict[EpicsName, RecordInfo]: self._check_num_values(values, 0) return self._make_uint( record_name, field_info, builder.aOut, PviGroup.OUTPUTS, # TODO: Is this right? No examples to follow always_update=True, PREC=0, ) # TODO: Why don't I have a _make_int()? I do for other similar types def _make_int_param( self, record_name: EpicsName, field_info: FieldInfo, values: dict[EpicsName, ScalarRecordValue], ) -> dict[EpicsName, RecordInfo]: self._check_num_values(values, 1) return { record_name: self._create_record_info( record_name, field_info.description, builder.longOut, int, PviGroup.PARAMETERS, initial_value=values[record_name], ) } def _make_int_read( self, record_name: EpicsName, field_info: FieldInfo, values: dict[EpicsName, ScalarRecordValue], ) -> dict[EpicsName, RecordInfo]: self._check_num_values(values, 1) return { record_name: self._create_record_info( record_name, field_info.description, builder.longIn, int, PviGroup.READBACKS, initial_value=values[record_name], ) } def _make_int_write( self, record_name: EpicsName, field_info: FieldInfo, values: dict[EpicsName, ScalarRecordValue], ) -> dict[EpicsName, RecordInfo]: self._check_num_values(values, 0) return { record_name: self._create_record_info( record_name, field_info.description, builder.longOut, int, PviGroup.PARAMETERS, always_update=True, ) } def _make_scalar( self, record_name: EpicsName, field_info: FieldInfo, record_creation_func: Callable, **kwargs, ) -> dict[EpicsName, RecordInfo]: # RAW attribute ignored - EPICS should never care about it assert isinstance(field_info, ScalarFieldInfo) record_dict: dict[EpicsName, RecordInfo] = {} record_dict[record_name] = self._create_record_info( record_name, field_info.description, record_creation_func, float, PviGroup.READBACKS, EGU=field_info.units, **kwargs, ) return record_dict def _make_scalar_param( self, record_name: EpicsName, field_info: FieldInfo, values: dict[EpicsName, ScalarRecordValue], ) -> dict[EpicsName, RecordInfo]: self._check_num_values(values, 1) return self._make_scalar( record_name, field_info, builder.aOut, initial_value=values[record_name], ) def _make_scalar_read( self, record_name: EpicsName, field_info: FieldInfo, values: dict[EpicsName, ScalarRecordValue], ) -> dict[EpicsName, RecordInfo]: self._check_num_values(values, 1) return self._make_scalar( record_name, field_info, builder.aIn, initial_value=values[record_name], ) def _make_scalar_write( self, record_name: EpicsName, field_info: FieldInfo, values: dict[EpicsName, ScalarRecordValue], ) -> dict[EpicsName, RecordInfo]: self._check_num_values(values, 0) return self._make_scalar( record_name, field_info, builder.aOut, always_update=True ) def _make_bit( self, record_name: EpicsName, field_info: FieldInfo, record_creation_func: Callable, group: PviGroup, **kwargs, ) -> dict[EpicsName, RecordInfo]: return { record_name: self._create_record_info( record_name, field_info.description, record_creation_func, int, group, ZNAM=ZNAM_STR, ONAM=ONAM_STR, **kwargs, ) } def _make_bit_param( self, record_name: EpicsName, field_info: FieldInfo, values: dict[EpicsName, ScalarRecordValue], ) -> dict[EpicsName, RecordInfo]: self._check_num_values(values, 1) return self._make_bit( record_name, field_info, builder.boolOut, PviGroup.PARAMETERS, initial_value=values[record_name], ) def _make_bit_read( self, record_name: EpicsName, field_info: FieldInfo, values: dict[EpicsName, ScalarRecordValue], ) -> dict[EpicsName, RecordInfo]: self._check_num_values(values, 1) return self._make_bit( record_name, field_info, builder.boolIn, PviGroup.READBACKS, initial_value=values[record_name], ) def _make_bit_write( self, record_name: EpicsName, field_info: FieldInfo, values: dict[EpicsName, ScalarRecordValue], ) -> dict[EpicsName, RecordInfo]: self._check_num_values(values, 0) return self._make_bit( record_name, field_info, builder.boolOut, PviGroup.OUTPUTS, # TODO: Is this right? No examples to follow always_update=True, ) def _make_action_read( self, record_name: EpicsName, field_info: FieldInfo, values: dict[EpicsName, ScalarRecordValue], ) -> dict[EpicsName, RecordInfo]: logging.warning( f"Field of type {field_info.type} - {field_info.subtype} defined. Ignoring." ) return {} def _make_action_write( self, record_name: EpicsName, field_info: FieldInfo, values: dict[EpicsName, ScalarRecordValue], ) -> dict[EpicsName, RecordInfo]: self._check_num_values(values, 0) updater: _WriteRecordUpdater record_info = self._create_record_info( record_name, field_info.description, builder.Action, int, # not bool, as that'll treat string "0" as true PviGroup.OUTPUTS, # TODO: Not sure what group to use ZNAM="", ONAM="", on_update=lambda v: updater.update(v), ) updater = _WriteRecordUpdater( record_info, self._record_prefix, self._client, self._all_values_dict, None ) record_info.add_record(record_info.record) return {record_name: record_info} def _make_lut( self, record_name: EpicsName, field_info: FieldInfo, record_creation_func: Callable, group: PviGroup, **kwargs, ) -> dict[EpicsName, RecordInfo]: # RAW attribute ignored - EPICS should never care about it return { record_name: self._create_record_info( record_name, field_info.description, record_creation_func, str, group, **kwargs, ), } def _make_lut_param( self, record_name: EpicsName, field_info: FieldInfo, values: dict[EpicsName, ScalarRecordValue], ) -> dict[EpicsName, RecordInfo]: self._check_num_values(values, 1) return self._make_lut( record_name, field_info, builder.stringOut, PviGroup.PARAMETERS, initial_value=values[record_name], ) def _make_lut_read( self, record_name: EpicsName, field_info: FieldInfo, values: dict[EpicsName, ScalarRecordValue], ) -> dict[EpicsName, RecordInfo]: self._check_num_values(values, 1) return self._make_lut( record_name, field_info, builder.stringIn, PviGroup.READBACKS, # TODO: Is this right? No examples to follow initial_value=values[record_name], ) def _make_lut_write( self, record_name: EpicsName, field_info: FieldInfo, values: dict[EpicsName, ScalarRecordValue], ) -> dict[EpicsName, RecordInfo]: self._check_num_values(values, 0) return self._make_lut( record_name, field_info, builder.stringOut, PviGroup.OUTPUTS, # TODO: Is this right? No examples to follow always_update=True, ) def _make_enum( self, record_name: EpicsName, field_info: FieldInfo, values: dict[EpicsName, ScalarRecordValue], record_creation_func: Callable, group: PviGroup, **kwargs, ) -> dict[EpicsName, RecordInfo]: self._check_num_values(values, 1) assert isinstance(field_info, EnumFieldInfo) labels, index_value = self._process_labels( field_info.labels, values[record_name] ) return { record_name: self._create_record_info( record_name, field_info.description, record_creation_func, int, group, labels=labels, initial_value=index_value, **kwargs, ) } def _make_enum_param( self, record_name: EpicsName, field_info: FieldInfo, values: dict[EpicsName, ScalarRecordValue], ) -> dict[EpicsName, RecordInfo]: return self._make_enum( record_name, field_info, values, builder.mbbOut, PviGroup.PARAMETERS ) def _make_enum_read( self, record_name: EpicsName, field_info: FieldInfo, values: dict[EpicsName, ScalarRecordValue], ) -> dict[EpicsName, RecordInfo]: # Don't use an mbbIn record, as many labels are too long to fit into EPICS # restricted-length labels. As you cannot write to this record, it's fine to # just have a string return { # TODO: Determining the PviGroup here is VERY fragile record_name: self._create_record_info( record_name, field_info.description, builder.stringIn, str, PviGroup.NONE if record_name.endswith("HEALTH") else PviGroup.READBACKS, initial_value=values[record_name], ) } def _make_enum_write( self, record_name: EpicsName, field_info: FieldInfo, values: dict[EpicsName, ScalarRecordValue], ) -> dict[EpicsName, RecordInfo]: assert isinstance(field_info, EnumFieldInfo) assert record_name not in values # Values are not returned for write fields. Create data for label parsing. values = {record_name: field_info.labels[0]} return self._make_enum( record_name, field_info, values, builder.mbbOut, PviGroup.OUTPUTS, # TODO: Is this right? No examples to follow always_update=True, ) def create_record( self, record_name: EpicsName, field_info: FieldInfo, field_values: dict[EpicsName, RecordValue], ) -> dict[EpicsName, RecordInfo]: # TODO: Name needs changing, this makes multiple records... """Create the record (and any child records) for the PandA field specified in the parameters. Args: record_name: The name of the record to create, with colons separating words in EPICS style. field_info: The field info for the record being created field_values: The dictionary of values for the record and all child records. The keys are in EPICS style. Returns: Dict[str, RecordInfo]: A dictionary of created RecordInfo objects that will need updating later. Not all created records are returned. """ try: key = (field_info.type, field_info.subtype) if key == ("table", None): # Table expects vals in Dict[str, List[str]] list_vals = { EpicsName(k): v for (k, v) in field_values.items() if isinstance(v, list) } return self._make_table(record_name, field_info, list_vals) # PandA can never report a table field as in error, only scalar fields str_vals = { EpicsName(k): v for (k, v) in field_values.items() if isinstance(v, (str, InErrorException)) } return self._field_record_mapping[key]( self, record_name, field_info, str_vals ) except KeyError: # Unrecognised type-subtype key, ignore this item. This allows the server # to define new types without breaking the client. logging.exception( f"Unrecognised type {key} while processing record {record_name}" ) return {} # Map a field's (type, subtype) to a function that creates and returns record(s) _field_record_mapping: dict[ tuple[str, Optional[str]], Callable[ [ "IocRecordFactory", EpicsName, FieldInfo, dict[EpicsName, ScalarRecordValue], ], dict[EpicsName, RecordInfo], ], ] = { # Order matches that of PandA server's Field Types docs ("time", None): _make_type_time, ("bit_out", None): _make_bit_out, ("pos_out", None): _make_pos_out, ("ext_out", "timestamp"): _make_ext_out, ("ext_out", "samples"): _make_ext_out, ("ext_out", "bits"): _make_ext_out_bits, ("bit_mux", None): _make_bit_mux, ("pos_mux", None): _make_pos_mux, # ("table", None): _make_table, TABLE handled separately ("param", "uint"): _make_uint_param, ("read", "uint"): _make_uint_read, ("write", "uint"): _make_uint_write, ("param", "int"): _make_int_param, ("read", "int"): _make_int_read, ("write", "int"): _make_int_write, ("param", "scalar"): _make_scalar_param, ("read", "scalar"): _make_scalar_read, ("write", "scalar"): _make_scalar_write, ("param", "bit"): _make_bit_param, ("read", "bit"): _make_bit_read, ("write", "bit"): _make_bit_write, ("param", "action"): _make_action_read, ("read", "action"): _make_action_read, ("write", "action"): _make_action_write, ("param", "lut"): _make_lut_param, ("read", "lut"): _make_lut_read, ("write", "lut"): _make_lut_write, ("param", "enum"): _make_enum_param, ("read", "enum"): _make_enum_read, ("write", "enum"): _make_enum_write, ("param", "time"): _make_subtype_time_param, ("read", "time"): _make_subtype_time_read, ("write", "time"): _make_subtype_time_write, } async def _arm_on_update(self, new_val: int) -> None: """Process an update to the Arm record, to arm/disarm the PandA""" logging.debug(f"Entering HDF5:Arm record on_update method, value {new_val}") try: if new_val: logging.info("Arming PandA") await self._client.send(Arm()) else: logging.info("Disarming PandA") await self._client.send(Disarm()) except Exception: logging.exception("Failure arming/disarming PandA") def create_block_records( self, block: str, block_info: BlockInfo, block_values: dict[EpicsName, str], default_longStringOut_length=256, ) -> dict[EpicsName, RecordInfo]: """Create the block-level records, and any other one-off block initialisation required.""" record_dict = {} for key, value in block_values.items(): # LABEL will either get its value from the block_values if present, # or fall back to the block_info description field. if (value == "" or value is None) and block_info.description: value = block_info.description # The record uses the default _RecordUpdater.update to update the value # on the panda record_dict[EpicsName(key)] = self._create_record_info( key, None, builder.longStringOut, str, PviGroup.INPUTS, length=default_longStringOut_length, initial_value=value, ) if block == "PCAP": # TODO: Need to add PVI Info here. Just use create_record_info? # And why isn't this record in the record_dict? pcap_arm_record = builder.Action( "PCAP:ARM", ZNAM="Disarm", ONAM="Arm", on_update=self._arm_on_update, DESC="Arm/Disarm the PandA", ) add_pcap_arm_pvi_info(PviGroup.INPUTS, pcap_arm_record) HDF5RecordController( self._client, self._dataset_cache, self._record_prefix, ) return record_dict def create_version_records(self, firmware_versions: dict[EpicsName, str]): """Creates handful of records for tracking versions of IOC/Firmware via EPICS Args: firmware_versions (dict[str, str]): Dictionary mapping firmwares to versions """ system_block_prefix = "SYSTEM" ioc_version_record_name = EpicsName(system_block_prefix + ":IOC_VERSION") ioc_version_record = builder.stringIn( ioc_version_record_name, DESC="IOC Version", initial_value=__version__ ) add_automatic_pvi_info( PviGroup.VERSIONS, ioc_version_record, ioc_version_record_name, builder.stringIn, ) for firmware_name, version in firmware_versions.items(): firmware_record_name = EpicsName( system_block_prefix + f":{firmware_name}_VERSION" ) firmware_ver_record = builder.stringIn( firmware_record_name, DESC=firmware_name, initial_value=version ) add_automatic_pvi_info( PviGroup.VERSIONS, firmware_ver_record, firmware_record_name, builder.stringIn, ) def initialise(self, dispatcher: asyncio_dispatcher.AsyncioDispatcher) -> None: """Perform any final initialisation code to create the records. No new records may be created after this method is called. Args: dispatcher (asyncio_dispatcher.AsyncioDispatcher): The dispatcher used in IOC initialisation """ builder.LoadDatabase() softioc.iocInit(dispatcher) async def create_records( client: AsyncioClient, dispatcher: asyncio_dispatcher.AsyncioDispatcher, record_prefix: str, ) -> tuple[ dict[EpicsName, RecordInfo], dict[ EpicsName, RecordValue, ], dict[str, BlockInfo], ]: """Query the PandA and create the relevant records based on the information returned""" # Get version information from PandA using IDN command idn_response = await client.send(Get("*IDN")) fw_vers_dict = get_panda_versions(idn_response) (panda_dict, all_values_dict) = await introspect_panda(client) # Dictionary containing every record of every type all_records: dict[EpicsName, RecordInfo] = {} record_factory = IocRecordFactory(client, record_prefix, all_values_dict) # Add records for version of IOC, FPGA, and software to SYSTEM block record_factory.create_version_records(fw_vers_dict) # For each field in each block, create block_num records of each field for block, panda_info in panda_dict.items(): block_info = panda_info.block_info values = panda_info.values block_vals = { key: value for key, value in values.items() if key.endswith(":LABEL") and isinstance(value, str) } # Create block-level records block_records = record_factory.create_block_records( block, block_info, block_vals ) for new_record in block_records: if new_record in all_records: raise Exception(f"Duplicate record name {new_record} detected.") for block_num in range(block_info.number): # Add a suffix if there are multiple of a block e.g: # "SEQ:TABLE" -> "SEQ3:TABLE" # Block numbers are indexed from 1 suffixed_block = block if block_info.number > 1: suffixed_block += str(block_num + 1) for field, field_info in panda_info.fields.items(): # ":" separator for EPICS Record names, unlike PandA's "." record_name = EpicsName(suffixed_block + ":" + field) # Get the value of the field and all its sub-fields # Watch for cases where the record name is a prefix to multiple # unrelated fields. e.g. for record_name "INENC1:CLK", # values for keys "INENC1:CLK" "INENC1:CLK:DELAY" should match # but "INENC1:CLK_PERIOD" should not field_values = { field: value for field, value in values.items() if field == record_name or field.startswith(record_name + ":") } records = record_factory.create_record( record_name, field_info, field_values ) for new_record in records: if new_record in all_records: raise Exception(f"Duplicate record name {new_record} detected.") for record_info in records.values(): record_info._field_info = field_info block_records.update(records) all_records.update(block_records) Pvi.create_pvi_records(record_prefix) record_factory.initialise(dispatcher) block_info_dict = {key: value.block_info for key, value in panda_dict.items()} return (all_records, all_values_dict, block_info_dict) async def update( client: AsyncioClient, connection_status: ConnectionStatus, all_records: dict[EpicsName, RecordInfo], poll_period: float, all_values_dict: dict[EpicsName, RecordValue], block_info_dict: dict[str, BlockInfo], ): """Query the PandA at regular intervals for any changed fields, and update the records accordingly Args: client: The AsyncioClient that will be used to get the Changes from the PandA all_records: The dictionary of all records that are expected to be updated when PandA reports changes. This is NOT all records in the IOC. poll_period: The wait time, in seconds, before the next GetChanges is called. all_values_dict: The dictionary containing the most recent value of all records as returned from GetChanges. This method will update values in the dict, which will be read and used in other places block_info: information recieved from the last `GetBlockInfo`, keys are block names""" fields_to_reset: list[tuple[RecordWrapper, Any]] = [] # Fairly arbitrary choice of timeout time timeout = 10 * poll_period while True: try: for record, value in fields_to_reset: record.set(value) fields_to_reset.remove((record, value)) try: changes = await client.send(GetChanges(ChangeGroup.ALL, True), timeout) except asyncio.TimeoutError: # Indicates PandA did not reply within the timeout logging.error( f"PandA did not respond to GetChanges within {timeout} seconds. " "Setting all records to major alarm state and disconnecting." ) connection_status.set_status(Statuses.DISCONNECTED) set_all_records_severity( all_records, alarm.MAJOR_ALARM, alarm.READ_ACCESS_ALARM ) await client.close() break _, new_all_values_dict = _create_dicts_from_changes( changes, block_info_dict ) # Apply the new values to the existing dict, so various updater classes # will have access to the latest values. # As this is the only place we write to this dict (after initial creation), # we don't need to worry about locking accesses - the GIL will enforce it all_values_dict.update(new_all_values_dict) for field in changes.in_error: field = PandAName(field) field = panda_to_epics_name(field) if field not in all_records: logging.error( f"Unknown field {field} returned from GetChanges in_error" ) continue logging.info(f"Setting record {field} to invalid value error state.") record_info: RecordInfo = all_records[field] # See PythonSoftIOC #53 if record_info.is_in_record: record_info.record.set_alarm(alarm.INVALID_ALARM, alarm.UDF_ALARM) else: logging.warning( f"Cannot set error state for record {record_info.record.name}" ) for field, value in changes.values.items(): field = PandAName(field) field = panda_to_epics_name(field) if field not in all_records: logging.error( f"Unknown field {field} returned from GetChanges values" ) continue record_info = all_records[field] record = record_info.record # Note bit_mux/pos_mux fields probably should have labels in their # RecordInfo, but that would break this code. This is only designed # for mbbi/mbbo records. converted_value = ( record_info.labels.index(value) if record_info.labels else record_info.data_type_func(value) ) try: if record_info._pending_change: record_info._pending_change = False if converted_value == record_info.record.get(): # This is most likely PandA reporting a value we just Put continue if ( record_info._field_info and record_info._field_info.type == "bit_out" ): # Its possible a bit_out field will be on for pulses shorter # than our polling time. To represent this we will invert the # value now, then set it back on the next update if converted_value == record_info.record.get(): fields_to_reset.append( (record_info.record, converted_value) ) converted_value = not converted_value # Do not process, as the on_update methods push data back to PandA. # Any processing that must be done at value update must be handled # through record_info.on_changes_func() if record_info.on_changes_func: await record_info.on_changes_func(value) extra_kwargs = {} if not record_info.is_in_record: extra_kwargs.update({"process": False}) record.set(converted_value, **extra_kwargs) except Exception: logging.exception( f"Exception setting record {record.name} to new value {value}" ) for table_field, value_list in changes.multiline_values.items(): table_field = PandAName(table_field) table_field = panda_to_epics_name(table_field) # Tables must have a MODE record defined - use it to update the table mode_record_name = EpicsName(table_field + ":MODE") if mode_record_name not in all_records: logging.error( f"Table MODE record {mode_record_name} not found." f" Skipping update to table {table_field}" ) continue mode_record = all_records[mode_record_name].record assert isinstance(mode_record, TableRecordWrapper) mode_record.update_table(value_list) await asyncio.sleep(poll_period) # Only here for testing purposes except asyncio.CancelledError: break except Exception: logging.exception("Exception while processing updates from PandA") continue def set_all_records_severity( all_records: dict[EpicsName, RecordInfo], severity: int, alarm: int ): """Set the severity of all possible records to the given state""" logging.debug(f"Setting all record to severity {severity} alarm {alarm}") for record_info in all_records.values(): if record_info.is_in_record: record_info.record.set_alarm(severity, alarm)