allow to read multiple files interleaved

This commit is contained in:
Heng Li 2017-09-24 14:33:05 -04:00
parent 84bbc47152
commit f0951141a1
4 changed files with 66 additions and 15 deletions

35
bseq.c
View File

@ -54,11 +54,12 @@ static inline void kseq2bseq(kseq_t *ks, mm_bseq1_t *s, int with_qual)
s->l_seq = ks->seq.l; s->l_seq = ks->seq.l;
} }
mm_bseq1_t *mm_bseq_read(mm_bseq_file_t *fp, int chunk_size, int with_qual, int *n_) mm_bseq1_t *mm_bseq_read2(mm_bseq_file_t *fp, int chunk_size, int with_qual, int check_name, int *n_)
{ {
int64_t size = 0; int64_t size = 0;
kvec_t(mm_bseq1_t) a = {0,0,0}; kvec_t(mm_bseq1_t) a = {0,0,0};
kseq_t *ks = fp->ks; kseq_t *ks = fp->ks;
*n_ = 0;
if (fp->s.seq) { if (fp->s.seq) {
kv_resize(mm_bseq1_t, 0, a, 256); kv_resize(mm_bseq1_t, 0, a, 256);
kv_push(mm_bseq1_t, 0, a, fp->s); kv_push(mm_bseq1_t, 0, a, fp->s);
@ -73,7 +74,7 @@ mm_bseq1_t *mm_bseq_read(mm_bseq_file_t *fp, int chunk_size, int with_qual, int
kseq2bseq(ks, s, with_qual); kseq2bseq(ks, s, with_qual);
size += s->l_seq; size += s->l_seq;
if (size >= chunk_size) { if (size >= chunk_size) {
if (a.a[a.n-1].l_seq < CHECK_PAIR_THRES) { if (check_name && a.a[a.n-1].l_seq < CHECK_PAIR_THRES) {
while (kseq_read(ks) >= 0) { while (kseq_read(ks) >= 0) {
kseq2bseq(ks, &fp->s, with_qual); kseq2bseq(ks, &fp->s, with_qual);
if (mm_qname_same(fp->s.name, a.a[a.n-1].name)) { if (mm_qname_same(fp->s.name, a.a[a.n-1].name)) {
@ -89,6 +90,36 @@ mm_bseq1_t *mm_bseq_read(mm_bseq_file_t *fp, int chunk_size, int with_qual, int
return a.a; return a.a;
} }
mm_bseq1_t *mm_bseq_read(mm_bseq_file_t *fp, int chunk_size, int with_qual, int *n_)
{
return mm_bseq_read2(fp, chunk_size, with_qual, 0, n_);
}
mm_bseq1_t *mm_bseq_read_multi(int n_fp, mm_bseq_file_t **fp, int chunk_size, int with_qual, int *n_)
{
int i;
int64_t size = 0;
kvec_t(mm_bseq1_t) a = {0,0,0};
*n_ = 0;
if (n_fp < 1) return 0;
while (1) {
for (i = 0; i < n_fp; ++i)
if (kseq_read(fp[i]->ks) < 0)
break;
if (i != n_fp) break; // some file reaches the end
if (a.m == 0) kv_resize(mm_bseq1_t, 0, a, 256);
for (i = 0; i < n_fp; ++i) {
mm_bseq1_t *s;
kv_pushp(mm_bseq1_t, 0, a, &s);
kseq2bseq(fp[i]->ks, s, with_qual);
size += s->l_seq;
}
if (size >= chunk_size) break;
}
*n_ = a.n;
return a.a;
}
int mm_bseq_eof(mm_bseq_file_t *fp) int mm_bseq_eof(mm_bseq_file_t *fp)
{ {
return (ks_eof(fp->ks->f) && fp->s.seq == 0); return (ks_eof(fp->ks->f) && fp->s.seq == 0);

2
bseq.h
View File

@ -17,7 +17,9 @@ typedef struct {
mm_bseq_file_t *mm_bseq_open(const char *fn); mm_bseq_file_t *mm_bseq_open(const char *fn);
void mm_bseq_close(mm_bseq_file_t *fp); void mm_bseq_close(mm_bseq_file_t *fp);
mm_bseq1_t *mm_bseq_read2(mm_bseq_file_t *fp, int chunk_size, int with_qual, int check_name, int *n_);
mm_bseq1_t *mm_bseq_read(mm_bseq_file_t *fp, int chunk_size, int with_qual, int *n_); mm_bseq1_t *mm_bseq_read(mm_bseq_file_t *fp, int chunk_size, int with_qual, int *n_);
mm_bseq1_t *mm_bseq_read_multi(int n_fp, mm_bseq_file_t **fp, int chunk_size, int with_qual, int *n_);
int mm_bseq_eof(mm_bseq_file_t *fp); int mm_bseq_eof(mm_bseq_file_t *fp);
extern unsigned char seq_nt4_table[256]; extern unsigned char seq_nt4_table[256];

43
map.c
View File

@ -317,18 +317,18 @@ mm_reg1_t *mm_map(const mm_idx_t *mi, int qlen, const char *seq, int *n_regs, mm
**************************/ **************************/
typedef struct { typedef struct {
int mini_batch_size, n_processed, n_threads; int mini_batch_size, n_processed, n_threads, n_fp;
const mm_mapopt_t *opt; const mm_mapopt_t *opt;
mm_bseq_file_t *fp; mm_bseq_file_t **fp;
const mm_idx_t *mi; const mm_idx_t *mi;
kstring_t str; kstring_t str;
} pipeline_t; } pipeline_t;
typedef struct { typedef struct {
const pipeline_t *p; const pipeline_t *p;
int n_seq; int n_seq, n_frag;
mm_bseq1_t *seq; mm_bseq1_t *seq;
int *n_reg; int *n_reg, *n_seg;
mm_reg1_t **reg; mm_reg1_t **reg;
mm_tbuf_t **buf; mm_tbuf_t **buf;
} step_t; } step_t;
@ -349,7 +349,8 @@ static void *worker_pipeline(void *shared, int step, void *in)
int with_qual = (!!(p->opt->flag & MM_F_OUT_SAM) && !(p->opt->flag & MM_F_NO_QUAL)); int with_qual = (!!(p->opt->flag & MM_F_OUT_SAM) && !(p->opt->flag & MM_F_NO_QUAL));
step_t *s; step_t *s;
s = (step_t*)calloc(1, sizeof(step_t)); s = (step_t*)calloc(1, sizeof(step_t));
s->seq = mm_bseq_read(p->fp, p->mini_batch_size, with_qual, &s->n_seq); if (p->n_fp > 1) s->seq = mm_bseq_read_multi(p->n_fp, p->fp, p->mini_batch_size, with_qual, &s->n_seq);
else s->seq = mm_bseq_read2(p->fp[0], p->mini_batch_size, with_qual, !!(p->opt->flag&MM_F_MULTI_SEG), &s->n_seq);
if (s->seq) { if (s->seq) {
s->p = p; s->p = p;
for (i = 0; i < s->n_seq; ++i) for (i = 0; i < s->n_seq; ++i)
@ -399,22 +400,38 @@ static void *worker_pipeline(void *shared, int step, void *in)
return 0; return 0;
} }
int mm_map_file(const mm_idx_t *idx, const char *fn, const mm_mapopt_t *opt, int n_threads) int mm_map_file_multi_seg(const mm_idx_t *idx, int n_segs, const char **fn, const mm_mapopt_t *opt, int n_threads)
{ {
int i, j;
pipeline_t pl; pipeline_t pl;
if (n_segs < 1) return -1;
memset(&pl, 0, sizeof(pipeline_t)); memset(&pl, 0, sizeof(pipeline_t));
pl.fp = mm_bseq_open(fn); pl.fp = (mm_bseq_file_t**)calloc(n_segs, sizeof(mm_bseq_file_t*));
if (pl.fp == 0) { for (i = 0; i < n_segs; ++i) {
if (mm_verbose >= 1) pl.fp[i] = mm_bseq_open(fn[i]);
fprintf(stderr, "ERROR: failed to open file '%s'\n", fn); if (pl.fp[i] == 0) {
return -1; if (mm_verbose >= 1)
fprintf(stderr, "ERROR: failed to open file '%s'\n", fn[i]);
for (j = 0; j < i; ++j)
mm_bseq_close(pl.fp[j]);
free(pl.fp);
return -1;
}
} }
pl.opt = opt, pl.mi = idx; pl.opt = opt, pl.mi = idx;
pl.n_threads = n_threads, pl.mini_batch_size = opt->mini_batch_size; pl.n_threads = n_threads > 1? n_threads : 1;
pl.mini_batch_size = opt->mini_batch_size;
if ((opt->flag & MM_F_OUT_SAM) && !(opt->flag & MM_F_NO_SAM_SQ)) if ((opt->flag & MM_F_OUT_SAM) && !(opt->flag & MM_F_NO_SAM_SQ))
mm_write_sam_SQ(idx); mm_write_sam_SQ(idx);
kt_pipeline(n_threads == 1? 1 : 2, worker_pipeline, &pl, 3); kt_pipeline(n_threads == 1? 1 : 2, worker_pipeline, &pl, 3);
free(pl.str.s); free(pl.str.s);
mm_bseq_close(pl.fp); for (i = 0; i < n_segs; ++i)
mm_bseq_close(pl.fp[i]);
free(pl.fp);
return 0; return 0;
} }
int mm_map_file(const mm_idx_t *idx, const char *fn, const mm_mapopt_t *opt, int n_threads)
{
return mm_map_file_multi_seg(idx, 1, &fn, opt, n_threads);
}

View File

@ -17,6 +17,7 @@
#define MM_F_SPLICE_REV 0x200 // match CT-AC, the reverse complement of GT-AG #define MM_F_SPLICE_REV 0x200 // match CT-AC, the reverse complement of GT-AG
#define MM_F_NO_SAM_SQ 0x800 #define MM_F_NO_SAM_SQ 0x800
#define MM_F_SR 0x1000 #define MM_F_SR 0x1000
#define MM_F_MULTI_SEG 0x2000
#define MM_IDX_MAGIC "MMI\2" #define MM_IDX_MAGIC "MMI\2"