Source code for bcx.channels

import os
import logging
from typing import Dict, List

from bcx.utils import timestamp_to_datetime
from bcx.websocket import BlockchainWebsocket
from bcx.orders import Order


[docs]class Channel: """Base class for all channels""" def __init__(self, name: str, ws: BlockchainWebsocket): self.name = name self._ws = ws self.is_subscribed = False @property def extra_message(self) -> Dict: """Additional message to be send to server""" return dict()
[docs] def subscribe(self): """Subscribe to a channel""" self._ws.send_json({ "action": "subscribe", "channel": self.name, **self.extra_message })
[docs] def unsubscribe(self): """Unsubscribe from a channel""" self._ws.send_json({ "action": "unsubscribe", "channel": self.name, **self.extra_message })
[docs] def on_event(self, event_type: str, event_response: Dict): """Perform action based on event type received from server Parameters ---------- event_type : str event_response : Dict """ if event_type == "subscribed": self.is_subscribed = True self.on_subscribe() elif event_type == "unsubscribed": self.is_subscribed = False self.on_unsubscribe() elif event_type == "rejected": self.on_reject(event_response) elif event_type == "snapshot": self.on_snapshot(event_response) elif event_type == "updated": self.on_update(event_response)
[docs] def on_subscribe(self): """Perform action upon **subscribe** event message received from server""" pass
[docs] def on_unsubscribe(self): """Perform action upon **unsubscribe** event message received from server""" pass
[docs] def on_reject(self, event_response: Dict): """Perform action upon **reject** event message received from server Parameters ---------- event_response : Dict """ pass
[docs] def on_snapshot(self, event_response: Dict): """Perform action upon **snapshot** event message received from server Parameters ---------- event_response : Dict """ pass
[docs] def on_update(self, event_response: Dict): """Perform action upon **update** event message received from server Parameters ---------- event_response : Dict """ pass
[docs]class HeartbeatChannel(Channel): """Representation of `heartbeat <https://exchange.blockchain.com/api/#heartbeat>`_ channel Parameters ---------- name : str ws : BlockchainWebsocket Attributes ---------- is_subscribed : bool last_heartbeat : datetime """ def __init__(self, ws, name): super().__init__(ws=ws, name=name) self.last_heartbeat = None def __repr__(self): class_name = self.__class__.__name__ return f"{class_name}(is_subscribed={self.is_subscribed})"
[docs] def on_update(self, event_response): self.last_heartbeat = timestamp_to_datetime(event_response["timestamp"])
[docs]class OrderbookChannel(Channel): """Representation of generic order book channel Parameters ---------- symbol : str name : str ws : BlockchainWebsocket Attributes ---------- is_subscribed : bool snapshot : Dict[str, List] updates : Dict[str, List] """ def __init__(self, symbol, ws, name): super().__init__(ws=ws, name=name) self.symbol = symbol self.snapshot = {"asks": [], "bids": []} self.updates = {"asks": [], "bids": []} def __repr__(self): class_name = self.__class__.__name__ return f"{class_name}(symbol={self.symbol}, is_subscribed={self.is_subscribed})" @property def extra_message(self) -> Dict: return { "symbol": self.symbol }
[docs] def on_snapshot(self, event_response): for key in self.snapshot: self.snapshot[key] = event_response.pop(key)
[docs] def on_update(self, event_response): for key in self.updates: update = event_response.pop(key) if update: self.updates[key].append(update)
[docs]class OrderbookL2Channel(OrderbookChannel): """Representation of `L2 order book <https://exchange.blockchain.com/api/#l2-order-book>`_ channel Parameters ---------- symbol : str name : str ws : BlockchainWebsocket Attributes ---------- is_subscribed : bool snapshot : Dict[str, List] updates : Dict[str, List] """ def __init__(self, symbol, ws, name): super().__init__(symbol=symbol, ws=ws, name=name) @property def extra_message(self) -> Dict: return super().extra_message
[docs] def on_snapshot(self, event_response): super().on_snapshot(event_response)
[docs] def on_update(self, event_response): super().on_update(event_response)
[docs]class OrderbookL3Channel(OrderbookChannel): """Representation of `L3 order book <https://exchange.blockchain.com/api/#l3-order-book>`_ channel Parameters ---------- symbol : str name : str ws : BlockchainWebsocket Attributes ---------- is_subscribed : bool snapshot : Dict[str, List] updates : Dict[str, List] """ def __init__(self, symbol, ws, name): super().__init__(symbol=symbol, ws=ws, name=name) @property def extra_message(self) -> Dict: return super().extra_message
[docs] def on_snapshot(self, event_response): super().on_snapshot(event_response)
[docs] def on_update(self, event_response): super().on_update(event_response)
[docs]class PricesChannel(Channel): """Representation of `prices <https://exchange.blockchain.com/api/#prices>`_ channel Parameters ---------- symbol : str granularity : int name : str ws : BlockchainWebsocket Attributes ---------- is_subscribed : bool updates : List """ def __init__(self, symbol, granularity, ws, name): super().__init__(ws=ws, name=name) self.symbol = symbol self.granularity = granularity self.updates = [] def __repr__(self): class_name = self.__class__.__name__ return f"{class_name}(symbol={self.symbol}, granularity={self.granularity}, is_subscribed={self.is_subscribed})" @property def extra_message(self) -> Dict: return { "symbol": self.symbol, "granularity": self.granularity } @property def last_price(self) -> List: """Last available price from this channel""" return self.updates[-1] if len(self.updates) > 0 else []
[docs] def on_update(self, event_response): self.updates.append(event_response.pop("price"))
[docs]class SymbolsChannel(Channel): """Representation of `symbols <https://exchange.blockchain.com/api/#symbols>`_ channel Parameters ---------- name : str ws : BlockchainWebsocket Attributes ---------- is_subscribed : bool snapshot : Dict[str, List] updates : Dict[str, List] """ def __init__(self, ws, name): super().__init__(ws=ws, name=name) self.snapshot = dict() self.updates = dict() def __repr__(self): class_name = self.__class__.__name__ return f"{class_name}(is_subscribed={self.is_subscribed})"
[docs] def on_snapshot(self, event_response: Dict): self.snapshot = event_response.pop("symbols")
[docs] def on_update(self, event_response): symbol = event_response.pop("symbol") if symbol in self.updates and self.updates[symbol]: self.updates[symbol].append(event_response) else: self.updates[symbol] = list(event_response)
[docs]class TickerChannel(Channel): """Representation of `ticker <https://exchange.blockchain.com/api/#ticker>`_ channel Parameters ---------- symbol : str name : str ws : BlockchainWebsocket Attributes ---------- is_subscribed : bool snapshots : List updates : List """ def __init__(self, symbol, ws, name): super().__init__(ws=ws, name=name) self.symbol = symbol self.snapshots = [] self.updates = [] def __repr__(self): class_name = self.__class__.__name__ return f"{class_name}(symbol={self.symbol}, is_subscribed={self.is_subscribed})" @property def extra_message(self) -> Dict: return { "symbol": self.symbol }
[docs] def on_snapshot(self, event_response: Dict): self.snapshots.append(event_response)
[docs] def on_update(self, event_response: Dict): self.updates.append(event_response)
[docs]class TradesChannel(Channel): """Representation of `trades <https://exchange.blockchain.com/api/#trades>`_ channel Parameters ---------- symbol : str name : str ws : BlockchainWebsocket Attributes ---------- is_subscribed : bool updates : List """ def __init__(self, symbol, ws, name): super().__init__(ws=ws, name=name) self.symbol = symbol self.updates = [] def __repr__(self): class_name = self.__class__.__name__ return f"{class_name}(symbol={self.symbol}, is_subscribed={self.is_subscribed})" @property def extra_message(self) -> Dict: return { "symbol": self.symbol }
[docs] def on_update(self, event_response: Dict): self.updates.append(event_response)
[docs]class AuthChannel(Channel): """Representation of `auth <https://exchange.blockchain.com/api/#authenticated-channels>`_ channel Parameters ---------- name : str ws : BlockchainWebsocket Attributes ---------- is_subscribed : bool api_secret : str is_authenticated : bool """ def __init__(self, ws, name): super().__init__(ws=ws, name=name) api_secret = os.environ.get("BLOCKCHAIN_API_SECRET") if not api_secret: logging.warning("Missing credentials for subscriptions to authenticated channel") self.api_secret = api_secret self.is_authenticated = False def __repr__(self): class_name = self.__class__.__name__ return f"{class_name}(is_subscribed={self.is_subscribed})" @property def extra_message(self) -> Dict: return { "token": self.api_secret }
[docs] def on_subscribe(self): self.is_authenticated = True
[docs]class TradingChannel(Channel): """Representation of `trading <https://exchange.blockchain.com/api/#trading>`_ channel Parameters ---------- name : str ws : BlockchainWebsocket Attributes ---------- is_subscribed : bool snapshot : List updates : List rejects : List open_orders : set """ def __init__(self, ws, name): super().__init__(ws=ws, name=name) self.is_authenticated = False self.snapshot = [] self.updates = [] self.rejects = [] self.open_orders = set() def __repr__(self): class_name = self.__class__.__name__ return f"{class_name}(is_subscribed={self.is_subscribed})"
[docs] def on_snapshot(self, event_response: Dict): orders = event_response.pop("orders") self.snapshot = orders for order in orders: self.open_orders.add(order["orderID"])
[docs] def on_update(self, event_response: Dict): self.updates.append(event_response) if event_response["ordStatus"] == "open": self.open_orders.add(event_response["orderID"]) elif event_response["ordStatus"] == "filled": self.open_orders.remove(event_response["orderID"])
[docs] def on_reject(self, event_response: Dict): self.rejects.append(event_response)
[docs] def create_order(self, order: Order): """Send create order message Parameters ---------- order : Order """ logging.info(f"Submitting order {order.to_json()}") self._ws.send_json({ "action": "NewOrderSingle", "channel": self.name, **order.to_json() })
[docs] def cancel_order(self, order_id): """Send cancel order message Parameters ---------- order_id : str """ self._ws.send_json({ "action": "CancelOrderRequest", "channel": self.name, "orderID": order_id })
[docs] def cancel_all_orders(self): """Send messages to cancel all open orders""" for order_id in self.open_orders: self.cancel_order(order_id=order_id)
[docs]class BalancesChannel(Channel): """Representation of `balances <https://exchange.blockchain.com/api/#balances>`_ channel Parameters ---------- name : str ws : BlockchainWebsocket Attributes ---------- is_subscribed : bool snapshots : List """ def __init__(self, ws, name): super().__init__(ws=ws, name=name) self.is_authenticated = False self.snapshots = [] def __repr__(self): class_name = self.__class__.__name__ return f"{class_name}(is_subscribed={self.is_subscribed})"
[docs] def on_snapshot(self, event_response): self.snapshots.append(event_response["balances"])
[docs]class ChannelFactory: """Class to create any channel Attributes ---------- channels : Dict[str, Channel] """ def __init__(self): self.channels = { "heartbeat": HeartbeatChannel, "l2": OrderbookL2Channel, "l3": OrderbookL3Channel, "prices": PricesChannel, "symbols": SymbolsChannel, "ticker": TickerChannel, "trades": TradesChannel, "auth": AuthChannel, "trading": TradingChannel, "balances": BalancesChannel, }
[docs] def create_channel(self, name, ws, **kwargs): """Create channel Parameters ---------- name : str ws : BlockchainWebsocket kwargs : dict Returns ------- Channel """ return self.channels[name](ws=ws, name=name, **kwargs)