Source code for bcx.manager

import json
import logging
from typing import Dict, List

from bcx.websocket import BlockchainWebsocket
from bcx.channels import ChannelFactory, Channel


[docs]class ChannelManager: """Class to manage connections to blockchain exchange channels""" def __init__(self): self._ws = BlockchainWebsocket() self._channels_factory = ChannelFactory() self._channels = {channel_name: dict() for channel_name in self._channels_factory.channels} self._ws.set_ws_message_handler( handler=self._handle_messages ) @property def available_channel_names(self) -> List[str]: """List of channel names this manager is responsible for""" return list(self._channels.keys()) def _encode_channel(self, name, channel_params: Dict) -> str: """Custom channel UID""" encoding = f"{name}" for key in sorted(channel_params.keys()): encoding = f"{encoding}-{channel_params[key]}" return encoding
[docs] def get_channel(self, name, **kwargs) -> Channel: """Get connection to a channel of interest""" channel_id = self._encode_channel(name, kwargs) if channel_id in self._channels[name]: channel = self._channels[name][channel_id] else: channel = self._channels_factory.create_channel( name=name, ws=self._ws, **kwargs ) if name == "prices": for existing_channel in self._channels[name].values(): if existing_channel.symbol == channel.symbol: logging.error("Can subscribe for a single granularity per channel. " f"Already subscribed to {existing_channel}") return None self._channels[name][channel_id] = channel return channel
[docs] def get_all_channels(self) -> List[Channel]: """Get list of all opened connections to channels""" all_channels = [] for channel_type, channels in self._channels.items(): all_channels += [channel for channel in channels.values()] return all_channels
def _handle_messages(self, message: str): """A simple logic for handling message received from blockchain websocket""" msg: Dict = json.loads(message) event_type = msg.pop("event") channel_name = msg.pop("channel") channel_params = {} for key in ["symbol", "granularity"]: if key in msg and channel_name != "trading": channel_params[key] = msg.pop(key) channel = self.get_channel(channel_name, **channel_params) channel.on_event(event_type, msg)