New wrapper for gzclose; added err_fflush calls and made it call fsync too.

Added a new utils.c wrapper err_gzclose and changed gzclose calls to use it.

Put in some more err_fflush calls before files being written are closed.

Made err_fflush call fsync.  This is useful for remote filesystems where
errors may not be reported on fflush or fclose as problems at the server
end may only be detected after they have returned.  If bwa is being used
only to write to local filesystems, calling fsync is not really necessary.
To disable it, comment out #define FSYNC_ON_FLUSH in utils.c.
This commit is contained in:
Rob Davies 2013-01-03 16:57:37 +00:00
parent b081ac9b8b
commit 55f1b36534
12 changed files with 64 additions and 18 deletions

View File

@ -8,7 +8,7 @@
typedef gzFile bamFile;
#define bam_open(fn, mode) xzopen(fn, mode)
#define bam_dopen(fd, mode) gzdopen(fd, mode)
#define bam_close(fp) gzclose(fp)
#define bam_close(fp) err_gzclose(fp)
#define bam_read(fp, buf, size) err_gzread(fp, buf, size)
typedef struct {

View File

@ -73,6 +73,7 @@ void bns_dump(const bntseq_t *bns, const char *prefix)
else err_fprintf(fp, "\n");
err_fprintf(fp, "%lld %d %d\n", (long long)p->offset, p->len, p->n_ambs);
}
err_fflush(fp);
err_fclose(fp);
}
{ // dump .amb
@ -83,6 +84,7 @@ void bns_dump(const bntseq_t *bns, const char *prefix)
bntamb1_t *p = bns->ambs + i;
err_fprintf(fp, "%lld %d %c\n", (long long)p->offset, p->len, p->amb);
}
err_fflush(fp);
err_fclose(fp);
}
}
@ -279,6 +281,7 @@ int64_t bns_fasta2bntseq(gzFile fp_fa, const char *prefix, int for_only)
ct = bns->l_pac % 4;
err_fwrite(&ct, 1, 1, fp);
// close .pac file
err_fflush(fp);
err_fclose(fp);
}
bns_dump(bns, prefix);
@ -303,7 +306,7 @@ int bwa_fa2pac(int argc, char *argv[])
}
fp = xzopen(argv[optind], "r");
bns_fasta2bntseq(fp, (optind+1 < argc)? argv[optind+1] : argv[optind], for_only);
gzclose(fp);
err_gzclose(fp);
return 0;
}

View File

@ -46,7 +46,7 @@ void bwa_seq_close(bwa_seqio_t *bs)
if (bs == 0) return;
if (bs->is_bam) bam_close(bs->fp);
else {
gzclose(bs->ks->f->f);
err_gzclose(bs->ks->f->f);
kseq_destroy(bs->ks);
}
free(bs);

View File

@ -89,7 +89,7 @@ int bwa_index(int argc, char *argv[])
fprintf(stderr, "[bwa_index] Pack FASTA... ");
l_pac = bns_fasta2bntseq(fp, prefix, 0);
fprintf(stderr, "%.2f sec\n", (float)(clock() - t) / CLOCKS_PER_SEC);
gzclose(fp);
err_gzclose(fp);
} else { // color indexing
gzFile fp = xzopen(argv[optind], "r");
strcat(strcpy(str, prefix), ".nt");
@ -97,7 +97,7 @@ int bwa_index(int argc, char *argv[])
fprintf(stderr, "[bwa_index] Pack nucleotide FASTA... ");
l_pac = bns_fasta2bntseq(fp, str, 0);
fprintf(stderr, "%.2f sec\n", (float)(clock() - t) / CLOCKS_PER_SEC);
gzclose(fp);
err_gzclose(fp);
{
char *tmp_argv[3];
tmp_argv[0] = argv[0]; tmp_argv[1] = str; tmp_argv[2] = prefix;
@ -139,7 +139,7 @@ int bwa_index(int argc, char *argv[])
fprintf(stderr, "[bwa_index] Pack forward-only FASTA... ");
l_pac = bns_fasta2bntseq(fp, prefix, 1);
fprintf(stderr, "%.2f sec\n", (float)(clock() - t) / CLOCKS_PER_SEC);
gzclose(fp);
err_gzclose(fp);
}
{
bwt_t *bwt;

View File

@ -11,6 +11,7 @@ void bwt_dump_bwt(const char *fn, const bwt_t *bwt)
err_fwrite(&bwt->primary, sizeof(bwtint_t), 1, fp);
err_fwrite(bwt->L2+1, sizeof(bwtint_t), 4, fp);
err_fwrite(bwt->bwt, 4, bwt->bwt_size, fp);
err_fflush(fp);
err_fclose(fp);
}
@ -23,6 +24,7 @@ void bwt_dump_sa(const char *fn, const bwt_t *bwt)
err_fwrite(&bwt->sa_intv, sizeof(bwtint_t), 1, fp);
err_fwrite(&bwt->seq_len, sizeof(bwtint_t), 1, fp);
err_fwrite(bwt->sa + 1, sizeof(bwtint_t), bwt->n_sa - 1, fp);
err_fflush(fp);
err_fclose(fp);
}

View File

@ -201,7 +201,8 @@ int bwa_pac2cspac(int argc, char *argv[])
fp = xopen(str, "wb");
err_fwrite(cspac, 1, bns->l_pac/4 + 1, fp);
ct = bns->l_pac % 4;
err_fwrite(&ct, 1, 1, fp);
err_fwrite(&ct, 1, 1, fp);
err_fflush(fp);
err_fclose(fp);
bns_destroy(bns);
free(cspac);

View File

@ -752,7 +752,7 @@ static void process_seqs(bsw2seq_t *_seq, const bsw2opt_t *opt, const bntseq_t *
p->tid = -1; p->l = 0;
p->name = p->seq = p->qual = p->sam = 0;
}
fflush(stdout);
err_fflush(stdout);
_seq->n = 0;
}
@ -819,9 +819,9 @@ void bsw2_aln(const bsw2opt_t *opt, const bntseq_t *bns, bwt_t * const target, c
free(pac);
free(_seq->seq); free(_seq);
kseq_destroy(ks);
gzclose(fp);
err_gzclose(fp);
if (fn2) {
kseq_destroy(ks2);
gzclose(fp2);
err_gzclose(fp2);
}
}

View File

@ -76,7 +76,7 @@ int main_fastmap(int argc, char *argv[])
return 1;
}
fp = gzopen(argv[optind + 1], "r");
fp = xzopen(argv[optind + 1], "r");
seq = kseq_init(fp);
{ // load the packed sequences, BWT and SA
char *tmp = xcalloc(strlen(argv[optind]) + 5, 1);
@ -123,6 +123,6 @@ int main_fastmap(int argc, char *argv[])
bns_destroy(bns);
bwt_destroy(bwt);
kseq_destroy(seq);
gzclose(fp);
err_gzclose(fp);
return 0;
}

8
ksw.c
View File

@ -364,8 +364,8 @@ int main(int argc, char *argv[])
}
for (j = 0; j < 5; ++j) mat[k++] = 0;
// open file
fpt = gzopen(argv[optind], "r"); kst = kseq_init(fpt);
fpq = gzopen(argv[optind+1], "r"); ksq = kseq_init(fpq);
fpt = xzopen(argv[optind], "r"); kst = kseq_init(fpt);
fpq = xzopen(argv[optind+1], "r"); ksq = kseq_init(fpq);
// all-pair alignment
while (kseq_read(ksq) > 0) {
ksw_query_t *q[2];
@ -394,8 +394,8 @@ int main(int argc, char *argv[])
}
free(q[0]); free(q[1]);
}
kseq_destroy(kst); gzclose(fpt);
kseq_destroy(ksq); gzclose(fpq);
kseq_destroy(kst); err_gzclose(fpt);
kseq_destroy(ksq); err_gzclose(fpq);
return 0;
}
#endif // _KSW_MAIN

View File

@ -80,7 +80,7 @@ static seqs_t *load_seqs(const char *fn)
p->n = xstrdup((const char*)seq->name.s);
}
kseq_destroy(seq);
gzclose(fp);
err_gzclose(fp);
fprintf(stderr, "[load_seqs] %d sequences are loaded.\n", s->n_seqs);
return s;
}
@ -123,7 +123,7 @@ static void aln_seqs(const seqs_t *ss, const char *fn)
}
}
kseq_destroy(seq);
gzclose(fp);
err_gzclose(fp);
}
int bwa_stdsw(int argc, char *argv[])

39
utils.c
View File

@ -24,6 +24,7 @@
*/
/* Contact: Heng Li <lh3@sanger.ac.uk> */
#define FSYNC_ON_FLUSH
#include <stdio.h>
#include <stdarg.h>
@ -31,6 +32,11 @@
#include <string.h>
#include <zlib.h>
#include <errno.h>
#ifdef FSYNC_ON_FLUSH
#include <sys/types.h>
#include <sys/stat.h>
#include <unistd.h>
#endif
#include <sys/resource.h>
#include <sys/time.h>
#include "utils.h"
@ -196,6 +202,28 @@ int err_fflush(FILE *stream)
{
_err_fatal_simple("fflush", strerror(errno));
}
#ifdef FSYNC_ON_FLUSH
/* Calling fflush() ensures that all the data has made it to the
kernel buffers, but this may not be sufficient for remote filesystems
(e.g. NFS, lustre) as an error may still occur while the kernel
is copying the buffered data to the file server. To be sure of
catching these errors, we need to call fsync() on the file
descriptor, but only if it is a regular file. */
{
struct stat sbuf;
if (0 != fstat(fileno(stream), &sbuf))
{
_err_fatal_simple("fstat", strerror(errno));
}
if (S_ISREG(sbuf.st_mode))
{
if (0 != fsync(fileno(stream)))
{
_err_fatal_simple("fsync", strerror(errno));
}
}
}
#endif
return ret;
}
@ -209,6 +237,17 @@ int err_fclose(FILE *stream)
return ret;
}
int err_gzclose(gzFile file)
{
int ret = gzclose(file);
if (Z_OK != ret)
{
_err_fatal_simple("gzclose", Z_ERRNO == ret ? strerror(errno) : zError(ret));
}
return ret;
}
void *err_calloc(size_t nmemb, size_t size, const char *file, unsigned int line, const char *func)
{
void *p = calloc(nmemb, size);

View File

@ -78,6 +78,7 @@ extern "C" {
ATTRIBUTE((format(printf, 1, 2)));
int err_fflush(FILE *stream);
int err_fclose(FILE *stream);
int err_gzclose(gzFile file);
void *err_calloc(size_t nmemb, size_t size, const char *file, unsigned int line, const char *func);
void *err_malloc(size_t size, const char *file, unsigned int line, const char *func);