Source code for oxberrypis.parsing.parsers

"""Parsers for XDP streams."""
import os
import io
import struct
import os.path

from oxberrypis.errors import ParsingError

from oxberrypis.parsing.headers import PacketHeader
from oxberrypis.parsing.headers import MsgHeader


[docs]class XDPChannelUnpacker(object): """XDP channel parser (unpacker). The parser reads the supplied :py:attr:`stream`, unpacks packet headers, reads all message headers and unpacks only known messages. .. note:: The parser assumes that the stream position is not manipulated during the lifetime of this object. Packets and messages are assumed to appear in the stream consecutively. The parsers uses :py:class:`.headers.PacketHeader` and :py:class:`.headers.MsgHeader` for parsing, but you can supply your own header classes by setting :py:attr:`pkt_hdr_cls` and :py:attr:`msg_header_cls` to appropriate values. :param stream: Readable stream which feeds the parser. :param pkt_hdr_cls: Packet header class. Defaults to :py:class:`.headers.PacketHeader`. :param msg_hdr_cls: Message header class. Defaults to :py:class:`.headers.MessageHeader`. """ def __init__(self, stream, pkt_hdr_cls=None, msg_hdr_cls=None): if not stream.readable(): raise ParsingError("The stream is not readable") self.stream = stream self.pkt_hdr_cls = pkt_hdr_cls or PacketHeader self.msg_hdr_cls = msg_hdr_cls or MsgHeader
[docs] def parse_cls_from_stream(self, cls, size): """Read the stream and parse to create an instance of given class. Assumes ``cls._make`` is a method accepting a tuple of unpacked data. Returns ``None`` if the stream is empty and raises an error if unpacking failed. """ data = self.stream.read(size) if not data: # stream is empty return None try: unpacked = struct.unpack_from(cls.fmt, data) except struct.error as e: raise ParsingError('Unpacking failed') parsed = cls._make(unpacked) return parsed
[docs] def parse_msg(self, msg_header): """Parse a message from the stream. Message header is used to determined the size of the payload and the type of the message contained within it. """ size = msg_header.get_msg_size() cls = msg_header.get_msg_cls() msg = self.parse_cls_from_stream(cls, size) return msg
[docs] def advance(self, offset): """Skip ``offset`` number of bytes in the stream.""" if self.stream.seekable(): self.stream.seek(offset, os.SEEK_CUR) else: self.stream.read(offset)
[docs] def parse_packet(self, pkt_header): """Generator for messages found within packet. Packet header is used to read off the number of messages contained within the packet. Then unpacks message headers one by one and if the message type is recognised, the message is parsed. Otherwise position of the stream is advanced. """ count = pkt_header.NumberMsgs while count > 0: header = self.parse_cls_from_stream( self.msg_hdr_cls, self.msg_hdr_cls.header_size, ) if header.is_known(): msg = self.parse_msg(header) yield msg else: offset = header.get_msg_size() self.advance(offset) count -= 1
[docs] def parse(self): """Generator for all the known messages from the stream. As long as there is a data remaining in the stream (assumed to be sufficiently long), it unpack the packet header. For each packet header parses the messages contained within. """ while True: pkt_header = self.parse_cls_from_stream( self.pkt_hdr_cls, self.pkt_hdr_cls.header_size, ) if pkt_header is None: # End reading the stream as no data remains break msgs = self.parse_packet(pkt_header) for msg in msgs: yield (pkt_header, msg) # Close the stream when we are done self.stream.close()
[docs]class FileXDPChannelUnpacker(XDPChannelUnpacker): """XDP channel parser (unpacker) with file stream.""" """Standard channel file name.""" CHANNEL_FILE_NAME_FMT = "20111219-ARCA_XDP_IBF_{}.dat" def __init__(self, channel_path, pkt_hdr_cls=None, msg_hdr_cls=None): """Initialise the parser with the stream from opened file.""" stream = self.open_stream(channel_path) super(FileXDPChannelUnpacker, self).__init__( stream, pkt_hdr_cls, msg_hdr_cls, )
[docs] def open_stream(self, channel_path): """Return opened stream for the given file found at ``channel_path``.""" if not os.path.exists(channel_path): msg = "Channel {} not found".format(channel_path) raise ParsingError(msg) stream = io.open(channel_path, 'rb') return stream
@classmethod
[docs] def get_channel_path(cls, directory, channel, channel_file_name_fmt=None): """Get path to the channel found in the directory. Channel file name format string defaults to string set in :py:attr:`CHANNEL_FILE_NAME_FMT` but it may be changed by calling the function with appropriate ``channel_file_name_fmt`` value. """ channel_file_name_fmt = channel_file_name_fmt or cls.CHANNEL_FILE_NAME_FMT file_name = channel_file_name_fmt.format(channel) path = os.path.join(directory, file_name) return path
@classmethod
[docs] def get_channel_unpacker(cls, directory, channel_id): """Factory for channel unpacker given ``directory`` and ``channel_id``.""" channel_path = cls.get_channel_path(directory, channel_id) return cls(channel_path)