Source code for oxberrypis.net.rpi.subscriber

"""RaspberryPi stock messages subscriber/handler."""
import zmq

from ..components import SynchronizedSubscriber

from ..proto.stock_pb2 import StockMessage
from ..proto.controller_pb2 import SetupRPi

from ...orderbook.matching_engine import MatchingEngine

from .handlers import PrintingHandler
from .handlers import ToVisualisation
from .handlers import UpdateMatchingEngines


class StockMessagesSubscriber(object):
    """Subscriber that receives stock messages and fans them out to handlers.

    Each instance keeps a ``matching_engines`` dictionary keyed by symbol
    index, a list of handlers that every received stock message is passed
    to, and a :py:class:`SynchronizedSubscriber` used for receiving.

    :param context: ZMQ context.
    :param rpisync_uri: ZMQ URI for syncing with
                        the :py:class:`.Initializer`
    :param pub_uri: ZMQ URI for stock messages publisher.
    :param visual_uri: ZMQ URI visualisation binds to.
    :param me_handler_cls: Handler class for updating matching engines.
    :param visual_handler_cls: Handler class for sending stock events
                               to visualisation.

    """

    def __init__(self, context, rpisync_uri, pub_uri, visual_uri,
                 me_handler_cls=UpdateMatchingEngines,
                 visual_handler_cls=ToVisualisation):
        # Shared by the handlers below and filled in by sync_reply_handler.
        self.matching_engines = {}

        # Handlers are built in the same order in which they are later
        # invoked for every message.
        printer = PrintingHandler()
        engines_updater = me_handler_cls(self.matching_engines)
        visualiser = visual_handler_cls(
            self.matching_engines, context, visual_uri
        )
        self.handlers = [printer, engines_updater, visualiser]

        # Nothing to subscribe to yet: the list of symbols is only known
        # once the synchronization reply arrives, at which point
        # sync_reply_handler subscribes to each of them.
        initial_subscriptions = []

        self.sub = SynchronizedSubscriber(
            context,
            pub_uri,
            rpisync_uri,
            initial_subscriptions,
            self.handle_data,
            self.sync_reply_handler,
        )

    def sync_reply_handler(self, data):
        """Process the synchronization reply.

        ``data`` is parsed as a serialized :py:class:`SetupRPi` message.
        For every symbol index it lists, a fresh :py:class:`MatchingEngine`
        is stored in ``self.matching_engines`` and the subscriber is
        subscribed to that symbol's zero-padded prefix.

        :param data: Serialized ``SetupRPi`` message.

        """
        setup_msg = SetupRPi()
        setup_msg.ParseFromString(data)

        for index in setup_msg.symbol_index:
            self.matching_engines[index] = MatchingEngine()

            # 2^32 (max SymbolIndex) is a 10-digit number, so the
            # subscription prefix is padded with zeros to 10 characters.
            self.sub.subscribe(str(index).zfill(10))

    def run(self):
        """Run the subscriber: synchronize first, then receive messages."""
        subscriber = self.sub
        subscriber.sync()
        subscriber.recv_multipart()

    def handle_data(self, data):
        """Decode one subscribed message and pass it to every handler.

        :param data: Three-element sequence holding the symbol index,
                     the channel id and the serialized
                     :py:class:`StockMessage`, in that order.

        """
        raw_symbol_index, raw_channel_id, payload = data

        symbol_index = int(raw_symbol_index)
        channel_id = int(raw_channel_id)

        message = StockMessage()
        message.ParseFromString(payload)

        for stock_handler in self.handlers:
            stock_handler.handle_stock_data(symbol_index, channel_id, message)