From f0951141a14f6298ae1cccb8d47ef7fdb71d5590 Mon Sep 17 00:00:00 2001 From: Heng Li Date: Sun, 24 Sep 2017 14:33:05 -0400 Subject: [PATCH] allow to read multiple files interleaved --- bseq.c | 35 +++++++++++++++++++++++++++++++++-- bseq.h | 2 ++ map.c | 43 ++++++++++++++++++++++++++++++------------- minimap.h | 1 + 4 files changed, 66 insertions(+), 15 deletions(-) diff --git a/bseq.c b/bseq.c index 95f063d..87da0c3 100644 --- a/bseq.c +++ b/bseq.c @@ -54,11 +54,12 @@ static inline void kseq2bseq(kseq_t *ks, mm_bseq1_t *s, int with_qual) s->l_seq = ks->seq.l; } -mm_bseq1_t *mm_bseq_read(mm_bseq_file_t *fp, int chunk_size, int with_qual, int *n_) +mm_bseq1_t *mm_bseq_read2(mm_bseq_file_t *fp, int chunk_size, int with_qual, int check_name, int *n_) { int64_t size = 0; kvec_t(mm_bseq1_t) a = {0,0,0}; kseq_t *ks = fp->ks; + *n_ = 0; if (fp->s.seq) { kv_resize(mm_bseq1_t, 0, a, 256); kv_push(mm_bseq1_t, 0, a, fp->s); @@ -73,7 +74,7 @@ mm_bseq1_t *mm_bseq_read(mm_bseq_file_t *fp, int chunk_size, int with_qual, int kseq2bseq(ks, s, with_qual); size += s->l_seq; if (size >= chunk_size) { - if (a.a[a.n-1].l_seq < CHECK_PAIR_THRES) { + if (check_name && a.a[a.n-1].l_seq < CHECK_PAIR_THRES) { while (kseq_read(ks) >= 0) { kseq2bseq(ks, &fp->s, with_qual); if (mm_qname_same(fp->s.name, a.a[a.n-1].name)) { @@ -89,6 +90,36 @@ mm_bseq1_t *mm_bseq_read(mm_bseq_file_t *fp, int chunk_size, int with_qual, int return a.a; } +mm_bseq1_t *mm_bseq_read(mm_bseq_file_t *fp, int chunk_size, int with_qual, int *n_) +{ + return mm_bseq_read2(fp, chunk_size, with_qual, 0, n_); +} + +mm_bseq1_t *mm_bseq_read_multi(int n_fp, mm_bseq_file_t **fp, int chunk_size, int with_qual, int *n_) +{ + int i; + int64_t size = 0; + kvec_t(mm_bseq1_t) a = {0,0,0}; + *n_ = 0; + if (n_fp < 1) return 0; + while (1) { + for (i = 0; i < n_fp; ++i) + if (kseq_read(fp[i]->ks) < 0) + break; + if (i != n_fp) break; // some file reaches the end + if (a.m == 0) kv_resize(mm_bseq1_t, 0, a, 256); + for (i = 0; i < n_fp; ++i) { + mm_bseq1_t *s; + kv_pushp(mm_bseq1_t, 0, a, &s); + kseq2bseq(fp[i]->ks, s, with_qual); + size += s->l_seq; + } + if (size >= chunk_size) break; + } + *n_ = a.n; + return a.a; +} + int mm_bseq_eof(mm_bseq_file_t *fp) { return (ks_eof(fp->ks->f) && fp->s.seq == 0); diff --git a/bseq.h b/bseq.h index 2d065f9..31ed256 100644 --- a/bseq.h +++ b/bseq.h @@ -17,7 +17,9 @@ typedef struct { mm_bseq_file_t *mm_bseq_open(const char *fn); void mm_bseq_close(mm_bseq_file_t *fp); +mm_bseq1_t *mm_bseq_read2(mm_bseq_file_t *fp, int chunk_size, int with_qual, int check_name, int *n_); mm_bseq1_t *mm_bseq_read(mm_bseq_file_t *fp, int chunk_size, int with_qual, int *n_); +mm_bseq1_t *mm_bseq_read_multi(int n_fp, mm_bseq_file_t **fp, int chunk_size, int with_qual, int *n_); int mm_bseq_eof(mm_bseq_file_t *fp); extern unsigned char seq_nt4_table[256]; diff --git a/map.c b/map.c index 262d1a2..37e8c22 100644 --- a/map.c +++ b/map.c @@ -317,18 +317,18 @@ mm_reg1_t *mm_map(const mm_idx_t *mi, int qlen, const char *seq, int *n_regs, mm **************************/ typedef struct { - int mini_batch_size, n_processed, n_threads; + int mini_batch_size, n_processed, n_threads, n_fp; const mm_mapopt_t *opt; - mm_bseq_file_t *fp; + mm_bseq_file_t **fp; const mm_idx_t *mi; kstring_t str; } pipeline_t; typedef struct { const pipeline_t *p; - int n_seq; + int n_seq, n_frag; mm_bseq1_t *seq; - int *n_reg; + int *n_reg, *n_seg; mm_reg1_t **reg; mm_tbuf_t **buf; } step_t; @@ -349,7 +349,8 @@ static void *worker_pipeline(void *shared, int step, void *in) int with_qual = (!!(p->opt->flag & MM_F_OUT_SAM) && !(p->opt->flag & MM_F_NO_QUAL)); step_t *s; s = (step_t*)calloc(1, sizeof(step_t)); - s->seq = mm_bseq_read(p->fp, p->mini_batch_size, with_qual, &s->n_seq); + if (p->n_fp > 1) s->seq = mm_bseq_read_multi(p->n_fp, p->fp, p->mini_batch_size, with_qual, &s->n_seq); + else s->seq = mm_bseq_read2(p->fp[0], p->mini_batch_size, with_qual, !!(p->opt->flag&MM_F_MULTI_SEG), &s->n_seq); if (s->seq) { s->p = p; for (i = 0; i < s->n_seq; ++i) @@ -399,22 +400,38 @@ static void *worker_pipeline(void *shared, int step, void *in) return 0; } -int mm_map_file(const mm_idx_t *idx, const char *fn, const mm_mapopt_t *opt, int n_threads) +int mm_map_file_multi_seg(const mm_idx_t *idx, int n_segs, const char **fn, const mm_mapopt_t *opt, int n_threads) { + int i, j; pipeline_t pl; + if (n_segs < 1) return -1; memset(&pl, 0, sizeof(pipeline_t)); - pl.fp = mm_bseq_open(fn); - if (pl.fp == 0) { - if (mm_verbose >= 1) - fprintf(stderr, "ERROR: failed to open file '%s'\n", fn); - return -1; + pl.fp = (mm_bseq_file_t**)calloc(n_segs, sizeof(mm_bseq_file_t*)); + for (i = 0; i < n_segs; ++i) { + pl.fp[i] = mm_bseq_open(fn[i]); + if (pl.fp[i] == 0) { + if (mm_verbose >= 1) + fprintf(stderr, "ERROR: failed to open file '%s'\n", fn[i]); + for (j = 0; j < i; ++j) + mm_bseq_close(pl.fp[j]); + free(pl.fp); + return -1; + } } pl.opt = opt, pl.mi = idx; - pl.n_threads = n_threads, pl.mini_batch_size = opt->mini_batch_size; + pl.n_threads = n_threads > 1? n_threads : 1; + pl.mini_batch_size = opt->mini_batch_size; if ((opt->flag & MM_F_OUT_SAM) && !(opt->flag & MM_F_NO_SAM_SQ)) mm_write_sam_SQ(idx); kt_pipeline(n_threads == 1? 1 : 2, worker_pipeline, &pl, 3); free(pl.str.s); - mm_bseq_close(pl.fp); + for (i = 0; i < n_segs; ++i) + mm_bseq_close(pl.fp[i]); + free(pl.fp); return 0; } + +int mm_map_file(const mm_idx_t *idx, const char *fn, const mm_mapopt_t *opt, int n_threads) +{ + return mm_map_file_multi_seg(idx, 1, &fn, opt, n_threads); +} diff --git a/minimap.h b/minimap.h index 74bf2ae..46e66e7 100644 --- a/minimap.h +++ b/minimap.h @@ -17,6 +17,7 @@ #define MM_F_SPLICE_REV 0x200 // match CT-AC, the reverse complement of GT-AG #define MM_F_NO_SAM_SQ 0x800 #define MM_F_SR 0x1000 +#define MM_F_MULTI_SEG 0x2000 #define MM_IDX_MAGIC "MMI\2"