Source code for oxberrypis.net.controller
"""Network controller."""
from ..zhelpers import zpipe
from .init import InitializerThread
from .publisher import ProxyThread
from .publisher import ChannelPublishersThread
[docs]class Controller(object):
"""Network controller.
The controller starts 3 threads:
* :py:class:`.ProxyThread`
* :py:class:`.InitializerThread`
* :py:class:`.ChannelPublishersThread`
:param context: ZMQ context.
:param vissync_uri: ZMQ URI for syncing with the visualisation.
:param rpisync_uri: ZMQ URI for syncing with RaspberryPis.
:param pub_uri: ZMQ URI controller publishes from.
:param subscribers_expected: Number of subscribers expected to connect
to the controller before publishing starts.
:type subscribers_expected: integer
:param directory: Directory for NYSE ARCA Integrated Feed channel and
symbol index mapping files.
:type directory: string
:param channels_num: Number of channels to be processed.
:type channels_num: integer
:param mapping: symbol index mapping
"""
def __init__(self, context, vissync_uri, rpisync_uri,
pub_uri, subscribers_expected,
directory, channels_num, mapping):
# Proxy thread
proxy_uri = 'inproc://channel-publishers'
self.proxy_thread = ProxyThread(
context,
proxy_uri,
pub_uri,
)
self.proxy_thread.daemon = True
# Pipe for syncing Initializer and ChannelPublishers threads.
pipe = zpipe(context)
# Initializer thread
self.initializer_thread = InitializerThread(
context,
vissync_uri,
rpisync_uri,
proxy_uri,
pipe[0],
subscribers_expected,
mapping,
)
self.initializer_thread.daemon = True
# Publishers thread
self.publishers_thread = ChannelPublishersThread(
context,
pipe[1],
proxy_uri,
directory,
channels_num,
)
self.publishers_thread.daemon = True
[docs] def run(self):
"""Run the controller.
Starts all subthreads and loop forever.
"""
self.proxy_thread.start()
self.initializer_thread.start()
self.publishers_thread.start()
while True:
pass