istream-seekable.c revision 2c42748505ef4aed83ff59b34e50ed5606900c86
89a126810703c666309310d0f3189e9834d70b5bTimo Sirainen/* Copyright (c) 2005-2013 Dovecot authors, see the included COPYING file */
050975ee630c761ab237fce7b8f84fe189bb02d2Timo Sirainen int (*fd_callback)(const char **path_r, void *context);
f335accff54f408a8bbb328f8098ad458f2ff58eTimo Sirainen unsigned int cur_idx;
2e03303e721a293d796c0287829396f5caea76eaTimo Sirainenstatic void i_stream_seekable_close(struct iostream_private *stream,
050975ee630c761ab237fce7b8f84fe189bb02d2Timo Sirainen struct seekable_istream *sstream = (struct seekable_istream *)stream;
050975ee630c761ab237fce7b8f84fe189bb02d2Timo Sirainenstatic void unref_streams(struct seekable_istream *sstream)
050975ee630c761ab237fce7b8f84fe189bb02d2Timo Sirainen unsigned int i;
050975ee630c761ab237fce7b8f84fe189bb02d2Timo Sirainenstatic void i_stream_seekable_destroy(struct iostream_private *stream)
345253fb28498b2e0a60f4a2a8644c65feee7e75Timo Sirainen struct seekable_istream *sstream = (struct seekable_istream *)stream;
0f66f12eb4cdbf47670975044c88d8f388bf92dfTimo Siraineni_stream_seekable_set_max_buffer_size(struct iostream_private *stream,
c69a255a68103a50fa3f04a527281a169075403fTimo Sirainen struct seekable_istream *sstream = (struct seekable_istream *)stream;
050975ee630c761ab237fce7b8f84fe189bb02d2Timo Sirainen unsigned int i;
687d1dee0e92229232aa8be416897b640df67d07Timo Sirainen i_stream_set_max_buffer_size(sstream->fd_input, max_size);
050975ee630c761ab237fce7b8f84fe189bb02d2Timo Sirainen i_stream_set_max_buffer_size(sstream->input[i], max_size);
687d1dee0e92229232aa8be416897b640df67d07Timo Sirainenstatic int copy_to_temp_file(struct seekable_istream *sstream)
687d1dee0e92229232aa8be416897b640df67d07Timo Sirainen struct istream_private *stream = &sstream->istream;
687d1dee0e92229232aa8be416897b640df67d07Timo Sirainen const unsigned char *buffer;
050975ee630c761ab237fce7b8f84fe189bb02d2Timo Sirainen fd = sstream->fd_callback(&path, sstream->context);
050975ee630c761ab237fce7b8f84fe189bb02d2Timo Sirainen /* copy our currently read buffer to it */
050975ee630c761ab237fce7b8f84fe189bb02d2Timo Sirainen if (write_full(fd, sstream->membuf->data, sstream->membuf->used) < 0) {
050975ee630c761ab237fce7b8f84fe189bb02d2Timo Sirainen i_stream_create_fd(fd, sstream->istream.max_buffer_size, TRUE);
050975ee630c761ab237fce7b8f84fe189bb02d2Timo Sirainen /* read back the data we just had in our buffer */
687d1dee0e92229232aa8be416897b640df67d07Timo Sirainen i_stream_seek(sstream->fd_input, stream->istream.v_offset);
050975ee630c761ab237fce7b8f84fe189bb02d2Timo Sirainen buffer = i_stream_get_data(sstream->fd_input, &size);
687d1dee0e92229232aa8be416897b640df67d07Timo Sirainen i_error("istream-seekable: Couldn't read back "
687d1dee0e92229232aa8be416897b640df67d07Timo Sirainen "in-memory input %s",
687d1dee0e92229232aa8be416897b640df67d07Timo Sirainenstatic ssize_t read_more(struct seekable_istream *sstream)
0ce8f754204c7eeb33805993807393f74faf2cd3Timo Sirainen while ((ret = i_stream_read(sstream->cur_input)) == -1) {
050975ee630c761ab237fce7b8f84fe189bb02d2Timo Sirainen io_stream_set_error(&sstream->istream.iostream,
050975ee630c761ab237fce7b8f84fe189bb02d2Timo Sirainen "read(%s) failed: %s",
eddd9bf1a1369aea4a2715f6be1137da6d17d293Timo Sirainen /* go to next stream */
eddd9bf1a1369aea4a2715f6be1137da6d17d293Timo Sirainen sstream->cur_input = sstream->input[sstream->cur_idx++];
050975ee630c761ab237fce7b8f84fe189bb02d2Timo Sirainen /* last one, EOF */
050975ee630c761ab237fce7b8f84fe189bb02d2Timo Sirainen sstream->size = sstream->istream.istream.v_offset;
65988f5a8abed57e9894fec77105941e046d3490Timo Sirainen /* see if stream has pending data */
65988f5a8abed57e9894fec77105941e046d3490Timo Sirainen size = i_stream_get_data_size(sstream->cur_input);
050975ee630c761ab237fce7b8f84fe189bb02d2Timo Sirainenstatic bool read_from_buffer(struct seekable_istream *sstream, ssize_t *ret_r)
e1b83f64e62cc3e8967c75fcc3f9b5dbb243d3b3Timo Sirainen struct istream_private *stream = &sstream->istream;
e1b83f64e62cc3e8967c75fcc3f9b5dbb243d3b3Timo Sirainen const unsigned char *data;
050975ee630c761ab237fce7b8f84fe189bb02d2Timo Sirainen if (stream->istream.v_offset + stream->pos >= sstream->membuf->used) {
050975ee630c761ab237fce7b8f84fe189bb02d2Timo Sirainen /* need to read more */
050975ee630c761ab237fce7b8f84fe189bb02d2Timo Sirainen if (sstream->membuf->used >= stream->max_buffer_size)
62394a19cba1a8df01cad66eaa9331a70464441eTimo Sirainen /* read more to buffer */
62394a19cba1a8df01cad66eaa9331a70464441eTimo Sirainen /* we should have more now. */
62394a19cba1a8df01cad66eaa9331a70464441eTimo Sirainen data = i_stream_get_data(sstream->cur_input, &size);
050975ee630c761ab237fce7b8f84fe189bb02d2Timo Sirainen stream->buffer = CONST_PTR_OFFSET(sstream->membuf->data, offset);
050975ee630c761ab237fce7b8f84fe189bb02d2Timo Sirainenstatic int i_stream_seekable_write_failed(struct seekable_istream *sstream)
1dba794aa92dc13e6afd7a50a8c33cb19d6aa235Timo Sirainen struct istream_private *stream = &sstream->istream;
1dba794aa92dc13e6afd7a50a8c33cb19d6aa235Timo Sirainen buffer_create_dynamic(default_pool, sstream->write_peak);
1dba794aa92dc13e6afd7a50a8c33cb19d6aa235Timo Sirainen data = buffer_append_space_unsafe(sstream->membuf, sstream->write_peak);
1dba794aa92dc13e6afd7a50a8c33cb19d6aa235Timo Sirainen if (pread_full(sstream->fd, data, sstream->write_peak, 0) < 0) {
1dba794aa92dc13e6afd7a50a8c33cb19d6aa235Timo Sirainen i_error("read(%s) failed: %m", sstream->temp_path);
1dba794aa92dc13e6afd7a50a8c33cb19d6aa235Timo Sirainenstatic ssize_t i_stream_seekable_read(struct istream_private *stream)
1dba794aa92dc13e6afd7a50a8c33cb19d6aa235Timo Sirainen struct seekable_istream *sstream = (struct seekable_istream *)stream;
1dba794aa92dc13e6afd7a50a8c33cb19d6aa235Timo Sirainen const unsigned char *data;
1dba794aa92dc13e6afd7a50a8c33cb19d6aa235Timo Sirainen stream->buffer = CONST_PTR_OFFSET(stream->buffer, stream->skip);
1dba794aa92dc13e6afd7a50a8c33cb19d6aa235Timo Sirainen /* copy everything to temp file and use it as the stream */
1dba794aa92dc13e6afd7a50a8c33cb19d6aa235Timo Sirainen i_assert(stream->istream.v_offset + stream->pos <= sstream->write_peak);
1dba794aa92dc13e6afd7a50a8c33cb19d6aa235Timo Sirainen if (stream->istream.v_offset + stream->pos == sstream->write_peak) {
1dba794aa92dc13e6afd7a50a8c33cb19d6aa235Timo Sirainen /* need to read more */
1dba794aa92dc13e6afd7a50a8c33cb19d6aa235Timo Sirainen /* save to our file */
1dba794aa92dc13e6afd7a50a8c33cb19d6aa235Timo Sirainen data = i_stream_get_data(sstream->cur_input, &size);
2e03303e721a293d796c0287829396f5caea76eaTimo Sirainen if (i_stream_seekable_write_failed(sstream) < 0)
2e03303e721a293d796c0287829396f5caea76eaTimo Sirainen i_stream_seek(sstream->fd_input, stream->istream.v_offset);
2e03303e721a293d796c0287829396f5caea76eaTimo Sirainen stream->buffer = i_stream_get_data(sstream->fd_input, &pos);
050975ee630c761ab237fce7b8f84fe189bb02d2Timo Sirainen ret = pos > stream->pos ? (ssize_t)(pos - stream->pos) : ret;
050975ee630c761ab237fce7b8f84fe189bb02d2Timo Siraineni_stream_seekable_stat(struct istream_private *stream, bool exact)
050975ee630c761ab237fce7b8f84fe189bb02d2Timo Sirainen struct seekable_istream *sstream = (struct seekable_istream *)stream;
050975ee630c761ab237fce7b8f84fe189bb02d2Timo Sirainen /* we've already reached EOF and know the size */
050975ee630c761ab237fce7b8f84fe189bb02d2Timo Sirainen /* we want to know the full size of the file, so read until
050975ee630c761ab237fce7b8f84fe189bb02d2Timo Sirainen we're finished */
050975ee630c761ab237fce7b8f84fe189bb02d2Timo Sirainen } while ((ret = i_stream_seekable_read(stream)) > 0);
050975ee630c761ab237fce7b8f84fe189bb02d2Timo Sirainen i_panic("i_stream_stat() used for non-blocking "
64510d2cc23a79d2142030bf5bade44baa490db3Timo Sirainen i_stream_skip(&stream->istream, stream->pos - stream->skip);
050975ee630c761ab237fce7b8f84fe189bb02d2Timo Sirainen /* using a file backed buffer, we can use real fstat() */
050975ee630c761ab237fce7b8f84fe189bb02d2Timo Sirainen if (i_stream_stat(sstream->fd_input, exact, &st) < 0)
050975ee630c761ab237fce7b8f84fe189bb02d2Timo Sirainen /* buffer is completely in memory */
64510d2cc23a79d2142030bf5bade44baa490db3Timo Sirainen stream->statbuf.st_size = sstream->membuf->used;
050975ee630c761ab237fce7b8f84fe189bb02d2Timo Sirainenstatic void i_stream_seekable_seek(struct istream_private *stream,
e619ecbbc00cba9e6e1e8322caa59776507fac02Timo Sirainen /* seeking backwards */
050975ee630c761ab237fce7b8f84fe189bb02d2Timo Sirainen /* we can't skip over data we haven't yet read and written to
050975ee630c761ab237fce7b8f84fe189bb02d2Timo Sirainen i_stream_default_seek_nonseekable(stream, v_offset, mark);
050975ee630c761ab237fce7b8f84fe189bb02d2Timo Siraineni_streams_merge(struct istream *input[], size_t max_buffer_size,
64510d2cc23a79d2142030bf5bade44baa490db3Timo Sirainen int (*fd_callback)(const char **path_r, void *context),
050975ee630c761ab237fce7b8f84fe189bb02d2Timo Sirainen const unsigned char *data;
b29403032a7ee56c309e94d92fbf1728bacb88f4Timo Sirainen unsigned int count;
919733fcead68b0e9617cfff86ae5c74d097c6cdTimo Sirainen /* if any of the streams isn't blocking, set ourself also nonblocking */
345253fb28498b2e0a60f4a2a8644c65feee7e75Timo Sirainen for (count = 0; input[count] != NULL; count++) {
919733fcead68b0e9617cfff86ae5c74d097c6cdTimo Sirainen sstream->membuf = buffer_create_dynamic(default_pool, BUF_INITIAL_SIZE);
1dba794aa92dc13e6afd7a50a8c33cb19d6aa235Timo Sirainen sstream->istream.max_buffer_size = max_buffer_size;
050975ee630c761ab237fce7b8f84fe189bb02d2Timo Sirainen sstream->input = i_new(struct istream *, count + 1);
050975ee630c761ab237fce7b8f84fe189bb02d2Timo Sirainen memcpy(sstream->input, input, sizeof(*input) * count);
050975ee630c761ab237fce7b8f84fe189bb02d2Timo Sirainen /* initialize our buffer from first stream's pending data */
055f4599bba1874fa1148a8fa488517fa077619cTimo Sirainen data = i_stream_get_data(sstream->cur_input, &size);
050975ee630c761ab237fce7b8f84fe189bb02d2Timo Sirainen sstream->istream.iostream.close = i_stream_seekable_close;
050975ee630c761ab237fce7b8f84fe189bb02d2Timo Sirainen sstream->istream.iostream.destroy = i_stream_seekable_destroy;
050975ee630c761ab237fce7b8f84fe189bb02d2Timo Sirainen sstream->istream.iostream.set_max_buffer_size =
050975ee630c761ab237fce7b8f84fe189bb02d2Timo Sirainen sstream->istream.read = i_stream_seekable_read;
050975ee630c761ab237fce7b8f84fe189bb02d2Timo Sirainen sstream->istream.stat = i_stream_seekable_stat;
050975ee630c761ab237fce7b8f84fe189bb02d2Timo Sirainen sstream->istream.seek = i_stream_seekable_seek;
050975ee630c761ab237fce7b8f84fe189bb02d2Timo Sirainen return i_stream_create(&sstream->istream, NULL, -1);
050975ee630c761ab237fce7b8f84fe189bb02d2Timo Sirainenstatic bool inputs_are_seekable(struct istream *input[])
f335accff54f408a8bbb328f8098ad458f2ff58eTimo Sirainen unsigned int count;
2e03303e721a293d796c0287829396f5caea76eaTimo Sirainen for (count = 0; input[count] != NULL; count++) {
2e03303e721a293d796c0287829396f5caea76eaTimo Siraineni_stream_create_seekable(struct istream *input[],
2e03303e721a293d796c0287829396f5caea76eaTimo Sirainen int (*fd_callback)(const char **path_r, void *context),
2e03303e721a293d796c0287829396f5caea76eaTimo Sirainen /* If all input streams are seekable, use concat istream instead */
2e03303e721a293d796c0287829396f5caea76eaTimo Sirainen return i_streams_merge(input, max_buffer_size, fd_callback, context);
2e03303e721a293d796c0287829396f5caea76eaTimo Sirainenstatic int seekable_fd_callback(const char **path_r, void *context)
int fd;
return fd;
struct istream *
const char *temp_path_prefix)
return stream;