Source code for oxberrypis.net.controller.init

"""Network initialization."""
import zmq
import threading

from ...utils import chunks

from ..components import SynchronizedPublisher

from ..proto.controller_pb2 import SetupVisualisation
from ..proto.controller_pb2 import SetupRPi


[docs]class Initializer(object): """Network initializer. Initilizer works as following: #. Wait for visualisation. #. Once visualisation connects, send back ranges of symbol indexes which will be distributed over the RapsberryPis. #. Wait for given number (:attr:`subscribers_expected`) of RaspberryPis. #. For every RaspberryPi connected send back range of symbol indexes. Each range is sent to two different RaspberryPis to allow high availability. #. Once all RaspberryPis are connected hand over to the :py:class:`.ChannelPublishersThread`. :param context: ZMQ context. :param vissync_uri: ZMQ URI for syncing with the visualisation. :param rpisync_uri: ZMQ URI for syncing with RaspberryPis. :param proxy_uri: ZMQ URI for publishers proxy's frontend. :param to_publishers_pipe: Pipe used to sync :py:class:`Initializer` with the :py:class:`.ChannelPublishersThread`. :param subscribers_expected: Number of subscribers expected to connect to the controller before publishing starts. :type subscribers_expected: integer :param mapping: List of symbol mappings. """ def __init__(self, context, vissync_uri, rpisync_uri, proxy_uri, to_publishers_pipe, subscribers_expected, mapping): self.vissync = context.socket(zmq.REP) self.vissync.bind(vissync_uri) self.to_publishers_pipe = to_publishers_pipe self.subscribers_expected = subscribers_expected self.ranges = list(chunks(mapping, subscribers_expected)) self.syncpub = SynchronizedPublisher( context, proxy_uri, rpisync_uri, subscribers_expected, self.create_setup_rpi_msg_serialized, )
[docs] def create_setup_rpi_msg(self, pi_id): """Create :py:class:`SetupRPi` message.""" assert pi_id >= 0 and pi_id < self.subscribers_expected count = len(self.ranges) range1 = self.ranges[pi_id] range2 = self.ranges[pi_id + 1 % count] setup_rpi = SetupRPi() for mapping in range1 + range2: symbol_index = mapping[1] setup_rpi.symbol_index.append(symbol_index) return setup_rpi
[docs] def create_setup_rpi_msg_serialized(self, pi_id): """Create serialized :py:class:`SetupRPi` message.""" setup_rpi = self.create_setup_rpi_msg(pi_id) return setup_rpi.SerializeToString()
[docs] def create_setup_visualisation_msg(self): """Create SetupVisualisation message.""" setup_vis = SetupVisualisation() for index_range in self.ranges: current_range = setup_vis.range.add() for mapping in index_range: current_mapping = current_range.mapping.add() current_mapping.symbol = mapping[0] current_mapping.symbol_index = mapping[1] current_mapping.price_scale_code = mapping[2] return setup_vis
[docs] def run(self): """Run the initializer.""" setup_vis = self.create_setup_visualisation_msg() setup_vis_serialized = setup_vis.SerializeToString() # Wait for signal from visualisation self.vissync.recv() self.vissync.send(setup_vis_serialized) # Wait for RPis self.syncpub.sync() # Signal to ChannelPublishersThread self.to_publishers_pipe.send('')
[docs]class InitializerThread(threading.Thread): """:py:class:`Initializer` thread. .. seealso :: For constructor parameters check :py:class:`Initializer`. """ def __init__(self, context, vissync_uri, rpisync_uri, proxy_uri, to_publishers_pipe, subscribers_expected, mapping, name="Initializer"): super(InitializerThread, self).__init__(name=name) self.initializer = Initializer( context, vissync_uri, rpisync_uri, proxy_uri, to_publishers_pipe, subscribers_expected, mapping, )
[docs] def run(self): """Run the initializer thread.""" self.initializer.run()