from __future__ import print_function
import socket
import select
import struct
import time
import bisect
import imp
import os
import sys
from collections import namedtuple, deque
import numpy as np
from .configs import BlockConfig, RegisterCounter, make_getter_setter
from .compat import TYPE_CHECKING, add_metaclass
if TYPE_CHECKING:
from typing import List, Dict, Tuple, Any
# These are the powers of two in an array
POW_TWO = 2 ** np.arange(32, dtype=np.uint32)
ROOT = os.path.join(os.path.dirname(__file__), "..", "..")
# FPGA clock tick in seconds
CLOCK_TICK = 1.0 / 125e6
# We daemonise the server by double forking, but we leave the controlling
# terminal and other file connections alone.
def daemonise():
if os.fork():
# Exit first parent
sys.exit(0)
# Do second fork to avoid generating zombies
if os.fork():
sys.exit(0)
def properties_from_ini(src_path, ini_name=None):
# type: (str, str) -> Tuple[Any, List[property]]
if ini_name is None:
# Only given src_path, calculate ini_path and ini_name from it
ini_name = os.path.basename(src_path)
ini_path = src_path
else:
# Given both, ini_name is in the same dir as src_path
ini_path = os.path.join(os.path.dirname(src_path), ini_name)
assert ini_name.endswith(".block.ini"), \
"Expected <block>.block.ini, got %s" % ini_name
block_name = ini_name[:-len(".block.ini")]
properties = []
names = []
block_config = BlockConfig(block_name.upper(), "soft", 1, ini_path)
block_config.register_addresses(RegisterCounter())
for field in block_config.fields:
for config in field.registers + field.bus_entries:
# Delay register swallowed by wrapper, so don't expose to simulation
if not config.name.endswith("_DLY"):
names.append(config.name)
prop = property(*make_getter_setter(config))
properties.append(prop)
# Create an object BlockNames with attributes FIELD1="FIELD1", F2="F2", ...
names = namedtuple("%sNames" % block_name.title(), names)(*names)
return names, properties
class BlockSimulationMeta(type):
"""Metaclass to make sure all field names are bound to the correct
instance attribute names"""
def __new__(cls, clsname, bases, dct):
for name, val in dct.items():
if isinstance(val, property) and hasattr(val.fget, "config"):
config = getattr(val.fget, "config")
if config:
assert name == config.name, \
"Property %s mismatch with Config name %s" % (
name, config.name)
return super(BlockSimulationMeta, cls).__new__(cls, clsname, bases, dct)
[docs]@add_metaclass(BlockSimulationMeta)
class BlockSimulation(object):
bit_bus = np.zeros(128, dtype=np.bool_)
pos_bus = np.zeros(32, dtype=np.int32)
pos_change = []
#: This will be dictionary with changes pushed by any properties created
#: with properties_from_ini()
changes = None
[docs] @classmethod
def bits_to_int(cls, bits):
"""Convert 32 element bit array into an int number"""
return np.dot(bits, POW_TWO)
[docs] def on_changes(self, ts, changes):
"""Handle field changes at a particular timestamp
Args:
ts (int): The timestamp the changes occurred at
changes (dict): Field names that changed with their integer value
Returns:
If the Block needs to be called back at a particular ts then return
that int, otherwise return None and it will be called when a field
next changes
"""
# Set attributes
for name, value in changes.items():
setattr(self, name, value)
class SocketFail(Exception):
pass
class SimulationServer(object):
"""Simulation server exposing PandA simulation controller to TCP server"""
def __init__(self, controller):
"""Start simulation server and create controller
Args:
controller(Controller): Zebra2 controller object
"""
self.controller = controller
self.sock_l = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.sock_l.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
self.sock_l.bind(('localhost', 9999))
self.sock_l.listen(0)
# The socket we will create on run()
self.sock = None
def run(self):
"""Accept the first connection to server, then start simulation"""
(self.sock, addr) = self.sock_l.accept()
self.sock_l.close()
# Set no delay on this as we're only looking at tiny amounts of data
self.sock.setsockopt(socket.SOL_TCP, socket.TCP_NODELAY, 1)
# Now start ticking the simulation
try:
while True:
timeout = self.controller.calc_timeout()
# wait for up to timeout for some data
# timeout can be negative!
(rlist, _, _) = select.select((self.sock,), (), (), timeout)
# If we got a response, service it
if rlist:
self._respond()
# Now service the controller
self.controller.do_tick()
except (KeyboardInterrupt, SocketFail) as e:
print("Simulation closed: %r" % e)
def _read(self, n):
"""Blocking read n bytes from socket and return them"""
result = ''
while len(result) < n:
rx = self.sock.recv(n - len(result))
if not rx:
raise SocketFail('End of input')
result = result + rx
return result
def _respond(self):
"""Read a command from the socket and respond to it"""
command_word = self._read(4)
command, block, num, reg = struct.unpack('cBBB', command_word)
if command == 'R':
# Read one register
tx = self.controller.do_read_register(block, num, reg)
self.sock.sendall(struct.pack('I', tx))
elif command == 'W':
# Write one register
value, = struct.unpack('I', self._read(4))
self.controller.do_write_register(block, num, reg, value)
elif command == 'T':
# Write data array to large table
length, = struct.unpack('I', self._read(4))
data = self._read(length * 4)
data = np.fromstring(data, dtype=np.int32)
self.controller.do_write_table(block, num, data)
elif command == 'D':
# Retrieve increment of data stream
length, = struct.unpack('I', self._read(4))
data = self.controller.do_read_capture(length / 4)
if data is None:
self.sock.sendall(struct.pack('i', -1))
else:
assert data.dtype == np.int32
raw_data = data.data
assert len(raw_data) <= length
self.sock.sendall(struct.pack('I', len(raw_data)))
self.sock.sendall(raw_data)
else:
print('Unexpected command', repr(command_word))
raise SocketFail('Unexpected command')
class SimulationController(object):
def __init__(self, verbose):
self.verbose = verbose
# Changesets
self.bit_changes = np.zeros(128, dtype=np.bool_)
self.pos_changes = np.zeros(32, dtype=np.bool_)
# Lookup from (block_num, num, reg) -> (Block instance, attr_name)
self.lookup = {} # type: Dict[Tuple[int, int, int], Tuple[object, str]]
# Map (block, attr) -> bus, bus_changes, idx
# Map ("bit"/"pos", idx) -> block, attr
self.bus_lookup = {}
# Bus names for each mux
# {(block, attr):"bit"/"pos"}
self.bus_muxes = {}
# Now divert the *REG registers to us
self.lookup[(0, 0, 0)] = (self, "FPGA_VERSION")
self.FPGA_VERSION = 0x111
self.lookup[(0, 0, 1)] = (self, "FPGA_BUILD")
self.FPGA_BUILD = 0x222
self.lookup[(0, 0, 2)] = (self, "USER_VERSION")
self.USER_VERSION = 0x333
self.lookup[(0, 0, 3)] = (self, "BIT_READ_RST")
self.lookup[(0, 0, 4)] = (self, "BIT_READ_VALUE")
self.lookup[(0, 0, 5)] = (self, "POS_READ_RST")
self.lookup[(0, 0, 6)] = (self, "POS_READ_VALUE")
self.lookup[(0, 0, 7)] = (self, "POS_READ_CHANGES")
# When did we start
self.start_time = time.time()
# When do our blocks next need to be woken up?
# List of (int ts, Block block, dict changes)
self.wakeups = []
# These are the next wakeup times for each block
self.next_wakeup = {}
# What blocks are listening to each bit_bus and pos_bus parameter?
# {(block, attr_name): [(block, attr_name)]}
self.listeners = {}
# What delay should each bit_mux have
# Dict of (Block block, str attr) -> int dly
self.delays = {}
# The pcap Block
self.pcap = None
# Start from base register 2 to allow for *REG and *DRV spaces
self.counters = RegisterCounter(block_count=2)
def create_block(self, ini_path, number, block_address):
"""Create an instance of the block if we can, or a Block if we can't
Args:
ini_path (str): The path to the .block.ini file, relative to TOP
number (int): The number of instances Blocks that will be created,
like 8
block_address (int): The Block section of the register address space
"""
block_name = os.path.basename(ini_path).replace(".block.ini", "")
block_config = BlockConfig(block_name.upper(), "soft", number, ini_path)
block_config.register_addresses(self.counters)
assert block_address == block_config.block_address
ini_path = os.path.join(ROOT, ini_path)
module_path = os.path.dirname(ini_path)
try:
f, pathname, description = imp.find_module(
block_name + "_sim", [module_path])
package = imp.load_module(
block_name + "_sim", f, pathname, description)
clsnames = [n for n in dir(package)
if n.lower() == block_name + "simulation"]
cls = getattr(package, clsnames[0])
print("Got %s sim" % cls.__name__)
except ImportError:
print("No %s sim, using BlockSimulation" % block_name.title())
class cls(BlockSimulation):
pass
cls.__name__ = block_name.title() + "Simulation"
for name, prop in zip(*properties_from_ini(ini_path)):
setattr(cls, name, prop)
# Make instances of it
for i in range(number):
inst = cls()
inst.changes = {}
for field in block_config.fields:
# If it's a mux, add it to the list
if field.type.endswith("_mux"):
self.bus_muxes[(inst, field.name)] = field.type[:-4]
for register in field.registers:
# Delays handled differently
if register.name.endswith("_DLY"):
self.delays[(inst, register.name)] = 0
self.lookup[(block_address, i, register.number)] = (
inst, register.name)
if field.bus_entries:
bus_entry = field.bus_entries[i]
if bus_entry.bus == "pos":
bus, bus_changes = cls.pos_bus, self.pos_changes
elif bus_entry.bus == "bit":
bus, bus_changes = cls.bit_bus, self.bit_changes
else:
# ignore ext_out
continue
# Rely on the fact that pos_out and bit_out both produce
# only one entry per block instance
assert len(field.bus_entries) == number, \
"%s.%s doesn't have %d bus entries, it has %d" % (
block_name, field.name, number,
len(field.bus_entries))
self.bus_lookup[(bus_entry.bus, bus_entry.index)] = (
inst, bus_entry.name)
self.bus_lookup[(inst, bus_entry.name)] = (
bus, bus_changes, bus_entry.index)
# Store PCAP
if block_name == "pcap":
self.pcap = inst
inst.tick_data = False
# And divert the *REG PCAP registers to PCAP
self.lookup[(0, 0, 8)] = (self.pcap, "PCAP_START_WRITE")
self.lookup[(0, 0, 9)] = (self.pcap, "PCAP_WRITE")
self.lookup[(0, 0, 10)] = (self.pcap, "PCAP_ARM")
self.lookup[(0, 0, 11)] = (self.pcap, "PCAP_DISARM")
def do_read_register(self, block_num, num, reg):
"""Read the register value for a given block and register number
Args:
block_num (int): The register base for the block
num (int): The instance number of the block (from 0..maxnum-1)
reg (int): The field register offset for the block
Returns:
int: The value of the register
"""
try:
block, name = self.lookup[(block_num, num, reg)]
except KeyError:
print('Unknown read register', block_num, num, reg)
value = 0
else:
value = getattr(block, name)
return value
def do_write_register(self, block_num, num, reg, value):
"""Write the register value for a given block and register number
Args:
block_num (int): The register base for the block
num (int): The instance number of the block (from 0..maxnum-1)
reg (int): The field register offset for the block
value (int): The value to write
"""
try:
block, name = self.lookup[(block_num, num, reg)]
except KeyError:
print('Unknown write register', block_num, num, reg)
else:
if block == self:
if name == "BIT_READ_RST":
self.capture_bit_bus()
elif name == "POS_READ_RST":
self.capture_pos_bus()
else:
print('Not writing register %s to %s' % (name, value))
else:
if self.verbose:
print("Write %s[%d].%s=%s" % (
block.__class__.__name__, num, name, value))
if (block, name) in self.delays:
# Note: this is different from the FPGA implementation
block_changes = {}
self.delays[(block, name)] = value
elif (block, name) in self.bus_muxes:
bus = self.bus_muxes[(block, name)]
block_changes = self.update_mux(block, name, value, bus)
else:
block_changes = {block: {name: value}}
self.do_tick(block_changes)
def do_write_table(self, block_num, num, data):
"""Write a table value for a given block and register number
Args:
block_num (int): The register base for the block
num (int): The instance number of the block (from 0..maxnum-1)
data (numpy.ndarray): The data to write
"""
try:
block, name = self.lookup[(block_num, num, -1)]
except KeyError:
print('Unknown table register', block_num, num)
else:
# Send data to long table data attribute of block
block_changes = {block: {name: data}}
self.do_tick(block_changes)
def do_read_capture(self, max_length):
"""Read the capture data array from self.pcap
Args:
max_length (int): Max number of int32 words to read
"""
return self.pcap.read_data(max_length)
def do_tick(self, block_changes=None):
"""Tick the simulation given the block and changes, or None
Args:
block_changes (dict): map str name -> int value of attrs that have
changed
"""
ts = int((time.time() - self.start_time) / CLOCK_TICK)
if block_changes is None:
block_changes = {}
# If we have a wakeup, then make sure we aren't going back in time
if self.wakeups:
wake_ts, _, _ = self.wakeups[0]
if ts > wake_ts:
ts = wake_ts
# Keep adding blocks to be processed at this ts
while self.wakeups:
wake_ts, wake_block, wake_changes = self.wakeups[0]
if ts == wake_ts:
block_changes.setdefault(wake_block, wake_changes)
self.wakeups.pop(0)
self.next_wakeup[wake_block] = None
else:
break
# Wake the selected blocks up
new_wakeups = self.process_blocks(ts, block_changes)
# Schedule the wakeups at the correct time
for (block, wakeup), changes in new_wakeups.items():
self.remove_wakeup(block)
self.insert_wakeup(wakeup, block, changes)
def process_blocks(self, ts, block_changes):
# map block -> changes
new_wakeups = {}
pos_changes = []
for block, changes in block_changes.items():
# Remove the old wakeup if we have one
self.remove_wakeup(block)
next_ts = block.on_changes(ts, changes)
# Update bit_bus and pos_bus
for attr, val in block.changes.items():
# Map (block, attr) -> bus, bus_changes, idx
# Map (bus, idx) -> block, attr
data = self.bus_lookup.get((block, attr), None)
if data is None:
continue
bus, bus_changes, idx = data
if bus is BlockSimulation.pos_bus:
pos_changes.append(idx)
bus[idx] = val
bus_changes[idx] = 1
# If someone's listening, tell them about it
for lblock, lattr in self.listeners.get((block, attr), []):
# How long do they need to wait for an event
dly = self.delays.get((lblock, lattr), 0)
# Add it to our list of new wakeups for this block
key = (lblock, ts+1+dly)
new_wakeups.setdefault(key, {})[lattr] = val
# Reset the changed attributes dict
block.changes = {}
# If we are due to be woken up, add this one to the wakeup dict
if next_ts is not None:
new_wakeups.setdefault((block, next_ts), {})
if pos_changes:
new_wakeups.setdefault((self.pcap, ts+1), {})["POS_BUS"] = \
pos_changes
return new_wakeups
def insert_wakeup(self, ts, block, changes):
assert block not in self.next_wakeup, \
"Block %s already has a wakeup" % block
item = (ts, block, changes)
# Insert the new wakeup
index = bisect.bisect(self.wakeups, item)
self.wakeups.insert(index, item)
self.next_wakeup[block] = ts
def remove_wakeup(self, block):
# Delete the old entry
old_ts = self.next_wakeup.pop(block, None)
if old_ts is not None:
index = bisect.bisect(self.wakeups, (old_ts, None, None))
while True:
wakeup = self.wakeups[index]
assert wakeup[0] == old_ts, \
"Gone too far %d > %d" % (wakeup[0], old_ts)
if wakeup[1] == block:
assert wakeup[2] == {}, \
"Popping a wakeup with changes: %s" % wakeup
self.wakeups.pop(index)
return
def calc_timeout(self):
"""Calculate how long before the next wakeup is due
Returns:
int: The time before next wakeup or zero if past or None if no
wakeup set
"""
if self.wakeups:
next_time = self.wakeups[0][0] * CLOCK_TICK + self.start_time
return max(0, next_time - time.time())
def update_mux(self, block, name, value, bus):
"""Update listeners for muxes on block
Args:
block (Block): Block that is updating an attr value
name (str): Attribute name that is changing
value (int): New value
bus (str): bit or pos
Returns:
dict: map block -> {name: value} to be passed as block_changes
"""
# Remove any old listener entries
for listeners in self.listeners.values():
try:
listeners.remove((block, name))
except ValueError:
pass
# check old values
old_bus_val = getattr(block, name)
if bus == "bit":
if value == 128:
new_bus_val = 0
elif value == 129:
new_bus_val = 1
else:
new_bus_val = BlockSimulation.bit_bus[value]
else:
if value == 32:
new_bus_val = 0
else:
new_bus_val = BlockSimulation.pos_bus[value]
lblock, lattr = self.bus_lookup.get((bus, value), (None, None))
if lblock:
# This comes from a block rather than being a constant, so add
# ourself to the listeners for this field
self.listeners.setdefault((lblock, lattr), []).append((block, name))
# Generate changes
if old_bus_val != new_bus_val:
return {block: {name: new_bus_val}}
def capture_bit_bus(self):
"""Capture bit bus so BIT_READ_VALUE can use it"""
self._bit_read_data = deque()
tmp_bits = np.empty(32, dtype=np.bool_)
for i in range(8):
# Pack bits from bit_bus into 32-bit number and add it to list
# Top half is bit bus
tmp_bits[16:] = BlockSimulation.bit_bus[i*16:(i+1)*16]
# Bottom half is bit changes
tmp_bits[:16] = self.bit_changes[i*16:(i+1)*16]
vals = BlockSimulation.bits_to_int(tmp_bits)
self._bit_read_data.append(vals)
self.bit_changes.fill(0)
@property
def BIT_READ_VALUE(self):
return int(self._bit_read_data.popleft())
def capture_pos_bus(self):
"""Capture pos bus so POS_READ_VALUE and POS_READ_CHANGES can read it"""
self._pos_read_data = deque(BlockSimulation.pos_bus)
self.POS_READ_CHANGES = int(BlockSimulation.bits_to_int(self.pos_changes))
self.pos_changes.fill(0)
@property
def POS_READ_VALUE(self):
return int(self._pos_read_data.popleft())