diff --git a/python/genomicAnnotatorScripts/GenerateTranscriptToInfo.py b/python/genomicAnnotatorScripts/GenerateTranscriptToInfo.py index 16a6700a7..5da9cad62 100755 --- a/python/genomicAnnotatorScripts/GenerateTranscriptToInfo.py +++ b/python/genomicAnnotatorScripts/GenerateTranscriptToInfo.py @@ -2,9 +2,10 @@ import sys import os import re import traceback +import shlex, subprocess from optparse import OptionParser, OptionGroup from IndentedHelpFormatterWithNL import * - +import time # Init cmd-line args description = """ @@ -24,6 +25,7 @@ parser.add_option("-l", "--locally", dest="run_locally", action="store_true", de parser.add_option("-R", "--reference", metavar="PATH", dest="reference", help="Specifies the path of the reference file to use.", default="/seq/references/Homo_sapiens_assembly18/v0/Homo_sapiens_assembly18.fasta") parser.add_option("-n", "--gene-name-columns", dest="gene_name_columns", metavar="GENE_NAMES", help="Comma-separated list of column names that contain gene names. This arg is passed through to the GenomicAnnotator. The GenomicAnnotator docs have more details on this.") parser.add_option("-q", "--queue", dest="queue", metavar="QUEUE", help="Specifies the LSF queue to use.", default="solexa") +parser.add_option("-s", "--num-parallel-processes", dest="num_parallel_processes", metavar="SLOTS", help="How many processes to launch simultaneously. This is only used when the -l option is set.", default="1") (options, args) = parser.parse_args() @@ -56,6 +58,7 @@ if not os.access(reference, os.R_OK): error("Couldn't access reference file: "+ reference) queue = options.queue +num_parallel_processes = int(options.num_parallel_processes) transcript_dir = os.path.dirname(transcript_table) logs_dir = os.path.join(transcript_dir,"logs") @@ -67,11 +70,37 @@ contigs += [ "chr" + str(x) for x in contig_chars ] contigs += [ "chr" + str(x) + "_random" for x in set( contig_chars ).difference(set(['M',12,14,20,'X','Y'])) ] # There are no "_random" chromosomes for chrM,12,14,20,Y + + if run: print("Deleting any previous logs...") os.system("rm " + os.path.join(logs_dir,"bsub_*_log.txt")) os.system("mkdir " + logs_dir) + +running_processes = [] +def execute(command, stdout_filename=None): + + # Wait until a slot becomes open + while len( running_processes ) + 1 >= num_parallel_processes: + # Check if any have ended + for process in running_processes: + if process.poll() != None: + print("Process [pid=" + str(process.pid) + "] finished with exit status: " + str(process.returncode)) + running_processes.remove(process) + break + else: + time.sleep(3) # Sleep for 3 seconds before checking again + + # A slot has opened up - start another process + stdout = None + if stdout_filename: + stdout = open(stdout_filename, "w+") + + + p = subprocess.Popen(shlex.split(command), stdout=stdout, stderr=subprocess.STDOUT) + running_processes.append(p) + for contig in contigs: if contig.count("random") or contig.lower().count("chrm"): MEMORY_USAGE = 10 # Gigabytes @@ -83,18 +112,16 @@ for contig in contigs: MEMORY_USAGE = 32 EXCLUSIVE = "" - command = "java -Xmx"+str(MEMORY_USAGE)+"g -jar dist/GenomeAnalysisTK.jar -T TranscriptToInfo -l info -R " + reference + " -B transcripts,AnnotatorInputTable,"+transcript_table+" -n "+gene_name_columns+" -o "+ os.path.join(transcript_dir,output_file_prefix) +"-big-table-ucsc-%s.txt -L %s:1+ " % (contig, contig) - - - if run_locally: - command += " > " + os.path.join(logs_dir,contig+"_log.txt") - else: + command = "java -Xmx"+str(MEMORY_USAGE)+"g -jar dist/GenomeAnalysisTK.jar -T TranscriptToInfo -l info -nt " + str(num_parallel_processes) + " -R " + reference + " -B transcripts,AnnotatorInputTable,"+transcript_table+" -n "+gene_name_columns+" -o "+ os.path.join(transcript_dir,output_file_prefix) +"-big-table-ucsc-%s.txt -L %s:1+ " % (contig, contig) + if not run_locally: command = "bsub "+EXCLUSIVE+" -q " + queue + " -R \"rusage[mem="+str(MEMORY_USAGE)+"]\" -o " + os.path.join(logs_dir,contig+"_log.txt") + " " + command - - if run: print("Executing: " + command) - os.system(command) + if run_locally: + #execute(command, os.path.join(logs_dir,contig+"_log.txt")) + execute(command + " >& " + os.path.join(logs_dir,contig+"_log.txt")) + else: + os.system(command) else: print(command)