From 40544d02feca86f67a1c3973f77d7fe1e70dba73 Mon Sep 17 00:00:00 2001 From: zzh Date: Mon, 13 Jan 2025 20:24:20 +0800 Subject: [PATCH] =?UTF-8?q?=E5=9C=A8gz=5Fread=E9=83=A8=E5=88=86=E5=8A=A0?= =?UTF-8?q?=E4=BA=86=E5=BC=82=E6=AD=A5=E8=AF=BB=E5=8F=96?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .vscode/settings.json | 6 ++- Makefile | 4 +- bwa.c | 25 ++++++--- fastmap.c | 53 +++++++++++++------- ksw_extend2_avx2.c | 12 +++-- ksw_extend2_avx2_u8.c | 8 +-- profiling.c | 14 ++++++ profiling.h | 20 ++++++-- run.sh | 79 ++++++++++++----------------- utils.c | 114 ++++++++++++++++++++++++++++++++++++++++-- utils.h | 3 ++ 11 files changed, 248 insertions(+), 90 deletions(-) diff --git a/.vscode/settings.json b/.vscode/settings.json index d9cb6da..834a9a0 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -42,6 +42,10 @@ "ertindex.h": "c", "ertseeding.h": "c", "algorithm": "c", - "filesystem": "c" + "filesystem": "c", + "deque": "cpp", + "string": "cpp", + "unordered_map": "cpp", + "vector": "cpp" } } \ No newline at end of file diff --git a/Makefile b/Makefile index 9506ced..96f3905 100644 --- a/Makefile +++ b/Makefile @@ -1,9 +1,9 @@ CC= gcc -CFLAGS= -g -Wall -Wno-unused-function -mavx2 #-O2 +CFLAGS= -g -Wall -Wno-unused-function -mavx2 -O2 WRAP_MALLOC=-DUSE_MALLOC_WRAPPERS SHOW_PERF= -DSHOW_PERF -SHOW_DATA_PERF= -DSHOW_DATA_PERF +SHOW_DATA_PERF= #-DSHOW_DATA_PERF FILTER_FULL_MATCH= #-DFILTER_FULL_MATCH USE_MT_READ= -DUSE_MT_READ diff --git a/bwa.c b/bwa.c index 8827e57..0cf9ce7 100644 --- a/bwa.c +++ b/bwa.c @@ -108,14 +108,20 @@ static void *thread_bseq_read(void *data) { int cur_n = 0, cur_pos = d->start_pos, size = 0; int ret_status = 1; - while (cur_n < d->n_bound && (ret_status = kseq_read(ks)) >= 0) { - trim_readno(&ks->name); - kseq2bseq1(ks, seqs + cur_pos, copy_comment); + //pthread_t thread_id = pthread_self(); + //fprintf(stderr, "Thread ID: %lu\n", thread_id); + PROF_START(parse); + while (cur_n < d->n_bound && (ret_status = kseq_read(ks)) >= 0) { + + trim_readno(&ks->name); + kseq2bseq1(ks, seqs + cur_pos, copy_comment); seqs[cur_pos].id = cur_pos; size += seqs[cur_pos].l_seq; cur_pos += 2; cur_n += 1; - if (size >= chunk_size) break; + + if (size >= chunk_size) break; } + PROF_END(gprof[G_parse_seq], parse); d->ret_n = cur_n; d->ret_size = size; d->ret_status = ret_status; return 0; } @@ -243,12 +249,13 @@ void bseq_read(int chunk_size, int *n_, void *ks1_, void *ks2_, int copy_comment int size = 0, m, n; bseq1_t *seqs = *seqs_ptr; n = 0; m = *m_; - while (kseq_read(ks) >= 0) { + while (kseq_read(ks) >= 0) { if (ks2 && kseq_read(ks2) < 0) { // the 2nd file has fewer reads fprintf(stderr, "[W::%s] the 2nd file has fewer sequences.\n", __func__); break; } - if (n >= m) { + PROF_START(parse); + if (n >= m) { m = m? m<<1 : 256; seqs = realloc(seqs, m * sizeof(bseq1_t)); memset(seqs + n, 0, (m-n) * sizeof(bseq1_t)); @@ -257,9 +264,11 @@ void bseq_read(int chunk_size, int *n_, void *ks1_, void *ks2_, int copy_comment if (ks2) { READ_ONE_SEQ(ks2); } - if (size >= chunk_size && (n&1) == 0) break; + PROF_END(gprof[G_parse_seq], parse); + if (size >= chunk_size && (n&1) == 0) break; } - if (size == 0) { // test if the 2nd file is finished + + if (size == 0) { // test if the 2nd file is finished if (ks2 && kseq_read(ks2) >= 0) fprintf(stderr, "[W::%s] the 1st file has fewer sequences.\n", __func__); } diff --git a/fastmap.c b/fastmap.c index eaa3c07..bffa6b2 100644 --- a/fastmap.c +++ b/fastmap.c @@ -225,20 +225,24 @@ static void *thread_read(void *data) ktp_aux_t *aux = (ktp_aux_t *)data; while (1) { - POSSESS(input_have); - WAIT_FOR(input_have, NOT_TO_BE, 0); + PROF_START(w1); + POSSESS(input_have); + WAIT_FOR(input_have, NOT_TO_BE, 0); RELEASE(input_have); - // fprintf(stderr, "start read: %ld\n", aux->read_idx); + PROF_END(gprof[G_read_wait_1], w1); + // fprintf(stderr, "start read: %ld\n", aux->read_idx); if (read_data(aux, aux->data) == 0) { POSSESS(input_have); aux->read_complete = 1; TWIST(input_have, BY, -1); break; } - POSSESS(input_have); + PROF_START(w2); + POSSESS(input_have); aux->read_idx++; TWIST(input_have, BY, -1); - // fprintf(stderr, "next read: %ld\n", aux->read_idx); + PROF_END(gprof[G_read_wait_2], w2); + // fprintf(stderr, "next read: %ld\n", aux->read_idx); } return 0; } @@ -251,15 +255,17 @@ static void *thread_calc(void *data) while (1) { // fprintf(stderr, "start calc: %ld\n", aux->calc_idx); - POSSESS(input_have); + PROF_START(w1); + POSSESS(input_have); WAIT_FOR(input_have, NOT_TO_BE, 2); RELEASE(input_have); // 应该没必要持有吧? POSSESS(output_have); WAIT_FOR(output_have, NOT_TO_BE, 2); RELEASE(output_have); + PROF_END(gprof[G_comp_wait_1], w1); - if (aux->calc_idx < aux->read_idx) { + if (aux->calc_idx < aux->read_idx) { calc_data(aux, aux->data + d_idx); d_idx = !d_idx; add_idx = 1; @@ -272,13 +278,15 @@ static void *thread_calc(void *data) TWIST(output_have, BY, 1); // 最后要唤醒写线程 break; // 计算完了 } - POSSESS(output_have); + PROF_START(w2); + POSSESS(output_have); if (add_idx) aux->calc_idx ++; TWIST(output_have, BY, 1); POSSESS(input_have); TWIST(input_have, BY, 1); - // fprintf(stderr, "next calc: %ld\n", aux->calc_idx); + PROF_END(gprof[G_comp_wait_2], w2); + // fprintf(stderr, "next calc: %ld\n", aux->calc_idx); } return 0; } @@ -290,10 +298,12 @@ static void *thread_write(void *data) while (1) { // fprintf(stderr, "start write: %ld\n", aux->write_idx); - POSSESS(output_have); + PROF_START(w1); + POSSESS(output_have); WAIT_FOR(output_have, NOT_TO_BE, 0); RELEASE(output_have); - if (aux->write_idx < aux->calc_idx) { + PROF_END(gprof[G_write_wait_1], w1); + if (aux->write_idx < aux->calc_idx) { write_data(aux, aux->data + d_idx); d_idx = !d_idx; aux->write_idx++; @@ -303,9 +313,11 @@ static void *thread_write(void *data) write_data(aux, aux->data + d_idx); break; } - POSSESS(output_have); + PROF_START(w2); + POSSESS(output_have); TWIST(output_have, BY, -1); - // fprintf(stderr, "next write: %ld\n", aux->write_idx); + PROF_END(gprof[G_write_wait_2], w2); + // fprintf(stderr, "next write: %ld\n", aux->write_idx); } return 0; } @@ -617,8 +629,10 @@ int main_mem(int argc, char *argv[]) return 1; } fp = gzdopen(fd, "r"); - aux.ks = kseq_init(fp); - if (optind + 2 < argc) { + start_async_read(fp); + // stop_async_read(fp); + aux.ks = kseq_init(fp); + if (optind + 2 < argc) { if (opt->flag&MEM_F_PE) { if (bwa_verbose >= 2) fprintf(stderr, "[W::%s] when '-p' is in use, the second query file is ignored.\n", __func__); @@ -629,7 +643,8 @@ int main_mem(int argc, char *argv[]) return 1; } fp2 = gzdopen(fd2, "r"); - aux.ks2 = kseq_init(fp2); + start_async_read(fp2); + aux.ks2 = kseq_init(fp2); opt->flag |= MEM_F_PE; } } @@ -656,9 +671,11 @@ int main_mem(int argc, char *argv[]) // free(opt); // bwa_idx_destroy(aux.idx); // kseq_destroy(aux.ks); - err_gzclose(fp); kclose(ko); - if (aux.ks2) { + stop_async_read(fp); + err_gzclose(fp); kclose(ko); + if (aux.ks2) { // kseq_destroy(aux.ks2); + stop_async_read(fp2); err_gzclose(fp2); kclose(ko2); } PROF_END(gprof[G_ALL], all); diff --git a/ksw_extend2_avx2.c b/ksw_extend2_avx2.c index d1b6005..d5165d0 100644 --- a/ksw_extend2_avx2.c +++ b/ksw_extend2_avx2.c @@ -136,6 +136,7 @@ static const uint16_t h_vec_int_mask[SIMD_WIDTH][SIMD_WIDTH] = { m = MAX(m, maxVal[0]); /*用来解决与BSW结果不一样的第二种情况(上边界)*/ \ if (maxVal[0] > 0 && m >= max) { \ for(j=beg, i=iend; j<=end; j+=SIMD_WIDTH, i-=SIMD_WIDTH) { \ + /*calc_num += 16;*/ \ __m256i h2_vec = _mm256_loadu_si256((__m256i*) (&hA2[j])); \ __m256i vcmp = _mm256_cmpeq_epi16(h2_vec, max_vec); \ uint32_t mask = _mm256_movemask_epi8(vcmp); \ @@ -202,7 +203,7 @@ int ksw_extend2_avx2(int qlen, // query length 待匹配段碱基的query长度 int *_max_off, // 取得最大得分时在query和reference上位置差的 最大值 buf_t *buf) // 之前已经开辟过的缓存 { - //return ksw_extend2_origin(qlen, query, tlen, target, is_left, m, mat, o_del, e_del, o_ins, e_ins, w, end_bonus, zdrop, h0, _qle, _tle, _gtle, _gscore, _max_off); +// return ksw_extend2_origin(qlen, query, tlen, target, is_left, m, mat, o_del, e_del, o_ins, e_ins, w, end_bonus, zdrop, h0, _qle, _tle, _gtle, _gscore, _max_off); #ifdef DEBUG_FILE_OUTPUT //fprintf(gf[0], "%d\n", qlen); @@ -371,8 +372,9 @@ int ksw_extend2_avx2(int qlen, // query length 待匹配段碱基的query长度 SIMD_CMP_SEQ; // 计算 SIMD_COMPUTE; - // 存储结果 - SIMD_STORE; + //calc_num += 16; + // 存储结果 + SIMD_STORE; } // 剩下的计算单元 if (j <= end) { @@ -382,7 +384,8 @@ int ksw_extend2_avx2(int qlen, // query length 待匹配段碱基的query长度 SIMD_CMP_SEQ; // 计算 SIMD_COMPUTE; - // 去除多余计算的部分 + //calc_num += 16; + // 去除多余计算的部分 SIMD_REMOVE_EXTRA; // 存储结果 SIMD_STORE; @@ -673,6 +676,7 @@ int ksw_extend2_origin(int qlen, // query length 待匹配段碱基的query长 } else h1 = 0; //m = h1; // 用来解决和VP-BSW结果不一样的第一种情况(左边界) for (j = beg; LIKELY(j < end); ++j) { + //calc_num++; #ifdef DEBUG_FILE_OUTPUT #ifdef COUNT_CALC_NUM diff --git a/ksw_extend2_avx2_u8.c b/ksw_extend2_avx2_u8.c index 2974254..a9f193a 100644 --- a/ksw_extend2_avx2_u8.c +++ b/ksw_extend2_avx2_u8.c @@ -310,8 +310,9 @@ int ksw_extend2_avx2_u8(int qlen, // query length 待匹配段碱基的query长 SIMD_CMP_SEQ; // 计算 SIMD_COMPUTE; - // 存储结果 - SIMD_STORE; + //calc_num += 32; + // 存储结果 + SIMD_STORE; } // 剩下的计算单元 if (j <= end) { @@ -321,7 +322,8 @@ int ksw_extend2_avx2_u8(int qlen, // query length 待匹配段碱基的query长 SIMD_CMP_SEQ; // 计算 SIMD_COMPUTE; - // 去除多余计算的部分 + //calc_num += 32; + // 去除多余计算的部分 SIMD_REMOVE_EXTRA; // 存储结果 SIMD_STORE; diff --git a/profiling.c b/profiling.c index f77a238..9238e4b 100644 --- a/profiling.c +++ b/profiling.c @@ -11,6 +11,8 @@ Date : 2024/04/06 #include "utils.h" #include "profiling.h" +uint64_t calc_num = 0; + #ifdef SHOW_PERF uint64_t tprof[LIM_THREAD_PROF_TYPE][LIM_THREAD] = {0}; uint64_t proc_freq = 1000; @@ -104,6 +106,18 @@ int display_stats(int nthreads) fprintf(stderr, "time_ksw_loop: %0.2lf s\n", gprof[G_KSW_LOOP] * 1.0 / proc_freq); fprintf(stderr, "time_ksw_end_loop: %0.2lf s\n", gprof[G_KSW_END_LOOP] * 1.0 / proc_freq); + fprintf(stderr, "parse seq: %0.2lf s\n", gprof[G_parse_seq] * 1.0 / proc_freq); + fprintf(stderr, "read seq: %0.2lf s\n", gprof[G_read_seq] * 1.0 / proc_freq); + +// fprintf(stderr, "read_wait_1: %0.2lf s\n", gprof[G_read_wait_1] * 1.0 / proc_freq); +// fprintf(stderr, "read_wait_2: %0.2lf s\n", gprof[G_read_wait_2] * 1.0 / proc_freq); +// fprintf(stderr, "comp_wait_1: %0.2lf s\n", gprof[G_comp_wait_1] * 1.0 / proc_freq); +// fprintf(stderr, "comp_wait_2: %0.2lf s\n", gprof[G_comp_wait_2] * 1.0 / proc_freq); +// fprintf(stderr, "write_wait_1: %0.2lf s\n", gprof[G_write_wait_1] * 1.0 / proc_freq); +// fprintf(stderr, "write_wait_2: %0.2lf s\n", gprof[G_write_wait_2] * 1.0 / proc_freq); + + fprintf(stderr, "real_cal num: %ld\n", calc_num); + fprintf(stderr, "\n"); #endif diff --git a/profiling.h b/profiling.h index 1ab4fcc..8121f52 100644 --- a/profiling.h +++ b/profiling.h @@ -43,11 +43,10 @@ extern int64_t gdat[LIM_GLOBAL_DATA_TYPE]; #define PROF_END(result, tmp_time) #endif - +extern uint64_t calc_num; // GLOBAL -enum -{ +enum { G_ALL = 0, G_PIPELINE, G_READ, @@ -60,7 +59,15 @@ enum G_MEM_PESTAT, G_MEM_SAM, G_KSW_LOOP, - G_KSW_END_LOOP + G_KSW_END_LOOP, + G_parse_seq, + G_read_seq, + G_read_wait_1, + G_read_wait_2, + G_comp_wait_1, + G_comp_wait_2, + G_write_wait_1, + G_write_wait_2 }; // THREAD @@ -84,7 +91,10 @@ enum T_SAM_REG2ALN, T_SEED_3_1, T_SEED_3_2, - T_SEED_3_3 + T_SEED_3_3, + T_SEEDING, + T_EXTENSION, + T_SAM, }; int display_stats(int); diff --git a/run.sh b/run.sh index e850b9d..3486691 100755 --- a/run.sh +++ b/run.sh @@ -1,53 +1,40 @@ -thread=64 +thread=128 -## d1 k18<=4 89% -#n_r1=~/data/fastq/dataset/na12878_wes_144/SRR25735653_1.fastq -#n_r2=~/data/fastq/dataset/na12878_wes_144/SRR25735653_2.fastq -#n_r1=~/data/fastq/dataset/na12878_wes_144/1w_1.fq -#n_r2=~/data/fastq/dataset/na12878_wes_144/1w_2.fq -#n_r1=~/data/fastq/dataset/na12878_wes_144/ss_1.fq -#n_r2=~/data/fastq/dataset/na12878_wes_144/ss_2.fq -#n_r1=~/data/fastq/dataset/na12878_wes_144/45m_r1.fq -#n_r2=~/data/fastq/dataset/na12878_wes_144/45m_r2.fq -#n_r1=~/data/fastq/dataset/na12878_wes_144/45mr1.fq.gz -#n_r2=~/data/fastq/dataset/na12878_wes_144/45mr2.fq.gz -## d2 <= 4 87% -#n_r1=~/data/fastq/dataset/na12878_wgs_101/na12878_r1.fq -#n_r2=~/data/fastq/dataset/na12878_wgs_101/na12878_r2.fq -#n_r1=~/data/fastq/dataset/na12878_wgs_101/s_1.fq -#n_r2=~/data/fastq/dataset/na12878_wgs_101/s_2.fq -#n_r1=~/data/fastq/dataset/na12878_wgs_101/45m_r1.fq -#n_r2=~/data/fastq/dataset/na12878_wgs_101/45m_r2.fq -# d3 <= 4 77% -#n_r1=~/data/fastq/dataset/na12878_wgs_150/s_1.fq -#n_r2=~/data/fastq/dataset/na12878_wgs_150/s_2.fq -#n_r1=~/data/fastq/dataset/na12878_wgs_150/45mr1.fq.gz -#n_r2=~/data/fastq/dataset/na12878_wgs_150/45mr2.fq.gz -## d4 <= 4 93% -#n_r1=~/data/fastq/dataset/zy_wes/s_1.fq -#n_r2=~/data/fastq/dataset/zy_wes/s_2.fq -#n_r1=~/data/fastq/dataset/zy_wes/45mr1.fq.gz -#n_r2=~/data/fastq/dataset/zy_wes/45mr2.fq.gz -## d5 <= 4 80% -#n_r1=~/data/fastq/dataset/zy_wgs/45mr1.fq.gz -#n_r2=~/data/fastq/dataset/zy_wgs/45mr2.fq.gz -#n_r1=~/data/fastq/dataset/zy_wgs/s_1.fq -#n_r2=~/data/fastq/dataset/zy_wgs/s_2.fq -n_r1=~/data1/fastq/dataset/zy_wgs/E150010395_L01_690_1.fq -n_r2=~/data1/fastq/dataset/zy_wgs/E150010395_L01_690_2.fq +n1=~/data/na12878-1.fq.gz +n2=~/data/na12878-2.fq.gz -reference=~/data1/fmt_ref/human_g1k_v37_decoy.fasta -#reference=~/reference/bwa/human_g1k_v37_decoy.fasta -#reference=~/data/reference/human_g1k_v37_decoy.fasta -#out=~/data1/out-u8-1.sam -#out=~/data1/out-i16.sam -#out=~/data1/fmt-out.sam -out=~/data/fastbwa.ert.sam +#n1=~/data/dataset/real/D1/1w1.fq +#n2=~/data/dataset/real/D1/1w2.fq + +#n1=~/data/dataset/real/D1/n1.fq.gz +#n2=~/data/dataset/real/D1/n2.fq.gz + +#n1=~/data/dataset/real/D2/n1.fq.gz +#n2=~/data/dataset/real/D2/n2.fq.gz + +#n1=~/data/dataset/real/D3/n1.fq.gz +#n2=~/data/dataset/real/D3/n2.fq.gz + +#n1=~/data/dataset/real/D4/n1.fq.gz +#n2=~/data/dataset/real/D4/n2.fq.gz + +#n1=~/data/dataset/real/D5/n1.fq.gz +#n2=~/data/dataset/real/D5/n2.fq.gz + + +reference=~/data/fmt_ref/human_g1k_v37_decoy.fasta + +out=./out.sam #out=/dev/null +sudo /home/zzh/clean_mem.sh +rm $out time ./fastbwa mem -t $thread -M -R @RG\\tID:normal\\tSM:normal\\tPL:illumina\\tLB:normal\\tPG:bwa \ $reference \ - $n_r1 \ - $n_r2 \ - -o $out -2 -Z + $n1 \ + $n2 \ + -o $out -2 #-Z + +#-K 2560000000 \ +#-K 2100000000 diff --git a/utils.c b/utils.c index fffd2a7..4a1295a 100644 --- a/utils.c +++ b/utils.c @@ -37,11 +37,15 @@ #include #include #endif +#include #include #include -#include "utils.h" #include "ksort.h" +#include "kvec.h" +#include "utils.h" +#include "yarn.h" +#include "khash.h" #define pair64_lt(a, b) ((a).x < (b).x || ((a).x == (b).x && (a).y < (b).y)) KSORT_INIT(128, pair64_t, pair64_lt) KSORT_INIT(64, uint64_t, ks_lt_generic) @@ -155,11 +159,54 @@ uint64_t fread_fix(FILE *fp, uint64_t size, void *a) return offset; } +typedef struct { + pthread_t tid; + void *buf[2]; + volatile int readSize[2]; + uint64_t getIdx; + uint64_t putIdx; + volatile int finish; + lock_t *mtx; +} FileKV; + +KHASH_MAP_INIT_INT64(fkv, FileKV); +static khash_t(fkv) *fHash = 0; + +#define USE_ASYNC_READ + int err_gzread(gzFile file, void *ptr, unsigned int len) { - int ret = gzread(file, ptr, len); + int ret = 0; + PROF_START(read); +#ifndef USE_ASYNC_READ + ret = gzread(file, ptr, len); +#else + khiter_t k = kh_get(fkv, fHash, (int64_t)file); + FileKV *val = &kh_value(fHash, k); + POSSESS(val->mtx); + WAIT_FOR(val->mtx, NOT_TO_BE, 0); // 等待有数据 + RELEASE(val->mtx); - if (ret < 0) + int curIdx = val->getIdx % 2; + if (val->finish) { + if (val->getIdx < val->putIdx) { + ret = val->readSize[curIdx]; + if (ret > 0) memcpy(ptr, val->buf[curIdx], ret); + ++val->getIdx; + return ret; + } + return 0; + } + ret = val->readSize[curIdx]; + memcpy(ptr, val->buf[curIdx], ret); + + POSSESS(val->mtx); + ++val->getIdx; + TWIST(val->mtx, BY, -1); +#endif + PROF_END(gprof[G_read_seq], read); + + if (ret < 0) { int errnum = 0; const char *msg = gzerror(file, &errnum); @@ -169,6 +216,67 @@ int err_gzread(gzFile file, void *ptr, unsigned int len) return ret; } +static int64_t kBufSize = 16777216; + +static void *async_gzread(void *data) { + gzFile file = (gzFile)data; + khiter_t k = kh_get(fkv, fHash, (int64_t)file); + FileKV *val = &kh_value(fHash, k); + + int ret = 0; + while (1) { + POSSESS(val->mtx); + WAIT_FOR(val->mtx, NOT_TO_BE, 2); // 等待有数据 + RELEASE(val->mtx); + + int curIdx = val->putIdx % 2; + ret = gzread(file, val->buf[curIdx], kBufSize); + val->readSize[curIdx] = ret; + + if (ret <= 0) { + POSSESS(val->mtx); + val->finish = 1; + TWIST(val->mtx, BY, 1); + break; + } + + POSSESS(val->mtx); + val->putIdx += 1; + TWIST(val->mtx, BY, 1); + } + + return NULL; +} + +int start_async_read(gzFile file) { + int ret = 0; +#ifdef USE_ASYNC_READ + if (fHash == 0) { + fHash = kh_init(fkv); + } + khiter_t k = kh_put(fkv, fHash, (int64_t)file, &ret); + kh_key(fHash, k) = (int64_t)file; + FileKV *fv = &kh_value(fHash, k); + + fv->mtx = NEW_LOCK(0); + fv->getIdx = fv->putIdx = fv->finish = 0; + fv->readSize[0] = fv->readSize[1] = 0; + fv->buf[0] = malloc(kBufSize); + fv->buf[1] = malloc(kBufSize); + ret = pthread_create(&fv->tid, 0, async_gzread, file); +#endif + return ret; +} + +int stop_async_read(gzFile file) { +#ifdef USE_ASYNC_READ + khiter_t k = kh_get(fkv, fHash, (int64_t)file); + FileKV *val = &kh_value(fHash, k); + pthread_join(val->tid, 0); +#endif + return 0; +} + int err_fseek(FILE *stream, long offset, int whence) { int ret = fseek(stream, offset, whence); diff --git a/utils.h b/utils.h index 332a7d6..ddf81b9 100644 --- a/utils.h +++ b/utils.h @@ -135,6 +135,9 @@ extern "C" { int memcpy_bwamem(void *dest, size_t dmax, const void *src, size_t smax, char *file_name, int line_num); + int start_async_read(gzFile file); + int stop_async_read(gzFile file); + #ifdef __cplusplus } #endif