Networking

Network diagram.

Network related stuff.

Controller

Network controller.

class oxberrypis.net.controller.Controller(context, vissync_uri, rpisync_uri, pub_uri, subscribers_expected, directory, channels_num, mapping)[source]

Network controller.

The controller starts 3 threads:

Parameters:
  • context – ZMQ context.
  • vissync_uri – ZMQ URI for syncing with the visualisation.
  • rpisync_uri – ZMQ URI for syncing with Raspberry Pis.
  • pub_uri – ZMQ URI controller publishes from.
  • subscribers_expected (integer) – Number of subscribers expected to connect to the controller before publishing starts.
  • directory (string) – Directory for NYSE ARCA Integrated Feed channel and symbol index mapping files.
  • channels_num (integer) – Number of channels to be processed.
  • mapping – symbol index mapping
run()[source]

Run the controller.

Starts all subthreads and loop forever.

Init

Network initialization.

class oxberrypis.net.controller.init.Initializer(context, vissync_uri, rpisync_uri, proxy_uri, to_publishers_pipe, subscribers_expected, mapping)[source]

Network initializer.

Initilizer works as following:

  1. Wait for visualisation.
  2. Once visualisation connects, send back ranges of symbol indexes which will be distributed over the RapsberryPis.
  3. Wait for given number (subscribers_expected) of Raspberry Pis.
  4. For every Raspberry Pi connected send back range of symbol indexes. Each range is sent to two different Raspberry Pis to allow high availability.
  5. Once all Raspberry Pis are connected hand over to the ChannelPublishersThread.
Parameters:
  • context – ZMQ context.
  • vissync_uri – ZMQ URI for syncing with the visualisation.
  • rpisync_uri – ZMQ URI for syncing with Raspberry Pis.
  • proxy_uri – ZMQ URI for publishers proxy’s frontend.
  • to_publishers_pipe – Pipe used to sync Initializer with the ChannelPublishersThread.
  • subscribers_expected (integer) – Number of subscribers expected to connect to the controller before publishing starts.
  • mapping – List of symbol mappings.
create_setup_rpi_msg(pi_id)[source]

Create SetupRPi message.

create_setup_rpi_msg_serialized(pi_id)[source]

Create serialized SetupRPi message.

create_setup_visualisation_msg()[source]

Create SetupVisualisation message.

run()[source]

Run the initializer.

class oxberrypis.net.controller.init.InitializerThread(context, vissync_uri, rpisync_uri, proxy_uri, to_publishers_pipe, subscribers_expected, mapping, name='Initializer')[source]

Initializer thread.

See also

For constructor parameters check Initializer.

run()[source]

Run the initializer thread.

Publisher

Network components responsible for publishing stock messages.

class oxberrypis.net.controller.publisher.ChannelPublishersThread(context, init_pipe, proxy_uri, directory, channels_num, name='ChannelPublishers')[source]

Channel publishers thread.

Parameters:
  • context – ZMQ context.
  • init_pipe – Pipe used to sync with the Initializer.
  • proxy_uri – Destination channel publishers publish to.
  • directory – Directory when channel files are found.
  • channels_num – Number of channels to process. Parsing will start with channel 1.
  • name – Name of the thread; useful for debugging.
run()[source]

Run the channel publishers thread.

class oxberrypis.net.controller.publisher.ProxyThread(context, frontend_uri, backend_uri, name='Proxy')[source]

Pub-sub proxy thread.

Parameters:
  • context – ZMQ context.
  • frontend_uri – ZMQ URI proxy listens to.
  • backend_uri – ZMQ URI proxy rebpublishes to.
  • name – Name of the thread; useful for debugging.
run()[source]

Run the proxy.

Channel

Network components responsible for a single channel parsing/publishing.

class oxberrypis.net.controller.channel.ChannelPublisher(context, uri, directory, channel_id)[source]

Channel publisher.

Parses a single channel and publishes StockMessage messages.

Parameters:
  • context – ZMQ context.
  • uri – ZMQ URI publisher publishes to.
  • directory – Directory with NYSE Arca Integrated Feed channels files.
  • channel_id (integer) – Channel ID to be parsed.
run()[source]

Run the publisher.

class oxberrypis.net.controller.channel.ChannelPublisherThread(context, uri, directory, channel_id, name=None)[source]

Channel publisher thread.

run()[source]

Run the thread.

class oxberrypis.net.controller.channel.ChannelStockMsgsGenerator(directory, channel_id, factory=None)[source]

Stock messages generator from the channel.

Parameters:
  • directory (string) – Directory with NYSE Arca Integrated Feed channels files.
  • channel_id (integer) – Channel ID to be parsed.
  • factory – Factory for StockMessage objects, defaults to an instance of StockMessagesFactory.
generate_stock_msgs()[source]

Generate stock messages (with stock ids).

Yields StockMessage object for every known message found in the channel stream.

Messages factories

Factories for stock messages.

These factories take messages from oxberrypis.parsing.messages and turn them into instances of proto.stock_pb2.StockMessage.

class oxberrypis.net.controller.msgs_factories.StockMessageAddFactory[source]

Factory for add order stock message.

class oxberrypis.net.controller.msgs_factories.StockMessageDeleteFactory[source]

Factory for delete order stock message.

class oxberrypis.net.controller.msgs_factories.StockMessageExecutionFactory[source]

Factory for execution order stock message.

class oxberrypis.net.controller.msgs_factories.StockMessageFactory[source]

Generic factory for stock message.

class oxberrypis.net.controller.msgs_factories.StockMessageModifyFactory[source]

Factory for modify order stock message.

class oxberrypis.net.controller.msgs_factories.StockMessageTradeFactory[source]

Factory for trade stock message.

class oxberrypis.net.controller.msgs_factories.StockMessagesFactory[source]

Factory for stock messages.

Raspberry Pi

Network communcation happening at Raspberry Pi.

Subscriber

Raspberry Pi stock messages subscriber/handler.

class oxberrypis.net.rpi.subscriber.StockMessagesSubscriber(context, rpisync_uri, pub_uri, visual_uri, me_handler_cls=<class 'oxberrypis.net.rpi.handlers.UpdateMatchingEngines'>, visual_handler_cls=<class 'oxberrypis.net.rpi.handlers.ToVisualisation'>)[source]

Stock messages subscriber.

Parameters:
  • context – ZMQ context.
  • rpisync_uri – ZMQ URI for syncing with the Initializer
  • pub_uri – ZMQ URI for stock messages publisher.
  • visual_uri – ZMQ URI visualisation binds to.
  • me_handler_cls – Handler class for updating matching engines.
  • visual_handler_cls – Handler class for sending stock events to visualisation.
handle_data(data)[source]

Handle subscribed data.

run()[source]

Run the subscriber.

sync_reply_handler(data)[source]

Handler for synchronization reply message.

Handlers

Handlers for stock data.

class oxberrypis.net.rpi.handlers.PrintingHandler[source]

Simple printing handler.

Prints stock message to the console on arrival.

class oxberrypis.net.rpi.handlers.ToVisualisation(matching_engines, context, visual_uri)[source]

Stock messages handler that Sends stock evnets to visualisation.

Parameters:
  • matching_engines (list of MatchingEngine) – Mapping stock_index -> matching_engine.
  • context – ZMQ context.
  • visual_uri – ZMQ URI visualisation binds to.
handle_stock_data(stock_id, channel_id, stock_msg)[source]

Handle sending a single stock event to visualisation

make_price_message(stock_msg, stock_id, channel_id)[source]

Makes the message to send to visualisation.

class oxberrypis.net.rpi.handlers.UpdateMatchingEngines(matching_engines)[source]

Stock messages handler for updating matching engines.

Parameters:matching_engines (list of MatchingEngine) – Mapping stock_index -> matching_engine.
handle_stock_data(symbol_index, channel_id, stock_msg)[source]

Update appropriate matching_engine based on stock_msg.

Components

Reusable network components.

class oxberrypis.net.components.PubSubProxy(context, frontend_uri, backend_uri)[source]

Publisher-Subscriber proxy.

Based on:

class oxberrypis.net.components.SynchronizedPublisher(context, publisher_uri, syncservice_uri, subscribers_expected=1, sync_reply=None)[source]

Synchronized publisher.

This publisher and SynchronizedSubscriber are generally based on http://zguide.zeromq.org/py:all#Node-Coordination. However docs notes that:

We can’t assume that the SUB connect will be finished by the time the REQ/REP dialog is complete. There are no guarantees that outbound connects will finish in any order whatsoever, if you’re using any transport except inproc. So, the example does a brute force sleep of one second between subscribing, and sending the REQ/REP synchronization.

A more robust model could be:

  • Publisher opens PUB socket and starts sending “Hello” messages (not data).
  • Subscribers connect SUB socket and when they receive a Hello message they tell the publisher via a REQ/REP socket pair.
  • When the publisher has had all the necessary confirmations, it starts to send real data.

Therefore we implement above mentioned “more robust model”, basin on http://thisthread.blogspot.co.uk/2012/03/pub-sub-coordination-by-req-rep.html.

Parameters:
  • context – ZMQ context.
  • publisher_uri – ZMQ URI the publisher binds to.
  • syncservice_uri – ZMQ URI for publisher’s syncing service REP socket.
  • subscribers_expected – Number of expected subscribers.
  • sync_reply (function accepting sub_id and returning sync reply message.) – Synchronization reply function.
close()[source]

Announce end of publisher stream and close internal sockets.

Sends END_MSG.

handshake(sub_id)[source]

Perform handshake with subscriber if available.

This function checks for subscribers synchronization on the REP socket.

Parameters:sub_id – Subscriber-to-be id.
ping()[source]

Broadcast a ping message.

A “ping” message is sent on the PUB socket to show the subscribers that the publisher is up and waiting for them.

Sends PING_MSG.

publish(data)[source]

Send data to subscribers.

publish_enveloped(key, data)[source]

Send enveloped data to subscribers.

Based on http://zguide.zeromq.org/py:all#Pub-Sub-Message-Envelopes

sync()[source]

Synchronize with subscribers.

class oxberrypis.net.components.SynchronizedSubscriber(context, publisher_uri, syncservice_uri, subscriptions, msg_handler, sync_reply_handler=None)[source]

Synchronized subscriber.

See also

Check SynchronizedPublisher for implementation details.

Parameters:
  • context – ZMQ context.
  • publisher_uri – ZMQ URI of the publisher.
  • syncservice_uri – ZMQ URI of the publisher’s REP socket for syncing.
  • subscriptions – A list of prefixes to subscribe to.
  • msg_handler – Function for handling of subscribed data.
  • sync_reply_handler – Function for handling synchronization reply.
close()[source]

Close internal sockets.

recv()[source]

Receive messages and proccess them using msg_handler in a loop.

Stops when SynchronizedPublisher.END_MSG is received.

recv_multipart()[source]

Receive mulitpart messages and process them using msg_handler in a loop.

Stops when SynchronizedPublisher.END_MSG is received.

To be used with pub-sub message envelopes; based on http://zguide.zeromq.org/py:all#Pub-Sub-Message-Envelopes

subscribe(subscription)[source]

Subscribe to subscription.

sync()[source]

Synchronize with the publisher.

unsubscribe(subscription)[source]

Unsubscribe from subscription.

Table Of Contents

Previous topic

Group report

Next topic

Parsing

This Page