I am trying to do image processing in all cores available in my machine(which has 4 cores and 8 processors). I chose to do Multiprocessing because it's a kind of CPU bound workload. Now, explaining the data I have a CSV file that has file paths recorded(local path), Image Category(explain what image is). The CSV has exactly 9258 categories. My Idea is to do batch processing. Assign 10 categories to each processor and loop through the images one by one, wait till all the processors complete its job, and assign the next batch.
The categories are stored in this format as_batches = [[C1, C2, ..., C10], [C11, C12, C13, ..., C20], [Cn-10, Cn-9,..., Cn]]
Here is the function that starts the process.
def get_n_process(as_batches, processes, df, q):
p = []
for i in range(processes):
work = Process(target=submit_job, args=(df, as_batches[i], q, i))
p.append(work)
work.start()
as_batches = as_batches[processes:]
return p, as_batches
Here is the main loop,
while(len(as_batches) > 0):
t = []
#dynamically check the lists
if len(as_batches) > 8:
n_process = 8
else:
n_process = len(as_batches)
print("For this it Requries {} Process".format(n_process))
process_obj_inlist, as_batches = get_n_process(as_batches, n_process, df, q)
for ind_process in process_obj_inlist:
ind_process.join()
with open("logs.txt", "a") as f:
f.write("\n")
f.write("Log Recording at: {timestamp}, Remaining N = {remaining} yet to be processed".format(
timestamp=datetime.datetime.now(),
remaining = len(as_batches)
))
f.close()
For log purposes, I am writing into a text file to see how many categories are there to process yet. And here is the main function
def do_something(fromprocess):
time.sleep(1)
print("Operation Ended for Process:{}, process Id:{}".format(
current_process().name, os.getpid()
))
return "msg"
def submit_job(df, list_items, q, fromprocess):
a = []
for items in list_items:
oneitemdf = df[df['MinorCategory']==items]['FilePath'].values.tolist()
oneitemdf = [x for x in oneitemdf if x.endswith('.png')]
result = do_something(fromprocess)
a.append(result)
q.put(a)
For now, I am just printing in the console, but in real code, I will be using KAZE algorithm to extract features from the images, store it in a list and append it to the Queue(Shared Memory) from all the processors. Now the script is running for few minutes but after some time the script is halted. It didn't run further. I tried to exit it but I couldn't. I think some deadlock might happen but I am not sure. I read online sources but couldn't figure out the solution and reason why it's happening?
For the full code, here is the gist link Full Source Code Link. What I am doing wrong here? I am new to Multiprocessing and MultiThreading. I would like to understand the concept in-depth. Links/Resources related to this topic are much appreciated.
UPDATE - The same code working perfectly on Mac OS.