0

I have a programm that receives Data (Trades) from the Binance API. This data will be processed and visualized in a web-app with dash and plotly.

In order to get best performance and the slightest delay my program has 3 threads:

Thread 1 - Binance API - get requests - Trades

if __name__ == "__main__":
    try:
         loop = asyncio.get_event_loop()
         binance-thread = threading.Thread(target=start_thread_1)
         ...

def start_thread_1():
     loop.run_until_complete(main(api_key,secret_key))

async def main(api_key,secret_key):
     client = await AsyncClient.create(api_key,secret_key)
     await trades_listener(client)

async def trades_listener(client):
    bm = BinanceSocketManager(client)
    symbol = 'BTCUSDT'
    async with bm.trade_socket(symbol=symbol) as stream:
        while True:
            msg = await stream.recv()

            event_type = msg['e']
            ...
            trade = Trade(event_type,...)
            # <-- safe trade SOMEWHERE to process in other thread ? safe to: process_trades_list

Thread 2 - Web App - Displays Trades and Processed Trades Data

web-thread = threading.Thread(target=webserver.run_server)
...
not worth to mention

Thread 3 - Process Data - Process Trades (calculate RSI, filter big trades, etc)

if __name__ == "__main__":
    try:
         loop = asyncio.get_event_loop()
         binance-thread = threading.Thread(target=start_thread_1)
         web-thread = threading.Thread(target=webserver.run_server)
         process-thread = threading.Thread(target=start_thread_3)
         ...
         .start()
         .sleep()
         etc.
         .join()

def start_thread_3():
    process_trades()

def process_trades():
    global process_trades_list
    while True:
        while len(process_trades_list) > 0:
            trade = process_trades_list[0]
            process_trades_list.pop(0)
            # ...do calculation etc.

HOW can I safe / hand over the data from thread_1 / async thread to thread_3? I tried to put the trades to a list called process_trades_list and then loop while len(process_trades_list) > 0 all trades. In the loop I pop() processed trades from the list - but this somehow seems to break the program without throwing errors. What's best way to get this done?

It is possible that the async stream get's spammed by new incoming trades and I want to minimalize the load..

2
  • The concurrent access to variables should be controlled by synchronization primitives, like semaphores and lock objects. Please visit the threading documentation in docs.python.org and chose one that fits the purposes of your code. Commented Feb 4, 2022 at 14:36
  • @Hilton: I am not sure if I actually need a concurrent access to variable - thread_1 just should put the trade somewhere, where thread_3 can process it. is there nothing like call thread_3.addTradeToList(trade) function I can call in thread_1?
    – Benix6
    Commented Feb 4, 2022 at 14:48

1 Answer 1

0

Here you want a queue.Queue instead of a list. Your last code snippet would look something like this:

import queue

if __name__ == "__main__":
    try:
         q = queue.Queue()
         binance_thread = threading.Thread(target=start_thread_1,
                                           args=(q,))
         web_thread = threading.Thread(target=webserver.run_server)
         process)thread = threading.Thread(target=process_trades, 
                                           args=(q,), daemon=True)
         ...
         .start()
         .sleep()
         etc.
         .join()

def process_trades(q):
    while True:
        trade = q.get()
        # ...do calculation etc.

I eliminated the call to get_event_loop since you didn't use the returned object. I eliminated the start_thread_3 function, which is not necessary.

I made thread-3 a daemon, so it will not keep your application open if everything else is finished.

The queue should be created once, in the main thread, and passed explicitly to each thread that needs to access it. That eliminates the need for a global variable.

The process trade function becomes much simpler. The q.get() call blocks until an object is available. It also pops the object off the queue.

Next you must also modify thread-1 to put objects onto the queue, like this:

def start_thread_1(q):
     asyncio.run(main(api_key,secret_key, q))

async def main(api_key,secret_key, q):
     client = await AsyncClient.create(api_key,secret_key)
     await trades_listener(client, q)

async def trades_listener(client, q):
    bm = BinanceSocketManager(client)
    symbol = 'BTCUSDT'
    async with bm.trade_socket(symbol=symbol) as stream:
        while True:
            msg = await stream.recv()

            event_type = msg['e']
            ...
            trade = Trade(event_type,...)
            q.put(trade)

The q.put function is how you safely put a trade object into the queue, which will then result in activity in thread-3.

I modified the start_thread1 function: here is a good place to start the event loop mechanism for this thread.

You ask about avoiding spam attacks on your program. Queues have methods that allow you to limit their size, and possibly throw away trades if they become full.

I don't understand what you are trying to do with the if __name__ == '__main__' logic in thread-1. The program can have only one entry point, and only one module named '__main__'. It looks to me like that has to be thread-3.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.