import logging
import time
from datetime import datetime
from typing import List, Dict, Optional

from bcx.orders import Order, MarketOrder, LimitOrder
from bcx.manager import ChannelManager
from bcx.channels import Channel, TradingChannel, HeartbeatChannel, AuthChannel, PricesChannel
class BlockchainWebsocketClient:
    """High level API to interact with Blockchain Exchange

    Every operation is delegated to a channel object obtained from
    ``channel_manager``; the client itself only selects the channel and
    forwards the call.

    Attributes
    ----------
    channel_manager : ChannelManager
        Provider of the channel objects used by every method of the client.
    SUPPORTED_GRANULARITIES : tuple of int
        Granularities accepted by :meth:`subscribe_to_prices`.
    """

    SUPPORTED_GRANULARITIES = (60, 300, 900, 3600, 21600, 86400)

    # Seconds slept between two consecutive checks while polling a channel.
    _POLL_INTERVAL = 0.5

    def __init__(self):
        self.channel_manager = ChannelManager()

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------
    def _subscribe_to_channel(self, name: str, **channel_params):
        """Generic interface to subscribe to channels

        Nothing happens when :meth:`get_channel` yields no channel or when
        the channel already reports itself as subscribed.
        """
        channel = self.get_channel(name, **channel_params)
        if channel and not channel.is_subscribed:
            channel.subscribe()

    def _unsubscribe_from_channel(self, name: str, **channel_params):
        """Generic interface to unsubscribe from channels

        Nothing happens when :meth:`get_channel` yields no channel or when
        the channel is not currently subscribed.
        """
        channel = self.get_channel(name, **channel_params)
        if channel and channel.is_subscribed:
            channel.unsubscribe()

    @property
    def _is_authenticated(self) -> bool:
        """Check if client can connect to authenticated channels"""
        channel: AuthChannel = self.get_channel("auth")
        return channel.is_authenticated

    def _auth(self):
        """Subscribe to `auth <https://exchange.blockchain.com/api/#authenticated-channels>`_ channel"""
        self._subscribe_to_channel(name="auth")

    def _wait_for_authentication(self):
        """Subscribe to the auth channel and block until it is authenticated

        The auth channel is polled every ``_POLL_INTERVAL`` seconds.
        NOTE: there is no timeout, so this blocks for as long as the auth
        channel keeps reporting ``is_authenticated`` as false.
        """
        self._auth()
        while not self._is_authenticated:
            logging.info("Waiting for authentication")
            time.sleep(self._POLL_INTERVAL)

    def _wait_until_subscribed(self, channel, name: str):
        """Block until ``channel`` reports itself as subscribed, then return it

        Parameters
        ----------
        channel
            Channel object whose ``is_subscribed`` flag is polled.
        name : str
            Channel name, only used in the warning logged on every poll.

        Returns
        -------
        channel
            The very same object that was passed in.
        """
        # NOTE: no timeout; the caller is expected to have subscribed already.
        while not channel.is_subscribed:
            time.sleep(self._POLL_INTERVAL)
            logging.warning(f"You need to be subscribed to '{name}' channel before you can communicate with it")
        return channel

    # ------------------------------------------------------------------
    # Subscriptions
    # ------------------------------------------------------------------
    def subscribe_to_heartbeat(self):
        """Subscribe to `heartbeat <https://exchange.blockchain.com/api/#heartbeat>`_ channel"""
        self._subscribe_to_channel(name="heartbeat")

    def subscribe_to_orderbook_l2(self, symbol: str):
        """Subscribe to `L2 order book <https://exchange.blockchain.com/api/#l2-order-book>`_ channel"""
        self._subscribe_to_channel(name="l2", symbol=symbol)

    def subscribe_to_orderbook_l3(self, symbol: str):
        """Subscribe to `L3 order book <https://exchange.blockchain.com/api/#l3-order-book>`_ channel"""
        self._subscribe_to_channel(name="l3", symbol=symbol)

    def subscribe_to_prices(self, symbol: str, granularity: int):
        """Subscribe to `prices <https://exchange.blockchain.com/api/#prices>`_ channel

        An error is logged, and no subscription is attempted, when
        ``granularity`` is not one of ``SUPPORTED_GRANULARITIES``.
        """
        if granularity not in self.SUPPORTED_GRANULARITIES:
            # Rendered as a list so the logged text keeps its historical form.
            supported_granularities = list(self.SUPPORTED_GRANULARITIES)
            logging.error(f"Granularity '{granularity}' is not supported. Should be one of {supported_granularities}.")
            return
        self._subscribe_to_channel(
            name="prices",
            symbol=symbol,
            granularity=granularity,
        )

    def subscribe_to_symbols(self):
        """Subscribe to `symbols <https://exchange.blockchain.com/api/#symbols>`_ channel"""
        self._subscribe_to_channel(name="symbols")

    def subscribe_to_ticker(self, symbol: str):
        """Subscribe to `ticker <https://exchange.blockchain.com/api/#ticker>`_ channel"""
        self._subscribe_to_channel(name="ticker", symbol=symbol)

    def subscribe_to_trades(self, symbol: str):
        """Subscribe to `trades <https://exchange.blockchain.com/api/#trades>`_ channel"""
        self._subscribe_to_channel(name="trades", symbol=symbol)

    def subscribe_to_trading(self):
        """Subscribe to `trading <https://exchange.blockchain.com/api/#trading>`_ channel

        Blocks until the auth channel reports the client as authenticated.
        """
        self._wait_for_authentication()
        self._subscribe_to_channel(name="trading")

    def subscribe_to_balances(self):
        """Subscribe to `balances <https://exchange.blockchain.com/api/#balances>`_ channel

        Blocks until the auth channel reports the client as authenticated.
        """
        self._wait_for_authentication()
        self._subscribe_to_channel(name="balances")

    # ------------------------------------------------------------------
    # Channel access
    # ------------------------------------------------------------------
    @property
    def available_channels(self) -> List[str]:
        """List of all available channels on Blockchain Exchange"""
        return self.channel_manager.available_channel_names

    @property
    def connected_channels(self) -> List[Channel]:
        """List of all channels that you can interact with"""
        return self.channel_manager.get_all_channels()

    def get_channel(self, name: str, **channel_params) -> Channel:
        """Get connection to a channel of interest

        Parameters
        ----------
        name: str
            Name of the channel
        channel_params: Dict
            Parameters used to subscribe to channel

        Returns
        -------
        channel: Channel
            ``None`` (after logging an error) when ``name`` is not listed in
            :attr:`available_channels`.
        """
        if name not in self.available_channels:
            logging.error(f"Channel '{name}' is not supported. Select one from {self.available_channels}")
            return None
        return self.channel_manager.get_channel(name=name, **channel_params)

    def get_last_heartbeat(self) -> datetime:
        """Get last heartbeat"""
        channel: HeartbeatChannel = self.get_channel("heartbeat")
        return channel.last_heartbeat

    def get_trading_channel(self) -> TradingChannel:
        """Get connection to `trading <https://exchange.blockchain.com/api/#trading>`_ channel

        Blocks, logging a warning on every poll, until the channel reports
        itself as subscribed.

        Returns
        -------
        channel : TradingChannel
        """
        return self._wait_until_subscribed(self.get_channel("trading"), "trading")

    def get_prices_channel(self, symbol: str, granularity: int) -> PricesChannel:
        """Get connection to `prices <https://exchange.blockchain.com/api/#prices>`_ channel

        Blocks, logging a warning on every poll, until the channel reports
        itself as subscribed.

        Parameters
        ----------
        symbol
        granularity

        Returns
        -------
        channel : PricesChannel
        """
        channel = self.get_channel(
            "prices",
            symbol=symbol,
            granularity=granularity,
        )
        return self._wait_until_subscribed(channel, "prices")

    # ------------------------------------------------------------------
    # Orders
    # ------------------------------------------------------------------
    def create_order(self, order: Order):
        """Create generic order

        The order is forwarded to the trading channel only when
        ``order.is_valid`` is truthy; otherwise an error is logged.

        Parameters
        ----------
        order : Order
        """
        if not order.is_valid:
            logging.error(f"Order is not valid: {order.to_json()}")
            return
        self.get_trading_channel().create_order(order=order)

    def create_market_order(self, symbol: str, side: str, quantity: float, time_in_force: str,
                            order_id: Optional[str] = None):
        """Create market order

        Parameters
        ----------
        symbol
        side
        quantity
        time_in_force
        order_id
        """
        order = MarketOrder(
            symbol=symbol,
            side=side,
            quantity=quantity,
            time_in_force=time_in_force,
            order_id=order_id,
        )
        self.create_order(order=order)

    def create_limit_order(self, price: float, symbol: str, side: str, quantity: float, time_in_force: str,
                           order_id: Optional[str] = None):
        """Create limit order

        Parameters
        ----------
        price
        symbol
        side
        quantity
        time_in_force
        order_id
        """
        order = LimitOrder(
            price=price,
            symbol=symbol,
            side=side,
            quantity=quantity,
            time_in_force=time_in_force,
            order_id=order_id,
        )
        self.create_order(order=order)

    def cancel_order(self, order_id):
        """Cancel order

        Parameters
        ----------
        order_id
        """
        self.get_trading_channel().cancel_order(order_id=order_id)

    def cancel_all_orders(self):
        """Cancel all orders"""
        self.get_trading_channel().cancel_all_orders()