I have a programm that receives Data (Trades) from the Binance API. This data will be processed and visualized in a web-app with dash and plotly.
In order to get best performance and the slightest delay my program has 3 threads:
Thread 1 - Binance API - get requests - Trades
if __name__ == "__main__":
try:
loop = asyncio.get_event_loop()
binance-thread = threading.Thread(target=start_thread_1)
...
def start_thread_1():
loop.run_until_complete(main(api_key,secret_key))
async def main(api_key,secret_key):
client = await AsyncClient.create(api_key,secret_key)
await trades_listener(client)
async def trades_listener(client):
bm = BinanceSocketManager(client)
symbol = 'BTCUSDT'
async with bm.trade_socket(symbol=symbol) as stream:
while True:
msg = await stream.recv()
event_type = msg['e']
...
trade = Trade(event_type,...)
# <-- safe trade SOMEWHERE to process in other thread ? safe to: process_trades_list
Thread 2 - Web App - Displays Trades and Processed Trades Data
web-thread = threading.Thread(target=webserver.run_server)
...
not worth to mention
Thread 3 - Process Data - Process Trades (calculate RSI, filter big trades, etc)
if __name__ == "__main__":
try:
loop = asyncio.get_event_loop()
binance-thread = threading.Thread(target=start_thread_1)
web-thread = threading.Thread(target=webserver.run_server)
process-thread = threading.Thread(target=start_thread_3)
...
.start()
.sleep()
etc.
.join()
def start_thread_3():
process_trades()
def process_trades():
global process_trades_list
while True:
while len(process_trades_list) > 0:
trade = process_trades_list[0]
process_trades_list.pop(0)
# ...do calculation etc.
HOW can I safe / hand over the data from thread_1 / async thread to thread_3?
I tried to put the trades to a list called process_trades_list
and then loop while len(process_trades_list) > 0
all trades.
In the loop I pop()
processed trades from the list - but this somehow seems to break the program without throwing errors.
What's best way to get this done?
It is possible that the async stream get's spammed by new incoming trades and I want to minimalize the load..