/* streams.c revision 7c478bd95313f5f23a4c958a745db2134aa03244 */
/*
* CDDL HEADER START
*
* The contents of this file are subject to the terms of the
* Common Development and Distribution License, Version 1.0 only
* (the "License"). You may not use this file except in compliance
* with the License.
*
* You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
* or http://www.opensolaris.org/os/licensing.
* See the License for the specific language governing permissions
* and limitations under the License.
*
* When distributing Covered Code, include this CDDL HEADER in each
* file and include the License file at usr/src/OPENSOLARIS.LICENSE.
* If applicable, add the following below this CDDL HEADER, with the
* fields enclosed by brackets "[]" replaced with your own identifying
* information: Portions Copyright [yyyy] [name of copyright owner]
*
* CDDL HEADER END
*/
/*
* Copyright 1998-2003 Sun Microsystems, Inc. All rights reserved.
* Use is subject to license terms.
*/
#pragma ident "%Z%%M% %I% %E% SMI"
#include "streams.h"
/*
 * Operations vector with every listed slot set to NULL.  stream_set()
 * assigns it to a stream's s_ops in its STREAM_NO_SOURCE case, so that
 * such a stream carries no usable operations.
 */
static const stream_ops_t invalid_ops = {
NULL,
NULL,
NULL,
NULL,
NULL,
NULL,
NULL,
NULL,
NULL,
NULL
};
/*
 * Allocate a stream_t, zero it, and apply the flags given in src via
 * stream_set().  Returns the newly allocated stream.
 */
stream_t *
stream_new(int src)
{
	stream_t *fresh;

	fresh = safe_realloc(NULL, sizeof (stream_t));

	stream_clear(fresh);
	stream_set(fresh, src);

	return (fresh);
}
/*
 * Apply flags to a stream.
 *
 * If flags names a source type (any bit of STREAM_SOURCE_MASK), the stream's
 * source bits are replaced, and its element size and operations vector are
 * set to match.  All remaining flag bits are then OR-ed into s_status.
 * Finally, for STREAM_SINGLE and STREAM_WIDE streams, the put-line and fetch
 * operations are overridden when STREAM_UNIQUE or STREAM_INSTANT,
 * respectively, is set in the resulting status.
 */
void
stream_set(stream_t *str, flag_t flags)
{
	if ((flags & STREAM_SOURCE_MASK) != 0) {
		ASSERT((flags & STREAM_SOURCE_MASK) == STREAM_ARRAY ||
		    (flags & STREAM_SOURCE_MASK) == STREAM_SINGLE ||
		    (flags & STREAM_SOURCE_MASK) == STREAM_MMAP ||
		    (flags & STREAM_SOURCE_MASK) == STREAM_WIDE);

		/*
		 * Swap the old source type for the requested one.
		 */
		str->s_status &= ~STREAM_SOURCE_MASK;
		str->s_status |= flags & STREAM_SOURCE_MASK;

		switch (flags & STREAM_SOURCE_MASK) {
		case STREAM_NO_SOURCE:
			str->s_element_size = 0;
			str->s_ops = invalid_ops;
			return;
		case STREAM_ARRAY:
			/*
			 * The element size of an array stream is left as it
			 * was; only the operations change.
			 */
			str->s_ops = stream_array_ops;
			break;
		case STREAM_MMAP:
			str->s_ops = stream_mmap_ops;
			str->s_element_size = sizeof (char);
			break;
		case STREAM_SINGLE:
			str->s_ops = stream_stdio_ops;
			str->s_element_size = sizeof (char);
			break;
		case STREAM_WIDE:
			str->s_ops = stream_wide_ops;
			str->s_element_size = sizeof (wchar_t);
			break;
		default:
			die(EMSG_UNKN_STREAM, str->s_status);
		}
	}

	/*
	 * Accumulate the non-source flags.
	 */
	str->s_status |= (flags & ~STREAM_SOURCE_MASK);

	/*
	 * Only the stdio-backed narrow and wide streams have unique-output
	 * and overwrite-fetch variants; other source types are untouched.
	 */
	switch (str->s_status & STREAM_SOURCE_MASK) {
	case STREAM_SINGLE:
		if (str->s_status & STREAM_UNIQUE)
			str->s_ops.sop_put_line = stream_stdio_put_line_unique;
		if (str->s_status & STREAM_INSTANT)
			str->s_ops.sop_fetch = stream_stdio_fetch_overwrite;
		break;
	case STREAM_WIDE:
		if (str->s_status & STREAM_UNIQUE)
			str->s_ops.sop_put_line = stream_wide_put_line_unique;
		if (str->s_status & STREAM_INSTANT)
			str->s_ops.sop_fetch = stream_wide_fetch_overwrite;
		break;
	default:
		break;
	}
}
/*
 * Clear the given non-source flags from a stream's status.  Source-type
 * bits may not be passed (asserted); they are masked out of the clear set
 * in any case.
 */
void
stream_unset(stream_t *streamp, flag_t flags)
{
	ASSERT(!(flags & STREAM_SOURCE_MASK));

	streamp->s_status = streamp->s_status & ~(flags & ~STREAM_SOURCE_MASK);
}
/*
 * Report whether STREAM_PRIMED is set in the stream's status.  The return
 * value is the masked status itself: zero when clear, non-zero when set.
 */
int
stream_is_primed(stream_t *streamp)
{
	int primed = streamp->s_status & STREAM_PRIMED;

	return (primed);
}
/*
 * Zero every byte of the given stream structure.
 */
void
stream_clear(stream_t *str)
{
	(void) memset(str, 0, sizeof (*str));
}
/*
 * Byte-wise copy of one stream structure over another.  Pointers held in
 * src are duplicated as pointers; nothing they refer to is copied.
 */
static void
stream_copy(stream_t *dest, stream_t *src)
{
	(void) memcpy(dest, src, sizeof (*dest));
}
/*
 * Walk a chain of streams and record the device, inode and size of each
 * stream's file.  Streams flagged STREAM_NOTFILE or STREAM_ARRAY are
 * skipped.  A stat() failure is reported through die().
 */
void
stream_stat_chain(stream_t *strp)
{
	struct stat sb;
	stream_t *node;

	for (node = strp; node != NULL; node = node->s_next) {
		if ((node->s_status & STREAM_NOTFILE) ||
		    (node->s_status & STREAM_ARRAY))
			continue;

		if (stat(node->s_filename, &sb) < 0)
			die(EMSG_STAT, node->s_filename);

		node->s_dev = sb.st_dev;
		node->s_ino = sb.st_ino;
		node->s_filesize = sb.st_size;
	}
}
/*
 * Return the number of streams on the chain starting at str; zero for an
 * empty (NULL) chain.
 */
uint_t
stream_count_chain(stream_t *str)
{
	uint_t count;

	for (count = 0; str != NULL; str = str->s_next)
		count++;

	return (count);
}
/*
 * Open an input stream for reading.
 *
 * Returns 1 once the stream is open, or -1 if the open failed with EMFILE
 * or ENFILE (descriptor exhaustion), in which case the stream is left
 * unopened.  Any other open failure is reported through die().
 */
int
stream_open_for_read(sort_t *S, stream_t *str)
{
	ASSERT(!(str->s_status & STREAM_OUTPUT));

	/*
	 * An array stream needs no underlying file; just mark it open.
	 */
	if ((str->s_status & STREAM_SOURCE_MASK) == STREAM_ARRAY) {
		stream_set(str, STREAM_ARRAY | STREAM_OPEN);
		return (1);
	}

	/*
	 * Non-file input is read from stdin, as narrow or wide characters
	 * depending on the locale.
	 */
	if (str->s_status & STREAM_NOTFILE) {
		str->s_type.BF.s_fp = stdin;
		if (S->m_single_byte_locale)
			stream_set(str, STREAM_OPEN | STREAM_SINGLE);
		else
			stream_set(str, STREAM_OPEN | STREAM_WIDE);
		return (1);
	}

	ASSERT(str->s_filename);

#ifndef DEBUG_DISALLOW_MMAP
	if (S->m_single_byte_locale &&
	    str->s_filesize > 0 &&
	    str->s_filesize < SSIZE_MAX) {
		/*
		 * Single-byte input of a mappable size: try to map the whole
		 * file read-only before resorting to stdio.
		 */
		int fd = open(str->s_filename, O_RDONLY);

		if (fd < 0) {
			if (errno != EMFILE && errno != ENFILE)
				die(EMSG_OPEN, str->s_filename);
			else
				return (-1);
		}

		str->s_buffer = mmap(0, str->s_filesize, PROT_READ,
		    MAP_SHARED, fd, 0);

		if (str->s_buffer == MAP_FAILED) {
			/*
			 * The mapping could not be established.  Give the
			 * descriptor back and continue with the stdio path
			 * below.
			 */
			(void) close(fd);
			str->s_buffer = NULL;
		} else {
			str->s_buffer_size = str->s_filesize;
			str->s_type.SF.s_fd = fd;
			stream_set(str, STREAM_MMAP | STREAM_OPEN);
			stream_unset(str, STREAM_PRIMED);
			return (1);
		}
	}
#endif /* DEBUG_DISALLOW_MMAP */

	str->s_type.BF.s_fp = fopen(str->s_filename, "r");
	if (str->s_type.BF.s_fp == NULL) {
		if (errno != EMFILE && errno != ENFILE)
			die(EMSG_OPEN, str->s_filename);
		else
			return (-1);
	}

	/*
	 * Supply our own fully-buffered stdio buffer; if stdio rejects it,
	 * discard it and carry on with whatever buffering is in effect.
	 */
	str->s_type.BF.s_vbuf = safe_realloc(NULL, STDIO_VBUF_SIZE);
	if (setvbuf(str->s_type.BF.s_fp, str->s_type.BF.s_vbuf, _IOFBF,
	    STDIO_VBUF_SIZE) != 0) {
		safe_free(str->s_type.BF.s_vbuf);
		str->s_type.BF.s_vbuf = NULL;
	}

	if (S->m_single_byte_locale)
		stream_set(str, STREAM_OPEN | STREAM_SINGLE);
	else
		stream_set(str, STREAM_OPEN | STREAM_WIDE);
	stream_unset(str, STREAM_PRIMED);

	return (1);
}
/*
 * Resize the stream's mapped buffer to hold at least new_size bytes.
 *
 * A new_size of zero releases the buffer entirely.  Otherwise, if the
 * rounded size differs from the current buffer size, the old mapping (if
 * any) is discarded -- its contents are not preserved -- and a new one is
 * obtained from xzmap().  A mapping failure is reported through die().
 */
void
stream_set_size(stream_t *str, size_t new_size)
{
	size_t p_new_size;

	/*
	 * Releasing the buffer must be decided before the sizes are
	 * compared: the rounded size of a zero request is PAGESIZE, never
	 * zero, so a buffer of exactly PAGESIZE bytes would otherwise look
	 * "already the right size" and remain mapped.
	 */
	if (new_size == 0) {
		if (str->s_buffer != NULL)
			(void) munmap(str->s_buffer, str->s_buffer_size);
		str->s_buffer = NULL;
		str->s_buffer_size = 0;
		return;
	}

	/*
	 * p_new_size is the smallest multiple of PAGESIZE strictly greater
	 * than new_size (a request that is already an exact multiple gains
	 * one further page).  Working in whole pages means the far end of
	 * the buffer is also aligned, such that we obtain aligned pointers
	 * if we choose to subtract from it.
	 */
	p_new_size = (new_size + PAGESIZE) & ~(PAGESIZE - 1);

	if (str->s_buffer_size == p_new_size)
		return;

	if (str->s_buffer != NULL)
		(void) munmap(str->s_buffer, str->s_buffer_size);

	str->s_buffer = xzmap(0, p_new_size, PROT_READ | PROT_WRITE,
	    MAP_PRIVATE, 0);

	if (str->s_buffer == MAP_FAILED)
		die(EMSG_MMAP);

	str->s_buffer_size = p_new_size;
}
/*
 * Create a sourceless stream for the named file and append it to the
 * chain.  The filename pointer is stored as given (not copied), and the
 * stream's file descriptor field is initialised to -1.
 */
void
stream_add_file_to_chain(stream_t **str_chain, char *filename)
{
	stream_t *file_str = stream_new(STREAM_NO_SOURCE);

	file_str->s_type.SF.s_fd = -1;
	file_str->s_filename = filename;

	stream_push_to_chain(str_chain, file_str);
}
/*
 * Append streamp to the end of the doubly-linked chain whose head pointer
 * is *str_chain.  If the chain is empty, streamp becomes its head.
 *
 * Both link fields of streamp are always rewritten.  In particular, a
 * stream that becomes the head of a chain has its s_previous cleared:
 * streams produced by stream_copy() inherit the link fields of their
 * original, and a stale s_previous on a chain head would send
 * stream_close_all_previous() off into an unrelated chain.
 */
void
stream_push_to_chain(stream_t **str_chain, stream_t *streamp)
{
	stream_t *tail = *str_chain;

	if (tail == NULL) {
		*str_chain = streamp;
		streamp->s_previous = NULL;
		streamp->s_next = NULL;
		return;
	}

	/*
	 * Find the current last element and hang the new stream off it.
	 */
	while (tail->s_next != NULL)
		tail = tail->s_next;

	tail->s_next = streamp;
	streamp->s_previous = tail;
	streamp->s_next = NULL;
}
/*
 * Write every remaining line of an input stream to an output stream.
 *
 * The line already current on str_in is written first; further lines are
 * fetched until SOP_EOS() reports the end of the input.  A line whose data
 * pointer is NULL is not written.  After each line is handled its
 * l_raw_collate buffer is released with safe_free().
 */
static void
stream_dump(stream_t *str_in, stream_t *str_out)
{
	ASSERT(!(str_in->s_status & STREAM_OUTPUT));
	ASSERT(str_out->s_status & STREAM_OUTPUT);

	for (;;) {
		if (str_in->s_current.l_data.sp != NULL)
			SOP_PUT_LINE(str_out, &str_in->s_current);
		safe_free(str_in->s_current.l_raw_collate.sp);

		if (SOP_EOS(str_in))
			break;

		SOP_FETCH(str_in);
	}
}
/*
 * stream_push_to_temporary() with ST_CACHE set in flags merely pushes the
 * given stream_t pointer onto the chain.  Without ST_CACHE, the stream is
 * written out to a temporary file.  Stream pointers passed to
 * stream_push_to_temporary() must refer to allocated objects, and not to
 * objects created on function stacks.  Finally, if streamp == NULL,
 * stream_push_to_temporary() creates and pushes a new, empty stream.  In the
 * non-cached cases the output stream is left open if ST_OPEN is set.
 */
stream_t *
stream_push_to_temporary(stream_t **str_chain, stream_t *streamp, int flags)
{
	stream_t *out_streamp;

	/*
	 * Cached case: the array stream itself goes onto the chain, marked
	 * temporary and not freeable; nothing is written to disk.
	 */
	if (flags & ST_CACHE) {
		ASSERT(streamp->s_status & STREAM_ARRAY);
		stream_set(streamp, STREAM_NOT_FREEABLE | STREAM_TEMPORARY);
		stream_push_to_chain(str_chain, streamp);
		return (streamp);
	}

	out_streamp = safe_realloc(NULL, sizeof (stream_t));

	if (streamp != NULL) {
		/*
		 * Start from a copy of the input stream, then turn it into an
		 * unopened stdio stream of matching character width with no
		 * buffer of its own.
		 */
		stream_copy(out_streamp, streamp);
		stream_unset(out_streamp, STREAM_OPEN);

		ASSERT(streamp->s_element_size == sizeof (char) ||
		    streamp->s_element_size == sizeof (wchar_t));
		stream_set(out_streamp,
		    streamp->s_element_size == 1 ? STREAM_SINGLE : STREAM_WIDE);

		out_streamp->s_buffer = NULL;
		out_streamp->s_buffer_size = 0;
	} else {
		stream_clear(out_streamp);
		stream_set(out_streamp, flags & ST_WIDE ? STREAM_WIDE :
		    STREAM_SINGLE);
	}

	(void) bump_file_template();
	out_streamp->s_filename = strdup(get_file_template());

	if (SOP_OPEN_FOR_WRITE(out_streamp) == -1) {
		/*
		 * The stream was never pushed onto the chain, so nothing else
		 * refers to it: release it and its filename rather than
		 * leaking them each time the open fails.  errno is preserved
		 * in case the caller inspects it after the NULL return.
		 *
		 * NOTE(review): safe_free() is taken to be a NULL-tolerant
		 * free() wrapper, and thus suitable for strdup() memory --
		 * confirm against its definition.
		 */
		int saved_errno = errno;

		safe_free(out_streamp->s_filename);
		safe_free(out_streamp);
		errno = saved_errno;
		return (NULL);
	}

	stream_set(out_streamp, STREAM_TEMPORARY);
	stream_push_to_chain(str_chain, out_streamp);

	if (streamp != NULL) {
		/*
		 * We reset the input stream to the beginning, and copy it in
		 * sequence to the output stream, freeing the raw_collate field
		 * as we go.
		 */
		if (SOP_PRIME(streamp) != PRIME_SUCCEEDED)
			die(EMSG_BADPRIME);

		stream_dump(streamp, out_streamp);
	}

	if (!(flags & ST_OPEN)) {
		SOP_FREE(out_streamp);
		(void) SOP_CLOSE(out_streamp);
	}

	/*
	 * Now that we've written this stream to disk, we needn't protect any
	 * in-memory consumer.
	 */
	if (streamp != NULL)
		streamp->s_consumer = NULL;

	return (out_streamp);
}
/*
 * Walk backwards from the stream preceding tail_streamp to the head of the
 * chain, applying SOP_FREE() to each stream and SOP_CLOSE() to each one
 * that SOP_IS_CLOSABLE() accepts.  tail_streamp itself is left alone.
 */
void
stream_close_all_previous(stream_t *tail_streamp)
{
	stream_t *node;

	ASSERT(tail_streamp != NULL);

	for (node = tail_streamp->s_previous; node != NULL;
	    node = node->s_previous) {
		(void) SOP_FREE(node);

		if (SOP_IS_CLOSABLE(node))
			(void) SOP_CLOSE(node);
	}
}
/*
 * For a stream flagged STREAM_TEMPORARY, apply SOP_FREE() and then, if the
 * stream has an unlink operation, SOP_UNLINK().  Streams without the flag
 * are left untouched.
 */
void
stream_unlink_temporary(stream_t *streamp)
{
	if (!(streamp->s_status & STREAM_TEMPORARY))
		return;

	(void) SOP_FREE(streamp);

	if (streamp->s_ops.sop_unlink)
		(void) SOP_UNLINK(streamp);
}
/*
 * stream_insert() takes input from the src stream, converts each line to
 * collatable form, and places a line_rec_t in the dest stream, which is of
 * type STREAM_ARRAY.
 */
int
stream_insert(sort_t *S, stream_t *src, stream_t *dest)
{
/*
 * i counts the line records held by dest, starting from those already
 * present; l_convert is the cursor at which the next converted (collatable)
 * key is written, initially the start of dest's buffer.
 */
ssize_t i = dest->s_type.LA.s_array_size;
line_rec_t *l_series;
char *l_convert = dest->s_buffer;
int return_val = ST_MEM_AVAIL;
int fetch_result = NEXT_LINE_COMPLETE;
/*
 * Scan through until total bytes allowed accumulated, and return.
 * Use SOP_FETCH(src) so that this works for all stream types,
 * and so that we can repeat until eos.
 *
 * For each new line, we move back sizeof (line_rec_t) from the end of
 * the array buffer, and copy into the start of the array buffer. When
 * the pointers meet, or when we exhaust the current stream, we return.
 * If we have not filled the current memory allocation, we return
 * ST_MEM_AVAIL, else we return ST_MEM_FILLED.
 */
ASSERT(stream_is_primed(src));
ASSERT(dest->s_status & STREAM_ARRAY);
/*
 * Line records are stacked downward from the end of the buffer, so the
 * records already present occupy its final s_array_size slots and l_series
 * addresses the lowest (most recently added) of them.
 */
/*LINTED ALIGNMENT*/
l_series = (line_rec_t *)((caddr_t)dest->s_buffer
+ dest->s_buffer_size) - dest->s_type.LA.s_array_size;
/*
 * With records already present, resume writing converted keys just beyond
 * the key of the most recent record, skipping one further source element
 * (presumably that key's terminator -- confirm).
 */
if (dest->s_type.LA.s_array_size)
l_convert = l_series->l_collate.sp +
l_series->l_collate_length + src->s_element_size;
/*
 * current line has been set prior to entry
 */
src->s_current.l_collate.sp = l_convert;
/*
 * The conversion space is the gap between the key cursor and the lowest
 * record, less the room needed for one more record.
 */
src->s_current.l_collate_bufsize = (caddr_t)l_series
- (caddr_t)l_convert - sizeof (line_rec_t);
src->s_current.l_raw_collate.sp = NULL;
if (src->s_current.l_collate_bufsize <= 0)
return (ST_MEM_FILLED);
src->s_consumer = dest;
/*
 * Convert and store lines while space remains.  A negative length from
 * m_coll_convert() ends the loop; with FCV_FAIL this presumably means the
 * converted key did not fit in the space offered -- confirm.
 */
while (src->s_current.l_collate_bufsize > 0 &&
(src->s_current.l_collate_length = S->m_coll_convert(
S->m_fields_head, &src->s_current, FCV_FAIL,
S->m_field_separator)) >= 0) {
ASSERT((char *)l_series > l_convert);
/*
 * Claim the next record slot and advance the key cursor past the key just
 * converted.  If the two regions now touch, give the slot back and stop;
 * the current line is not stored.
 */
l_series--;
l_convert += src->s_current.l_collate_length;
if ((char *)l_series <= l_convert) {
__S(stats_incr_insert_filled_downward());
l_series++;
return_val = ST_MEM_FILLED;
break;
}
/*
 * There's no collision with the lower part of the buffer, so we
 * can safely begin processing the line. In the debug case, we
 * test for uninitialized data by copying a non-zero pattern.
 * (memset() uses only the low-order byte of its fill argument, so the
 * pattern actually written is 0xf1.)
 */
#ifdef DEBUG
memset(l_series, 0x1ff11ff1, sizeof (line_rec_t));
#endif
copy_line_rec(&src->s_current, l_series);
i++;
/*
 * Stop at end of stream, or when the next line could not be fetched in
 * full; otherwise aim the new current line at the remaining space.
 */
if (SOP_EOS(src) ||
(fetch_result = SOP_FETCH(src)) == NEXT_LINE_INCOMPLETE)
break;
src->s_current.l_collate.sp = l_convert;
src->s_current.l_collate_bufsize = (caddr_t)l_series
- (caddr_t)l_convert - sizeof (line_rec_t);
src->s_current.l_raw_collate.sp = NULL;
}
/*
 * Record why the loop ended, if it was for want of space: either the input
 * side could not deliver a complete line, or the conversion failed or had
 * no room left.
 */
if (fetch_result == NEXT_LINE_INCOMPLETE) {
__S(stats_incr_insert_filled_input());
return_val = ST_MEM_FILLED;
} else if (src->s_current.l_collate_length < 0 ||
src->s_current.l_collate_bufsize <= 0) {
__S(stats_incr_insert_filled_upward());
return_val = ST_MEM_FILLED;
}
if (fetch_result != NEXT_LINE_INCOMPLETE &&
src->s_current.l_collate_length < 0 &&
i == 0)
/*
 * There's no room for conversion of our only line; need to
 * execute with larger memory.
 */
die(EMSG_MEMORY);
/*
 * Set up pointer array to line records.
 * The array is only regrown when records were added, and is then refilled
 * in full, in ascending address order starting from the lowest record.
 */
if (i > dest->s_type.LA.s_array_size)
dest->s_type.LA.s_array = safe_realloc(dest->s_type.LA.s_array,
sizeof (line_rec_t *) * i);
dest->s_type.LA.s_array_size = i;
i = 0;
while (i < dest->s_type.LA.s_array_size) {
dest->s_type.LA.s_array[i] = l_series;
l_series++;
i++;
}
/*
 * LINES_ARRAY streams are always open.
 */
stream_set(dest, STREAM_OPEN);
return (return_val);
}
/*
* stream_swap_buffer() exchanges the stream's buffer with the proffered one;
* s_current is not adjusted so this is safe only for STREAM_INSTANT.
*/
/*
 * Trade buffers with the caller: the stream takes over *buf and *size,
 * and the caller receives the stream's former buffer and size in their
 * place.
 */
void
stream_swap_buffer(stream_t *str, char **buf, size_t *size)
{
	void *old_buf = str->s_buffer;
	size_t old_size = str->s_buffer_size;

	str->s_buffer = *buf;
	str->s_buffer_size = *size;

	*buf = old_buf;
	*size = old_size;
}