Create 1% Sample Using Multiprocessing in Python

I'm trying to process a large dataset (300 GB, myfile.txt in the script) line by line using multiprocessing. I want to define a 1% random sample based on one variable (contained in unique_ids_final.txt). My first step is to define the sample, and then I want to read the data file using multiprocessing. I would like to improve the efficiency of the code in two ways:

First, I'd like to pass the pct1 object from the parent to the child processes so that it only needs to be defined once. I've seen a description of this at https://thelaziestprogrammer.com/python/multiprocessing-pool-expect-initret-proposal, but I'm relatively new to Python and don't understand how to implement it in my code.
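If I understand that page correctly, the idea is to give Pool an initializer that runs once in each child process and stores the sample set in a global, so it is not rebuilt for every task. A stripped-down sketch of the pattern as I understand it (init_worker, keep_line and the toy data are mine, not from my actual script):

import multiprocessing as mp
import random

id_sample = None  #set once in each child process by init_worker

def init_worker(sample_ids):
    #runs when each pool process starts: stash the parent's sample set in a global
    global id_sample
    id_sample = sample_ids

def keep_line(line):
    #toy version of my worker: keep the line if its 31st field is in the sample
    fields = line.split('*')
    return line if len(fields) > 30 and fields[30] in id_sample else None

if __name__ == '__main__':
    uid = [str(i) for i in range(1000)]        #stand-in for unique_ids_final.txt
    random.seed(1)
    sample = set(random.sample(uid, k=round(len(uid)/100)))
    demo_lines = ['x*'*30 + u for u in uid]    #31 fields, ID in field 31
    with mp.Pool(2, initializer=init_worker, initargs=(sample,)) as pool:
        kept = [l for l in pool.map(keep_line, demo_lines) if l is not None]
    print(len(kept))                           #10 of the 1000 lines are kept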

Second, I'd like to define the chunks without reading in the data. In chunkify I currently find the start of each chunk and the number of bytes to read by reading 1 MB of data and then looking for the end of the line. I was hoping to use seek to move forward by 1 MB and then find the end of the line, but this creates problems because, when I later read in the chunks, read treats the \r\n line ending as one character while seek counts it as two bytes.
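My current thought is that opening the file in binary mode for both the chunking and the reading might sidestep this, since in binary mode seek, tell and read all count raw bytes. A rough sketch of what I mean (chunkify_binary is just an illustrative name, and worker would then have to decode the bytes it reads), in case that is the right direction:

import os
import tempfile

def chunkify_binary(fname, size=1024*1024):
    #yield (start, length) byte ranges that end on a line boundary,
    #seeking ahead by size instead of reading each chunk
    fileEnd = os.path.getsize(fname)
    with open(fname, 'rb') as f:          #binary mode: offsets are raw bytes
        chunkStart = 0
        while chunkStart < fileEnd:
            f.seek(chunkStart + size)     #jump forward without reading the chunk
            f.readline()                  #advance to the next line boundary
            chunkEnd = min(f.tell(), fileEnd)
            yield chunkStart, chunkEnd - chunkStart
            chunkStart = chunkEnd

if __name__ == '__main__':
    #tiny self-test on a throwaway file, using 4-byte chunks
    with tempfile.NamedTemporaryFile('wb', delete=False) as tmp:
        tmp.write(b'aa*1\nbbbb*2\nc*3\n')
    for start, length in chunkify_binary(tmp.name, size=4):
        with open(tmp.name, 'rb') as f:
            f.seek(start)
            print(f.read(length).decode().splitlines())
    os.remove(tmp.name)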

Any other suggestions to increase efficiency would also be much appreciated!

import multiprocessing as mp
import os
import random

#define sample
uid = [line.strip() for line in open('Subsets/unique_ids_final.txt')]
pct1 = round(len(uid)/100)                     #1% of the unique IDs
random.seed(1)
id_pct1 = set(random.sample(uid, k=pct1))      #the IDs to keep
id_pct1.add(vname)

#read original file and write 1% sample using multiprocessing
def worker(chunkStart, chunkSize, q):
    #read one chunk and keep the lines whose ID (field 31) is in the sample
    with open('myfile.txt') as f:
        tlines = []
        f.seek(chunkStart)
        lines = f.read(chunkSize).splitlines()
        for line in lines:
            data = line.split('*')
            if data[30] in id_pct1:
                tlines.append(line)
        q.put(tlines)       #the listener writes these to the output file
        return tlines

def chunkify(fname, size=1024*1024):
    #yield (start, length) pairs so that every chunk ends at a line boundary
    fileEnd = os.path.getsize(fname)
    with open(fname, 'r') as f:
        chunkEnd2 = 0
        while True:
            chunkStart = chunkEnd2
            f.seek(chunkStart)
            f.read(size)                #read roughly one chunk...
            chunkEnd1 = f.tell()
            f.readline()                #...then continue to the end of the line
            chunkEnd2 = f.tell()
            chunkSz = size + chunkEnd2 - chunkEnd1 - 1
            yield chunkStart, chunkSz
            if chunkEnd2 >= fileEnd:
                break

def listener(q):
    #single writer process: drain the queue and append matched lines to the output
    with open('myfile1pct.txt', 'w') as out_f1:
        while True:
            m = q.get()
            if m == 'kill':
                break
            for line in m:
                out_f1.write(line + '\n')
                out_f1.flush()

def main():
    manager = mp.Manager()
    q = manager.Queue()
    pool = mp.Pool()

    #one process writes the output file
    watcher = pool.apply_async(listener, (q,))

    #one job per chunk
    jobs = []
    for chunkStart, chunkSize in chunkify('myfile.txt'):
        jobs.append(pool.apply_async(worker, (chunkStart, chunkSize, q)))

    #wait for the workers, then tell the listener to stop
    for job in jobs:
        job.get()

    q.put('kill')
    pool.close()
    pool.join()

if __name__ == '__main__':
    main()