/* istream-chain.c, revision d730192e34fbedbc590a5abc7351e5af5e120c5f */
/* Copyright (c) 2003-2013 Dovecot authors, see the included COPYING file */
643a81fff9003cba13deb49a565a3c8171da524dTimo Sirainen
#include "lib.h"
#include "llist.h"
#include "istream-private.h"
#include "istream-chain.h"
6c00502d4ece417ead501db8f0ee3e8287ba4459Timo Sirainen
/* Forward declaration: struct istream_chain keeps a pointer back to the
   chain_istream that owns it, which is only defined further below. */
struct chain_istream;
643a81fff9003cba13deb49a565a3c8171da524dTimo Sirainen
/* One element of a chain: either a child input stream or the EOF marker. */
struct istream_chain_link {
	/* neighbours in the chain's list (maintained with the DLLIST2_*()
	   macros) */
	struct istream_chain_link *prev;
	struct istream_chain_link *next;

	/* the child stream, or NULL when this link is the EOF marker */
	struct istream *stream;
	/* set when the link was appended without a stream */
	bool eof;
};
6c00502d4ece417ead501db8f0ee3e8287ba4459Timo Sirainen
/* The handle given to callers of i_stream_create_chain(); child streams are
   appended through it. */
struct istream_chain {
	/* first and last link of the list; both NULL while the chain is
	   empty */
	struct istream_chain_link *head;
	struct istream_chain_link *tail;

	/* the chain istream this chain is embedded in */
	struct chain_istream *stream;
};
6c00502d4ece417ead501db8f0ee3e8287ba4459Timo Sirainen
6c00502d4ece417ead501db8f0ee3e8287ba4459Timo Sirainenstruct chain_istream {
6c00502d4ece417ead501db8f0ee3e8287ba4459Timo Sirainen struct istream_private istream;
643a81fff9003cba13deb49a565a3c8171da524dTimo Sirainen
643a81fff9003cba13deb49a565a3c8171da524dTimo Sirainen size_t prev_stream_left, prev_skip;
643a81fff9003cba13deb49a565a3c8171da524dTimo Sirainen
643a81fff9003cba13deb49a565a3c8171da524dTimo Sirainen struct istream_chain chain;
643a81fff9003cba13deb49a565a3c8171da524dTimo Sirainen};
643a81fff9003cba13deb49a565a3c8171da524dTimo Sirainen
737561538a2dcdcda948a1da2830a612d8263a23Timo Sirainenstatic void ATTR_NULL(2)
643a81fff9003cba13deb49a565a3c8171da524dTimo Siraineni_stream_chain_append_internal(struct istream_chain *chain,
643a81fff9003cba13deb49a565a3c8171da524dTimo Sirainen struct istream *stream)
643a81fff9003cba13deb49a565a3c8171da524dTimo Sirainen{
64b61cd24d630223478ccbe1934b9f60f0881f59Timo Sirainen struct istream_chain_link *link;
64b61cd24d630223478ccbe1934b9f60f0881f59Timo Sirainen
643a81fff9003cba13deb49a565a3c8171da524dTimo Sirainen if (stream == NULL && chain->tail != NULL && chain->tail->stream == NULL)
643a81fff9003cba13deb49a565a3c8171da524dTimo Sirainen return;
643a81fff9003cba13deb49a565a3c8171da524dTimo Sirainen
643a81fff9003cba13deb49a565a3c8171da524dTimo Sirainen link = i_new(struct istream_chain_link, 1);
643a81fff9003cba13deb49a565a3c8171da524dTimo Sirainen link->stream = stream;
643a81fff9003cba13deb49a565a3c8171da524dTimo Sirainen link->eof = stream == NULL;
643a81fff9003cba13deb49a565a3c8171da524dTimo Sirainen
643a81fff9003cba13deb49a565a3c8171da524dTimo Sirainen if (stream != NULL)
643a81fff9003cba13deb49a565a3c8171da524dTimo Sirainen i_stream_ref(stream);
6c00502d4ece417ead501db8f0ee3e8287ba4459Timo Sirainen
6c00502d4ece417ead501db8f0ee3e8287ba4459Timo Sirainen if (chain->head == NULL && stream != NULL) {
6c00502d4ece417ead501db8f0ee3e8287ba4459Timo Sirainen if (chain->stream->istream.max_buffer_size == 0) {
6c00502d4ece417ead501db8f0ee3e8287ba4459Timo Sirainen chain->stream->istream.max_buffer_size =
6c00502d4ece417ead501db8f0ee3e8287ba4459Timo Sirainen stream->real_stream->max_buffer_size;
6c00502d4ece417ead501db8f0ee3e8287ba4459Timo Sirainen } else {
6c00502d4ece417ead501db8f0ee3e8287ba4459Timo Sirainen i_stream_set_max_buffer_size(stream,
6c00502d4ece417ead501db8f0ee3e8287ba4459Timo Sirainen chain->stream->istream.max_buffer_size);
6c00502d4ece417ead501db8f0ee3e8287ba4459Timo Sirainen }
6c00502d4ece417ead501db8f0ee3e8287ba4459Timo Sirainen }
6c00502d4ece417ead501db8f0ee3e8287ba4459Timo Sirainen DLLIST2_APPEND(&chain->head, &chain->tail, link);
6c00502d4ece417ead501db8f0ee3e8287ba4459Timo Sirainen}
6c00502d4ece417ead501db8f0ee3e8287ba4459Timo Sirainen
/* Add `stream' as the last stream of the chain. The chain takes its own
   reference to it (see i_stream_chain_append_internal()). */
void i_stream_chain_append(struct istream_chain *chain, struct istream *stream)
{
	i_stream_chain_append_internal(chain, stream);
}
6c00502d4ece417ead501db8f0ee3e8287ba4459Timo Sirainen
643a81fff9003cba13deb49a565a3c8171da524dTimo Sirainenvoid i_stream_chain_append_eof(struct istream_chain *chain)
6c00502d4ece417ead501db8f0ee3e8287ba4459Timo Sirainen{
643a81fff9003cba13deb49a565a3c8171da524dTimo Sirainen i_stream_chain_append_internal(chain, NULL);
643a81fff9003cba13deb49a565a3c8171da524dTimo Sirainen}
737561538a2dcdcda948a1da2830a612d8263a23Timo Sirainen
643a81fff9003cba13deb49a565a3c8171da524dTimo Sirainenstatic void
643a81fff9003cba13deb49a565a3c8171da524dTimo Siraineni_stream_chain_set_max_buffer_size(struct iostream_private *stream,
643a81fff9003cba13deb49a565a3c8171da524dTimo Sirainen size_t max_size)
643a81fff9003cba13deb49a565a3c8171da524dTimo Sirainen{
643a81fff9003cba13deb49a565a3c8171da524dTimo Sirainen struct chain_istream *cstream = (struct chain_istream *)stream;
643a81fff9003cba13deb49a565a3c8171da524dTimo Sirainen struct istream_chain_link *link = cstream->chain.head;
6c00502d4ece417ead501db8f0ee3e8287ba4459Timo Sirainen
6c00502d4ece417ead501db8f0ee3e8287ba4459Timo Sirainen cstream->istream.max_buffer_size = max_size;
6c00502d4ece417ead501db8f0ee3e8287ba4459Timo Sirainen while (link != NULL) {
6c00502d4ece417ead501db8f0ee3e8287ba4459Timo Sirainen if (link->stream != NULL)
6c00502d4ece417ead501db8f0ee3e8287ba4459Timo Sirainen i_stream_set_max_buffer_size(link->stream, max_size);
6c00502d4ece417ead501db8f0ee3e8287ba4459Timo Sirainen link = link->next;
6c00502d4ece417ead501db8f0ee3e8287ba4459Timo Sirainen }
6c00502d4ece417ead501db8f0ee3e8287ba4459Timo Sirainen}
643a81fff9003cba13deb49a565a3c8171da524dTimo Sirainen
6c00502d4ece417ead501db8f0ee3e8287ba4459Timo Sirainenstatic void i_stream_chain_destroy(struct iostream_private *stream)
643a81fff9003cba13deb49a565a3c8171da524dTimo Sirainen{
643a81fff9003cba13deb49a565a3c8171da524dTimo Sirainen struct chain_istream *cstream = (struct chain_istream *)stream;
643a81fff9003cba13deb49a565a3c8171da524dTimo Sirainen struct istream_chain_link *link = cstream->chain.head;
643a81fff9003cba13deb49a565a3c8171da524dTimo Sirainen
6c00502d4ece417ead501db8f0ee3e8287ba4459Timo Sirainen while (link != NULL) {
6c00502d4ece417ead501db8f0ee3e8287ba4459Timo Sirainen struct istream_chain_link *next = link->next;
6c00502d4ece417ead501db8f0ee3e8287ba4459Timo Sirainen
6c00502d4ece417ead501db8f0ee3e8287ba4459Timo Sirainen if (link->stream != NULL)
643a81fff9003cba13deb49a565a3c8171da524dTimo Sirainen i_stream_unref(&link->stream);
643a81fff9003cba13deb49a565a3c8171da524dTimo Sirainen i_free(link);
643a81fff9003cba13deb49a565a3c8171da524dTimo Sirainen link = next;
643a81fff9003cba13deb49a565a3c8171da524dTimo Sirainen }
643a81fff9003cba13deb49a565a3c8171da524dTimo Sirainen}
643a81fff9003cba13deb49a565a3c8171da524dTimo Sirainen
6c00502d4ece417ead501db8f0ee3e8287ba4459Timo Sirainenstatic void i_stream_chain_read_next(struct chain_istream *cstream)
643a81fff9003cba13deb49a565a3c8171da524dTimo Sirainen{
643a81fff9003cba13deb49a565a3c8171da524dTimo Sirainen struct istream_chain_link *link = cstream->chain.head;
6c00502d4ece417ead501db8f0ee3e8287ba4459Timo Sirainen struct istream *prev_input;
6c00502d4ece417ead501db8f0ee3e8287ba4459Timo Sirainen const unsigned char *data;
6c00502d4ece417ead501db8f0ee3e8287ba4459Timo Sirainen size_t data_size, size;
6c00502d4ece417ead501db8f0ee3e8287ba4459Timo Sirainen
643a81fff9003cba13deb49a565a3c8171da524dTimo Sirainen i_assert(link != NULL && link->stream != NULL);
643a81fff9003cba13deb49a565a3c8171da524dTimo Sirainen i_assert(link->stream->eof);
6c00502d4ece417ead501db8f0ee3e8287ba4459Timo Sirainen
6c00502d4ece417ead501db8f0ee3e8287ba4459Timo Sirainen prev_input = link->stream;
6c00502d4ece417ead501db8f0ee3e8287ba4459Timo Sirainen data = i_stream_get_data(prev_input, &data_size);
6c00502d4ece417ead501db8f0ee3e8287ba4459Timo Sirainen
6c00502d4ece417ead501db8f0ee3e8287ba4459Timo Sirainen DLLIST2_REMOVE(&cstream->chain.head, &cstream->chain.tail, link);
6c00502d4ece417ead501db8f0ee3e8287ba4459Timo Sirainen i_free(link);
6c00502d4ece417ead501db8f0ee3e8287ba4459Timo Sirainen
6c00502d4ece417ead501db8f0ee3e8287ba4459Timo Sirainen /* a) we have more streams, b) we have EOF, c) we need to wait
6c00502d4ece417ead501db8f0ee3e8287ba4459Timo Sirainen for more streams */
6c00502d4ece417ead501db8f0ee3e8287ba4459Timo Sirainen link = cstream->chain.head;
6c00502d4ece417ead501db8f0ee3e8287ba4459Timo Sirainen if (link != NULL && link->stream != NULL)
6c00502d4ece417ead501db8f0ee3e8287ba4459Timo Sirainen i_stream_seek(link->stream, 0);
6c00502d4ece417ead501db8f0ee3e8287ba4459Timo Sirainen
6c00502d4ece417ead501db8f0ee3e8287ba4459Timo Sirainen if (cstream->istream.buffer == cstream->istream.w_buffer) {
6c00502d4ece417ead501db8f0ee3e8287ba4459Timo Sirainen /* we've already buffered the prev_input */
6c00502d4ece417ead501db8f0ee3e8287ba4459Timo Sirainen } else {
6c00502d4ece417ead501db8f0ee3e8287ba4459Timo Sirainen /* we already verified that the data size is less than the
6c00502d4ece417ead501db8f0ee3e8287ba4459Timo Sirainen maximum buffer size */
6c00502d4ece417ead501db8f0ee3e8287ba4459Timo Sirainen cstream->istream.pos = 0;
6c00502d4ece417ead501db8f0ee3e8287ba4459Timo Sirainen if (data_size > 0) {
6c00502d4ece417ead501db8f0ee3e8287ba4459Timo Sirainen if (!i_stream_try_alloc(&cstream->istream, data_size, &size))
6c00502d4ece417ead501db8f0ee3e8287ba4459Timo Sirainen i_unreached();
6c00502d4ece417ead501db8f0ee3e8287ba4459Timo Sirainen i_assert(size >= data_size);
6c00502d4ece417ead501db8f0ee3e8287ba4459Timo Sirainen }
6c00502d4ece417ead501db8f0ee3e8287ba4459Timo Sirainen memcpy(cstream->istream.w_buffer, data, data_size);
6c00502d4ece417ead501db8f0ee3e8287ba4459Timo Sirainen cstream->istream.skip = 0;
6c00502d4ece417ead501db8f0ee3e8287ba4459Timo Sirainen cstream->istream.pos = data_size;
6c00502d4ece417ead501db8f0ee3e8287ba4459Timo Sirainen cstream->prev_stream_left = data_size;
6c00502d4ece417ead501db8f0ee3e8287ba4459Timo Sirainen }
6c00502d4ece417ead501db8f0ee3e8287ba4459Timo Sirainen i_stream_skip(prev_input, data_size);
6c00502d4ece417ead501db8f0ee3e8287ba4459Timo Sirainen i_stream_unref(&prev_input);
6c00502d4ece417ead501db8f0ee3e8287ba4459Timo Sirainen}
6c00502d4ece417ead501db8f0ee3e8287ba4459Timo Sirainen
6c00502d4ece417ead501db8f0ee3e8287ba4459Timo Sirainenstatic ssize_t i_stream_chain_read(struct istream_private *stream)
6c00502d4ece417ead501db8f0ee3e8287ba4459Timo Sirainen{
6c00502d4ece417ead501db8f0ee3e8287ba4459Timo Sirainen struct chain_istream *cstream = (struct chain_istream *)stream;
6c00502d4ece417ead501db8f0ee3e8287ba4459Timo Sirainen struct istream_chain_link *link = cstream->chain.head;
6c00502d4ece417ead501db8f0ee3e8287ba4459Timo Sirainen const unsigned char *data;
6c00502d4ece417ead501db8f0ee3e8287ba4459Timo Sirainen size_t size, data_size, cur_data_pos, new_pos, bytes_skipped;
6c00502d4ece417ead501db8f0ee3e8287ba4459Timo Sirainen size_t new_bytes_count;
6c00502d4ece417ead501db8f0ee3e8287ba4459Timo Sirainen ssize_t ret;
6c00502d4ece417ead501db8f0ee3e8287ba4459Timo Sirainen
6c00502d4ece417ead501db8f0ee3e8287ba4459Timo Sirainen if (link != NULL && link->eof) {
643a81fff9003cba13deb49a565a3c8171da524dTimo Sirainen stream->istream.eof = TRUE;
643a81fff9003cba13deb49a565a3c8171da524dTimo Sirainen return -1;
643a81fff9003cba13deb49a565a3c8171da524dTimo Sirainen }
643a81fff9003cba13deb49a565a3c8171da524dTimo Sirainen
6c00502d4ece417ead501db8f0ee3e8287ba4459Timo Sirainen i_assert(stream->skip >= cstream->prev_skip);
643a81fff9003cba13deb49a565a3c8171da524dTimo Sirainen bytes_skipped = stream->skip - cstream->prev_skip;
6c00502d4ece417ead501db8f0ee3e8287ba4459Timo Sirainen
6c00502d4ece417ead501db8f0ee3e8287ba4459Timo Sirainen if (cstream->prev_stream_left == 0) {
6c00502d4ece417ead501db8f0ee3e8287ba4459Timo Sirainen /* no need to worry about buffers, skip everything */
6c00502d4ece417ead501db8f0ee3e8287ba4459Timo Sirainen } else if (bytes_skipped < cstream->prev_stream_left) {
6c00502d4ece417ead501db8f0ee3e8287ba4459Timo Sirainen /* we're still skipping inside buffer */
6c00502d4ece417ead501db8f0ee3e8287ba4459Timo Sirainen cstream->prev_stream_left -= bytes_skipped;
6c00502d4ece417ead501db8f0ee3e8287ba4459Timo Sirainen bytes_skipped = 0;
6c00502d4ece417ead501db8f0ee3e8287ba4459Timo Sirainen } else {
643a81fff9003cba13deb49a565a3c8171da524dTimo Sirainen /* done with the buffer */
643a81fff9003cba13deb49a565a3c8171da524dTimo Sirainen bytes_skipped -= cstream->prev_stream_left;
643a81fff9003cba13deb49a565a3c8171da524dTimo Sirainen cstream->prev_stream_left = 0;
6c00502d4ece417ead501db8f0ee3e8287ba4459Timo Sirainen }
643a81fff9003cba13deb49a565a3c8171da524dTimo Sirainen stream->pos -= bytes_skipped;
stream->skip -= bytes_skipped;
stream->buffer += bytes_skipped;
cstream->prev_skip = stream->skip;
if (link == NULL) {
i_assert(bytes_skipped == 0);
return 0;
}
i_stream_skip(link->stream, bytes_skipped);
i_assert(stream->pos >= stream->skip + cstream->prev_stream_left);
cur_data_pos = stream->pos - (stream->skip + cstream->prev_stream_left);
data = i_stream_get_data(link->stream, &data_size);
if (data_size > cur_data_pos)
ret = 0;
else {
/* need to read more */
i_assert(cur_data_pos == data_size);
ret = i_stream_read(link->stream);
if (ret == -2 || ret == 0)
return ret;
if (ret == -1) {
if (link->stream->stream_errno != 0) {
stream->istream.stream_errno =
link->stream->stream_errno;
return -1;
}
/* EOF of this stream, go to next stream */
i_stream_chain_read_next(cstream);
cstream->prev_skip = stream->skip;
return i_stream_chain_read(stream);
}
/* we read something */
data = i_stream_get_data(link->stream, &data_size);
}
if (cstream->prev_stream_left == 0) {
/* we can point directly to the current stream's buffers */
stream->buffer = data;
stream->pos -= stream->skip;
stream->skip = 0;
new_pos = data_size;
} else if (data_size == cur_data_pos) {
/* nothing new read */
i_assert(ret == 0 || ret == -1);
stream->buffer = stream->w_buffer;
new_pos = stream->pos;
} else {
/* we still have some of the previous stream left. merge the
new data with it. */
i_assert(data_size > cur_data_pos);
new_bytes_count = data_size - cur_data_pos;
if (!i_stream_try_alloc(stream, new_bytes_count, &size)) {
stream->buffer = stream->w_buffer;
return -2;
}
stream->buffer = stream->w_buffer;
if (new_bytes_count > size)
new_bytes_count = size;
memcpy(stream->w_buffer + stream->pos,
data + cur_data_pos, new_bytes_count);
new_pos = stream->pos + new_bytes_count;
}
ret = new_pos > stream->pos ? (ssize_t)(new_pos - stream->pos) :
(ret == 0 ? 0 : -1);
stream->pos = new_pos;
cstream->prev_skip = stream->skip;
return ret;
}
struct istream *i_stream_create_chain(struct istream_chain **chain_r)
{
struct chain_istream *cstream;
cstream = i_new(struct chain_istream, 1);
cstream->chain.stream = cstream;
cstream->istream.max_buffer_size = 256;
cstream->istream.iostream.destroy = i_stream_chain_destroy;
cstream->istream.iostream.set_max_buffer_size =
i_stream_chain_set_max_buffer_size;
cstream->istream.read = i_stream_chain_read;
cstream->istream.istream.readable_fd = FALSE;
cstream->istream.istream.blocking = FALSE;
cstream->istream.istream.seekable = FALSE;
*chain_r = &cstream->chain;
return i_stream_create(&cstream->istream, NULL, -1);
}