/***********************************************************************
* *
* This software is part of the ast package *
* Copyright (c) 1996-2011 AT&T Intellectual Property *
* and is licensed under the *
* Eclipse Public License, Version 1.0 *
* by AT&T Intellectual Property *
* *
* A copy of the License is available at *
* (with md5 checksum b35adb5213ca9657e911e9befb180842) *
* *
* Information and Software Systems Research *
* AT&T Research *
* Florham Park NJ *
* *
* Phong Vo <kpv@research.att.com> *
* Glenn Fowler <gsf@research.att.com> *
* *
***********************************************************************/
#include "rshdr.h"
/* Merging streams of sorted records.
** Strategy:
** 1. Each stream is represented by a current least records.
** A cache of read-ahead records are kept for each stream.
** 2. Streams are sorted by representative records and by positions
** for stability.
**
** Written by Kiem-Phong Vo (07/08/96)
*/
typedef struct _merge_s
Sfio_t* f; /* input stream */
} Merge_t;
}
} \
} \
}
do \
} \
} \
} while (0)
} \
}
/* write out any pending records */
#if __STD_C
#else
#endif
{
n -= 1;
}
return -1;
}
return 0;
}
/* Read new records from stream mg */
#if __STD_C
#else
#endif
{
return -1;
/* release key memory */
if(defkeyf)
s_key = 0;
}
return -1;
{ if(type&RS_DSAMELEN)
}
else for(s = RS_RESERVE, o = 0;;) /* make sure we have at least 1 record */
#if _PACKAGE_ast
return -1;
if (datalen <= x)
break;
}
else
#endif
break;
}
return -1;
else if(o == x)
{ datalen = x;
break;
}
else
{ o = x;
s += RS_RESERVE;
continue;
}
return -1;
}
}
else
return -1;
}
}
/* define length of next record */
if(!(type&RS_DSAMELEN) )
#if _PACKAGE_ast
else
#endif
}
if(datalen < 0)
return -1;
}
}
/* get data for at least 1 record */
}
/* fast loop for a common case */
{ if(keylen <= 0)
for(;;)
obj += 1;
goto done;
}
}
for(;; )
if(defkeyf)
s_key = 0;
}
if(!m_key)
return -1;
}
}
s_key = s;
}
if(s < 0)
return -1;
}
c_key += s;
s_key -= s;
}
else
}
obj += 1;
goto done;
if(type&RS_DSAMELEN)
goto done;
}
else
{
#if _PACKAGE_ast
goto done;
}
else
#endif
goto done;
else
}
}
else
goto done;
if(type&RS_DSAMELEN)
goto done;
}
else
goto done;
#if _PACKAGE_ast
else
#endif
}
if(datalen < 0)
return -1;
}
{
#if _PACKAGE_ast
if (!(rsc & ~0xff))
#endif
goto done;
}
}
}
}
done:
return 0;
}
#if __STD_C
#else
#endif
{
int ret;
return ret;
}
#if __STD_C
#else
Sfio_t* f; /* input stream */
int pos; /* stream position for resolving equal records */
#endif
{
}
}
mg->f = f;
/* make sure that Sfio will use mmap if appropriate */
/* get a decent size buffer to work with */
round /= 4;
}
/* fill first cache */
}
return mg;
}
/* compare two records. RS_REVERSE is taken care of here too. */
{ switch(len) \
} \
} \
}
#if __STD_C
#else
int reverse;
#endif
{
reg int c;
l -= d;
l -= SIZEOF_LONG;
if(d != 0)
return reverse ? -d : d;
l -= d;
return reverse ? -d : d;
}
else return 0;
}
/* The stream list is kept in reverse order to ease data movement.
** Ties are broken by stream positions to preserve stability.
*/
#if __STD_C
#else
int n;
#endif
{
r = (l = list) + n;
if(n > 4)
{ while(l != r)
{ m = l + (r-l)/2;
l = r = m;
else if(cmp > 0)
l = l == m ? r : m;
else r = m;
}
}
else
{ l = r+1; break; }
else if(cmp == 0)
{ l = r; break; }
}
}
if(cmp == 0)
break;
else *l = mg;
}
else
{ for(r = list+n; r > l; --r)
*r = *(r-1);
n += 1;
}
return n;
}
/* move data from stream mg->f to output stream rs->f */
#if __STD_C
#else
ssize_t n;
#endif
{
#if 0
static const char* event[] = { "TERMINATE", "ACCEPT", "INSERT", "DELETE", "DONE", "[5]", "[6]", "[7]" };
#endif
/* easy case, just copy everything over, let Sfio worry about it */
{ if(rsrv)
}
if(mgrsrv)
}
}
for(n_obj = n < 0 ? 0 : n;; )
{ if(n_obj == 0)
break;
n_obj = 1;
else
if(n_obj == 0)
break;
}
}
}
}
if(n_obj < 0)
if(notify)
do
{ for (;;)
goto done;
if (c == RS_DELETE)
break;
}
break;
}
} while (c == RS_INSERT);
}
}
else
{
for(;;)
else
}
if((len -= w) == 0)
break;
}
}
n_obj = 0;
}
{ uchar *t;
ssize_t s, o, x;
for(s = RS_RESERVE, o = 0;;) /* make sure we have at least 1 record */
#if _PACKAGE_ast
goto done;
if (len <= x)
break;
}
else
#endif
break;
}
else if(o == x)
{ len = x;
break;
}
else
{ o = x;
s += RS_RESERVE;
continue;
}
{ if(!s)
ret = 0;
goto done;
}
}
if(len <= 0)
{ ret = 0;
goto done;
}
if(notify)
{
do
{ for (;;)
goto done;
if (c == RS_DELETE)
break;
}
break;
}
} while (c == RS_INSERT);
}
else
}
}
#if _PACKAGE_ast
else if (rsc & ~0xff)
goto done;
}
if (notify)
do
{ for (;;)
goto done;
if (c == RS_DELETE)
break;
}
break;
}
} while (c == RS_INSERT);
}
else
}
}
#endif
else
else
}
if (notify)
do
{ for (;;)
goto done;
if (c == RS_DELETE)
break;
}
break;
}
} while (c == RS_INSERT);
}
else
}
}
if(n > 0)
break;
}
ret = 0;
done:
else
}
else
}
return ret;
}
/* write out a bunch of records from stream f */
#if __STD_C
#else
reg int n; /* total in equivalence class */
#endif
{
break;
}
break;
}
}
else
{ /* write out head object with count */
}
return -1;
}
}
return 0;
}
#if __STD_C
#else
int n;
#endif
{
reg int k;
for(k = 0; k <= n; ++k)
}
}
return -1;
}
/* merging streams of sorted records */
#if __STD_C
#else
Sfio_t* f; /* output stream */
int n; /* number of such streams */
int type; /* RS_ITEXT|RS_OTEXT */
#endif
{
if(n <= 0)
return 0;
/* make sure f is writable */
return -1;
return -1;
rs->f = f;
/* construct a list of streams sorted in reverse order */
for(n_list = 0, k = 0; k < n; ++k)
while(n_list > 0)
{ if(uniq)
{ /* we assume here that mg->f is RS_UNIQ */
}
}
for(;;)
}
if(!(mg = m) )
break;
}
}
else /* write out the union of the equi-class */
for(;;)
}
if(!(mg = m))
break;
}
}
}
else if((k = n_list-1) >= 0)
for(;;)
}
else
break;
}
>= 0 )
goto move_stream;
}
}
}
break;
}
else
continue;
}
if(r == 0) /* new equi-class */
break;
}
else /* new least element */
if(k == 0)
{ n_list = 2;
}
else n_list += 1;
}
break;
}
}
else /* if(!mg->equi && n_list == 0) */
}
/* count all pending objects */
k += 1;
/* add cached objects to output list */
/* write pending objects out with the "right count" */
}
/* now do the remainder */
}
}
return 0;
}