Source code for bcx.websocket
import json
import logging
import time
from threading import Lock, Thread
from websocket import WebSocketApp
[docs]class BlockchainWebsocket:
    """Low level API to interact with Blockchain Exchange"""
    def __init__(self):
        self._ws = None
        self._ws_connect_lock = Lock()
        self._ws_message_handler = lambda x: x
    @property
    def ws(self) -> WebSocketApp:
        """Connection to blockchain exchange websocket"""
        return self._ws
    @property
    def ws_uri(self) -> str:
        """URI of blockchain exchange websocket"""
        return "wss://ws.prod.blockchain.info/mercury-gateway/v1/ws"
        # return "wss://ws.blockchain.com/mercury-gateway/v1/ws"
    @property
    def ws_origin(self) -> str:
        """Blockchain exchange websocket origin"""
        return "https://exchange.blockchain.com"
    @property
    def ws_connect_timeout_seconds(self) -> int:
        """Wait for socket to connect before dropping connection"""
        return 5
    @property
    def ws_connect_headers(self) -> list:
        """List of additional headers sent to blockchain exchange"""
        return []
[docs]    def set_ws_message_handler(self, handler: callable):
        """Set method responsible for handling messages received from blockchain exchange"""
        self._ws_message_handler = handler 
[docs]    def send_json(self, message: dict) -> None:
        """Send message represented as python dictionary to blockchain exchange
        Parameters
        ----------
        message : Dict
        """
        self.send(json.dumps(message)) 
[docs]    def send(self, message: str) -> None:
        """Send raw string message to blockchain exchange
        Parameters
        ----------
        message : str
        """
        self.connect()
        self.ws.send(message) 
[docs]    def connect(self) -> None:
        """Connect to blockchain exchange websocket"""
        if self._ws:
            return
        with self._ws_connect_lock:
            while not self._ws:
                self._connect()
                if self._ws:
                    return 
[docs]    def reconnect(self) -> None:
        """Reconnect to blockchain exchange websocket"""
        if self._ws is not None:
            self._reconnect(self._ws) 
    def _connect(self) -> None:
        assert not self._ws, "websocket should be closed before attempting to connect"
        self._ws = WebSocketApp(
            url=self.ws_uri,
            on_message=self._wrap_callback(self._on_ws_message_callback),
            on_close=self._wrap_callback(self._on_ws_close_callback),
            on_error=self._wrap_callback(self._on_ws_error_callback),
            header=self.ws_connect_headers,
        )
        ws_thread = Thread(target=self._run_websocket, args=(self._ws,))
        ws_thread.daemon = True
        ws_thread.start()
        # Wait for socket to connect
        ts = time.time()
        while self._ws and (not self._ws.sock or not self._ws.sock.connected):
            if time.time() - ts > self.ws_connect_timeout_seconds:
                self._ws = None
                return
            time.sleep(0.1)
    def _run_websocket(self, ws: WebSocketApp) -> None:
        try:
            ws.on_open = self._wrap_callback(self._on_ws_open_callback)
            ws.run_forever(origin=self.ws_origin)
        except Exception as e:
            raise Exception(f'Unexpected error while running websocket: {e}')
        finally:
            self._reconnect(ws)
    def _reconnect(self, ws: WebSocketApp) -> None:
        assert ws is not None, '_reconnect should only be called with an existing ws'
        if ws is self._ws:
            self._ws = None
            ws.close()
            self.connect()
    def _on_ws_message_callback(self, ws: WebSocketApp, message: str):
        logging.info(message)
        self._ws_message_handler(message)
    def _on_ws_open_callback(self, ws: WebSocketApp):
        logging.info(f"Established connection to {self.ws_uri}")
    def _on_ws_close_callback(self, ws: WebSocketApp):
        self._reconnect(ws)
    def _on_ws_error_callback(self, ws: WebSocketApp, error: str):
        logging.error(error)
        self._reconnect(ws)
    def _wrap_callback(self, f: callable):
        def wrapped_f(ws, *args, **kwargs):
            if ws is self._ws:
                try:
                    f(ws, *args, **kwargs)
                except Exception as e:
                    raise Exception(f'Error running websocket callback: {e}')
        return wrapped_f