Source code for oxberrypis.net.controller.publisher
"""Network components responsible for publishing stock messages."""
import threading
from .channel import ChannelPublisherThread
from ..components import PubSubProxy
[docs]class ProxyThread(threading.Thread):
"""Pub-sub proxy thread.
:param context: ZMQ context.
:param frontend_uri: ZMQ URI proxy listens to.
:param backend_uri: ZMQ URI proxy rebpublishes to.
:param name: Name of the thread; useful for debugging.
"""
def __init__(self, context, frontend_uri, backend_uri, name="Proxy"):
super(ProxyThread, self).__init__(name=name)
self.proxy = PubSubProxy(
context,
frontend_uri,
backend_uri,
)
[docs] def run(self):
"""Run the proxy."""
self.proxy.run()
[docs]class ChannelPublishersThread(threading.Thread):
"""Channel publishers thread.
:param context: ZMQ context.
:param init_pipe: Pipe used to sync with the :py:class:`Initializer`.
:param proxy_uri: Destination channel publishers publish to.
:param directory: Directory when channel files are found.
:param channels_num: Number of channels to process. Parsing will
start with channel 1.
:param name: Name of the thread; useful for debugging.
"""
def __init__(self, context, init_pipe, proxy_uri, directory,
channels_num, name="ChannelPublishers"):
super(ChannelPublishersThread, self).__init__(name=name)
self.init_pipe = init_pipe
self.cp_threads = []
for channel_id in xrange(1, channels_num + 1):
cp_thread = ChannelPublisherThread(
context,
proxy_uri,
directory,
channel_id,
)
cp_thread.daemon = True
self.cp_threads.append(cp_thread)
[docs] def run(self):
"""Run the channel publishers thread."""
# Wait for signal from initializer
self.init_pipe.recv()
for cp_thread in self.cp_threads:
cp_thread.start()