/***********************************************************************
* *
* This software is part of the ast package *
* Copyright (c) 1996-2011 AT&T Intellectual Property *
* and is licensed under the *
* Eclipse Public License, Version 1.0 *
* by AT&T Intellectual Property *
* *
* A copy of the License is available at *
* http://www.eclipse.org/org/documents/epl-v10.html *
* (with md5 checksum b35adb5213ca9657e911e9befb180842) *
* *
* Information and Software Systems Research *
* AT&T Research *
* Florham Park NJ *
* *
* Glenn Fowler <gsf@research.att.com> *
* Phong Vo <kpv@research.att.com> *
* Doug McIlroy <doug@research.bell-labs.com> *
* *
***********************************************************************/
#pragma prototyped
/*
* sort main
*
* algorithm and interface
*
* Glenn Fowler
* Phong Vo
* AT&T Research
*
* key coders
*
* Doug McIlroy
* Bell Laboratories
*/
static const char usage[] =
"[-n?\n@(#)$Id: sort (AT&T Research) 2010-08-11 $\n]"
USAGE_LICENSE
"[+NAME?sort - sort and/or merge files]"
"[+DESCRIPTION?\bsort\b sorts lines of all the \afiles\a together and "
"writes the result on the standard output. The file name \b-\b means the "
"standard input. If no files are named, the standard input is sorted.]"
"[+?The default sort key is an entire line. Default ordering is "
"lexicographic by bytes in machine collating sequence. The ordering is "
"affected globally by the locale and/or the following options, one or "
"more of which may appear. See \brecsort\b(3) for details.]"
"[+?For backwards compatibility the \b-o\b option is allowed in any file "
"operand position when neither the \b-c\b nor the \b--\b options are "
"specified.]"
"[k:key?Restrict the sort key to a string beginning at \apos1\a and "
"ending at \apos2\a. \apos1\a and \apos2\a each have the form \am.n\a, "
"counting from 1, optionally followed by one or more of the flags "
"\bCMbdfginprZ\b; \bm\b counts fields from the beginning of the line and "
"\bn\b counts characters from the beginning of the field. If any flags "
"are present they override all the global ordering options for this key. "
"If \a.n\a is missing from \apos1\a, it is taken to be 1; if missing "
"from \apos2\a, it is taken to be the end of the field. If \apos2\a is "
"missing, it is taken to be end of line. The second form specifies a "
"fixed record length \areclen\a, and the last form specifies a fixed "
"field at byte position \aposition\a (counting from 1) of \alength\a "
"bytes. The obsolescent \breclen:fieldlen:offset\b (byte offset from 0) "
"is also accepted.]:[pos1[,pos2]]|.reclen|.position.length]]]]]"
"[K:oldkey?Specified in pairs: \b-K\b \apos1\a \b-K\b \apos2\a, where "
"positions count from 0.]# [pos]"
"[R:record|recfmt?Sets the record format to \aformat\a; newlines will be "
"treated as normal characters. The formats are:]:[format]"
"{"
"[+d[\aterminator\a]]?Variable length with record \aterminator\a "
"character, \b\\n\b by default.]"
"[+[f]]\areclen\a?Fixed record length \areclen\a.]"
"[+v[op...]]?Variable length. \bh4o0z2bi\b (4 byte IBM V format "
"descriptor) if \aop\a are omitted. \aop\a may be a combination "
"of:]"
"{"
"[+h\an\a?Header size is \an\a bytes (default 4).]"
"[+o\an\a?Size offset in header is \an\a bytes (default "
"0).]"
"[+z\an\a?Size length is \an\a bytes (default "
"min(\bh\b-\bo\b,2)).]"
"[+b?Size is big-endian (default).]"
"[+l?Size is little-endian (default \bb\b).]"
"[+i?Record length includes header (default).]"
"[+n?Record length does not include header (default "
"\bi\b).]"
"}"
"[+%?If the record format is not otherwise specified, and the "
"any input file name, from left to right, ends with "
"\b%\b\aformat\a or \b%\b\aformat\a\b.\b* then the record format "
"is set to \aformat\a. In addition, the \b-o\b path, if "
"specified and if it does not contain \b%\b and if it names a "
"regular file, is renamed to contain the input \b%\b\aformat\a.]"
"[+-?The first block of the first input file is sampled to check "
"for \bv\b variable length and \bf\b fixed length format "
"records. Not all formats are detected. \bsort\b exits with an "
"error diagnostic if the record format cannot be determined from "
"the sample.]"
"}"
"[b:ignorespace|ignore-leading-blanks?Ignore leading white space (spaces "
"and tabs) in field comparisons.]"
"[d:dictionary?`Phone directory' order: only letters, digits and white "
"space are significant in string comparisons.]"
"[E:codeset|convert?The field data codeset is \acodeset\a or the field "
"data must be converted from the \afrom\a codeset to the \ato\a codeset. "
"The codesets are:]:[codeset|from::to]"
"{\fcodesets\f}"
"[f:fold|ignorecase?Fold lower case letters onto upper case.]"
"[h:scaled|human-readable?Compare numbers scaled with IEEE 1541-2002 "
"suffixes.]"
"[i:ignorecontrol?Ignore characters outside the ASCII range 040-0176 in "
"string comparisons.]"
"[J:shuffle|jumble?Do a random shuffle of the sort keys. \aseed\a "
"specifies a pseudo random number generator seed. A \aseed\a of 0 "
"generates a seed based on time and pid.]#[seed]"
"[n:numeric?An initial numeric string, consisting of optional white "
"space, optional sign, and a nonempty string of digits with optional "
"decimal point, is sorted by value.]"
"[g:floating?Numeric, like \b-n\b, with \be\b-style exponents allowed.]"
"[p:bcd|packed-decimal?Compare packed decimal (bcd) numbers with "
"trailing sign.]"
"[M:months?Compare as month names. The first three characters after "
"optional white space are folded to lower case and compared. Invalid "
"fields compare low to \bjan\b.]"
"[r:reverse|invert?Reverse the sense of comparisons.]"
"[t:tabs?`Tab character' separating fields is \achar\a.]:[tab-char]"
"[c:check?Check that the single input file is sorted according to the "
"ordering rules; give no output on the standard output. If the input "
"is out of sort then write one diagnostic line on the standard error "
"and exit with code \b1\b.]"
"[C:silent-check?Like \b--check\b except no diagnostic is written.]"
"[j:processes|nproc|jobs?Use up to \ajobs\a separate processes to sort "
"the input. The current implementation still uses one process for the "
"final merge phase; improvements are planned.]#[processes]"
"[m:merge?Merge; the input files are already sorted.]"
"[u:unique?Unique. Keep only the first of multiple records that compare "
"equal on all keys. Implies \b-s\b.]"
"[s:stable?Stable sort. When all keys compare equal, preserve input "
"order. The default is \b--nostable\b (\aunstable\a sort): when all "
"keys compare equal, break the tie by using the entire record, ignoring "
"all but the \b-r\b option.]"
"[o:output?Place output in the designated \afile\a instead of on the "
"standard output. This file may be the same as one of the inputs. The "
"\afile\a \b-\b names the standard output. The option may appear among "
"the file arguments, except after \b--\b.]:[output]"
"[l:library?Load the external sort discipline \alibrary\a with optional "
"comma separated \aname=value\a arguments. Libraries are loaded, in left "
"to right order, after the sort method has been "
"initialized.]:[library[,name=value...]]]"
"[T:tempdir?Put temporary files in \atempdir\a.]:[tempdir:=/usr/tmp]"
"[L:list?List the available sort methods. See the \b-x\b option.]"
"[x:method?Specify the sort method to apply:]:[method:=rasp]"
"{\fmethods\f} [v:verbose?Trace the sort progress on the standard "
"error.]"
"[P:plugins?List plugin information for each \afile\a operand in "
"--\astyle\a on the standard error. If no \afile\a operands are "
"specified then the first instance of each \bsort\b plugin installed on "
"\b$PATH\b or a sibling dir on \b$PATH\b is listed. The special "
"\astyle\a \blist\b lists a line on the standard output for each plugin "
"with the name, a tab character, and plugin specific command line "
"options parameterized by \b${style}\b (suitable for \beval\b'ing in "
"\bsh\b(1).)]:[style:=list|man|html|nroff|usage]"
"[Z:zd|zoned-decimal?Compare zoned decimal (ZD) numbers with embedded "
"trailing sign.]"
"[z:size|zip?Suggest using the specified number of bytes of internal "
"store to tune performance. Power of 2 and power of 10 size suffixes are "
"accepted. Type is a single character and may be one of:]:[type[size]]]"
"{"
"[+a?Buffer alignment.]"
"[+b?Input reserve buffer size.]"
"[+c?Input chunk size; sort chunks of this size and disable "
"merge.]"
"[+i?Input buffer size.]"
"[+m?Maximum number of intermediate merge files.]"
"[+p?Input sort size; sort chunks of this size before merge.]"
"[+o?Output buffer size.]"
"[+r?Maximum record size.]"
"[+I?Decompress the input if it is compressed.]"
"[+O?\bgzip\b(1) compress the output.]"
"}"
"[X:test?Enables implementation defined test code. Some or all of these "
"may be disabled.]:[test]"
"{"
"[+dump?List detailed information on the option settings.]"
"[+io?List io file paths.]"
"[+keys?List the canonical key for each record.]"
"[+read?Force input file read by disabling memory mapping.]"
"[+show?Show setup information and exit before sorting.]"
"[+test?Immediatly exit with status 0; used to verify this "
"implementation]"
"}"
"[D:debug?Sets the debug trace level. Higher levels produce more "
"output.]# [level]"
"[S|y?Equivalent to \b-zp\b\asize\a; if \asize\a has no suffix then \bki\b "
"is assumed.]:[size]"
"\n"
"\n[ file ... ]\n"
"\n"
"[+?+\apos1\a -\apos2\a is the classical alternative to \b-k\b, with "
"counting from 0 instead of 1, and pos2 designating next-after-last "
"instead of last character of the key. A missing character count in "
"\apos2\a means 0, which in turn excludes any \b-t\b tab character from "
"the end of the key. Thus +1 -1.3 is the same as \b-k\b 2,2.3 and +1r -3 "
"is the same as \b-k\b 2r,3.]"
"[+?Under option \b-t\b\ax\a fields are strings separated by \ax\a; "
"otherwise fields are non-empty strings separated by white space. White "
"space before a field is part of the field, except under option \b-b\b. "
"A \bb\b flag may be attached independently to \apos1\a and \apos2\a.]"
"[+?When there are multiple sort keys, later keys are compared only "
"after all earlier keys compare equal. Except under option \b-s\b, lines "
"with all keys equal are ordered with all bytes significant. \b-S\b "
"turns off \b-s\b, the last occurrence, left-to-right, takes affect.]"
"[+?Sorting is done by a method determined by the \b-x\b option. \b-L\b "
"lists the available methods. rasp (radix+splay-tree) is the default and "
"current all-around best.]"
"[+?Single-letter options may be combined into a single string, such as "
"\b-cnrt:\b. The option combination \b-di\b and the combination of "
"\b-n\b with any of \b-diM\b are improper. Posix argument conventions "
"are supported.]"
"[+?Options \b-b\b, \b-c\b, \b-d\b, \b-f\b, \b-i\b, \b-k\b, \b-m\b, "
"\b-n\b, \b-o\b, \b-r\b, \b-t\b, and \b-u\b are in the Posix and/or "
"X/Open standards.]"
"[+DIAGNOSTICS?\asort\a comments and exits with non-zero status for "
"various trouble conditions and for disorder discovered under option "
"\b-c\b.]"
"[+SEE ALSO?\bcomm\b(1), \bjoin\b(1), \buniq\b(1), \brecsort\b(3)]"
"[+CAVEATS?The never-documented default \apos1\a=0 for cases such as "
"\bsort -1\b has been abolished. An input file overwritten by \b-o\b is "
"not replaced until the entire output file is generated in the same "
"directory as the input, at which point the input is renamed.]"
;
#include <sfio_t.h>
#include <ast.h>
#include <error.h>
#include <ctype.h>
#include <fs3d.h>
#include <ls.h>
#include <option.h>
#include <recsort.h>
#include <recfmt.h>
#include <sfdcgzip.h>
#include <vmalloc.h>
#include <wait.h>
#include <iconv.h>
#include <dlldefs.h>
#define INMIN (1024) /* min input buffer size */
#define INBRK (64*INMIN) /* default heap increment */
#define INMAX (1024*INBRK) /* max input buffer size */
#define INREC (16*INMIN) /* record begin chunk size */
#define TEST_dump 0x80000000 /* dump the state before sort */
#define TEST_io 0x40000000 /* dump io files */
#define TEST_keys 0x20000000 /* dump keys */
#define TEST_read 0x10000000 /* force sfread() */
#define TEST_show 0x08000000 /* show but don't do */
#define TEST_reserve 0x04000000 /* force sfreserve() */
#define pathstdin(s) (!(s)||streq(s,"-")||streq(s,"/dev/stdin")||streq(s,"/dev/fd/0"))
#define pathstdout(s) (!(s)||streq(s,"-")||streq(s,"/dev/stdout")||streq(s,"/dev/fd/1"))
typedef struct Part_s
{
Sfdisc_t disc; /* sfio discipline */
off_t offset; /* file offset */
off_t size; /* total size at offset */
off_t remain; /* read size remaining */
} Part_t;
typedef struct Job_s
{
off_t offset; /* file part offset */
off_t size; /* file part size */
size_t chunk; /* file part chunk */
int intermediates; /* number of intermediate files */
} Job_t;
typedef struct Sort_s
{
Rskeydisc_t disc; /* rskey discipline */
Rs_t* rec; /* rsopen() context */
Rskey_t* key; /* rskeyopen() context */
Rsdefkey_f defkeyf; /* real defkeyf if TEST_keys */
Sfio_t* tp; /* TEST_keys tmp stream */
Sfio_t* op; /* output stream */
Job_t* jobs; /* multi-proc job table */
char* overwrite; /* -o input overwrite tmp file */
char* buf; /* input buffer */
Sfio_t* opened; /* fileopen() peek stream */
size_t cur; /* input buffer index */
size_t hit; /* input buffer index overflow */
size_t end; /* max input buffer index */
size_t bufsize; /* input reserve buffer size */
off_t total; /* total size of single file */
unsigned long test; /* test bit mask */
int child; /* in child process */
int chunk; /* chunk the input (no merge) */
int hadstdin; /* already has - on input */
int map; /* sfreserve() input */
int mfiles; /* multi-stage files[] count */
int nfiles; /* files[] count */
int xfiles; /* max files[] count */
int preserve; /* rename() tmp output to input */
int single; /* one input file */
int verbose; /* trace main actions */
int zip; /* sfdcgzip SF_* flags */
Sfio_t* files[OPEN_MAX > 68 ? 64 : (OPEN_MAX-4)];
} Sort_t;
/*
* optget() info discipline function
*/
static int
optinfo(Opt_t* op, Sfio_t* sp, const char* s, Optdisc_t* dp)
{
register iconv_list_t* ic;
register int n;
if (streq(s, "codesets"))
{
n = 0;
for (ic = iconv_list(NiL); ic; ic = iconv_list(ic))
if (ic->ccode >= 0)
n += sfprintf(sp, "[%c:%s?%s]", ic->match[ic->match[0] == '('], ic->name, ic->desc);
return n;
}
else if (streq(s, "methods"))
return rskeylist(NiL, sp, 1);
return 0;
}
/*
* handle RS_VERIFY event
*/
static int
verify(Rs_t* rs, int op, Void_t* data, Void_t* arg, Rsdisc_t* disc)
{
if (op == RS_VERIFY)
error(3, "disorder at record %lld", (Sflong_t)((Rsobj_t*)data)->order);
return 0;
}
static int
verify_silent(Rs_t* rs, int op, Void_t* data, Void_t* arg, Rsdisc_t* disc)
{
if (op == RS_VERIFY)
exit(1);
return 0;
}
/*
* return read stream for path
*/
static Sfio_t*
fileopen(register Sort_t* sp, const char* path)
{
Sfio_t* fp;
if (fp = sp->opened)
sp->opened = 0;
else
{
if (pathstdin(path))
{
if (sp->hadstdin)
error(3, "%s: can only read once", path);
sp->hadstdin = 1;
fp = sfstdin;
}
else if (!(fp = sfopen(NiL, path, "r")))
error(ERROR_SYSTEM|3, "%s: cannot open", path);
if (rsfileread(sp->rec, fp, path))
{
if (fp != sfstdin)
sfclose(fp);
return 0;
}
sfset(fp, SF_SHARE, 0);
if (sp->zip & SF_READ)
sfdcgzip(fp, 0);
}
return fp;
}
/*
* prevent ERROR_USAGE|4 messages from exiting
*/
static void
noexit(int code)
{
}
/*
* list info for one plugin on the standard error
*/
static void
showlib(Sort_t* sp, Rskey_t* kp, const char* name, const char* style)
{
char* args;
char buf[128];
if (style)
sfsprintf(args = buf, sizeof(buf), "%s,%s", name, style);
else
args = (char*)name;
if (!rslib(sp->rec, kp, args, RS_IGNORE) && !style)
sfprintf(sfstdout, "%s\t--library=%s,${style}\n", name, name);
}
/*
* list info for all [selected] plugins on the standard error
*/
static int
showplugins(Sort_t* sp, Rskey_t* kp, const char* style)
{
Dllscan_t* dls;
Dllent_t* dle;
char* name;
void (*oexit)(int);
if (streq(style, "list"))
style = 0;
else
{
oexit = error_info.exit;
error_info.exit = noexit;
}
if (*kp->input)
while (name = *kp->input++)
showlib(sp, kp, name, style);
else if (dls = dllsopen("sort", NiL, NiL))
{
while (dle = dllsread(dls))
showlib(sp, kp, dle->name, style);
dllsclose(dls);
}
if (style)
error_info.exit = oexit;
return 0;
}
struct Lib_s; typedef struct Lib_s Lib_t;
struct Lib_s
{
Lib_t* next;
char* name;
};
/*
* process argv as in sort(1)
*/
static int
parse(register Sort_t* sp, char** argv)
{
register Rskey_t* key = sp->key;
register int n;
register char* s;
char* e;
char* p;
char** a;
char** v;
size_t z;
int i;
int map;
char* plugins = 0;
Lib_t* firstlib = 0;
Lib_t* lastlib = 0;
Lib_t* lib;
Recfmt_t r;
int obsolescent = 1;
char opt[64];
Optdisc_t optdisc;
struct stat st;
optinit(&optdisc, optinfo);
for (;;)
{
switch (optget(argv, usage))
{
case 0:
break;
case 'c':
case 'C':
obsolescent = 0;
key->meth = Rsverify;
key->disc->events = RS_VERIFY;
key->disc->eventf = opt_info.option[1] == 'C' ? verify_silent : verify;
continue;
case 'E':
case 'J':
sfsprintf(opt, sizeof(opt), "%c%s", opt_info.option[1], opt_info.arg);
if (rskeyopt(key, opt, 1))
return 0;
continue;
case 'j':
key->nproc = opt_info.num;
continue;
case 'k':
if (rskey(key, opt_info.arg, 0))
return -1;
continue;
case 'l':
if (!(lib = newof(0, Lib_t, 1, 0)))
error(ERROR_SYSTEM|3, "out of space");
lib->name = opt_info.arg;
if (lastlib)
lastlib = lastlib->next = lib;
else
firstlib = lastlib = lib;
continue;
case 'm':
key->merge = !!opt_info.num;
continue;
case 'o':
key->output = opt_info.arg;
continue;
case 's':
if (opt_info.num)
key->type &= ~RS_DATA;
else
key->type |= RS_DATA;
continue;
case 't':
if (key->tab[0])
error(1, "%s: %s conflicts with %s", opt_info.option, *opt_info.arg, key->tab);
if ((n = mbsize(opt_info.arg)) < 1)
{
error(1, "%s: %s: invalid tab character", opt_info.option, opt_info.arg);
n = 0;
}
if (*(opt_info.arg + n) || n >= sizeof(key->tab))
error(1, "%s: %s: single character expected", opt_info.option, opt_info.arg);
memcpy(key->tab, opt_info.arg, n);
key->tab[n] = 0;
continue;
case 'u':
key->type &= ~RS_DATA;
key->type |= RS_UNIQ;
continue;
case 'v':
key->verbose = !!opt_info.num;
sp->verbose = key->verbose || (key->test & TEST_show);
continue;
case 'x':
if (!(key->meth = rskeymeth(key, opt_info.arg)))
error(2, "%s: unknown method", opt_info.arg);
continue;
case 'z':
if (isalpha(n = *(s = opt_info.arg)))
s++;
else
n = 'r';
size:
z = strton(s, &e, NiL, 1);
if (*e == '%')
{
error(2, "%s %c%s: %% not supported -- do you really want that much memory?", opt_info.option, n, s);
return -1;
}
if (*e || z < ((n == 'm' || n == 'o' || n == 'r' || isupper(n)) ? 0 : 512))
{
error(2, "%s %c%s: invalid size", opt_info.option, n, s);
return -1;
}
switch (n)
{
case 'a':
key->alignsize = z;
break;
case 'b':
sp->bufsize = z;
break;
case 'c':
sp->chunk = 1;
key->alignsize = key->insize = z;
break;
case 'i':
key->insize = z;
break;
case 'm':
if (z <= 0 || z > elementsof(sp->files))
z = elementsof(sp->files);
sp->xfiles = z;
break;
case 'p':
key->procsize = z;
break;
case 'o':
key->outsize = z;
break;
case 'r':
key->recsize = z;
break;
case 'I':
sp->zip |= SF_READ;
break;
case 'O':
sp->zip |= SF_WRITE;
break;
}
continue;
case 'D':
error_info.trace = -opt_info.num;
continue;
case 'K':
if (opt_info.offset)
{
opt_info.offset = 0;
opt_info.index++;
}
if (rskey(key, opt_info.arg, *opt_info.option))
return -1;
continue;
case 'L':
rskeylist(key, sfstdout, 0);
exit(0);
case 'P':
plugins = opt_info.arg;
continue;
case 'R':
key->disc->data = recstr(opt_info.arg, &e);
if (*e)
{
error(2, "%s: invalid record format", opt_info.arg);
return -1;
}
continue;
case 'S':
case 'y':
n = 'p';
s = opt_info.arg;
if (*s && *(e = s + strlen(s) - 1) != '%' && !isalpha(*e))
{
sfsprintf(opt, sizeof(opt), "%ski", s);
s = opt;
}
goto size;
case 'T':
pathtemp(NiL, 0, opt_info.arg, "/TMPPATH", NiL);
continue;
case 'X':
s = opt_info.arg;
opt_info.num = strton(s, &e, NiL, 1);
if (*e)
{
if (streq(s, "dump"))
opt_info.num = TEST_dump;
else if (streq(s, "io"))
opt_info.num = TEST_io;
else if (streq(s, "keys"))
opt_info.num = TEST_keys;
else if (streq(s, "read"))
opt_info.num = TEST_read;
else if (streq(s, "reserve"))
opt_info.num = TEST_reserve;
else if (streq(s, "show"))
opt_info.num = TEST_show;
else if (streq(s, "test"))
{
sfprintf(sfstdout, "ok\n");
exit(0);
}
else
error(1, "%s: unknown test", s);
}
if (*opt_info.option == '+')
key->test &= ~opt_info.num;
else
key->test |= opt_info.num;
sp->test = key->test;
sp->verbose = key->verbose || (key->test & TEST_show);
continue;
case '?':
error(ERROR_USAGE|4, "%s", opt_info.arg);
return 1;
case ':':
error(2, "%s", opt_info.arg);
return -1;
default:
opt[0] = opt_info.option[1];
opt[1] = 0;
if (rskeyopt(key, opt, 1))
return 0;
continue;
}
break;
}
argv += opt_info.index;
if (obsolescent && (opt_info.index <= 1 || !streq(*(argv - 1), "--")))
{
/*
* check for obsolescent `-o output' after first file operand
*/
a = v = argv;
while (s = *a++)
{
if (*s == '-' && *(s + 1) == 'o')
{
if (!*(s += 2) && !(s = *a++))
{
error(2, "-o: output argument expected");
break;
}
key->output = s;
}
else
*v++ = s;
}
*v = 0;
}
key->input = argv;
/*
* disciplines have the opportunity to modify key info
*/
while (lib = firstlib)
{
if (rslib(sp->rec, key, lib->name, 0))
return 1;
firstlib = firstlib->next;
free(lib);
}
/*
* plugins list bails early
*/
if (plugins)
exit(showplugins(sp, key, plugins));
/*
* record format chicanery
*/
if (map = RECTYPE(key->disc->data) == REC_method && REC_M_INDEX(key->disc->data) == REC_M_path)
for (n = 0, i = -1; p = key->input[n]; n++)
if (s = strrchr(p, '%'))
{
r = recstr(s + 1, &e);
if (!*e || *e == '.' && e > (s + 1))
{
if (r != key->disc->data && i >= 0 && (RECTYPE(r) != REC_variable || RECTYPE(key->disc->data) != REC_variable || REC_V_ATTRIBUTES(r) != REC_V_ATTRIBUTES(key->disc->data)))
{
error(2, "%s: format %s incompatible with %s format %s", p, fmtrec(r, 0), key->input[i], fmtrec(key->disc->data, 0));
return 1;
}
if (RECTYPE(r) != REC_variable || RECTYPE(key->disc->data) != REC_variable || REC_V_SIZE(key->disc->data) < REC_V_SIZE(r))
key->disc->data = r;
i = n;
}
}
if (RECTYPE(key->disc->data) == REC_method && ((n = REC_M_INDEX(key->disc->data)) == REC_M_path || n == REC_M_data))
{
if ((sp->opened = fileopen(sp, key->input[0])) && (s = sfreserve(sp->opened, SF_UNBOUND, SF_LOCKR)))
{
struct stat st;
z = sfvalue(sp->opened);
if (fstat(sffileno(sp->opened), &st) || st.st_size < z)
st.st_size = 0;
key->disc->data = recfmt(s, z, st.st_size);
sfread(sp->opened, s, 0);
}
else
{
z = sp->opened ? sfvalue(sp->opened) : -1;
key->disc->data = REC_N_TYPE();
}
if (z && key->disc->data == REC_N_TYPE())
error(3, "%s: record format cannot be determined from data sample", key->input[0]);
}
if (RECTYPE(key->disc->data) == REC_fixed)
key->fixed = REC_F_SIZE(key->disc->data);
if (map && key->output && key->disc->data != REC_N_TYPE() && (stat(key->output, &st) || S_ISREG(st.st_mode)))
{
if (p = strrchr(key->output, '/'))
s = p + 1;
else
s = key->output;
if (!strchr(s, '%'))
{
p = key->output;
if (!(e = strrchr(s, '.')))
e = s + strlen(s);
if (RECTYPE(key->disc->data) == REC_variable && !REC_V_SIZE(key->disc->data))
key->disc->data |= ((1<<15)-1);
if (!(s = strdup(sfprints("%-*.*s%%%s%s", e - p, e - p, p, fmtrec(key->disc->data, 1), e))))
error(ERROR_SYSTEM|3, "out of space");
key->output = s;
if (sp->verbose)
error(0, "%s rename output %s => %s", error_info.id, p, s);
}
}
if (sp->verbose)
error(0, "%s %s record format", error_info.id, fmtrec(key->disc->data, 0));
return error_info.errors != 0;
}
/*
* dump keys to stderr
*/
static ssize_t
dumpkey(Rs_t* rs, unsigned char* dat, size_t datlen, unsigned char* key, size_t keylen, Rsdisc_t* disc)
{
Sort_t* sp = (Sort_t*)RSKEYDISC(disc);
ssize_t n;
int i;
char buf[2];
if ((n = (*sp->defkeyf)(rs, dat, datlen, key, keylen, disc)) > 0)
{
buf[1] = 0;
for (i = 0; i < n; i++)
{
buf[0] = key[i];
sfputr(sp->tp, fmtesc(buf), -1);
}
sfprintf(sfstderr, "key: %s\n", sfstruse(sp->tp));
}
return n;
}
/*
* initialize sp from argv
*/
static int
init(register Sort_t* sp, Rskeydisc_t* dp, char** argv)
{
register Rskey_t* key;
register char* s;
register char** p;
char* t;
int n;
unsigned long x;
unsigned long z;
size_t fixed;
struct stat is;
struct stat os;
memset(sp, 0, sizeof(*sp));
sp->xfiles = elementsof(sp->files);
sfset(sfstdout, SF_SHARE, 0);
sfset(sfstderr, SF_SHARE, 0);
Vmdcsbrk->round = INBRK;
dp->version = RSKEY_VERSION;
dp->flags = 0;
dp->errorf = errorf;
if (!(sp->key = key = rskeyopen(dp, NiL)) || !(sp->rec = rsnew(key->disc)))
return -1;
z = key->insize = 2 * INMAX;
#if 0
if (conformance(0, 0))
#endif
key->type |= RS_DATA;
if ((n = strtol(astconf("PAGESIZE", NiL, NiL), &t, 0)) > 0 && !*t)
key->alignsize = n;
if ((n = parse(sp, argv)) || rskeyinit(key))
{
rskeyclose(key);
if (n < 0)
error(ERROR_USAGE|4, "%s", optusage(NiL));
return -1;
}
/*
* finalize the buffer dimensions
*/
if ((x = key->insize) != z)
z = 0;
if (x > INMAX)
x = INMAX;
else if (x < INMIN && !sp->chunk)
x = INMIN;
if (sp->single = !key->input[1])
{
if (!(sp->opened = fileopen(sp, key->input[0])))
error(ERROR_SYSTEM|3, "%s: cannot open", key->input[0]);
if (fstat(sffileno(sp->opened), &is))
error(ERROR_SYSTEM|3, "%s: cannot stat", key->input[0]);
if (!S_ISREG(is.st_mode) || sp->opened->disc) /* XXX: need sfio call to test if any disc pushed */
{
sp->total = 0;
sp->test |= TEST_read;
}
else if (x > (sp->total = is.st_size))
x = sp->total;
}
else
sp->test |= TEST_read;
if (sp->zip & SF_READ)
sp->test |= TEST_read;
fixed = key->fixed;
if ((sp->test & TEST_reserve) || !(sp->test & TEST_read))
{
sp->map = 1;
if (z)
x = key->procsize;
if (fixed)
x += fixed - x % fixed;
}
else
{
if (z)
x = key->procsize;
for (;;)
{
if (fixed)
x += fixed - x % fixed;
if (sp->buf = (char*)vmalign(Vmheap, x, key->alignsize))
break;
if ((x >>= 1) < INMIN)
error(ERROR_SYSTEM|3, "out of space");
}
sp->hit = x - key->alignsize;
}
if (sp->test & TEST_keys)
{
if (!key->disc->defkeyf)
error(2, "no key function to intercept");
else if (!(sp->tp = sfstropen()))
error(ERROR_SYSTEM|3, "out of space");
else
{
sp->defkeyf = key->disc->defkeyf;
key->disc->defkeyf = dumpkey;
}
}
if (key->nproc > 1)
{
off_t offset;
off_t total;
off_t size;
size_t chunk;
int i;
Job_t* jp;
if (!sp->map || pathstdin(key->input[0]))
{
uno:
key->nproc = 1;
}
else if ((n = (sp->total + key->procsize - 1) / (key->procsize)) <= 1)
goto uno;
else
{
if (n < key->nproc)
key->nproc = n;
else
n = key->nproc;
if (!(sp->jobs = vmnewof(Vmheap, 0, Job_t, n, 0)))
goto uno;
size = (sp->total + n - 1) / n;
if (fixed)
{
if (size % fixed)
size += fixed - size % fixed;
i = (size + x - 1) / x;
if (i * n > sp->xfiles)
{
error(1, "multi-process multi-stage not implemented; falling back to one processor");
goto uno;
}
chunk = size / i;
if (chunk % fixed)
chunk += fixed - chunk % fixed;
size = chunk * i;
offset = 0;
total = sp->total;
for (jp = sp->jobs; jp < sp->jobs + n; jp++)
{
jp->offset = offset;
if (size > total)
size = total;
total -= (jp->size = size);
jp->chunk = chunk;
jp->intermediates = i;
offset += size;
}
if (key->procsize > chunk)
key->procsize = chunk;
else
{
size = key->procsize;
i = (chunk + size - 1) / size;
size = chunk / i;
if (size % fixed)
size += fixed - size % fixed;
key->procsize = size;
}
}
else
{
register char* s;
register char* t;
register char* b;
off_t ideal;
off_t scan;
char* file;
Sfio_t* ip;
i = (size + x - 1) / x;
if (i * n > sp->xfiles)
{
error(1, "multi-process multi-stage not implemented; falling back to one processor");
goto uno;
}
chunk = (size + i - 1) / i;
size = ideal = chunk * i;
if ((scan = INREC) >= ideal)
scan = (ideal / 32) * 4;
offset = 0;
total = sp->total;
file = key->input[0];
if (!(ip = fileopen(sp, file)))
error(ERROR_SYSTEM|3, "%s: cannot read", file);
for (jp = sp->jobs; jp < sp->jobs + n; jp++)
{
jp->offset = offset;
if (((size = ideal) + scan) >= total)
size = total;
else
{
/*UNDENT...*/
/*
* snoop around for the closest record boundary
*/
size -= scan / 2;
if (sfseek(ip, offset + size, SEEK_SET) != (offset + size))
error(ERROR_SYSTEM|3, "%s: record boundary seek error at offset %lld", file, (Sflong_t)offset + size);
if (!(b = (char*)sfreserve(ip, scan, 0)))
error(ERROR_SYSTEM|3, "%s: record boundary read error at offset %lld", file, (Sflong_t)offset + size);
s = t = b + scan / 2 - 1;
while (*s++ != '\n')
{
if (t < b)
{
bigger:
if (((size += scan) + offset) >= (total - scan))
error(3, "%s: monster record at offset %lld", (Sflong_t)offset);
if (sfseek(ip, offset + size, SEEK_SET) != (offset + size))
error(ERROR_SYSTEM|3, "%s: record boundary input seek error at %lld", file, (Sflong_t)offset + size);
if (!(b = (char*)sfreserve(ip, scan, 0)))
error(ERROR_SYSTEM|3, "%s: record boundary read error at %lld", file, (Sflong_t)offset + size);
t = (s = b) + scan;
do
{
if (s >= t)
goto bigger;
} while (*s++ != '\n');
break;
}
if (*t-- == '\n')
{
s = t + 2;
break;
}
}
size += (s - b);
/*...INDENT*/
}
total -= (jp->size = size);
jp->chunk = (size + i - 1) / i;
if (jp->chunk > chunk)
chunk = jp->chunk;
jp->intermediates = i;
offset += size;
}
if (rsfileclose(sp->rec, ip))
return -1;
sfclose(ip);
key->procsize = (key->procsize > chunk) ? chunk : chunk / ((chunk + key->procsize - 1) / key->procsize);
}
}
}
key->insize = sp->end = x;
/*
* check the output file for clash with the input files
*/
n = stat("/dev/null", &is);
if (pathstdout(key->output))
{
key->output = "/dev/stdout";
sp->op = sfstdout;
if (!n && !fstat(sffileno(sp->op), &os) && os.st_dev == is.st_dev && os.st_ino == is.st_ino)
key->type |= RS_IGNORE;
}
else if (key->input)
{
if (!stat(key->output, &os))
{
if (!n && os.st_dev == is.st_dev && os.st_ino == is.st_ino)
key->type |= RS_IGNORE;
else if (eaccess(key->output, W_OK))
error(ERROR_SYSTEM|3, "%s: cannot write", key->output);
else if (!fs3d(FS3D_TEST) || !iview(&os))
{
p = key->input;
while (s = *p++)
if (!pathstdin(s))
{
if (stat(s, &is))
error(ERROR_SYSTEM|2, "%s: not found", s);
else if (os.st_dev == is.st_dev && os.st_ino == is.st_ino)
{
if (t = strrchr(key->output, '/'))
{
s = key->output;
*t = 0;
}
else
s = ".";
if (sp->overwrite = pathtemp(NiL, 0, s, error_info.id, &n))
sp->op = sfnew(sfstdout, NiL, SF_UNBOUND, n, SF_WRITE);
if (t) *t = '/';
if (!sp->op || fstat(n, &is))
error(ERROR_SYSTEM|3, "%s: cannot create overwrite file %s", key->output, sp->overwrite);
if (os.st_uid != is.st_uid || os.st_gid != is.st_gid)
sp->preserve = 1;
break;
}
}
}
}
if (!sp->overwrite && !(sp->op = sfopen(sfstdout, key->output, "w")))
error(ERROR_SYSTEM|3, "%s: cannot write", key->output);
}
if (rsfilewrite(sp->rec, sp->op, key->output))
return -1;
if (key->outsize > 0)
sfsetbuf(sp->op, NiL, key->outsize);
if (sp->zip & SF_WRITE)
sfdcgzip(sp->op, 0);
/*
* finally ready for recsort now
*/
if (rsinit(sp->rec, key->meth, key->procsize, key->type, key))
{
error(ERROR_SYSTEM|2, "sort library initialization error");
rskeyclose(key);
return -1;
}
if (sp->rec->meth->type == RS_MTCOPY)
sp->chunk = 1;
if (sp->test & TEST_io)
{
for (n = 0; s = key->input[n]; n++)
error(0, "%s input[%d]\t\"%s\"", error_info.id, n, s);
if (s = key->output)
error(0, "%s output\t\"%s\"", error_info.id, s);
}
return 0;
}
/*
* close sp->files and push fp if not 0
*/
static void
clear(register Sort_t* sp, Sfio_t* fp)
{
register int i;
for (i = fp ? sp->mfiles : 0; i < sp->nfiles; i++)
{
rstempclose(sp->rec, sp->files[i]);
sp->files[i] = 0;
}
if (fp)
{
sp->files[sp->mfiles++] = fp;
sp->nfiles = sp->mfiles;
if (sp->mfiles >= (sp->xfiles - 1))
sp->mfiles = 0;
}
else
sp->nfiles = sp->mfiles = 0;
}
/*
* flush the intermediate data
* r is the partial record offset
* updated r is returned
*/
static ssize_t
flush(register Sort_t* sp, register size_t r)
{
register Sfio_t* fp;
register size_t n;
register size_t m;
register size_t b;
if (sp->chunk)
{
/*
* skip merge and output sorted chunk
*/
if (rswrite(sp->rec, sp->op, RS_OTEXT))
{
if (!error_info.errors)
error(ERROR_SYSTEM|2, "%s: write error", sp->key->output);
return -1;
}
}
else if (sp->rec->meth->type != RS_MTVERIFY)
{
/*
* write to an intermediate file and rewind for rsmerge
*/
if (!(fp = sp->files[sp->nfiles]))
{
if (sp->child || !(fp = rstempwrite(sp->rec, (Sfio_t*)0)))
error(ERROR_SYSTEM|3, "cannot create intermediate sort file %d", sp->nfiles);
sp->files[sp->nfiles] = fp;
}
sp->nfiles++;
if (sp->verbose)
error(0, "%s write intermediate", error_info.id);
if (rswrite(sp->rec, fp, 0))
{
error(ERROR_SYSTEM|2, "intermediate sort file write error");
return -1;
}
if (rstempread(sp->rec, fp))
{
error(ERROR_SYSTEM|2, "intermediate sort file rewind error");
return -1;
}
/*
* multi-stage merge when open file limit exceeded
*/
if (sp->nfiles >= sp->xfiles)
{
if (sp->child || !(fp = rstempwrite(sp->rec, (Sfio_t*)0)))
error(ERROR_SYSTEM|3, "cannot create intermediate merge file");
if (sp->verbose)
error(0, "%s merge multi-stage intermediate", error_info.id);
if (rsmerge(sp->rec, fp, sp->files + sp->mfiles, sp->nfiles - sp->mfiles, 0))
{
error(ERROR_SYSTEM|2, "intermediate merge file write error");
return -1;
}
if (rstempread(sp->rec, fp))
{
error(ERROR_SYSTEM|2, "intermediate merge file rewind error");
return -1;
}
clear(sp, fp);
}
}
/*
* slide over partial record data so the next read is aligned
*/
if (!sp->map && (m = sp->cur - r))
{
n = roundof(m, sp->key->alignsize) - m;
if (n < r)
{
m = n;
while (r < sp->cur)
sp->buf[n++] = sp->buf[r++];
sp->cur = n;
}
else
{
b = r;
r += m;
n += m;
sp->cur = n;
while (r > b)
sp->buf[--n] = sp->buf[--r];
m = n;
}
}
else
m = sp->cur = 0;
return m;
}
/*
* input the records for file ip
*/
static int
input(register Sort_t* sp, Sfio_t* ip, const char* name)
{
register ssize_t n;
register ssize_t p;
register ssize_t m;
register ssize_t r;
size_t z;
char* b;
int c;
char del[2];
/*
* align the read buffer and
* loop on insize chunks
*/
error_info.file = ip == sfstdin ? (char*)0 : (char*)name;
m = -1;
z = 0;
if (sp->bufsize)
sfsetbuf(ip, NiL, sp->bufsize);
else if (sfsize(ip) > SF_BUFSIZE)
{
if (sp->map)
sfsetbuf(ip, NiL, z = sp->end);
else
sfsetbuf(ip, NiL, 0);
}
r = sp->cur = roundof(sp->cur, sp->key->alignsize);
p = 0;
for (;;)
{
if (sp->cur > sp->hit)
{
if (sp->single && !sp->nfiles && sp->total == (sp->map ? 0 : p))
break;
if ((r = flush(sp, r)) < 0)
return -1;
}
if (!sp->map)
n = sfeof(ip) ? 0 : sfread(ip, sp->buf + sp->cur, sp->end - sp->cur);
else
{
sp->buf = (char*)sfreserve(ip, m, SF_LOCKR);
n = sfvalue(ip);
if (!sp->buf)
{
if (m < 0 && n < -m && z == sp->end)
{
sfsetbuf(ip, NiL, z = 2 * sp->end);
sp->buf = (char*)sfreserve(ip, m, SF_LOCKR);
n = sfvalue(ip);
if (sp->verbose && n)
error(0, "%s buffer boundary expand to %I*d", error_info.id, sizeof(n), n);
}
if (!sp->buf && n > 0 && !(sp->buf = sfreserve(ip, n, SF_LOCKR)))
n = -1;
}
}
if (n <= 0)
{
if (n < 0)
error(ERROR_SYSTEM|3, "read error");
if (sp->cur <= r)
break;
if (sp->key->fixed)
{
error(1, "incomplete record length=%lld", (Sflong_t)(sp->cur - r));
break;
}
if (RECTYPE(sp->key->disc->data) == REC_delimited && sp->buf[sp->cur - 1] != (c = REC_D_DELIMITER(sp->key->disc->data)))
{
sp->buf[sp->cur++] = c;
if (c == '\n')
error(1, "newline appended");
else
{
del[0] = c;
del[1] = 0;
error(1, "%s appended", fmtquote(del, "'", NiL, 1, 0));
}
}
}
sp->cur += n;
process:
if (sp->verbose && !sp->child)
error(ERROR_PROMPT, "%s process %lld ->", error_info.id, (Sflong_t)(sp->cur - r));
if ((p = rsprocess(sp->rec, sp->buf + r, sp->cur - r)) < 0)
error(ERROR_SYSTEM|3, "sort error");
if (sp->verbose)
{
if (sp->child)
error(0, "%s process %lld -> %lld", error_info.id, (Sflong_t)(sp->cur - r), (Sflong_t)p);
else
error(0, " %lld", (Sflong_t)p);
}
if (sp->map)
{
if (sp->map > 2)
break;
sfread(ip, sp->buf, p);
if (p)
{
m = -(n - p + 1);
if (((sp->total -= p) / 3) < (sp->end / 2) && sp->total > sp->end)
{
if ((r = flush(sp, r)) < 0)
return -1;
sfsetbuf(ip, NiL, sp->total);
}
}
else if (sp->map == 1)
{
sp->map++;
m = -(n + 1);
}
else if (n > sp->end)
{
error(2, "monster record", n, sp->end, sp->cur, p);
break;
}
else if (sp->key->fixed)
{
error(1, "incomplete record length=%ld", n - p);
break;
}
else
{
sp->cur = n - p;
if (!(b = vmnewof(Vmheap, 0, char, sp->cur, 1)))
error(ERROR_SYSTEM|3, "out of space");
memcpy(b, sp->buf + p, sp->cur);
if (RECTYPE(sp->key->disc->data) == REC_delimited && b[sp->cur - 1] != (c = REC_D_DELIMITER(sp->key->disc->data)))
{
b[sp->cur++] = '\n';
if (c == '\n')
error(1, "newline appended");
else
{
del[0] = c;
del[1] = 0;
error(1, "%s appended", fmtquote(del, "'", "'", 1, 0));
}
}
sp->buf = b;
sp->map++;
goto process;
}
}
else
r += p;
}
error_info.file = 0;
return 0;
}
/*
* sfio part discipline read
*/
static ssize_t
partread(Sfio_t* fp, Void_t* buf, size_t size, Sfdisc_t* dp)
{
register Part_t* pp = (Part_t*)dp;
if (pp->remain <= 0)
return 0;
if (size > pp->remain)
size = pp->remain;
pp->remain -= size;
return sfrd(fp, buf, size, dp);
}
/*
* sfio part discipline seek
*/
static Sfoff_t
partseek(Sfio_t* fp, Sfoff_t lloffset, int op, Sfdisc_t* dp)
{
register Part_t* pp = (Part_t*)dp;
off_t offset = lloffset;
switch (op)
{
case SEEK_SET:
offset += pp->offset;
break;
case SEEK_CUR:
offset += pp->offset;
break;
case SEEK_END:
offset = pp->offset + pp->size - offset;
op = SEEK_SET;
break;
}
if ((offset = sfsk(fp, offset, op, dp)) >= 0)
{
offset -= pp->offset;
pp->remain = pp->size - offset;
}
return offset;
}
/*
* job control
* requires single named input file
* no multi-stage merge
*/
static void
jobs(register Sort_t* sp)
{
register Job_t* jp;
register Job_t* xp;
register int i;
register int j;
register int f;
int status;
char* file;
Sfio_t* ip;
Part_t part;
char id[32];
sp->single = 0;
if (sp->verbose)
error(0, "%s %d processes %lld total", error_info.id, sp->key->nproc, (Sflong_t)sp->total);
xp = sp->jobs + sp->key->nproc;
if (sp->test & TEST_show)
{
for (jp = sp->jobs; jp < xp; jp++)
error(0, "%s#%d pos %12lld : len %10lld : buf %10lld : num %2d", error_info.id, jp - sp->jobs + 1, (Sflong_t)jp->offset, (Sflong_t)jp->size, (Sflong_t)jp->chunk, jp->intermediates);
exit(0);
}
f = 0;
for (jp = sp->jobs; jp < xp; jp++)
for (i = 0; i < jp->intermediates; i++)
if (!(sp->files[f++] = rstempwrite(sp->rec, (Sfio_t*)0)))
error(ERROR_SYSTEM|3, "cannot create intermediate file %d", i);
part.disc.readf = partread;
part.disc.writef = 0;
part.disc.seekf = partseek;
part.disc.exceptf = 0;
part.disc.disc = 0;
file = sp->key->input[0];
j = 0;
for (jp = sp->jobs; jp < xp; jp++)
{
ip = fileopen(sp, file);
switch (fork())
{
case -1:
error(ERROR_SYSTEM|3, "not enough child processes");
case 0:
sp->child = 1;
sfsprintf(id, sizeof(id), "%s#%d", error_info.id, jp - sp->jobs + 1);
error_info.id = id;
sp->end = jp->chunk;
part.offset = jp->offset;
sp->total = part.size = part.remain = jp->size;
sfdisc(ip, &part.disc);
for (i = 0; i < jp->intermediates; i++)
sp->files[i] = sp->files[j++];
while (i < f)
sp->files[i++] = 0;
if (sp->verbose)
error(0, "%s pos %12lld : len %10lld : buf %10lld : num %2d", error_info.id, (Sflong_t)jp->offset, (Sflong_t)jp->size, (Sflong_t)jp->chunk, jp->intermediates);
exit(input(sp, ip, file) < 0);
}
if (rsfileclose(sp->rec, ip))
exit(1);
sfclose(ip);
j += jp->intermediates;
}
sp->nfiles = f;
i = 0;
j = sp->key->nproc;
while (j > 0)
{
if (wait(&status) != -1)
{
if (status)
i++;
j--;
}
else if (errno != EINTR)
{
error(ERROR_SYSTEM|3, "%d process%s did not complete", j, j == 1 ? "" : "es");
break;
}
}
if (i)
error(3, "%d child process%s failed", i, i == 1 ? "" : "es");
}
/*
* all done
*/
static void
done(register Sort_t* sp)
{
while (rsdisc(sp->rec, NiL, RS_POP));
if ((sfsync(sp->op) || sp->op != sfstdout && rsfileclose(sp->rec, sp->op)) && !error_info.errors)
error(ERROR_SYSTEM|2, "%s: write error", sp->key->output);
if (rsclose(sp->rec))
error(2, "sort error");
if (sp->map > 2)
vmfree(Vmheap, sp->buf);
/*
* if the output would have overwritten an input
* file now is the time to commit to it
*/
if (sp->overwrite)
{
if (error_info.errors)
remove(sp->overwrite);
else if (sp->preserve)
{
Sfio_t* ip;
Sfio_t* op;
if (ip = sfopen(NiL, sp->overwrite, "r"))
{
if (op = sfopen(NiL, sp->key->output, "w"))
{
if ((sfmove(ip, op, SF_UNBOUND, -1) < 0 || sfclose(op) || !sfeof(ip)) && !error_info.errors)
error(ERROR_SYSTEM|2, "%s: write error", sp->key->output);
sfclose(op);
}
else
error(ERROR_SYSTEM|2, "%s: cannot write", sp->key->output);
sfclose(ip);
remove(sp->overwrite);
}
else
error(ERROR_SYSTEM|2, "%s: cannot read", sp->overwrite);
sp->preserve = 0;
}
else if (remove(sp->key->output) || rename(sp->overwrite, sp->key->output))
error(ERROR_SYSTEM|2, "%s: cannot overwrite", sp->key->output);
free(sp->overwrite);
sp->overwrite = 0;
}
if (rskeyclose(sp->key))
error(2, "sort key error");
}
int
main(int argc, char** argv)
{
register char* s;
register Sfio_t* fp;
char** merge;
Sort_t sort;
error_info.id = "sort";
if (init(&sort, &sort.disc, argv))
exit(1);
if (sort.test & TEST_dump)
{
sfprintf(sfstderr, "main\n\tintermediates=%d\n", sort.xfiles);
rskeydump(sort.key, sfstderr);
}
if (sort.key->type & RS_CAT)
{
while (s = *sort.key->input++)
{
fp = fileopen(&sort, s);
if (sfmove(fp, sfstdout, SF_UNBOUND, -1) < 0 || !sfeof(fp))
error(ERROR_SYSTEM|2, "%s: read error", s);
if (sferror(sfstdout) || rsfileclose(sort.rec, fp))
break;
}
}
else
{
merge = sort.key->merge && sort.key->input[0] && sort.key->input[1] ? sort.key->input : (char**)0;
fp = 0;
if (sort.jobs)
jobs(&sort);
else if (sort.test & TEST_show)
exit(0);
else
while (s = *sort.key->input++)
{
fp = fileopen(&sort, s);
if (merge)
{
if (sort.nfiles >= sort.xfiles)
{
clear(&sort, NiL);
sort.key->input = merge;
merge = 0;
if (fp != sfstdin)
sfclose(fp);
fp = 0;
continue;
}
sort.files[sort.nfiles++] = fp;
}
else if (input(&sort, fp, s) < 0)
break;
else if (fp != sfstdin && !sort.map)
{
sfclose(fp);
fp = 0;
}
}
if (sort.nfiles)
{
if (sort.cur && flush(&sort, sort.cur) < 0)
return 1;
if (sort.verbose)
error(0, "%s merge text", error_info.id);
if (rsmerge(sort.rec, sort.op, sort.files, sort.nfiles, merge ? RS_TEXT : RS_OTEXT))
error(ERROR_SYSTEM|3, "merge error");
clear(&sort, NiL);
}
else
{
if (sort.verbose)
error(0, "%s write text", error_info.id);
if (rswrite(sort.rec, sort.op, RS_OTEXT) && !error_info.errors)
error(ERROR_SYSTEM|3, "%s: write error", sort.key->output);
if (fp && fp != sfstdin)
sfclose(fp);
}
}
done(&sort);
exit(error_info.errors != 0);
}