Support for parallel decompression.
Written by Mihael Gerdts <mike.gerdts@oracle.com>
These changes should be sent upstream.
--- pigz-2.3.3/Makefile.orig 2016-09-26 11:06:54.000000000 +0000
+++ pigz-2.3.3/Makefile 2016-09-26 13:16:05.000000000 +0000
@@ -70,6 +70,15 @@
compress -f < pigz.c | ./unpigz | cmp - pigz.c ;\
fi
@rm -f pigz.c.gz pigz.c.zz pigz.c.zip
+ @rm -rf d/1 d/2
+ (mkdir -p d/1; cd d/1; tar xzf ../../../../pigz-2.3.3.tar.gz; \
+ cd ..; cp -pr 1 2; ../pigz -rp 4 --index %z 1; \
+ ../pigz -drp 4 --index %z 1; diff -r 1 2)
+ @rm -rf d/1 d/2
+ (mkdir -p d/1; cd d/1; tar xzf ../../../../pigz-2.3.3.tar.gz; \
+ cd ..; cp -pr 1 2; ../pigz -zrp 4 -X %f.idx 1; \
+ ../pigz -dzrp 4 -X %f.idx 1; diff -r 1 2)
+ @rm -rf d/1 d/2
tests: dev test
./pigzn -kf pigz.c ; ./pigz -t pigz.c.gz
--- pigz-2.3.3/pigz.1.orig 2016-09-26 11:07:52.000000000 +0000
+++ pigz-2.3.3/pigz.1 2016-09-26 11:12:03.000000000 +0000
@@ -185,6 +185,14 @@
.B -V --version
Show the version of pigz.
.TP
+.B -X --index file
+During compression, create an index that can be used for parallel
+decompression. During decompression, use the specified index file for parallel
+decompression. Each occurrence of %f and %z are replaced by the uncompressed
+and compressed file names, respectively. If the index file is the same file as
+the compressed file, the index is written to or read from the end of the
+compressed file.
+.TP
.B -z --zlib
Compress to zlib (.zz) instead of gzip format.
.TP
--- pigz-2.3.3/pigz.c.orig 2016-09-26 11:07:43.000000000 +0000
+++ pigz-2.3.3/pigz.c 2016-09-27 08:09:45.122201079 +0000
@@ -218,14 +218,27 @@
the --independent or -i option, so that the blocks can be decompressed
independently for partial error recovery or for random access.
- Decompression can't be parallelized over an arbitrary number of processors
- like compression can be, at least not without specially prepared deflate
- streams for that purpose. As a result, pigz uses a single thread (the main
- thread) for decompression, but will create three other threads for reading,
- writing, and check calculation, which can speed up decompression under some
+ The --index or -X option causes the generation of a block index which can be
+ used for parallel decompression. The block index can be appended onto the
+ compressed output or it may be stored in a separate file. The uncompressed
+ size, compressed size, checksum of each block are stored in the index,
+ allowing future applications to perform random reads of the compressed file.
+ Streams generated with -X are readable by legacy versions of pigz and gzip.
+
+ Decompression can be parallelized, but only if a block index is available.
+ If a block index is not present, pigz uses a single thread (the main thread)
+ for decompression, but will create three other threads for reading, writing,
+ and check calculation, which can speed up decompression under some
circumstances. Parallel decompression can be turned off by specifying one
process (-dp 1 or -tp 1).
+ If the block index is present, the main thread reads the input file and
+ dispatches each block to an uncompress thread. The uncompress thread
+ uncompresses the block, verifies the block checksum, and passes the block
+ off to a writer thread. The writer thread writes the blocks in order,
+ and combines the individual block checksums into a per-file checksum. The
+ per-file checksum is compared to the checksum in the stream's trailer.
+
pigz requires zlib 1.2.1 or later to allow setting the dictionary when doing
raw deflate. Since zlib 1.2.3 corrects security vulnerabilities in zlib
version 1.2.1 and 1.2.2, conditionals check for zlib 1.2.3 or later during
@@ -260,7 +273,7 @@
jobs until instructed to return. When a job is pulled, the dictionary, if
provided, will be loaded into the deflate engine and then that input buffer
is dropped for reuse. Then the input data is compressed into an output
- buffer that grows in size if necessary to hold the compressed data. The job
+ buffer that grows in size if necessary to hold the compressed data. The job
is then put into the write job list, sorted by the sequence number. The
compress thread however continues to calculate the check value on the input
data, either a CRC-32 or Adler-32, possibly in parallel with the write
@@ -286,13 +299,14 @@
can't get way ahead of the write thread and build up a large backlog of
unwritten compressed data. The write thread will write the compressed data,
drop the output buffer, and then wait for the check value to be unlocked
- by the compress thread. Then the write thread combines the check value for
- this chunk with the total check value for eventual use in the trailer. If
- this is not the last chunk, the write thread then goes back to look for the
- next output chunk in sequence. After the last chunk, the write thread
- returns and joins the main thread. Unlike the compress threads, a new write
- thread is launched for each input stream. The write thread writes the
- appropriate header and trailer around the compressed data.
+ by the compress thread. Then the write thread writes an index entry (if -X)
+ and combines the check value for this chunk with the total check value for
+ eventual use in the trailer. If this is not the last chunk, the write thread
+ then goes back to look for the next output chunk in sequence. After the last
+ chunk, the write thread returns and joins the main thread. Unlike the
+ compress threads, a new write thread is launched for each input stream. The
+ write thread writes the appropriate header and trailer around the compressed
+ data.
The input and output buffers are reused through their collection in pools.
Each buffer has a use count, which when decremented to zero returns the
@@ -341,6 +355,9 @@
#if __STDC_VERSION__-0 >= 199901L || __GNUC__-0 >= 3
# include <inttypes.h> /* intmax_t */
#endif
+#include <stddef.h> /* offsetof() */
+#include <sys/mman.h> /* mmap() */
+#include <netinet/in.h> /* htonl() */
#ifdef DEBUG
# if defined(__APPLE__)
@@ -473,9 +490,11 @@
char *prog; /* name by which pigz was invoked */
int ind; /* input file descriptor */
int outd; /* output file descriptor */
+ int idxd; /* index file descriptor */
char *inf; /* input file name (allocated) */
size_t inz; /* input file name allocated size */
char *outf; /* output file name (allocated) */
+ char *index; /* index file name template (may have %f, %z) */
int verbosity; /* 0 = quiet, 1 = normal, 2 = verbose, 3 = trace */
int headis; /* 1 to store name, 2 to store date, 3 both */
int pipeout; /* write output to stdout even if file */
@@ -620,7 +639,7 @@
local void yarn_free(void *ptr)
{
- return free_track(&mem_track, ptr);
+ free_track(&mem_track, ptr);
}
#endif
@@ -820,12 +839,15 @@
#endif
+local void idx_abort(void);
+
/* abort or catch termination signal */
local void cut_short(int sig)
{
if (sig == SIGINT) {
Trace(("termination by user"));
}
+ idx_abort();
if (g.outd != -1 && g.outd != 1) {
unlink(g.outf);
RELEASE(g.outf);
@@ -951,11 +973,23 @@
return dos;
}
-/* put a 4-byte integer into a byte array in LSB order or MSB order */
+/* put integers into a byte array in LSB order or MSB order */
#define PUT2L(a,b) (*(a)=(b)&0xff,(a)[1]=(b)>>8)
#define PUT4L(a,b) (PUT2L(a,(b)&0xffff),PUT2L((a)+2,(b)>>16))
+#define PUT8L(a,b) (PUT4L(a,(b)&0xffffffff),PUT4L((a)+4,(b)>>32))
#define PUT4M(a,b) (*(a)=(b)>>24,(a)[1]=(b)>>16,(a)[2]=(b)>>8,(a)[3]=(b))
+/* pull LSB order or MSB order integers from an unsigned char buffer */
+#define PULL2L(p) ((p)[0] + ((unsigned)((p)[1]) << 8))
+#define PULL4L(p) (PULL2L(p) + ((unsigned long)(PULL2L((p) + 2)) << 16))
+#define PULL8L(p) ((uint64_t)((p)[0]) | ((uint64_t)((p)[1]) << 8) | \
+ ((uint64_t)((p)[2]) << 16) | ((uint64_t)((p)[3]) << 24) | \
+ ((uint64_t)((p)[4]) << 32) | ((uint64_t)((p)[5]) << 40) | \
+ ((uint64_t)((p)[6]) << 48) | ((uint64_t)((p)[7]) << 56))
+#define PULL2M(p) (((unsigned)((p)[0]) << 8) + (p)[1])
+#define PULL4M(p) (((unsigned long)(PULL2M(p)) << 16) + PULL2M((p) + 2))
+
+
/* write a gzip, zlib, or zip header using the information in the globals */
local unsigned long put_header(void)
{
@@ -1253,7 +1287,7 @@
/* get a space from a pool -- the use count is initially set to one, so there
is no need to call use_space() for the first use */
-local struct space *get_space(struct pool *pool)
+local struct space *get_space_size(struct pool *pool, size_t size)
{
struct space *space;
@@ -1266,6 +1300,15 @@
if (pool->head != NULL) {
space = pool->head;
possess(space->use);
+ /* If there's not enough space, free and malloc rather than realloc to
+ avoid the potential of an unnecessary memory copy. */
+ if (space->size < size) {
+ free(space->buf);
+ space->buf = malloc(size);
+ if (space->buf == NULL)
+ throw(ENOMEM, "not enough memory");
+ space->size = size;
+ }
pool->head = space->next;
twist(pool->have, BY, -1); /* one less in pool */
twist(space->use, TO, 1); /* initially one user */
@@ -1281,13 +1324,18 @@
release(pool->have);
space = alloc(NULL, sizeof(struct space));
space->use = new_lock(1); /* initially one user */
- space->buf = alloc(NULL, pool->size);
- space->size = pool->size;
+ space->buf = alloc(NULL, size);
+ space->size = size;
space->len = 0;
space->pool = pool; /* remember the pool this belongs to */
return space;
}
+local struct space *get_space(struct pool *pool)
+{
+ return get_space_size(pool, pool->size);
+}
+
/* increase the size of the buffer in space */
local void grow_space(struct space *space)
{
@@ -1354,17 +1402,35 @@
return count;
}
+/* prompt for permission to overwrite a file */
+local int allow_overwrite(const char *path)
+{
+ char ch;
+ int reply = -1;
+
+ fprintf(stderr, "%s exists -- overwrite (y/n)? ", path);
+ fflush(stderr);
+ do {
+ ch = getchar();
+ if (reply < 0 && ch != ' ' && ch != '\t')
+ reply = ch == 'y' || ch == 'Y' ? 1 : 0;
+ } while (ch != EOF && ch != '\n' && ch != '\r');
+ return reply;
+}
+
/* input and output buffer pools */
local struct pool in_pool;
local struct pool out_pool;
local struct pool dict_pool;
local struct pool lens_pool;
+local struct pool idx_pool;
/* -- parallel compression -- */
/* compress or write job (passed from compress list to write list) -- if seq is
equal to -1, compress_thread is instructed to return; if more is false then
- this is the last chunk, which after writing tells write_thread to return */
+ this is the last chunk, which after writing tells compress_write_thread to
+ return */
struct job {
long seq; /* sequence number */
int more; /* true if this is not the last chunk */
@@ -1411,6 +1477,7 @@
new_pool(&out_pool, OUTPOOL(g.block), -1);
new_pool(&dict_pool, DICT, -1);
new_pool(&lens_pool, g.block >> (RSYNCBITS - 1), -1);
+ new_pool(&idx_pool, 1, -1);
}
/* command the compress threads to all return, then join them all (call from
@@ -1447,6 +1514,8 @@
Trace(("-- freed %d output buffers", caught));
caught = free_pool(&in_pool);
Trace(("-- freed %d input buffers", caught));
+ caught = free_pool(&idx_pool);
+ Trace(("-- freed %d index buffers", caught));
free_lock(write_first);
free_lock(compress_have);
compress_have = NULL;
@@ -1710,18 +1779,483 @@
}
}
+/* Block Index
+
+ The block index is an array of idx_entry structs followed by an idx_trailer
+ struct. They are written to the file in LSB order. The block index can
+ exist as a standalone file or be appended onto the compressed files.
+
+ The trailer is used to identify a block index. The beginning of the trailer
+ contains a magic number that is a value too large to be confused with a valid
+ block length. Aside from backwards P's the magic number looks kinda like
+ "0xf pigzip 0xf". */
+#define IDXMAGIC 0xf916219f
+
+struct idx_trailer {
+ uint32_t magic;
+ uint64_t count;
+};
+
+struct idx_entry {
+ uint32_t infsz; /* inflated size of the block */
+ uint32_t defsz; /* deflated size of the block */
+ uint32_t check; /* adler32 or crc32 checksum of the block */
+};
+
+local struct {
+ int valid; /* Do the rest of these fields mean anything? */
+
+ /* An array of entries. References address in space or map */
+ struct idx_entry *ents; /* not in right byte order, used for offset */
+ uint64_t seq; /* current entry */
+ int64_t eof; /* has the last entry been retrieved? */
+
+ /* When compressing and appending, entries are stored in space->buf. */
+ int append; /* is the index at end of compressed file? */
+ struct space *space; /* space for storage of index */
+
+ /* The following are valid only when mmap is used. */
+ uchar_t *map; /* mmap'd region containing ents */
+ size_t mapsz; /* size of mmap'd region at map */
+ off_t mapoff; /* bytes between map and ents */
+
+ /* Index path, after %f and %z are replaced. */
+ char path[PATH_MAX+1];
+} idx;
+
+/* determines if the two paths refer to the same extant file */
+local int same_file(const char *f1, const char *f2)
+{
+ struct stat s1;
+ struct stat s2;
+
+ return (stat(f1, &s1) == 0 && stat(f2, &s2) == 0 &&
+ s1.st_dev == s2.st_dev && s1.st_ino == s2.st_ino);
+}
+
+/* Remove the index file, but only if it is not the same as in or out.
+ We don't worry about a full cleanup, as this should only be called in an
+ error path just before exiting. */
+local void idx_abort(void)
+{
+ if (!idx.valid)
+ return;
+ if (idx.path[0] == '\0' || idx.append)
+ return;
+ (void) unlink(idx.path);
+}
+
+/* If 0 is returned, a trailer was found and read. Non-zero return means
+ there was no trailer. Does not exit. Does not change file pointer for fd. */
+local int idx_read_trailer(int fd, char *path, struct idx_trailer *trail)
+{
+ uchar_t buf[sizeof(*trail)];
+ off_t off;
+ struct stat st;
+
+ if (fd < 0) {
+ Trace(("%s: index file descriptor %d not valid", path, fd));
+ return -1;
+ }
+ if (fstat(fd, &st) != 0 || !S_ISREG(st.st_mode)) {
+ Trace(("%s: index appended to non-regular file", path));
+ return -1;
+ }
+ off = st.st_size - sizeof(*trail);
+ if (off < 0) {
+ Trace(("%s: index file too short for header", path));
+ return -1;
+ }
+ if (pread(fd, buf, sizeof(buf), off) != sizeof(buf)) {
+ Trace(("%s: unable to read index trailer", path));
+ return -1;
+ }
+ trail->magic = PULL4L(buf);
+ trail->count = PULL8L(buf + 4);
+
+ if (trail->magic != IDXMAGIC) {
+ Trace(("%s: invalid pigz index magic", path));
+ return -1;
+ }
+ return 0;
+}
+
+/* Expand a path pattern containing %f and/or %z tokens into a full path.
+ * Result is stored in idx.path. */
+local int expand_pathpat(char *pathpat)
+{
+ char *copy = NULL; /* points to in or out global */
+ char *suf = NULL; /* suffix (.zz, .gz, etc.) */
+ int chop_suffix;
+ int len;
+ int i;
+ int j;
+ int nag;
+
+ /* Be quiet when opportunistic index use check is being done. */
+ nag = ((g.index == NULL) && strcmp(pathpat, "%z"));
+
+ for (i = 0, j = 0; pathpat[i] && j < sizeof(idx.path); i++) {
+ if (pathpat[i] != '%') {
+ idx.path[j++] = pathpat[i];
+ continue;
+ }
+ i++;
+ switch (pathpat[i]) {
+ case '%': /* %% is replaced by % */
+ idx.path[j++] = '%';
+ continue;
+ case 'f': /* %f is replaced by uncompressed file name */
+ if (g.decode) {
+ if (strcmp(g.outf, "<stdout>") != 0) {
+ copy = g.outf; /* uncompressed file */
+ chop_suffix = 0;
+ break;
+ }
+ if (strcmp(g.inf, "<stdin>") != 0) {
+ copy = g.inf; /* compressed file */
+ chop_suffix = 1;
+ suf = strrchr(g.inf, '.');
+ break;
+ }
+ if (nag)
+ complain("file name for %%f unknown");
+ return -1;
+ }
+
+ if (strcmp(g.outf, "<stdout>") != 0) {
+ copy = g.outf; /* compressed file */
+ chop_suffix = 1;
+ suf = strrchr(g.outf, '.');
+ break;
+ }
+ if (strcmp(g.inf, "<stdin>") != 0) {
+ copy = g.inf; /* uncompressed file */
+ chop_suffix = 0;
+ break;
+ }
+ if (nag)
+ complain("file name for %%f unknown");
+ return -1;
+ case 'z': /* %z is replaced by compressed file name */
+ chop_suffix = 0;
+ if (g.decode) {
+ if (strcmp(g.inf, "<stdin>") == 0) {
+ if (nag)
+ complain("file name for %%z unknown");
+ return -1;
+ }
+ copy = g.inf;
+ break;
+ }
+ if (strcmp(pathpat, "%z") == 0) {
+ /* index will be appended onto stdout */
+ copy = NULL;
+ idx.append = 1;
+ break;
+ }
+ if (strcmp(g.outf, "<stdout>") == 0) {
+ if (nag)
+ complain("file name for %%z unknown");
+ return -1;
+ }
+ copy = g.outf;
+ break;
+ default:
+ if (nag) {
+ complain("invalid %% sequence in index file pattern %s",
+ pathpat);
+ }
+ return -1;
+ }
+
+ /* pathpat is "%z" and out is stdout */
+ if (copy == NULL)
+ break;
+
+ len = strlen(&idx.path[j]) + strlen(copy);
+ if (chop_suffix)
+ len -= strlen(suf);
+ if (len >= (sizeof(idx.path) - j)) {
+ if (nag)
+ complain("index file name too long");
+ return -1;
+ }
+ (void)strncpy(&idx.path[j], copy, sizeof(idx.path) - j);
+ j += len;
+ assert(j <= sizeof(idx.path));
+ }
+ if (j == sizeof(idx.path)) {
+ idx.path[j-1] = '\0';
+ if (nag)
+ complain("index file \"%s...\" name too long", idx.path);
+ return -1;
+ }
+ idx.path[j] = '\0';
+
+ if (copy == NULL && idx.append) {
+ (void)strncpy(idx.path, g.outf, sizeof(idx.path));
+ idx.path[sizeof(idx.path) - 1] = '\0';
+ }
+ else {
+ if (same_file(g.decode ? g.outf : g.inf, idx.path)) {
+ if (nag)
+ complain("index file %s must not be same as uncompressed file",
+ idx.path);
+ return -1;
+ }
+
+ idx.append = same_file(g.decode ? g.inf : g.outf, idx.path);
+ }
+
+ if (g.verbosity > 1)
+ (void) fprintf(stderr, "index %s ", idx.path);
+
+ return 0;
+}
+
+/* open the index file associated with the current input or output file. */
+local int idx_open(char *pathpat)
+{
+ int ret;
+ struct stat st;
+
+ assert(pathpat != NULL);
+
+ memset(&idx, 0, sizeof(idx));
+
+ setup_jobs();
+
+ g.idxd = -1;
+
+ if (expand_pathpat(pathpat) != 0)
+ return -1;
+
+ if (g.decode) { /* Uncompress */
+ int64_t sz;
+ int64_t off;
+ long pagesize;
+
+ /* Position idxd at the first index record to read. */
+ if (idx.append) {
+ struct idx_trailer trail;
+
+ /* uncompressing, index at end of compressed file */
+ if (idx_read_trailer(g.ind, g.inf, &trail) != 0) {
+ complain("%s: could not read index", g.inf);
+ return -1;
+ }
+
+ g.idxd = dup(g.ind);
+ if (fstat(g.idxd, &st) != 0 || !S_ISREG(st.st_mode)) {
+ complain("%s: index appended to non-regular file", idx.path);
+ (void) close(g.idxd);
+ return -1;
+ }
+ off = st.st_size - sizeof(trail);
+ sz = trail.count * sizeof(struct idx_entry);
+ off -= sz; /* offset into file of first idx_entry */
+ } else {
+ /* Uncompressing, index in a different file. */
+ if ((g.idxd = open(idx.path, O_RDONLY)) < 0) {
+ complain("%s: unable to open index file", idx.path);
+ return -1;
+ }
+ if (fstat(g.idxd, &st) != 0) {
+ complain("%s: unable to stat index file", idx.path);
+ (void) close(g.idxd);
+ return -1;
+ }
+ off = 0;
+ }
+ /* Try to mmap the index file and let the OS manage the space used by
+ the index entries. The starting offset of must be a multiple of the
+ page size. The mapping will end at the end of the file. */
+ if ((pagesize = sysconf(_SC_PAGESIZE)) > 0) {
+ off_t moff; /* mmap offset in idxd */
+
+ /* moff is the beginning of the page containing off */
+ moff = off & ~(pagesize -1);
+ idx.mapsz = st.st_size - moff;
+ idx.map = mmap(NULL, idx.mapsz, PROT_READ, MAP_PRIVATE, g.idxd, moff);
+ if (idx.map != MAP_FAILED) {
+ (void)close(g.idxd);
+ g.idxd = -1;
+
+ /* set up array for idx_get() */
+ idx.ents = (struct idx_entry*)(idx.map + (off & (pagesize -1)));
+
+ idx.valid = 1;
+ return 0;
+ }
+ idx.mapsz = 0;
+ idx.map = NULL;
+ }
+ /* unable to mmap. Ensure idxfd is positioned properly. */
+ if (lseek(g.idxd, off, SEEK_SET) != off) {
+ complain("%s: unable to seek on index file", idx.path);
+ return -1;
+ }
+ idx.valid = 1;
+ return 0;
+ }
+
+ /* compress - entries will be added to idx.space or idxd. */
+ if (idx.append) {
+ idx.space = get_space(&idx_pool);
+ idx.valid = 1;
+ return 0;
+ }
+
+ g.idxd = open(idx.path, O_WRONLY | O_CREAT | O_TRUNC | (g.force ? 0 : O_EXCL),
+ 0600);
+ if (g.idxd < 0 && errno == EEXIST && isatty(0) && g.verbosity &&
+ allow_overwrite(idx.path)) {
+ g.idxd = open(idx.path, O_CREAT | O_TRUNC | O_WRONLY, 0600);
+ if (g.idxd == -1) {
+ complain("%s: %s", idx.path, strerror(errno));
+ return -1;
+ }
+ }
+ idx.valid = 1;
+ return 0;
+}
+
+local void idx_get_next(struct idx_entry *entry)
+{
+ uchar_t buf[sizeof(*entry)];
+ uchar_t *base;
+
+ if (idx.ents != NULL)
+ base = (uchar_t *)&idx.ents[idx.seq];
+ else {
+ readn(g.idxd, buf, sizeof(buf));
+ base = buf;
+ }
+ entry->infsz = PULL4L(base);
+ entry->defsz = PULL4L(base + 4);
+ entry->check = PULL4L(base + 8);
+}
+
+/* Returns the fields of the next index entry. */
+local void idx_get(uint64_t *inflated, uint64_t *deflated, uint64_t *check,
+ int *last)
+{
+ struct idx_trailer *t;
+ static struct idx_entry entry; /* value from previous call */
+
+ assert(!idx.eof);
+
+ if (idx.seq == 0)
+ idx_get_next(&entry);
+
+ *inflated = entry.infsz;
+ *deflated = entry.defsz;
+ *check = entry.check;
+ idx.seq++;
+
+ /* Look for trailer after this. Value retained for next call. */
+ idx_get_next(&entry);
+
+ t = (struct idx_trailer *)&entry;
+ *last = (t->magic == IDXMAGIC);
+ idx.eof = *last;
+}
+
+local void idx_add(size_t insz, size_t outsz, unsigned long check)
+{
+ uchar_t buf[sizeof(struct idx_entry)];
+ uchar_t *start;
+
+ idx.seq++;
+
+ /* point start at the right buffer, ensuring it is big enough */
+ if (g.idxd != -1) {
+ start = buf;
+ } else {
+ possess(idx.space->use);
+ while (idx.space->size - idx.space->len < sizeof(struct idx_entry))
+ grow_space(idx.space);
+ start = idx.space->buf + idx.space->len;
+ }
+
+ /* copy data into buffer */
+ PUT4L(start, (uint32_t)insz);
+ PUT4L(start + 4, (uint32_t)outsz);
+ PUT4L(start + 8, (uint32_t)check);
+
+ if (g.idxd != -1)
+ writen(g.idxd, buf, sizeof(buf));
+ else {
+ idx.space->len += sizeof(struct idx_entry);
+ release(idx.space->use);
+ }
+}
+
+local void idx_close(void)
+{
+ uchar_t buf[sizeof(struct idx_trailer)];
+
+ assert(idx.valid);
+ idx.valid = 0;
+
+ if (g.decode && !g.keep && !idx.append)
+ (void)unlink(idx.path);
+
+ if (idx.map != NULL) { /* uncompressing, using mmap'd index */
+ (void)munmap(idx.map, idx.mapsz);
+ idx.ents = NULL;
+ return;
+ }
+
+ if (g.decode) { /* uncompressing, from a file */
+ (void)close(g.idxd);
+ g.idxd = -1;
+ return;
+ }
+
+ if (idx.space != NULL) { /* compressing, append to output file */
+ writen(g.outd, idx.space->buf, idx.space->len);
+ release(idx.space->use);
+ drop_space(idx.space);
+ }
+
+ PUT4L(buf, IDXMAGIC);
+ PUT8L(buf + 4, idx.seq);
+
+ writen(idx.append ? g.outd : g.idxd, buf, sizeof(buf));
+
+ if (g.idxd != -1) {
+ (void) close(g.idxd);
+ g.idxd = -1;
+ }
+}
+
+/* Does the compressed input file have an index appended? */
+local int ind_has_index(void)
+{
+ struct idx_trailer trail;
+
+ /* Not relevant unless we are uncompressing */
+ if (g.decode == 0)
+ return (0);
+
+ return (idx_read_trailer(g.ind, g.inf, &trail) == 0);
+}
+
/* collect the write jobs off of the list in sequence order and write out the
compressed data until the last chunk is written -- also write the header and
trailer and combine the individual check values of the input buffers */
-local void write_thread(void *dummy)
+local void compress_write_thread(void *dummy)
{
long seq; /* next sequence number looking for */
struct job *job; /* job pulled and working on */
size_t len; /* input length */
+ size_t olen; /* output length */
int more; /* true if more chunks to write */
unsigned long head; /* header length */
unsigned long ulen; /* total uncompressed size (overflow ok) */
- unsigned long clen; /* total compressed size (overflow ok) */
+ size_t clen; /* total compressed size */
unsigned long check; /* check value of uncompressed data */
ball_t err; /* error information from throw() */
@@ -1747,23 +2281,27 @@
/* update lengths, save uncompressed length for COMB */
more = job->more;
len = job->in->len;
+ olen = job->out->len;
drop_space(job->in);
ulen += (unsigned long)len;
- clen += (unsigned long)(job->out->len);
+ clen += olen;
/* write the compressed data and drop the output buffer */
Trace(("-- writing #%ld", seq));
- writen(g.outd, job->out->buf, job->out->len);
+ writen(g.outd, job->out->buf, olen);
drop_space(job->out);
Trace(("-- wrote #%ld%s", seq, more ? "" : " (last)"));
- /* wait for check calculation to complete, then combine, once
- the compress thread is done with the input, release it */
+ /* wait for check calculation to complete, then combine */
possess(job->calc);
wait_for(job->calc, TO_BE, 1);
release(job->calc);
check = COMB(check, job->check, len);
+ /* update the block index */
+ if (g.index)
+ idx_add(len, olen, job->check);
+
/* free the job */
free_lock(job->calc);
FREE(job);
@@ -1845,7 +2383,7 @@
setup_jobs();
/* start write thread */
- writeth = launch(write_thread, NULL);
+ writeth = launch(compress_write_thread, NULL);
/* read from input and start compress threads (write thread will pick up
the output of the compress threads) */
@@ -2303,7 +2841,7 @@
#ifndef NOTHREAD
/* if first time in or procs == 1, read a buffer to have something to
return, otherwise wait for the previous read job to complete */
- if (g.procs > 1) {
+ if (g.procs > 1 && g.index == NULL && !ind_has_index()) {
/* if first time, fire up the read thread, ask for a read */
if (g.in_which == -1) {
g.in_which = 1;
@@ -2404,12 +2942,6 @@
g.in_next += togo; \
} while (0)
-/* pull LSB order or MSB order integers from an unsigned char buffer */
-#define PULL2L(p) ((p)[0] + ((unsigned)((p)[1]) << 8))
-#define PULL4L(p) (PULL2L(p) + ((unsigned long)(PULL2L((p) + 2)) << 16))
-#define PULL2M(p) (((unsigned)((p)[0]) << 8) + (p)[1])
-#define PULL4M(p) (((unsigned long)(PULL2M(p)) << 16) + PULL2M((p) + 2))
-
/* convert MS-DOS date and time to a Unix time, assuming current timezone
(you got a better idea?) */
local time_t dos2time(unsigned long dos)
@@ -3033,6 +3565,73 @@
return 0;
}
+local void check_trailer(unsigned long check, off_t clen)
+{
+ unsigned tmp2; /* used by GET4() */
+ unsigned long tmp4; /* used by GET4() */
+ unsigned long len;
+
+ /* read and check trailer */
+ if (g.form > 1) { /* zip local trailer (if any) */
+ if (g.form == 3) { /* data descriptor follows */
+ /* read original version of data descriptor */
+ g.zip_crc = GET4();
+ g.zip_clen = GET4();
+ g.zip_ulen = GET4();
+ if (g.in_eof)
+ throw(EDOM, "%s: corrupted zip entry -- missing trailer: ", g.inf);
+
+ /* if crc doesn't match, try info-zip variant with sig */
+ if (g.zip_crc != g.out_check) {
+ if (g.zip_crc != 0x08074b50UL || g.zip_clen != g.out_check)
+ throw(EDOM, "%s: corrupted zip entry -- crc32 mismatch: ", g.inf);
+ g.zip_crc = g.zip_clen;
+ g.zip_clen = g.zip_ulen;
+ g.zip_ulen = GET4();
+ }
+
+ /* handle incredibly rare cases where crc equals signature */
+ else if (g.zip_crc == 0x08074b50UL && g.zip_clen == g.zip_crc &&
+ ((clen & LOW32) != g.zip_crc || g.zip_ulen == g.zip_crc)) {
+ g.zip_crc = g.zip_clen;
+ g.zip_clen = g.zip_ulen;
+ g.zip_ulen = GET4();
+ }
+
+ /* if second length doesn't match, try 64-bit lengths */
+ if (g.zip_ulen != (g.out_tot & LOW32)) {
+ g.zip_ulen = GET4();
+ (void)GET4();
+ }
+ if (g.in_eof)
+ throw(EDOM, "%s: corrupted zip entry -- missing trailer: ", g.inf);
+ }
+ if (g.zip_clen != (clen & LOW32) || g.zip_ulen != (g.out_tot & LOW32))
+ throw(EDOM, "%s: corrupted zip entry -- length mismatch: ", g.inf);
+ check = g.zip_crc;
+ }
+ else if (g.form == 1) { /* zlib (big-endian) trailer */
+ check = (unsigned long)(GET()) << 24;
+ check += (unsigned long)(GET()) << 16;
+ check += (unsigned)(GET()) << 8;
+ check += GET();
+ if (g.in_eof)
+ throw(EDOM, "%s: corrupted zlib stream -- missing trailer: ", g.inf);
+ if (check != g.out_check)
+ throw(EDOM, "%s: corrupted zlib stream -- adler32 mismatch: ", g.inf);
+ }
+ else { /* gzip trailer */
+ check = GET4();
+ len = GET4();
+ if (g.in_eof)
+ throw(EDOM, "%s: corrupted gzip stream -- missing trailer: ", g.inf);
+ if (check != g.out_check)
+ throw(EDOM, "%s: corrupted gzip stream -- crc32 mismatch: ", g.inf);
+ if (len != (g.out_tot & LOW32))
+ throw(EDOM, "%s: corrupted gzip stream -- length mismatch: ", g.inf);
+ }
+}
+
/* inflate for decompression or testing -- decompress from ind to outd unless
decode != 1, in which case just test ind, and then also list if list != 0;
look for and decode multiple, concatenated gzip and/or zlib streams;
@@ -3040,10 +3639,8 @@
local void infchk(void)
{
int ret, cont, was;
- unsigned long check, len;
+ unsigned long check;
z_stream strm;
- unsigned tmp2;
- unsigned long tmp4;
off_t clen;
cont = 0;
@@ -3080,72 +3677,7 @@
/* compute compressed data length */
clen = g.in_tot - g.in_left;
- /* read and check trailer */
- if (g.form > 1) { /* zip local trailer (if any) */
- if (g.form == 3) { /* data descriptor follows */
- /* read original version of data descriptor */
- g.zip_crc = GET4();
- g.zip_clen = GET4();
- g.zip_ulen = GET4();
- if (g.in_eof)
- throw(EDOM, "%s: corrupted entry -- missing trailer",
- g.inf);
-
- /* if crc doesn't match, try info-zip variant with sig */
- if (g.zip_crc != g.out_check) {
- if (g.zip_crc != 0x08074b50UL || g.zip_clen != g.out_check)
- throw(EDOM, "%s: corrupted entry -- crc32 mismatch",
- g.inf);
- g.zip_crc = g.zip_clen;
- g.zip_clen = g.zip_ulen;
- g.zip_ulen = GET4();
- }
-
- /* handle incredibly rare cases where crc equals signature */
- else if (g.zip_crc == 0x08074b50UL &&
- g.zip_clen == g.zip_crc &&
- ((clen & LOW32) != g.zip_crc ||
- g.zip_ulen == g.zip_crc)) {
- g.zip_crc = g.zip_clen;
- g.zip_clen = g.zip_ulen;
- g.zip_ulen = GET4();
- }
-
- /* if second length doesn't match, try 64-bit lengths */
- if (g.zip_ulen != (g.out_tot & LOW32)) {
- g.zip_ulen = GET4();
- (void)GET4();
- }
- if (g.in_eof)
- throw(EDOM, "%s: corrupted entry -- missing trailer",
- g.inf);
- }
- if (g.zip_clen != (clen & LOW32) ||
- g.zip_ulen != (g.out_tot & LOW32))
- throw(EDOM, "%s: corrupted entry -- length mismatch",
- g.inf);
- check = g.zip_crc;
- }
- else if (g.form == 1) { /* zlib (big-endian) trailer */
- check = (unsigned long)(GET()) << 24;
- check += (unsigned long)(GET()) << 16;
- check += (unsigned)(GET()) << 8;
- check += GET();
- if (g.in_eof)
- throw(EDOM, "%s: corrupted -- missing trailer", g.inf);
- if (check != g.out_check)
- throw(EDOM, "%s: corrupted -- adler32 mismatch", g.inf);
- }
- else { /* gzip trailer */
- check = GET4();
- len = GET4();
- if (g.in_eof)
- throw(EDOM, "%s: corrupted -- missing trailer", g.inf);
- if (check != g.out_check)
- throw(EDOM, "%s: corrupted -- crc32 mismatch", g.inf);
- if (len != (g.out_tot & LOW32))
- throw(EDOM, "%s: corrupted -- length mismatch", g.inf);
- }
+ check_trailer(check, clen);
/* show file information if requested */
if (g.list) {
@@ -3169,6 +3701,231 @@
complain("warning: %s: trailing junk was ignored", g.inf);
}
+local void uncompress_write_thread(void *dummy)
+{
+ long seq; /* next sequence number looking for */
+ struct job *job; /* job pulled and working on */
+ int more; /* true if more chunks to write */
+
+ (void)dummy;
+
+ seq = 0;
+ do {
+ /* get next write job in order */
+ possess(write_first);
+ wait_for(write_first, TO_BE, seq);
+ job = write_head;
+ write_head = job->next;
+ twist(write_first, TO, write_head == NULL ? -1 : write_head->seq);
+
+ /* Checksum has been verified. Accumulate the checksum, write the
+ output, and free the input and output spaces. While the input space
+ could be dropped earlier, it is done here to ensure the write queue
+ doesn't grow without bounds. */
+ g.out_check = COMB(g.out_check, job->check, job->out->len);
+ g.out_tot += job->out->len;
+
+ Trace(("-- writing #%ld", seq));
+ if (g.decode == 1) /* don't really write if just checking */
+ writen(g.outd, job->out->buf, job->out->len);
+ drop_space(job->in);
+ drop_space(job->out);
+ Trace(("-- wrote #%ld%s", seq, job->more ? "" : " (last)"));
+
+ more = job->more;
+ free(job);
+
+ seq++;
+ } while (more);
+
+ /* verify no more jobs, prepare for next use */
+ possess(compress_have);
+ assert(compress_head == NULL && peek_lock(compress_have) == 0);
+ release(compress_have);
+ possess(write_first);
+ assert(write_head == NULL);
+ twist(write_first, TO, -1);
+}
+
+local void uncompress_thread(void *dummy)
+{
+ struct job *job; /* job pulled and working on */
+ struct job *here, **prior; /* pointers for inserting in write list */
+ unsigned long check; /* check value of output */
+ z_stream strm; /* deflate stream */
+ int err; /* error from inflate() */
+ long firstcheck; /* the initial checksum value */
+
+ (void)dummy;
+
+ strm.zfree = Z_NULL;
+ strm.zalloc = Z_NULL;
+ strm.opaque = Z_NULL;
+ if (inflateInit2(&strm, -15) != Z_OK)
+ throw(ENOMEM, "not enough memory");
+
+ firstcheck = CHECK(0, Z_NULL, 0);
+
+ /* keep looking for work */
+ for (;;) {
+ possess(compress_have);
+ wait_for(compress_have, NOT_TO_BE, 0);
+ job = compress_head;
+ assert(job != NULL);
+ if (job->seq == -1)
+ break;
+ compress_head = job->next;
+ if (job->next == NULL)
+ compress_tail = &compress_head;
+ twist(compress_have, BY, -1);
+
+ /* got a job -- buffers have all been allocated to the right size.
+ deflate and verify the checksum. */
+ Trace(("-- uncompressing #%ld", job->seq));
+ if (inflateReset2(&strm, -15) != Z_OK)
+ throw(EINVAL, "stream reset failed: %s", strm.msg);
+ strm.next_in = job->in->buf;
+ strm.avail_in = job->in->len;
+ strm.next_out = job->out->buf;
+ strm.avail_out = job->out->len;
+
+ err = inflate(&strm, Z_SYNC_FLUSH);
+ if (err != Z_OK && err != Z_STREAM_END)
+ throw(EINVAL, "corrupted input -- invalid deflate data: %s", strm.msg);
+
+ /* It's not strictly necessary to verify the checksum here, but it
+ seems nice to get an error about a bad checksum as early as possible
+ to wasteful cpu and i/o consumtion. */
+ check = CHECK(firstcheck, job->out->buf, job->out->len);
+ if (check != job->check) {
+ if (g.form == 1)
+ throw(EDOM, "%s: corrupted zlib stream -- adler32 mismatch: ", g.inf);
+ else
+ throw(EDOM, "%s: corrupted gzip stream -- crc32 mismatch: ", g.inf);
+ }
+
+ Trace(("-- uncompressed #%ld%s", job->seq, job->more ? "" : " (last)"));
+
+ /* insert write job in list in sorted order, alert write thread */
+ possess(write_first);
+ prior = &write_head;
+ while ((here = *prior) != NULL) {
+ if (here->seq > job->seq)
+ break;
+ prior = &(here->next);
+ }
+ job->next = here;
+ *prior = job;
+ twist(write_first, TO, write_head->seq);
+ }
+ /* found job with seq == -1 -- free inflate memory and return to join */
+ release(compress_have);
+ (void)inflateEnd(&strm);
+}
+
+local void parallel_infchk(void)
+{
+ long seq; /* sequence number */
+ struct job *job; /* job of uncompress, then write */
+ struct space *insp; /* space for job input */
+ struct space *outsp; /* space for job output */
+ size_t fromload;
+ uint64_t infsz; /* size after inflate() */
+ uint64_t defsz; /* size before inflate() */
+ uint64_t check; /* checksum */
+ int last = 0; /* is this the last block? */
+
+ /* If the index is useless, don't try to use it. */
+ if (!idx.valid) {
+ infchk();
+ return;
+ }
+
+ if (g.form > 1) {
+ complain("index not supported with zip file ", g.inf);
+ infchk();
+ return;
+ }
+
+ /* if first time or after an option change, setup the job lists */
+ setup_jobs();
+
+ /* start write thread */
+ writeth = launch(uncompress_write_thread, NULL);
+
+ /* updated by uncompress_write_thread */
+ g.out_check = CHECK(0L, Z_NULL, 0);
+ out_len = 0;
+ g.out_tot = 0;
+
+ for (seq = 0; !last; seq++) {
+ /* get the next entry from the index */
+ idx_get(&infsz, &defsz, &check, &last);
+
+ job = malloc(sizeof(struct job));
+ if (job == NULL)
+ throw(ENOMEM, "not enough memory");
+ job->seq = seq;
+ job->more = !last;
+ job->in = get_space_size(&in_pool, defsz);
+ job->out = get_space_size(&out_pool, infsz);
+ job->lens = NULL;
+ job->check = check;
+ job->calc = NULL;
+ job->next = NULL;
+
+ /* reading the header cached some data, be sure not to skip it */
+ fromload = (g.in_left < defsz ? g.in_left : defsz);
+ if (fromload > 0) {
+ (void)memcpy(job->in->buf, g.in_next, fromload);
+ g.in_left -= fromload;
+ g.in_next += fromload;
+ }
+ if (fromload < defsz)
+ readn(g.ind, job->in->buf + fromload, defsz - fromload);
+ job->in->len = defsz;
+ job->out->len = infsz;
+
+ out_len += infsz;
+
+ /* start another uncompress thread if needed */
+ if (cthreads <= seq && cthreads < g.procs) {
+ (void)launch(uncompress_thread, NULL);
+ cthreads++;
+ }
+
+ possess(compress_have);
+ *compress_tail = job;
+ compress_tail = &(job->next);
+ twist(compress_have, BY, +1);
+ }
+
+ /* wait for the write thread to complete (we leave the compress threads out
+ there and waiting in case there is another stream to compress) */
+ join(writeth);
+ writeth = NULL;
+ Trace(("-- write thread joined"));
+
+ check_trailer(g.out_check, out_len);
+}
+
+/* parallel_infchk() or infchk(), whichever works. */
+local void best_infchk(void)
+{
+ if (g.index != NULL) {
+ /* User specified index file */
+ if (idx_open(g.index) != 0)
+ throw(EINVAL, "invalid index file");
+ }
+ else if (ind_has_index())
+ (void)idx_open("%z");
+
+ if (idx.valid)
+ parallel_infchk();
+ else
+ infchk();
+}
+
/* --- decompress Unix compress (LZW) input --- */
/* Type for accumulating bits. 23 bits will be used to accumulate up to 16-bit
@@ -3576,7 +4333,7 @@
if (g.decode == 2) {
try {
if (method == 8)
- infchk();
+ best_infchk();
else {
unlzw();
if (g.list) {
@@ -3649,19 +4406,8 @@
/* if exists and not -f, give user a chance to overwrite */
if (g.outd < 0 && errno == EEXIST && isatty(0) && g.verbosity) {
- int ch, reply;
-
- fprintf(stderr, "%s exists -- overwrite (y/n)? ", g.outf);
- fflush(stderr);
- reply = -1;
- do {
- ch = getchar();
- if (reply < 0 && ch != ' ' && ch != '\t')
- reply = ch == 'y' || ch == 'Y' ? 1 : 0;
- } while (ch != EOF && ch != '\n' && ch != '\r');
- if (reply == 1)
- g.outd = open(g.outf, O_CREAT | O_TRUNC | O_WRONLY,
- 0600);
+ if (allow_overwrite(g.outf))
+ g.outd = open(g.outf, O_CREAT | O_TRUNC | O_WRONLY, 0600);
}
/* if exists and no overwrite, report and go on to next */
@@ -3684,10 +4430,11 @@
/* process ind to outd */
if (g.verbosity > 1)
fprintf(stderr, "%s to %s ", g.inf, g.outf);
+
if (g.decode) {
try {
if (method == 8)
- infchk();
+ best_infchk();
else if (method == 257)
unlzw();
else
@@ -3708,8 +4455,14 @@
}
}
#ifndef NOTHREAD
- else if (g.procs > 1)
+ else if (g.index != NULL) {
+ if (idx_open(g.index) != 0)
+ throw(EINVAL, "invalid index file");
parallel_compress();
+ }
+ else if (g.procs > 1) {
+ parallel_compress();
+ }
#endif
else
single_compress(0);
@@ -3718,6 +4471,10 @@
fflush(stderr);
}
+ /* close index file - this may append the index to outd */
+ if (idx.valid)
+ idx_close();
+
/* finish up, copy attributes, set times, delete original */
if (g.ind != 0)
close(g.ind);
@@ -3781,6 +4538,9 @@
" -v, --verbose Provide more verbose output",
#endif
" -V --version Show the version of pigz",
+" -X --index file Create or use parallel uncompression index file.",
+" %f and %z are replaced by uncompressed and compressed",
+" file names",
" -z, --zlib Compress to zlib (.zz) instead of gzip format",
" -- All arguments after \"--\" are treated as files"
};
@@ -3859,11 +4619,11 @@
{"LZW", "Z"}, {"ascii", "a"}, {"best", "9"}, {"bits", "Z"},
{"blocksize", "b"}, {"decompress", "d"}, {"fast", "1"}, {"first", "F"},
{"force", "f"}, {"help", "h"}, {"independent", "i"}, {"iterations", "I"},
- {"keep", "k"}, {"license", "L"}, {"list", "l"}, {"maxsplits", "M"},
- {"name", "N"}, {"no-name", "n"}, {"no-time", "T"}, {"oneblock", "O"},
- {"processes", "p"}, {"quiet", "q"}, {"recursive", "r"}, {"rsyncable", "R"},
- {"silent", "q"}, {"stdout", "c"}, {"suffix", "S"}, {"test", "t"},
- {"to-stdout", "c"}, {"uncompress", "d"}, {"verbose", "v"},
+ {"index", "X"}, {"keep", "k"}, {"license", "L"}, {"list", "l"},
+ {"maxsplits", "M"}, {"name", "N"}, {"no-name", "n"}, {"no-time", "T"},
+ {"oneblock", "O"}, {"processes", "p"}, {"quiet", "q"}, {"recursive", "r"},
+ {"rsyncable", "R"}, {"silent", "q"}, {"stdout", "c"}, {"suffix", "S"},
+ {"test", "t"}, {"to-stdout", "c"}, {"uncompress", "d"}, {"verbose", "v"},
{"version", "V"}, {"zip", "K"}, {"zlib", "z"}};
#define NLOPTS (sizeof(longopts) / (sizeof(char *) << 1))
@@ -3903,7 +4663,7 @@
/* if no argument or dash option, check status of get */
if (get && (arg == NULL || *arg == '-')) {
- bad[1] = "bpSIM"[get - 1];
+ bad[1] = "bpSIMX"[get - 1];
throw(EINVAL, "missing parameter after %s", bad);
}
if (arg == NULL)
@@ -3972,6 +4732,7 @@
case 'S': get = 3; break;
case 'T': g.headis &= ~0xa; break;
case 'V': fputs(VERSION, stderr); exit(0);
+ case 'X': g.setdict = 0; get = 6; break;
case 'Z':
throw(EINVAL, "invalid option: LZW output not supported: %s",
bad);
@@ -4001,7 +4762,7 @@
return 0;
}
- /* process option parameter for -b, -p, -S, -I, or -M */
+ /* process option parameter for -b, -p, -S, -I, -M or -X */
if (get) {
size_t n;
@@ -4036,6 +4797,8 @@
g.zopts.numiterations = num(arg); /* optimization iterations */
else if (get == 5)
g.zopts.blocksplittingmax = num(arg); /* max block splits */
+ else if (get == 6)
+ g.index = arg; /* index file */
get = 0;
return 0;
}