Source code for oxberrypis.net.controller.channel

"""Network components responsible for a single channel parsing/publishing."""
import threading

import zmq

from ...parsing.parsers import FileXDPChannelUnpacker as Unpacker
from .msgs_factories import StockMessagesFactory


[docs]class ChannelStockMsgsGenerator(object): """Stock messages generator from the channel. :param directory: Directory with NYSE Arca Integrated Feed channels files. :type directory: string :param channel_id: Channel ID to be parsed. :type channel_id: integer :param factory: Factory for :py:class:`StockMessage` objects, defaults to an instance of :py:class:`StockMessagesFactory`. """ def __init__(self, directory, channel_id, factory=None): self.unpacker = Unpacker.get_channel_unpacker( directory, channel_id, ) self.factory = factory or StockMessagesFactory()
[docs] def generate_stock_msgs(self): """Generate stock messages (with stock ids). Yields :py:class:`StockMessage` object for every known message found in the channel stream. """ for (pkt_hdr, msg) in self.unpacker.parse(): stock_msg = self.factory.create(pkt_hdr, msg) # Ignore messages with no handler in the factory if stock_msg is None: continue # Ignore messages without symbol index if not hasattr(msg, 'SymbolIndex'): continue stock_id = msg.SymbolIndex serialized = stock_msg.SerializeToString() yield stock_id, stock_msg
[docs]class ChannelPublisher(object): """Channel publisher. Parses a single channel and publishes :py:class:`StockMessage` messages. :param context: ZMQ context. :param uri: ZMQ URI publisher publishes to. :param directory: Directory with NYSE Arca Integrated Feed channels files. :param channel_id: Channel ID to be parsed. :type channel_id: integer """ def __init__(self, context, uri, directory, channel_id): # Setup a socket self.publisher = context.socket(zmq.PUB) self.publisher.connect(uri) self.channel_id = channel_id self.generator = ChannelStockMsgsGenerator(directory, channel_id) self.factory = StockMessagesFactory()
[docs] def run(self): """Run the publisher.""" for (symbol_index, stock_msg) in self.generator.generate_stock_msgs(): serialized = stock_msg.SerializeToString() # 2^32 (max SymbolIndex) is 10-digit number prefix = str(symbol_index).zfill(10) self.publisher.send_multipart( [prefix, str(self.channel_id), serialized] ) self.publisher.close()
[docs]class ChannelPublisherThread(threading.Thread): """Channel publisher thread.""" def __init__(self, context, uri, directory, channel_id, name=None): name = name or "ChannelPublisher{}".format(channel_id) super(ChannelPublisherThread, self).__init__(name=name) self.channel_publisher = ChannelPublisher( context, uri, directory, channel_id, )
[docs] def run(self): """Run the thread.""" self.channel_publisher.run()