在gz_read部分加了异步读取

This commit is contained in:
zzh 2025-01-13 20:24:20 +08:00
parent b50360bb48
commit 40544d02fe
11 changed files with 248 additions and 90 deletions

View File

@ -42,6 +42,10 @@
"ertindex.h": "c", "ertindex.h": "c",
"ertseeding.h": "c", "ertseeding.h": "c",
"algorithm": "c", "algorithm": "c",
"filesystem": "c" "filesystem": "c",
"deque": "cpp",
"string": "cpp",
"unordered_map": "cpp",
"vector": "cpp"
} }
} }

View File

@ -1,9 +1,9 @@
CC= gcc CC= gcc
CFLAGS= -g -Wall -Wno-unused-function -mavx2 #-O2 CFLAGS= -g -Wall -Wno-unused-function -mavx2 -O2
WRAP_MALLOC=-DUSE_MALLOC_WRAPPERS WRAP_MALLOC=-DUSE_MALLOC_WRAPPERS
SHOW_PERF= -DSHOW_PERF SHOW_PERF= -DSHOW_PERF
SHOW_DATA_PERF= -DSHOW_DATA_PERF SHOW_DATA_PERF= #-DSHOW_DATA_PERF
FILTER_FULL_MATCH= #-DFILTER_FULL_MATCH FILTER_FULL_MATCH= #-DFILTER_FULL_MATCH
USE_MT_READ= -DUSE_MT_READ USE_MT_READ= -DUSE_MT_READ

25
bwa.c
View File

@ -108,14 +108,20 @@ static void *thread_bseq_read(void *data) {
int cur_n = 0, cur_pos = d->start_pos, size = 0; int cur_n = 0, cur_pos = d->start_pos, size = 0;
int ret_status = 1; int ret_status = 1;
while (cur_n < d->n_bound && (ret_status = kseq_read(ks)) >= 0) { //pthread_t thread_id = pthread_self();
trim_readno(&ks->name); //fprintf(stderr, "Thread ID: %lu\n", thread_id);
kseq2bseq1(ks, seqs + cur_pos, copy_comment); PROF_START(parse);
while (cur_n < d->n_bound && (ret_status = kseq_read(ks)) >= 0) {
trim_readno(&ks->name);
kseq2bseq1(ks, seqs + cur_pos, copy_comment);
seqs[cur_pos].id = cur_pos; seqs[cur_pos].id = cur_pos;
size += seqs[cur_pos].l_seq; size += seqs[cur_pos].l_seq;
cur_pos += 2; cur_n += 1; cur_pos += 2; cur_n += 1;
if (size >= chunk_size) break;
if (size >= chunk_size) break;
} }
PROF_END(gprof[G_parse_seq], parse);
d->ret_n = cur_n; d->ret_size = size; d->ret_status = ret_status; d->ret_n = cur_n; d->ret_size = size; d->ret_status = ret_status;
return 0; return 0;
} }
@ -243,12 +249,13 @@ void bseq_read(int chunk_size, int *n_, void *ks1_, void *ks2_, int copy_comment
int size = 0, m, n; int size = 0, m, n;
bseq1_t *seqs = *seqs_ptr; bseq1_t *seqs = *seqs_ptr;
n = 0; m = *m_; n = 0; m = *m_;
while (kseq_read(ks) >= 0) { while (kseq_read(ks) >= 0) {
if (ks2 && kseq_read(ks2) < 0) { // the 2nd file has fewer reads if (ks2 && kseq_read(ks2) < 0) { // the 2nd file has fewer reads
fprintf(stderr, "[W::%s] the 2nd file has fewer sequences.\n", __func__); fprintf(stderr, "[W::%s] the 2nd file has fewer sequences.\n", __func__);
break; break;
} }
if (n >= m) { PROF_START(parse);
if (n >= m) {
m = m? m<<1 : 256; m = m? m<<1 : 256;
seqs = realloc(seqs, m * sizeof(bseq1_t)); seqs = realloc(seqs, m * sizeof(bseq1_t));
memset(seqs + n, 0, (m-n) * sizeof(bseq1_t)); memset(seqs + n, 0, (m-n) * sizeof(bseq1_t));
@ -257,9 +264,11 @@ void bseq_read(int chunk_size, int *n_, void *ks1_, void *ks2_, int copy_comment
if (ks2) { if (ks2) {
READ_ONE_SEQ(ks2); READ_ONE_SEQ(ks2);
} }
if (size >= chunk_size && (n&1) == 0) break; PROF_END(gprof[G_parse_seq], parse);
if (size >= chunk_size && (n&1) == 0) break;
} }
if (size == 0) { // test if the 2nd file is finished
if (size == 0) { // test if the 2nd file is finished
if (ks2 && kseq_read(ks2) >= 0) if (ks2 && kseq_read(ks2) >= 0)
fprintf(stderr, "[W::%s] the 1st file has fewer sequences.\n", __func__); fprintf(stderr, "[W::%s] the 1st file has fewer sequences.\n", __func__);
} }

View File

@ -225,20 +225,24 @@ static void *thread_read(void *data)
ktp_aux_t *aux = (ktp_aux_t *)data; ktp_aux_t *aux = (ktp_aux_t *)data;
while (1) while (1)
{ {
POSSESS(input_have); PROF_START(w1);
WAIT_FOR(input_have, NOT_TO_BE, 0); POSSESS(input_have);
WAIT_FOR(input_have, NOT_TO_BE, 0);
RELEASE(input_have); RELEASE(input_have);
// fprintf(stderr, "start read: %ld\n", aux->read_idx); PROF_END(gprof[G_read_wait_1], w1);
// fprintf(stderr, "start read: %ld\n", aux->read_idx);
if (read_data(aux, aux->data) == 0) { if (read_data(aux, aux->data) == 0) {
POSSESS(input_have); POSSESS(input_have);
aux->read_complete = 1; aux->read_complete = 1;
TWIST(input_have, BY, -1); TWIST(input_have, BY, -1);
break; break;
} }
POSSESS(input_have); PROF_START(w2);
POSSESS(input_have);
aux->read_idx++; aux->read_idx++;
TWIST(input_have, BY, -1); TWIST(input_have, BY, -1);
// fprintf(stderr, "next read: %ld\n", aux->read_idx); PROF_END(gprof[G_read_wait_2], w2);
// fprintf(stderr, "next read: %ld\n", aux->read_idx);
} }
return 0; return 0;
} }
@ -251,15 +255,17 @@ static void *thread_calc(void *data)
while (1) while (1)
{ {
// fprintf(stderr, "start calc: %ld\n", aux->calc_idx); // fprintf(stderr, "start calc: %ld\n", aux->calc_idx);
POSSESS(input_have); PROF_START(w1);
POSSESS(input_have);
WAIT_FOR(input_have, NOT_TO_BE, 2); WAIT_FOR(input_have, NOT_TO_BE, 2);
RELEASE(input_have); // 应该没必要持有吧? RELEASE(input_have); // 应该没必要持有吧?
POSSESS(output_have); POSSESS(output_have);
WAIT_FOR(output_have, NOT_TO_BE, 2); WAIT_FOR(output_have, NOT_TO_BE, 2);
RELEASE(output_have); RELEASE(output_have);
PROF_END(gprof[G_comp_wait_1], w1);
if (aux->calc_idx < aux->read_idx) { if (aux->calc_idx < aux->read_idx) {
calc_data(aux, aux->data + d_idx); calc_data(aux, aux->data + d_idx);
d_idx = !d_idx; d_idx = !d_idx;
add_idx = 1; add_idx = 1;
@ -272,13 +278,15 @@ static void *thread_calc(void *data)
TWIST(output_have, BY, 1); // 最后要唤醒写线程 TWIST(output_have, BY, 1); // 最后要唤醒写线程
break; // 计算完了 break; // 计算完了
} }
POSSESS(output_have); PROF_START(w2);
POSSESS(output_have);
if (add_idx) aux->calc_idx ++; if (add_idx) aux->calc_idx ++;
TWIST(output_have, BY, 1); TWIST(output_have, BY, 1);
POSSESS(input_have); POSSESS(input_have);
TWIST(input_have, BY, 1); TWIST(input_have, BY, 1);
// fprintf(stderr, "next calc: %ld\n", aux->calc_idx); PROF_END(gprof[G_comp_wait_2], w2);
// fprintf(stderr, "next calc: %ld\n", aux->calc_idx);
} }
return 0; return 0;
} }
@ -290,10 +298,12 @@ static void *thread_write(void *data)
while (1) while (1)
{ {
// fprintf(stderr, "start write: %ld\n", aux->write_idx); // fprintf(stderr, "start write: %ld\n", aux->write_idx);
POSSESS(output_have); PROF_START(w1);
POSSESS(output_have);
WAIT_FOR(output_have, NOT_TO_BE, 0); WAIT_FOR(output_have, NOT_TO_BE, 0);
RELEASE(output_have); RELEASE(output_have);
if (aux->write_idx < aux->calc_idx) { PROF_END(gprof[G_write_wait_1], w1);
if (aux->write_idx < aux->calc_idx) {
write_data(aux, aux->data + d_idx); write_data(aux, aux->data + d_idx);
d_idx = !d_idx; d_idx = !d_idx;
aux->write_idx++; aux->write_idx++;
@ -303,9 +313,11 @@ static void *thread_write(void *data)
write_data(aux, aux->data + d_idx); write_data(aux, aux->data + d_idx);
break; break;
} }
POSSESS(output_have); PROF_START(w2);
POSSESS(output_have);
TWIST(output_have, BY, -1); TWIST(output_have, BY, -1);
// fprintf(stderr, "next write: %ld\n", aux->write_idx); PROF_END(gprof[G_write_wait_2], w2);
// fprintf(stderr, "next write: %ld\n", aux->write_idx);
} }
return 0; return 0;
} }
@ -617,8 +629,10 @@ int main_mem(int argc, char *argv[])
return 1; return 1;
} }
fp = gzdopen(fd, "r"); fp = gzdopen(fd, "r");
aux.ks = kseq_init(fp); start_async_read(fp);
if (optind + 2 < argc) { // stop_async_read(fp);
aux.ks = kseq_init(fp);
if (optind + 2 < argc) {
if (opt->flag&MEM_F_PE) { if (opt->flag&MEM_F_PE) {
if (bwa_verbose >= 2) if (bwa_verbose >= 2)
fprintf(stderr, "[W::%s] when '-p' is in use, the second query file is ignored.\n", __func__); fprintf(stderr, "[W::%s] when '-p' is in use, the second query file is ignored.\n", __func__);
@ -629,7 +643,8 @@ int main_mem(int argc, char *argv[])
return 1; return 1;
} }
fp2 = gzdopen(fd2, "r"); fp2 = gzdopen(fd2, "r");
aux.ks2 = kseq_init(fp2); start_async_read(fp2);
aux.ks2 = kseq_init(fp2);
opt->flag |= MEM_F_PE; opt->flag |= MEM_F_PE;
} }
} }
@ -656,9 +671,11 @@ int main_mem(int argc, char *argv[])
// free(opt); // free(opt);
// bwa_idx_destroy(aux.idx); // bwa_idx_destroy(aux.idx);
// kseq_destroy(aux.ks); // kseq_destroy(aux.ks);
err_gzclose(fp); kclose(ko); stop_async_read(fp);
if (aux.ks2) { err_gzclose(fp); kclose(ko);
if (aux.ks2) {
// kseq_destroy(aux.ks2); // kseq_destroy(aux.ks2);
stop_async_read(fp2);
err_gzclose(fp2); kclose(ko2); err_gzclose(fp2); kclose(ko2);
} }
PROF_END(gprof[G_ALL], all); PROF_END(gprof[G_ALL], all);

View File

@ -136,6 +136,7 @@ static const uint16_t h_vec_int_mask[SIMD_WIDTH][SIMD_WIDTH] = {
m = MAX(m, maxVal[0]); /*用来解决与BSW结果不一样的第二种情况(上边界)*/ \ m = MAX(m, maxVal[0]); /*用来解决与BSW结果不一样的第二种情况(上边界)*/ \
if (maxVal[0] > 0 && m >= max) { \ if (maxVal[0] > 0 && m >= max) { \
for(j=beg, i=iend; j<=end; j+=SIMD_WIDTH, i-=SIMD_WIDTH) { \ for(j=beg, i=iend; j<=end; j+=SIMD_WIDTH, i-=SIMD_WIDTH) { \
/*calc_num += 16;*/ \
__m256i h2_vec = _mm256_loadu_si256((__m256i*) (&hA2[j])); \ __m256i h2_vec = _mm256_loadu_si256((__m256i*) (&hA2[j])); \
__m256i vcmp = _mm256_cmpeq_epi16(h2_vec, max_vec); \ __m256i vcmp = _mm256_cmpeq_epi16(h2_vec, max_vec); \
uint32_t mask = _mm256_movemask_epi8(vcmp); \ uint32_t mask = _mm256_movemask_epi8(vcmp); \
@ -202,7 +203,7 @@ int ksw_extend2_avx2(int qlen, // query length 待匹配段碱基的query长度
int *_max_off, // 取得最大得分时在query和reference上位置差的 最大值 int *_max_off, // 取得最大得分时在query和reference上位置差的 最大值
buf_t *buf) // 之前已经开辟过的缓存 buf_t *buf) // 之前已经开辟过的缓存
{ {
//return ksw_extend2_origin(qlen, query, tlen, target, is_left, m, mat, o_del, e_del, o_ins, e_ins, w, end_bonus, zdrop, h0, _qle, _tle, _gtle, _gscore, _max_off); // return ksw_extend2_origin(qlen, query, tlen, target, is_left, m, mat, o_del, e_del, o_ins, e_ins, w, end_bonus, zdrop, h0, _qle, _tle, _gtle, _gscore, _max_off);
#ifdef DEBUG_FILE_OUTPUT #ifdef DEBUG_FILE_OUTPUT
//fprintf(gf[0], "%d\n", qlen); //fprintf(gf[0], "%d\n", qlen);
@ -371,8 +372,9 @@ int ksw_extend2_avx2(int qlen, // query length 待匹配段碱基的query长度
SIMD_CMP_SEQ; SIMD_CMP_SEQ;
// 计算 // 计算
SIMD_COMPUTE; SIMD_COMPUTE;
// 存储结果 //calc_num += 16;
SIMD_STORE; // 存储结果
SIMD_STORE;
} }
// 剩下的计算单元 // 剩下的计算单元
if (j <= end) { if (j <= end) {
@ -382,7 +384,8 @@ int ksw_extend2_avx2(int qlen, // query length 待匹配段碱基的query长度
SIMD_CMP_SEQ; SIMD_CMP_SEQ;
// 计算 // 计算
SIMD_COMPUTE; SIMD_COMPUTE;
// 去除多余计算的部分 //calc_num += 16;
// 去除多余计算的部分
SIMD_REMOVE_EXTRA; SIMD_REMOVE_EXTRA;
// 存储结果 // 存储结果
SIMD_STORE; SIMD_STORE;
@ -673,6 +676,7 @@ int ksw_extend2_origin(int qlen, // query length 待匹配段碱基的query长
} else h1 = 0; } else h1 = 0;
//m = h1; // 用来解决和VP-BSW结果不一样的第一种情况(左边界) //m = h1; // 用来解决和VP-BSW结果不一样的第一种情况(左边界)
for (j = beg; LIKELY(j < end); ++j) { for (j = beg; LIKELY(j < end); ++j) {
//calc_num++;
#ifdef DEBUG_FILE_OUTPUT #ifdef DEBUG_FILE_OUTPUT
#ifdef COUNT_CALC_NUM #ifdef COUNT_CALC_NUM

View File

@ -310,8 +310,9 @@ int ksw_extend2_avx2_u8(int qlen, // query length 待匹配段碱基的query长
SIMD_CMP_SEQ; SIMD_CMP_SEQ;
// 计算 // 计算
SIMD_COMPUTE; SIMD_COMPUTE;
// 存储结果 //calc_num += 32;
SIMD_STORE; // 存储结果
SIMD_STORE;
} }
// 剩下的计算单元 // 剩下的计算单元
if (j <= end) { if (j <= end) {
@ -321,7 +322,8 @@ int ksw_extend2_avx2_u8(int qlen, // query length 待匹配段碱基的query长
SIMD_CMP_SEQ; SIMD_CMP_SEQ;
// 计算 // 计算
SIMD_COMPUTE; SIMD_COMPUTE;
// 去除多余计算的部分 //calc_num += 32;
// 去除多余计算的部分
SIMD_REMOVE_EXTRA; SIMD_REMOVE_EXTRA;
// 存储结果 // 存储结果
SIMD_STORE; SIMD_STORE;

View File

@ -11,6 +11,8 @@ Date : 2024/04/06
#include "utils.h" #include "utils.h"
#include "profiling.h" #include "profiling.h"
uint64_t calc_num = 0;
#ifdef SHOW_PERF #ifdef SHOW_PERF
uint64_t tprof[LIM_THREAD_PROF_TYPE][LIM_THREAD] = {0}; uint64_t tprof[LIM_THREAD_PROF_TYPE][LIM_THREAD] = {0};
uint64_t proc_freq = 1000; uint64_t proc_freq = 1000;
@ -104,6 +106,18 @@ int display_stats(int nthreads)
fprintf(stderr, "time_ksw_loop: %0.2lf s\n", gprof[G_KSW_LOOP] * 1.0 / proc_freq); fprintf(stderr, "time_ksw_loop: %0.2lf s\n", gprof[G_KSW_LOOP] * 1.0 / proc_freq);
fprintf(stderr, "time_ksw_end_loop: %0.2lf s\n", gprof[G_KSW_END_LOOP] * 1.0 / proc_freq); fprintf(stderr, "time_ksw_end_loop: %0.2lf s\n", gprof[G_KSW_END_LOOP] * 1.0 / proc_freq);
fprintf(stderr, "parse seq: %0.2lf s\n", gprof[G_parse_seq] * 1.0 / proc_freq);
fprintf(stderr, "read seq: %0.2lf s\n", gprof[G_read_seq] * 1.0 / proc_freq);
// fprintf(stderr, "read_wait_1: %0.2lf s\n", gprof[G_read_wait_1] * 1.0 / proc_freq);
// fprintf(stderr, "read_wait_2: %0.2lf s\n", gprof[G_read_wait_2] * 1.0 / proc_freq);
// fprintf(stderr, "comp_wait_1: %0.2lf s\n", gprof[G_comp_wait_1] * 1.0 / proc_freq);
// fprintf(stderr, "comp_wait_2: %0.2lf s\n", gprof[G_comp_wait_2] * 1.0 / proc_freq);
// fprintf(stderr, "write_wait_1: %0.2lf s\n", gprof[G_write_wait_1] * 1.0 / proc_freq);
// fprintf(stderr, "write_wait_2: %0.2lf s\n", gprof[G_write_wait_2] * 1.0 / proc_freq);
fprintf(stderr, "real_cal num: %ld\n", calc_num);
fprintf(stderr, "\n"); fprintf(stderr, "\n");
#endif #endif

View File

@ -43,11 +43,10 @@ extern int64_t gdat[LIM_GLOBAL_DATA_TYPE];
#define PROF_END(result, tmp_time) #define PROF_END(result, tmp_time)
#endif #endif
extern uint64_t calc_num;
// GLOBAL // GLOBAL
enum enum {
{
G_ALL = 0, G_ALL = 0,
G_PIPELINE, G_PIPELINE,
G_READ, G_READ,
@ -60,7 +59,15 @@ enum
G_MEM_PESTAT, G_MEM_PESTAT,
G_MEM_SAM, G_MEM_SAM,
G_KSW_LOOP, G_KSW_LOOP,
G_KSW_END_LOOP G_KSW_END_LOOP,
G_parse_seq,
G_read_seq,
G_read_wait_1,
G_read_wait_2,
G_comp_wait_1,
G_comp_wait_2,
G_write_wait_1,
G_write_wait_2
}; };
// THREAD // THREAD
@ -84,7 +91,10 @@ enum
T_SAM_REG2ALN, T_SAM_REG2ALN,
T_SEED_3_1, T_SEED_3_1,
T_SEED_3_2, T_SEED_3_2,
T_SEED_3_3 T_SEED_3_3,
T_SEEDING,
T_EXTENSION,
T_SAM,
}; };
int display_stats(int); int display_stats(int);

79
run.sh
View File

@ -1,53 +1,40 @@
thread=64 thread=128
## d1 k18<=4 89% n1=~/data/na12878-1.fq.gz
#n_r1=~/data/fastq/dataset/na12878_wes_144/SRR25735653_1.fastq n2=~/data/na12878-2.fq.gz
#n_r2=~/data/fastq/dataset/na12878_wes_144/SRR25735653_2.fastq
#n_r1=~/data/fastq/dataset/na12878_wes_144/1w_1.fq
#n_r2=~/data/fastq/dataset/na12878_wes_144/1w_2.fq
#n_r1=~/data/fastq/dataset/na12878_wes_144/ss_1.fq
#n_r2=~/data/fastq/dataset/na12878_wes_144/ss_2.fq
#n_r1=~/data/fastq/dataset/na12878_wes_144/45m_r1.fq
#n_r2=~/data/fastq/dataset/na12878_wes_144/45m_r2.fq
#n_r1=~/data/fastq/dataset/na12878_wes_144/45mr1.fq.gz
#n_r2=~/data/fastq/dataset/na12878_wes_144/45mr2.fq.gz
## d2 <= 4 87%
#n_r1=~/data/fastq/dataset/na12878_wgs_101/na12878_r1.fq
#n_r2=~/data/fastq/dataset/na12878_wgs_101/na12878_r2.fq
#n_r1=~/data/fastq/dataset/na12878_wgs_101/s_1.fq
#n_r2=~/data/fastq/dataset/na12878_wgs_101/s_2.fq
#n_r1=~/data/fastq/dataset/na12878_wgs_101/45m_r1.fq
#n_r2=~/data/fastq/dataset/na12878_wgs_101/45m_r2.fq
# d3 <= 4 77%
#n_r1=~/data/fastq/dataset/na12878_wgs_150/s_1.fq
#n_r2=~/data/fastq/dataset/na12878_wgs_150/s_2.fq
#n_r1=~/data/fastq/dataset/na12878_wgs_150/45mr1.fq.gz
#n_r2=~/data/fastq/dataset/na12878_wgs_150/45mr2.fq.gz
## d4 <= 4 93%
#n_r1=~/data/fastq/dataset/zy_wes/s_1.fq
#n_r2=~/data/fastq/dataset/zy_wes/s_2.fq
#n_r1=~/data/fastq/dataset/zy_wes/45mr1.fq.gz
#n_r2=~/data/fastq/dataset/zy_wes/45mr2.fq.gz
## d5 <= 4 80%
#n_r1=~/data/fastq/dataset/zy_wgs/45mr1.fq.gz
#n_r2=~/data/fastq/dataset/zy_wgs/45mr2.fq.gz
#n_r1=~/data/fastq/dataset/zy_wgs/s_1.fq
#n_r2=~/data/fastq/dataset/zy_wgs/s_2.fq
n_r1=~/data1/fastq/dataset/zy_wgs/E150010395_L01_690_1.fq
n_r2=~/data1/fastq/dataset/zy_wgs/E150010395_L01_690_2.fq
reference=~/data1/fmt_ref/human_g1k_v37_decoy.fasta
#reference=~/reference/bwa/human_g1k_v37_decoy.fasta
#reference=~/data/reference/human_g1k_v37_decoy.fasta
#out=~/data1/out-u8-1.sam #n1=~/data/dataset/real/D1/1w1.fq
#out=~/data1/out-i16.sam #n2=~/data/dataset/real/D1/1w2.fq
#out=~/data1/fmt-out.sam
out=~/data/fastbwa.ert.sam #n1=~/data/dataset/real/D1/n1.fq.gz
#n2=~/data/dataset/real/D1/n2.fq.gz
#n1=~/data/dataset/real/D2/n1.fq.gz
#n2=~/data/dataset/real/D2/n2.fq.gz
#n1=~/data/dataset/real/D3/n1.fq.gz
#n2=~/data/dataset/real/D3/n2.fq.gz
#n1=~/data/dataset/real/D4/n1.fq.gz
#n2=~/data/dataset/real/D4/n2.fq.gz
#n1=~/data/dataset/real/D5/n1.fq.gz
#n2=~/data/dataset/real/D5/n2.fq.gz
reference=~/data/fmt_ref/human_g1k_v37_decoy.fasta
out=./out.sam
#out=/dev/null #out=/dev/null
sudo /home/zzh/clean_mem.sh
rm $out
time ./fastbwa mem -t $thread -M -R @RG\\tID:normal\\tSM:normal\\tPL:illumina\\tLB:normal\\tPG:bwa \ time ./fastbwa mem -t $thread -M -R @RG\\tID:normal\\tSM:normal\\tPL:illumina\\tLB:normal\\tPG:bwa \
$reference \ $reference \
$n_r1 \ $n1 \
$n_r2 \ $n2 \
-o $out -2 -Z -o $out -2 #-Z
#-K 2560000000 \
#-K 2100000000

114
utils.c
View File

@ -37,11 +37,15 @@
#include <sys/stat.h> #include <sys/stat.h>
#include <unistd.h> #include <unistd.h>
#endif #endif
#include <pthread.h>
#include <sys/resource.h> #include <sys/resource.h>
#include <sys/time.h> #include <sys/time.h>
#include "utils.h"
#include "ksort.h" #include "ksort.h"
#include "kvec.h"
#include "utils.h"
#include "yarn.h"
#include "khash.h"
#define pair64_lt(a, b) ((a).x < (b).x || ((a).x == (b).x && (a).y < (b).y)) #define pair64_lt(a, b) ((a).x < (b).x || ((a).x == (b).x && (a).y < (b).y))
KSORT_INIT(128, pair64_t, pair64_lt) KSORT_INIT(128, pair64_t, pair64_lt)
KSORT_INIT(64, uint64_t, ks_lt_generic) KSORT_INIT(64, uint64_t, ks_lt_generic)
@ -155,11 +159,54 @@ uint64_t fread_fix(FILE *fp, uint64_t size, void *a)
return offset; return offset;
} }
typedef struct {
pthread_t tid;
void *buf[2];
volatile int readSize[2];
uint64_t getIdx;
uint64_t putIdx;
volatile int finish;
lock_t *mtx;
} FileKV;
KHASH_MAP_INIT_INT64(fkv, FileKV);
static khash_t(fkv) *fHash = 0;
#define USE_ASYNC_READ
int err_gzread(gzFile file, void *ptr, unsigned int len) int err_gzread(gzFile file, void *ptr, unsigned int len)
{ {
int ret = gzread(file, ptr, len); int ret = 0;
PROF_START(read);
#ifndef USE_ASYNC_READ
ret = gzread(file, ptr, len);
#else
khiter_t k = kh_get(fkv, fHash, (int64_t)file);
FileKV *val = &kh_value(fHash, k);
POSSESS(val->mtx);
WAIT_FOR(val->mtx, NOT_TO_BE, 0); // 等待有数据
RELEASE(val->mtx);
if (ret < 0) int curIdx = val->getIdx % 2;
if (val->finish) {
if (val->getIdx < val->putIdx) {
ret = val->readSize[curIdx];
if (ret > 0) memcpy(ptr, val->buf[curIdx], ret);
++val->getIdx;
return ret;
}
return 0;
}
ret = val->readSize[curIdx];
memcpy(ptr, val->buf[curIdx], ret);
POSSESS(val->mtx);
++val->getIdx;
TWIST(val->mtx, BY, -1);
#endif
PROF_END(gprof[G_read_seq], read);
if (ret < 0)
{ {
int errnum = 0; int errnum = 0;
const char *msg = gzerror(file, &errnum); const char *msg = gzerror(file, &errnum);
@ -169,6 +216,67 @@ int err_gzread(gzFile file, void *ptr, unsigned int len)
return ret; return ret;
} }
static int64_t kBufSize = 16777216;
static void *async_gzread(void *data) {
gzFile file = (gzFile)data;
khiter_t k = kh_get(fkv, fHash, (int64_t)file);
FileKV *val = &kh_value(fHash, k);
int ret = 0;
while (1) {
POSSESS(val->mtx);
WAIT_FOR(val->mtx, NOT_TO_BE, 2); // 等待有数据
RELEASE(val->mtx);
int curIdx = val->putIdx % 2;
ret = gzread(file, val->buf[curIdx], kBufSize);
val->readSize[curIdx] = ret;
if (ret <= 0) {
POSSESS(val->mtx);
val->finish = 1;
TWIST(val->mtx, BY, 1);
break;
}
POSSESS(val->mtx);
val->putIdx += 1;
TWIST(val->mtx, BY, 1);
}
return NULL;
}
int start_async_read(gzFile file) {
int ret = 0;
#ifdef USE_ASYNC_READ
if (fHash == 0) {
fHash = kh_init(fkv);
}
khiter_t k = kh_put(fkv, fHash, (int64_t)file, &ret);
kh_key(fHash, k) = (int64_t)file;
FileKV *fv = &kh_value(fHash, k);
fv->mtx = NEW_LOCK(0);
fv->getIdx = fv->putIdx = fv->finish = 0;
fv->readSize[0] = fv->readSize[1] = 0;
fv->buf[0] = malloc(kBufSize);
fv->buf[1] = malloc(kBufSize);
ret = pthread_create(&fv->tid, 0, async_gzread, file);
#endif
return ret;
}
int stop_async_read(gzFile file) {
#ifdef USE_ASYNC_READ
khiter_t k = kh_get(fkv, fHash, (int64_t)file);
FileKV *val = &kh_value(fHash, k);
pthread_join(val->tid, 0);
#endif
return 0;
}
int err_fseek(FILE *stream, long offset, int whence) int err_fseek(FILE *stream, long offset, int whence)
{ {
int ret = fseek(stream, offset, whence); int ret = fseek(stream, offset, whence);

View File

@ -135,6 +135,9 @@ extern "C" {
int memcpy_bwamem(void *dest, size_t dmax, const void *src, size_t smax, char *file_name, int line_num); int memcpy_bwamem(void *dest, size_t dmax, const void *src, size_t smax, char *file_name, int line_num);
int start_async_read(gzFile file);
int stop_async_read(gzFile file);
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif