istream-seekable.c revision c1d4780bc0c9017e8e5d366b81e4fad31174c0ad
5a580c3a38ced62d4bcc95b8ac7c4f2935b5d294Timo Sirainen/* Copyright (c) 2005-2011 Dovecot authors, see the included COPYING file */
48566ca412a7cf3b42512fd0ec112744778e5da0Timo Sirainen int (*fd_callback)(const char **path_r, void *context);
ecdce39e5ef4b62eefa9f5818f17d153fd5d710aTimo Sirainen unsigned int cur_idx;
48566ca412a7cf3b42512fd0ec112744778e5da0Timo Sirainenstatic void i_stream_seekable_close(struct iostream_private *stream)
48566ca412a7cf3b42512fd0ec112744778e5da0Timo Sirainen struct seekable_istream *sstream = (struct seekable_istream *)stream;
48566ca412a7cf3b42512fd0ec112744778e5da0Timo Sirainen unsigned int i;
678d0463849ba777106eb7875f27db07a5d8e3dfTimo Sirainenstatic void i_stream_seekable_destroy(struct iostream_private *stream)
48566ca412a7cf3b42512fd0ec112744778e5da0Timo Sirainen struct seekable_istream *sstream = (struct seekable_istream *)stream;
48566ca412a7cf3b42512fd0ec112744778e5da0Timo Sirainen unsigned int i;
2d7df7973f80011033e8e9fa676d3ff4c14468d8Stephan Boschi_stream_seekable_set_max_buffer_size(struct iostream_private *stream,
c649139f889c02154fc9a153728b81619edb5663Timo Sirainen struct seekable_istream *sstream = (struct seekable_istream *)stream;
48566ca412a7cf3b42512fd0ec112744778e5da0Timo Sirainen unsigned int i;
2d7df7973f80011033e8e9fa676d3ff4c14468d8Stephan Bosch i_stream_set_max_buffer_size(sstream->fd_input, max_size);
48566ca412a7cf3b42512fd0ec112744778e5da0Timo Sirainen i_stream_set_max_buffer_size(sstream->input[i], max_size);
2d7df7973f80011033e8e9fa676d3ff4c14468d8Stephan Boschstatic int copy_to_temp_file(struct seekable_istream *sstream)
48566ca412a7cf3b42512fd0ec112744778e5da0Timo Sirainen struct istream_private *stream = &sstream->istream;
48566ca412a7cf3b42512fd0ec112744778e5da0Timo Sirainen const unsigned char *buffer;
48566ca412a7cf3b42512fd0ec112744778e5da0Timo Sirainen fd = sstream->fd_callback(&path, sstream->context);
48566ca412a7cf3b42512fd0ec112744778e5da0Timo Sirainen /* copy our currently read buffer to it */
48566ca412a7cf3b42512fd0ec112744778e5da0Timo Sirainen if (write_full(fd, sstream->buffer->data, sstream->buffer->used) < 0) {
48566ca412a7cf3b42512fd0ec112744778e5da0Timo Sirainen i_stream_create_fd(fd, sstream->istream.max_buffer_size, TRUE);
48566ca412a7cf3b42512fd0ec112744778e5da0Timo Sirainen /* read back the data we just had in our buffer */
48566ca412a7cf3b42512fd0ec112744778e5da0Timo Sirainen i_stream_seek(sstream->fd_input, stream->istream.v_offset);
2d7df7973f80011033e8e9fa676d3ff4c14468d8Stephan Bosch buffer = i_stream_get_data(sstream->fd_input, &size);
2d7df7973f80011033e8e9fa676d3ff4c14468d8Stephan Bosch i_error("istream-seekable: Couldn't read back "
c649139f889c02154fc9a153728b81619edb5663Timo Sirainen "in-memory input %s",
2d7df7973f80011033e8e9fa676d3ff4c14468d8Stephan Boschstatic ssize_t read_more(struct seekable_istream *sstream)
2d7df7973f80011033e8e9fa676d3ff4c14468d8Stephan Bosch while ((ret = i_stream_read(sstream->cur_input)) < 0) {
2d7df7973f80011033e8e9fa676d3ff4c14468d8Stephan Bosch /* full / error */
48566ca412a7cf3b42512fd0ec112744778e5da0Timo Sirainen /* go to next stream */
2d7df7973f80011033e8e9fa676d3ff4c14468d8Stephan Bosch sstream->cur_input = sstream->input[sstream->cur_idx++];
48566ca412a7cf3b42512fd0ec112744778e5da0Timo Sirainen /* last one, EOF */
dafbec2c6b4275233a78cb137f41dd8041aa1c46Timo Sirainen /* see if stream has pending data */
2d7df7973f80011033e8e9fa676d3ff4c14468d8Stephan Bosch (void)i_stream_get_data(sstream->cur_input, &size);
48566ca412a7cf3b42512fd0ec112744778e5da0Timo Sirainenstatic bool read_from_buffer(struct seekable_istream *sstream, ssize_t *ret_r)
48566ca412a7cf3b42512fd0ec112744778e5da0Timo Sirainen struct istream_private *stream = &sstream->istream;
48566ca412a7cf3b42512fd0ec112744778e5da0Timo Sirainen const unsigned char *data;
48566ca412a7cf3b42512fd0ec112744778e5da0Timo Sirainen if (stream->istream.v_offset + stream->pos >= sstream->buffer->used) {
48566ca412a7cf3b42512fd0ec112744778e5da0Timo Sirainen /* need to read more */
48566ca412a7cf3b42512fd0ec112744778e5da0Timo Sirainen if (sstream->buffer->used >= stream->max_buffer_size)
48566ca412a7cf3b42512fd0ec112744778e5da0Timo Sirainen (void)i_stream_get_data(sstream->cur_input, &size);
48566ca412a7cf3b42512fd0ec112744778e5da0Timo Sirainen /* read more to buffer */
48566ca412a7cf3b42512fd0ec112744778e5da0Timo Sirainen /* we should have more now. */
48566ca412a7cf3b42512fd0ec112744778e5da0Timo Sirainen data = i_stream_get_data(sstream->cur_input, &size);
a0c453a8edaec90fb0d945c874de0b1845bc7d7eTimo Sirainen stream->buffer = CONST_PTR_OFFSET(sstream->buffer->data, offset);
48566ca412a7cf3b42512fd0ec112744778e5da0Timo Sirainenstatic int i_stream_seekable_write_failed(struct seekable_istream *sstream)
48566ca412a7cf3b42512fd0ec112744778e5da0Timo Sirainen struct istream_private *stream = &sstream->istream;
b8835b8a21c617ceb82ddc5a176243faf36aa8f7Timo Sirainen buffer_create_dynamic(default_pool, sstream->write_peak);
b8835b8a21c617ceb82ddc5a176243faf36aa8f7Timo Sirainen data = buffer_append_space_unsafe(sstream->buffer, sstream->write_peak);
ecdce39e5ef4b62eefa9f5818f17d153fd5d710aTimo Sirainen if (pread_full(sstream->fd, data, sstream->write_peak, 0) < 0) {
48566ca412a7cf3b42512fd0ec112744778e5da0Timo Sirainen i_error("read(%s) failed: %m", sstream->temp_path);
8d587838c414c48a331f0b54cd7ffd97e5024abdTimo Sirainenstatic ssize_t i_stream_seekable_read(struct istream_private *stream)
8d587838c414c48a331f0b54cd7ffd97e5024abdTimo Sirainen struct seekable_istream *sstream = (struct seekable_istream *)stream;
8d587838c414c48a331f0b54cd7ffd97e5024abdTimo Sirainen const unsigned char *data;
8d587838c414c48a331f0b54cd7ffd97e5024abdTimo Sirainen stream->buffer = CONST_PTR_OFFSET(stream->buffer, stream->skip);
8d587838c414c48a331f0b54cd7ffd97e5024abdTimo Sirainen /* copy everything to temp file and use it as the stream */
a0c453a8edaec90fb0d945c874de0b1845bc7d7eTimo Sirainen i_assert(stream->istream.v_offset + stream->pos <= sstream->write_peak);
a0c453a8edaec90fb0d945c874de0b1845bc7d7eTimo Sirainen if (stream->istream.v_offset + stream->pos == sstream->write_peak) {
8d587838c414c48a331f0b54cd7ffd97e5024abdTimo Sirainen /* need to read more */
8d587838c414c48a331f0b54cd7ffd97e5024abdTimo Sirainen /* save to our file */
8d587838c414c48a331f0b54cd7ffd97e5024abdTimo Sirainen data = i_stream_get_data(sstream->cur_input, &size);
8d587838c414c48a331f0b54cd7ffd97e5024abdTimo Sirainen if (i_stream_seekable_write_failed(sstream) < 0)
a0c453a8edaec90fb0d945c874de0b1845bc7d7eTimo Sirainen i_stream_seek(sstream->fd_input, stream->istream.v_offset);
8d587838c414c48a331f0b54cd7ffd97e5024abdTimo Sirainen stream->buffer = i_stream_get_data(sstream->fd_input, &pos);
48566ca412a7cf3b42512fd0ec112744778e5da0Timo Sirainen ret = pos > stream->pos ? (ssize_t)(pos - stream->pos) : ret;
8d587838c414c48a331f0b54cd7ffd97e5024abdTimo Sirainenstatic void i_stream_seekable_seek(struct istream_private *stream,
ecdce39e5ef4b62eefa9f5818f17d153fd5d710aTimo Sirainenstatic const struct stat *
ecdce39e5ef4b62eefa9f5818f17d153fd5d710aTimo Siraineni_stream_seekable_stat(struct istream_private *stream, bool exact)
ecdce39e5ef4b62eefa9f5818f17d153fd5d710aTimo Sirainen struct seekable_istream *sstream = (struct seekable_istream *)stream;
ecdce39e5ef4b62eefa9f5818f17d153fd5d710aTimo Sirainen /* we want to know the full size of the file, so read until
ecdce39e5ef4b62eefa9f5818f17d153fd5d710aTimo Sirainen we're finished */
ecdce39e5ef4b62eefa9f5818f17d153fd5d710aTimo Sirainen } while ((ret = i_stream_seekable_read(stream)) > 0);
48566ca412a7cf3b42512fd0ec112744778e5da0Timo Sirainen i_panic("i_stream_stat() used for non-blocking "
48566ca412a7cf3b42512fd0ec112744778e5da0Timo Sirainen "seekable stream");
acc4e0a41f1c8ef0559a19c280afc1b97b9e0818Timo Sirainen i_stream_skip(&stream->istream, stream->pos - stream->skip);
48566ca412a7cf3b42512fd0ec112744778e5da0Timo Sirainen /* using a file backed buffer, we can use real fstat() */
ecdce39e5ef4b62eefa9f5818f17d153fd5d710aTimo Sirainen return i_stream_stat(sstream->fd_input, exact);
48566ca412a7cf3b42512fd0ec112744778e5da0Timo Sirainen /* buffer is completely in memory */
a0c453a8edaec90fb0d945c874de0b1845bc7d7eTimo Sirainen stream->statbuf.st_size = sstream->buffer->used;
acc4e0a41f1c8ef0559a19c280afc1b97b9e0818Timo Siraineni_stream_create_seekable(struct istream *input[],
ecdce39e5ef4b62eefa9f5818f17d153fd5d710aTimo Sirainen int (*fd_callback)(const char **path_r, void *context),
b8835b8a21c617ceb82ddc5a176243faf36aa8f7Timo Sirainen const unsigned char *data;
b8835b8a21c617ceb82ddc5a176243faf36aa8f7Timo Sirainen unsigned int count;
ecdce39e5ef4b62eefa9f5818f17d153fd5d710aTimo Sirainen /* If all input streams are seekable, use concat istream instead */
ecdce39e5ef4b62eefa9f5818f17d153fd5d710aTimo Sirainen for (count = 0; input[count] != NULL; count++) {
2d7df7973f80011033e8e9fa676d3ff4c14468d8Stephan Bosch /* if any of the streams isn't blocking, make ourselves nonblocking as well */
2d7df7973f80011033e8e9fa676d3ff4c14468d8Stephan Bosch for (count = 0; input[count] != NULL; count++) {
2d7df7973f80011033e8e9fa676d3ff4c14468d8Stephan Bosch sstream->buffer = buffer_create_dynamic(default_pool, BUF_INITIAL_SIZE);
2d7df7973f80011033e8e9fa676d3ff4c14468d8Stephan Bosch sstream->istream.max_buffer_size = max_buffer_size;
2d7df7973f80011033e8e9fa676d3ff4c14468d8Stephan Bosch sstream->input = i_new(struct istream *, count + 1);
2d7df7973f80011033e8e9fa676d3ff4c14468d8Stephan Bosch memcpy(sstream->input, input, sizeof(*input) * count);
2d7df7973f80011033e8e9fa676d3ff4c14468d8Stephan Bosch /* initialize our buffer from first stream's pending data */
2d7df7973f80011033e8e9fa676d3ff4c14468d8Stephan Bosch data = i_stream_get_data(sstream->cur_input, &size);
2d7df7973f80011033e8e9fa676d3ff4c14468d8Stephan Bosch sstream->istream.iostream.close = i_stream_seekable_close;
2d7df7973f80011033e8e9fa676d3ff4c14468d8Stephan Bosch sstream->istream.iostream.destroy = i_stream_seekable_destroy;
2d7df7973f80011033e8e9fa676d3ff4c14468d8Stephan Bosch sstream->istream.iostream.set_max_buffer_size =
2d7df7973f80011033e8e9fa676d3ff4c14468d8Stephan Bosch sstream->istream.read = i_stream_seekable_read;
2d7df7973f80011033e8e9fa676d3ff4c14468d8Stephan Bosch sstream->istream.seek = i_stream_seekable_seek;
2d7df7973f80011033e8e9fa676d3ff4c14468d8Stephan Bosch sstream->istream.stat = i_stream_seekable_stat;