
I've run into a bit of an odd situation. I have a Python script that counts features across chromosomes. I've parallelized the script so that it hands each CPU (the count is set by --threads) a subset of chromosomes.

When I run this script on a single BAM file using 12 threads, it takes ~30 minutes to run.

My assumption was that each parallelized Nextflow (NF) process running this script with 12 threads, in the exact same environment and on the same hardware, should also take ~30 minutes.

This isn't the case: when I run multiple BAM files through NF, each process takes over 8 hours.

Any idea what's going on? Is there a conflict between how NF allocates CPUs and how Python's multiprocessing uses them? I'm at a loss.

I've tried setting maxForks, using fewer cores, and setting executor.cpus to a specific number so that NF doesn't use more resources than are available.
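For what it's worth, here's a quick sanity check (Linux-only, illustrative) that can be run inside a task to compare the CPUs Python reports for the machine with the CPUs the process is actually allowed to use; under cgroup or taskset limits these can differ, and a Pool sized from the former will oversubscribe:

```python
import os

# What os.cpu_count() reports: every CPU on the machine.
total = os.cpu_count()

# What this process is actually allowed to run on (respects cgroup/taskset
# limits a scheduler or container runtime may impose; Linux-only).
allowed = len(os.sched_getaffinity(0))

print(f"cpu_count={total} allowed={allowed}")
```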

I've run into the same issue with another script. This one is slightly different, as it simulates some genome features, so I'll include the code as requested below:

CODE

# Imports used by the snippets below (is_excluded and setup_logging are
# defined elsewhere in the script):
import random
from collections import defaultdict
from functools import partial
from multiprocessing import Pool

def simulate_fragment(ref_genome, fragment_lengths, excluded_regions, _):
    while True:
        length = random.choice(fragment_lengths)
        chrom = random.choice(list(ref_genome.keys()))
        chrom_len = len(ref_genome[chrom])
        start_pos = random.randint(0, chrom_len - length)
        end_pos = start_pos + length
        if is_excluded(chrom, start_pos, end_pos, excluded_regions):
            continue
        fragment = ref_genome[chrom][start_pos:end_pos].upper()
        end_motif = fragment[:4]
        if 'N' not in end_motif:
            return (length, end_motif)

def simulate_attributes(ref_genome, fragment_lengths, excluded_regions, num_simulations, threads):
    logger = setup_logging()
    logger.info(f"Simulating {num_simulations} expected attributes ... Splitting into {num_simulations // threads} per thread ...")

    simulations_per_thread = num_simulations // threads
    pool = Pool(threads)
    random_seeds = [random.randint(0, 999) for _ in range(threads)]
    simulate_partial = partial(simulate_round, ref_genome, fragment_lengths, excluded_regions, simulations_per_thread)
    results = pool.map(simulate_partial, random_seeds)
    pool.close()
    pool.join()

    simulated_attributes = defaultdict(int)
    for result in results:
        for key, count in result.items():
            simulated_attributes[key] += count

    return simulated_attributes

def simulate_round(ref_genome, fragment_lengths, excluded_regions, num_simulations, random_seed):
    random.seed(random_seed)
    simulated_attributes = defaultdict(int)
    simulate_partial = partial(simulate_fragment, ref_genome, fragment_lengths, excluded_regions)
    results = [simulate_partial(_) for _ in range(num_simulations)]
    for length, end_motif in results:
        simulated_attributes[(length, end_motif)] += 1
    return simulated_attributes

# Simulate expected attributes
expected_attributes = simulate_attributes(ref_genome, fragment_lengths, excluded_regions, args.num_simulations, args.threads)
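For context, the fan-out/merge pattern above boils down to this standalone toy version (names and the simulated quantity are illustrative, not from the real script): each worker runs one seeded round, and the per-worker tallies are merged at the end.

```python
import random
from collections import defaultdict
from functools import partial
from multiprocessing import Pool

def count_round(n_draws, seed):
    # One worker: draw n_draws values with its own seed and tally them.
    rng = random.Random(seed)
    counts = defaultdict(int)
    for _ in range(n_draws):
        counts[rng.randint(0, 3)] += 1
    return counts

def run_simulation(total_draws=1000, threads=4):
    # Fan out one round per worker, then merge the per-worker tallies.
    per_thread = total_draws // threads
    with Pool(threads) as pool:
        results = pool.map(partial(count_round, per_thread), range(threads))
    merged = defaultdict(int)
    for result in results:
        for key, count in result.items():
            merged[key] += count
    return merged

if __name__ == "__main__":
    print(sum(run_simulation().values()))  # 1000
```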

MODULE

process EMCORRECTION {
    tag "$meta.sample_id"
    label 'process_highest'

    container 'ghcguzman/emcorrection.amd64'

    input:
    tuple val(meta), path(bam), path(bai)
    path(genome)
    path(blacklist)

    output:
    tuple val(meta), path("*.EMtagged.bam"), emit: bam

    when:
    task.ext.when == null || task.ext.when

    script:
    def args = task.ext.args ?: ''
    """
    emcorrection.py \\
        -i $bam \\
        -r $genome \\
        -e $blacklist \\
        -o ${bam.baseName}.EMtagged.bam \\
        --threads $task.cpus \\
        --sort_bam
    """
}
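One defensive tweak I've considered inside the script itself (a hypothetical snippet, not code from the actual emcorrection.py): clamp the pool size to the CPUs the task was actually granted rather than trusting --threads blindly, since oversubscribing just slows every worker down instead of failing loudly.

```python
import argparse
import os

parser = argparse.ArgumentParser()
parser.add_argument("--threads", type=int, default=1)
args, _ = parser.parse_known_args()

# CPUs this process may actually run on (respects cgroups/taskset; Linux-only).
granted = len(os.sched_getaffinity(0))

# Never spawn more workers than were actually granted.
workers = max(1, min(args.threads, granted))
print(f"requested={args.threads} granted={granted} using={workers}")
```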

CONFIG

params {
    genome_2bit = 'hg19.2bit'
    motif_beds  = '*.bed'
}

executor {
    $local {
        cpus = 160
    }
}

process {

    cpus   = { check_max( 10    * task.attempt, 'cpus'   ) }
    memory = { check_max( 10.GB * task.attempt, 'memory' ) }
    time   = { check_max( 8.h  * task.attempt, 'time'   ) }

    errorStrategy = { task.exitStatus in [143,137,104,134,139] ? 'retry' : 'finish' }
    maxRetries    = 1
    maxErrors     = '-1'

    withLabel:process_single {
        cpus   = { check_max( 1                  , 'cpus'    ) }
        memory = { check_max( 6.GB * task.attempt, 'memory'  ) }
        time   = { check_max( 4.h  * task.attempt, 'time'    ) }
    }
    withLabel:process_low {
        cpus   = { check_max( 2     * task.attempt, 'cpus'    ) }
        memory = { check_max( 6.GB * task.attempt, 'memory'  ) }
        time   = { check_max( 4.h   * task.attempt, 'time'    ) }
    }
    withLabel:process_medium {
        cpus   = { check_max( 8     * task.attempt, 'cpus'    ) }
        memory = { check_max( 16.GB * task.attempt, 'memory'  ) }
        time   = { check_max( 6.h   * task.attempt, 'time'    ) }
    }
    withLabel:process_high {
        cpus   = { check_max( 10    * task.attempt, 'cpus'    ) }
        memory = { check_max( 16.GB * task.attempt, 'memory'  ) }
        time   = { check_max( 8.h  * task.attempt, 'time'    ) }
    }
    withLabel:process_highest {
        cpus   = { check_max( 16    * task.attempt, 'cpus'    ) }
        memory = { check_max( 20.GB * task.attempt, 'memory'  ) }
        time   = { check_max( 10.h  * task.attempt, 'time'    ) }
    }
    withLabel:process_long {
        time   = { check_max( 30.h  * task.attempt, 'time'    ) }
    }
    withLabel:process_high_memory {
        memory = { check_max( 200.GB * task.attempt, 'memory' ) }
    }
    withLabel:error_ignore {
        errorStrategy = 'ignore'
    }
    withLabel:error_retry {
        errorStrategy = 'retry'
        maxRetries    = 2
    }
}

MAIN MODULE CALL

EMCORRECTION(
            SAMTOOLS_INDEX_ONE.out.bam_bai,
            ch_2bit,
            ch_blacklist
        )
  • Show the relevant parts of your code. Are you allowing the nextflow process that runs your python script to use 12 threads? Commented Jun 4, 2024 at 7:47
  • As suggested, please add the code for the process, workflow declaration, and config file. If you've set maxForks to less than the number of bams, then you will increase runtime. Also, with the 30 minute example, how does this bam file compare to the rest? You could have just picked a small and relatively invariant file. Commented Jun 4, 2024 at 8:57
  • @Pallie I've added the relevant code that was requested! Hopefully this helps. Commented Jun 4, 2024 at 20:58
  • @dthorbur sorry forgot to @ both of you. Commented Jun 4, 2024 at 20:59
  • Additionally, I wanted to say that I only set maxForks to see if setting a lower number of NF processes would fix the issue. It does not. For the 30 minute sample, I actually chose the largest BAM file I had to do the tests on so this BAM file IS abnormal, but it is larger than all the other ones that are being asked to be processed. Commented Jun 4, 2024 at 21:38
