istream-concat.c revision 1c6f6f5bef70f16546b3bc8f4cd5f93f373e82a2
/* Copyright (c) 2007-2013 Dovecot authors, see the included COPYING file */
c25356d5978632df6203437e1953bcb29e0c736fTimo Sirainen
e8490a52a1bc71bc53034e68f464435684ad810fTimo Sirainen#include "lib.h"
d23c747de9d33966483fbdd41f08ad7766da7c5cTimo Sirainen#include "buffer.h"
d23c747de9d33966483fbdd41f08ad7766da7c5cTimo Sirainen#include "istream-private.h"
2615df45a8027948a474abe5e817b34b0499c171Timo Sirainen#include "istream-concat.h"
2615df45a8027948a474abe5e817b34b0499c171Timo Sirainen
3e0bae44b65f5c46989fcef3d1e07203f496327eTimo Sirainenstruct concat_istream {
3e0bae44b65f5c46989fcef3d1e07203f496327eTimo Sirainen struct istream_private istream;
3e0bae44b65f5c46989fcef3d1e07203f496327eTimo Sirainen
3e0bae44b65f5c46989fcef3d1e07203f496327eTimo Sirainen struct istream **input, *cur_input;
3e0bae44b65f5c46989fcef3d1e07203f496327eTimo Sirainen uoff_t *input_size;
3e0bae44b65f5c46989fcef3d1e07203f496327eTimo Sirainen
3e0bae44b65f5c46989fcef3d1e07203f496327eTimo Sirainen unsigned int cur_idx, unknown_size_idx;
e8490a52a1bc71bc53034e68f464435684ad810fTimo Sirainen size_t prev_stream_left, prev_skip;
e8490a52a1bc71bc53034e68f464435684ad810fTimo Sirainen};
296dca49e4fe6046e0328c67ef1cf4b8077dec9cTimo Sirainen
7fd51f7b0b4d990ec3cfef4e60ee685bf9fb32deTimo Sirainenstatic void i_stream_concat_close(struct iostream_private *stream,
7fd51f7b0b4d990ec3cfef4e60ee685bf9fb32deTimo Sirainen bool close_parent)
7fd51f7b0b4d990ec3cfef4e60ee685bf9fb32deTimo Sirainen{
eb64c3586d854cddd693f0b811d897399076a441Timo Sirainen struct concat_istream *cstream = (struct concat_istream *)stream;
eb64c3586d854cddd693f0b811d897399076a441Timo Sirainen unsigned int i;
eb64c3586d854cddd693f0b811d897399076a441Timo Sirainen
eb64c3586d854cddd693f0b811d897399076a441Timo Sirainen if (close_parent) {
296dca49e4fe6046e0328c67ef1cf4b8077dec9cTimo Sirainen for (i = 0; cstream->input[i] != NULL; i++)
eb64c3586d854cddd693f0b811d897399076a441Timo Sirainen i_stream_close(cstream->input[i]);
4ac2d38239cea8090154e17faefd77de5a71d882Timo Sirainen }
eb64c3586d854cddd693f0b811d897399076a441Timo Sirainen}
4ac2d38239cea8090154e17faefd77de5a71d882Timo Sirainen
eb64c3586d854cddd693f0b811d897399076a441Timo Sirainenstatic void i_stream_concat_destroy(struct iostream_private *stream)
32e1554df9abca74fef0af2ba2e4c37e90a06cd0Timo Sirainen{
eb64c3586d854cddd693f0b811d897399076a441Timo Sirainen struct concat_istream *cstream = (struct concat_istream *)stream;
32e1554df9abca74fef0af2ba2e4c37e90a06cd0Timo Sirainen unsigned int i;
a988c3fd9251806e38931a011aaa4006dd081cbdTimo Sirainen
16db188cfddce117500a161302f17ae691b4500eTimo Sirainen for (i = 0; cstream->input[i] != NULL; i++)
b337d3b6871b878d6467d7d8ed600433af5da5a1Timo Sirainen i_stream_unref(&cstream->input[i]);
b337d3b6871b878d6467d7d8ed600433af5da5a1Timo Sirainen i_free(cstream->input);
16db188cfddce117500a161302f17ae691b4500eTimo Sirainen i_free(cstream->input_size);
60b42c6dfdf9edcca8a96b380ef9a0adc60c2464Timo Sirainen i_free(cstream->istream.w_buffer);
16db188cfddce117500a161302f17ae691b4500eTimo Sirainen}
16db188cfddce117500a161302f17ae691b4500eTimo Sirainen
16db188cfddce117500a161302f17ae691b4500eTimo Sirainenstatic void
16db188cfddce117500a161302f17ae691b4500eTimo Siraineni_stream_concat_set_max_buffer_size(struct iostream_private *stream,
16db188cfddce117500a161302f17ae691b4500eTimo Sirainen size_t max_size)
296dca49e4fe6046e0328c67ef1cf4b8077dec9cTimo Sirainen{
296dca49e4fe6046e0328c67ef1cf4b8077dec9cTimo Sirainen struct concat_istream *cstream = (struct concat_istream *)stream;
e8490a52a1bc71bc53034e68f464435684ad810fTimo Sirainen unsigned int i;
2b95b7a9f4f06e7640ef431d9e6efc2423cacf1aTimo Sirainen
e8490a52a1bc71bc53034e68f464435684ad810fTimo Sirainen cstream->istream.max_buffer_size = max_size;
6145bd3b17b9135b77b0b42409a0cc3fa0d1b946Timo Sirainen for (i = 0; cstream->input[i] != NULL; i++)
e8490a52a1bc71bc53034e68f464435684ad810fTimo Sirainen i_stream_set_max_buffer_size(cstream->input[i], max_size);
3e0bae44b65f5c46989fcef3d1e07203f496327eTimo Sirainen}
296dca49e4fe6046e0328c67ef1cf4b8077dec9cTimo Sirainen
e8490a52a1bc71bc53034e68f464435684ad810fTimo Sirainenstatic void i_stream_concat_read_next(struct concat_istream *cstream)
67c47dbb3fde79218320fd38a45c33f61bbf3012Timo Sirainen{
e8490a52a1bc71bc53034e68f464435684ad810fTimo Sirainen const unsigned char *data;
e8490a52a1bc71bc53034e68f464435684ad810fTimo Sirainen size_t data_size, size;
af3f857bb3166ed99595e11a9d18e5b5cc670e1aTimo Sirainen
af3f857bb3166ed99595e11a9d18e5b5cc670e1aTimo Sirainen i_assert(cstream->cur_input->eof);
af3f857bb3166ed99595e11a9d18e5b5cc670e1aTimo Sirainen
af3f857bb3166ed99595e11a9d18e5b5cc670e1aTimo Sirainen data = i_stream_get_data(cstream->cur_input, &data_size);
af3f857bb3166ed99595e11a9d18e5b5cc670e1aTimo Sirainen cstream->cur_idx++;
af3f857bb3166ed99595e11a9d18e5b5cc670e1aTimo Sirainen cstream->cur_input = cstream->input[cstream->cur_idx];
af3f857bb3166ed99595e11a9d18e5b5cc670e1aTimo Sirainen i_stream_seek(cstream->cur_input, 0);
af3f857bb3166ed99595e11a9d18e5b5cc670e1aTimo Sirainen
af3f857bb3166ed99595e11a9d18e5b5cc670e1aTimo Sirainen if (cstream->prev_stream_left > 0 || cstream->istream.pos == 0) {
af3f857bb3166ed99595e11a9d18e5b5cc670e1aTimo Sirainen cstream->prev_stream_left += data_size;
67c47dbb3fde79218320fd38a45c33f61bbf3012Timo Sirainen i_assert(cstream->prev_stream_left ==
c58906589cafc32df4c04ffbef933baadd3f2276Timo Sirainen cstream->istream.pos - cstream->istream.skip);
47ede56f4e6eebfe631a1f0febf74d7adcdbcd00Timo Sirainen return;
47ede56f4e6eebfe631a1f0febf74d7adcdbcd00Timo Sirainen }
a64adf62fa33f2463a86f990217b0c9078531a40Timo Sirainen
c396c5cdd510d09aa35875ccfd643c5c21ed1f89Timo Sirainen /* we already verified that the data size is less than the
a64adf62fa33f2463a86f990217b0c9078531a40Timo Sirainen maximum buffer size */
6145bd3b17b9135b77b0b42409a0cc3fa0d1b946Timo Sirainen cstream->istream.pos = 0;
0dffa25d211be541ee3c953b23566a1a990789dfTimo Sirainen if (data_size > 0) {
0dffa25d211be541ee3c953b23566a1a990789dfTimo Sirainen if (!i_stream_try_alloc(&cstream->istream, data_size, &size))
e8490a52a1bc71bc53034e68f464435684ad810fTimo Sirainen i_unreached();
e8490a52a1bc71bc53034e68f464435684ad810fTimo Sirainen i_assert(size >= data_size);
a64adf62fa33f2463a86f990217b0c9078531a40Timo Sirainen }
d9250ee7e2815bb2116134b58f7c860578148d6cTimo Sirainen
a10ed8c47534b4c6b6bf2711ccfe577e720a47b4Timo Sirainen cstream->prev_stream_left = data_size;
092018b35bb1dc5bd61848a38189fe6ac8f791ddTimo Sirainen memcpy(cstream->istream.w_buffer, data, data_size);
7327394e30c1020b9a2a49c72a7e3d0f7803e680Timo Sirainen cstream->istream.skip = 0;
7327394e30c1020b9a2a49c72a7e3d0f7803e680Timo Sirainen cstream->istream.pos = data_size;
e8490a52a1bc71bc53034e68f464435684ad810fTimo Sirainen}
e8490a52a1bc71bc53034e68f464435684ad810fTimo Sirainen
90804278df6586cceaf1b1b07a44713c01694048Timo Sirainenstatic ssize_t i_stream_concat_read(struct istream_private *stream)
90804278df6586cceaf1b1b07a44713c01694048Timo Sirainen{
90804278df6586cceaf1b1b07a44713c01694048Timo Sirainen struct concat_istream *cstream = (struct concat_istream *)stream;
90804278df6586cceaf1b1b07a44713c01694048Timo Sirainen const unsigned char *data;
90804278df6586cceaf1b1b07a44713c01694048Timo Sirainen size_t size, data_size, cur_data_pos, new_pos, bytes_skipped;
90804278df6586cceaf1b1b07a44713c01694048Timo Sirainen size_t new_bytes_count;
90804278df6586cceaf1b1b07a44713c01694048Timo Sirainen ssize_t ret;
90804278df6586cceaf1b1b07a44713c01694048Timo Sirainen bool last_stream;
6145bd3b17b9135b77b0b42409a0cc3fa0d1b946Timo Sirainen
6145bd3b17b9135b77b0b42409a0cc3fa0d1b946Timo Sirainen if (cstream->cur_input == NULL) {
6145bd3b17b9135b77b0b42409a0cc3fa0d1b946Timo Sirainen stream->istream.stream_errno = EINVAL;
2615df45a8027948a474abe5e817b34b0499c171Timo Sirainen return -1;
2615df45a8027948a474abe5e817b34b0499c171Timo Sirainen }
2615df45a8027948a474abe5e817b34b0499c171Timo Sirainen
2615df45a8027948a474abe5e817b34b0499c171Timo Sirainen i_assert(stream->skip >= cstream->prev_skip);
2615df45a8027948a474abe5e817b34b0499c171Timo Sirainen bytes_skipped = stream->skip - cstream->prev_skip;
2615df45a8027948a474abe5e817b34b0499c171Timo Sirainen
2615df45a8027948a474abe5e817b34b0499c171Timo Sirainen if (cstream->prev_stream_left == 0) {
2615df45a8027948a474abe5e817b34b0499c171Timo Sirainen /* no need to worry about buffers, skip everything */
c8cf8a605e0ddea7cb36fe04551aeca5090e684bTimo Sirainen } else if (bytes_skipped < cstream->prev_stream_left) {
c8cf8a605e0ddea7cb36fe04551aeca5090e684bTimo Sirainen /* we're still skipping inside buffer */
c8cf8a605e0ddea7cb36fe04551aeca5090e684bTimo Sirainen cstream->prev_stream_left -= bytes_skipped;
2615df45a8027948a474abe5e817b34b0499c171Timo Sirainen bytes_skipped = 0;
2615df45a8027948a474abe5e817b34b0499c171Timo Sirainen } else {
2615df45a8027948a474abe5e817b34b0499c171Timo Sirainen /* done with the buffer */
ef50336eefcb9ba99f73c6af37420eaf8857a39bTimo Sirainen bytes_skipped -= cstream->prev_stream_left;
c0a87e5f3316a57e6f915882fa1951d0fbb74a61Timo Sirainen cstream->prev_stream_left = 0;
c0a87e5f3316a57e6f915882fa1951d0fbb74a61Timo Sirainen }
14376e0584c178306c400750352869cf2aaf6feeTimo Sirainen stream->pos -= bytes_skipped;
c0a87e5f3316a57e6f915882fa1951d0fbb74a61Timo Sirainen stream->skip -= bytes_skipped;
68a4946b12583b88fa802e52ebee45cd96056772Timo Sirainen stream->buffer += bytes_skipped;
e8490a52a1bc71bc53034e68f464435684ad810fTimo Sirainen cstream->prev_skip = stream->skip;
9b706b345064ce8e8a657f54633f009a101298eaTimo Sirainen i_stream_skip(cstream->cur_input, bytes_skipped);
9b706b345064ce8e8a657f54633f009a101298eaTimo Sirainen
e8490a52a1bc71bc53034e68f464435684ad810fTimo Sirainen i_assert(stream->pos >= stream->skip + cstream->prev_stream_left);
c0a87e5f3316a57e6f915882fa1951d0fbb74a61Timo Sirainen cur_data_pos = stream->pos - (stream->skip + cstream->prev_stream_left);
9b706b345064ce8e8a657f54633f009a101298eaTimo Sirainen
9b706b345064ce8e8a657f54633f009a101298eaTimo Sirainen data = i_stream_get_data(cstream->cur_input, &data_size);
288d6ef592719f2be3cad9f034e9be05f9839785Timo Sirainen if (data_size > cur_data_pos)
288d6ef592719f2be3cad9f034e9be05f9839785Timo Sirainen ret = 0;
288d6ef592719f2be3cad9f034e9be05f9839785Timo Sirainen else {
288d6ef592719f2be3cad9f034e9be05f9839785Timo Sirainen /* need to read more */
9b706b345064ce8e8a657f54633f009a101298eaTimo Sirainen i_assert(cur_data_pos == data_size);
9b706b345064ce8e8a657f54633f009a101298eaTimo Sirainen ret = i_stream_read(cstream->cur_input);
e8490a52a1bc71bc53034e68f464435684ad810fTimo Sirainen if (ret == -2 || ret == 0)
e8490a52a1bc71bc53034e68f464435684ad810fTimo Sirainen return ret;
c0a87e5f3316a57e6f915882fa1951d0fbb74a61Timo Sirainen
9b706b345064ce8e8a657f54633f009a101298eaTimo Sirainen if (ret == -1 && cstream->cur_input->stream_errno != 0) {
9b706b345064ce8e8a657f54633f009a101298eaTimo Sirainen stream->istream.stream_errno =
32e1554df9abca74fef0af2ba2e4c37e90a06cd0Timo Sirainen cstream->cur_input->stream_errno;
32e1554df9abca74fef0af2ba2e4c37e90a06cd0Timo Sirainen return -1;
c0a87e5f3316a57e6f915882fa1951d0fbb74a61Timo Sirainen }
9b706b345064ce8e8a657f54633f009a101298eaTimo Sirainen
9b706b345064ce8e8a657f54633f009a101298eaTimo Sirainen /* we either read something or we're at EOF */
a988c3fd9251806e38931a011aaa4006dd081cbdTimo Sirainen last_stream = cstream->input[cstream->cur_idx+1] == NULL;
a988c3fd9251806e38931a011aaa4006dd081cbdTimo Sirainen if (ret == -1 && !last_stream) {
c0a87e5f3316a57e6f915882fa1951d0fbb74a61Timo Sirainen if (stream->pos >= stream->max_buffer_size)
9b706b345064ce8e8a657f54633f009a101298eaTimo Sirainen return -2;
957d09e495c33ad1180f82152e5e87e6b51ab04bTimo Sirainen
957d09e495c33ad1180f82152e5e87e6b51ab04bTimo Sirainen i_stream_concat_read_next(cstream);
9874ad56b94788297fdac4eae7cba5d651b48222Timo Sirainen cstream->prev_skip = stream->skip;
e8490a52a1bc71bc53034e68f464435684ad810fTimo Sirainen return i_stream_concat_read(stream);
e8490a52a1bc71bc53034e68f464435684ad810fTimo Sirainen }
e8490a52a1bc71bc53034e68f464435684ad810fTimo Sirainen
957d09e495c33ad1180f82152e5e87e6b51ab04bTimo Sirainen stream->istream.eof = cstream->cur_input->eof && last_stream;
957d09e495c33ad1180f82152e5e87e6b51ab04bTimo Sirainen i_assert(ret != -1 || stream->istream.eof);
957d09e495c33ad1180f82152e5e87e6b51ab04bTimo Sirainen data = i_stream_get_data(cstream->cur_input, &data_size);
957d09e495c33ad1180f82152e5e87e6b51ab04bTimo Sirainen }
e8490a52a1bc71bc53034e68f464435684ad810fTimo Sirainen
2615df45a8027948a474abe5e817b34b0499c171Timo Sirainen if (cstream->prev_stream_left == 0) {
2615df45a8027948a474abe5e817b34b0499c171Timo Sirainen /* we can point directly to the current stream's buffers */
2615df45a8027948a474abe5e817b34b0499c171Timo Sirainen stream->buffer = data;
a3f6d0302a83270253ff9d2ebd4fea0003e9ac95Timo Sirainen stream->pos -= stream->skip;
14b551180cb4ac7acac8b048d8d6d7278541d1f6Timo Sirainen stream->skip = 0;
14b551180cb4ac7acac8b048d8d6d7278541d1f6Timo Sirainen new_pos = data_size;
14b551180cb4ac7acac8b048d8d6d7278541d1f6Timo Sirainen } else if (data_size == cur_data_pos) {
14b551180cb4ac7acac8b048d8d6d7278541d1f6Timo Sirainen /* nothing new read */
14b551180cb4ac7acac8b048d8d6d7278541d1f6Timo Sirainen i_assert(ret == 0 || ret == -1);
e8490a52a1bc71bc53034e68f464435684ad810fTimo Sirainen stream->buffer = stream->w_buffer;
new_pos = stream->pos;
} else {
/* we still have some of the previous stream left. merge the
new data with it. */
i_assert(data_size > cur_data_pos);
new_bytes_count = data_size - cur_data_pos;
if (!i_stream_try_alloc(stream, new_bytes_count, &size)) {
stream->buffer = stream->w_buffer;
return -2;
}
stream->buffer = stream->w_buffer;
if (new_bytes_count > size)
new_bytes_count = size;
memcpy(stream->w_buffer + stream->pos,
data + cur_data_pos, new_bytes_count);
new_pos = stream->pos + new_bytes_count;
}
ret = new_pos > stream->pos ? (ssize_t)(new_pos - stream->pos) :
(ret == 0 ? 0 : -1);
stream->pos = new_pos;
cstream->prev_skip = stream->skip;
return ret;
}
/* Translates an offset within the concatenated stream into a child index
   plus an offset within that child.

   On entry *v_offset is the offset in the whole stream; on return it is
   what remains after subtracting the sizes of the children in front of the
   returned index. Sizes that aren't known yet are looked up with
   i_stream_stat() and remembered in input_size[].

   Returns the index into input[] (the index of the NULL terminator if the
   offset is not inside any child), or UINT_MAX if stat() failed; in that
   case the error is logged, an error string is set and the child's
   stream_errno is copied to the concat stream. */
static unsigned int
find_v_offset(struct concat_istream *cstream, uoff_t *v_offset)
{
	const struct stat *st;
	struct istream *child;
	unsigned int idx = 0;

	/* stop at the end of the list, or as soon as the remaining offset
	   is zero, i.e. it points to the beginning of the child at idx */
	while ((child = cstream->input[idx]) != NULL && *v_offset != 0) {
		if (idx == cstream->unknown_size_idx) {
			/* we'll need to figure out this stream's size */
			if (i_stream_stat(child, TRUE, &st) < 0) {
				io_stream_set_error(&cstream->istream.iostream,
					"stat(%s) failed: %s",
					i_stream_get_name(child),
					i_stream_get_error(child));
				i_error("istream-concat: stat(%s) failed: %s",
					i_stream_get_name(child),
					i_stream_get_error(child));
				cstream->istream.istream.stream_errno =
					child->stream_errno;
				return UINT_MAX;
			}

			/* @UNSAFE */
			cstream->input_size[idx] = st->st_size;
			cstream->unknown_size_idx = idx + 1;
		}

		if (*v_offset < cstream->input_size[idx]) {
			/* the offset lands inside this child */
			break;
		}
		*v_offset -= cstream->input_size[idx];
		idx++;
	}
	return idx;
}
/* istream seek callback: drops everything buffered and positions the
   concat stream at v_offset by selecting the child that contains the
   offset and seeking inside it.

   Seeking to exactly the end of the concatenated data is allowed: the last
   child is then positioned at its own end, so that the next read reports
   EOF. Seeking past the end leaves cur_input as NULL, which makes the next
   read fail with EINVAL.

   If a child's size can't be determined, cur_input is set to NULL and
   stream_errno is left as reported for the failing child (EINVAL only if
   none was set). */
static void i_stream_concat_seek(struct istream_private *stream,
				 uoff_t v_offset, bool mark ATTR_UNUSED)
{
	struct concat_istream *cstream = (struct concat_istream *)stream;

	stream->istream.v_offset = v_offset;
	stream->skip = stream->pos = 0;
	cstream->prev_stream_left = 0;
	cstream->prev_skip = 0;

	/* from here on v_offset is relative to the selected child */
	cstream->cur_idx = find_v_offset(cstream, &v_offset);
	if (cstream->cur_idx == UINT_MAX) {
		/* failed. find_v_offset() already copied the child's errno;
		   don't hide it behind a generic EINVAL. */
		cstream->cur_input = NULL;
		if (stream->istream.stream_errno == 0)
			stream->istream.stream_errno = EINVAL;
		return;
	}

	cstream->cur_input = cstream->input[cstream->cur_idx];
	if (cstream->cur_input == NULL && v_offset == 0 &&
	    cstream->cur_idx > 0) {
		/* the offset is exactly at the end of the last child. step
		   back to that child and seek to its end; its size is known,
		   because find_v_offset() had to walk over it to get here. */
		cstream->cur_idx--;
		cstream->cur_input = cstream->input[cstream->cur_idx];
		v_offset = cstream->input_size[cstream->cur_idx];
	}

	if (cstream->cur_input != NULL)
		i_stream_seek(cstream->cur_input, v_offset);
}
/* istream stat callback: sets statbuf.st_size to the combined size of all
   child streams. Returns 0 on success, or -1 if the size of some child
   couldn't be determined. */
static int
i_stream_concat_stat(struct istream_private *stream, bool exact ATTR_UNUSED)
{
	struct concat_istream *cstream = (struct concat_istream *)stream;
	/* the largest possible offset, so that find_v_offset() keeps going
	   and looks up the size of every child on its way */
	uoff_t walk_offset = (uoff_t)-1;
	unsigned int idx = 0;

	if (find_v_offset(cstream, &walk_offset) == UINT_MAX)
		return -1;

	/* add up every size that has been looked up */
	stream->statbuf.st_size = 0;
	while (idx < cstream->unknown_size_idx) {
		stream->statbuf.st_size += cstream->input_size[idx];
		idx++;
	}
	return 0;
}
struct istream *i_stream_create_concat(struct istream *input[])
{
struct concat_istream *cstream;
unsigned int count;
size_t max_buffer_size = I_STREAM_MIN_SIZE;
bool blocking = TRUE, seekable = TRUE;
/* if any of the streams isn't blocking or seekable, set ourself also
nonblocking/nonseekable */
for (count = 0; input[count] != NULL; count++) {
size_t cur_max = input[count]->real_stream->max_buffer_size;
if (cur_max > max_buffer_size)
max_buffer_size = cur_max;
if (!input[count]->blocking)
blocking = FALSE;
if (!input[count]->seekable)
seekable = FALSE;
i_stream_ref(input[count]);
}
i_assert(count != 0);
cstream = i_new(struct concat_istream, 1);
cstream->input = i_new(struct istream *, count + 1);
cstream->input_size = i_new(uoff_t, count + 1);
memcpy(cstream->input, input, sizeof(*input) * count);
cstream->cur_input = cstream->input[0];
i_stream_seek(cstream->cur_input, 0);
cstream->istream.iostream.close = i_stream_concat_close;
cstream->istream.iostream.destroy = i_stream_concat_destroy;
cstream->istream.iostream.set_max_buffer_size =
i_stream_concat_set_max_buffer_size;
cstream->istream.max_buffer_size = max_buffer_size;
cstream->istream.read = i_stream_concat_read;
cstream->istream.seek = i_stream_concat_seek;
cstream->istream.stat = i_stream_concat_stat;
cstream->istream.istream.readable_fd = FALSE;
cstream->istream.istream.blocking = blocking;
cstream->istream.istream.seekable = seekable;
return i_stream_create(&cstream->istream, NULL, -1);
}