
I have some working code whose run time I need to improve dramatically, and I am pretty lost. Essentially, I will get zip folders containing tens of thousands of json files, each containing roughly 1,000 json messages. There are about 15 different types of json objects interspersed in each of these files; some of them have lists of dictionaries inside them while others are pretty simple. I need to read in all the data, parse the objects and pull out the relevant information, and then pass that parsed data along and insert it into a different program using an API for a third-party software package (essentially a wrapper around a proprietary implementation of SQL).

So I have code that does all of that. The problem is it takes around 4-5 hours to run each time and I need to get that closer to 30 minutes.

My current code relies heavily on asyncio. I use it to get some concurrency, particularly while reading the json files. I have also started to profile my code and have so far moved to orjson to read in the data from each file and rewritten each of my parser functions in Cython to get some improvements on that side as well. However, I use asyncio queues to pass things back and forth, and my profiler shows a lot of time is spent just in the queue.get and queue.put calls. I also looked into msgspec to improve reading in the json data; while that was faster, it became slower when I had to send the msgspec.Struct objects into my Cython code and use them instead of plain dictionaries.

So I was just hoping for some general help on how to improve this process. I have read about multiprocessing, both with multiprocessing.Pool and concurrent.futures, but both of those turned out to be slower than my current implementation. I was also thinking maybe I need to change how I pass things through the queues, so I passed the full json data for each file (about 1,000 documents each) instead of each individual message, but that didn't help.
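For reference, the per-file batching I tried looked roughly like this (a simplified sketch, reusing the same imports and classes as the pseudo-code below): one queue.put per file instead of one per message, with the parser side unwrapping the batch.

async def load_json_batched(file_path, queue):
    # one queue.put per file (~1,000 messages) instead of one per message
    async with aiofiles.open(file_path, mode="rb") as f:
        data = await f.read()
    json_data = await asyncio.to_thread(orjson.loads, data)
    await queue.put(json_data)

# and in ParserDispatcher:
    async def process_queue(self, parser_queue, sql_queue):
        while True:
            batch = await parser_queue.get()
            if batch is None:
                await sql_queue.put(None)
                return
            for msg in batch:
                for parsed_message in self.dispatch(msg):
                    await sql_queue.put(parsed_message)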

I have read so many SO questions/answers but it seems like a lot of people have very uniform json data (not 15 different message types). I looked into batching but I don't fully understand how that changes things - that was what I was doing using concurrent.futures but again it actually took longer.

Overall I would like to keep the queue-based design, because in the future I would like to run this same process on streaming data; the stream would just take the place of the json reading, each message received over the stream would be put onto the queue, and everything else would work the same.
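For illustration, the streaming producer would be something roughly like this (hypothetical, assuming the stream is exposed as an async iterator of raw json messages):

async def load_stream(stream, queue):
    # hypothetical replacement for the file-reading stage:
    # each message received over the stream goes straight onto the same parser queue
    async for raw in stream:   # assumed to yield bytes or str
        await queue.put(orjson.loads(raw))
    await queue.put(None)      # same sentinel to end processing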

Some pseudo-code is included below.

main.py

import asyncio
from glob import glob

import aiofiles
import orjson

from parser_dispatcher import ParserDispatcher
from sql_dispatcher import SqlDispatcher

async def load_json(file_path, queue):
    async with aiofiles.open(file_path, mode="rb") as f:
        data = await f.read()
    # orjson.loads is CPU-bound, so run it off the event loop
    json_data = await asyncio.to_thread(orjson.loads, data)
    for msg in json_data:
        await queue.put(msg)

async def load_all_json_files(base_path, queue):
    file_list = glob(f"{base_path}/*.json")
    tasks = [load_json(file_path, queue) for file_path in file_list]
    await asyncio.gather(*tasks)
    await queue.put(None)  # sentinel to end the processing

async def main():
    base_path = "/path/to/json/folder"
    parser_queue = asyncio.Queue()
    sql_queue = asyncio.Queue()

    parser_dispatch = ParserDispatcher()
    sql_dispatch = SqlDispatcher()

    load_task = load_all_json_files(base_path, parser_queue)
    parser_task = parser_dispatch.process_queue(parser_queue, sql_queue)
    sql_task = sql_dispatch.process_queue(sql_queue)

    await asyncio.gather(load_task, parser_task, sql_task)

if __name__ == "__main__":
    asyncio.run(main())

parser_dispatcher.py

import asyncio
import message_parsers as mp

class ParserDispatcher:
    def __init__(self):
        self.parsers = {
            ("1", "2", "3"): mp.parser1,
            .... etc
        } # this is a dictionary where keys are tuples and values are the parser functions

    def dispatch(self, msg):
        parser_key = (msg.get("type"), msg.get("source"), msg.get("channel"))
        parser = self.parsers.get(parser_key)
        if parser:
            new_msg = parser(msg)
        else:
            new_msg = []
        return new_msg
    
    async def process_queue(self, parser_queue, sql_queue):
        while True:
            msg = await parser_queue.get()
            if msg is None:
                await sql_queue.put(None)  # forward the sentinel to the SQL stage
                parser_queue.task_done()
                return
            parsed_messages = self.dispatch(msg)
            for parsed_message in parsed_messages:
                await sql_queue.put(parsed_message)
            parser_queue.task_done()  # only mark the item done after it is fully processed

sql_dispatcher.py

import asyncio
import proprietarySqlLibrary as sql

class SqlDispatcher:
    def __init__(self):
        # do all the connections to the DB in here
        ...

    async def process_queue(self, sql_queue):
        while True:
            msg = await sql_queue.get()
            if msg is None:
                return  # sentinel forwarded from the parser stage
            # then go through and add this data to the DB
            # this part is also relatively slow but I'm focusing on the first half for now
            # since I don't have control over the DB stuff

  • ...is there a hard requirement that it be written in Python? If speed is an issue and the code isn't complicated there are plenty of other options. Commented Jan 15 at 21:14
  • Your application is currently too slow by a factor of ten. Standard practice for optimizing program performance is to determine where the bottlenecks are. Perhaps you could measure how long it takes to process one single message, independent of any network or I/O overhead. Say you have 18 million messages (that seems roughly right). To process those in 1800 seconds (30 minutes) you need to handle each message in 100 microseconds. Can you do that? If you can, then you need to focus on the I/O part. Commented Jan 16 at 2:29
  • Depending on your disk storage (solid state?), concurrently reading multiple files may offer worse performance than reading files serially. Benchmarking this could be tricky since your platform might provide disk caching.
    – Booboo
    Commented Jan 16 at 12:11
  • (1) erdp_task is not defined. (2) In process_queue if msg is None, you are still dispatching it. Shouldn't you return on this? (3) You call process_queue.task_done() without anyone joining the queue. This is a needless call. Moreover, task_done should be called only after you have finished processing a queue item, not when you have just retrieved it because the task is not yet done at this point.
    – Booboo
    Commented Jan 16 at 12:44
  • no hard requirement for python but this is part of a larger project and all of that is python. when I profile the parsers, simple messages take around 1-10 microseconds and complex ones around 40-70 microseconds in cython. I was a little off with my estimate, many of these zip files have hundreds of thousands of json docs so hundreds of millions of json objects to parse. and I just saw your answer with the concurrent reading possibly hurting so I will try that. Many of the errors you caught were just me trying to reduce the code for this question. Commented Jan 16 at 14:25

2 Answers


This might improve performance, but whether significantly enough is still an open question:

The parsing of JSON data is a CPU-bound task, and doing it concurrently in a thread pool will not buy you anything unless orjson is implemented in C (probably) and releases the GIL (very questionable; see this).

The code should therefore be rearranged to submit the parsing of messages to a concurrent.futures.ProcessPoolExecutor instance in batches. The code below shows the general idea; it could not be tested since I do not have access to the data. Note that I am not attempting to read the JSON files concurrently, since it is unclear whether doing so would help or actually hurt performance. You can always modify the code to use multiple tasks to perform the reading.

...
from concurrent.futures import ProcessPoolExecutor

def process_batch(file_data):
    msgs = []
    for data in file_data:
        msgs.extend(orjson.loads(data))
    return msgs

async def load_json(executor, file_data, queue):
    loop = asyncio.get_running_loop()
    msgs = await loop.run_in_executor(executor, process_batch, file_data)
    for msg in msgs:
        await queue.put(msg)

async def load_all_json_files(base_path, queue):
    # If you have the memory, ideally BATCH_SIZE should be:
    # ceil(number_of_files / (4 * number_of_cores))
    # So if you had 50_000 files and 10 cores, then
    # BATCH_SIZE should be 1250
    
    BATCH_SIZE = 1_000

    # Use a multiprocessing pool:
    executor = ProcessPoolExecutor()

    tasks = []
    file_data = [] 
    file_list = glob(f"{base_path}/*.json")

    for file_path in file_list:
        async with aiofiles.open(file_path, mode="rb") as f:
            file_data.append(await f.read())
        if len(file_data) == BATCH_SIZE:
            tasks.append(asyncio.create_task(load_json(executor, file_data, queue)))
            file_data = []
    if file_data:
        tasks.append(asyncio.create_task(load_json(executor, file_data, queue)))

    await asyncio.gather(*tasks)   
    await queue.put(None) # to end the processing

...
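If the dispatch step also shows up in the profile, the same idea can be taken one step further: do the per-message parsing inside the worker processes so that each batch comes back as rows that are ready for the SQL queue. This is an untested sketch, assuming ParserDispatcher and its parsers can be imported in the worker processes and that the parsed rows are plain picklable objects; it would replace process_batch and load_json above, with the queue now being the SQL queue.

from parser_dispatcher import ParserDispatcher

_dispatcher = None

def _get_dispatcher():
    # create one dispatcher per worker process, lazily on first use
    global _dispatcher
    if _dispatcher is None:
        _dispatcher = ParserDispatcher()
    return _dispatcher

def process_batch(file_data):
    # decode and parse in the worker so only finished rows cross the process boundary
    dispatcher = _get_dispatcher()
    rows = []
    for data in file_data:
        for msg in orjson.loads(data):
            rows.extend(dispatcher.dispatch(msg))
    return rows

async def load_json(executor, file_data, queue):
    loop = asyncio.get_running_loop()
    rows = await loop.run_in_executor(executor, process_batch, file_data)
    for row in rows:
        await queue.put(row)  # queue is now the SQL queue; the separate parser task is no longer needed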
  • thank you for the advice. I tried this on a smaller dataset of 10k files and it ran in 60% of the original run time, which is significant. I had tried a ProcessPoolExecutor in the parser_dispatcher script and that didn't help as much, but I might not have implemented it correctly. I guess there are two parsing steps: the orjson.loads and then what I call parsing in the parser_dispatcher.py file. I will have to look into including my parser_dispatcher.py in the multiprocessing as well. Commented Jan 16 at 16:35
  • Great! Did you experiment with BATCH_SIZE? Too large (e.g. 10K for your test case), then only one pool process will be working on the batches. Ideally, you would like each pool process to handle ~4 batches. That is, you want fewer but larger writes to the pool's internal job queue. If you make the batch too large, then the work may not be evenly distributed among the processes. This is why the pool's map method has a chunksize argument (which builds the batches automatically for you). But if you use the submit method, then you have to do your own "chunking", as we are doing here.
    – Booboo
    Commented Jan 16 at 18:43
  • Yes I have been playing around with batch size. I can't have too many open because then as you mentioned I max out my RAM (I probably need to look into memory management too) so right now I have it around 250 files on 8 cores. Interestingly, that will max out my CPU for a few seconds and then it goes down to 80% for a while before repeating. It seems the orjson.loads is very cpu intensive and my own parsing is not as bad as I thought. I will have to give msgspec another go because that was much more efficient loading but made the cython parsers harder. Commented Jan 16 at 19:24

My first instinct is that parallel processing on one machine won't be much faster than single-threaded. I would guess the bulk of the time is spent processing the data rather than reading it from disk (which you could prove/disprove by running your program and commenting out the pieces which process data, leaving only the pieces which read).

This question might be relevant to yours. Python 3.13 introduced certain features for getting around the performance limitations of the GIL.

If you have access to multiple machines, you might consider farming out some of the processing using a distributed job queue (I like https://python-rq.org/). Even if this does not get you all the way home, the jobs on the queue could read the original files and write pre-digested ones back to disk, allowing your program to process pre-digested files, which should theoretically run faster than your current program.
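For reference, the python-rq side of this is fairly small. A minimal sketch, assuming a reachable Redis instance and a digest_file function (a hypothetical name) that the workers can import:

# enqueue.py - push one job per JSON file onto a Redis-backed queue
from glob import glob

from redis import Redis
from rq import Queue

from tasks import digest_file  # hypothetical: reads one file, writes a pre-digested version back to disk

q = Queue("digest", connection=Redis(host="localhost"))

for path in glob("/path/to/json/folder/*.json"):
    q.enqueue(digest_file, path)

# then on each worker machine run:  rq worker digest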
