
I am trying to parallelize a data-processing task where I've wrapped my logic inside a class. Each instance of the class is supposed to log its own progress to a centralized log file. However, as soon as I call pool.map(), I get a traceback that I don't quite understand.

I have a DataProcessor class that holds a logger instance. I want to create several instances of this class and process them in parallel using a multiprocessing.Pool.

import logging
import multiprocessing

class DataProcessor:
    def __init__(self, name):
        self.name = name
        self.logger = logging.getLogger("ProcessorLogger")
        self.logger.setLevel(logging.INFO)

    def run(self, data):
        self.logger.info(f"{self.name} is processing {data}")
        return data * 2

def worker(obj_and_data):
    obj, data = obj_and_data
    return obj.run(data)

if __name__ == "__main__":
    tasks = [(DataProcessor(f"Proc-{i}"), i) for i in range(5)]
    
    with multiprocessing.Pool(processes=4) as pool:
        # This line triggers the error
        results = pool.map(worker, tasks)
        print(results)

When I run this, I get the following traceback:

Traceback (most recent call last):
  File "script.py", line 22, in <module>
    results = pool.map(worker, tasks)
  ...
  File "/usr/lib/python3.10/multiprocessing/reduction.py", line 51, in dump
    ForkingPickler(file, protocol).dump(obj)
TypeError: cannot pickle '_thread.lock' object

What I've Tried

  1. I tried making the logger a global variable, but I need different configurations for different processors.

  2. I tried using pathos.multiprocessing, which uses dill, but I still get serialization issues or the logs simply don't show up.

Why does the presence of self.logger prevent the class from being sent to the pool, and what is the "correct" way to handle logging in a class that needs to be pickled for multiprocessing?


1 Answer

The logging machinery (loggers and their handlers) holds _thread.lock objects internally, and pickle cannot serialize those. pool.map() pickles each (DataProcessor, data) tuple to send it to a worker process, so the self.logger attribute drags the unpicklable lock along with it. The simplest fix is to not store the logger on the instance at all: fetch it inside run(). getLogger() is cheap and always returns the same logger object for the same name, even inside a worker process:

import logging

class DataProcessor:
    def __init__(self, name):
        self.name = name  # no logger attribute, so the instance stays picklable

    def run(self, data):
        # Same logger every call; configure its level/handlers once at startup.
        logger = logging.getLogger("ProcessorLogger")
        logger.info(f"{self.name} is processing {data}")
        return data * 2
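If you prefer to keep a self.logger attribute, another common pattern is to exclude it from the pickled state and re-fetch it on unpickling via __getstate__/__setstate__. A minimal sketch (the per-instance logger name under the "ProcessorLogger" hierarchy is my own choice, not from your code; it is one way to give each processor its own configuration):

```python
import logging
import pickle

class DataProcessor:
    def __init__(self, name):
        self.name = name
        # Per-instance logger name, so each processor can be configured separately.
        self.logger = logging.getLogger(f"ProcessorLogger.{name}")

    def __getstate__(self):
        # Drop the logger (and the thread locks it references) from the pickled state.
        state = self.__dict__.copy()
        del state["logger"]
        return state

    def __setstate__(self, state):
        # Restore the plain attributes, then re-fetch the logger in the new process.
        self.__dict__.update(state)
        self.logger = logging.getLogger(f"ProcessorLogger.{state['name']}")

    def run(self, data):
        self.logger.info(f"{self.name} is processing {data}")
        return data * 2

# The instance now survives a pickle round-trip, which is exactly
# what pool.map() does under the hood:
clone = pickle.loads(pickle.dumps(DataProcessor("Proc-0")))
print(clone.run(21))  # 42
```

Because each instance uses its own child logger under "ProcessorLogger", you can attach different handlers or levels per processor, which addresses the configuration point from the question. Note that the class must be defined at module top level for pickling (and hence for the Pool) to work.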
