Source code for oxberrypis.net.components

"""Reusable network components."""
import zmq
import time


[docs]class PubSubProxy(object): """Publisher-Subscriber proxy. Based on: * http://zguide.zeromq.org/py:all#The-Dynamic-Discovery-Problem * http://zguide.zeromq.org/py:all#-MQ-s-Built-In-Proxy-Function """ def __init__(self, context, frontend_uri, backend_uri): self.context = context self.frontend_uri = frontend_uri self.backend_uri = backend_uri # Socket facing clients self.frontend = self.context.socket(zmq.XSUB) self.frontend.bind(self.frontend_uri) # Socket facing services self.backend = self.context.socket(zmq.XPUB) self.backend.bind(self.backend_uri) def run(self): zmq.device(zmq.QUEUE, self.frontend, self.backend) # We never get here... self.frontend.close() self.backend.close()
[docs]class SynchronizedPublisher(object): """Synchronized publisher. This publisher and :py:class:`SynchronizedSubscriber` are generally based on http://zguide.zeromq.org/py:all#Node-Coordination. However docs notes that: We can't assume that the SUB connect will be finished by the time the REQ/REP dialog is complete. There are no guarantees that outbound connects will finish in any order whatsoever, if you're using any transport except inproc. So, the example does a brute force sleep of one second between subscribing, and sending the REQ/REP synchronization. A more robust model could be: * Publisher opens PUB socket and starts sending "Hello" messages (not data). * Subscribers connect SUB socket and when they receive a Hello message they tell the publisher via a REQ/REP socket pair. * When the publisher has had all the necessary confirmations, it starts to send real data. Therefore we implement above mentioned "more robust model", basin on http://thisthread.blogspot.co.uk/2012/03/pub-sub-coordination-by-req-rep.html. :param context: ZMQ context. :param publisher_uri: ZMQ URI the publisher binds to. :param syncservice_uri: ZMQ URI for publisher's syncing service REP socket. :param subscribers_expected: Number of expected subscribers. :param sync_reply: Synchronization reply function. :type sync_reply: function accepting sub_id and returning sync reply message. """ PING_MSG = 'PING' END_MSG = 'END' def __init__(self, context, publisher_uri, syncservice_uri, subscribers_expected=1, sync_reply=None): self.context = context self.subscribers_expected = subscribers_expected self.publisher_uri = publisher_uri self.syncservice_uri = syncservice_uri # Socket to talk to clients self.publisher = self.context.socket(zmq.PUB) # Normally, we should bind, but in OxBerryPis # we connect to the pub-sub proxy #self.publisher.bind(self.publisher_uri) self.publisher.connect(self.publisher_uri) # Socket to receive signals self.syncservice = self.context.socket(zmq.REP) self.syncservice.bind(self.syncservice_uri) self.sync_reply = sync_reply or (lambda id: '')
[docs] def sync(self): """Synchronize with subscribers.""" # Get synchronization from subscribers subscribers = 0 while subscribers < self.subscribers_expected: self.ping() if self.handshake(subscribers): subscribers += 1
[docs] def ping(self): """Broadcast a ping message. A "ping" message is sent on the PUB socket to show the subscribers that the publisher is up and waiting for them. Sends :attr:`PING_MSG`. """ self.publish(self.PING_MSG)
[docs] def handshake(self, sub_id): """Perform handshake with subscriber if available. This function checks for subscribers synchronization on the REP socket. :param sub_id: Subscriber-to-be id. """ # Sleep one second, to give time to the subscribers to connect. time.sleep(1) # Check on the socket for an empty message. try: msg = self.syncservice.recv(zmq.DONTWAIT) except zmq.ZMQError: return False # send synchronization reply reply = self.sync_reply(sub_id) self.syncservice.send(reply) return True
[docs] def publish(self, data): """Send data to subscribers.""" self.publisher.send(data)
[docs] def publish_enveloped(self, key, data): """Send enveloped data to subscribers. Based on http://zguide.zeromq.org/py:all#Pub-Sub-Message-Envelopes """ self.publisher.send_mulipart([key, data])
[docs] def close(self): """Announce end of publisher stream and close internal sockets. Sends :attr:`END_MSG`. """ self.publish(self.END_MSG) self.publisher.close()
[docs]class SynchronizedSubscriber(object): """Synchronized subscriber. .. seealso:: Check :py:class:`SynchronizedPublisher` for implementation details. :param context: ZMQ context. :param publisher_uri: ZMQ URI of the publisher. :param syncservice_uri: ZMQ URI of the publisher's REP socket for syncing. :param subscriptions: A list of prefixes to subscribe to. :param msg_handler: Function for handling of subscribed data. :param sync_reply_handler: Function for handling synchronization reply. """ def __init__(self, context, publisher_uri, syncservice_uri, subscriptions, msg_handler, sync_reply_handler=None): self.context = context self.publisher_uri = publisher_uri self.syncservice_uri = syncservice_uri self.subscriptions = subscriptions self.msg_handler = msg_handler self.sync_reply_handler = sync_reply_handler or (lambda _: None) # First, connect our subscriber socket self.subscriber = self.context.socket(zmq.SUB) self.subscriber.connect(self.publisher_uri) self.subscribe(SynchronizedPublisher.PING_MSG) self.subscribe(SynchronizedPublisher.END_MSG) for subscription in self.subscriptions: self.subscribe(subscription)
[docs] def subscribe(self, subscription): """Subscribe to ``subscription``.""" self.subscriber.setsockopt(zmq.SUBSCRIBE, subscription)
[docs] def unsubscribe(self, subscription): """Unsubscribe from ``subscription``.""" self.subscriber.setsockopt(zmq.UNSUBSCRIBE, subscription)
[docs] def sync(self): """Synchronize with the publisher.""" # Wait for dummy data from the SychronizedPublisher data = self.subscriber.recv() # Second, synchronize with publisher self.syncclient = self.context.socket(zmq.REQ) self.syncclient.connect(self.syncservice_uri) # send a synchronization request self.syncclient.send('') # wait for synchronization reply sync_reply = self.syncclient.recv() self.sync_reply_handler(sync_reply) self.unsubscribe(SynchronizedPublisher.PING_MSG)
[docs] def recv(self): """Receive messages and proccess them using :attr:`msg_handler` in a loop. Stops when :py:attr:`SynchronizedPublisher.END_MSG` is received. """ while True: data = self.subscriber.recv() if data == SynchronizedPublisher.END_MSG: break self.msg_handler(data)
[docs] def recv_multipart(self): """Receive mulitpart messages and process them using :attr:`msg_handler` in a loop. Stops when :py:attr:`SynchronizedPublisher.END_MSG` is received. To be used with pub-sub message envelopes; based on http://zguide.zeromq.org/py:all#Pub-Sub-Message-Envelopes """ # Third, get our updates and report how many we got while True: data = self.subscriber.recv_multipart() if data[0] == SynchronizedPublisher.END_MSG: break self.msg_handler(data)
[docs] def close(self): """Close internal sockets.""" self.syncclient.close() self.subscriber.close()