Source code for oxberrypis.net.rpi.handlers

"""Handlers for stock data."""
import zmq

from ...orderbook.order import Order

from ..proto.stock_pb2 import StockMessage
from ..proto.rpi_pb2 import StockEvent


[docs]class PrintingHandler(object): """Simple printing handler. Prints stock message to the console on arrival. """ def handle_stock_data(self, symbol_index, channel_id, stock_msg): print stock_msg
[docs]class UpdateMatchingEngines(object): """Stock messages handler for updating matching engines. :param matching_engines: Mapping ``stock_index`` -> ``matching_engine``. :type matching_engines: list of :py:class:`.MatchingEngine` """ def __init__(self, matching_engines): self.matching_engines = matching_engines
[docs] def handle_stock_data(self, symbol_index, channel_id, stock_msg): """Update appropriate matching_engine based on ``stock_msg``.""" if stock_msg.type == StockMessage.ADD: add_msg = stock_msg.add stock_id = add_msg.symbol_index order_id = add_msg.order_id limit_price = add_msg.price num_shares = add_msg.volume if add_msg.side == add_msg.BUY: side = Order.BUY else: side = Order.SELL self.matching_engines[stock_id].add_order( order_id, limit_price, num_shares, side, ) return stock_id elif stock_msg.type == StockMessage.MODIFY: modify_msg = stock_msg.modify stock_id = modify_msg.symbol_index order_id = modify_msg.order_id limit_price = modify_msg.price num_shares = modify_msg.volume if modify_msg.side == modify_msg.BUY: side = Order.BUY else: side = Order.SELL self.matching_engines[stock_id].update_order( order_id, limit_price, num_shares, side, ) return stock_id elif stock_msg.type == StockMessage.DELETE: delete_msg = stock_msg.delete stock_id = delete_msg.symbol_index order_id = delete_msg.order_id if delete_msg.side == delete_msg.BUY: side = Order.BUY else: side = Order.SELL self.matching_engines[stock_id].remove_order(order_id) return stock_id elif stock_msg.type == StockMessage.EXECUTE: exec_msg = stock_msg.execution stock_id = exec_msg.symbol_index order_id = exec_msg.order_id limit_price = exec_msg.price num_shares = exec_msg.volume matching_engine = self.matching_engines[stock_id] if exec_msg.reason_code == exec_msg.FILLED: self.matching_engine.remove_order(order_id) elif exec_msg.reason_code == exec_msg.PARTIAL: self.matching_engine.decrease_order_amount_by( order_id, num_shares, ) else: # otherwise do nothing (this type of message may be useless) pass return stock_id elif stock_msg.type == StockMessage.TRADE: trade_msg = stock_msg.trade stock_id = trade_msg.symbol_index return stock_id else: raise OxBerryPisExecption("Invalid Message Type")
[docs]class ToVisualisation(object): """Stock messages handler that Sends stock evnets to visualisation. :param matching_engines: Mapping ``stock_index`` -> ``matching_engine``. :type matching_engines: list of :py:class:`.MatchingEngine` :param context: ZMQ context. :param visual_uri: ZMQ URI visualisation binds to. """ def __init__(self, matching_engines, context, visual_uri): self.matching_engines = matching_engines self.to_visualisation = context.socket(zmq.PUSH) self.to_visualisation.connect(visual_uri) self.last_top_buy = {} self.last_top_sell = {} self.seq_nums = {}
[docs] def make_price_message(self, stock_msg, stock_id, channel_id): """Makes the message to send to visualisation.""" matching_engine = self.matching_engines[stock_id] top_sell_order, top_buy_order = matching_engine.get_best_orders() if top_buy_order is not None: top_buy_price = top_buy_order.price else: top_buy_price = None if top_sell_order is not None: top_sell_price = top_sell_order.price else: top_sell_price = None if (stock_msg.type != StockMessage.TRADE and stock_id in self.last_top_buy and stock_id in self.last_top_sell and self.last_top_buy[stock_id] == top_buy_price and self.last_top_sell[stock_id] == top_sell_price): return None self.last_top_buy[stock_id] = top_buy_price self.last_top_sell[stock_id] = top_sell_price stock_event = StockEvent() stock_event.stock_id = stock_id stock_event.timestamp_s = stock_msg.packet_time stock_event.timestamp_ns = stock_msg.packet_time_ns stock_event.channel_id = channel_id if stock_id in self.seq_nums: new_seq_num = self.seq_nums[stock_id] + 1 else: new_seq_num = 0 self.seq_nums[stock_id] = new_seq_num stock_event.seq_num = new_seq_num if top_buy_price is not None: stock_event.top_buy_price = top_buy_price if top_sell_price is not None: stock_event.top_sell_price = top_sell_price if stock_msg.type == StockMessage.TRADE: stock_event.trade_price = stock_msg.trade.price return stock_event
[docs] def handle_stock_data(self, stock_id, channel_id, stock_msg): """Handle sending a single stock event to visualisation""" stock_event = self.make_price_message(stock_msg, stock_id, channel_id) if stock_event is not None: serialized_stock_event = stock_event.SerializeToString() self.to_visualisation.send(serialized_stock_event)