Added parallelization options - when running locally, multiple processes can be spawned, or a -nt arg can be specified to run each TranscriptToInfo instance multi-threaded

git-svn-id: file:///humgen/gsa-scr1/gsa-engineering/svn_contents/trunk@3507 348d0f76-0448-11de-a6fe-93d51630548a
This commit is contained in:
weisburd 2010-06-08 22:48:07 +00:00
parent 92c72d3361
commit 410afcdf2c
1 changed files with 37 additions and 10 deletions

View File

@ -2,9 +2,10 @@ import sys
import os
import re
import traceback
import shlex, subprocess
from optparse import OptionParser, OptionGroup
from IndentedHelpFormatterWithNL import *
import time
# Init cmd-line args
description = """
@ -24,6 +25,7 @@ parser.add_option("-l", "--locally", dest="run_locally", action="store_true", de
parser.add_option("-R", "--reference", metavar="PATH", dest="reference", help="Specifies the path of the reference file to use.", default="/seq/references/Homo_sapiens_assembly18/v0/Homo_sapiens_assembly18.fasta")
parser.add_option("-n", "--gene-name-columns", dest="gene_name_columns", metavar="GENE_NAMES", help="Comma-separated list of column names that contain gene names. This arg is passed through to the GenomicAnnotator. The GenomicAnnotator docs have more details on this.")
parser.add_option("-q", "--queue", dest="queue", metavar="QUEUE", help="Specifies the LSF queue to use.", default="solexa")
parser.add_option("-s", "--num-parallel-processes", dest="num_parallel_processes", metavar="SLOTS", help="How many processes to launch simultaneously. This is only used when the -l option is set.", default="1")
(options, args) = parser.parse_args()
@ -56,6 +58,7 @@ if not os.access(reference, os.R_OK):
error("Couldn't access reference file: "+ reference)
queue = options.queue
num_parallel_processes = int(options.num_parallel_processes)
transcript_dir = os.path.dirname(transcript_table)
logs_dir = os.path.join(transcript_dir,"logs")
@ -67,11 +70,37 @@ contigs += [ "chr" + str(x) for x in contig_chars ]
contigs += [ "chr" + str(x) + "_random" for x in set( contig_chars ).difference(set(['M',12,14,20,'X','Y'])) ] # There are no "_random" chromosomes for chrM,12,14,20,Y
if run:
print("Deleting any previous logs...")
os.system("rm " + os.path.join(logs_dir,"bsub_*_log.txt"))
os.system("mkdir " + logs_dir)
running_processes = []
def execute(command, stdout_filename=None):
# Wait until a slot becomes open
while len( running_processes ) + 1 >= num_parallel_processes:
# Check if any have ended
for process in running_processes:
if process.poll() != None:
print("Process [pid=" + str(process.pid) + "] finished with exit status: " + str(process.returncode))
running_processes.remove(process)
break
else:
time.sleep(3) # Sleep for 3 seconds before checking again
# A slot has opened up - start another process
stdout = None
if stdout_filename:
stdout = open(stdout_filename, "w+")
p = subprocess.Popen(shlex.split(command), stdout=stdout, stderr=subprocess.STDOUT)
running_processes.append(p)
for contig in contigs:
if contig.count("random") or contig.lower().count("chrm"):
MEMORY_USAGE = 10 # Gigabytes
@ -83,18 +112,16 @@ for contig in contigs:
MEMORY_USAGE = 32
EXCLUSIVE = ""
command = "java -Xmx"+str(MEMORY_USAGE)+"g -jar dist/GenomeAnalysisTK.jar -T TranscriptToInfo -l info -R " + reference + " -B transcripts,AnnotatorInputTable,"+transcript_table+" -n "+gene_name_columns+" -o "+ os.path.join(transcript_dir,output_file_prefix) +"-big-table-ucsc-%s.txt -L %s:1+ " % (contig, contig)
if run_locally:
command += " > " + os.path.join(logs_dir,contig+"_log.txt")
else:
command = "java -Xmx"+str(MEMORY_USAGE)+"g -jar dist/GenomeAnalysisTK.jar -T TranscriptToInfo -l info -nt " + str(num_parallel_processes) + " -R " + reference + " -B transcripts,AnnotatorInputTable,"+transcript_table+" -n "+gene_name_columns+" -o "+ os.path.join(transcript_dir,output_file_prefix) +"-big-table-ucsc-%s.txt -L %s:1+ " % (contig, contig)
if not run_locally:
command = "bsub "+EXCLUSIVE+" -q " + queue + " -R \"rusage[mem="+str(MEMORY_USAGE)+"]\" -o " + os.path.join(logs_dir,contig+"_log.txt") + " " + command
if run:
print("Executing: " + command)
os.system(command)
if run_locally:
#execute(command, os.path.join(logs_dir,contig+"_log.txt"))
execute(command + " >& " + os.path.join(logs_dir,contig+"_log.txt"))
else:
os.system(command)
else:
print(command)