I've run into a bit of an odd situation. I have a python script that counts features across chromosomes. I've parallelized the script so it sends each CPU (set by --threads) a number of chromosomes.
When I run this script on a single BAM file using 12 threads, it takes ~30 minutes to run.
My assumption was that each parallelized process in NF running this script with 12 threads in the exact same environment and hardware should take ~30 minutes.
This isn't the case. When running multiple BAM files on NF the process takes over 8 hours.
Any idea what's going on? Is there a conflict between how NF allocates CPUs to a task and how Python's multiprocessing pool uses them? I'm at a loss.
I've tried setting maxForks, using fewer cores, and setting executor.cpus to a specific number so that NF isn't using more resources than are available.
I've run into the same issue in another script. This one is slightly different as it simulates some genome features so I'll include the code as requested below:
CODE
def simulate_fragment(ref_genome, fragment_lengths, excluded_regions, _):
    """Draw one random fragment from the reference and return (length, end_motif).

    Rejection-samples until a fragment is found that (a) fits on the chosen
    chromosome, (b) does not overlap an excluded region, and (c) has no 'N'
    in its first four bases.

    Args:
        ref_genome: mapping of chromosome name -> sequence (sliceable).
        fragment_lengths: population of fragment lengths to sample from.
        excluded_regions: regions tested via is_excluded().
        _: ignored; present so the function can be mapped over an index range.

    Returns:
        Tuple of (length, end_motif) where end_motif is the fragment's
        first four bases, uppercased.
    """
    # Hoist the key list out of the retry loop — the original rebuilt it on
    # every iteration of the rejection loop.
    chroms = list(ref_genome.keys())
    while True:
        length = random.choice(fragment_lengths)
        chrom = random.choice(chroms)
        chrom_len = len(ref_genome[chrom])
        # Guard: if the chromosome is shorter than the sampled length,
        # randint(0, negative) raises ValueError. Resample instead — this
        # matters for short contigs (e.g. chrM, unplaced scaffolds).
        if chrom_len < length:
            continue
        start_pos = random.randint(0, chrom_len - length)
        end_pos = start_pos + length
        if is_excluded(chrom, start_pos, end_pos, excluded_regions):
            continue
        fragment = ref_genome[chrom][start_pos:end_pos].upper()
        end_motif = fragment[:4]  # 5' end motif: first 4 bases of the fragment
        if 'N' not in end_motif:
            return (length, end_motif)
def simulate_attributes(ref_genome, fragment_lengths, excluded_regions, num_simulations, threads):
    """Simulate num_simulations fragments across a worker pool and tally attributes.

    Args:
        ref_genome: mapping of chromosome name -> sequence.
        fragment_lengths: population of fragment lengths to sample from.
        excluded_regions: regions passed through to simulate_round().
        num_simulations: total number of fragments to simulate.
        threads: number of worker processes.

    Returns:
        defaultdict(int) mapping (length, end_motif) -> count, merged over
        all workers.
    """
    logger = setup_logging()
    logger.info(f"Simulating {num_simulations} expected attributes ... Splitting into {num_simulations // threads} per thread ...")
    # Distribute the remainder across the first workers so exactly
    # num_simulations fragments are drawn; the original floor division
    # silently dropped up to threads-1 simulations.
    base, remainder = divmod(num_simulations, threads)
    counts = [base + (1 if i < remainder else 0) for i in range(threads)]
    # Sample distinct seeds from a wide range. The original randint(0, 999)
    # allows birthday-paradox collisions: two workers with the same seed
    # generate identical fragment streams, double-counting and biasing the
    # tallies.
    random_seeds = random.sample(range(2**32), threads)
    # NOTE(review): ref_genome is pickled and shipped to every worker via the
    # task arguments. For a whole-genome dict this serialization is large and
    # can dominate wall time when many NF tasks start pools concurrently —
    # likely the slowdown described above. Consider loading the genome inside
    # each worker (initializer) or memory-mapping it. TODO confirm with a
    # profile.
    with Pool(threads) as pool:
        results = pool.starmap(
            simulate_round,
            [
                (ref_genome, fragment_lengths, excluded_regions, count, seed)
                for count, seed in zip(counts, random_seeds)
            ],
        )
    # Merge the per-worker tallies.
    simulated_attributes = defaultdict(int)
    for result in results:
        for key, count in result.items():
            simulated_attributes[key] += count
    return simulated_attributes
def simulate_round(ref_genome, fragment_lengths, excluded_regions, num_simulations, random_seed):
    """Run num_simulations fragment draws in one worker and tally the results.

    Seeds this process's RNG with random_seed, then repeatedly calls
    simulate_fragment() and counts each (length, end_motif) pair.

    Returns:
        defaultdict(int) mapping (length, end_motif) -> count.
    """
    random.seed(random_seed)
    tallies = defaultdict(int)
    # Count directly instead of materializing an intermediate results list.
    for draw_index in range(num_simulations):
        length, end_motif = simulate_fragment(
            ref_genome, fragment_lengths, excluded_regions, draw_index
        )
        tallies[(length, end_motif)] += 1
    return tallies
# Simulate expected attributes
# Top-level driver call: fans the simulation out over args.threads worker
# processes and returns a {(length, end_motif): count} mapping.
expected_attributes = simulate_attributes(ref_genome, fragment_lengths, excluded_regions, args.num_simulations, args.threads)
MODULE
// End-motif correction: tags each BAM with expected end-motif attributes.
// Runs emcorrection.py with task.cpus worker processes (the script's
// --threads flag), so NF's cpu reservation and the Python pool size agree.
process EMCORRECTION {
    tag "$meta.sample_id"
    label 'process_highest'
    // Directive form is `container 'image'` — the `container = ...`
    // assignment syntax belongs in config files, not in a process body.
    container 'ghcguzman/emcorrection.amd64'

    input:
    tuple val(meta), path(bam), path(bai)
    path(genome)
    path(blacklist)

    output:
    tuple val(meta), path("*.EMtagged.bam"), emit: bam

    when:
    task.ext.when == null || task.ext.when

    script:
    def args = task.ext.args ?: ''
    """
    emcorrection.py \\
        -i $bam \\
        -r $genome \\
        -e $blacklist \\
        -o ${bam.baseName}.EMtagged.bam \\
        --threads $task.cpus \\
        --sort_bam
    """
}
CONFIG
// Pipeline input parameters.
params {
// Reference genome as a UCSC 2bit file (hg19 build).
genome_2bit = 'hg19.2bit'
// Glob matching the motif BED files to process.
motif_beds = '*.bed'
}
// Local executor limits: total CPUs NF may schedule across ALL concurrent
// tasks on this machine.
executor {
$local {
cpus = 160
}
}
// Per-process resource defaults and label-based overrides (nf-core style).
// NOTE(review): executor.cpus = 160 with process_highest at 16 cpus allows
// up to 10 EMCORRECTION tasks at once. If the Python script inside each
// task actually uses more processes than task.cpus (or each worker itself
// is multithreaded), the host oversubscribes and every task slows down —
// worth confirming with `top`/`htop` while the pipeline runs.
process {
// Defaults applied to any process without a more specific label.
cpus = { check_max( 10 * task.attempt, 'cpus' ) }
memory = { check_max( 10.GB * task.attempt, 'memory' ) }
time = { check_max( 8.h * task.attempt, 'time' ) }
// Retry on the usual kill/OOM/segfault exit codes; otherwise finish
// running tasks and stop.
errorStrategy = { task.exitStatus in [143,137,104,134,139] ? 'retry' : 'finish' }
maxRetries = 1
maxErrors = '-1'
withLabel:process_single {
cpus = { check_max( 1 , 'cpus' ) }
memory = { check_max( 6.GB * task.attempt, 'memory' ) }
time = { check_max( 4.h * task.attempt, 'time' ) }
}
withLabel:process_low {
cpus = { check_max( 2 * task.attempt, 'cpus' ) }
memory = { check_max( 6.GB * task.attempt, 'memory' ) }
time = { check_max( 4.h * task.attempt, 'time' ) }
}
withLabel:process_medium {
cpus = { check_max( 8 * task.attempt, 'cpus' ) }
memory = { check_max( 16.GB * task.attempt, 'memory' ) }
time = { check_max( 6.h * task.attempt, 'time' ) }
}
withLabel:process_high {
cpus = { check_max( 10 * task.attempt, 'cpus' ) }
memory = { check_max( 16.GB * task.attempt, 'memory' ) }
time = { check_max( 8.h * task.attempt, 'time' ) }
}
// Used by EMCORRECTION: resources scale with task.attempt on retry.
withLabel:process_highest {
cpus = { check_max( 16 * task.attempt, 'cpus' ) }
memory = { check_max( 20.GB * task.attempt, 'memory' ) }
time = { check_max( 10.h * task.attempt, 'time' ) }
}
withLabel:process_long {
time = { check_max( 30.h * task.attempt, 'time' ) }
}
withLabel:process_high_memory {
memory = { check_max( 200.GB * task.attempt, 'memory' ) }
}
withLabel:error_ignore {
errorStrategy = 'ignore'
}
withLabel:error_retry {
errorStrategy = 'retry'
maxRetries = 2
}
}
MAIN MODULE CALL
// Workflow invocation: one EMCORRECTION task per (bam, bai) tuple emitted by
// SAMTOOLS_INDEX_ONE; the 2bit genome and blacklist channels are shared.
EMCORRECTION(
SAMTOOLS_INDEX_ONE.out.bam_bai,
ch_2bit,
ch_blacklist
)
If you set maxForks to less than the number of BAMs, then you will increase total runtime. Also, with the 30-minute example, how does that BAM file compare to the rest? You could have just picked a small and relatively invariant file.