From 1a55227d5a97d8fe5e20b431b6f511b4f5497e6d Mon Sep 17 00:00:00 2001 From: Heng Li Date: Sat, 14 Jul 2018 12:15:10 -0400 Subject: [PATCH] write hits to tmp files (unfinished) --- Makefile | 2 +- index.c | 2 +- main.c | 2 ++ map.c | 73 +++++++++++++++++++++++++++++++++--------------------- minimap.h | 5 +++- misc.c | 20 +++++++++++++++ mmpriv.h | 4 +++ splitidx.c | 26 +++++++++++++++++++ 8 files changed, 103 insertions(+), 31 deletions(-) create mode 100644 splitidx.c diff --git a/Makefile b/Makefile index f30c7b3..f3b96dc 100644 --- a/Makefile +++ b/Makefile @@ -1,7 +1,7 @@ CFLAGS= -g -Wall -O2 -Wc++-compat #-Wextra CPPFLAGS= -DHAVE_KALLOC INCLUDES= -OBJS= kthread.o kalloc.o misc.o bseq.o sketch.o sdust.o options.o index.o chain.o align.o hit.o map.o format.o pe.o esterr.o ksw2_ll_sse.o +OBJS= kthread.o kalloc.o misc.o bseq.o sketch.o sdust.o options.o index.o chain.o align.o hit.o map.o format.o pe.o esterr.o splitidx.o ksw2_ll_sse.o PROG= minimap2 PROG_EXTRA= sdust minimap2-lite LIBS= -lm -lz -lpthread diff --git a/index.c b/index.c index 3aa8df8..cfdef19 100644 --- a/index.c +++ b/index.c @@ -563,7 +563,7 @@ mm_idx_t *mm_idx_reader_read(mm_idx_reader_t *r, int n_threads) mi = mm_idx_gen(r->fp.seq, r->opt.w, r->opt.k, r->opt.bucket_bits, r->opt.flag, r->opt.mini_batch_size, n_threads, r->opt.batch_size); if (mi) { if (r->fp_out) mm_idx_dump(r->fp_out, mi); - ++r->n_parts; + mi->index = r->n_parts++; } return mi; } diff --git a/main.c b/main.c index 22d55e3..69e5dcc 100644 --- a/main.c +++ b/main.c @@ -61,6 +61,7 @@ static struct option long_options[] = { { "score-N", required_argument, 0, 0 }, // 31 { "eqx", no_argument, 0, 0 }, // 32 { "paf-no-hit", no_argument, 0, 0 }, // 33 + { "split-prefix", required_argument, 0, 0 }, // 34 { "help", no_argument, 0, 'h' }, { "max-intron-len", required_argument, 0, 'G' }, { "version", no_argument, 0, 'V' }, @@ -181,6 +182,7 @@ int main(int argc, char *argv[]) else if (c == 0 && long_idx ==31) opt.sc_ambi = atoi(optarg); // --score-N else if (c == 0 && long_idx ==32) opt.flag |= MM_F_EQX; // --eqx else if (c == 0 && long_idx ==33) opt.flag |= MM_F_PAF_NO_HIT; // --paf-no-hit + else if (c == 0 && long_idx ==34) opt.split_prefix = optarg; // --split-prefix else if (c == 0 && long_idx == 14) { // --frag yes_or_no(&opt, MM_F_FRAG_MODE, long_idx, optarg, 1); } else if (c == 0 && long_idx == 15) { // --secondary diff --git a/map.c b/map.c index 0864c09..9362f6a 100644 --- a/map.c +++ b/map.c @@ -262,7 +262,7 @@ static mm_reg1_t *align_regs(const mm_mapopt_t *opt, const mm_idx_t *mi, void *k return regs; } -void mm_map_frag(const mm_idx_t *mi, int n_segs, const int *qlens, const char **seqs, int *n_regs, mm_reg1_t **regs, mm_tbuf_t *b, const mm_mapopt_t *opt, const char *qname) +int mm_map_frag(const mm_idx_t *mi, int n_segs, const int *qlens, const char **seqs, int *n_regs, mm_reg1_t **regs, mm_tbuf_t *b, const mm_mapopt_t *opt, const char *qname) { int i, j, rep_len, qlen_sum, n_regs0, n_mini_pos; int max_chain_gap_qry, max_chain_gap_ref, is_splice = !!(opt->flag & MM_F_SPLICE), is_sr = !!(opt->flag & MM_F_SR); @@ -277,7 +277,7 @@ void mm_map_frag(const mm_idx_t *mi, int n_segs, const int *qlens, const char ** for (i = 0, qlen_sum = 0; i < n_segs; ++i) qlen_sum += qlens[i], n_regs[i] = 0, regs[i] = 0; - if (qlen_sum == 0 || n_segs <= 0 || n_segs > MM_MAX_SEG) return; + if (qlen_sum == 0 || n_segs <= 0 || n_segs > MM_MAX_SEG) return 0; hash = qname? __ac_X31_hash_string(qname) : 0; hash ^= __ac_Wang_hash(qlen_sum) + __ac_Wang_hash(opt->seed); @@ -375,6 +375,7 @@ void mm_map_frag(const mm_idx_t *mi, int n_segs, const int *qlens, const char ** b->km = km_init(); } } + return rep_len; } mm_reg1_t *mm_map(const mm_idx_t *mi, int qlen, const char *seq, int *n_regs, mm_tbuf_t *b, const mm_mapopt_t *opt, const char *qname) @@ -394,13 +395,14 @@ typedef struct { mm_bseq_file_t **fp; const mm_idx_t *mi; kstring_t str; + FILE *fp_split; } pipeline_t; typedef struct { const pipeline_t *p; int n_seq, n_frag; mm_bseq1_t *seq; - int *n_reg, *seg_off, *n_seg; + int *n_reg, *seg_off, *n_seg, *rep_len; mm_reg1_t **reg; mm_tbuf_t **buf; } step_t; @@ -422,9 +424,12 @@ static void worker_for(void *_data, long i, int tid) // kt_for() callback } if (s->p->opt->flag & MM_F_INDEPEND_SEG) { for (j = 0; j < s->n_seg[i]; ++j) - mm_map_frag(s->p->mi, 1, &qlens[j], &qseqs[j], &s->n_reg[off+j], &s->reg[off+j], b, s->p->opt, s->seq[off+j].name); + s->rep_len[off + j] = mm_map_frag(s->p->mi, 1, &qlens[j], &qseqs[j], &s->n_reg[off+j], &s->reg[off+j], b, s->p->opt, s->seq[off+j].name); } else { - mm_map_frag(s->p->mi, s->n_seg[i], qlens, qseqs, &s->n_reg[off], &s->reg[off], b, s->p->opt, s->seq[off].name); + int rep_len; + rep_len = mm_map_frag(s->p->mi, s->n_seg[i], qlens, qseqs, &s->n_reg[off], &s->reg[off], b, s->p->opt, s->seq[off].name); + for (j = 0; j < s->n_seg[i]; ++j) + s->rep_len[off + j] = rep_len; } for (j = 0; j < s->n_seg[i]; ++j) // flip the query strand and coordinate to the original read strand if (s->n_seg[i] == 2 && ((j == 0 && (pe_ori>>1&1)) || (j == 1 && (pe_ori&1)))) { @@ -459,9 +464,10 @@ static void *worker_pipeline(void *shared, int step, void *in) s->buf = (mm_tbuf_t**)calloc(p->n_threads, sizeof(mm_tbuf_t*)); for (i = 0; i < p->n_threads; ++i) s->buf[i] = mm_tbuf_init(); - s->n_reg = (int*)calloc(3 * s->n_seq, sizeof(int)); - s->seg_off = s->n_reg + s->n_seq; // seg_off and n_seg are allocated together with n_reg + s->n_reg = (int*)calloc(4 * s->n_seq, sizeof(int)); + s->seg_off = s->n_reg + s->n_seq; // seg_off, n_seg and rep_len are allocated together with n_reg s->n_seg = s->seg_off + s->n_seq; + s->rep_len = s->n_seg + s->n_seq; s->reg = (mm_reg1_t**)calloc(s->n_seq, sizeof(mm_reg1_t*)); for (i = 1, j = 0; i <= s->n_seq; ++i) if (i == s->n_seq || !frag_mode || !mm_qname_same(s->seq[i-1].name, s->seq[i].name)) { @@ -485,28 +491,35 @@ static void *worker_pipeline(void *shared, int step, void *in) int seg_st = s->seg_off[k], seg_en = s->seg_off[k] + s->n_seg[k]; for (i = seg_st; i < seg_en; ++i) { mm_bseq1_t *t = &s->seq[i]; - for (j = 0; j < s->n_reg[i]; ++j) { - mm_reg1_t *r = &s->reg[i][j]; - assert(!r->sam_pri || r->id == r->parent); - if ((p->opt->flag & MM_F_NO_PRINT_2ND) && r->id != r->parent) - continue; - if (p->opt->flag & MM_F_OUT_SAM) - mm_write_sam2(&p->str, mi, t, i - seg_st, j, s->n_seg[k], &s->n_reg[seg_st], (const mm_reg1_t*const*)&s->reg[seg_st], km, p->opt->flag); - else - mm_write_paf(&p->str, mi, t, r, km, p->opt->flag); - mm_err_puts(p->str.s); + if (p->opt->split_prefix) { + mm_err_fwrite(&s->n_reg[i], sizeof(s->n_reg[i]), 1, p->fp_split); + mm_err_fwrite(&s->rep_len[i], sizeof(s->rep_len[i]), 1, p->fp_split); } - if (s->n_reg[i] == 0) { - if (p->opt->flag & MM_F_OUT_SAM) { - mm_write_sam2(&p->str, mi, t, i - seg_st, -1, s->n_seg[k], &s->n_reg[seg_st], (const mm_reg1_t*const*)&s->reg[seg_st], km, p->opt->flag); - mm_err_puts(p->str.s); - } else if (p->opt->flag & MM_F_PAF_NO_HIT) { - mm_write_paf(&p->str, mi, t, 0, 0, p->opt->flag); - mm_err_puts(p->str.s); + if (s->n_reg[i] > 0) { + for (j = 0; j < s->n_reg[i]; ++j) { + mm_reg1_t *r = &s->reg[i][j]; + assert(!r->sam_pri || r->id == r->parent); + if ((p->opt->flag & MM_F_NO_PRINT_2ND) && r->id != r->parent) + continue; + if (p->opt->split_prefix) { + mm_err_fwrite(r, sizeof(mm_reg1_t), 1, p->fp_split); + if (p->opt->flag & MM_F_CIGAR) { + mm_err_fwrite(&r->p->capacity, 4, 1, p->fp_split); + mm_err_fwrite(r->p, r->p->capacity, 4, p->fp_split); + } + } else { + if (p->opt->flag & MM_F_OUT_SAM) + mm_write_sam2(&p->str, mi, t, i - seg_st, j, s->n_seg[k], &s->n_reg[seg_st], (const mm_reg1_t*const*)&s->reg[seg_st], km, p->opt->flag); + else + mm_write_paf(&p->str, mi, t, r, km, p->opt->flag); + mm_err_puts(p->str.s); + } } - } - if (s->n_reg[i] == 0 && (p->opt->flag & MM_F_OUT_SAM)) { // write an unmapped record - mm_write_sam2(&p->str, mi, t, i - seg_st, -1, s->n_seg[k], &s->n_reg[seg_st], (const mm_reg1_t*const*)&s->reg[seg_st], km, p->opt->flag); + } else if (p->opt->flag & (MM_F_OUT_SAM|MM_F_PAF_NO_HIT)) { + if (p->opt->flag & MM_F_OUT_SAM) + mm_write_sam2(&p->str, mi, t, i - seg_st, -1, s->n_seg[k], &s->n_reg[seg_st], (const mm_reg1_t*const*)&s->reg[seg_st], km, p->opt->flag); + else + mm_write_paf(&p->str, mi, t, 0, 0, p->opt->flag); mm_err_puts(p->str.s); } } @@ -517,7 +530,7 @@ static void *worker_pipeline(void *shared, int step, void *in) if (s->seq[i].qual) free(s->seq[i].qual); } } - free(s->reg); free(s->n_reg); free(s->seq); // seg_off and n_seg were allocated with reg; no memory leak here + free(s->reg); free(s->n_reg); free(s->seq); // seg_off, n_seg and rep_len were allocated with reg; no memory leak here km_destroy(km); if (mm_verbose >= 3) fprintf(stderr, "[M::%s::%.3f*%.2f] mapped %d sequences\n", __func__, realtime() - mm_realtime0, cputime() / (realtime() - mm_realtime0), s->n_seq); @@ -548,9 +561,13 @@ int mm_map_file_frag(const mm_idx_t *idx, int n_segs, const char **fn, const mm_ pl.opt = opt, pl.mi = idx; pl.n_threads = n_threads > 1? n_threads : 1; pl.mini_batch_size = opt->mini_batch_size; + if (opt->split_prefix) + pl.fp_split = mm_split_init(opt->split_prefix, idx); pl_threads = n_threads == 1? 1 : (opt->flag&MM_F_2_IO_THREADS)? 3 : 2; kt_pipeline(pl_threads, worker_pipeline, &pl, 3); free(pl.str.s); + if (opt->split_prefix) + fclose(pl.fp_split); for (i = 0; i < n_segs; ++i) mm_bseq_close(pl.fp[i]); free(pl.fp); diff --git a/minimap.h b/minimap.h index c1a9267..e288d71 100644 --- a/minimap.h +++ b/minimap.h @@ -60,6 +60,7 @@ typedef struct { typedef struct { int32_t b, w, k, flag; uint32_t n_seq; // number of reference sequences + int32_t index; mm_idx_seq_t *seq; // sequence name, length and offset uint32_t *S; // 4-bit packed sequence struct mm_idx_bucket_s *B; // index (hidden) @@ -136,6 +137,8 @@ typedef struct { int32_t mid_occ; // ignore seeds with occurrences above this threshold int32_t max_occ; int mini_batch_size; // size of a batch of query bases to process in parallel + + const char *split_prefix; } mm_mapopt_t; // index reader @@ -318,7 +321,7 @@ void mm_tbuf_destroy(mm_tbuf_t *b); */ mm_reg1_t *mm_map(const mm_idx_t *mi, int l_seq, const char *seq, int *n_regs, mm_tbuf_t *b, const mm_mapopt_t *opt, const char *name); -void mm_map_frag(const mm_idx_t *mi, int n_segs, const int *qlens, const char **seqs, int *n_regs, mm_reg1_t **regs, mm_tbuf_t *b, const mm_mapopt_t *opt, const char *qname); +int mm_map_frag(const mm_idx_t *mi, int n_segs, const int *qlens, const char **seqs, int *n_regs, mm_reg1_t **regs, mm_tbuf_t *b, const mm_mapopt_t *opt, const char *qname); /** * Align a fasta/fastq file and print alignments to stdout diff --git a/misc.c b/misc.c index f7a43cb..930e6bf 100644 --- a/misc.c +++ b/misc.c @@ -131,6 +131,26 @@ void mm_err_puts(const char *str) } } +void mm_err_fwrite(const void *p, size_t size, size_t nitems, FILE *fp) +{ + int ret; + ret = fwrite(p, size, nitems, fp); + if (ret == EOF) { + fprintf(stderr, "[ERROR] failed to write data\n"); + exit(EXIT_FAILURE); + } +} + +void mm_err_fread(void *p, size_t size, size_t nitems, FILE *fp) +{ + int ret; + ret = fread(p, size, nitems, fp); + if (ret == EOF) { + fprintf(stderr, "[ERROR] failed to read data\n"); + exit(EXIT_FAILURE); + } +} + #include "ksort.h" #define sort_key_128x(a) ((a).x) diff --git a/mmpriv.h b/mmpriv.h index 6ec4a21..a823d96 100644 --- a/mmpriv.h +++ b/mmpriv.h @@ -86,7 +86,11 @@ mm_seg_t *mm_seg_gen(void *km, uint32_t hash, int n_segs, const int *qlens, int void mm_seg_free(void *km, int n_segs, mm_seg_t *segs); void mm_pair(void *km, int max_gap_ref, int dp_bonus, int sub_diff, int match_sc, const int *qlens, int *n_regs, mm_reg1_t **regs); +FILE *mm_split_init(const char *prefix, const mm_idx_t *mi); + void mm_err_puts(const char *str); +void mm_err_fwrite(const void *p, size_t size, size_t nitems, FILE *fp); +void mm_err_fread(void *p, size_t size, size_t nitems, FILE *fp); #ifdef __cplusplus } diff --git a/splitidx.c b/splitidx.c new file mode 100644 index 0000000..9c738a3 --- /dev/null +++ b/splitidx.c @@ -0,0 +1,26 @@ +#include +#include +#include +#include +#include "mmpriv.h" + +FILE *mm_split_init(const char *prefix, const mm_idx_t *mi) +{ + char *fn; + FILE *fp; + uint32_t i; + fn = (char*)calloc(strlen(prefix) + 10, 1); + sprintf(fn, "%s.%.4d.tmp", prefix, mi->index); + fp = fopen(fn, "wb"); + assert(fp); + mm_err_fwrite(&mi->n_seq, 4, 1, fp); + for (i = 0; i < mi->n_seq; ++i) { + uint8_t l; + l = strlen(mi->seq[i].name); + mm_err_fwrite(&l, 1, 1, fp); + mm_err_fwrite(mi->seq[i].name, 1, l, fp); + mm_err_fwrite(&mi->seq[i].len, 4, 1, fp); + } + free(fn); + return fp; +}