Create 1% Sample Using Multiprocessing in Python

I'm trying to process a large dataset (300GB, myfile.txt in the script) line by line using multiprocessing. I want to define a 1% random sample based on one variable (contained in unique_ids_final.txt). My first step is to define the sample, and then I read the data file using multiprocessing. I would like to improve the efficiency of the code in two ways:

First, I'd like to pass the pct1 object from the parent to the child so that it only needs to be defined once. I've seen this approach described at https://thelaziestprogrammer.com/python/multiprocessing-pool-expect-initret-proposal, but I'm relatively new to Python and don't understand how to implement it in my code.
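
If I understand that article correctly, the idea is to build id_pct1 once in the parent and hand it to every worker through the pool's initializer, instead of rebuilding it (or relying on a module-level global) in each child. Below is a minimal sketch of what I think that looks like; keep_line and the tiny test data are placeholders, not my real code:

import multiprocessing as mp

id_pct1 = None  # set in each worker process by the initializer below

def init_worker(shared_ids):
    # runs once per worker when the pool starts, so the sample set is
    # built a single time in the parent and then handed to every child
    global id_pct1
    id_pct1 = shared_ids

def keep_line(line):
    # placeholder task: check one '*'-delimited field against the shared set
    return line.split('*')[0] in id_pct1

if __name__ == '__main__':
    sample_ids = {'id1', 'id3'}          # stand-in for the real id_pct1
    lines = ['id1*a', 'id2*b', 'id3*c']
    with mp.Pool(2, initializer=init_worker, initargs=(sample_ids,)) as pool:
        print(pool.map(keep_line, lines))  # -> [True, False, True]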

Second, I'd like to define the chunks without reading in the data. In chunkify() I get the start of each chunk and the number of bytes that need to be read by looking for the end of the line after reading in 1MB of data. I was hoping to use seek to move forward by 1MB and then find the end of the line, but this creates problems because later I need to read in the chunks, and read treats '\n' as one byte while seek treats it as two.
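
My guess is that the mismatch comes from reading the file in text mode, where a Windows '\r\n' line ending is handed back as a single '\n' even though it takes two bytes on disk, so offsets from read() and seek() drift apart. What I have in mind (untested against the real data, and it assumes the workers would also open the file in binary mode and decode each line) is roughly:

import os

def chunkify(fname, size=1024 * 1024):
    # work in binary mode so seek() and read() both count raw bytes
    # and '\r\n' is never translated
    file_end = os.path.getsize(fname)
    with open(fname, 'rb') as f:
        chunk_start = 0
        while chunk_start < file_end:
            f.seek(chunk_start + size)   # jump ahead without reading the chunk
            f.readline()                 # then advance to the next line boundary
            chunk_end = min(f.tell(), file_end)
            yield chunk_start, chunk_end - chunk_start
            chunk_start = chunk_end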

Any other suggestions to increase efficiency would also be much appreciated!

import os
import random
import multiprocessing as mp

#define sample
uid = list(line.strip() for line in open('Subsets/unique_ids_final.txt'))
pct1 = round(len(uid)/100)
random.seed(1)
id_pct1 = set(random.sample(uid, k=pct1))
id_pct1.add(vname)  # vname is not defined in this snippet

#read original file and write 1% sample using multiprocessing
def worker(chunkStart, chunkSize, q):
    # read one chunk, keep the lines whose 31st '*'-delimited field is in
    # the 1% sample, and hand them to the listener via the queue
    with open('myfile.txt') as f:
        tlines = []
        f.seek(chunkStart)
        lines = f.read(chunkSize).splitlines()
        for line in lines:
            data = line.split('*')
            if data[30] in id_pct1: tlines.append(line)
        q.put(tlines)
        return tlines

def chunkify(fname, size=1024*1024):
    # yield (start, length) pairs for ~1MB chunks that end on a line boundary
    fileEnd = os.path.getsize(fname)
    with open(fname, 'r') as f:
        chunkEnd2 = 0
        while True:
            chunkStart = chunkEnd2
            f.seek(chunkStart)
            f.read(size)        # read ahead one block...
            chunkEnd1 = f.tell()
            f.readline()        # ...then finish the current line
            chunkEnd2 = f.tell()
            chunkSz = 1024*1024 + chunkEnd2 - chunkEnd1 - 1
            yield chunkStart, chunkSz
            if chunkEnd2 >= fileEnd:
                break

def listener(q):
    # single writer: drain the queue and append matching lines to the output
    with open('myfile1pct.txt', 'w') as out_f1:
        while True:
            m = q.get()
            if m == 'kill': break
            else:
                for line in m:
                    out_f1.write(line+'\n')
                    out_f1.flush()

def main():

    manager = mp.Manager()
    q = manager.Queue()
    pool = mp.Pool()

    # one task writes the output file while the others process chunks
    watcher = pool.apply_async(listener, (q,))

    jobs = []
    for chunkStart, chunkSize in chunkify('myfile.txt'):
        jobs.append(pool.apply_async(worker, (chunkStart, chunkSize, q)))

    # wait for all workers, then tell the listener to stop
    for job in jobs:
        job.get()

    q.put('kill')
    pool.close()
    pool.join()

if __name__ == '__main__':
    main()