istream-concat.c revision 7026c16186f543e11af12b8b87f396006db93297
/* Copyright (c) 2007 Dovecot authors, see the included COPYING file */
c25356d5978632df6203437e1953bcb29e0c736fTimo Sirainen
ecc81625167ed96c04c02aa190a1ea5baa65b474Timo Sirainen#include "lib.h"
2767104d81e97a109f0aa9758792bfa1da325a97Timo Sirainen#include "buffer.h"
2767104d81e97a109f0aa9758792bfa1da325a97Timo Sirainen#include "istream-internal.h"
c0435c854a0e7246373b9752d163095cc4fbe985Timo Sirainen#include "istream-concat.h"
ecc81625167ed96c04c02aa190a1ea5baa65b474Timo Sirainen
ecc81625167ed96c04c02aa190a1ea5baa65b474Timo Sirainenstruct concat_istream {
6cc0546c058f3e6253c6f99727b28dd602712974Timo Sirainen struct istream_private istream;
6cc0546c058f3e6253c6f99727b28dd602712974Timo Sirainen
ecc81625167ed96c04c02aa190a1ea5baa65b474Timo Sirainen struct istream **input, *cur_input;
6cc0546c058f3e6253c6f99727b28dd602712974Timo Sirainen uoff_t *input_size;
6cc0546c058f3e6253c6f99727b28dd602712974Timo Sirainen
6cc0546c058f3e6253c6f99727b28dd602712974Timo Sirainen unsigned int cur_idx, unknown_size_idx;
0ce5f96804e81cb0f857e7df32c0272f1eed9377Timo Sirainen size_t prev_size;
0ce5f96804e81cb0f857e7df32c0272f1eed9377Timo Sirainen};
0ce5f96804e81cb0f857e7df32c0272f1eed9377Timo Sirainen
0ce5f96804e81cb0f857e7df32c0272f1eed9377Timo Sirainenstatic void i_stream_concat_close(struct iostream_private *stream)
ecc81625167ed96c04c02aa190a1ea5baa65b474Timo Sirainen{
ecc81625167ed96c04c02aa190a1ea5baa65b474Timo Sirainen struct concat_istream *cstream = (struct concat_istream *)stream;
252db51b6c0a605163326b3ea5d09e9936ca3b29Timo Sirainen unsigned int i;
ecc81625167ed96c04c02aa190a1ea5baa65b474Timo Sirainen
ecc81625167ed96c04c02aa190a1ea5baa65b474Timo Sirainen for (i = 0; cstream->input[i] != NULL; i++)
211ed7806d8715ec2280ffbf5d10f0d6e4f1beb2Timo Sirainen i_stream_close(cstream->input[i]);
211ed7806d8715ec2280ffbf5d10f0d6e4f1beb2Timo Sirainen}
211ed7806d8715ec2280ffbf5d10f0d6e4f1beb2Timo Sirainen
211ed7806d8715ec2280ffbf5d10f0d6e4f1beb2Timo Sirainenstatic void i_stream_concat_destroy(struct iostream_private *stream)
211ed7806d8715ec2280ffbf5d10f0d6e4f1beb2Timo Sirainen{
9b7eeffb5752b500ac62ba1fd01c4a8c4ada14e9Timo Sirainen struct concat_istream *cstream = (struct concat_istream *)stream;
9b7eeffb5752b500ac62ba1fd01c4a8c4ada14e9Timo Sirainen unsigned int i;
c0435c854a0e7246373b9752d163095cc4fbe985Timo Sirainen
93fa87cf1a96c4f279ec4f5c311820313ba12c34Timo Sirainen for (i = 0; cstream->input[i] != NULL; i++)
93fa87cf1a96c4f279ec4f5c311820313ba12c34Timo Sirainen i_stream_unref(&cstream->input[i]);
93fa87cf1a96c4f279ec4f5c311820313ba12c34Timo Sirainen}
93fa87cf1a96c4f279ec4f5c311820313ba12c34Timo Sirainen
93fa87cf1a96c4f279ec4f5c311820313ba12c34Timo Sirainenstatic void
b565a6a7a66fb9f224d00c06a950e3c1c585c18eTimo Siraineni_stream_concat_set_max_buffer_size(struct iostream_private *stream,
b565a6a7a66fb9f224d00c06a950e3c1c585c18eTimo Sirainen size_t max_size)
ecc81625167ed96c04c02aa190a1ea5baa65b474Timo Sirainen{
8cb72c59d5ea4e9e5f638d7ec840bb853f5a188eTimo Sirainen struct concat_istream *cstream = (struct concat_istream *)stream;
8cb72c59d5ea4e9e5f638d7ec840bb853f5a188eTimo Sirainen unsigned int i;
8cb72c59d5ea4e9e5f638d7ec840bb853f5a188eTimo Sirainen
8cb72c59d5ea4e9e5f638d7ec840bb853f5a188eTimo Sirainen cstream->istream.max_buffer_size = max_size;
8cb72c59d5ea4e9e5f638d7ec840bb853f5a188eTimo Sirainen for (i = 0; cstream->input[i] != NULL; i++)
cd56a23e21f1df3f79648cf07e2f4385e2fadebbTimo Sirainen i_stream_set_max_buffer_size(cstream->input[i], max_size);
cd56a23e21f1df3f79648cf07e2f4385e2fadebbTimo Sirainen}
ecc81625167ed96c04c02aa190a1ea5baa65b474Timo Sirainen
ecc81625167ed96c04c02aa190a1ea5baa65b474Timo Sirainenstatic void i_stream_concat_read_next(struct concat_istream *cstream)
c0435c854a0e7246373b9752d163095cc4fbe985Timo Sirainen{
d5cebe7f98e63d4e2822863ef2faa4971e8b3a5dTimo Sirainen const unsigned char *data;
d5cebe7f98e63d4e2822863ef2faa4971e8b3a5dTimo Sirainen size_t data_size, size;
ecc81625167ed96c04c02aa190a1ea5baa65b474Timo Sirainen
ecc81625167ed96c04c02aa190a1ea5baa65b474Timo Sirainen i_assert(cstream->cur_input->eof);
c0435c854a0e7246373b9752d163095cc4fbe985Timo Sirainen
ecc81625167ed96c04c02aa190a1ea5baa65b474Timo Sirainen cstream->cur_idx++;
211ed7806d8715ec2280ffbf5d10f0d6e4f1beb2Timo Sirainen cstream->cur_input = cstream->input[cstream->cur_idx];
211ed7806d8715ec2280ffbf5d10f0d6e4f1beb2Timo Sirainen i_stream_seek(cstream->cur_input, 0);
2767104d81e97a109f0aa9758792bfa1da325a97Timo Sirainen
211ed7806d8715ec2280ffbf5d10f0d6e4f1beb2Timo Sirainen if (cstream->istream.pos == cstream->istream.skip) {
211ed7806d8715ec2280ffbf5d10f0d6e4f1beb2Timo Sirainen i_assert(cstream->prev_size == 0);
59151b71059df1190acd75d8717ed04a7920c862Timo Sirainen cstream->istream.skip = 0;
59151b71059df1190acd75d8717ed04a7920c862Timo Sirainen cstream->istream.pos = 0;
59151b71059df1190acd75d8717ed04a7920c862Timo Sirainen return;
59151b71059df1190acd75d8717ed04a7920c862Timo Sirainen }
ecc81625167ed96c04c02aa190a1ea5baa65b474Timo Sirainen
c0435c854a0e7246373b9752d163095cc4fbe985Timo Sirainen /* we need to keep the current data */
ecc81625167ed96c04c02aa190a1ea5baa65b474Timo Sirainen data = cstream->istream.buffer + cstream->istream.skip;
ecc81625167ed96c04c02aa190a1ea5baa65b474Timo Sirainen data_size = cstream->istream.pos - cstream->istream.skip;
2767104d81e97a109f0aa9758792bfa1da325a97Timo Sirainen
c0435c854a0e7246373b9752d163095cc4fbe985Timo Sirainen cstream->istream.skip = cstream->istream.pos = 0;
2767104d81e97a109f0aa9758792bfa1da325a97Timo Sirainen
683eebe490bbe5caec246c535a10ea9f93f5c330Timo Sirainen /* we already verified that the data size is less than the
683eebe490bbe5caec246c535a10ea9f93f5c330Timo Sirainen maximum buffer size */
c0435c854a0e7246373b9752d163095cc4fbe985Timo Sirainen if (!i_stream_get_buffer_space(&cstream->istream, data_size, &size))
5238111c460098d9cc8cc22527026138a278b9a4Timo Sirainen i_unreached();
5238111c460098d9cc8cc22527026138a278b9a4Timo Sirainen i_assert(size >= data_size);
6ef7e31619edfaa17ed044b45861d106a86191efTimo Sirainen
2767104d81e97a109f0aa9758792bfa1da325a97Timo Sirainen cstream->prev_size = data_size;
68a4946b12583b88fa802e52ebee45cd96056772Timo Sirainen memcpy(cstream->istream.w_buffer, data, data_size);
de954ff15b495be13007a8aca2c09fd1d356a283Timo Sirainen cstream->istream.pos = data_size;
de954ff15b495be13007a8aca2c09fd1d356a283Timo Sirainen}
ecc81625167ed96c04c02aa190a1ea5baa65b474Timo Sirainen
ecc81625167ed96c04c02aa190a1ea5baa65b474Timo Sirainenstatic ssize_t i_stream_concat_read(struct istream_private *stream)
ecc81625167ed96c04c02aa190a1ea5baa65b474Timo Sirainen{
c0435c854a0e7246373b9752d163095cc4fbe985Timo Sirainen struct concat_istream *cstream = (struct concat_istream *)stream;
2767104d81e97a109f0aa9758792bfa1da325a97Timo Sirainen const unsigned char *data;
c0435c854a0e7246373b9752d163095cc4fbe985Timo Sirainen size_t size, pos, skip;
2767104d81e97a109f0aa9758792bfa1da325a97Timo Sirainen ssize_t ret;
0ce5f96804e81cb0f857e7df32c0272f1eed9377Timo Sirainen bool last_stream;
c0435c854a0e7246373b9752d163095cc4fbe985Timo Sirainen
ecc81625167ed96c04c02aa190a1ea5baa65b474Timo Sirainen if (cstream->cur_input == NULL) {
ecc81625167ed96c04c02aa190a1ea5baa65b474Timo Sirainen stream->istream.eof = TRUE;
03f5c621d06d6b6d77a145196c9633a7aa64dc78Timo Sirainen return -1;
03f5c621d06d6b6d77a145196c9633a7aa64dc78Timo Sirainen }
d7e72877b7a5085c3addf9729d0bfbe1b5357853Timo Sirainen
d7e72877b7a5085c3addf9729d0bfbe1b5357853Timo Sirainen skip = stream->skip;
d7e72877b7a5085c3addf9729d0bfbe1b5357853Timo Sirainen if (cstream->prev_size > 0) {
d7e72877b7a5085c3addf9729d0bfbe1b5357853Timo Sirainen if (stream->skip < cstream->prev_size) {
03f5c621d06d6b6d77a145196c9633a7aa64dc78Timo Sirainen cstream->prev_size -= stream->skip;
03f5c621d06d6b6d77a145196c9633a7aa64dc78Timo Sirainen skip = 0;
03f5c621d06d6b6d77a145196c9633a7aa64dc78Timo Sirainen } else {
c0435c854a0e7246373b9752d163095cc4fbe985Timo Sirainen /* we don't need the buffer anymore */
c0435c854a0e7246373b9752d163095cc4fbe985Timo Sirainen skip -= cstream->prev_size;
ecc81625167ed96c04c02aa190a1ea5baa65b474Timo Sirainen stream->skip -= cstream->prev_size;
8cdb3234fe3c77e477c7a0e6934678f58fc54d4dTimo Sirainen cstream->prev_size = 0;
8cdb3234fe3c77e477c7a0e6934678f58fc54d4dTimo Sirainen
8cdb3234fe3c77e477c7a0e6934678f58fc54d4dTimo Sirainen i_free_and_null(stream->w_buffer);
8cdb3234fe3c77e477c7a0e6934678f58fc54d4dTimo Sirainen stream->buffer = NULL;
ecc81625167ed96c04c02aa190a1ea5baa65b474Timo Sirainen stream->buffer_size = 0;
}
}
i_stream_skip(cstream->cur_input, skip);
data = i_stream_get_data(cstream->cur_input, &pos);
if (pos > stream->pos)
ret = 0;
else {
/* need to read more */
ret = i_stream_read(cstream->cur_input);
if (ret == -2 || ret == 0)
return ret;
if (ret == -1 && stream->istream.stream_errno != 0) {
stream->istream.stream_errno =
cstream->cur_input->stream_errno;
return -1;
}
/* we either read something or we're at EOF */
last_stream = cstream->input[cstream->cur_idx+1] == NULL;
if (ret == -1 && !last_stream) {
if (stream->pos - stream->skip >=
stream->max_buffer_size)
return -2;
i_stream_concat_read_next(cstream);
return i_stream_concat_read(stream);
}
stream->istream.eof = cstream->cur_input->eof && last_stream;
data = i_stream_get_data(cstream->cur_input, &pos);
}
if (stream->w_buffer == NULL) {
stream->buffer = data;
stream->pos -= stream->skip;
stream->skip = 0;
} else {
if (!i_stream_get_buffer_space(stream, pos, &size))
return -2;
if (pos > size)
pos = size;
memcpy(stream->w_buffer + stream->pos, data, pos);
}
pos += cstream->prev_size;
ret = pos > stream->pos ? (ssize_t)(pos - stream->pos) :
(ret == 0 ? 0 : -1);
stream->pos = pos;
return ret;
}
/* Finds the child stream that contains the given virtual offset.

   On return *v_offset has been reduced by the sizes of all the children
   that precede the returned index. Sizes that aren't known yet are looked
   up with i_stream_stat() and cached in input_size[]. Returns the index
   into input[] (the index of the terminating NULL entry if the offset is
   past the last child), or (unsigned int)-1 if a stat fails, in which
   case the concat stream's stream_errno is copied from that child. */
static unsigned int
find_v_offset(struct concat_istream *cstream, uoff_t *v_offset)
{
	unsigned int idx = 0;

	/* a remaining offset of zero means "the beginning of this stream" */
	while (cstream->input[idx] != NULL && *v_offset != 0) {
		if (idx == cstream->unknown_size_idx) {
			/* this child's size hasn't been cached yet */
			const struct stat *st =
				i_stream_stat(cstream->input[idx], TRUE);

			if (st == NULL) {
				cstream->istream.istream.stream_errno =
					cstream->input[idx]->stream_errno;
				return (unsigned int)-1;
			}

			/* @UNSAFE */
			cstream->input_size[idx] = st->st_size;
			cstream->unknown_size_idx = idx + 1;
		}

		if (*v_offset < cstream->input_size[idx]) {
			/* the offset falls inside this child */
			break;
		}
		*v_offset -= cstream->input_size[idx];
		idx++;
	}
	return idx;
}
/* istream seek callback: discards buffered state, finds the child stream
   that holds v_offset and seeks that child to the matching position.
   If the child lookup fails, cur_input is left NULL. */
static void i_stream_concat_seek(struct istream_private *stream,
				 uoff_t v_offset, bool mark ATTR_UNUSED)
{
	struct concat_istream *cstream = (struct concat_istream *)stream;
	unsigned int idx;

	stream->istream.stream_errno = 0;
	stream->istream.v_offset = v_offset;
	stream->skip = 0;
	stream->pos = 0;
	cstream->prev_size = 0;

	/* v_offset is turned into an offset within the child found */
	idx = find_v_offset(cstream, &v_offset);
	cstream->cur_idx = idx;
	if (idx == (unsigned int)-1) {
		/* lookup failed */
		cstream->cur_input = NULL;
		return;
	}

	cstream->cur_input = cstream->input[idx];
	if (cstream->cur_input == NULL) {
		/* offset is beyond the last child, nothing to seek */
		return;
	}
	i_stream_seek(cstream->cur_input, v_offset);
}
/* istream stat callback: reports the sum of all child stream sizes in
   st_size. Returns NULL if the size of a child can't be determined; the
   concat stream's stream_errno has then been set by find_v_offset(). */
static const struct stat *
i_stream_concat_stat(struct istream_private *stream, bool exact ATTR_UNUSED)
{
	struct concat_istream *cstream = (struct concat_istream *)stream;
	uoff_t v_offset = (uoff_t)-1;
	unsigned int i;

	/* make sure we have all sizes: looking up the largest possible
	   offset walks through every child */
	if (find_v_offset(cstream, &v_offset) == (unsigned int)-1) {
		/* a partial sum would be reported as the real size
		   otherwise */
		return NULL;
	}

	stream->statbuf.st_size = 0;
	for (i = 0; i < cstream->unknown_size_idx; i++)
		stream->statbuf.st_size += cstream->input_size[i];
	return &stream->statbuf;
}
struct istream *i_stream_create_concat(struct istream *input[])
{
struct concat_istream *cstream;
unsigned int count;
size_t max_buffer_size = I_STREAM_MIN_SIZE;
bool blocking = TRUE, seekable = TRUE;
/* if any of the streams isn't blocking or seekable, set ourself also
nonblocking/nonseekable */
for (count = 0; input[count] != NULL; count++) {
size_t cur_max = input[count]->real_stream->max_buffer_size;
if (cur_max > max_buffer_size)
max_buffer_size = cur_max;
if (!input[count]->blocking)
blocking = FALSE;
if (!input[count]->seekable)
seekable = FALSE;
i_stream_ref(input[count]);
}
i_assert(count != 0);
cstream = i_new(struct concat_istream, 1);
cstream->input = i_new(struct istream *, count + 1);
cstream->input_size = i_new(uoff_t, count + 1);
memcpy(cstream->input, input, sizeof(*input) * count);
cstream->cur_input = cstream->input[0];
i_stream_seek(cstream->cur_input, 0);
cstream->istream.iostream.close = i_stream_concat_close;
cstream->istream.iostream.destroy = i_stream_concat_destroy;
cstream->istream.iostream.set_max_buffer_size =
i_stream_concat_set_max_buffer_size;
cstream->istream.max_buffer_size = max_buffer_size;
cstream->istream.read = i_stream_concat_read;
cstream->istream.seek = i_stream_concat_seek;
cstream->istream.stat = i_stream_concat_stat;
cstream->istream.istream.blocking = blocking;
cstream->istream.istream.seekable = seekable;
return i_stream_create(&cstream->istream, -1, 0);
}