
What specifically needs to change in the Python 3.12 code below so that every call to the write_to_file(linesBuffer) function runs in parallel instead of sequentially?

In other words,

  1. We want program execution to continue without waiting for write_to_file(linesBuffer) to return,
  2. But we also want to make sure that each call to write_to_file(linesBuffer) does eventually return.

Each call to write_to_file(linesBuffer) should start at a different time and return after whatever duration it needs to complete its work. There should never be a delay waiting for one call to write_to_file(linesBuffer) to complete before the next call to write_to_file(linesBuffer) is started.

When we remove await from the write_to_file(linesBuffer) line, none of the print calls inside the write_to_file(linesBuffer) function ever execute. So we cannot simply change await write_to_file(linesBuffer) to write_to_file(linesBuffer).

The problem with the code is that the many sequential calls to await write_to_file(linesBuffer) make the program very slow.

Here is the code:

import os
import platform
import asyncio

numLines = 10

def get_source_file_path():
    if platform.system() == 'Windows':
        return 'C:\\path\\to\\sourceFile.txt'
    else:
        return '/path/to/sourceFile.txt'

async def write_to_file(linesBuffer):
    print("inside Writing to file...")
    with open('newFile.txt', 'a') as new_destination_file:
        for line in linesBuffer:
            new_destination_file.write(line)
    #get the name of the directory in which newFile.txt is located. Then print the name of the directory.
    directory_name = os.path.dirname(os.path.abspath('newFile.txt'))
    print("directory_name: ", directory_name)
    linesBuffer.clear()
    #print every 1 second for 2 seconds.
    for i in range(2):
        print("HI HO, HI HO.  IT'S OFF TO WORK WE GO...")
        await asyncio.sleep(1)
    print("inside done Writing to file...")

async def read_source_file():
    source_file_path = get_source_file_path()
    linesBuffer = []
    counter = 0
    print("Reading source file...")
    print("source_file_path: ", source_file_path)
    #Detect the size of the file located at source_file_path and store it in the variable file_size.
    file_size = os.path.getsize(source_file_path)
    print("file_size: ", file_size)
    with open(source_file_path, 'r') as source_file:
        source_file.seek(0, os.SEEK_END)
        while True:
            line = source_file.readline()
            new_file_size = os.path.getsize(source_file_path)
            if new_file_size < file_size:
                print("The file has been truncated.")
                source_file.seek(0, os.SEEK_SET)
                file_size = new_file_size
                linesBuffer.clear()
                counter = 0
                print("new_file_size: ", new_file_size)
            if len(line) > 0:
              new_line = str(counter) + " line: " + line
              print(new_line)
              linesBuffer.append(new_line)
              print("len(linesBuffer): ", len(linesBuffer))
              if len(linesBuffer) >= numLines:
                print("Writing to file...")
                await write_to_file(linesBuffer) #When we remove await from this line, the function never runs.    
                print("awaiting Writing to file...")
                linesBuffer.clear()
              counter += 1
              print("counter: ", counter)
            if not line:
                await asyncio.sleep(0.1)
                continue
            #detect whether or not the present line is the last line in the file.  If it is the last line in the file, then write the line to the file.
            if source_file.tell() == file_size:
                print("LAST LINE IN FILE FOUND.  Writing to file...")
                await write_to_file(linesBuffer)
                print("awaiting Writing to file...")
                linesBuffer.clear()
                counter = 0
        
async def main():
    await read_source_file()

if __name__ == '__main__':
    asyncio.run(main())
  • asyncio does not support async file I/O. You may want to use threads instead.
    – yurikilochek
    Commented Oct 15, 2024 at 1:05
  • @yurikilochek Do you feel like giving a working example?
    – CodeMed
    Commented Oct 15, 2024 at 1:19

1 Answer


A couple of points:

First, as has been commented, the file I/O you are doing is not asynchronous, and asyncio does not support asynchronous file I/O. For this I would suggest installing the aiofiles module from the PyPI repository.

Second, you have the following:

await write_to_file(linesBuffer) #When we remove await from this line, the function never runs.

Actually, without the await the function write_to_file never gets called at all. The expression write_to_file(linesBuffer) merely creates a coroutine object, which must be awaited (as you are currently doing) before its body runs. But that await is effectively synchronous: the caller is suspended, the coroutine runs, and once it completes and returns a value (the implicit None if there is no return statement), the caller resumes, with await write_to_file(linesBuffer) evaluating to that return value.
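
For a quick illustration of both behaviors (using a hypothetical work coroutine, not your actual code):

import asyncio

async def work():
    print("working")

async def main():
    work()        # Only creates a coroutine object; "working" is never printed and
                  # Python emits "RuntimeWarning: coroutine 'work' was never awaited".
    await work()  # Runs the coroutine, but main is suspended until it finishes.

asyncio.run(main())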

But you want write_to_file to run asynchronously (concurrently) with your read_source_file coroutine. For that, you need to create a separate task. See asyncio.create_task for details. Pay particular attention to saving the task instance returned by this call, so that the task is not garbage collected and terminated prematurely.
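
In isolation, that pattern looks like this (sketched with a hypothetical coro coroutine):

background_tasks = set()

task = asyncio.create_task(coro())
# Keep a strong reference to the task; the event loop only holds a weak reference,
# so an otherwise unreferenced task can be garbage collected before it finishes.
background_tasks.add(task)
# Drop the reference automatically once the task completes.
task.add_done_callback(background_tasks.discard)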

So basically your modified code would be as follows (I have not verified that its overall logic is correct):

import os
import platform
import asyncio
import aiofiles

numLines = 10

def get_source_file_path():
    if platform.system() == 'Windows':
        return 'C:\\path\\to\\sourceFile.txt'
    else:
        return '/path/to/sourceFile.txt'

async def write_to_file(linesBuffer):
    print("inside Writing to file...")
    async with aiofiles.open('newFile.txt', 'a') as new_destination_file:
        for line in linesBuffer:
            await new_destination_file.write(line)
    #get the name of the directory in which newFile.txt is located. Then print the name of the directory.
    directory_name = os.path.dirname(os.path.abspath('newFile.txt'))
    print("directory_name: ", directory_name)
    linesBuffer.clear()
    #print every 1 second for 2 seconds.
    for i in range(2):
        print("HI HO, HI HO.  IT'S OFF TO WORK WE GO...")
        await asyncio.sleep(1)
    print("inside done Writing to file...")

async def read_source_file():
    source_file_path = get_source_file_path()
    linesBuffer = []
    counter = 0
    print("Reading source file...")
    print("source_file_path: ", source_file_path)
    #Detect the size of the file located at source_file_path and store it in the variable file_size.
    file_size = os.path.getsize(source_file_path)
    print("file_size: ", file_size)
    
    background_tasks = set()
    
    async with aiofiles.open(source_file_path, 'r') as source_file:
        await source_file.seek(0, os.SEEK_END)
        while True:
            line = await source_file.readline()
            new_file_size = os.path.getsize(source_file_path)
            if new_file_size < file_size:
                print("The file has been truncated.")
                await source_file.seek(0, os.SEEK_SET)
                file_size = new_file_size
                linesBuffer.clear()
                counter = 0
                print("new_file_size: ", new_file_size)
            if len(line) > 0:
              new_line = str(counter) + " line: " + line
              print(new_line)
              linesBuffer.append(new_line)
              print("len(linesBuffer): ", len(linesBuffer))
              if len(linesBuffer) >= numLines:
                print("Writing to file...")
                task = asyncio.create_task(write_to_file(linesBuffer))
                background_tasks.add(task)
                task.add_done_callback(background_tasks.discard)
                linesBuffer.clear()
              counter += 1
              print("counter: ", counter)
            if not line:
                await asyncio.sleep(0.1)
                continue
            #detect whether or not the present line is the last line in the file.  If it is the last line in the file, then write the line to the file.
            if await source_file.tell() == file_size:
                print("LAST LINE IN FILE FOUND.  Writing to file...")
                task = asyncio.create_task(write_to_file(linesBuffer))
                background_tasks.add(task)
                task.add_done_callback(background_tasks.discard)
                linesBuffer.clear()
                counter = 0
        
async def main():
    await read_source_file()

if __name__ == '__main__':
    asyncio.run(main())

Update

Maybe you want something like this. This code essentially looks to see whether lines have been appended to an input file; if so, they are accumulated into a batch and sent to another task that adds them to an output file.

If you see that the input file has been truncated, you execute:

await source_file.seek(0, os.SEEK_SET)

That positions you at the beginning of the file. Is that what you really want? I don't get it. Since you clearly know what it is you want, you will be in a better position to make adjustments to this code. If it's not even in the ballpark, then I surrender.

import os
import platform
import asyncio
import aiofiles

BATCH_SIZE = 10

def get_source_file_path():
    if platform.system() == 'Windows':
        return 'C:\\path\\to\\sourceFile.txt'
    else:
        return '/path/to/sourceFile.txt'

async def write_to_file(queue):
    async with aiofiles.open('newFile.txt', 'a') as new_destination_file:
        while True:
            lines = await queue.get()
            print("Writing to file...")
            for line in lines:
                await new_destination_file.write(line)
            print("Done Writing to file...")

async def read_source_file():
    source_file_path = get_source_file_path()
    counter = 0
    print("Reading source file...")
    print("source_file_path: ", source_file_path)
    #Detect the size of the file located at source_file_path and store it in the variable file_size.
    file_size = os.path.getsize(source_file_path)
    print("file_size: ", file_size)

    queue = asyncio.Queue()
    write_task = asyncio.create_task(write_to_file(queue))

    async with aiofiles.open(source_file_path, 'r') as source_file:
        await source_file.seek(0, os.SEEK_END)
        linesBuffer = []
        while True:
            # Always make sure that file_size is the current size:
            old_file_size = file_size
            file_size = os.path.getsize(source_file_path)
            if file_size < old_file_size:
                print("The file has been truncated.")
                await source_file.seek(0, os.SEEK_SET)
                # Allocate a new list instead of clearing the current one
                linesBuffer = []
                counter = 0
                print("new_file_size: ", new_file_size)
                continue

            line = await source_file.readline()
            if line:
                new_line = str(counter) + " line: " + line
                print(new_line)
                linesBuffer.append(new_line)
                print("len(linesBuffer): ", len(linesBuffer))

                if len(linesBuffer) == BATCH_SIZE:
                    print("Writing batch to file...")
                    await queue.put(linesBuffer)
                    linesBuffer = []
                    counter += 1
                    print("counter: ", counter)

                #detect whether or not the present line is the last line in the file.
                # If it is the last line in the file, then write whatever batch
                # we have even if it is not complete.
                if await source_file.tell() == file_size:
                    print("LAST LINE IN FILE FOUND.")
                    if linesBuffer:
                        # Write even though it's not a full batch:
                        await queue.put(linesBuffer)
                        linesBuffer = []
                    counter = 0
            else:
                await asyncio.sleep(0.1)

async def main():
    await read_source_file()

if __name__ == '__main__':
    asyncio.run(main())

If you are actually sending your batches to an API and you want those calls to run concurrently, then:

import os
import platform
import asyncio
import aiofiles

BATCH_SIZE = 10

def get_source_file_path():
    if platform.system() == 'Windows':
        return 'C:\\path\\to\\sourceFile.txt'
    else:
        return '/path/to/sourceFile.txt'

async def send_to_api(linesBuffer):
    ...

async def read_source_file():
    source_file_path = get_source_file_path()
    counter = 0
    print("Reading source file...")
    print("source_file_path: ", source_file_path)
    #Detect the size of the file located at source_file_path and store it in the variable file_size.
    file_size = os.path.getsize(source_file_path)
    print("file_size: ", file_size)

    background_tasks = set()

    async with aiofiles.open(source_file_path, 'r') as source_file:
        await source_file.seek(0, os.SEEK_END)
        linesBuffer = []
        while True:
            # Always make sure that file_size is the current size:
            old_file_size = file_size
            file_size = os.path.getsize(source_file_path)
            if file_size < old_file_size:
                print("The file has been truncated.")
                await source_file.seek(0, os.SEEK_SET)
                # Allocate a new list instead of clearing the current one
                linesBuffer = []
                counter = 0
                print("new_file_size: ", new_file_size)
                continue

            line = await source_file.readline()
            if line:
                new_line = str(counter) + " line: " + line
                print(new_line)
                linesBuffer.append(new_line)
                print("len(linesBuffer): ", len(linesBuffer))

                if len(linesBuffer) == BATCH_SIZE:
                    print("sending to api...")
                    task = asyncio.create_task(send_to_api(linesBuffer))
                    background_tasks.add(task)
                    task.add_done_callback(background_tasks.discard)
                    # Do not clear the buffer; allocate a new one:
                    linesBuffer = []
                    counter += 1
                    print("counter: ", counter)

                #detect whether or not the present line is the last line in the file.
                # If it is the last line in the file, then write whatever batch
                # we have even if it is not complete.
                if await source_file.tell() == file_size:
                    print("LAST LINE IN FILE FOUND.")
                    if linesBuffer:
                        # Send even though it's not a full batch:
                        task = asyncio.create_task(send_to_api(linesBuffer))
                        background_tasks.add(task)
                        task.add_done_callback(background_tasks.discard)
                        linesBuffer = []
                    counter = 0
            else:
                await asyncio.sleep(0.1)

async def main():
    await read_source_file()

if __name__ == '__main__':
    asyncio.run(main())
  • Nice answer. It is interesting to note that the ordering of the resulting output lines is not guaranteed with this code. (It would take a couple of hours of experimenting to see what exactly happens in this situation as it is. AFAIK aiofiles itself uses a thread pool for its file write operations, not an async mechanism like select or epoll.)
    – jsbueno
    Commented Oct 15, 2024 at 17:55
  • @jsbueno The OP now creates a write_to_file task that will eventually run, but the linesBuffer that is being passed to the task is almost immediately cleared. So I suspect that what the OP is trying to accomplish (this is never stated) is not what will be achieved with this change. I would probably create a single write_to_file task that loops indefinitely, getting data from a queue and writing it to the file (and the main task sends new linesBuffer contents to the queue instead of creating a new task). But does this accomplish the OP's goal?
    – Booboo
    Commented Oct 15, 2024 at 18:33
  • Sure - I'd also go for this separate writing task approach - and I agree that suggesting that is more than what the O.P. asked for, and we can't be even sure of their purpose anyway.
    – jsbueno
    Commented Oct 15, 2024 at 19:02
  • @Booboo Thank you, but there are problems with your code. First, it needs background_tasks.add(task) instead of background_tasks.append(task). But then, the resulting text file only contains about 5% of the expected number of lines. And the printed lines in the resulting text file seemed to be sampled arbitrarily from all over the source, based on the line numbers that were attached to each line while the code was running. How can these problems be resolved?
    – CodeMed
    Commented Oct 15, 2024 at 19:18
  • @CodeMed: you may not be aware that issuing sour complaints about freely supplied help may be considered quite rude. If you have reasons for exhibiting this sort of behaviour (e.g. being on a neuro-diverse spectrum), I wonder if it might help to add a note to your profile. Your questions are quite interesting and clear, but they suffer from a similar problem of noticeable entitlement.
    – halfer
    Commented Oct 16, 2024 at 23:30
