添加了可同时读写的pipeline,优化了时间统计

This commit is contained in:
zzh 2024-03-24 04:40:09 +08:00
parent d728ddbe2c
commit 1e3965cb7d
16 changed files with 1281 additions and 195 deletions

4
.vscode/launch.json vendored
View File

@ -18,8 +18,8 @@
"-R",
"'@RG\\tID:normal\\tSM:normal\\tPL:illumina\\tLB:normal\\tPG:bwa'",
"~/reference/human_g1k_v37_decoy.fasta",
"~/data/fastq/ds/d1_1.fq",
"~/data/fastq/ds/d1_2.fq",
"~/data/fastq/dataset/na12878_wes_144/SRR25735653_1.fastq",
"~/data/fastq/dataset/na12878_wes_144/SRR25735653_2.fastq",
"-o",
"/dev/null"
],

View File

@ -10,6 +10,12 @@
"limits": "c",
"bit": "c",
"numeric": "c",
"typeinfo": "c"
"typeinfo": "c",
"yarn.h": "c",
"malloc_wrap.h": "c",
"emmintrin.h": "c",
"bwamem.h": "c",
"utils.h": "c",
"stdio.h": "c"
}
}

View File

@ -3,12 +3,18 @@ CC= gcc
# CFLAGS= -g -Wall -Wno-unused-function -O2
CFLAGS= -g -Wall -Wno-unused-function -mavx2 -O2
WRAP_MALLOC=-DUSE_MALLOC_WRAPPERS
SHOW_PERF= #-DSHOW_PERF
FILTER_FULL_MATCH=#-DFILTER_FULL_MATCH
SHOW_PERF= -DSHOW_PERF
SHOW_DATA_PERF= #-DSHOW_DATA_PERF
DEBUG_OUTPUT= #-DDEBUG_OUTPUT
DEBUG_SW_EXTEND= #-DDEBUG_SW_EXTEND
FILTER_FULL_MATCH= #-DFILTER_FULL_MATCH
USE_MT_READ= #-DUSE_MT_READ
AR= ar
DFLAGS= -DHAVE_PTHREAD $(WRAP_MALLOC) $(SHOW_PERF) $(FILTER_FULL_MATCH) -DUSE_AVX2 -DKSW_EQUAL -DUSE_MT_READ
DFLAGS= -DHAVE_PTHREAD $(WRAP_MALLOC) $(SHOW_PERF) $(SHOW_DATA_PERF) $(DEBUG_OUTPUT) $(DEBUG_SW_EXTEND) $(FILTER_FULL_MATCH) $(USE_MT_READ) -DUSE_AVX2 -DKSW_EQUAL
LOBJS= utils.o kthread.o kstring.o ksw.o bwt.o bntseq.o bwa.o bwamem.o bwamem_pair.o bwamem_extra.o malloc_wrap.o \
QSufSort.o bwt_gen.o rope.o rle.o is.o bwtindex.o
QSufSort.o bwt_gen.o rope.o rle.o is.o bwtindex.o yarn.o
AOBJS= bwashm.o bwase.o bwaseqio.o bwtgap.o bwtaln.o bamlite.o \
bwape.o kopen.o pemerge.o maxk.o \
bwtsw2_core.o bwtsw2_main.o bwtsw2_aux.o bwt_lite.o \

62
bwa.c
View File

@ -118,9 +118,9 @@ static void *thread_bseq_read(void *data) {
return 0;
}
#define READ_ONE_SEQ(ks) \
trim_readno(&ks->name); \
kseq2bseq1(ks, &seqs[n], copy_comment); \
#define READ_ONE_SEQ(ksin) \
trim_readno(&(ksin)->name); \
kseq2bseq1(ksin, &seqs[n], copy_comment); \
seqs[n].id = n; \
size += seqs[n++].l_seq;
@ -132,11 +132,13 @@ void bseq_read_pe_mt(int chunk_size, int *n_, void *ks1_, void *ks2_, int copy_c
bseq1_t *seqs = *seqs_ptr;
read_data_t d[2];
pthread_t tid[2];
const int chunk_size_narrow = 4 * 1024 * 1024;
const int init_n_reads = 20;
if (m == 0) { // 还没开辟空间,要初始化
seqs = calloc(20, sizeof(bseq1_t)); // 先读取20个reads根据reads的长度和chunk size决定要读取多少条reads
seqs = calloc(init_n_reads, sizeof(bseq1_t)); // 先读取20个reads根据reads的长度和chunk size决定要读取多少条reads
#if 1
int ks1_ret = 0, ks2_ret = 0;
int i = 10;
int i = init_n_reads >> 1;
while (i-- > 0) {
ks1_ret = kseq_read(ks);
if (ks1_ret < 0) break;
@ -155,31 +157,60 @@ void bseq_read_pe_mt(int chunk_size, int *n_, void *ks1_, void *ks2_, int copy_c
*n_ = n; *seqs_ptr = seqs;
return;
}
// 重新开辟空间
m = (chunk_size + size / 10 - 1) / (size / 10) * 2;
*m_ = m;
m = (chunk_size + size / init_n_reads - 1) / (size / init_n_reads);
#else
m = 50000;
#endif
seqs = realloc(seqs, m * sizeof(bseq1_t));
memset(seqs + n, 0, sizeof(bseq1_t));
*seqs_ptr = seqs;
memset(seqs + n, 0, sizeof(bseq1_t) * (m - n));
}
d[0].copy_comment = copy_comment; d[1].copy_comment = copy_comment;
d[0].ks = ks ; d[0].seq = &seqs[0]; d[0].n_bound = (m >> 1) - (n>>1); d[0].start_pos = n;
d[1].ks = ks2; d[1].seq = &seqs[0]; d[1].n_bound = (m >> 1) - (n>>1); d[1].start_pos = n+1;
d[0].chunk_size = d[1].chunk_size = (chunk_size + 1) >> 1;
d[0].chunk_size = d[1].chunk_size = (chunk_size - chunk_size_narrow - size) >> 1;
pthread_create(&tid[0], 0, thread_bseq_read, &d[0]);
pthread_create(&tid[1], 0, thread_bseq_read, &d[1]);
pthread_join(tid[0], 0); pthread_join(tid[1], 0);
size += d[0].ret_size + d[1].ret_size;
// 如果两个线程读入的reads数量不一致
if (d[0].ret_n < d[1].ret_n)
{
int num_to_read = d[1].ret_n - d[0].ret_n;
int offset = n + d[0].ret_n * 2;
while (num_to_read-- > 0 && kseq_read(ks) >= 0) {
trim_readno(&ks->name);
kseq2bseq1(ks, &seqs[offset], copy_comment);
seqs[offset].id = offset;
size += seqs[offset].l_seq;
offset += 2;
}
d[0].ret_n = d[1].ret_n;
}
else if (d[1].ret_n < d[0].ret_n)
{
int num_to_read = d[0].ret_n - d[1].ret_n;
int offset = n + 1 + d[1].ret_n * 2;
while (num_to_read-- > 0 && kseq_read(ks2) >= 0) {
trim_readno(&ks2->name);
kseq2bseq1(ks2, &seqs[offset], copy_comment);
seqs[offset].id = offset;
size += seqs[offset].l_seq;
offset += 2;
}
d[1].ret_n = d[0].ret_n;
}
n += d[0].ret_n + d[1].ret_n;
if (size == 0 && kseq_read(ks2) >= 0) { // test if the 2nd file is finished
fprintf(stderr, "[W::%s] the 1st file has fewer sequences.\n", __func__);
} else if (size < chunk_size && d[0].ret_status > 0 && d[1].ret_status > 0) {
while (kseq_read(ks) >= 0) {
if (ks2 && kseq_read(ks2) < 0) { // the 2nd file has fewer reads
if (kseq_read(ks2) < 0) { // the 2nd file has fewer reads
fprintf(stderr, "[W::%s] the 2nd file has fewer sequences.\n", __func__);
break;
}
@ -189,16 +220,13 @@ void bseq_read_pe_mt(int chunk_size, int *n_, void *ks1_, void *ks2_, int copy_c
memset(seqs + n, 0, (m-n) * sizeof(bseq1_t));
}
READ_ONE_SEQ(ks);
if (ks2) {
READ_ONE_SEQ(ks2);
}
if (size >= chunk_size && (n&1) == 0) break;
}
if (size == 0) { // test if the 2nd file is finished
if (ks2 && kseq_read(ks2) >= 0) fprintf(stderr, "[W::%s] the 1st file has fewer sequences.\n", __func__);
if (kseq_read(ks2) >= 0) fprintf(stderr, "[W::%s] the 1st file has fewer sequences.\n", __func__);
}
}
*n_ = n; *size_ = size;
if (m > *m_) *m_ = m;
*seqs_ptr = seqs;

4
bwa.h
View File

@ -65,6 +65,10 @@ typedef struct {
kstring_t sam;
} bseq1_t;
typedef struct {
kstring_t sam;
} seq_sam_t;
extern int bwa_verbose, bwa_dbg;
extern char bwa_rg_id[256];

254
bwamem.c
View File

@ -30,6 +30,7 @@
#include <assert.h>
#include <limits.h>
#include <math.h>
#include <immintrin.h>
#ifdef HAVE_PTHREAD
#include <pthread.h>
#endif
@ -127,7 +128,12 @@ static void mem_collect_intv(const mem_opt_t *opt, const FMTIndex *fmt, int len,
a->mem.n = 0;
#ifdef SHOW_PERF
int64_t tmp_time = realtime_msec();
uint64_t tmp_time1, tmp_time2;
#if USE_RDTSC
tmp_time1 = __rdtsc();
#else
tmp_time1 = realtime_msec();
#endif
#endif
// 1. first pass: find all SMEMs
while (x < len) {
@ -149,8 +155,13 @@ static void mem_collect_intv(const mem_opt_t *opt, const FMTIndex *fmt, int len,
}
}
#ifdef SHOW_PERF
a->time_seed_1 += realtime_msec() - tmp_time;
tmp_time = realtime_msec();
#if USE_RDTSC
tmp_time2 = __rdtsc();
#else
tmp_time2 = realtime_msec();
#endif
a->time_seed_1 += tmp_time2 - tmp_time1;
tmp_time1 = tmp_time2;
#endif
#ifdef FILTER_FULL_MATCH
@ -173,9 +184,15 @@ static void mem_collect_intv(const mem_opt_t *opt, const FMTIndex *fmt, int len,
}
}
#ifdef SHOW_PERF
a->time_seed_2 = realtime_msec() - tmp_time;
tmp_time = realtime_msec();
#if USE_RDTSC
tmp_time2 = __rdtsc();
#else
tmp_time2 = realtime_msec();
#endif
a->time_seed_2 += tmp_time2 - tmp_time1;
tmp_time1 = tmp_time2;
#endif
// 3. third pass: LAST-like
if (opt->max_mem_intv > 0) {
x = 0;
@ -190,7 +207,12 @@ static void mem_collect_intv(const mem_opt_t *opt, const FMTIndex *fmt, int len,
}
}
#ifdef SHOW_PERF
a->time_seed_3 += realtime_msec() - tmp_time;
#if USE_RDTSC
tmp_time2 = __rdtsc();
#else
tmp_time2 = realtime_msec();
#endif
a->time_seed_3 += tmp_time2 - tmp_time1;
#endif
#ifdef FILTER_FULL_MATCH
@ -350,7 +372,12 @@ mem_chain_v mem_chain(const mem_opt_t *opt, const FMTIndex *fmt, const bntseq_t
for (k = count = 0; k < p->x[2] && count < opt->max_occ; k += step, ++count) {
mem_seed_t s;
#ifdef SHOW_PERF
int64_t tmp_time = realtime_msec();
uint64_t tmp_time1, tmp_time2;
#if USE_RDTSC
tmp_time1 = __rdtsc();
#else
tmp_time1 = realtime_msec();
#endif
#endif
s.rbeg = fmt_sa(fmt, p->x[0] + k);
//kv_push(uint64_t, v1, s.rbeg);
@ -369,9 +396,14 @@ mem_chain_v mem_chain(const mem_opt_t *opt, const FMTIndex *fmt, const bntseq_t
// s.rbeg = (fmt->l_pac << 1) - slen - s.rbeg;
//}
//s.rbeg = fmt_sa(fmt, p->x[0] + k);
#ifdef SHOW_PERF
tmp_time = realtime_msec() - tmp_time;
__sync_fetch_and_add(&time_bwt_sa, tmp_time);
#if USE_RDTSC
tmp_time2 = __rdtsc();
#else
tmp_time2 = realtime_msec();
#endif
aux->time_sa += tmp_time2 - tmp_time1;
#endif
CHECK_ADD_CHAIN(tmp, lower, upper);
}
@ -836,15 +868,27 @@ void mem_chain2aln(const mem_opt_t *opt, const bntseq_t *bns, const uint8_t *pac
printf("*** Left query: "); for (j = 0; j < s->qbeg; ++j) putchar("ACGTN"[(int)query[s->qbeg - 1 - j]]); putchar('\n');
}
#ifdef SHOW_PERF
int64_t tmp_time = realtime_msec();
uint64_t tmp_time1, tmp_time2;
#if USE_RDTSC
tmp_time1 = __rdtsc();
#else
tmp_time1 = realtime_msec();
#endif
#endif
#ifndef USE_AVX2
a->score = ksw_extend2(s->qbeg, qs, tmp, rs, 5, opt->mat, opt->o_del, opt->e_del, opt->o_ins, opt->e_ins, aw[0], opt->pen_clip5, opt->zdrop, s->len * opt->a, &qle, &tle, &gtle, &gscore, &max_off[0], aux->sw_buf);
#else
a->score = ksw_extend2_avx2(s->qbeg, query, tmp, rseq, 1, 5, opt->mat, opt->o_del, opt->e_del, opt->o_ins, opt->e_ins, opt->a, opt->b, aw[0], opt->pen_clip5, opt->zdrop, s->len * opt->a, &qle, &tle, &gtle, &gscore, &max_off[0], aux->sw_buf);
#endif
#ifdef SHOW_PERF
aux->time_bsw += realtime_msec() - tmp_time;
#if USE_RDTSC
tmp_time2 = __rdtsc();
#else
tmp_time2 = realtime_msec();
#endif
aux->time_bsw += tmp_time2 - tmp_time1;
#endif
if (bwa_verbose >= 4) { printf("*** Left extension: prev_score=%d; score=%d; bandwidth=%d; max_off_diagonal_dist=%d\n", prev, a->score, aw[0], max_off[0]); fflush(stdout); }
if (a->score == prev || max_off[0] < (aw[0]>>1) + (aw[0]>>2)) break;
@ -876,7 +920,12 @@ void mem_chain2aln(const mem_opt_t *opt, const bntseq_t *bns, const uint8_t *pac
printf("*** Right query: "); for (j = 0; j < l_query - qe; ++j) putchar("ACGTN"[(int)query[qe+j]]); putchar('\n');
}
#ifdef SHOW_PERF
int64_t tmp_time = realtime_msec();
uint64_t tmp_time1, tmp_time2;
#if USE_RDTSC
tmp_time1 = __rdtsc();
#else
tmp_time1 = realtime_msec();
#endif
#endif
#ifndef USE_AVX2
a->score = ksw_extend2(l_query - qe, query + qe, rmax[1] - rmax[0] - re, rseq + re, 5, opt->mat, opt->o_del, opt->e_del, opt->o_ins, opt->e_ins, aw[1], opt->pen_clip3, opt->zdrop, sc0, &qle, &tle, &gtle, &gscore, &max_off[1], aux->sw_buf);
@ -884,7 +933,12 @@ void mem_chain2aln(const mem_opt_t *opt, const bntseq_t *bns, const uint8_t *pac
a->score = ksw_extend2_avx2(l_query - qe, query + qe, rmax[1] - rmax[0] - re, rseq + re, 0, 5, opt->mat, opt->o_del, opt->e_del, opt->o_ins, opt->e_ins, opt->a, opt->b, aw[1], opt->pen_clip3, opt->zdrop, sc0, &qle, &tle, &gtle, &gscore, &max_off[1], aux->sw_buf);
#endif
#ifdef SHOW_PERF
aux->time_bsw += realtime_msec() - tmp_time;
#if USE_RDTSC
tmp_time2 = __rdtsc();
#else
tmp_time2 = realtime_msec();
#endif
aux->time_bsw += tmp_time2 - tmp_time1;
#endif
if (bwa_verbose >= 4) { printf("*** Right extension: prev_score=%d; score=%d; bandwidth=%d; max_off_diagonal_dist=%d\n", prev, a->score, aw[1], max_off[1]); fflush(stdout); }
if (a->score == prev || max_off[1] < (aw[1]>>1) + (aw[1]>>2)) break;
@ -1134,7 +1188,7 @@ void mem_reorder_primary5(int T, mem_alnreg_v *a)
}
// TODO (future plan): group hits into a uint64_t[] array. This will be cleaner and more flexible
void mem_reg2sam(const mem_opt_t *opt, const bntseq_t *bns, const uint8_t *pac, bseq1_t *s, mem_alnreg_v *a, int extra_flag, const mem_aln_t *m)
void mem_reg2sam(const mem_opt_t *opt, const bntseq_t *bns, const uint8_t *pac, bseq1_t *s, mem_alnreg_v *a, int extra_flag, const mem_aln_t *m, seq_sam_t *ss)
{
extern char **mem_gen_alt(const mem_opt_t *opt, const bntseq_t *bns, const uint8_t *pac, mem_alnreg_v *a, int l_query, const char *query);
// kstring_t str;
@ -1146,7 +1200,7 @@ void mem_reg2sam(const mem_opt_t *opt, const bntseq_t *bns, const uint8_t *pac,
XA = mem_gen_alt(opt, bns, pac, a, s->l_seq, s->seq);
kv_init(aa);
// str.l = str.m = 0; str.s = 0;
s->sam.l = 0;
ss->sam.l = 0;
for (k = l = 0; k < a->n; ++k) {
mem_alnreg_t *p = &a->a[k];
mem_aln_t *q;
@ -1169,10 +1223,10 @@ void mem_reg2sam(const mem_opt_t *opt, const bntseq_t *bns, const uint8_t *pac,
mem_aln_t t;
t = mem_reg2aln(opt, bns, pac, s->l_seq, s->seq, 0);
t.flag |= extra_flag;
mem_aln2sam(opt, bns, &s->sam, s, 1, &t, 0, m);
mem_aln2sam(opt, bns, &ss->sam, s, 1, &t, 0, m);
} else {
for (k = 0; k < aa.n; ++k)
mem_aln2sam(opt, bns, &s->sam, s, aa.n, aa.a, k, m);
mem_aln2sam(opt, bns, &ss->sam, s, aa.n, aa.a, k, m);
for (k = 0; k < aa.n; ++k) free(aa.a[k].cigar);
free(aa.a);
}
@ -1309,18 +1363,18 @@ static void worker1(void *data, int i, int tid)
static void worker2(void *data, int i, int tid)
{
extern int mem_sam_pe(const mem_opt_t *opt, const bntseq_t *bns, const uint8_t *pac, const mem_pestat_t pes[4], uint64_t id, bseq1_t s[2], mem_alnreg_v a[2]);
extern int mem_sam_pe(const mem_opt_t *opt, const bntseq_t *bns, const uint8_t *pac, const mem_pestat_t pes[4], uint64_t id, bseq1_t s[2], mem_alnreg_v a[2], seq_sam_t ss[2]);
extern void mem_reg2ovlp(const mem_opt_t *opt, const bntseq_t *bns, const uint8_t *pac, bseq1_t *s, mem_alnreg_v *a);
mem_worker_t *w = (mem_worker_t*)data;
if (!(w->opt->flag&MEM_F_PE)) {
if (bwa_verbose >= 4) printf("=====> Finalizing read '%s' <=====\n", w->seqs[i].name);
mem_mark_primary_se(w->opt, w->regs[i].n, w->regs[i].a, w->n_processed + i);
if (w->opt->flag & MEM_F_PRIMARY5) mem_reorder_primary5(w->opt->T, &w->regs[i]);
mem_reg2sam(w->opt, w->bns, w->pac, &w->seqs[i], &w->regs[i], 0, 0);
mem_reg2sam(w->opt, w->bns, w->pac, &w->seqs[i], &w->regs[i], 0, 0, &w->sams[i]);
free(w->regs[i].a);
} else {
if (bwa_verbose >= 4) printf("=====> Finalizing read pair '%s' <=====\n", w->seqs[i<<1|0].name);
mem_sam_pe(w->opt, w->bns, w->pac, w->pes, (w->n_processed>>1) + i, &w->seqs[i<<1], &w->regs[i<<1]);
mem_sam_pe(w->opt, w->bns, w->pac, w->pes, (w->n_processed>>1) + i, &w->seqs[i<<1], &w->regs[i<<1], &w->sams[i<<1]);
free(w->regs[i<<1|0].a); free(w->regs[i<<1|1].a);
}
}
@ -1381,7 +1435,12 @@ static void mem_collect_intv_batch(const mem_opt_t *opt, const FMTIndex *fmt, in
smem->mem.n = 0;
#ifdef SHOW_PERF
int64_t tmp_time = realtime_msec();
uint64_t tmp_time1, tmp_time2;
#if USE_RDTSC
tmp_time1 = __rdtsc();
#else
tmp_time1 = realtime_msec();
#endif
#endif
// 1. first pass: find all SMEMs
//fprintf(gfp1, "read\n");
@ -1405,12 +1464,21 @@ static void mem_collect_intv_batch(const mem_opt_t *opt, const FMTIndex *fmt, in
if (start_flag) ++start_N_num;
}
}
#ifdef SHOW_PERF
a->time_seed_1 += realtime_msec() - tmp_time;
tmp_time = realtime_msec();
s2n += 1;
#ifdef SHOW_DATA_PERF
gdat[0] += 1; // read num ++
if (max_seed_len == len - start_N_num)
s1n += 1;
gdat[1] += 1; // seed-1 full match num ++
#endif
#ifdef SHOW_PERF
#if USE_RDTSC
tmp_time2 = __rdtsc();
#else
tmp_time2 = realtime_msec();
#endif
a->time_seed_1 += tmp_time2 - tmp_time1;
tmp_time1 = tmp_time2;
#endif
#ifdef FILTER_FULL_MATCH
@ -1435,10 +1503,17 @@ static void mem_collect_intv_batch(const mem_opt_t *opt, const FMTIndex *fmt, in
}
// fprintf(gfp1, "end\n");
}
#ifdef SHOW_PERF
a->time_seed_2 += realtime_msec() - tmp_time;
tmp_time = realtime_msec();
#if USE_RDTSC
tmp_time2 = __rdtsc();
#else
tmp_time2 = realtime_msec();
#endif
a->time_seed_2 += tmp_time2 - tmp_time1;
tmp_time1 = tmp_time2;
#endif
// 3. third pass: LAST-like
if (opt->max_mem_intv > 0) {
x = 0;
@ -1453,8 +1528,14 @@ static void mem_collect_intv_batch(const mem_opt_t *opt, const FMTIndex *fmt, in
} else ++x;
}
}
#ifdef SHOW_PERF
a->time_seed_3 += realtime_msec() - tmp_time;
#if USE_RDTSC
tmp_time2 = __rdtsc();
#else
tmp_time2 = realtime_msec();
#endif
a->time_seed_3 += tmp_time2 - tmp_time1;
#endif
#ifdef FILTER_FULL_MATCH
@ -1490,19 +1571,31 @@ void find_smem(const mem_opt_t *opt, const FMTIndex *fmt, int len, const uint8_t
for (k = count = 0; k < p->x[2] && count < opt->max_occ; k += step, ++count)
{
#ifdef SHOW_PERF
int64_t tmp_time = realtime_msec();
uint64_t tmp_time1, tmp_time2;
#if USE_RDTSC
tmp_time1 = __rdtsc();
#else
tmp_time1 = realtime_msec();
#endif
#endif
uint64_t pos = fmt_sa_offset(fmt, p->x[0] + k); // this is the base coordinate in the forward-reverse reference
#ifdef SHOW_PERF
aux->time_sa += realtime_msec() - tmp_time;
#if USE_RDTSC
tmp_time2 = __rdtsc();
#else
tmp_time2 = realtime_msec();
#endif
aux->time_sa += tmp_time2 - tmp_time1;
#endif
kv_push(uint64_t, smemv->pos_arr, pos);
}
}
}
}
void generate_chain(const mem_opt_t *opt, const FMTIndex *fmt, const bntseq_t *bns, int len, const uint8_t *seq, smem_v *smemv, mem_chain_v *chain)
{
int i, j, b, e, l_rep;
@ -1615,6 +1708,15 @@ void mem_core_process(const mem_opt_t *opt, const FMTIndex *fmt, const bntseq_t
mem_alnreg_v *regp;
char *seq;
#ifdef SHOW_PERF
uint64_t tmp_time1, tmp_time2;
#if USE_RDTSC
tmp_time1 = __rdtsc();
#else
tmp_time1 = realtime_msec();
#endif
#endif
// 1. seeding
for (i = 0; i < nseq; ++i) {
seq = seq_arr[i].seq;
@ -1625,6 +1727,16 @@ void mem_core_process(const mem_opt_t *opt, const FMTIndex *fmt, const bntseq_t
find_smem(opt, fmt, l_seq, (uint8_t *)seq, aux, &smem_arr[i]);
}
#ifdef SHOW_PERF
#if USE_RDTSC
tmp_time2 = __rdtsc();
#else
tmp_time2 = realtime_msec();
#endif
aux->time_seed_all += tmp_time2 - tmp_time1;
tmp_time1 = tmp_time2;
#endif
// 2. chain
for (i=0; i<nseq; ++i) {
seq = seq_arr[i].seq;
@ -1636,6 +1748,16 @@ void mem_core_process(const mem_opt_t *opt, const FMTIndex *fmt, const bntseq_t
if (bwa_verbose >= 4) mem_print_chain(bns, chnp);
}
#ifdef SHOW_PERF
#if USE_RDTSC
tmp_time2 = __rdtsc();
#else
tmp_time2 = realtime_msec();
#endif
aux->time_chain += tmp_time2 - tmp_time1;
tmp_time1 = tmp_time2;
#endif
// 3. align
for (i=0; i<nseq; ++i) {
seq = seq_arr[i].seq;
@ -1643,19 +1765,14 @@ void mem_core_process(const mem_opt_t *opt, const FMTIndex *fmt, const bntseq_t
chnp = chain_arr + i;
regp = reg_arr + i;
kv_init(*regp);
#ifdef SHOW_PERF
int64_t tmp_time = realtime_msec();
#endif
for(j=0; j<chnp->n; ++j) {
mem_chain_t *p = &chnp->a[j];
if (bwa_verbose >= 4) err_printf("* ---> Processing chain(%d) <---\n", j);
mem_chain2aln(opt, bns, pac, l_seq, (uint8_t*)seq, p, regp, aux);
free(chnp->a[j].seeds);
}
//#ifdef SHOW_PERF
// tmp_time = realtime_msec() - tmp_time;
// __sync_fetch_and_add(&time_ksw_extend2, tmp_time);
//#endif
free(chnp->a); chnp->m = 0; chnp->a = 0;
regp->n = mem_sort_dedup_patch(opt, bns, pac, (uint8_t *)seq, regp->n, regp->a);
if (bwa_verbose >= 4) {
@ -1670,13 +1787,17 @@ void mem_core_process(const mem_opt_t *opt, const FMTIndex *fmt, const bntseq_t
if (p->rid >= 0 && bns->anns[p->rid].is_alt)
p->is_alt = 1;
}
#ifdef SHOW_PERF
tmp_time = realtime_msec() - tmp_time;
__sync_fetch_and_add(&time_ksw_extend2, tmp_time);
#endif
}
#ifdef SHOW_PERF
#if USE_RDTSC
tmp_time2 = __rdtsc();
#else
tmp_time2 = realtime_msec();
#endif
aux->time_bsw_all += tmp_time2 - tmp_time1;
#endif
#if 1
// 4. calc insert size
#define MIN_RATIO 0.8
@ -1708,7 +1829,7 @@ static void worker_smem_align(void *data, int i, int tid)
static void worker_sam(void *data, int i, int tid)
{
extern int mem_sam_pe(const mem_opt_t *opt, const bntseq_t *bns, const uint8_t *pac, const mem_pestat_t pes[4], uint64_t id, bseq1_t s[2], mem_alnreg_v a[2]);
extern int mem_sam_pe(const mem_opt_t *opt, const bntseq_t *bns, const uint8_t *pac, const mem_pestat_t pes[4], uint64_t id, bseq1_t s[2], mem_alnreg_v a[2], seq_sam_t ss[2]);
extern void mem_reg2ovlp(const mem_opt_t *opt, const bntseq_t *bns, const uint8_t *pac, bseq1_t *s, mem_alnreg_v *a);
extern void mem_matesw_batch(const mem_opt_t *opt, const bntseq_t *bns, const uint8_t *pac, const mem_pestat_t pes[4], bseq1_t *s, mem_alnreg_v *a, int data_size);
mem_worker_t *w = (mem_worker_t*)data;
@ -1720,7 +1841,7 @@ static void worker_sam(void *data, int i, int tid)
if (bwa_verbose >= 4) printf("=====> Finalizing read '%s' <=====\n", w->seqs[i].name);
mem_mark_primary_se(w->opt, w->regs[i].n, w->regs[i].a, w->n_processed + i);
if (w->opt->flag & MEM_F_PRIMARY5) mem_reorder_primary5(w->opt->T, &w->regs[i]);
mem_reg2sam(w->opt, w->bns, w->pac, &w->seqs[i], &w->regs[i], 0, 0);
mem_reg2sam(w->opt, w->bns, w->pac, &w->seqs[i], &w->regs[i], 0, 0, &w->sams[i]);
free(w->regs[i].a);
}
} else {
@ -1730,13 +1851,13 @@ static void worker_sam(void *data, int i, int tid)
for (i=start; i<end; i+=2) {
if (bwa_verbose >= 4) printf("=====> Finalizing read pair '%s' <=====\n", w->seqs[i].name);
mem_sam_pe(w->opt, w->bns, w->pac, w->pes, (w->n_processed+i)>>1, &w->seqs[i], &w->regs[i]);
mem_sam_pe(w->opt, w->bns, w->pac, w->pes, (w->n_processed+i)>>1, &w->seqs[i], &w->regs[i], &w->sams[i]);
free(w->regs[i].a); free(w->regs[i+1].a);
}
}
}
void mem_process_seqs(const mem_opt_t *opt, mem_worker_t *w, int64_t n_processed, int n, bseq1_t *seqs, const mem_pestat_t *pes0)
void mem_process_seqs(const mem_opt_t *opt, mem_worker_t *w, int64_t n_processed, int n, bseq1_t *seqs, const mem_pestat_t *pes0, seq_sam_t *sams)
{
extern void kt_for(int n_threads, void (*func)(void*,int,int), void *data, int n);
mem_pestat_t pes[4];
@ -1751,8 +1872,9 @@ void mem_process_seqs(const mem_opt_t *opt, mem_worker_t *w, int64_t n_processed
w->n = n;
w->regs = realloc(w->regs, n * sizeof(mem_alnreg_v));
}
w->seqs = seqs; w->n_processed = n_processed;
w->seqs = seqs; w->n_processed = n_processed; w->sams = sams;
w->n_reads = n; w->pes = &pes[0];
#if 1
if ((opt->flag & MEM_F_PE) && !pes0) { // infer insert sizes if not provided
int i, j;
@ -1762,25 +1884,47 @@ void mem_process_seqs(const mem_opt_t *opt, mem_worker_t *w, int64_t n_processed
w->isize_arr[i][j].n = 0;
}
#endif
#ifdef SHOW_PERF
int64_t tmp_time = realtime_msec();
uint64_t tmp_time1, tmp_time2;
#if USE_RDTSC
tmp_time1 = __rdtsc();
#else
tmp_time1 = realtime_msec();
#endif
#endif
//kt_for(opt->n_threads, worker1, w, (opt->flag&MEM_F_PE)? n>>1 : n); // find mapping positions
kt_for(opt->n_threads, worker_smem_align, w, n_batch); // find mapping positions
#ifdef SHOW_PERF
time_work1 += realtime_msec() - tmp_time;
tmp_time = realtime_msec();
#endif
if (opt->flag&MEM_F_PE) { // infer insert sizes if not provided
if (pes0) memcpy(pes, pes0, 4 * sizeof(mem_pestat_t)); // if pes0 != NULL, set the insert-size distribution as pes0
else mem_pestat(opt, w->bns->l_pac, n, w->isize_arr, pes); // otherwise, infer the insert size distribution from data
//else mem_pestat_old(opt, w->bns->l_pac, n, w->regs, pes);
}
#ifdef SHOW_PERF
#if USE_RDTSC
tmp_time2 = __rdtsc();
#else
tmp_time2 = realtime_msec();
#endif
time_work_kernel += tmp_time2 - tmp_time1;
tmp_time1 = tmp_time2;
#endif
kt_for(opt->n_threads, worker2, w, (opt->flag&MEM_F_PE)? n>>1 : n); // generate alignment
//kt_for(opt->n_threads, worker_sam, w, n_batch);
#ifdef SHOW_PERF
time_work2 += realtime_msec() - tmp_time;
#if USE_RDTSC
tmp_time2 = __rdtsc();
#else
tmp_time2 = realtime_msec();
#endif
time_work_sam += tmp_time2 - tmp_time1;
#endif
//free(w.regs);
if (bwa_verbose >= 3)
fprintf(stderr, "[M::%s] Processed %d reads in %.3f CPU sec, %.3f real sec\n", __func__, n, cputime() - ctime, realtime() - rtime);

View File

@ -145,11 +145,14 @@ typedef struct { size_t n, m; mem_chain_t *a; } mem_chain_v;
typedef struct {
bwtintv_v mem, mem1, *tmpv[2];
buf_t *sw_buf, *seq_buf;
int64_t time_seed_1;
int64_t time_seed_2;
int64_t time_seed_3;
int64_t time_sa;
int64_t time_bsw;
uint64_t time_seed_1;
uint64_t time_seed_2;
uint64_t time_seed_3;
uint64_t time_seed_all;
uint64_t time_chain;
uint64_t time_sa;
uint64_t time_bsw;
uint64_t time_bsw_all;
} smem_aux_t;
typedef struct {
@ -167,6 +170,7 @@ typedef struct {
const mem_pestat_t *pes;
smem_aux_t **aux;
bseq1_t *seqs;
seq_sam_t *sams;
smem_v **smem_arr;
mem_chain_v **chain_arr;
mem_alnreg_v *regs;
@ -212,7 +216,7 @@ extern "C" {
* @param pes0 insert-size info; if NULL, infer from data; if not NULL, it should be an array with 4 elements,
* corresponding to each FF, FR, RF and RR orientation. See mem_pestat() for more info.
*/
void mem_process_seqs(const mem_opt_t *opt, mem_worker_t *w, int64_t n_processed, int n, bseq1_t *seqs, const mem_pestat_t *pes0);
void mem_process_seqs(const mem_opt_t *opt, mem_worker_t *w, int64_t n_processed, int n, bseq1_t *seqs, const mem_pestat_t *pes0, seq_sam_t *sams);
/**
* Find the aligned regions for one query sequence

View File

@ -547,11 +547,11 @@ void mem_reorder_primary5(int T, mem_alnreg_v *a);
#define raw_mapq(diff, a) ((int)(6.02 * (diff) / (a) + .499))
int mem_sam_pe(const mem_opt_t *opt, const bntseq_t *bns, const uint8_t *pac, const mem_pestat_t pes[4], uint64_t id, bseq1_t s[2], mem_alnreg_v a[2])
int mem_sam_pe(const mem_opt_t *opt, const bntseq_t *bns, const uint8_t *pac, const mem_pestat_t pes[4], uint64_t id, bseq1_t s[2], mem_alnreg_v a[2], seq_sam_t ss[2])
{
extern int mem_mark_primary_se(const mem_opt_t *opt, int n, mem_alnreg_t *a, int64_t id);
extern int mem_approx_mapq_se(const mem_opt_t *opt, const mem_alnreg_t *a);
extern void mem_reg2sam(const mem_opt_t *opt, const bntseq_t *bns, const uint8_t *pac, bseq1_t *s, mem_alnreg_v *a, int extra_flag, const mem_aln_t *m);
extern void mem_reg2sam(const mem_opt_t *opt, const bntseq_t *bns, const uint8_t *pac, bseq1_t *s, mem_alnreg_v *a, int extra_flag, const mem_aln_t *m, seq_sam_t *ss);
extern char **mem_gen_alt(const mem_opt_t *opt, const bntseq_t *bns, const uint8_t *pac, const mem_alnreg_v *a, int l_query, const char *query);
int n = 0, i, j, z[2], o, subo, n_sub, extra_flag = 1, n_pri[2], n_aa[2];
@ -651,12 +651,12 @@ int mem_sam_pe(const mem_opt_t *opt, const bntseq_t *bns, const uint8_t *pac, co
aa[i][n_aa[i]++] = g[i];
}
}
s[0].sam.l = 0;
ss[0].sam.l = 0;
for (i = 0; i < n_aa[0]; ++i)
mem_aln2sam(opt, bns, &s[0].sam, &s[0], n_aa[0], aa[0], i, &h[1]); // write read1 hits
s[1].sam.l = 0;
mem_aln2sam(opt, bns, &ss[0].sam, &s[0], n_aa[0], aa[0], i, &h[1]); // write read1 hits
ss[1].sam.l = 0;
for (i = 0; i < n_aa[1]; ++i)
mem_aln2sam(opt, bns, &s[1].sam, &s[1], n_aa[1], aa[1], i, &h[0]); // write read2 hits
mem_aln2sam(opt, bns, &ss[1].sam, &s[1], n_aa[1], aa[1], i, &h[0]); // write read2 hits
if (strcmp(s[0].name, s[1].name) != 0) err_fatal(__func__, "paired reads have different names: \"%s\", \"%s\"\n", s[0].name, s[1].name);
// free
for (i = 0; i < 2; ++i) {
@ -685,8 +685,8 @@ no_pairing:
d = mem_infer_dir(bns->l_pac, a[0].a[0].rb, a[1].a[0].rb, &dist);
if (!pes[d].failed && dist >= pes[d].low && dist <= pes[d].high) extra_flag |= 2;
}
mem_reg2sam(opt, bns, pac, &s[0], &a[0], 0x41|extra_flag, &h[1]);
mem_reg2sam(opt, bns, pac, &s[1], &a[1], 0x81|extra_flag, &h[0]);
mem_reg2sam(opt, bns, pac, &s[0], &a[0], 0x41|extra_flag, &h[1], &ss[0]);
mem_reg2sam(opt, bns, pac, &s[1], &a[1], 0x81|extra_flag, &h[0], &ss[1]);
if (strcmp(s[0].name, s[1].name) != 0) err_fatal(__func__, "paired reads have different names: \"%s\", \"%s\"\n", s[0].name, s[1].name);
free(h[0].cigar); free(h[1].cigar);
return n;

View File

@ -1,6 +1,8 @@
thread=1
n_r1=~/data/fastq/dataset/na12878_wes_144/diff_r1.fq
n_r2=~/data/fastq/dataset/na12878_wes_144/diff_r2.fq
n_r1=~/data/fastq/dataset/na12878_wgs_150/diff_r1.fq
n_r2=~/data/fastq/dataset/na12878_wgs_150/diff_r2.fq
#n_r1=~/data/fastq/dataset/na12878_wes_144/diff_r1.fq
#n_r2=~/data/fastq/dataset/na12878_wes_144/diff_r2.fq
#n_r1=~/data/fastq/ZY2105177532213000/n_r1.fq
#n_r2=~/data/fastq/ZY2105177532213000/n_r2.fq
#n_r1=~/data/fastq/ZY2105177532213000/sn_r1.fq

396
fastmap.c
View File

@ -32,6 +32,8 @@
#include <limits.h>
#include <ctype.h>
#include <math.h>
#include <pthread.h>
#include <immintrin.h>
#include "bwa.h"
#include "bwamem.h"
#include "kvec.h"
@ -39,11 +41,12 @@
#include "bntseq.h"
#include "kseq.h"
#include "fmt_idx.h"
#include "yarn.h"
KSEQ_DECLARE(gzFile)
// 记录运行时间的变量
/*
#ifdef SHOW_PERF
int64_t time_ksw_extend2 = 0,
time_ksw_global2 = 0,
time_ksw_align2 = 0,
@ -65,8 +68,31 @@ int64_t dn = 0, n16 = 0, n17 = 0, n18 = 0, n19 = 0, nall = 0, num_sa = 0;
int64_t s1n = 0, s2n = 0, s3n = 0, s1l = 0, s2l = 0, s3l = 0;
int64_t gn[100] = {0};
int64_t g_num_smem2 = 0;
FILE *gfp1, *gfp2, *gfp3;
#endif
*/
#ifdef SHOW_PERF
uint64_t time_process_data = 0, time_read = 0, time_write = 0, time_compute = 0,
time_seed_1 = 0, time_seed_2 = 0, time_seed_3 = 0,
time_seed_sa = 0, time_seed_chain = 0, time_seed_all = 0,
time_bsw = 0, time_bsw_all = 0,
time_work_kernel = 0, time_work_sam = 0,
time_load_idx = 0;
uint64_t proc_freq = 1000;
#endif
#ifdef SHOW_DATA_PERF
/*
gdat[0]: read nums
gdat[1]: seed-1 full match nums
*/
int64_t gdat[100];
#endif
#ifdef DEBUG_OUTPUT
FILE * gfp1, *gfp2, *gfp3;
#endif
extern unsigned char nst_nt4_table[256];
@ -77,8 +103,11 @@ void kt_pipeline(int n_threads, void *(*func)(void*, int, void*), void *shared_d
typedef struct {
int n_seqs;
int n_sams;
int m_seqs;
int m_sams;
bseq1_t *seqs;
seq_sam_t *sams;
} ktp_data_t;
typedef struct {
@ -91,60 +120,277 @@ typedef struct {
mem_worker_t *w;
int data_idx; // pingpong buffer index
ktp_data_t *data;
volatile int read_complete;
volatile int calc_complete;
long read_idx;
long calc_idx;
long write_idx;
} ktp_aux_t;
static void *process(void *shared, int step, void *_data)
// read
static inline void *read_data(ktp_aux_t *aux, ktp_data_t *data)
{
ktp_aux_t *aux = (ktp_aux_t*)shared;
ktp_data_t *data = (ktp_data_t*)_data;
int i;
if (step == 0) {
#ifdef SHOW_PERF
int64_t tmp_time1, tmp_time2;
#if USE_RDTSC
tmp_time1 = __rdtsc();
#else
tmp_time1 = realtime_msec();
#endif
#endif
ktp_data_t *ret = aux->data + aux->data_idx;
aux->data_idx = !aux->data_idx;
int64_t size = 0;
bseq_read(aux->actual_chunk_size, &ret->n_seqs, aux->ks, aux->ks2, aux->copy_comment, &size, &ret->m_seqs, &ret->seqs);
#ifdef SHOW_PERF
#if USE_RDTSC
tmp_time2 = __rdtsc();
#else
tmp_time2 = realtime_msec();
#endif
time_read += tmp_time2 - tmp_time1;
#endif
#if 0
// 计算内存占用
int i;
int64_t ms = 0;
for (i = 0; i < ret->n_seqs; ++i)
{
ms += ret->seqs[i].m_name + ret->seqs[i].m_seq + ret->seqs[i].m_comment + ret->seqs[i].m_qual;
}
fprintf(stderr, "seq size: %ld M\n", ms / 1024 / 1024);
#endif
if (ret->n_seqs == 0) {
return 0;
}
if (bwa_verbose >= 3)
fprintf(stderr, "[M::%s] read %d sequences (%ld bp)...\n", __func__, ret->n_seqs, (long)size);
return ret;
} else if (step == 1) {
}
// calculate
static inline void *calc_data(ktp_aux_t *aux, ktp_data_t *data)
{
#ifdef SHOW_PERF
int64_t tmp_time1, tmp_time2;
#if USE_RDTSC
tmp_time1 = __rdtsc();
#else
tmp_time1 = realtime_msec();
#endif
#endif
const mem_opt_t *opt = aux->opt;
// const bwaidx_t *idx = aux->idx;
if (data->n_sams != data->n_seqs) {
if (data->m_sams < data->m_seqs) {
data->m_sams = data->m_seqs;
data->sams = realloc(data->sams, data->m_sams * sizeof(seq_sam_t));
memset(data->sams + data->n_sams, 0, (data->m_sams - data->n_sams) * sizeof(seq_sam_t));
}
data->n_sams = data->n_seqs;
}
if (opt->flag & MEM_F_SMARTPE) {
int i;
fprintf(stderr, "here MEM_F_SMARTPE\n");
bseq1_t *sep[2];
seq_sam_t *ss[2];
int n_sep[2];
mem_opt_t tmp_opt = *opt;
bseq_classify(data->n_seqs, data->seqs, n_sep, sep);
ss[0] = calloc(0, n_sep[0] * sizeof(seq_sam_t));
ss[1] = calloc(0, n_sep[1] * sizeof(seq_sam_t));
if (bwa_verbose >= 3)
fprintf(stderr, "[M::%s] %d single-end sequences; %d paired-end sequences\n", __func__, n_sep[0], n_sep[1]);
if (n_sep[0]) {
tmp_opt.flag &= ~MEM_F_PE;
mem_process_seqs(&tmp_opt, aux->w, aux->n_processed, n_sep[0], sep[0], 0);
mem_process_seqs(&tmp_opt, aux->w, aux->n_processed, n_sep[0], sep[0], 0, ss[0]);
for (i = 0; i < n_sep[0]; ++i)
data->seqs[sep[0][i].id].sam = sep[0][i].sam;
data->sams[sep[0][i].id].sam = ss[0][i].sam;
}
if (n_sep[1]) {
tmp_opt.flag |= MEM_F_PE;
mem_process_seqs(&tmp_opt, aux->w, aux->n_processed + n_sep[0], n_sep[1], sep[1], aux->pes0);
mem_process_seqs(&tmp_opt, aux->w, aux->n_processed + n_sep[0], n_sep[1], sep[1], aux->pes0, ss[1]);
for (i = 0; i < n_sep[1]; ++i)
data->seqs[sep[1][i].id].sam = sep[1][i].sam;
data->sams[sep[1][i].id].sam = ss[1][i].sam;
}
free(sep[0]); free(sep[1]);
} else
mem_process_seqs(opt, aux->w, aux->n_processed, data->n_seqs, data->seqs, aux->pes0);
free(ss[0]); free(ss[1]);
} else mem_process_seqs(opt, aux->w, aux->n_processed, data->n_seqs, data->seqs, aux->pes0, data->sams);
aux->n_processed += data->n_seqs;
#ifdef SHOW_PERF
#if USE_RDTSC
tmp_time2 = __rdtsc();
#else
tmp_time2 = realtime_msec();
#endif
time_compute += tmp_time2 - tmp_time1;
#endif
return data;
} else if (step == 2) {
for (i = 0; i < data->n_seqs; ++i) {
if (data->seqs[i].sam.l) err_fputs(data->seqs[i].sam.s, stdout);
}
// write
static inline void *write_data(ktp_aux_t *aux, ktp_data_t *data)
{
int i;
#ifdef SHOW_PERF
int64_t tmp_time1, tmp_time2;
#if USE_RDTSC
tmp_time1 = __rdtsc();
#else
tmp_time1 = realtime_msec();
#endif
#endif
//int64_t ms = 0;
for (i = 0; i < data->n_sams; ++i)
{
//ms += data->sams[i].sam.m;
if (data->sams[i].sam.l)
err_fputs(data->sams[i].sam.s, stdout);
}
//fprintf(stderr, "sam size: %ld M\n", ms / 1024 / 1024);
#ifdef SHOW_PERF
#if USE_RDTSC
tmp_time2 = __rdtsc();
#else
tmp_time2 = realtime_msec();
#endif
time_write += tmp_time2 - tmp_time1;
#endif
return 0;
}
// io 异步,读和写不能同时
static void *process(void *shared, int step, void *_data)
{
ktp_aux_t *aux = (ktp_aux_t*)shared;
ktp_data_t *data = (ktp_data_t*)_data;
if (step == 0) {
return read_data(aux, data);
} else if (step == 1) {
return calc_data(aux, data);
} else if (step == 2) {
return write_data(aux, data);
}
return 0;
}
////////////// 读和写可以同时进行的pipeline
static lock_t *input_have = NULL;
static lock_t *output_have = NULL;
static void *thread_read(void *data)
{
ktp_aux_t *aux = (ktp_aux_t *)data;
while (1)
{
POSSESS(input_have);
WAIT_FOR(input_have, NOT_TO_BE, 0);
RELEASE(input_have);
// fprintf(stderr, "start read: %ld\n", aux->read_idx);
if (read_data(aux, aux->data) == 0) {
POSSESS(input_have);
aux->read_complete = 1;
TWIST(input_have, BY, -1);
break;
}
POSSESS(input_have);
aux->read_idx++;
TWIST(input_have, BY, -1);
// fprintf(stderr, "next read: %ld\n", aux->read_idx);
}
return 0;
}
static void *thread_calc(void *data)
{
ktp_aux_t *aux = (ktp_aux_t *)data;
int d_idx = 0;
int add_idx = 0;
while (1)
{
// fprintf(stderr, "start calc: %ld\n", aux->calc_idx);
POSSESS(input_have);
WAIT_FOR(input_have, NOT_TO_BE, 2);
RELEASE(input_have); // 应该没必要持有吧?
POSSESS(output_have);
WAIT_FOR(output_have, NOT_TO_BE, 2);
RELEASE(output_have);
if (aux->calc_idx < aux->read_idx) {
calc_data(aux, aux->data + d_idx);
d_idx = !d_idx;
add_idx = 1;
}
if (aux->read_complete)
{
POSSESS(output_have);
if (add_idx) aux->calc_idx ++;
aux->calc_complete = 1;
TWIST(output_have, BY, 1); // 最后要唤醒写线程
break; // 计算完了
}
POSSESS(output_have);
if (add_idx) aux->calc_idx ++;
TWIST(output_have, BY, 1);
POSSESS(input_have);
TWIST(input_have, BY, 1);
// fprintf(stderr, "next calc: %ld\n", aux->calc_idx);
}
return 0;
}
static void *thread_write(void *data)
{
ktp_aux_t *aux = (ktp_aux_t *)data;
int d_idx = 0;
while (1)
{
// fprintf(stderr, "start write: %ld\n", aux->write_idx);
POSSESS(output_have);
WAIT_FOR(output_have, NOT_TO_BE, 0);
RELEASE(output_have);
if (aux->write_idx < aux->calc_idx) {
write_data(aux, aux->data + d_idx);
d_idx = !d_idx;
aux->write_idx++;
}
if (aux->calc_complete) {
if (aux->write_idx < aux->calc_idx)
write_data(aux, aux->data + d_idx);
break;
}
POSSESS(output_have);
TWIST(output_have, BY, -1);
// fprintf(stderr, "next write: %ld\n", aux->write_idx);
}
return 0;
}
static void new_pipeline(ktp_aux_t *aux)
{
input_have = NEW_LOCK(2);
output_have = NEW_LOCK(0);
pthread_t tid[3];
int i;
pthread_create(&tid[0], 0, thread_read, aux);
pthread_create(&tid[1], 0, thread_calc, aux);
pthread_create(&tid[2], 0, thread_write, aux);
for (i = 0; i < 3; ++i) pthread_join(tid[i], 0);
}
//////////////
static void update_a(mem_opt_t *opt, const mem_opt_t *opt0)
{
if (opt0->a) { // matching score is changed
@ -163,11 +409,23 @@ static void update_a(mem_opt_t *opt, const mem_opt_t *opt0)
int main_mem(int argc, char *argv[])
{
#ifdef SHOW_PERF
#ifdef DEBUG_OUTPUT
gfp1 = fopen("./fp1.txt", "w");
gfp2 = fopen("./fp2.txt", "w");
gfp3 = fopen("./fp3.txt", "w");
#endif
#ifdef SHOW_PERF
#if USE_RDTSC
uint64_t tmp_time = __rdtsc();
sleep(1);
proc_freq = __rdtsc() - tmp_time;
#else
proc_freq = 1000;
#endif
#endif
mem_opt_t *opt, opt0;
int fd, fd2, i, c, ignore_alt = 0, no_mt_io = 0;
int fixed_chunk_size = -1;
@ -184,9 +442,10 @@ int main_mem(int argc, char *argv[])
aux.opt = opt = mem_opt_init();
memset(&opt0, 0, sizeof(mem_opt_t));
while ((c = getopt(argc, argv, "51qpaMCSPVYjuk:c:v:s:r:t:b:R:A:B:O:E:U:w:L:d:T:Q:D:m:I:N:o:f:W:x:G:h:y:K:X:H:F:z:")) >= 0) {
while ((c = getopt(argc, argv, "512qpaMCSPVYjuk:c:v:s:r:t:b:R:A:B:O:E:U:w:L:d:T:Q:D:m:I:N:o:f:W:x:G:h:y:K:X:H:F:z:")) >= 0) {
if (c == 'k') opt->min_seed_len = atoi(optarg), opt0.min_seed_len = 1;
else if (c == '1') no_mt_io = 1;
else if (c == '2') no_mt_io = 2;
else if (c == 'x') mode = optarg;
else if (c == 'w') opt->w = atoi(optarg), opt0.w = 1;
else if (c == 'A') opt->a = atoi(optarg), opt0.a = 1;
@ -313,6 +572,7 @@ int main_mem(int argc, char *argv[])
fprintf(stderr, " -W INT discard a chain if seeded bases shorter than INT [0]\n");
fprintf(stderr, " -m INT perform at most INT rounds of mate rescues for each read [%d]\n", opt->max_matesw);
fprintf(stderr, " -S skip mate rescue\n");
fprintf(stderr, " -2 read and write in different disks \n");
fprintf(stderr, " -P skip pairing; mate rescue performed unless -S also in use\n");
fprintf(stderr, "\nScoring options:\n\n");
fprintf(stderr, " -A INT score for a sequence match, which scales options -TdBOELU unless overridden [%d]\n", opt->a);
@ -391,6 +651,16 @@ int main_mem(int argc, char *argv[])
}
} else update_a(opt, &opt0);
bwa_fill_scmat(opt->a, opt->b, opt->mat);
#ifdef SHOW_PERF
int64_t tmp_time1, tmp_time2;
#if USE_RDTSC
tmp_time1 = __rdtsc();
#else
tmp_time1 = realtime_msec();
#endif
#endif
aux.idx = bwa_idx_load_from_shm(argv[optind]);
if (aux.idx == 0) {
if ((aux.idx = bwa_idx_load(argv[optind], BWA_IDX_ALL)) == 0) return 1; // FIXME: memory leak
@ -400,6 +670,15 @@ int main_mem(int argc, char *argv[])
for (i = 0; i < aux.idx->bns->n_seqs; ++i)
aux.idx->bns->anns[i].is_alt = 0;
#ifdef SHOW_PERF
#if USE_RDTSC
tmp_time2 = __rdtsc();
#else
tmp_time2 = realtime_msec();
#endif
time_load_idx += tmp_time2 - tmp_time1;
#endif
ko = kopen(argv[optind + 1], &fd);
if (ko == 0) {
if (bwa_verbose >= 1) fprintf(stderr, "[E::%s] fail to open file `%s'.\n", __func__, argv[optind + 1]);
@ -430,8 +709,27 @@ int main_mem(int argc, char *argv[])
bwa_print_sam_hdr(aux.idx->bns, hdr_line);
aux.actual_chunk_size = fixed_chunk_size > 0? fixed_chunk_size : opt->chunk_size * opt->n_threads;
kt_pipeline(no_mt_io? 1 : 2, process, &aux, 3);
//kt_pipeline(3, process, &aux, 3);
#ifdef SHOW_PERF
#if USE_RDTSC
tmp_time1 = __rdtsc();
#else
tmp_time1 = realtime_msec();
#endif
#endif
if (no_mt_io == 2) new_pipeline(&aux);
else kt_pipeline(no_mt_io? 1 : 2, process, &aux, 3);
#ifdef SHOW_PERF
#if USE_RDTSC
tmp_time2 = __rdtsc();
#else
tmp_time2 = realtime_msec();
#endif
time_process_data += tmp_time2 - tmp_time1;
#endif
free(hdr_line);
free(opt);
bwa_idx_destroy(aux.idx);
@ -443,26 +741,60 @@ int main_mem(int argc, char *argv[])
}
#ifdef SHOW_PERF
int64_t avg_seed_1 = 0, avg_seed_2 = 0, avg_seed_3 = 0, avg_sa = 0, avg_bsw = 0;
uint64_t avg_seed_1 = 0, avg_seed_2 = 0, avg_seed_3 = 0,
avg_sa = 0, avg_chain = 0, avg_seed_all = 0,
avg_bsw = 0, avg_bsw_all = 0;
uint64_t min_seed_all = UINT64_MAX, min_bsw_all = UINT64_MAX;
uint64_t max_seed_all = 0, max_bsw_all = 0;
for (i = 0; i < opt->n_threads; ++i) {
avg_seed_1 += aux.w->aux[i]->time_seed_1;
avg_seed_2 += aux.w->aux[i]->time_seed_2;
avg_seed_3 += aux.w->aux[i]->time_seed_3;
avg_sa += aux.w->aux[i]->time_sa;
avg_chain += aux.w->aux[i]->time_chain;
avg_seed_all += aux.w->aux[i]->time_seed_all;
avg_bsw += aux.w->aux[i]->time_bsw;
avg_bsw_all += aux.w->aux[i]->time_bsw_all;
min_seed_all = MIN(min_seed_all, aux.w->aux[i]->time_seed_all);
min_bsw_all = MIN(min_bsw_all, aux.w->aux[i]->time_bsw_all);
max_seed_all = MAX(max_seed_all, aux.w->aux[i]->time_seed_all);
max_bsw_all = MAX(max_bsw_all, aux.w->aux[i]->time_bsw_all);
}
fprintf(stderr, "\n");
fprintf(stderr, "time_work1: %f s\n", time_work1 / 1000.0);
fprintf(stderr, "time_work2: %f s\n", time_work2 / 1000.0);
fprintf(stderr, "time_process_seq: %f s\n", (time_work1 + time_work2) / 1000.0);
fprintf(stderr, "time_seed_1: %f s\n", avg_seed_1 / 1000.0 / opt->n_threads);
fprintf(stderr, "time_seed_2: %f s\n", avg_seed_2 / 1000.0 / opt->n_threads);
fprintf(stderr, "time_seed_3: %f s\n", avg_seed_3 / 1000.0 / opt->n_threads);
fprintf(stderr, "time_bwt_sa: %f s\n", avg_sa / 1000.0 / opt->n_threads);
fprintf(stderr, "time_bsw: %f s\n", avg_bsw / 1000.0 / opt->n_threads);
fprintf(stderr, "full_match: %ld, all: %ld\n", s1n, s2n);
fprintf(stderr, "time_load_idx: %0.2lf s\n", time_load_idx * 1.0 / proc_freq);
fprintf(stderr, "time_process_data: %0.2lf s\n", time_process_data * 1.0 / proc_freq);
fprintf(stderr, "time_read: %0.2lf s\n", time_read * 1.0 / proc_freq);
fprintf(stderr, "time_compute: %0.2lf s\n", time_compute * 1.0 / proc_freq);
fprintf(stderr, "time_write: %0.2lf s\n", time_write * 1.0 / proc_freq);
fprintf(stderr, "time_work_kernel: %0.2lf s\n", time_work_kernel * 1.0 / proc_freq);
fprintf(stderr, "time_work_sam: %0.2lf s\n", time_work_sam * 1.0 / proc_freq);
fprintf(stderr, "time_process_seq: %0.2lf s\n", (time_work_kernel + time_work_sam) * 1.0 / proc_freq);
fprintf(stderr, "time_seed_all: %0.2lf (%0.2lf, %0.2lf) s\n", avg_seed_all * 1.0 / proc_freq / opt->n_threads, min_seed_all * 1.0 / proc_freq, max_seed_all * 1.0 / proc_freq);
fprintf(stderr, "time_seed_1: %0.2lf s\n", avg_seed_1 * 1.0 / proc_freq / opt->n_threads);
fprintf(stderr, "time_seed_2: %0.2lf s\n", avg_seed_2 * 1.0 / proc_freq / opt->n_threads);
fprintf(stderr, "time_seed_3: %0.2lf s\n", avg_seed_3 * 1.0 / proc_freq / opt->n_threads);
fprintf(stderr, "time_sa: %0.2lf s\n", avg_sa * 1.0 / proc_freq / opt->n_threads);
fprintf(stderr, "time_chain: %0.2lf s\n", avg_chain * 1.0 / proc_freq / opt->n_threads);
fprintf(stderr, "time_bsw_all: %0.2lf (%0.2lf, %0.2lf) s\n", avg_bsw_all * 1.0 / proc_freq / opt->n_threads, min_bsw_all * 1.0 / proc_freq, max_bsw_all * 1.0 / proc_freq);
fprintf(stderr, "time_bsw: %0.2lf s\n", avg_bsw * 1.0 / proc_freq / opt->n_threads);
fprintf(stderr, "\n");
#endif
#ifdef SHOW_DATA_PERF
fprintf(stderr, "\n");
fprintf(stderr, "full_match: %ld, all: %ld\n", gdat[1], gdat[0]);
fprintf(stderr, "\n");
#endif
#ifdef DEBUG_OUTPUT
fclose(gfp1);
fclose(gfp2);
fclose(gfp3);

View File

@ -10,8 +10,6 @@
extern FILE *gfp1, *gfp2, *gfp3;
#endif
//#define DEBUG_SW_EXTEND 1
#define NO_VAL -1
#ifdef __GNUC__
@ -151,7 +149,7 @@ static const uint16_t h_vec_int_mask[SIMD_WIDTH][SIMD_WIDTH] = {
max_vec = _mm256_max_epu16(max_vec, _mm256_alignr_epi8(max_vec, max_vec, 8)); \
max_vec = _mm256_max_epu16(max_vec, _mm256_permute2x128_si256(max_vec, max_vec, 0x01)); \
int16_t *maxVal = (int16_t*)&max_vec; \
m = MAX(m,maxVal[0]); \
m = MAX(m, maxVal[0]); /*用来解决与BSW结果不一样的第二种情况(上边界)*/ \
if (maxVal[0] > 0) { \
for(j=beg, i=iend; j<=end; j+=SIMD_WIDTH, i-=SIMD_WIDTH) { \
__m256i h2_vec = _mm256_loadu_si256((__m256i*) (&hA2[j])); \
@ -201,7 +199,9 @@ int ksw_extend2_avx2(int qlen, // query length 待匹配段碱基的query长度
{
//return ksw_extend2_origin(qlen, query, tlen, target, is_left, m, mat, o_del, e_del, o_ins, e_ins, w, end_bonus, zdrop, h0, _qle, _tle, _gtle, _gscore, _max_off);
#ifdef DEBUG_OUTPUT
//fprintf(gfp1, "%d\n", qlen);
#endif
if (qlen * a + h0 < 255) return ksw_extend2_avx2_u8(qlen, query, tlen, target, is_left, m, mat, o_del, e_del, o_ins, e_ins, a, b, w, end_bonus, zdrop, h0, _qle, _tle, _gtle, _gscore, _max_off, buf);
int16_t *mA,*hA, *eA, *fA, *mA1, *mA2, *hA0, *hA1, *eA1, *fA1, *hA2, *eA2, *fA2; // hA0保存上上个col的H其他的保存上个H E F M
@ -277,7 +277,7 @@ int ksw_extend2_avx2(int qlen, // query length 待匹配段碱基的query长度
int m_last=0;
int iend;
//#undef KSW_EQUAL
#ifdef KSW_EQUAL
int midx = 1, icheck = 0, checkspecial = 1;
int m3 = 0, m2 = 0, m1 = 0;
@ -375,6 +375,7 @@ int ksw_extend2_avx2(int qlen, // query length 待匹配段碱基的query长度
SIMD_FIND_MAX;
#ifdef KSW_EQUAL
// 用来解决与BSW结果不一样的第一种情况(左边界)
#if 0
if (hA1[0] < b && checkspecial) {
int mi;
@ -477,12 +478,12 @@ int ksw_extend2_avx2(int qlen, // query length 待匹配段碱基的query长度
SWAP_DATA_POINTER;
}
#ifdef DEBUG_OUTPUT
#ifdef DEBUG_SW_EXTEND
fprintf(gfp1, "qlen: %d, tlen: %d, h0: %d, w: %d, mi: %d, mj: %d, mie: %d, max_off: %d, score: %d, max: %d\n", qlen, tlen, h0, w, max_i + 1, max_j + 1, max_ie + 1, max_off, gscore, max);
fprintf(gfp2, "qlen: %d, tlen: %d, h0: %d, w: %d, mi: %d, mj: %d, mie: %d, max_off: %d, score: %d, max: %d\n", qlen, tlen, h0, w, max_i + 1, max_j + 1, max_ie + 1, max_off, gscore, max);
fprintf(gfp3, "qlen: %d, tlen: %d, h0: %d, w: %d, mi: %d, mj: %d, mie: %d, max_off: %d, score: %d, max: %d\n", qlen, tlen, h0, w, max_i + 1, max_j + 1, max_ie + 1, max_off, gscore, max);
#endif
#ifdef DEBUG_SW_EXTEND
fprintf(gfp1, "%-4d", -1);
fprintf(gfp2, "%-4d", -1);
fprintf(gfp3, "%-4d", -1);
@ -518,6 +519,7 @@ int ksw_extend2_avx2(int qlen, // query length 待匹配段碱基的query长度
fprintf(gfp2, "\n");
fprintf(gfp3, "\n");
}
#endif
#endif
// free(mem);
if (_qle) *_qle = max_j + 1;
@ -621,7 +623,8 @@ int ksw_extend2_origin(int qlen, // query length 待匹配段碱基的query长
}
ins[0][0] = del[0][0] = score[0][0] = h0;
#endif
#ifdef SHOW_PERF
#ifdef DEBUG_OUTPUT
// fprintf(gfp1, "start\n");
int bsw_cal_num = 0;
int real_cal_num = 0;
@ -634,6 +637,7 @@ int ksw_extend2_origin(int qlen, // query length 待匹配段碱基的query长
}
// fprintf(gfp1, "%d\n", bsw_cal_num);
#endif
for (i = 0; LIKELY(i < tlen); ++i) {
int t, f = 0, h1, m = 0, mj = -1;
int8_t *q = &qp[ref[i] * qlen];
@ -646,10 +650,10 @@ int ksw_extend2_origin(int qlen, // query length 待匹配段碱基的query长
h1 = h0 - (o_del + e_del * (i + 1));
if (h1 < 0) h1 = 0;
} else h1 = 0;
//m = h1;
//m = h1; // 用来解决和VP-BSW结果不一样的第一种情况(左边界)
for (j = beg; LIKELY(j < end); ++j) {
#ifdef SHOW_PERF
#ifdef DEBUG_OUTPUT
real_cal_num++;
#endif
@ -668,6 +672,7 @@ int ksw_extend2_origin(int qlen, // query length 待匹配段碱基的query长
h = M > e? M : e; // e and f are guaranteed to be non-negative, so h>=0 even if M<0
h = h > f? h : f;
h1 = h; // save H(i,j) to h1 for the next column
#ifdef DEBUG_SW_EXTEND
score[i+1][j+1] = h;
#endif
@ -677,6 +682,7 @@ int ksw_extend2_origin(int qlen, // query length 待匹配段碱基的query长
t = t > 0? t : 0;
e -= e_del;
e = e > t? e : t; // computed E(i+1,j)
#ifdef DEBUG_SW_EXTEND
del[i+1][j+1] = e;
#endif
@ -713,12 +719,12 @@ int ksw_extend2_origin(int qlen, // query length 待匹配段碱基的query长
// fprintf(stderr, "beg: %d; end: %d\n", beg, end);
//}
}
#ifdef DEBUG_OUTPUT
#ifdef DEBUG_SW_EXTEND
fprintf(gfp1, "qlen: %d, tlen: %d, h0: %d, w: %d, mi: %d, mj: %d, mie: %d, max_off: %d, score: %d, max: %d\n", qlen, tlen, h0, w, max_i + 1, max_j + 1, max_ie + 1, max_off, gscore, max);
fprintf(gfp2, "qlen: %d, tlen: %d, h0: %d, w: %d, mi: %d, mj: %d, mie: %d, max_off: %d, score: %d, max: %d\n", qlen, tlen, h0, w, max_i + 1, max_j + 1, max_ie + 1, max_off, gscore, max);
fprintf(gfp3, "qlen: %d, tlen: %d, h0: %d, w: %d, mi: %d, mj: %d, mie: %d, max_off: %d, score: %d, max: %d\n", qlen, tlen, h0, w, max_i + 1, max_j + 1, max_ie + 1, max_off, gscore, max);
#endif
#ifdef DEBUG_SW_EXTEND
fprintf(gfp1, "%-4d", -1);
fprintf(gfp2, "%-4d", -1);
fprintf(gfp3, "%-4d", -1);
@ -759,7 +765,9 @@ int ksw_extend2_origin(int qlen, // query length 待匹配段碱基的query长
fprintf(gfp3, "\n");
}
#endif
#ifdef SHOW_PERF
#endif
#ifdef DEBUG_OUTPUT
//fprintf(gfp1, "%d\nend\n", real_cal_num);
#endif

12
run.sh
View File

@ -1,12 +1,16 @@
thread=64
thread=1
#n_r1=~/data/fastq/dataset/na12878_wgs_150/bn1.fq
#n_r2=~/data/fastq/dataset/na12878_wgs_150/bn2.fq
n_r1=~/data/fastq/dataset/na12878_wes_144/SRR25735653_1.fastq
n_r2=~/data/fastq/dataset/na12878_wes_144/SRR25735653_2.fastq
#n_r1=~/data/fastq/dataset/na12878_wes_144/SRR25735653_1.fastq
#n_r2=~/data/fastq/dataset/na12878_wes_144/SRR25735653_2.fastq
#n_r1=~/data/fastq/dataset/na12878_wgs_150/n1.fq
#n_r2=~/data/fastq/dataset/na12878_wgs_150/n2.fq
#n_r1=~/data/fastq/dataset/na12878_wgs_101/na_1.fq
#n_r2=~/data/fastq/dataset/na12878_wgs_101/na_2.fq
n_r1=~/data/fastq/dataset/zy_wes/sn1.fq
n_r2=~/data/fastq/dataset/zy_wes/sn2.fq
#n_r1=~/data/fastq/zy/wgs/n1.fq
#n_r2=~/data/fastq/zy/wgs/n2.fq
#n_r1=~/data/fastq/ds/n1.fq
@ -72,6 +76,6 @@ time ./bwa mem -t $thread -b 256 -M -R @RG\\tID:normal\\tSM:normal\\tPL:illumina
$reference \
$n_r1 \
$n_r2 \
-o $out
-o $out -2

View File

@ -310,11 +310,11 @@ double realtime(void)
return tp.tv_sec + tp.tv_usec * 1e-6;
}
int64_t realtime_msec(void)
uint64_t realtime_msec(void)
{
struct timeval tv;
gettimeofday(&tv, NULL);
return (int64_t)1000 * (tv.tv_sec + ((1e-6) * tv.tv_usec));
return (uint64_t)1000 * (tv.tv_sec + ((1e-6) * tv.tv_usec));
}
long peakrss(void)

55
utils.h
View File

@ -31,31 +31,24 @@
#include <stdio.h>
#include <zlib.h>
#define USE_RDTSC 0
#ifdef SHOW_PERF
extern uint64_t time_process_data, time_read, time_write, time_compute,
time_seed_1, time_seed_2, time_seed_3,
time_seed_sa, time_seed_chain, time_seed_all,
time_bsw, time_bsw_all,
time_work_kernel, time_work_sam,
time_load_idx;
extern uint64_t proc_freq;
#endif
extern int64_t time_ksw_extend2,
time_ksw_global2,
time_ksw_align2,
time_bwt_smem1a,
time_seed_1,
time_seed_2,
time_seed_3,
time_fmt_smem_0,
time_bwt_extend,
time_bwt_occ4,
time_bwt_sa,
time_bwt_sa_read,
time_bns,
time_work1,
time_work2,
time_flt_chain;
#ifdef SHOW_DATA_PERF
extern int64_t gdat[100];
#endif
extern int64_t gn[100];
extern int64_t dn, n16, n17, n18, n19, nall, num_sa;
extern int64_t s1n, s2n, s3n, s1l, s2l, s3l;
extern int64_t g_num_smem2;
#ifdef DEBUG_OUTPUT
extern FILE *gfp1, *gfp2, *gfp3;
#endif
#undef MAX
@ -79,6 +72,24 @@ extern FILE *gfp1, *gfp2, *gfp3;
#define xassert(cond, msg) if ((cond) == 0) _err_fatal_simple_core(__func__, msg)
#if defined(__GNUC__) && __GNUC__ < 11 && !defined(__clang__)
#if defined(__i386__)
static inline unsigned long long __rdtsc(void)
{
unsigned long long int x;
__asm__ volatile(".byte 0x0f, 0x31" : "=A"(x));
return x;
}
#elif defined(__x86_64__)
static inline unsigned long long __rdtsc(void)
{
unsigned hi, lo;
__asm__ __volatile__("rdtsc" : "=a"(lo), "=d"(hi));
return ((unsigned long long)lo) | (((unsigned long long)hi) << 32);
}
#endif
#endif
typedef struct {
uint64_t x, y;
} pair64_t;
@ -121,7 +132,7 @@ extern "C" {
double cputime(void);
double realtime(void);
int64_t realtime_msec(void);
uint64_t realtime_msec(void);
long peakrss(void);
void ks_introsort_64 (size_t n, uint64_t *a);

398
yarn.c 100644
View File

@ -0,0 +1,398 @@
/* yarn.c -- generic thread operations implemented using pthread functions
* Copyright (C) 2008, 2011, 2012, 2015, 2018, 2019, 2020 Mark Adler
* Version 1.7 12 Apr 2020 Mark Adler
* For conditions of distribution and use, see copyright notice in yarn.h
*/
/* Basic thread operations implemented using the POSIX pthread library. All
pthread references are isolated within this module to allow alternate
implementations with other thread libraries. See yarn.h for the description
of these operations. */
/* Version history:
1.0 19 Oct 2008 First version
1.1 26 Oct 2008 No need to set the stack size -- remove
Add yarn_abort() function for clean-up on error exit
1.2 19 Dec 2011 (changes reversed in 1.3)
1.3 13 Jan 2012 Add large file #define for consistency with pigz.c
Update thread portability #defines per IEEE 1003.1-2008
Fix documentation in yarn.h for yarn_prefix
1.4 19 Jan 2015 Allow yarn_abort() to avoid error message to stderr
Accept and do nothing for NULL argument to free_lock()
1.5 8 May 2018 Remove destruct() to avoid use of pthread_cancel()
Normalize the code style
1.6 3 Apr 2019 Add debugging information to fail() error messages
1.7 12 Apr 2020 Fix use after free bug in ignition()
*/
// For thread portability.
#define _XOPEN_SOURCE 700
#define _POSIX_C_SOURCE 200809L
#define _THREAD_SAFE
// Use large file functions if available.
#define _FILE_OFFSET_BITS 64
// External libraries and entities referenced.
#include <stdio.h> // fprintf(), stderr
#include <stdlib.h> // exit(), malloc(), free(), NULL
#include <pthread.h> // pthread_t, pthread_create(), pthread_join(),
// pthread_attr_t, pthread_attr_init(), pthread_attr_destroy(),
// PTHREAD_CREATE_JOINABLE, pthread_attr_setdetachstate(),
// pthread_self(), pthread_equal(),
// pthread_mutex_t, PTHREAD_MUTEX_INITIALIZER, pthread_mutex_init(),
// pthread_mutex_lock(), pthread_mutex_unlock(), pthread_mutex_destroy(),
// pthread_cond_t, PTHREAD_COND_INITIALIZER, pthread_cond_init(),
// pthread_cond_broadcast(), pthread_cond_wait(), pthread_cond_destroy()
#include <errno.h> // EPERM, ESRCH, EDEADLK, ENOMEM, EBUSY, EINVAL, EAGAIN
// Interface definition.
#include "yarn.h"
// Constants.
#define local static // for non-exported functions and globals
// Error handling external globals, resettable by application.
char *yarn_prefix = (char*)"yarn";
void (*yarn_abort)(int) = NULL;
// Immediately exit -- use for errors that shouldn't ever happen.
local void fail(int err, char const *file, long line, char const *func) {
fprintf(stderr, "%s: ", yarn_prefix);
switch (err) {
case EPERM:
fputs("already unlocked", stderr);
break;
case ESRCH:
fputs("no such thread", stderr);
break;
case EDEADLK:
fputs("resource deadlock", stderr);
break;
case ENOMEM:
fputs("out of memory", stderr);
break;
case EBUSY:
fputs("can't destroy locked resource", stderr);
break;
case EINVAL:
fputs("invalid request", stderr);
break;
case EAGAIN:
fputs("resource unavailable", stderr);
break;
default:
fprintf(stderr, "internal error %d", err);
}
fprintf(stderr, " (%s:%ld:%s)\n", file, line, func);
if (yarn_abort != NULL)
yarn_abort(err);
exit(err);
}
// Memory handling routines provided by user. If none are provided, malloc()
// and free() are used, which are therefore assumed to be thread-safe.
typedef void *(*malloc_t)(size_t);
typedef void (*free_t)(void *);
local malloc_t my_malloc_f = malloc;
local free_t my_free = free;
// Use user-supplied allocation routines instead of malloc() and free().
void yarn_mem(malloc_t lease, free_t vacate) {
my_malloc_f = lease;
my_free = vacate;
}
// Memory allocation that cannot fail (from the point of view of the caller).
local void *my_malloc(size_t size, char const *file, long line) {
void *block;
if ((block = my_malloc_f(size)) == NULL)
fail(ENOMEM, file, line, "malloc");
return block;
}
// -- Lock functions --
struct lock_s {
pthread_mutex_t mutex;
pthread_cond_t cond;
long value;
};
lock_t *new_lock_(long initial, char const *file, long line) {
lock_t *bolt = (lock_t *)my_malloc(sizeof(struct lock_s), file, line);
int ret = pthread_mutex_init(&(bolt->mutex), NULL);
if (ret)
fail(ret, file, line, "mutex_init");
ret = pthread_cond_init(&(bolt->cond), NULL);
if (ret)
fail(ret, file, line, "cond_init");
bolt->value = initial;
return bolt;
}
void possess_(lock_t *bolt, char const *file, long line) {
int ret = pthread_mutex_lock(&(bolt->mutex));
if (ret)
fail(ret, file, line, "mutex_lock");
}
void release_(lock_t *bolt, char const *file, long line) {
int ret = pthread_mutex_unlock(&(bolt->mutex));
if (ret)
fail(ret, file, line, "mutex_unlock");
}
void twist_(lock_t *bolt, enum twist_op op, long val,
char const *file, long line) {
if (op == TO)
bolt->value = val;
else if (op == BY)
bolt->value += val;
int ret = pthread_cond_broadcast(&(bolt->cond));
if (ret)
fail(ret, file, line, "cond_broadcast");
ret = pthread_mutex_unlock(&(bolt->mutex));
if (ret)
fail(ret, file, line, "mutex_unlock");
}
#define until(a) while(!(a))
void wait_for_(lock_t *bolt, enum wait_op op, long val,
char const *file, long line) {
switch (op) {
case TO_BE:
until (bolt->value == val) {
int ret = pthread_cond_wait(&(bolt->cond), &(bolt->mutex));
if (ret)
fail(ret, file, line, "cond_wait");
}
break;
case NOT_TO_BE:
until (bolt->value != val) {
int ret = pthread_cond_wait(&(bolt->cond), &(bolt->mutex));
if (ret)
fail(ret, file, line, "cond_wait");
}
break;
case TO_BE_MORE_THAN:
until (bolt->value > val) {
int ret = pthread_cond_wait(&(bolt->cond), &(bolt->mutex));
if (ret)
fail(ret, file, line, "cond_wait");
}
break;
case TO_BE_LESS_THAN:
until (bolt->value < val) {
int ret = pthread_cond_wait(&(bolt->cond), &(bolt->mutex));
if (ret)
fail(ret, file, line, "cond_wait");
}
}
}
long peek_lock(lock_t *bolt) {
return bolt->value;
}
void free_lock_(lock_t *bolt, char const *file, long line) {
if (bolt == NULL)
return;
int ret = pthread_cond_destroy(&(bolt->cond));
if (ret)
fail(ret, file, line, "cond_destroy");
ret = pthread_mutex_destroy(&(bolt->mutex));
if (ret)
fail(ret, file, line, "mutex_destroy");
my_free(bolt);
}
// -- Thread functions (uses the lock_t functions above) --
struct thread_s {
pthread_t id;
int done; // true if this thread has exited
thread *next; // for list of all launched threads
};
// List of threads launched but not joined, count of threads exited but not
// joined (incremented by ignition() just before exiting).
local lock_t threads_lock = {
PTHREAD_MUTEX_INITIALIZER,
PTHREAD_COND_INITIALIZER,
0 // number of threads exited but not joined
};
local thread *threads = NULL; // list of extant threads
// Structure in which to pass the probe and its payload to ignition().
struct capsule {
void (*probe)(void *);
void *payload;
char const *file;
long line;
};
// Mark the calling thread as done and alert join_all().
local void reenter(void *arg) {
struct capsule *capsule = (struct capsule *)arg;
// find this thread in the threads list by matching the thread id
pthread_t me = pthread_self();
possess_(&(threads_lock), capsule->file, capsule->line);
thread **prior = &(threads);
thread *match;
while ((match = *prior) != NULL) {
if (pthread_equal(match->id, me))
break;
prior = &(match->next);
}
if (match == NULL)
fail(ESRCH, capsule->file, capsule->line, "reenter lost");
// mark this thread as done and move it to the head of the list
match->done = 1;
if (threads != match) {
*prior = match->next;
match->next = threads;
threads = match;
}
// update the count of threads to be joined and alert join_all()
twist_(&(threads_lock), BY, +1, capsule->file, capsule->line);
// free the capsule resource, even if the thread is cancelled (though yarn
// doesn't use pthread_cancel() -- you never know)
my_free(capsule);
}
// All threads go through this routine. Just before a thread exits, it marks
// itself as done in the threads list and alerts join_all() so that the thread
// resources can be released. Use a cleanup stack so that the marking occurs
// even if the thread is cancelled.
local void *ignition(void *arg) {
struct capsule *capsule = (struct capsule *)arg;
// run reenter() before leaving
pthread_cleanup_push(reenter, arg);
// execute the requested function with argument
capsule->probe(capsule->payload);
// mark this thread as done, letting join_all() know, and free capsule
pthread_cleanup_pop(1);
// exit thread
return NULL;
}
// Not all POSIX implementations create threads as joinable by default, so that
// is made explicit here.
thread *launch_(void (*probe)(void *), void *payload,
char const *file, long line) {
// construct the requested call and argument for the ignition() routine
// (allocated instead of automatic so that we're sure this will still be
// there when ignition() actually starts up -- ignition() will free this
// allocation)
struct capsule *capsule = (struct capsule *)my_malloc(sizeof(struct capsule), file, line);
capsule->probe = probe;
capsule->payload = payload;
capsule->file = file;
capsule->line = line;
// assure this thread is in the list before join_all() or ignition() looks
// for it
possess_(&(threads_lock), file, line);
// create the thread and call ignition() from that thread
thread *th = (thread *)my_malloc(sizeof(struct thread_s), file, line);
pthread_attr_t attr;
int ret = pthread_attr_init(&attr);
if (ret)
fail(ret, file, line, "attr_init");
ret = pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
if (ret)
fail(ret, file, line, "attr_setdetachstate");
ret = pthread_create(&(th->id), &attr, ignition, capsule);
if (ret)
fail(ret, file, line, "create");
ret = pthread_attr_destroy(&attr);
if (ret)
fail(ret, file, line, "attr_destroy");
// put the thread in the threads list for join_all()
th->done = 0;
th->next = threads;
threads = th;
release_(&(threads_lock), file, line);
return th;
}
void join_(thread *ally, char const *file, long line) {
// wait for thread to exit and return its resources
int ret = pthread_join(ally->id, NULL);
if (ret)
fail(ret, file, line, "join");
// find the thread in the threads list
possess_(&(threads_lock), file, line);
thread **prior = &(threads);
thread *match;
while ((match = *prior) != NULL) {
if (match == ally)
break;
prior = &(match->next);
}
if (match == NULL)
fail(ESRCH, file, line, "join lost");
// remove thread from list and update exited count, free thread
if (match->done)
threads_lock.value--;
*prior = match->next;
release_(&(threads_lock), file, line);
my_free(ally);
}
// This implementation of join_all() only attempts to join threads that have
// announced that they have exited (see ignition()). When there are many
// threads, this is faster than waiting for some random thread to exit while a
// bunch of other threads have already exited.
int join_all_(char const *file, long line) {
// grab the threads list and initialize the joined count
int count = 0;
possess_(&(threads_lock), file, line);
// do until threads list is empty
while (threads != NULL) {
// wait until at least one thread has reentered
wait_for_(&(threads_lock), NOT_TO_BE, 0, file, line);
// find the first thread marked done (should be at or near the top)
thread **prior = &(threads);
thread *match;
while ((match = *prior) != NULL) {
if (match->done)
break;
prior = &(match->next);
}
if (match == NULL)
fail(ESRCH, file, line, "join_all lost");
// join the thread (will be almost immediate), remove from the threads
// list, update the reenter count, and free the thread
int ret = pthread_join(match->id, NULL);
if (ret)
fail(ret, file, line, "join");
threads_lock.value--;
*prior = match->next;
my_free(match);
count++;
}
// let go of the threads list and return the number of threads joined
release_(&(threads_lock), file, line);
return count;
}

139
yarn.h 100644
View File

@ -0,0 +1,139 @@
/* yarn.h -- generic interface for thread operations
* Copyright (C) 2008, 2011, 2012, 2015, 2018, 2019, 2020 Mark Adler
* Version 1.7 12 Apr 2020 Mark Adler
*/
/*
This software is provided 'as-is', without any express or implied
warranty. In no event will the author be held liable for any damages
arising from the use of this software.
Permission is granted to anyone to use this software for any purpose,
including commercial applications, and to alter it and redistribute it
freely, subject to the following restrictions:
1. The origin of this software must not be misrepresented; you must not
claim that you wrote the original software. If you use this software
in a product, an acknowledgment in the product documentation would be
appreciated but is not required.
2. Altered source versions must be plainly marked as such, and must not be
misrepresented as being the original software.
3. This notice may not be removed or altered from any source distribution.
Mark Adler
madler@alumni.caltech.edu
*/
/* Basic thread operations
This interface isolates the local operating system implementation of threads
from the application in order to facilitate platform independent use of
threads. All of the implementation details are deliberately hidden.
Assuming adequate system resources and proper use, none of these functions
can fail. As a result, any errors encountered will cause an exit() to be
executed, or the execution of your own optionally-provided abort function.
These functions allow the simple launching and joining of threads, and the
locking of objects and synchronization of changes of objects. The latter is
implemented with a single lock_t type that contains an integer value. The
value can be ignored for simple exclusive access to an object, or the value
can be used to signal and wait for changes to an object.
-- Arguments --
thread *thread; identifier for launched thread, used by join
void probe(void *); pointer to function "probe", run when thread starts
void *payload; single argument passed to the probe function
lock_t *lock_t; a lock_t with a value -- used for exclusive access to
an object and to synchronize threads waiting for
changes to an object
long val; value to set lock_t, increment lock_t, or wait for
int n; number of threads joined
-- Thread functions --
thread = launch(probe, payload) - launch a thread -- exit via probe() return
join(thread) - join a thread and by joining end it, waiting for the thread
to exit if it hasn't already -- will free the resources allocated by
launch() (don't try to join the same thread more than once)
n = join_all() - join all threads launched by launch() that are not joined
yet and free the resources allocated by the launches, usually to clean
up when the thread processing is done -- join_all() returns an int with
the count of the number of threads joined (join_all() should only be
called from the main thread, and should only be called after any calls
of join() have completed)
-- Lock functions --
lock_t = new_lock(val) - create a new lock_t with initial value val (lock_t is
created in the released state)
possess(lock_t) - acquire exclusive possession of a lock_t, waiting if necessary
twist(lock_t, [TO | BY], val) - set lock_t to or increment lock_t by val, signal
all threads waiting on this lock_t and then release the lock_t -- must
possess the lock_t before calling (twist releases, so don't do a
release() after a twist() on the same lock_t)
wait_for(lock_t, [TO_BE | NOT_TO_BE | TO_BE_MORE_THAN | TO_BE_LESS_THAN], val)
- wait on lock_t value to be, not to be, be greater than, or be less than
val -- must possess the lock_t before calling, will possess the lock_t on
return but the lock_t is released while waiting to permit other threads
to use twist() to change the value and signal the change (so make sure
that the object is in a usable state when waiting)
release(lock_t) - release a possessed lock_t (do not try to release a lock_t that
the current thread does not possess)
val = peek_lock(lock_t) - return the value of the lock_t (assumes that lock_t is
already possessed, no possess or release is done by peek_lock())
free_lock(lock_t) - free the resources allocated by new_lock() (application
must assure that the lock_t is released before calling free_lock())
-- Memory allocation ---
yarn_mem(better_malloc, better_free) - set the memory allocation and free
routines for use by the yarn routines where the supplied routines have
the same interface and operation as malloc() and free(), and may be
provided in order to supply thread-safe memory allocation routines or
for any other reason -- by default malloc() and free() will be used
-- Error control --
yarn_prefix - a char pointer to a string that will be the prefix for any
error messages that these routines generate before exiting -- if not
changed by the application, "yarn" will be used
yarn_abort - an external function that will be executed when there is an
internal yarn error, due to out of memory or misuse -- this function
may exit to abort the application, or if it returns, the yarn error
handler will exit (set to NULL by default for no action)
*/
extern char *yarn_prefix;
extern void (*yarn_abort)(int);
void yarn_mem(void *(*)(size_t), void (*)(void *));
typedef struct thread_s thread;
thread *launch_(void (*)(void *), void *, char const *, long);
#define LAUNCH(a, b) launch_(a, b, __FILE__, __LINE__)
void join_(thread *, char const *, long);
#define JOIN(a) join_(a, __FILE__, __LINE__)
int join_all_(char const *, long);
#define JOIN_ALL() join_all_(__FILE__, __LINE__)
typedef struct lock_s lock_t;
lock_t *new_lock_(long, char const *, long);
#define NEW_LOCK(a) new_lock_(a, __FILE__, __LINE__)
void possess_(lock_t *, char const *, long);
#define POSSESS(a) possess_(a, __FILE__, __LINE__)
void release_(lock_t *, char const *, long);
// #define release(a) release_(a, __FILE__, __LINE__)
#define RELEASE(a) release_(a, __FILE__, __LINE__)
enum twist_op { TO, BY };
void twist_(lock_t *, enum twist_op, long, char const *, long);
#define TWIST(a, b, c) twist_(a, b, c, __FILE__, __LINE__)
enum wait_op {
TO_BE, /* or */ NOT_TO_BE, /* that is the question */
TO_BE_MORE_THAN, TO_BE_LESS_THAN };
void wait_for_(lock_t *, enum wait_op, long, char const *, long);
#define WAIT_FOR(a, b, c) wait_for_(a, b, c, __FILE__, __LINE__)
long peek_lock(lock_t *);
void free_lock_(lock_t *, char const *, long);
#define FREE_LOCK(a) free_lock_(a, __FILE__, __LINE__)