From 1a55227d5a97d8fe5e20b431b6f511b4f5497e6d Mon Sep 17 00:00:00 2001 From: Heng Li Date: Sat, 14 Jul 2018 12:15:10 -0400 Subject: [PATCH 1/8] write hits to tmp files (unfinished) --- Makefile | 2 +- index.c | 2 +- main.c | 2 ++ map.c | 73 +++++++++++++++++++++++++++++++++--------------------- minimap.h | 5 +++- misc.c | 20 +++++++++++++++ mmpriv.h | 4 +++ splitidx.c | 26 +++++++++++++++++++ 8 files changed, 103 insertions(+), 31 deletions(-) create mode 100644 splitidx.c diff --git a/Makefile b/Makefile index f30c7b3..f3b96dc 100644 --- a/Makefile +++ b/Makefile @@ -1,7 +1,7 @@ CFLAGS= -g -Wall -O2 -Wc++-compat #-Wextra CPPFLAGS= -DHAVE_KALLOC INCLUDES= -OBJS= kthread.o kalloc.o misc.o bseq.o sketch.o sdust.o options.o index.o chain.o align.o hit.o map.o format.o pe.o esterr.o ksw2_ll_sse.o +OBJS= kthread.o kalloc.o misc.o bseq.o sketch.o sdust.o options.o index.o chain.o align.o hit.o map.o format.o pe.o esterr.o splitidx.o ksw2_ll_sse.o PROG= minimap2 PROG_EXTRA= sdust minimap2-lite LIBS= -lm -lz -lpthread diff --git a/index.c b/index.c index 3aa8df8..cfdef19 100644 --- a/index.c +++ b/index.c @@ -563,7 +563,7 @@ mm_idx_t *mm_idx_reader_read(mm_idx_reader_t *r, int n_threads) mi = mm_idx_gen(r->fp.seq, r->opt.w, r->opt.k, r->opt.bucket_bits, r->opt.flag, r->opt.mini_batch_size, n_threads, r->opt.batch_size); if (mi) { if (r->fp_out) mm_idx_dump(r->fp_out, mi); - ++r->n_parts; + mi->index = r->n_parts++; } return mi; } diff --git a/main.c b/main.c index 22d55e3..69e5dcc 100644 --- a/main.c +++ b/main.c @@ -61,6 +61,7 @@ static struct option long_options[] = { { "score-N", required_argument, 0, 0 }, // 31 { "eqx", no_argument, 0, 0 }, // 32 { "paf-no-hit", no_argument, 0, 0 }, // 33 + { "split-prefix", required_argument, 0, 0 }, // 34 { "help", no_argument, 0, 'h' }, { "max-intron-len", required_argument, 0, 'G' }, { "version", no_argument, 0, 'V' }, @@ -181,6 +182,7 @@ int main(int argc, char *argv[]) else if (c == 0 && long_idx ==31) opt.sc_ambi = atoi(optarg); // --score-N else if (c == 0 && long_idx ==32) opt.flag |= MM_F_EQX; // --eqx else if (c == 0 && long_idx ==33) opt.flag |= MM_F_PAF_NO_HIT; // --paf-no-hit + else if (c == 0 && long_idx ==34) opt.split_prefix = optarg; // --split-prefix else if (c == 0 && long_idx == 14) { // --frag yes_or_no(&opt, MM_F_FRAG_MODE, long_idx, optarg, 1); } else if (c == 0 && long_idx == 15) { // --secondary diff --git a/map.c b/map.c index 0864c09..9362f6a 100644 --- a/map.c +++ b/map.c @@ -262,7 +262,7 @@ static mm_reg1_t *align_regs(const mm_mapopt_t *opt, const mm_idx_t *mi, void *k return regs; } -void mm_map_frag(const mm_idx_t *mi, int n_segs, const int *qlens, const char **seqs, int *n_regs, mm_reg1_t **regs, mm_tbuf_t *b, const mm_mapopt_t *opt, const char *qname) +int mm_map_frag(const mm_idx_t *mi, int n_segs, const int *qlens, const char **seqs, int *n_regs, mm_reg1_t **regs, mm_tbuf_t *b, const mm_mapopt_t *opt, const char *qname) { int i, j, rep_len, qlen_sum, n_regs0, n_mini_pos; int max_chain_gap_qry, max_chain_gap_ref, is_splice = !!(opt->flag & MM_F_SPLICE), is_sr = !!(opt->flag & MM_F_SR); @@ -277,7 +277,7 @@ void mm_map_frag(const mm_idx_t *mi, int n_segs, const int *qlens, const char ** for (i = 0, qlen_sum = 0; i < n_segs; ++i) qlen_sum += qlens[i], n_regs[i] = 0, regs[i] = 0; - if (qlen_sum == 0 || n_segs <= 0 || n_segs > MM_MAX_SEG) return; + if (qlen_sum == 0 || n_segs <= 0 || n_segs > MM_MAX_SEG) return 0; hash = qname? __ac_X31_hash_string(qname) : 0; hash ^= __ac_Wang_hash(qlen_sum) + __ac_Wang_hash(opt->seed); @@ -375,6 +375,7 @@ void mm_map_frag(const mm_idx_t *mi, int n_segs, const int *qlens, const char ** b->km = km_init(); } } + return rep_len; } mm_reg1_t *mm_map(const mm_idx_t *mi, int qlen, const char *seq, int *n_regs, mm_tbuf_t *b, const mm_mapopt_t *opt, const char *qname) @@ -394,13 +395,14 @@ typedef struct { mm_bseq_file_t **fp; const mm_idx_t *mi; kstring_t str; + FILE *fp_split; } pipeline_t; typedef struct { const pipeline_t *p; int n_seq, n_frag; mm_bseq1_t *seq; - int *n_reg, *seg_off, *n_seg; + int *n_reg, *seg_off, *n_seg, *rep_len; mm_reg1_t **reg; mm_tbuf_t **buf; } step_t; @@ -422,9 +424,12 @@ static void worker_for(void *_data, long i, int tid) // kt_for() callback } if (s->p->opt->flag & MM_F_INDEPEND_SEG) { for (j = 0; j < s->n_seg[i]; ++j) - mm_map_frag(s->p->mi, 1, &qlens[j], &qseqs[j], &s->n_reg[off+j], &s->reg[off+j], b, s->p->opt, s->seq[off+j].name); + s->rep_len[off + j] = mm_map_frag(s->p->mi, 1, &qlens[j], &qseqs[j], &s->n_reg[off+j], &s->reg[off+j], b, s->p->opt, s->seq[off+j].name); } else { - mm_map_frag(s->p->mi, s->n_seg[i], qlens, qseqs, &s->n_reg[off], &s->reg[off], b, s->p->opt, s->seq[off].name); + int rep_len; + rep_len = mm_map_frag(s->p->mi, s->n_seg[i], qlens, qseqs, &s->n_reg[off], &s->reg[off], b, s->p->opt, s->seq[off].name); + for (j = 0; j < s->n_seg[i]; ++j) + s->rep_len[off + j] = rep_len; } for (j = 0; j < s->n_seg[i]; ++j) // flip the query strand and coordinate to the original read strand if (s->n_seg[i] == 2 && ((j == 0 && (pe_ori>>1&1)) || (j == 1 && (pe_ori&1)))) { @@ -459,9 +464,10 @@ static void *worker_pipeline(void *shared, int step, void *in) s->buf = (mm_tbuf_t**)calloc(p->n_threads, sizeof(mm_tbuf_t*)); for (i = 0; i < p->n_threads; ++i) s->buf[i] = mm_tbuf_init(); - s->n_reg = (int*)calloc(3 * s->n_seq, sizeof(int)); - s->seg_off = s->n_reg + s->n_seq; // seg_off and n_seg are allocated together with n_reg + s->n_reg = (int*)calloc(4 * s->n_seq, sizeof(int)); + s->seg_off = s->n_reg + s->n_seq; // seg_off, n_seg and rep_len are allocated together with n_reg s->n_seg = s->seg_off + s->n_seq; + s->rep_len = s->n_seg + s->n_seq; s->reg = (mm_reg1_t**)calloc(s->n_seq, sizeof(mm_reg1_t*)); for (i = 1, j = 0; i <= s->n_seq; ++i) if (i == s->n_seq || !frag_mode || !mm_qname_same(s->seq[i-1].name, s->seq[i].name)) { @@ -485,28 +491,35 @@ static void *worker_pipeline(void *shared, int step, void *in) int seg_st = s->seg_off[k], seg_en = s->seg_off[k] + s->n_seg[k]; for (i = seg_st; i < seg_en; ++i) { mm_bseq1_t *t = &s->seq[i]; - for (j = 0; j < s->n_reg[i]; ++j) { - mm_reg1_t *r = &s->reg[i][j]; - assert(!r->sam_pri || r->id == r->parent); - if ((p->opt->flag & MM_F_NO_PRINT_2ND) && r->id != r->parent) - continue; - if (p->opt->flag & MM_F_OUT_SAM) - mm_write_sam2(&p->str, mi, t, i - seg_st, j, s->n_seg[k], &s->n_reg[seg_st], (const mm_reg1_t*const*)&s->reg[seg_st], km, p->opt->flag); - else - mm_write_paf(&p->str, mi, t, r, km, p->opt->flag); - mm_err_puts(p->str.s); + if (p->opt->split_prefix) { + mm_err_fwrite(&s->n_reg[i], sizeof(s->n_reg[i]), 1, p->fp_split); + mm_err_fwrite(&s->rep_len[i], sizeof(s->rep_len[i]), 1, p->fp_split); } - if (s->n_reg[i] == 0) { - if (p->opt->flag & MM_F_OUT_SAM) { - mm_write_sam2(&p->str, mi, t, i - seg_st, -1, s->n_seg[k], &s->n_reg[seg_st], (const mm_reg1_t*const*)&s->reg[seg_st], km, p->opt->flag); - mm_err_puts(p->str.s); - } else if (p->opt->flag & MM_F_PAF_NO_HIT) { - mm_write_paf(&p->str, mi, t, 0, 0, p->opt->flag); - mm_err_puts(p->str.s); + if (s->n_reg[i] > 0) { + for (j = 0; j < s->n_reg[i]; ++j) { + mm_reg1_t *r = &s->reg[i][j]; + assert(!r->sam_pri || r->id == r->parent); + if ((p->opt->flag & MM_F_NO_PRINT_2ND) && r->id != r->parent) + continue; + if (p->opt->split_prefix) { + mm_err_fwrite(r, sizeof(mm_reg1_t), 1, p->fp_split); + if (p->opt->flag & MM_F_CIGAR) { + mm_err_fwrite(&r->p->capacity, 4, 1, p->fp_split); + mm_err_fwrite(r->p, r->p->capacity, 4, p->fp_split); + } + } else { + if (p->opt->flag & MM_F_OUT_SAM) + mm_write_sam2(&p->str, mi, t, i - seg_st, j, s->n_seg[k], &s->n_reg[seg_st], (const mm_reg1_t*const*)&s->reg[seg_st], km, p->opt->flag); + else + mm_write_paf(&p->str, mi, t, r, km, p->opt->flag); + mm_err_puts(p->str.s); + } } - } - if (s->n_reg[i] == 0 && (p->opt->flag & MM_F_OUT_SAM)) { // write an unmapped record - mm_write_sam2(&p->str, mi, t, i - seg_st, -1, s->n_seg[k], &s->n_reg[seg_st], (const mm_reg1_t*const*)&s->reg[seg_st], km, p->opt->flag); + } else if (p->opt->flag & (MM_F_OUT_SAM|MM_F_PAF_NO_HIT)) { + if (p->opt->flag & MM_F_OUT_SAM) + mm_write_sam2(&p->str, mi, t, i - seg_st, -1, s->n_seg[k], &s->n_reg[seg_st], (const mm_reg1_t*const*)&s->reg[seg_st], km, p->opt->flag); + else + mm_write_paf(&p->str, mi, t, 0, 0, p->opt->flag); mm_err_puts(p->str.s); } } @@ -517,7 +530,7 @@ static void *worker_pipeline(void *shared, int step, void *in) if (s->seq[i].qual) free(s->seq[i].qual); } } - free(s->reg); free(s->n_reg); free(s->seq); // seg_off and n_seg were allocated with reg; no memory leak here + free(s->reg); free(s->n_reg); free(s->seq); // seg_off, n_seg and rep_len were allocated with reg; no memory leak here km_destroy(km); if (mm_verbose >= 3) fprintf(stderr, "[M::%s::%.3f*%.2f] mapped %d sequences\n", __func__, realtime() - mm_realtime0, cputime() / (realtime() - mm_realtime0), s->n_seq); @@ -548,9 +561,13 @@ int mm_map_file_frag(const mm_idx_t *idx, int n_segs, const char **fn, const mm_ pl.opt = opt, pl.mi = idx; pl.n_threads = n_threads > 1? n_threads : 1; pl.mini_batch_size = opt->mini_batch_size; + if (opt->split_prefix) + pl.fp_split = mm_split_init(opt->split_prefix, idx); pl_threads = n_threads == 1? 1 : (opt->flag&MM_F_2_IO_THREADS)? 3 : 2; kt_pipeline(pl_threads, worker_pipeline, &pl, 3); free(pl.str.s); + if (opt->split_prefix) + fclose(pl.fp_split); for (i = 0; i < n_segs; ++i) mm_bseq_close(pl.fp[i]); free(pl.fp); diff --git a/minimap.h b/minimap.h index c1a9267..e288d71 100644 --- a/minimap.h +++ b/minimap.h @@ -60,6 +60,7 @@ typedef struct { typedef struct { int32_t b, w, k, flag; uint32_t n_seq; // number of reference sequences + int32_t index; mm_idx_seq_t *seq; // sequence name, length and offset uint32_t *S; // 4-bit packed sequence struct mm_idx_bucket_s *B; // index (hidden) @@ -136,6 +137,8 @@ typedef struct { int32_t mid_occ; // ignore seeds with occurrences above this threshold int32_t max_occ; int mini_batch_size; // size of a batch of query bases to process in parallel + + const char *split_prefix; } mm_mapopt_t; // index reader @@ -318,7 +321,7 @@ void mm_tbuf_destroy(mm_tbuf_t *b); */ mm_reg1_t *mm_map(const mm_idx_t *mi, int l_seq, const char *seq, int *n_regs, mm_tbuf_t *b, const mm_mapopt_t *opt, const char *name); -void mm_map_frag(const mm_idx_t *mi, int n_segs, const int *qlens, const char **seqs, int *n_regs, mm_reg1_t **regs, mm_tbuf_t *b, const mm_mapopt_t *opt, const char *qname); +int mm_map_frag(const mm_idx_t *mi, int n_segs, const int *qlens, const char **seqs, int *n_regs, mm_reg1_t **regs, mm_tbuf_t *b, const mm_mapopt_t *opt, const char *qname); /** * Align a fasta/fastq file and print alignments to stdout diff --git a/misc.c b/misc.c index f7a43cb..930e6bf 100644 --- a/misc.c +++ b/misc.c @@ -131,6 +131,26 @@ void mm_err_puts(const char *str) } } +void mm_err_fwrite(const void *p, size_t size, size_t nitems, FILE *fp) +{ + int ret; + ret = fwrite(p, size, nitems, fp); + if (ret == EOF) { + fprintf(stderr, "[ERROR] failed to write data\n"); + exit(EXIT_FAILURE); + } +} + +void mm_err_fread(void *p, size_t size, size_t nitems, FILE *fp) +{ + int ret; + ret = fread(p, size, nitems, fp); + if (ret == EOF) { + fprintf(stderr, "[ERROR] failed to read data\n"); + exit(EXIT_FAILURE); + } +} + #include "ksort.h" #define sort_key_128x(a) ((a).x) diff --git a/mmpriv.h b/mmpriv.h index 6ec4a21..a823d96 100644 --- a/mmpriv.h +++ b/mmpriv.h @@ -86,7 +86,11 @@ mm_seg_t *mm_seg_gen(void *km, uint32_t hash, int n_segs, const int *qlens, int void mm_seg_free(void *km, int n_segs, mm_seg_t *segs); void mm_pair(void *km, int max_gap_ref, int dp_bonus, int sub_diff, int match_sc, const int *qlens, int *n_regs, mm_reg1_t **regs); +FILE *mm_split_init(const char *prefix, const mm_idx_t *mi); + void mm_err_puts(const char *str); +void mm_err_fwrite(const void *p, size_t size, size_t nitems, FILE *fp); +void mm_err_fread(void *p, size_t size, size_t nitems, FILE *fp); #ifdef __cplusplus } diff --git a/splitidx.c b/splitidx.c new file mode 100644 index 0000000..9c738a3 --- /dev/null +++ b/splitidx.c @@ -0,0 +1,26 @@ +#include +#include +#include +#include +#include "mmpriv.h" + +FILE *mm_split_init(const char *prefix, const mm_idx_t *mi) +{ + char *fn; + FILE *fp; + uint32_t i; + fn = (char*)calloc(strlen(prefix) + 10, 1); + sprintf(fn, "%s.%.4d.tmp", prefix, mi->index); + fp = fopen(fn, "wb"); + assert(fp); + mm_err_fwrite(&mi->n_seq, 4, 1, fp); + for (i = 0; i < mi->n_seq; ++i) { + uint8_t l; + l = strlen(mi->seq[i].name); + mm_err_fwrite(&l, 1, 1, fp); + mm_err_fwrite(mi->seq[i].name, 1, l, fp); + mm_err_fwrite(&mi->seq[i].len, 4, 1, fp); + } + free(fn); + return fp; +} From e5277dbf5ca02c00e56d1820fedc9b3a95414a0b Mon Sep 17 00:00:00 2001 From: Heng Li Date: Sat, 14 Jul 2018 22:52:36 -0400 Subject: [PATCH 2/8] code backup --- main.c | 6 +- map.c | 158 ++++++++++++++++++++++++++++++++++++++++------------- mmpriv.h | 5 ++ splitidx.c | 51 ++++++++++++++++- 4 files changed, 181 insertions(+), 39 deletions(-) diff --git a/main.c b/main.c index 69e5dcc..d32fff0 100644 --- a/main.c +++ b/main.c @@ -102,7 +102,7 @@ int main(int argc, char *argv[]) const char *opt_str = "2aSDw:k:K:t:r:f:Vv:g:G:I:d:XT:s:x:Hcp:M:n:z:A:B:O:E:m:N:Qu:R:hF:LC:yY"; mm_mapopt_t opt; mm_idxopt_t ipt; - int i, c, n_threads = 3, long_idx; + int i, c, n_threads = 3, n_parts, long_idx; char *fnw = 0, *rg = 0, *s; FILE *fp_help = stderr; mm_idx_reader_t *idx_rdr; @@ -346,8 +346,12 @@ int main(int argc, char *argv[]) } mm_idx_destroy(mi); } + n_parts = idx_rdr->n_parts; mm_idx_reader_close(idx_rdr); + if (opt.split_prefix) + mm_split_merge(argc - (optind + 1), (const char**)&argv[optind + 1], &opt, n_parts); + if (fflush(stdout) == EOF) { fprintf(stderr, "[ERROR] failed to write the results\n"); exit(EXIT_FAILURE); diff --git a/map.c b/map.c index 9362f6a..8dc367d 100644 --- a/map.c +++ b/map.c @@ -395,7 +395,9 @@ typedef struct { mm_bseq_file_t **fp; const mm_idx_t *mi; kstring_t str; - FILE *fp_split; + + int n_parts; + FILE *fp_split, **fp_parts; } pipeline_t; typedef struct { @@ -445,6 +447,53 @@ static void worker_for(void *_data, long i, int tid) // kt_for() callback } } +static void merge_hits(step_t *s) +{ + int f, i, k = 0, *n_reg_part, *rep_len_part; + void *km; + FILE **fp = s->p->fp_parts; + const mm_mapopt_t *opt = s->p->opt; + + km = km_init(); + n_reg_part = CALLOC(int, s->p->n_parts * 2); + rep_len_part = n_reg_part + s->p->n_parts; + for (f = 0; s->n_frag; ++f) { + for (i = 0; i < s->n_seg[f]; ++i, ++k) { + int j, l, t, rep_len = 0; + for (j = 0, s->n_reg[k] = 0; j < s->p->n_parts; ++j) { + mm_err_fread(&n_reg_part[j], sizeof(int), 1, fp[j]); + mm_err_fread(&rep_len_part[j], sizeof(int), 1, fp[j]); + s->n_reg[k] += n_reg_part[j]; + if (rep_len < rep_len_part[j]) + rep_len = rep_len_part[j]; + } + s->reg[k] = CALLOC(mm_reg1_t, s->n_reg[k]); + for (j = 0, l = 0; j < s->p->n_parts; ++j) { + for (t = 0; t < n_reg_part[j]; ++t, ++l) { + mm_reg1_t *r = &s->reg[k][l]; + uint32_t capacity; + mm_err_fread(r, sizeof(mm_reg1_t), 1, fp[j]); + if (opt->flag & MM_F_CIGAR) { + mm_err_fread(&capacity, 4, 1, fp[j]); + r->p = (mm_extra_t*)calloc(capacity, 4); + r->p->capacity = capacity; + mm_err_fread(r->p, r->p->capacity, 4, fp[j]); + } + } + } + mm_set_parent(km, opt->mask_level, s->n_reg[k], s->reg[k], opt->a * 2 + opt->b); + if (!(opt->flag & MM_F_ALL_CHAINS)) { + mm_select_sub(km, opt->pri_ratio, s->p->mi->k*2, opt->best_n, &s->n_reg[k], s->reg[k]); + mm_set_sam_pri(s->n_reg[k], s->reg[k]); + } + mm_set_mapq(km, s->n_reg[k], s->reg[k], opt->min_chain_score, opt->a, rep_len, !!(opt->flag & MM_F_SR)); + //mm_pair(b->km, max_chain_gap_ref, opt->pe_bonus, opt->a * 2 + opt->b, opt->a, qlens, n_regs, regs); // TODO: this needs to be called, but not here + } + } + free(n_reg_part); + km_destroy(km); +} + static void *worker_pipeline(void *shared, int step, void *in) { int i, j, k; @@ -478,7 +527,8 @@ static void *worker_pipeline(void *shared, int step, void *in) return s; } else free(s); } else if (step == 1) { // step 1: map - kt_for(p->n_threads, worker_for, in, ((step_t*)in)->n_frag); + if (p->n_parts > 0) merge_hits((step_t*)in); + else kt_for(p->n_threads, worker_for, in, ((step_t*)in)->n_frag); return in; } else if (step == 2) { // step 2: output void *km = 0; @@ -491,31 +541,30 @@ static void *worker_pipeline(void *shared, int step, void *in) int seg_st = s->seg_off[k], seg_en = s->seg_off[k] + s->n_seg[k]; for (i = seg_st; i < seg_en; ++i) { mm_bseq1_t *t = &s->seq[i]; - if (p->opt->split_prefix) { - mm_err_fwrite(&s->n_reg[i], sizeof(s->n_reg[i]), 1, p->fp_split); - mm_err_fwrite(&s->rep_len[i], sizeof(s->rep_len[i]), 1, p->fp_split); - } - if (s->n_reg[i] > 0) { + if (p->opt->split_prefix && p->n_parts == 0) { // then write to temporary files + mm_err_fwrite(&s->n_reg[i], sizeof(int), 1, p->fp_split); + mm_err_fwrite(&s->rep_len[i], sizeof(int), 1, p->fp_split); + for (j = 0; j < s->n_reg[i]; ++j) { + mm_reg1_t *r = &s->reg[i][j]; + mm_err_fwrite(r, sizeof(mm_reg1_t), 1, p->fp_split); + if (p->opt->flag & MM_F_CIGAR) { + mm_err_fwrite(&r->p->capacity, 4, 1, p->fp_split); + mm_err_fwrite(r->p, r->p->capacity, 4, p->fp_split); + } + } + } else if (s->n_reg[i] > 0) { // the query has at least one hit for (j = 0; j < s->n_reg[i]; ++j) { mm_reg1_t *r = &s->reg[i][j]; assert(!r->sam_pri || r->id == r->parent); if ((p->opt->flag & MM_F_NO_PRINT_2ND) && r->id != r->parent) continue; - if (p->opt->split_prefix) { - mm_err_fwrite(r, sizeof(mm_reg1_t), 1, p->fp_split); - if (p->opt->flag & MM_F_CIGAR) { - mm_err_fwrite(&r->p->capacity, 4, 1, p->fp_split); - mm_err_fwrite(r->p, r->p->capacity, 4, p->fp_split); - } - } else { - if (p->opt->flag & MM_F_OUT_SAM) - mm_write_sam2(&p->str, mi, t, i - seg_st, j, s->n_seg[k], &s->n_reg[seg_st], (const mm_reg1_t*const*)&s->reg[seg_st], km, p->opt->flag); - else - mm_write_paf(&p->str, mi, t, r, km, p->opt->flag); - mm_err_puts(p->str.s); - } + if (p->opt->flag & MM_F_OUT_SAM) + mm_write_sam2(&p->str, mi, t, i - seg_st, j, s->n_seg[k], &s->n_reg[seg_st], (const mm_reg1_t*const*)&s->reg[seg_st], km, p->opt->flag); + else + mm_write_paf(&p->str, mi, t, r, km, p->opt->flag); + mm_err_puts(p->str.s); } - } else if (p->opt->flag & (MM_F_OUT_SAM|MM_F_PAF_NO_HIT)) { + } else if (p->opt->flag & (MM_F_OUT_SAM|MM_F_PAF_NO_HIT)) { // output an empty hit, if requested if (p->opt->flag & MM_F_OUT_SAM) mm_write_sam2(&p->str, mi, t, i - seg_st, -1, s->n_seg[k], &s->n_reg[seg_st], (const mm_reg1_t*const*)&s->reg[seg_st], km, p->opt->flag); else @@ -539,25 +588,33 @@ static void *worker_pipeline(void *shared, int step, void *in) return 0; } +static mm_bseq_file_t **open_bseqs(int n, const char **fn) +{ + mm_bseq_file_t **fp; + int i, j; + fp = (mm_bseq_file_t**)calloc(n, sizeof(mm_bseq_file_t*)); + for (i = 0; i < n; ++i) { + if ((fp[i] = mm_bseq_open(fn[i])) == 0) { + if (mm_verbose >= 1) + fprintf(stderr, "ERROR: failed to open file '%s'\n", fn[i]); + for (j = 0; j < i; ++j) + mm_bseq_close(fp[j]); + free(fp); + return 0; + } + } + return fp; +} + int mm_map_file_frag(const mm_idx_t *idx, int n_segs, const char **fn, const mm_mapopt_t *opt, int n_threads) { - int i, j, pl_threads; + int i, pl_threads; pipeline_t pl; if (n_segs < 1) return -1; memset(&pl, 0, sizeof(pipeline_t)); pl.n_fp = n_segs; - pl.fp = (mm_bseq_file_t**)calloc(n_segs, sizeof(mm_bseq_file_t*)); - for (i = 0; i < n_segs; ++i) { - pl.fp[i] = mm_bseq_open(fn[i]); - if (pl.fp[i] == 0) { - if (mm_verbose >= 1) - fprintf(stderr, "ERROR: failed to open file '%s'\n", fn[i]); - for (j = 0; j < i; ++j) - mm_bseq_close(pl.fp[j]); - free(pl.fp); - return -1; - } - } + pl.fp = open_bseqs(pl.n_fp, fn); + if (pl.fp == 0) return -1; pl.opt = opt, pl.mi = idx; pl.n_threads = n_threads > 1? n_threads : 1; pl.mini_batch_size = opt->mini_batch_size; @@ -565,10 +622,10 @@ int mm_map_file_frag(const mm_idx_t *idx, int n_segs, const char **fn, const mm_ pl.fp_split = mm_split_init(opt->split_prefix, idx); pl_threads = n_threads == 1? 1 : (opt->flag&MM_F_2_IO_THREADS)? 3 : 2; kt_pipeline(pl_threads, worker_pipeline, &pl, 3); + free(pl.str.s); - if (opt->split_prefix) - fclose(pl.fp_split); - for (i = 0; i < n_segs; ++i) + if (pl.fp_split) fclose(pl.fp_split); + for (i = 0; i < pl.n_fp; ++i) mm_bseq_close(pl.fp[i]); free(pl.fp); return 0; @@ -578,3 +635,30 @@ int mm_map_file(const mm_idx_t *idx, const char *fn, const mm_mapopt_t *opt, int { return mm_map_file_frag(idx, 1, &fn, opt, n_threads); } + +int mm_split_merge(int n_segs, const char **fn, const mm_mapopt_t *opt, int n_split_idx) +{ + int i; + pipeline_t pl; + mm_idx_t *mi; + if (n_segs < 1 || n_split_idx < 1) return -1; + memset(&pl, 0, sizeof(pipeline_t)); + pl.n_fp = n_segs; + pl.fp = open_bseqs(pl.n_fp, fn); + if (pl.fp == 0) return -1; + pl.opt = opt; + pl.n_parts = n_split_idx; + pl.fp_parts = mm_split_merge_prep(opt->split_prefix, n_split_idx, &mi); + if (pl.fp_parts == 0) return -1; + pl.mi = mi; + pl.mini_batch_size = opt->mini_batch_size; + kt_pipeline(2, worker_pipeline, &pl, 3); + free(pl.str.s); + for (i = 0; i < n_split_idx; ++i) + fclose(pl.fp_parts[i]); + free(pl.fp_parts); + for (i = 0; i < pl.n_fp; ++i) + mm_bseq_close(pl.fp[i]); + free(pl.fp); + return 0; +} diff --git a/mmpriv.h b/mmpriv.h index a823d96..99272a0 100644 --- a/mmpriv.h +++ b/mmpriv.h @@ -28,6 +28,9 @@ #define mm_seq4_set(s, i, c) ((s)[(i)>>3] |= (uint32_t)(c) << (((i)&7)<<2)) #define mm_seq4_get(s, i) ((s)[(i)>>3] >> (((i)&7)<<2) & 0xf) +#define MALLOC(type, len) ((type*)malloc((len) * sizeof(type))) +#define CALLOC(type, len) ((type*)calloc((len), sizeof(type))) + #ifdef __cplusplus extern "C" { #endif @@ -87,6 +90,8 @@ void mm_seg_free(void *km, int n_segs, mm_seg_t *segs); void mm_pair(void *km, int max_gap_ref, int dp_bonus, int sub_diff, int match_sc, const int *qlens, int *n_regs, mm_reg1_t **regs); FILE *mm_split_init(const char *prefix, const mm_idx_t *mi); +FILE **mm_split_merge_prep(const char *prefix, int n_splits, mm_idx_t **mi_); +int mm_split_merge(int n_segs, const char **fn, const mm_mapopt_t *opt, int n_split_idx); void mm_err_puts(const char *str); void mm_err_fwrite(const void *p, size_t size, size_t nitems, FILE *fp); diff --git a/splitidx.c b/splitidx.c index 9c738a3..db0217c 100644 --- a/splitidx.c +++ b/splitidx.c @@ -8,11 +8,12 @@ FILE *mm_split_init(const char *prefix, const mm_idx_t *mi) { char *fn; FILE *fp; - uint32_t i; + uint32_t i, k = mi->k; fn = (char*)calloc(strlen(prefix) + 10, 1); sprintf(fn, "%s.%.4d.tmp", prefix, mi->index); fp = fopen(fn, "wb"); assert(fp); + mm_err_fwrite(&k, 4, 1, fp); mm_err_fwrite(&mi->n_seq, 4, 1, fp); for (i = 0; i < mi->n_seq; ++i) { uint8_t l; @@ -24,3 +25,51 @@ FILE *mm_split_init(const char *prefix, const mm_idx_t *mi) free(fn); return fp; } + +FILE **mm_split_merge_prep(const char *prefix, int n_splits, mm_idx_t **mi_) +{ + mm_idx_t *mi = 0; + FILE **fp; + char *fn; + int i, j; + uint32_t m_seq = 0; + + if (n_splits < 1) return 0; + *mi_ = 0; + fp = (FILE**)calloc(n_splits, sizeof(FILE*)); + fn = (char*)calloc(strlen(prefix) + 10, 1); + for (i = 0; i < n_splits; ++i) { + sprintf(fn, "%s.%.4d.tmp", prefix, i); + if ((fp[i] = fopen(fn, "rb")) == 0) { + if (mm_verbose >= 1) + fprintf(stderr, "ERROR: failed to open temporary file '%s'\n", fn); + for (j = 0; j < i; ++j) + fclose(fp[j]); + free(fp); + return 0; + } + } + free(fn); + mi = (mm_idx_t*)calloc(1, sizeof(mm_idx_t)); + for (i = 0; i < n_splits; ++i) { + uint32_t k, n; + mm_err_fread(&k, 4, 1, fp[i]); + mm_err_fread(&n, 4, 1, fp[i]); + mi->k = k; + if (mi->n_seq + n > m_seq) { + m_seq = mi->n_seq + n; + kroundup32(m_seq); + mi->seq = (mm_idx_seq_t*)realloc(mi->seq, sizeof(mm_idx_seq_t) * m_seq); + } + for (k = 0; k < n; ++i) { + uint8_t l; + mm_err_fread(&l, 1, 1, fp[i]); + mi->seq[mi->n_seq].name = (char*)calloc(l + 1, 1); + mm_err_fread(mi->seq[mi->n_seq].name, 1, l, fp[i]); + mm_err_fread(&mi->seq[mi->n_seq].len, 4, 1, fp[i]); + ++mi->n_seq; + } + } + *mi_ = mi; + return fp; +} From 3545e35a429d221c1f3ee20e6728b9a1945ea04a Mon Sep 17 00:00:00 2001 From: Heng Li Date: Sat, 14 Jul 2018 23:43:34 -0400 Subject: [PATCH 3/8] pairing in the split-idx mode --- map.c | 60 +++++++++++++++++++++++++++++++++++-------------------- minimap.h | 2 +- 2 files changed, 39 insertions(+), 23 deletions(-) diff --git a/map.c b/map.c index 8dc367d..5ce8523 100644 --- a/map.c +++ b/map.c @@ -11,6 +11,7 @@ struct mm_tbuf_s { void *km; + int rep_len, frag_gap; }; mm_tbuf_t *mm_tbuf_init(void) @@ -262,7 +263,7 @@ static mm_reg1_t *align_regs(const mm_mapopt_t *opt, const mm_idx_t *mi, void *k return regs; } -int mm_map_frag(const mm_idx_t *mi, int n_segs, const int *qlens, const char **seqs, int *n_regs, mm_reg1_t **regs, mm_tbuf_t *b, const mm_mapopt_t *opt, const char *qname) +void mm_map_frag(const mm_idx_t *mi, int n_segs, const int *qlens, const char **seqs, int *n_regs, mm_reg1_t **regs, mm_tbuf_t *b, const mm_mapopt_t *opt, const char *qname) { int i, j, rep_len, qlen_sum, n_regs0, n_mini_pos; int max_chain_gap_qry, max_chain_gap_ref, is_splice = !!(opt->flag & MM_F_SPLICE), is_sr = !!(opt->flag & MM_F_SR); @@ -277,7 +278,7 @@ int mm_map_frag(const mm_idx_t *mi, int n_segs, const int *qlens, const char **s for (i = 0, qlen_sum = 0; i < n_segs; ++i) qlen_sum += qlens[i], n_regs[i] = 0, regs[i] = 0; - if (qlen_sum == 0 || n_segs <= 0 || n_segs > MM_MAX_SEG) return 0; + if (qlen_sum == 0 || n_segs <= 0 || n_segs > MM_MAX_SEG) return; hash = qname? __ac_X31_hash_string(qname) : 0; hash ^= __ac_Wang_hash(qlen_sum) + __ac_Wang_hash(opt->seed); @@ -330,6 +331,8 @@ int mm_map_frag(const mm_idx_t *mi, int n_segs, const int *qlens, const char **s a = mm_chain_dp(max_chain_gap_ref, max_chain_gap_qry, opt->bw, opt->max_chain_skip, opt->min_cnt, opt->min_chain_score, is_splice, n_segs, n_a, a, &n_regs0, &u, b->km); } } + b->frag_gap = max_chain_gap_ref; + b->rep_len = rep_len; regs0 = mm_gen_regs(b->km, hash, qlen_sum, n_regs0, u, a); @@ -375,7 +378,6 @@ int mm_map_frag(const mm_idx_t *mi, int n_segs, const int *qlens, const char **s b->km = km_init(); } } - return rep_len; } mm_reg1_t *mm_map(const mm_idx_t *mi, int qlen, const char *seq, int *n_regs, mm_tbuf_t *b, const mm_mapopt_t *opt, const char *qname) @@ -404,7 +406,7 @@ typedef struct { const pipeline_t *p; int n_seq, n_frag; mm_bseq1_t *seq; - int *n_reg, *seg_off, *n_seg, *rep_len; + int *n_reg, *seg_off, *n_seg, *rep_len, *frag_gap; mm_reg1_t **reg; mm_tbuf_t **buf; } step_t; @@ -425,13 +427,17 @@ static void worker_for(void *_data, long i, int tid) // kt_for() callback qseqs[j] = s->seq[off + j].seq; } if (s->p->opt->flag & MM_F_INDEPEND_SEG) { - for (j = 0; j < s->n_seg[i]; ++j) - s->rep_len[off + j] = mm_map_frag(s->p->mi, 1, &qlens[j], &qseqs[j], &s->n_reg[off+j], &s->reg[off+j], b, s->p->opt, s->seq[off+j].name); + for (j = 0; j < s->n_seg[i]; ++j) { + mm_map_frag(s->p->mi, 1, &qlens[j], &qseqs[j], &s->n_reg[off+j], &s->reg[off+j], b, s->p->opt, s->seq[off+j].name); + s->rep_len[off + j] = b->rep_len; + s->frag_gap[off + j] = b->frag_gap; + } } else { - int rep_len; - rep_len = mm_map_frag(s->p->mi, s->n_seg[i], qlens, qseqs, &s->n_reg[off], &s->reg[off], b, s->p->opt, s->seq[off].name); - for (j = 0; j < s->n_seg[i]; ++j) - s->rep_len[off + j] = rep_len; + mm_map_frag(s->p->mi, s->n_seg[i], qlens, qseqs, &s->n_reg[off], &s->reg[off], b, s->p->opt, s->seq[off].name); + for (j = 0; j < s->n_seg[i]; ++j) { + s->rep_len[off + j] = b->rep_len; + s->frag_gap[off + j] = b->frag_gap; + } } for (j = 0; j < s->n_seg[i]; ++j) // flip the query strand and coordinate to the original read strand if (s->n_seg[i] == 2 && ((j == 0 && (pe_ori>>1&1)) || (j == 1 && (pe_ori&1)))) { @@ -449,20 +455,27 @@ static void worker_for(void *_data, long i, int tid) // kt_for() callback static void merge_hits(step_t *s) { - int f, i, k = 0, *n_reg_part, *rep_len_part; + int f, i, k0, k, max_seg = 0, *n_reg_part, *rep_len_part, *frag_gap_part, *qlens; void *km; FILE **fp = s->p->fp_parts; const mm_mapopt_t *opt = s->p->opt; km = km_init(); - n_reg_part = CALLOC(int, s->p->n_parts * 2); + for (f = 0; f < s->n_frag; ++f) + max_seg = max_seg > s->n_seg[f]? max_seg : s->n_seg[f]; + qlens = CALLOC(int, max_seg + s->p->n_parts * 3); + n_reg_part = qlens + max_seg; rep_len_part = n_reg_part + s->p->n_parts; - for (f = 0; s->n_frag; ++f) { + frag_gap_part = rep_len_part + s->p->n_parts; + for (f = 0, k = k0 = 0; f < s->n_frag; ++f) { + k0 = k; for (i = 0; i < s->n_seg[f]; ++i, ++k) { int j, l, t, rep_len = 0; + qlens[i] = s->seq[k].l_seq; for (j = 0, s->n_reg[k] = 0; j < s->p->n_parts; ++j) { - mm_err_fread(&n_reg_part[j], sizeof(int), 1, fp[j]); - mm_err_fread(&rep_len_part[j], sizeof(int), 1, fp[j]); + mm_err_fread(&n_reg_part[j], sizeof(int), 1, fp[j]); + mm_err_fread(&rep_len_part[j], sizeof(int), 1, fp[j]); + mm_err_fread(&frag_gap_part[j], sizeof(int), 1, fp[j]); s->n_reg[k] += n_reg_part[j]; if (rep_len < rep_len_part[j]) rep_len = rep_len_part[j]; @@ -487,10 +500,11 @@ static void merge_hits(step_t *s) mm_set_sam_pri(s->n_reg[k], s->reg[k]); } mm_set_mapq(km, s->n_reg[k], s->reg[k], opt->min_chain_score, opt->a, rep_len, !!(opt->flag & MM_F_SR)); - //mm_pair(b->km, max_chain_gap_ref, opt->pe_bonus, opt->a * 2 + opt->b, opt->a, qlens, n_regs, regs); // TODO: this needs to be called, but not here } + if (s->n_seg[f] == 2 && opt->pe_ori >= 0 && (opt->flag&MM_F_CIGAR)) + mm_pair(km, frag_gap_part[0], opt->pe_bonus, opt->a * 2 + opt->b, opt->a, qlens, &s->n_reg[k0], &s->reg[k0]); } - free(n_reg_part); + free(qlens); km_destroy(km); } @@ -513,10 +527,11 @@ static void *worker_pipeline(void *shared, int step, void *in) s->buf = (mm_tbuf_t**)calloc(p->n_threads, sizeof(mm_tbuf_t*)); for (i = 0; i < p->n_threads; ++i) s->buf[i] = mm_tbuf_init(); - s->n_reg = (int*)calloc(4 * s->n_seq, sizeof(int)); - s->seg_off = s->n_reg + s->n_seq; // seg_off, n_seg and rep_len are allocated together with n_reg + s->n_reg = (int*)calloc(5 * s->n_seq, sizeof(int)); + s->seg_off = s->n_reg + s->n_seq; // seg_off, n_seg, rep_len and frag_gap are allocated together with n_reg s->n_seg = s->seg_off + s->n_seq; s->rep_len = s->n_seg + s->n_seq; + s->frag_gap = s->rep_len + s->n_seq; s->reg = (mm_reg1_t**)calloc(s->n_seq, sizeof(mm_reg1_t*)); for (i = 1, j = 0; i <= s->n_seq; ++i) if (i == s->n_seq || !frag_mode || !mm_qname_same(s->seq[i-1].name, s->seq[i].name)) { @@ -542,8 +557,9 @@ static void *worker_pipeline(void *shared, int step, void *in) for (i = seg_st; i < seg_en; ++i) { mm_bseq1_t *t = &s->seq[i]; if (p->opt->split_prefix && p->n_parts == 0) { // then write to temporary files - mm_err_fwrite(&s->n_reg[i], sizeof(int), 1, p->fp_split); - mm_err_fwrite(&s->rep_len[i], sizeof(int), 1, p->fp_split); + mm_err_fwrite(&s->n_reg[i], sizeof(int), 1, p->fp_split); + mm_err_fwrite(&s->rep_len[i], sizeof(int), 1, p->fp_split); + mm_err_fwrite(&s->frag_gap[i], sizeof(int), 1, p->fp_split); for (j = 0; j < s->n_reg[i]; ++j) { mm_reg1_t *r = &s->reg[i][j]; mm_err_fwrite(r, sizeof(mm_reg1_t), 1, p->fp_split); @@ -579,7 +595,7 @@ static void *worker_pipeline(void *shared, int step, void *in) if (s->seq[i].qual) free(s->seq[i].qual); } } - free(s->reg); free(s->n_reg); free(s->seq); // seg_off, n_seg and rep_len were allocated with reg; no memory leak here + free(s->reg); free(s->n_reg); free(s->seq); // seg_off, n_seg, rep_len and frag_gap were allocated with reg; no memory leak here km_destroy(km); if (mm_verbose >= 3) fprintf(stderr, "[M::%s::%.3f*%.2f] mapped %d sequences\n", __func__, realtime() - mm_realtime0, cputime() / (realtime() - mm_realtime0), s->n_seq); diff --git a/minimap.h b/minimap.h index e288d71..8d708db 100644 --- a/minimap.h +++ b/minimap.h @@ -321,7 +321,7 @@ void mm_tbuf_destroy(mm_tbuf_t *b); */ mm_reg1_t *mm_map(const mm_idx_t *mi, int l_seq, const char *seq, int *n_regs, mm_tbuf_t *b, const mm_mapopt_t *opt, const char *name); -int mm_map_frag(const mm_idx_t *mi, int n_segs, const int *qlens, const char **seqs, int *n_regs, mm_reg1_t **regs, mm_tbuf_t *b, const mm_mapopt_t *opt, const char *qname); +void mm_map_frag(const mm_idx_t *mi, int n_segs, const int *qlens, const char **seqs, int *n_regs, mm_reg1_t **regs, mm_tbuf_t *b, const mm_mapopt_t *opt, const char *qname); /** * Align a fasta/fastq file and print alignments to stdout From 951c0d1d358044153af37def229ebad46bb4ec35 Mon Sep 17 00:00:00 2001 From: Heng Li Date: Sat, 14 Jul 2018 23:47:44 -0400 Subject: [PATCH 4/8] apparently mm_append_cigar() wastes some memory --- align.c | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/align.c b/align.c index b3314eb..c86e4c1 100644 --- a/align.c +++ b/align.c @@ -199,12 +199,12 @@ static void mm_append_cigar(mm_reg1_t *r, uint32_t n_cigar, uint32_t *cigar) // mm_extra_t *p; if (n_cigar == 0) return; if (r->p == 0) { - uint32_t capacity = n_cigar + sizeof(mm_extra_t); // TODO: should this be "n_cigar + sizeof(mm_extra_t)/4" instead? + uint32_t capacity = n_cigar + sizeof(mm_extra_t)/4; // TODO: should this be "n_cigar + sizeof(mm_extra_t)/4" instead? kroundup32(capacity); r->p = (mm_extra_t*)calloc(capacity, 4); r->p->capacity = capacity; - } else if (r->p->n_cigar + n_cigar + sizeof(mm_extra_t) > r->p->capacity) { - r->p->capacity = r->p->n_cigar + n_cigar + sizeof(mm_extra_t); + } else if (r->p->n_cigar + n_cigar + sizeof(mm_extra_t)/4 > r->p->capacity) { + r->p->capacity = r->p->n_cigar + n_cigar + sizeof(mm_extra_t)/4; kroundup32(r->p->capacity); r->p = (mm_extra_t*)realloc(r->p, r->p->capacity * 4); } From 4b707aac9226aaf5fbaea45ed6d78f797f8c8432 Mon Sep 17 00:00:00 2001 From: Heng Li Date: Sun, 15 Jul 2018 10:55:00 -0400 Subject: [PATCH 5/8] working with toy examples --- align.c | 4 ++-- hit.c | 14 ++++++++++---- map.c | 24 ++++++++++++++++++++---- mmpriv.h | 4 ++-- splitidx.c | 37 +++++++++++++++---------------------- 5 files changed, 49 insertions(+), 34 deletions(-) diff --git a/align.c b/align.c index c86e4c1..b22a3f2 100644 --- a/align.c +++ b/align.c @@ -199,7 +199,7 @@ static void mm_append_cigar(mm_reg1_t *r, uint32_t n_cigar, uint32_t *cigar) // mm_extra_t *p; if (n_cigar == 0) return; if (r->p == 0) { - uint32_t capacity = n_cigar + sizeof(mm_extra_t)/4; // TODO: should this be "n_cigar + sizeof(mm_extra_t)/4" instead? + uint32_t capacity = n_cigar + sizeof(mm_extra_t)/4; kroundup32(capacity); r->p = (mm_extra_t*)calloc(capacity, 4); r->p->capacity = capacity; @@ -878,6 +878,6 @@ mm_reg1_t *mm_align_skeleton(void *km, const mm_mapopt_t *opt, const mm_idx_t *m kfree(km, qseq0[0]); kfree(km, ez.cigar); mm_filter_regs(opt, qlen, n_regs_, regs); - mm_hit_sort_by_dp(km, n_regs_, regs); + mm_hit_sort(km, n_regs_, regs); return regs; } diff --git a/hit.c b/hit.c index 972bdb6..9616bf5 100644 --- a/hit.c +++ b/hit.c @@ -164,9 +164,9 @@ set_parent_test: kfree(km, w); } -void mm_hit_sort_by_dp(void *km, int *n_regs, mm_reg1_t *r) +void mm_hit_sort(void *km, int *n_regs, mm_reg1_t *r) { - int32_t i, n_aux, n = *n_regs; + int32_t i, n_aux, n = *n_regs, has_cigar = 0, no_cigar = 0; mm128_t *aux; mm_reg1_t *t; @@ -175,14 +175,20 @@ void mm_hit_sort_by_dp(void *km, int *n_regs, mm_reg1_t *r) t = (mm_reg1_t*)kmalloc(km, n * sizeof(mm_reg1_t)); for (i = n_aux = 0; i < n; ++i) { if (r[i].inv || r[i].cnt > 0) { // squeeze out elements with cnt==0 (soft deleted) - assert(r[i].p); - aux[n_aux].x = (uint64_t)r[i].p->dp_max << 32 | r[i].hash; + if (r[i].p) { + aux[n_aux].x = (uint64_t)r[i].p->dp_max << 32 | r[i].hash; + has_cigar = 1; + } else { + aux[n_aux].x = (uint64_t)r[i].score << 32 | r[i].hash; + no_cigar = 1; + } aux[n_aux++].y = i; } else if (r[i].p) { free(r[i].p); r[i].p = 0; } } + assert(has_cigar + no_cigar == 1); radix_sort_128x(aux, aux + n_aux); for (i = n_aux - 1; i >= 0; --i) t[n_aux - 1 - i] = r[aux[i].y]; diff --git a/map.c b/map.c index 5ce8523..be4ae74 100644 --- a/map.c +++ b/map.c @@ -399,6 +399,7 @@ typedef struct { kstring_t str; int n_parts; + uint32_t *rid_shift; FILE *fp_split, **fp_parts; } pipeline_t; @@ -486,6 +487,7 @@ static void merge_hits(step_t *s) mm_reg1_t *r = &s->reg[k][l]; uint32_t capacity; mm_err_fread(r, sizeof(mm_reg1_t), 1, fp[j]); + r->rid += s->p->rid_shift[j]; if (opt->flag & MM_F_CIGAR) { mm_err_fread(&capacity, 4, 1, fp[j]); r->p = (mm_extra_t*)calloc(capacity, 4); @@ -494,6 +496,7 @@ static void merge_hits(step_t *s) } } } + mm_hit_sort(km, &s->n_reg[k], s->reg[k]); mm_set_parent(km, opt->mask_level, s->n_reg[k], s->reg[k], opt->a * 2 + opt->b); if (!(opt->flag & MM_F_ALL_CHAINS)) { mm_select_sub(km, opt->pri_ratio, s->p->mi->k*2, opt->best_n, &s->n_reg[k], s->reg[k]); @@ -663,13 +666,26 @@ int mm_split_merge(int n_segs, const char **fn, const mm_mapopt_t *opt, int n_sp pl.fp = open_bseqs(pl.n_fp, fn); if (pl.fp == 0) return -1; pl.opt = opt; - pl.n_parts = n_split_idx; - pl.fp_parts = mm_split_merge_prep(opt->split_prefix, n_split_idx, &mi); - if (pl.fp_parts == 0) return -1; - pl.mi = mi; pl.mini_batch_size = opt->mini_batch_size; + + pl.n_parts = n_split_idx; + pl.fp_parts = CALLOC(FILE*, pl.n_parts); + pl.rid_shift = CALLOC(uint32_t, pl.n_parts); + pl.mi = mm_split_merge_prep(opt->split_prefix, n_split_idx, pl.fp_parts, pl.rid_shift); + if (pl.mi == 0) { + free(pl.fp_parts); + free(pl.rid_shift); + return -1; + } + for (i = n_split_idx - 1; i > 0; --i) + pl.rid_shift[i] = pl.rid_shift[i - 1]; + for (pl.rid_shift[0] = 0, i = 1; i < n_split_idx; ++i) + pl.rid_shift[i] += pl.rid_shift[i - 1]; + kt_pipeline(2, worker_pipeline, &pl, 3); + free(pl.str.s); + free(pl.rid_shift); for (i = 0; i < n_split_idx; ++i) fclose(pl.fp_parts[i]); free(pl.fp_parts); diff --git a/mmpriv.h b/mmpriv.h index 99272a0..a02a1b7 100644 --- a/mmpriv.h +++ b/mmpriv.h @@ -80,7 +80,7 @@ void mm_select_sub(void *km, float pri_ratio, int min_diff, int best_n, int *n_, void mm_select_sub_multi(void *km, float pri_ratio, float pri1, float pri2, int max_gap_ref, int min_diff, int best_n, int n_segs, const int *qlens, int *n_, mm_reg1_t *r); void mm_filter_regs(const mm_mapopt_t *opt, int qlen, int *n_regs, mm_reg1_t *regs); void mm_join_long(void *km, const mm_mapopt_t *opt, int qlen, int *n_regs, mm_reg1_t *regs, mm128_t *a); -void mm_hit_sort_by_dp(void *km, int *n_regs, mm_reg1_t *r); +void mm_hit_sort(void *km, int *n_regs, mm_reg1_t *r); void mm_set_mapq(void *km, int n_regs, mm_reg1_t *regs, int min_chain_sc, int match_sc, int rep_len, int is_sr); void mm_est_err(const mm_idx_t *mi, int qlen, int n_regs, mm_reg1_t *regs, const mm128_t *a, int32_t n, const uint64_t *mini_pos); @@ -90,7 +90,7 @@ void mm_seg_free(void *km, int n_segs, mm_seg_t *segs); void mm_pair(void *km, int max_gap_ref, int dp_bonus, int sub_diff, int match_sc, const int *qlens, int *n_regs, mm_reg1_t **regs); FILE *mm_split_init(const char *prefix, const mm_idx_t *mi); -FILE **mm_split_merge_prep(const char *prefix, int n_splits, mm_idx_t **mi_); +mm_idx_t *mm_split_merge_prep(const char *prefix, int n_splits, FILE **fp, uint32_t *n_seq_part); int mm_split_merge(int n_segs, const char **fn, const mm_mapopt_t *opt, int n_split_idx); void mm_err_puts(const char *str); diff --git a/splitidx.c b/splitidx.c index db0217c..7d8627d 100644 --- a/splitidx.c +++ b/splitidx.c @@ -26,17 +26,13 @@ FILE *mm_split_init(const char *prefix, const mm_idx_t *mi) return fp; } -FILE **mm_split_merge_prep(const char *prefix, int n_splits, mm_idx_t **mi_) +mm_idx_t *mm_split_merge_prep(const char *prefix, int n_splits, FILE **fp, uint32_t *n_seq_part) { mm_idx_t *mi = 0; - FILE **fp; char *fn; int i, j; - uint32_t m_seq = 0; if (n_splits < 1) return 0; - *mi_ = 0; - fp = (FILE**)calloc(n_splits, sizeof(FILE*)); fn = (char*)calloc(strlen(prefix) + 10, 1); for (i = 0; i < n_splits; ++i) { sprintf(fn, "%s.%.4d.tmp", prefix, i); @@ -45,31 +41,28 @@ FILE **mm_split_merge_prep(const char *prefix, int n_splits, mm_idx_t **mi_) fprintf(stderr, "ERROR: failed to open temporary file '%s'\n", fn); for (j = 0; j < i; ++j) fclose(fp[j]); - free(fp); + free(fn); return 0; } } free(fn); + mi = (mm_idx_t*)calloc(1, sizeof(mm_idx_t)); for (i = 0; i < n_splits; ++i) { - uint32_t k, n; - mm_err_fread(&k, 4, 1, fp[i]); - mm_err_fread(&n, 4, 1, fp[i]); - mi->k = k; - if (mi->n_seq + n > m_seq) { - m_seq = mi->n_seq + n; - kroundup32(m_seq); - mi->seq = (mm_idx_seq_t*)realloc(mi->seq, sizeof(mm_idx_seq_t) * m_seq); - } - for (k = 0; k < n; ++i) { + mm_err_fread(&mi->k, 4, 1, fp[i]); // TODO: check if k is all the same + mm_err_fread(&n_seq_part[i], 4, 1, fp[i]); + mi->n_seq += n_seq_part[i]; + } + mi->seq = CALLOC(mm_idx_seq_t, mi->n_seq); + for (i = j = 0; i < n_splits; ++i) { + uint32_t k; + for (k = 0; k < n_seq_part[i]; ++k, ++j) { uint8_t l; mm_err_fread(&l, 1, 1, fp[i]); - mi->seq[mi->n_seq].name = (char*)calloc(l + 1, 1); - mm_err_fread(mi->seq[mi->n_seq].name, 1, l, fp[i]); - mm_err_fread(&mi->seq[mi->n_seq].len, 4, 1, fp[i]); - ++mi->n_seq; + mi->seq[j].name = (char*)calloc(l + 1, 1); + mm_err_fread(mi->seq[j].name, 1, l, fp[i]); + mm_err_fread(&mi->seq[j].len, 4, 1, fp[i]); } } - *mi_ = mi; - return fp; + return mi; } From a655cbef86f24619675da97eae99ee76bd929029 Mon Sep 17 00:00:00 2001 From: Heng Li Date: Sun, 15 Jul 2018 11:03:18 -0400 Subject: [PATCH 6/8] print SAM header; remove tmp files --- main.c | 2 +- map.c | 5 ++++- mmpriv.h | 1 + splitidx.c | 16 ++++++++++++++-- 4 files changed, 20 insertions(+), 4 deletions(-) diff --git a/main.c b/main.c index d32fff0..064e2fc 100644 --- a/main.c +++ b/main.c @@ -329,7 +329,7 @@ int main(int argc, char *argv[]) mm_write_sam_hdr(mi, rg, MM_VERSION, argc, argv); } else { mm_write_sam_hdr(0, rg, MM_VERSION, argc, argv); - if (mm_verbose >= 2) + if (opt.split_prefix == 0 && mm_verbose >= 2) fprintf(stderr, "[WARNING]\033[1;31m For a multi-part index, no @SQ lines will be outputted.\033[0m\n"); } } diff --git a/map.c b/map.c index be4ae74..1a6ff1a 100644 --- a/map.c +++ b/map.c @@ -659,7 +659,6 @@ int mm_split_merge(int n_segs, const char **fn, const mm_mapopt_t *opt, int n_sp { int i; pipeline_t pl; - mm_idx_t *mi; if (n_segs < 1 || n_split_idx < 1) return -1; memset(&pl, 0, sizeof(pipeline_t)); pl.n_fp = n_segs; @@ -681,6 +680,9 @@ int mm_split_merge(int n_segs, const char **fn, const mm_mapopt_t *opt, int n_sp pl.rid_shift[i] = pl.rid_shift[i - 1]; for (pl.rid_shift[0] = 0, i = 1; i < n_split_idx; ++i) pl.rid_shift[i] += pl.rid_shift[i - 1]; + if (opt->flag & MM_F_OUT_SAM) + for (i = 0; i < pl.mi->n_seq; ++i) + printf("@SQ\tSN:%s\tLN:%d\n", pl.mi->seq[i].name, pl.mi->seq[i].len); kt_pipeline(2, worker_pipeline, &pl, 3); @@ -692,5 +694,6 @@ int mm_split_merge(int n_segs, const char **fn, const mm_mapopt_t *opt, int n_sp for (i = 0; i < pl.n_fp; ++i) mm_bseq_close(pl.fp[i]); free(pl.fp); + mm_split_rm_tmp(opt->split_prefix, n_split_idx); return 0; } diff --git a/mmpriv.h b/mmpriv.h index a02a1b7..f847445 100644 --- a/mmpriv.h +++ b/mmpriv.h @@ -92,6 +92,7 @@ void mm_pair(void *km, int max_gap_ref, int dp_bonus, int sub_diff, int match_sc FILE *mm_split_init(const char *prefix, const mm_idx_t *mi); mm_idx_t *mm_split_merge_prep(const char *prefix, int n_splits, FILE **fp, uint32_t *n_seq_part); int mm_split_merge(int n_segs, const char **fn, const mm_mapopt_t *opt, int n_split_idx); +void mm_split_rm_tmp(const char *prefix, int n_splits); void mm_err_puts(const char *str); void mm_err_fwrite(const void *p, size_t size, size_t nitems, FILE *fp); diff --git a/splitidx.c b/splitidx.c index 7d8627d..6d12071 100644 --- a/splitidx.c +++ b/splitidx.c @@ -33,7 +33,7 @@ mm_idx_t *mm_split_merge_prep(const char *prefix, int n_splits, FILE **fp, uint3 int i, j; if (n_splits < 1) return 0; - fn = (char*)calloc(strlen(prefix) + 10, 1); + fn = CALLOC(char, strlen(prefix) + 10); for (i = 0; i < n_splits; ++i) { sprintf(fn, "%s.%.4d.tmp", prefix, i); if ((fp[i] = fopen(fn, "rb")) == 0) { @@ -47,7 +47,7 @@ mm_idx_t *mm_split_merge_prep(const char *prefix, int n_splits, FILE **fp, uint3 } free(fn); - mi = (mm_idx_t*)calloc(1, sizeof(mm_idx_t)); + mi = CALLOC(mm_idx_t, 1); for (i = 0; i < n_splits; ++i) { mm_err_fread(&mi->k, 4, 1, fp[i]); // TODO: check if k is all the same mm_err_fread(&n_seq_part[i], 4, 1, fp[i]); @@ -66,3 +66,15 @@ mm_idx_t *mm_split_merge_prep(const char *prefix, int n_splits, FILE **fp, uint3 } return mi; } + +void mm_split_rm_tmp(const char *prefix, int n_splits) +{ + int i; + char *fn; + fn = CALLOC(char, strlen(prefix) + 10); + for (i = 0; i < n_splits; ++i) { + sprintf(fn, "%s.%.4d.tmp", prefix, i); + remove(fn); + } + free(fn); +} From 830da7fa278d416a4fd8f18a784aa01bc69c78a1 Mon Sep 17 00:00:00 2001 From: Heng Li Date: Sun, 15 Jul 2018 11:48:14 -0400 Subject: [PATCH 7/8] r814: resumed versioning --- main.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/main.c b/main.c index 064e2fc..ee12823 100644 --- a/main.c +++ b/main.c @@ -10,7 +10,7 @@ #include "getopt.h" #endif -#define MM_VERSION "2.11-r797" +#define MM_VERSION "2.11-r814-dirty" #ifdef __linux__ #include From 395c8d678a099b4485b4cc20677f581762e84405 Mon Sep 17 00:00:00 2001 From: Heng Li Date: Sun, 15 Jul 2018 22:11:32 -0400 Subject: [PATCH 8/8] r815: fixed a memory leak --- index.c | 10 ++++++---- main.c | 4 ++-- map.c | 4 +++- minimap2.1 | 5 ++++- 4 files changed, 15 insertions(+), 8 deletions(-) diff --git a/index.c b/index.c index cfdef19..d7ddc6e 100644 --- a/index.c +++ b/index.c @@ -48,10 +48,12 @@ void mm_idx_destroy(mm_idx_t *mi) uint32_t i; if (mi == 0) return; if (mi->h) kh_destroy(str, (khash_t(str)*)mi->h); - for (i = 0; i < 1U<b; ++i) { - free(mi->B[i].p); - free(mi->B[i].a.a); - kh_destroy(idx, (idxhash_t*)mi->B[i].h); + if (mi->B) { + for (i = 0; i < 1U<b; ++i) { + free(mi->B[i].p); + free(mi->B[i].a.a); + kh_destroy(idx, (idxhash_t*)mi->B[i].h); + } } if (!mi->km) { for (i = 0; i < mi->n_seq; ++i) diff --git a/main.c b/main.c index ee12823..00324ed 100644 --- a/main.c +++ b/main.c @@ -10,7 +10,7 @@ #include "getopt.h" #endif -#define MM_VERSION "2.11-r814-dirty" +#define MM_VERSION "2.11-r815-dirty" #ifdef __linux__ #include @@ -330,7 +330,7 @@ int main(int argc, char *argv[]) } else { mm_write_sam_hdr(0, rg, MM_VERSION, argc, argv); if (opt.split_prefix == 0 && mm_verbose >= 2) - fprintf(stderr, "[WARNING]\033[1;31m For a multi-part index, no @SQ lines will be outputted.\033[0m\n"); + fprintf(stderr, "[WARNING]\033[1;31m For a multi-part index, no @SQ lines will be outputted. Please use --split-prefix.\033[0m\n"); } } if (mm_verbose >= 3) diff --git a/map.c b/map.c index 1a6ff1a..ba04d32 100644 --- a/map.c +++ b/map.c @@ -659,6 +659,7 @@ int mm_split_merge(int n_segs, const char **fn, const mm_mapopt_t *opt, int n_sp { int i; pipeline_t pl; + mm_idx_t *mi; if (n_segs < 1 || n_split_idx < 1) return -1; memset(&pl, 0, sizeof(pipeline_t)); pl.n_fp = n_segs; @@ -670,7 +671,7 @@ int mm_split_merge(int n_segs, const char **fn, const mm_mapopt_t *opt, int n_sp pl.n_parts = n_split_idx; pl.fp_parts = CALLOC(FILE*, pl.n_parts); pl.rid_shift = CALLOC(uint32_t, pl.n_parts); - pl.mi = mm_split_merge_prep(opt->split_prefix, n_split_idx, pl.fp_parts, pl.rid_shift); + pl.mi = mi = mm_split_merge_prep(opt->split_prefix, n_split_idx, pl.fp_parts, pl.rid_shift); if (pl.mi == 0) { free(pl.fp_parts); free(pl.rid_shift); @@ -687,6 +688,7 @@ int mm_split_merge(int n_segs, const char **fn, const mm_mapopt_t *opt, int n_sp kt_pipeline(2, worker_pipeline, &pl, 3); free(pl.str.s); + mm_idx_destroy(mi); free(pl.rid_shift); for (i = 0; i < n_split_idx; ++i) fclose(pl.fp_parts[i]); diff --git a/minimap2.1 b/minimap2.1 index 0d240d1..ced6c8c 100644 --- a/minimap2.1 +++ b/minimap2.1 @@ -239,7 +239,7 @@ Disable the long gap patching heuristic. When this option is applied, the maximum alignment gap is mostly controlled by .BR -r . .TP -.B --lj-min-ratio \ FLOAT +.BI --lj-min-ratio \ FLOAT Fraction of query sequence length required to bridge a long gap [0.5]. A smaller value helps to recover longer gaps, at the cost of more false gaps. .TP @@ -252,6 +252,9 @@ applies a second round of chaining with a higher minimizer occurrence threshold if no good chain is found. In addition, minimap2 attempts to patch gaps between seeds with ungapped alignment. .TP +.BI --split-prefix \ STR +Prefix to create temporary files. Typically used for a multi-part index. +.TP .BR --frag = no | yes Whether to enable the fragment mode [no] .TP