# Source code for bcx.manager
import json
import logging
from typing import Dict, List, Optional

from bcx.channels import ChannelFactory, Channel
from bcx.websocket import BlockchainWebsocket
class ChannelManager:
    """Manage connections to blockchain exchange channels.

    One ``BlockchainWebsocket`` is created and handed to every channel built
    by the ``ChannelFactory``. Created channels are cached per channel name
    under an ID derived from the channel name and its parameters, so asking
    twice for the same channel returns the same object. The manager registers
    ``_handle_messages`` as the websocket's message handler and forwards each
    received message to the matching channel's ``on_event``.
    """

    def __init__(self):
        self._ws = BlockchainWebsocket()
        self._channels_factory = ChannelFactory()
        # Layout: {channel name: {channel ID: channel}}
        self._channels = {channel_name: {} for channel_name in self._channels_factory.channels}
        self._ws.set_ws_message_handler(
            handler=self._handle_messages
        )

    @property
    def available_channel_names(self) -> List[str]:
        """List of channel names this manager is responsible for"""
        return list(self._channels)

    def _encode_channel(self, name, channel_params: Dict) -> str:
        """Build the cache ID of a channel.

        The ID is the channel name followed by the parameter *values*, joined
        with ``-`` and ordered by parameter name, so the keyword order used by
        the caller does not influence the result.
        """
        ordered_values = (f"{channel_params[key]}" for key in sorted(channel_params))
        return "-".join([f"{name}", *ordered_values])

    def _find_prices_channel(self, symbol) -> Optional[Channel]:
        """Return the registered "prices" channel whose symbol equals ``symbol``, or ``None``."""
        for existing_channel in self._channels.get("prices", {}).values():
            if existing_channel.symbol == symbol:
                return existing_channel
        return None

    def get_channel(self, name, **kwargs) -> Optional[Channel]:
        """Get connection to a channel of interest.

        Args:
            name: Channel name; must be one of ``available_channel_names``.
            **kwargs: Channel parameters, forwarded unchanged to the channel
                factory and used to build the cache ID.

        Returns:
            The cached channel for these parameters, or a newly created (and
            cached) one. ``None`` when ``name`` is ``"prices"`` and another
            prices channel with the same symbol is already registered; an
            error is logged in that case.

        Raises:
            KeyError: If ``name`` is not a known channel name.
        """
        channel_id = self._encode_channel(name, kwargs)
        channels = self._channels[name]
        if channel_id in channels:
            return channels[channel_id]

        channel = self._channels_factory.create_channel(
            name=name,
            ws=self._ws,
            **kwargs
        )
        if name == "prices":
            # Only one prices channel per symbol is allowed; a second one
            # (necessarily with different parameters) is rejected.
            existing_channel = self._find_prices_channel(channel.symbol)
            if existing_channel is not None:
                logging.error("Can subscribe for a single granularity per channel. "
                              f"Already subscribed to {existing_channel}")
                return None
        channels[channel_id] = channel
        return channel

    def get_all_channels(self) -> List[Channel]:
        """Get list of all opened connections to channels"""
        return [
            channel
            for channels in self._channels.values()
            for channel in channels.values()
        ]

    def _handle_messages(self, message: str):
        """Route a raw websocket message to the channel it belongs to.

        The message is parsed as JSON; ``event`` and ``channel`` are removed
        from it, and (for every channel except ``"trading"``) ``symbol`` and
        ``granularity`` are removed as well and used as channel parameters.
        The remaining payload is passed to the channel's ``on_event``.

        A prices message that carries a symbol but no granularity is delivered
        to the prices channel already registered for that symbol, because at
        most one prices channel per symbol can exist. A message for which no
        channel can be obtained is dropped instead of raising.

        Raises:
            KeyError: If the message has no ``event`` or ``channel`` key, or
                names an unknown channel.
        """
        msg: Dict = json.loads(message)
        event_type = msg.pop("event")
        channel_name = msg.pop("channel")

        channel_params = {}
        if channel_name != "trading":
            channel_params = {
                key: msg.pop(key) for key in ("symbol", "granularity") if key in msg
            }

        channel = None
        if (
            channel_name == "prices"
            and "symbol" in channel_params
            and "granularity" not in channel_params
        ):
            # Without a granularity the encoded ID would not match the
            # subscribed channel; the symbol alone identifies it.
            channel = self._find_prices_channel(channel_params["symbol"])
        if channel is None:
            channel = self.get_channel(channel_name, **channel_params)
        if channel is None:
            # get_channel already logged why no channel is available.
            return
        channel.on_event(event_type, msg)