From 55f1b3653492c53bcbc423cb574cfa00e153d903 Mon Sep 17 00:00:00 2001 From: Rob Davies Date: Thu, 3 Jan 2013 16:57:37 +0000 Subject: [PATCH] New wrapper for gzclose; added err_fflush calls and made err_fflush call fsync too. Added a new utils.c wrapper err_gzclose and changed gzclose calls to use it. Put in some more err_fflush calls before files being written are closed. Made err_fflush call fsync. This is useful for remote filesystems where errors may not be reported on fflush or fclose as problems at the server end may only be detected after those calls have returned. If bwa is being used only to write to local filesystems, calling fsync is not really necessary. To disable the fsync call, comment out #define FSYNC_ON_FLUSH in utils.c. --- bamlite.h | 2 +- bntseq.c | 5 ++++- bwaseqio.c | 2 +- bwtindex.c | 6 +++--- bwtio.c | 2 ++ bwtmisc.c | 3 ++- bwtsw2_aux.c | 6 +++--- fastmap.c | 4 ++-- ksw.c | 8 ++++---- simple_dp.c | 4 ++-- utils.c | 39 +++++++++++++++++++++++++++++++++++++++ utils.h | 1 + 12 files changed, 64 insertions(+), 18 deletions(-) diff --git a/bamlite.h b/bamlite.h index 2b65c57..0c080fd 100644 --- a/bamlite.h +++ b/bamlite.h @@ -8,7 +8,7 @@ typedef gzFile bamFile; #define bam_open(fn, mode) xzopen(fn, mode) #define bam_dopen(fd, mode) gzdopen(fd, mode) -#define bam_close(fp) gzclose(fp) +#define bam_close(fp) err_gzclose(fp) #define bam_read(fp, buf, size) err_gzread(fp, buf, size) typedef struct { diff --git a/bntseq.c b/bntseq.c index b795af8..af83211 100644 --- a/bntseq.c +++ b/bntseq.c @@ -73,6 +73,7 @@ void bns_dump(const bntseq_t *bns, const char *prefix) else err_fprintf(fp, "\n"); err_fprintf(fp, "%lld %d %d\n", (long long)p->offset, p->len, p->n_ambs); } + err_fflush(fp); err_fclose(fp); } { // dump .amb @@ -83,6 +84,7 @@ void bns_dump(const bntseq_t *bns, const char *prefix) bntamb1_t *p = bns->ambs + i; err_fprintf(fp, "%lld %d %c\n", (long long)p->offset, p->len, p->amb); } + err_fflush(fp); err_fclose(fp); } } @@ -279,6 +281,7 @@ int64_t bns_fasta2bntseq(gzFile
fp_fa, const char *prefix, int for_only) ct = bns->l_pac % 4; err_fwrite(&ct, 1, 1, fp); // close .pac file + err_fflush(fp); err_fclose(fp); } bns_dump(bns, prefix); @@ -303,7 +306,7 @@ int bwa_fa2pac(int argc, char *argv[]) } fp = xzopen(argv[optind], "r"); bns_fasta2bntseq(fp, (optind+1 < argc)? argv[optind+1] : argv[optind], for_only); - gzclose(fp); + err_gzclose(fp); return 0; } diff --git a/bwaseqio.c b/bwaseqio.c index 716f9b2..8d69b37 100644 --- a/bwaseqio.c +++ b/bwaseqio.c @@ -46,7 +46,7 @@ void bwa_seq_close(bwa_seqio_t *bs) if (bs == 0) return; if (bs->is_bam) bam_close(bs->fp); else { - gzclose(bs->ks->f->f); + err_gzclose(bs->ks->f->f); kseq_destroy(bs->ks); } free(bs); diff --git a/bwtindex.c b/bwtindex.c index f430e62..6d0604e 100644 --- a/bwtindex.c +++ b/bwtindex.c @@ -89,7 +89,7 @@ int bwa_index(int argc, char *argv[]) fprintf(stderr, "[bwa_index] Pack FASTA... "); l_pac = bns_fasta2bntseq(fp, prefix, 0); fprintf(stderr, "%.2f sec\n", (float)(clock() - t) / CLOCKS_PER_SEC); - gzclose(fp); + err_gzclose(fp); } else { // color indexing gzFile fp = xzopen(argv[optind], "r"); strcat(strcpy(str, prefix), ".nt"); @@ -97,7 +97,7 @@ int bwa_index(int argc, char *argv[]) fprintf(stderr, "[bwa_index] Pack nucleotide FASTA... "); l_pac = bns_fasta2bntseq(fp, str, 0); fprintf(stderr, "%.2f sec\n", (float)(clock() - t) / CLOCKS_PER_SEC); - gzclose(fp); + err_gzclose(fp); { char *tmp_argv[3]; tmp_argv[0] = argv[0]; tmp_argv[1] = str; tmp_argv[2] = prefix; @@ -139,7 +139,7 @@ int bwa_index(int argc, char *argv[]) fprintf(stderr, "[bwa_index] Pack forward-only FASTA... 
"); l_pac = bns_fasta2bntseq(fp, prefix, 1); fprintf(stderr, "%.2f sec\n", (float)(clock() - t) / CLOCKS_PER_SEC); - gzclose(fp); + err_gzclose(fp); } { bwt_t *bwt; diff --git a/bwtio.c b/bwtio.c index 0d4623e..ca5a6c0 100644 --- a/bwtio.c +++ b/bwtio.c @@ -11,6 +11,7 @@ void bwt_dump_bwt(const char *fn, const bwt_t *bwt) err_fwrite(&bwt->primary, sizeof(bwtint_t), 1, fp); err_fwrite(bwt->L2+1, sizeof(bwtint_t), 4, fp); err_fwrite(bwt->bwt, 4, bwt->bwt_size, fp); + err_fflush(fp); err_fclose(fp); } @@ -23,6 +24,7 @@ void bwt_dump_sa(const char *fn, const bwt_t *bwt) err_fwrite(&bwt->sa_intv, sizeof(bwtint_t), 1, fp); err_fwrite(&bwt->seq_len, sizeof(bwtint_t), 1, fp); err_fwrite(bwt->sa + 1, sizeof(bwtint_t), bwt->n_sa - 1, fp); + err_fflush(fp); err_fclose(fp); } diff --git a/bwtmisc.c b/bwtmisc.c index 49aa5aa..ccc82eb 100644 --- a/bwtmisc.c +++ b/bwtmisc.c @@ -201,7 +201,8 @@ int bwa_pac2cspac(int argc, char *argv[]) fp = xopen(str, "wb"); err_fwrite(cspac, 1, bns->l_pac/4 + 1, fp); ct = bns->l_pac % 4; - err_fwrite(&ct, 1, 1, fp); + err_fwrite(&ct, 1, 1, fp); + err_fflush(fp); err_fclose(fp); bns_destroy(bns); free(cspac); diff --git a/bwtsw2_aux.c b/bwtsw2_aux.c index ca39919..2228054 100644 --- a/bwtsw2_aux.c +++ b/bwtsw2_aux.c @@ -752,7 +752,7 @@ static void process_seqs(bsw2seq_t *_seq, const bsw2opt_t *opt, const bntseq_t * p->tid = -1; p->l = 0; p->name = p->seq = p->qual = p->sam = 0; } - fflush(stdout); + err_fflush(stdout); _seq->n = 0; } @@ -819,9 +819,9 @@ void bsw2_aln(const bsw2opt_t *opt, const bntseq_t *bns, bwt_t * const target, c free(pac); free(_seq->seq); free(_seq); kseq_destroy(ks); - gzclose(fp); + err_gzclose(fp); if (fn2) { kseq_destroy(ks2); - gzclose(fp2); + err_gzclose(fp2); } } diff --git a/fastmap.c b/fastmap.c index 7ef74a9..504e22e 100644 --- a/fastmap.c +++ b/fastmap.c @@ -76,7 +76,7 @@ int main_fastmap(int argc, char *argv[]) return 1; } - fp = gzopen(argv[optind + 1], "r"); + fp = xzopen(argv[optind + 1], "r"); seq = 
kseq_init(fp); { // load the packed sequences, BWT and SA char *tmp = xcalloc(strlen(argv[optind]) + 5, 1); @@ -123,6 +123,6 @@ int main_fastmap(int argc, char *argv[]) bns_destroy(bns); bwt_destroy(bwt); kseq_destroy(seq); - gzclose(fp); + err_gzclose(fp); return 0; } diff --git a/ksw.c b/ksw.c index 5d17a8f..270ecfb 100644 --- a/ksw.c +++ b/ksw.c @@ -364,8 +364,8 @@ int main(int argc, char *argv[]) } for (j = 0; j < 5; ++j) mat[k++] = 0; // open file - fpt = gzopen(argv[optind], "r"); kst = kseq_init(fpt); - fpq = gzopen(argv[optind+1], "r"); ksq = kseq_init(fpq); + fpt = xzopen(argv[optind], "r"); kst = kseq_init(fpt); + fpq = xzopen(argv[optind+1], "r"); ksq = kseq_init(fpq); // all-pair alignment while (kseq_read(ksq) > 0) { ksw_query_t *q[2]; @@ -394,8 +394,8 @@ int main(int argc, char *argv[]) } free(q[0]); free(q[1]); } - kseq_destroy(kst); gzclose(fpt); - kseq_destroy(ksq); gzclose(fpq); + kseq_destroy(kst); err_gzclose(fpt); + kseq_destroy(ksq); err_gzclose(fpq); return 0; } #endif // _KSW_MAIN diff --git a/simple_dp.c b/simple_dp.c index 267e77f..4c6a156 100644 --- a/simple_dp.c +++ b/simple_dp.c @@ -80,7 +80,7 @@ static seqs_t *load_seqs(const char *fn) p->n = xstrdup((const char*)seq->name.s); } kseq_destroy(seq); - gzclose(fp); + err_gzclose(fp); fprintf(stderr, "[load_seqs] %d sequences are loaded.\n", s->n_seqs); return s; } @@ -123,7 +123,7 @@ static void aln_seqs(const seqs_t *ss, const char *fn) } } kseq_destroy(seq); - gzclose(fp); + err_gzclose(fp); } int bwa_stdsw(int argc, char *argv[]) diff --git a/utils.c b/utils.c index bc39bf5..dc16308 100644 --- a/utils.c +++ b/utils.c @@ -24,6 +24,7 @@ */ /* Contact: Heng Li */ +#define FSYNC_ON_FLUSH #include #include @@ -31,6 +32,11 @@ #include #include #include +#ifdef FSYNC_ON_FLUSH +#include +#include +#include +#endif #include #include #include "utils.h" @@ -196,6 +202,28 @@ int err_fflush(FILE *stream) { _err_fatal_simple("fflush", strerror(errno)); } +#ifdef FSYNC_ON_FLUSH + /* Calling fflush() 
ensures that all the data has made it to the + kernel buffers, but this may not be sufficient for remote filesystems + (e.g. NFS, lustre) as an error may still occur while the kernel + is copying the buffered data to the file server. To be sure of + catching these errors, we need to call fsync() on the file + descriptor, but only if it is a regular file. */ + { + struct stat sbuf; + if (0 != fstat(fileno(stream), &sbuf)) + { + _err_fatal_simple("fstat", strerror(errno)); + } + if (S_ISREG(sbuf.st_mode)) + { + if (0 != fsync(fileno(stream))) + { + _err_fatal_simple("fsync", strerror(errno)); + } + } + } +#endif return ret; } @@ -209,6 +237,17 @@ int err_fclose(FILE *stream) return ret; } +int err_gzclose(gzFile file) +{ + int ret = gzclose(file); + if (Z_OK != ret) + { + _err_fatal_simple("gzclose", Z_ERRNO == ret ? strerror(errno) : zError(ret)); + } + + return ret; +} + void *err_calloc(size_t nmemb, size_t size, const char *file, unsigned int line, const char *func) { void *p = calloc(nmemb, size); diff --git a/utils.h b/utils.h index c6cfc81..f824245 100644 --- a/utils.h +++ b/utils.h @@ -78,6 +78,7 @@ extern "C" { ATTRIBUTE((format(printf, 1, 2))); int err_fflush(FILE *stream); int err_fclose(FILE *stream); + int err_gzclose(gzFile file); void *err_calloc(size_t nmemb, size_t size, const char *file, unsigned int line, const char *func); void *err_malloc(size_t size, const char *file, unsigned int line, const char *func);