
I have a simple piece of code modelling a more complicated problem I need to solve. There are three functions: a worker, a task submitter (which looks for new tasks and puts them on a queue as it finds them), and a function that creates a pool and feeds new tasks to it. The code never finishes, even after the queue is empty and every task in the list is done. I can't see why the while loop with the condition never terminates. I have tried coding it several different ways and nothing works.

from concurrent.futures import ThreadPoolExecutor as Tpe
import time
import random
import queue
import threading


def task_submit(q):
    threading.current_thread().name = 'task_submit'
    for i in range(7):
        new_task = random.randint(10, 20)
        q.put_nowait(new_task)
        print(f'                                {i} new task with argument {new_task} has been added to queue')
        time.sleep(5)


def worker(t):
    threading.current_thread().name = f'worker {t}'
    print(f'{threading.current_thread().name} started')
    time.sleep(t)
    print(f'{threading.current_thread().name} FINISHED!')


def execution():
    executor = Tpe(max_workers=4)
    q = queue.Queue(maxsize=100)
    q_thread = executor.submit(task_submit, q)
    tasks = [executor.submit(worker, q.get())]
    execution_finished = False
    while not execution_finished:                           #all([task.done() for task in tasks]):
        if not all([task.done() for task in tasks]):
            print('             still in progress .....................')
            tasks.append(executor.submit(worker, q.get()))
        else:
            print('             all done!')
            executor.shutdown()
            execution_finished = True


execution()
  • You have exactly one worker going (tasks = [executor.submit(worker, q.get())]) - is that intended?
    – Timus
    Commented Nov 16, 2022 at 8:35
  • This is a very unusual implementation. Is it the case that you want to start additional "worker" threads continuously whenever you determine that previous "worker" threads are yet to finish? Also, note that you're calling q.get() which is blocking. If the queue is exhausted (empty) then the q.get() call will block ad infinitum Commented Nov 16, 2022 at 8:49
  • Yes, that is exactly what I want: I wish to add new tasks continuously while the pool is in progress.
    – user20426821
    Commented Nov 16, 2022 at 8:55
  • I have just tried to make the get non-blocking by replacing it with get_nowait; it raises an exception.
    – user20426821
    Commented Nov 16, 2022 at 8:57

1 Answer


It doesn't terminate because you are trying to remove an item from an empty queue. The problem is here:

while not execution_finished:                           
    if not all([task.done() for task in tasks]):
        print('             still in progress .....................')
        tasks.append(executor.submit(worker, q.get()))

The last line here submits a new work item to the executor. Suppose that happens to be the last item in the queue. At that moment the executor is not finished, and won't be for a few seconds. Your main thread goes back to the while not execution_finished line, and the if condition evaluates to true because some of the tasks are still running. So you try to submit one more item, but you can't: the queue is now empty, and the call to q.get() blocks the main thread until the queue contains an item, which never happens. The other threads finish, but the program doesn't exit because the main thread is blocked.
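To see the blocking behaviour in isolation: a plain q.get() on an empty queue waits forever, while passing a timeout turns the silent hang into a catchable queue.Empty exception. A minimal sketch (not code from the question):

```python
import queue

q = queue.Queue()  # empty queue, like the one at the end of your run
try:
    # with no timeout this call would block forever on an empty queue;
    # a timeout turns the hang into an exception we can handle
    item = q.get(timeout=0.5)
    timed_out = False
except queue.Empty:
    item = None
    timed_out = True
print('timed out:', timed_out)
```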

Perhaps you should check for an empty queue, but I'm not sure that's the right idea because I probably don't understand your requirements. In any case, that's why your script doesn't exit.
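One common way to restructure this kind of producer/consumer loop is to have the submitter put a sentinel value on the queue when it is done, so the main loop knows when to stop calling q.get(). A hedged sketch, not the code from the question: the sentinel, the fixed arguments, and the shortened sleeps are my own assumptions to keep the example fast.

```python
from concurrent.futures import ThreadPoolExecutor
import queue
import time

SENTINEL = None  # hypothetical end-of-work marker, not in the original code


def task_submit(q):
    # stand-in for the original submitter: fixed arguments, shorter sleeps
    for arg in (10, 11, 12):
        q.put(arg)
        time.sleep(0.05)
    q.put(SENTINEL)  # signal that no more tasks are coming


def worker(t):
    time.sleep(t * 0.01)  # pretend to work for a while
    return t


def execution():
    with ThreadPoolExecutor(max_workers=4) as executor:
        q = queue.Queue()
        executor.submit(task_submit, q)
        tasks = []
        while True:
            item = q.get()  # blocks only until the next item or the sentinel
            if item is SENTINEL:
                break
            tasks.append(executor.submit(worker, item))
        # leaving the with-block waits for every submitted task to finish
    return [t.result() for t in tasks]


print(execution())  # → [10, 11, 12]
```

Because the sentinel arrives only after every real task has been queued, the main loop can block on q.get() safely, and the with-block's implicit shutdown takes care of waiting for the workers.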