Jaco du Plessis

A Look at the Luno Streams Python Library

python luno websockets asyncio

I have a running cryptocurrency project, written in Python, that pulls in market data from various sources, looks for arbitrage opportunities and performs the trades in an automated way. Due to the nature of the project, I use the relatively new asyncio and aiohttp Python libraries for efficient and parallel networking. The more advanced crypto platforms offer streaming APIs to retrieve real-time market data. There are many benefits to using streaming APIs vs. REST APIs, mostly due to the large payloads required to represent the full market data on exchanges. Some exchanges offer order book compression, but in particular, Luno does not.

They do, however, offer a streaming API that is currently in beta. From their docs:

It is more efficient and provides lower latency information than repeatedly polling the orderbook and recent trades but is more complicated to implement.

I like efficiency, so I wrote some Python code to use this API. It is currently running in my crypto arbitrage project. This week, after realising once again that almost none of my projects could work without other people's open source libraries, I decided it was time to contribute something to the ecosystem. I packaged the code, and you can add it to your own project with pip install luno_streams.

The most important part of the library is the luno_streams.Updater class. It connects to the API using websockets, receives the updates published by Luno and maintains a local order book that mirrors Luno's.

Here are some important parts of the code:

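import asyncio
import json
import logging
import time
from collections import defaultdict
from decimal import Decimal
import websockets

logger = logging.getLogger(__name__)

class BackoffException(Exception):
    """Raised when a connection is attempted too soon after the previous one."""

class Updater:
    def __init__(self, pair_code, api_key, api_secret, hooks=None):
        self.pair_code = pair_code.upper()
        self.api_key = api_key
        self.api_secret = api_secret
        self.sequence = None
        self.bids = {}
        self.asks = {}
        self.websocket = None
        self.hooks = hooks or []
        self.time_last_connection_attempt = None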

A quick comment on each attribute of the class:

Each update message transmitted from the server has a unique increasing sequence number. The message with sequence number n can be applied to state sequence n-1 to produce state sequence n. A message may contain multiple updates which must be applied atomically and in order. If an update is received out-of-sequence (for example update sequence n+2 or n-1 received after update sequence n), the client cannot continue and must reinitialize the state.
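The sequence rule above can be sketched as a small check. This is only an illustration: the function name classify_update and the 'apply'/'resync' labels are made up for this example and are not part of the library.

```python
def classify_update(current_sequence, incoming_sequence):
    """Return 'apply' if the update is the next one in line, otherwise 'resync'."""
    if incoming_sequence == current_sequence + 1:
        return 'apply'
    # Gaps (n+2) and replays (n-1) both leave the local state unusable,
    # so the only safe reaction is to reinitialize from a fresh snapshot.
    return 'resync'


current = 41
for incoming in (42, 43, 40):
    print(incoming, classify_update(current, incoming))
```

Only the update numbered 42 can be applied to state 41; the other two would force a reconnect.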

It is critical that clients implement exponential backoff for all reconnections to avoid overloading the server in case of errors.

The library does not implement exponential backoff yet. For now it simply waits so that it never attempts a connection more than once every 10 seconds.
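For comparison, here is a rough sketch of what an exponential backoff schedule could look like. It is not what the library does. The function name backoff_delay, the 1-second base, the 60-second cap and the optional jitter are all assumptions chosen for illustration.

```python
import random


def backoff_delay(attempt, base=1.0, cap=60.0, jitter=0.0):
    """Seconds to wait before reconnection attempt number `attempt` (0-based)."""
    # Double the delay on every failed attempt, but never exceed the cap.
    delay = min(cap, base * 2 ** attempt)
    # Optional jitter spreads out clients that were disconnected at the same time.
    return delay + random.uniform(0, jitter * delay)


print([backoff_delay(n) for n in range(8)])
# [1.0, 2.0, 4.0, 8.0, 16.0, 32.0, 60.0, 60.0]
```

The attempt counter would be reset to zero once a connection succeeds, so that a single dropped connection does not inherit a long delay.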

Below is the connection code. After checking the rate limit, it connects, authenticates and stores the initial state of the order book.

def check_backoff(self):
    if self.time_last_connection_attempt is not None:
        delta = time.time() - self.time_last_connection_attempt
        if delta < 10:
            raise BackoffException()

async def connect(self):

    if self.websocket is not None:  # reconnecting
        logger.info(f'[{self.pair_code}] Closing existing connection...')
        await self.websocket.close()
    try:
        self.check_backoff()
    except BackoffException:
        # do not attempt connection more than once every 10 seconds
        logger.info('Waiting 10 seconds before attempting to connect...')
        await asyncio.sleep(10)
    self.time_last_connection_attempt = time.time()

    url = f'wss://ws.luno.com/api/1/stream/{self.pair_code}'
    logger.info(f'[{self.pair_code}] Connecting to {url}...')
    self.websocket = await websockets.connect(url)
    # no error handling - if connection fails, let it raise websocket Exception
    await self.websocket.send(json.dumps({
        'api_key_id': self.api_key,
        'api_key_secret': self.api_secret,
    }))

    initial = await self.websocket.recv()
    initial_data = json.loads(initial)
    self.sequence = int(initial_data['sequence'])
    self.asks = {x['id']: [Decimal(x['price']), Decimal(x['volume'])] for x in initial_data['asks']}
    self.bids = {x['id']: [Decimal(x['price']), Decimal(x['volume'])] for x in initial_data['bids']}
    logger.info(f'[{self.pair_code}] Initial state received.')

Here you can see that all numbers are represented by decimal.Decimal objects, which is very important when you need exact representations of their values. To see why this is necessary, simply type 0.1 + 0.2 in a Python terminal:

>>> 0.1 + 0.2
0.30000000000000004
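The same sum with Decimal comes out exact, provided the values are built from strings rather than floats. A small sketch:

```python
from decimal import Decimal

# Build Decimals from strings so no binary rounding error sneaks in.
total = Decimal('0.1') + Decimal('0.2')
print(total)                    # 0.3
print(total == Decimal('0.3'))  # True

# Building from a float carries the float's rounding error along.
print(Decimal(0.1) == Decimal('0.1'))  # False
```

This is why the connection code passes the raw price and volume values straight into Decimal instead of converting them to float first.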

Next comes the code for processing each update, which maps closely to the documentation. Call the run method to start.

async def run(self):
    await self.connect()
    async for message in self.websocket:
        if message == '""':
            # keep alive
            continue
        await self.handle_message(message)

async def handle_message(self, message):
    data = json.loads(message)
    new_sequence = int(data['sequence'])
    if new_sequence != self.sequence + 1:
        logger.warning(
            f'[{self.pair_code}] Sequence broken: expected "{self.sequence+1}", received "{new_sequence}".'
        )
        logger.info(f'[{self.pair_code}] Reconnecting...')
        return await self.connect()

    self.sequence = new_sequence

    if data['delete_update']:
        order_id = data['delete_update']['order_id']

        try:
            del self.bids[order_id]
        except KeyError:
            pass
        try:
            del self.asks[order_id]
        except KeyError:
            pass

    if data['create_update']:
        update = data['create_update']
        price = Decimal(update['price'])
        volume = Decimal(update['volume'])
        key = update['order_id']
        book = self.bids if update['type'] == 'BID' else self.asks
        book[key] = [price, volume]

    trades = []

    if data['trade_updates']:

        for update in data['trade_updates']:
            update['price'] = Decimal(update['counter']) / Decimal(update['base'])
            maker_order_id = update['maker_order_id']
            if maker_order_id in self.bids:
                self.update_existing_order(key='bids', update=update)
                trades.append({**update, 'type': 'sell'})
            elif maker_order_id in self.asks:
                self.update_existing_order(key='asks', update=update)
                trades.append({**update, 'type': 'buy'})

    for fn in self.hooks:
        args = [self.consolidated_order_book, trades]
        if asyncio.iscoroutinefunction(fn):
            await fn(*args)
        else:
            fn(*args)

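def update_existing_order(self, key, update):
    # key is 'bids' or 'asks'; look up the matching side of the book
    book = getattr(self, key)
    order_id = update['maker_order_id']
    existing_order = book[order_id]  # [price, volume]
    # a trade reduces the maker order's volume by the traded base amount
    new_volume = existing_order[1] - Decimal(update['base'])
    if new_volume == Decimal('0'):
        # fully filled: remove the order from the book
        del book[order_id]
    else:
        # partially filled: keep the order with its remaining volume
        existing_order[1] = new_volume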

The update_existing_order method is simply a helper to avoid code duplication when processing each side of the order book.

Each hook will receive two arguments:

  1. a consolidated order book, which groups all orders by price. Rarely does one require each individual order.
  2. a list of trades that were performed during the last update. This can be used if you need to determine whether your order was fulfilled without making an API call.
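As an illustration of the two arguments, here is a hypothetical hook that reports the spread. The function name print_spread and the sample numbers are invented, and the commented-out wiring assumes the class is importable as luno_streams.Updater and that XBTZAR is the pair you want.

```python
from decimal import Decimal


def print_spread(order_book, trades):
    """Example hook: report the gap between the best ask and the best bid."""
    best_bid = order_book['bids'][0][0]  # bids are sorted highest first
    best_ask = order_book['asks'][0][0]  # asks are sorted lowest first
    spread = best_ask - best_bid
    print(f'spread={spread} trades={len(trades)}')
    return spread


# Hand-made sample data in the shape a hook receives.
sample_book = {
    'bids': [[Decimal('99.00'), Decimal('0.5')], [Decimal('98.50'), Decimal('1.2')]],
    'asks': [[Decimal('100.00'), Decimal('0.3')], [Decimal('101.00'), Decimal('2.0')]],
}
print_spread(sample_book, trades=[])

# Wiring it up would look roughly like this (not run here, it needs credentials):
# updater = Updater('XBTZAR', api_key, api_secret, hooks=[print_spread])
# asyncio.run(updater.run())
```

A real hook should also handle an empty side of the book, which this sketch does not.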

Below is the code for the order grouping (it also rounds and sorts the final output):

@property
def consolidated_order_book(self):

    def consolidate(orders, reverse=False):
        price_map = defaultdict(Decimal)

        for order in orders:
            price_map[order[0]] += order[1]

        rounded_list = map(lambda x: [round(x[0], ndigits=8), round(x[1], ndigits=8)], price_map.items())
        return sorted(rounded_list, key=lambda a: a[0], reverse=reverse)

    return {
        'bids': consolidate(self.bids.values(), reverse=True),  # highest bid on top
        'asks': consolidate(self.asks.values()),  # lowest ask on top
    }
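To make the grouping concrete, here is a standalone sketch of the same idea with made-up orders. It leaves out the rounding step for brevity, and the order ids and amounts are invented.

```python
from collections import defaultdict
from decimal import Decimal


def consolidate(orders, reverse=False):
    """Sum volumes per price level and return [price, volume] pairs sorted by price."""
    price_map = defaultdict(Decimal)  # missing prices start at Decimal('0')
    for price, volume in orders:
        price_map[price] += volume
    levels = [[price, volume] for price, volume in price_map.items()]
    return sorted(levels, key=lambda level: level[0], reverse=reverse)


# Three hypothetical bids keyed by order id, two of them at the same price.
bids = {
    'order-a': [Decimal('99.00'), Decimal('0.50')],
    'order-b': [Decimal('98.50'), Decimal('1.20')],
    'order-c': [Decimal('99.00'), Decimal('0.25')],
}
for price, volume in consolidate(bids.values(), reverse=True):
    print(price, volume)
# 99.00 0.75
# 98.50 1.20
```

The two orders at 99.00 collapse into a single price level with their volumes added together.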

The code is on GitHub — questions, comments, improvements and contributions are welcome!

Have a comment? Email me at jaco@jacoduplessis.co.za.