importstructimportsysimportxml.etree.ElementTreeasETfromcollectionsimportdequefromdataclassesimportdataclassfromtypingimportAny,Callable,Deque,Iterator,List,Optional,Tupleimportnumpyasnpfrom._exchangeimportExchange,ExchangeGenerator,Exchangesfrom.commandsimportCommand,CommandExceptionfrom.responsesimport(Data,EndData,EndReason,FieldCapture,FrameData,ReadyData,StartData,)# Define the public API of this module__all__=["NeedMoreData","NoContextAvailable","Buffer","ControlConnection","DataConnection",]# The name of the samples field used for averaging unscaled fieldsSAMPLES_FIELD="PCAP.SAMPLES.Value"
[docs]classNeedMoreData(Exception):"""Raised if the `Buffer` isn't full enough to return the requested bytes"""
[docs]classNoContextAvailable(Exception):"""Raised if there were no contexts available for this connection. This may result from calling `ControlConnection.receive_bytes()` without calling `ControlConnection.send()`, or if there were unmatched sends/receives"""
[docs]classBuffer:"""Byte storage that provides line reader and bytes reader interfaces. For example:: buf = Buffer() buf += bytes_from_server line = buf.read_line() # raises NeedMoreData if no line for line in buf: pass bytes = buf.read_bytes(50) # raises NeedMoreData if not enough bytes """def__init__(self):self._buf=bytearray()def__iadd__(self,byteslike:bytes):"""Add some data from the server"""self._buf+=byteslikereturnselfdef_extract_frame(self,num_to_extract,num_to_discard=0)->bytearray:# extract num_to_extract bytes from the start of the bufferframe=self._buf[:num_to_extract]# Update the buffer in place, to take advantage of bytearray's# optimized delete-from-beginning feature.delself._buf[:num_to_extract+num_to_discard]returnframe
[docs]defread_bytes(self,num:int)->bytearray:"""Read and pop num bytes from the beginning of the buffer, raising `NeedMoreData` if the buffer isn't full enough to do so"""ifnum>len(self._buf):raiseNeedMoreData()else:returnself._extract_frame(num)
[docs]defpeek_bytes(self,num:int)->bytearray:"""Read but do not pop num bytes from the beginning of the buffer, raising `NeedMoreData` if the buffer isn't full enough to do so"""ifnum>len(self._buf):raiseNeedMoreData()else:returnself._buf[:num]
[docs]defread_line(self):"""Read and pop a newline terminated line (without terminator) from the beginning of the buffer, raising `NeedMoreData` if the buffer isn't full enough to do so"""idx=self._buf.find(b"\n")ifidx<0:raiseNeedMoreData()else:returnself._extract_frame(idx,num_to_discard=1)
@dataclassclass_ExchangeContext:#: The exchange we should be fillingexchange:Exchange#: The command that produced itcommand:Command#: If this was the last in the list, the generator to call nextgenerator:Optional[ExchangeGenerator[Any]]=Nonedefexception(self,e:Exception)->CommandException:"""Return a `CommandException` with the sent and received strings in the text"""msg=f"{self.command} ->"ifself.exchange.is_multiline:forlineinself.exchange.multiline:msg+="\n "+lineelse:msg+=" "+self.exchange.lineife.args:msg+=f"\n{type(e).__name__}:{e}"returnCommandException(msg).with_traceback(e.__traceback__)
[docs]classControlConnection:"""Sans-IO connection to control port of PandA TCP server, supporting a Command based interface. For example:: cc = ControlConnection() # Connection says what bytes should be sent to execute command to_send = cc.send(command) socket.sendall(to_send) while True: # Repeatedly process bytes from the PandA received = socket.recv() # Sending any subsequent bytes to be sent back to the PandA to_send = cc.receive_bytes(received) socket.sendall(to_send) # And processing the produced responses for command, response in cc.responses() do_something_with(response) """def__init__(self)->None:self._buf=Buffer()self._lines:List[str]=[]self._contexts:Deque[_ExchangeContext]=deque()self._responses:Deque[Tuple[Command,Any]]=deque()def_update_contexts(self,lines:List[str],is_multiline=False)->bytes:to_send=b""iflen(self._contexts)==0:raiseNoContextAvailable()context=self._contexts.popleft()# Update the exchange with what we've gotcontext.exchange.received=linescontext.exchange.is_multiline=is_multiline# If we're given a generator to run then do soifcontext.generator:try:# Return the bytes from sending the next bit of the commandexchanges=next(context.generator)exceptStopIterationase:# Command complete, store the resultself._responses.append((context.command,e.value))exceptExceptionase:# Command failed, store an exceptionself._responses.append((context.command,context.exception(e)))else:to_send=b"".join(self._bytes_from_exchanges(exchanges,context.command,context.generator))returnto_senddef_bytes_from_exchanges(self,exchanges:Exchanges,command:Command,generator:ExchangeGenerator[Any])->Iterator[bytes]:ifnotisinstance(exchanges,list):exchanges=[exchanges]# No Exchanges when a Command's yield is empty e.g. unexpected/unparseable data# received from PandAiflen(exchanges)==0:returnforexinexchanges:context=_ExchangeContext(ex,command)self._contexts.append(context)text="\n".join(ex.to_send)+"\n"yieldtext.encode()# The last exchange gets the generator so it triggers the next thing to sendcontext.generator=generator
[docs]defreceive_bytes(self,received:bytes)->bytes:"""Tell the connection that you have received some bytes off the network. Parse these into high level responses which are yielded back by `responses`. Return any bytes to send back"""self._buf+=receivedis_multiline=bool(self._lines)to_send=b""forline_binself._buf:line=line_b.decode()ifnotis_multiline:# Check if we need to switch to multiline modeis_multiline=line.startswith("!")orline=="."ifis_multiline:# Add a new line to the bufferself._lines.append(line)ifline==".":# End of multiline mode, return what we've gotto_send+=self._update_contexts(self._lines,is_multiline)self._lines=[]is_multiline=Falseelse:# Check a correctly formatted responseassertline.startswith("!"),("Multiline response %r doesn't start with !"%line)else:# Single line modeassertnotself._lines,("Multiline response %s not terminated"%self._lines)to_send+=self._update_contexts([line])returnto_send
[docs]defresponses(self)->Iterator[Tuple[Command,Any]]:"""Get the (command, response) tuples generated as part of the last receive_bytes"""whileself._responses:yieldself._responses.popleft()
[docs]defsend(self,command:Command)->bytes:"""Tell the connection you want to send an event, and it will return some bytes to send down the network """# If not given a partially run generator, start one heregenerator=command.execute()exchanges=next(generator)to_send=b"".join(self._bytes_from_exchanges(exchanges,command,generator))returnto_send
[docs]classDataConnection:"""Sans-IO connection to data port of PandA TCP server, supporting an flushable iterator interface. For example:: dc = DataConnection() # Single connection string to send to_send = dc.connect() socket.sendall(to_send) while True: # Repeatedly process bytes from the PandA looking for data received = socket.recv() for data in dc.receive_bytes(received): do_something_with(data) """def__init__(self)->None:# TODO: could support big endian, but are there any systems out there?assertsys.byteorder=="little","PandA sends data little endian"# Store of bytes received so far to parse in the handlersself._buf=Buffer()# Header text from PandA with field infoself._header=""# The next parsing handler that should be called if there is data in bufferself._next_handler:Optional[Callable[[],Optional[Iterator[Data]]]]=None# numpy dtype of produced FrameDataself._frame_dtype=None# frame data that has been received but not flushed yetself._partial_data=bytearray()# whether to flush after every frameself._flush_every_frame=Falsedef_handle_connected(self):# Get the response from connect()line=self._buf.read_line()assertline==b"OK",f"Expected OK, got {line!r}"yieldReadyData()self._next_handler=self._handle_header_startdef_handle_header_start(self):# Discard lines until we see header start tagline=self._buf.read_line()ifline==b"<header>":self._header=lineself._next_handler=self._handle_header_bodydef_handle_header_body(self):# Accumumlate header until the end tag, then parese and returnline=self._buf.read_line()self._header+=lineifline==b"</header>":fields=[]root=ET.fromstring(self._header)forfieldinroot.find("fields"):fields.append(FieldCapture(name=str(field.get("name")),type=np.dtype(field.get("type")),capture=str(field.get("capture")),scale=float(field.get("scale",1)),offset=float(field.get("offset",0)),units=str(field.get("units","")),))data=root.find("data")sample_bytes=int(data.get("sample_bytes"))ifsample_bytes-sum(f.type.itemsizeforfinfields)==4:# In raw mode with panda-server < 2.1 samples wasn't# put in if not specifically requested, but was still# sentname,capture=SAMPLES_FIELD.rsplit(".",maxsplit=1)fields.insert(0,FieldCapture(name,np.dtype("uint32"),capture),)self._frame_dtype=np.dtype([(f"{f.name}.{f.capture}",f.type)forfinfields])yieldStartData(fields=fields,missed=int(data.get("missed")),process=str(data.get("process")),format=str(data.get("format")),sample_bytes=sample_bytes,)self._next_handler=self._handle_header_enddef_handle_header_end(self):# Discard the newline at the end of the headerassertself._buf.read_bytes(1)==b"\n","Expected newline at end of header"self._next_handler=self._handle_data_startdef_handle_data_start(self):# Handle "BIN " or "END "bytes=self._buf.read_bytes(4)ifbytes==b"BIN ":self._next_handler=self._handle_data_frameelifbytes==b"END ":self._next_handler=self._handle_data_endelse:raiseValueError(f"Bad data '{bytes}'")def_handle_data_frame(self):# Handle a whole data frame# Peek message length as uint32 LE# length = len("BIN " + 4_bytes_encoded_length + data)length=struct.unpack("<I",self._buf.peek_bytes(4))[0]# we already read "BIN ", so read the restdata=self._buf.read_bytes(length-4)[4:]self._partial_data+=data# if told to flush now, then yield what we haveifself._flush_every_frame:yield fromself.flush()self._next_handler=self._handle_data_startdef_handle_data_end(self):# Handle the end reasonsamples,reason=self._buf.read_line().split(maxsplit=1)reason_enum=EndReason(reason.decode())# Flush whatever is not already flushedyield fromself.flush()yieldEndData(samples=int(samples),reason=reason_enum)self._next_handler=self._handle_header_start
[docs]defreceive_bytes(self,received:bytes,flush_every_frame=True)->Iterator[Data]:"""Tell the connection that you have received some bytes off the network. Parse these into Data structures and yield them back. Args: received: the bytes you received from the socket flush_every_frame: Whether to flush `FrameData` as soon as received. If False then they will only be sent if `flush` is called or end of acquisition reached """assertself._next_handler,"Connect not called"self._flush_every_frame=flush_every_frameself._buf+=receivedwhileTrue:# Each of these handlers should call read at most once, so that# if we don't have enough data we don't lose partial datatry:ret=self._next_handler()ifret:# This is an iterator of Data objectsyield fromretexceptNeedMoreData:break
[docs]defflush(self)->Iterator[FrameData]:"""If there is a partial data frame, pop and yield it"""ifself._partial_data:# Make a numpy array wrapper to the bytearray, no copying heredata=np.frombuffer(self._partial_data,self._frame_dtype)# Make a new bytearray, numpy view will keep the reference# to the old one so can't clear it in placeself._partial_data=bytearray()yieldFrameData(data)
[docs]defconnect(self,scaled:bool)->bytes:"""Return the bytes that need to be sent on connection"""assertnotself._next_handler,"Can't connect while midway through collection"self._next_handler=self._handle_connectedifscaled:returnb"XML FRAMED SCALED\n"else:returnb"XML FRAMED RAW\n"