Source code for bcx.websocket
import json
import logging
import time
from threading import Lock, Thread
from websocket import WebSocketApp
[docs]class BlockchainWebsocket:
"""Low level API to interact with Blockchain Exchange"""
def __init__(self):
self._ws = None
self._ws_connect_lock = Lock()
self._ws_message_handler = lambda x: x
@property
def ws(self) -> WebSocketApp:
"""Connection to blockchain exchange websocket"""
return self._ws
@property
def ws_uri(self) -> str:
"""URI of blockchain exchange websocket"""
return "wss://ws.prod.blockchain.info/mercury-gateway/v1/ws"
# return "wss://ws.blockchain.com/mercury-gateway/v1/ws"
@property
def ws_origin(self) -> str:
"""Blockchain exchange websocket origin"""
return "https://exchange.blockchain.com"
@property
def ws_connect_timeout_seconds(self) -> int:
"""Wait for socket to connect before dropping connection"""
return 5
@property
def ws_connect_headers(self) -> list:
"""List of additional headers sent to blockchain exchange"""
return []
[docs] def set_ws_message_handler(self, handler: callable):
"""Set method responsible for handling messages received from blockchain exchange"""
self._ws_message_handler = handler
[docs] def send_json(self, message: dict) -> None:
"""Send message represented as python dictionary to blockchain exchange
Parameters
----------
message : Dict
"""
self.send(json.dumps(message))
[docs] def send(self, message: str) -> None:
"""Send raw string message to blockchain exchange
Parameters
----------
message : str
"""
self.connect()
self.ws.send(message)
[docs] def connect(self) -> None:
"""Connect to blockchain exchange websocket"""
if self._ws:
return
with self._ws_connect_lock:
while not self._ws:
self._connect()
if self._ws:
return
[docs] def reconnect(self) -> None:
"""Reconnect to blockchain exchange websocket"""
if self._ws is not None:
self._reconnect(self._ws)
def _connect(self) -> None:
assert not self._ws, "websocket should be closed before attempting to connect"
self._ws = WebSocketApp(
url=self.ws_uri,
on_message=self._wrap_callback(self._on_ws_message_callback),
on_close=self._wrap_callback(self._on_ws_close_callback),
on_error=self._wrap_callback(self._on_ws_error_callback),
header=self.ws_connect_headers,
)
ws_thread = Thread(target=self._run_websocket, args=(self._ws,))
ws_thread.daemon = True
ws_thread.start()
# Wait for socket to connect
ts = time.time()
while self._ws and (not self._ws.sock or not self._ws.sock.connected):
if time.time() - ts > self.ws_connect_timeout_seconds:
self._ws = None
return
time.sleep(0.1)
def _run_websocket(self, ws: WebSocketApp) -> None:
try:
ws.on_open = self._wrap_callback(self._on_ws_open_callback)
ws.run_forever(origin=self.ws_origin)
except Exception as e:
raise Exception(f'Unexpected error while running websocket: {e}')
finally:
self._reconnect(ws)
def _reconnect(self, ws: WebSocketApp) -> None:
assert ws is not None, '_reconnect should only be called with an existing ws'
if ws is self._ws:
self._ws = None
ws.close()
self.connect()
def _on_ws_message_callback(self, ws: WebSocketApp, message: str):
logging.info(message)
self._ws_message_handler(message)
def _on_ws_open_callback(self, ws: WebSocketApp):
logging.info(f"Established connection to {self.ws_uri}")
def _on_ws_close_callback(self, ws: WebSocketApp):
self._reconnect(ws)
def _on_ws_error_callback(self, ws: WebSocketApp, error: str):
logging.error(error)
self._reconnect(ws)
def _wrap_callback(self, f: callable):
def wrapped_f(ws, *args, **kwargs):
if ws is self._ws:
try:
f(ws, *args, **kwargs)
except Exception as e:
raise Exception(f'Error running websocket callback: {e}')
return wrapped_f