istream-seekable.c revision e2ce8d4a6ac5d82a906178148453e7613fab9ba0
5f5870385cff47efd2f58e7892f251cf13761528Timo Sirainen/* Copyright (c) 2005-2013 Dovecot authors, see the included COPYING file */
02a54da28f376dd66d7939d8546a196a0045b486Timo Sirainen int (*fd_callback)(const char **path_r, void *context);
02a54da28f376dd66d7939d8546a196a0045b486Timo Sirainen unsigned int cur_idx;
02a54da28f376dd66d7939d8546a196a0045b486Timo Sirainenstatic void i_stream_seekable_close(struct iostream_private *stream,
02a54da28f376dd66d7939d8546a196a0045b486Timo Sirainen struct seekable_istream *sstream = (struct seekable_istream *)stream;
02a54da28f376dd66d7939d8546a196a0045b486Timo Sirainenstatic void unref_streams(struct seekable_istream *sstream)
bd63b5b860658b01b1f46f26d406e1e4a9dc019aTimo Sirainen unsigned int i;
02a54da28f376dd66d7939d8546a196a0045b486Timo Sirainenstatic void i_stream_seekable_destroy(struct iostream_private *stream)
2615df45a8027948a474abe5e817b34b0499c171Timo Sirainen struct seekable_istream *sstream = (struct seekable_istream *)stream;
02a54da28f376dd66d7939d8546a196a0045b486Timo Siraineni_stream_seekable_set_max_buffer_size(struct iostream_private *stream,
eacce2276278ce6a8176a9a100807dba50bbfb36Timo Sirainen struct seekable_istream *sstream = (struct seekable_istream *)stream;
eacce2276278ce6a8176a9a100807dba50bbfb36Timo Sirainen unsigned int i;
eacce2276278ce6a8176a9a100807dba50bbfb36Timo Sirainen i_stream_set_max_buffer_size(sstream->fd_input, max_size);
eacce2276278ce6a8176a9a100807dba50bbfb36Timo Sirainen i_stream_set_max_buffer_size(sstream->input[i], max_size);
02a54da28f376dd66d7939d8546a196a0045b486Timo Sirainenstatic int copy_to_temp_file(struct seekable_istream *sstream)
02a54da28f376dd66d7939d8546a196a0045b486Timo Sirainen struct istream_private *stream = &sstream->istream;
e1203014de25c8c3d3975a9f4b4a04616df4bba2Timo Sirainen const unsigned char *buffer;
02a54da28f376dd66d7939d8546a196a0045b486Timo Sirainen fd = sstream->fd_callback(&path, sstream->context);
02a54da28f376dd66d7939d8546a196a0045b486Timo Sirainen /* copy our currently read buffer to it */
02a54da28f376dd66d7939d8546a196a0045b486Timo Sirainen if (write_full(fd, sstream->membuf->data, sstream->membuf->used) < 0) {
02a54da28f376dd66d7939d8546a196a0045b486Timo Sirainen i_stream_create_fd(fd, sstream->istream.max_buffer_size, TRUE);
411f318ed3a25fa66c1b932e10df43841e2725c9Timo Sirainen /* read back the data we just had in our buffer */
02a54da28f376dd66d7939d8546a196a0045b486Timo Sirainen i_stream_seek(sstream->fd_input, stream->istream.v_offset);
02a54da28f376dd66d7939d8546a196a0045b486Timo Sirainen buffer = i_stream_get_data(sstream->fd_input, &size);
02a54da28f376dd66d7939d8546a196a0045b486Timo Sirainen i_error("istream-seekable: Couldn't read back "
02a54da28f376dd66d7939d8546a196a0045b486Timo Sirainen "in-memory input %s",
02a54da28f376dd66d7939d8546a196a0045b486Timo Sirainenstatic ssize_t read_more(struct seekable_istream *sstream)
714e2da5096fb52b8845d3c79f9bb26225a606c9Timo Sirainen while ((ret = i_stream_read(sstream->cur_input)) == -1) {
02a54da28f376dd66d7939d8546a196a0045b486Timo Sirainen /* go to next stream */
02a54da28f376dd66d7939d8546a196a0045b486Timo Sirainen sstream->cur_input = sstream->input[sstream->cur_idx++];
02a54da28f376dd66d7939d8546a196a0045b486Timo Sirainen /* last one, EOF */
02a54da28f376dd66d7939d8546a196a0045b486Timo Sirainen sstream->size = sstream->istream.istream.v_offset;
02a54da28f376dd66d7939d8546a196a0045b486Timo Sirainen /* see if stream has pending data */
02a54da28f376dd66d7939d8546a196a0045b486Timo Sirainen size = i_stream_get_data_size(sstream->cur_input);
02a54da28f376dd66d7939d8546a196a0045b486Timo Sirainenstatic bool read_from_buffer(struct seekable_istream *sstream, ssize_t *ret_r)
02a54da28f376dd66d7939d8546a196a0045b486Timo Sirainen struct istream_private *stream = &sstream->istream;
02a54da28f376dd66d7939d8546a196a0045b486Timo Sirainen const unsigned char *data;
4530cfa7456c10cd03fe9120c75f8bcb2f623ba4Timo Sirainen if (stream->istream.v_offset + stream->pos >= sstream->membuf->used) {
4530cfa7456c10cd03fe9120c75f8bcb2f623ba4Timo Sirainen /* need to read more */
4530cfa7456c10cd03fe9120c75f8bcb2f623ba4Timo Sirainen if (sstream->membuf->used >= stream->max_buffer_size)
4530cfa7456c10cd03fe9120c75f8bcb2f623ba4Timo Sirainen /* read more to buffer */
4530cfa7456c10cd03fe9120c75f8bcb2f623ba4Timo Sirainen /* we should have more now. */
4530cfa7456c10cd03fe9120c75f8bcb2f623ba4Timo Sirainen data = i_stream_get_data(sstream->cur_input, &size);
4530cfa7456c10cd03fe9120c75f8bcb2f623ba4Timo Sirainen stream->buffer = CONST_PTR_OFFSET(sstream->membuf->data, offset);
4530cfa7456c10cd03fe9120c75f8bcb2f623ba4Timo Sirainenstatic int i_stream_seekable_write_failed(struct seekable_istream *sstream)
0b2f7be9fadfd4026a9174e51170890cde3edf48Timo Sirainen struct istream_private *stream = &sstream->istream;
02a54da28f376dd66d7939d8546a196a0045b486Timo Sirainen buffer_create_dynamic(default_pool, sstream->write_peak);
02a54da28f376dd66d7939d8546a196a0045b486Timo Sirainen data = buffer_append_space_unsafe(sstream->membuf, sstream->write_peak);
0b2f7be9fadfd4026a9174e51170890cde3edf48Timo Sirainen if (pread_full(sstream->fd, data, sstream->write_peak, 0) < 0) {
4530cfa7456c10cd03fe9120c75f8bcb2f623ba4Timo Sirainen i_error("read(%s) failed: %m", sstream->temp_path);
0b2f7be9fadfd4026a9174e51170890cde3edf48Timo Sirainenstatic ssize_t i_stream_seekable_read(struct istream_private *stream)
0b2f7be9fadfd4026a9174e51170890cde3edf48Timo Sirainen struct seekable_istream *sstream = (struct seekable_istream *)stream;
0b2f7be9fadfd4026a9174e51170890cde3edf48Timo Sirainen const unsigned char *data;
0b2f7be9fadfd4026a9174e51170890cde3edf48Timo Sirainen stream->buffer = CONST_PTR_OFFSET(stream->buffer, stream->skip);
0b2f7be9fadfd4026a9174e51170890cde3edf48Timo Sirainen /* copy everything to temp file and use it as the stream */
02a54da28f376dd66d7939d8546a196a0045b486Timo Sirainen i_assert(stream->istream.v_offset + stream->pos <= sstream->write_peak);
0b2f7be9fadfd4026a9174e51170890cde3edf48Timo Sirainen if (stream->istream.v_offset + stream->pos == sstream->write_peak) {
45c872f65e4f327ef166c6e2b71bb43e188ac562Timo Sirainen /* need to read more */
0b2f7be9fadfd4026a9174e51170890cde3edf48Timo Sirainen /* save to our file */
0b2f7be9fadfd4026a9174e51170890cde3edf48Timo Sirainen data = i_stream_get_data(sstream->cur_input, &size);
0b2f7be9fadfd4026a9174e51170890cde3edf48Timo Sirainen if (i_stream_seekable_write_failed(sstream) < 0)
02a54da28f376dd66d7939d8546a196a0045b486Timo Sirainen i_stream_seek(sstream->fd_input, stream->istream.v_offset);
02a54da28f376dd66d7939d8546a196a0045b486Timo Sirainen stream->buffer = i_stream_get_data(sstream->fd_input, &pos);
2615df45a8027948a474abe5e817b34b0499c171Timo Sirainen ret = pos > stream->pos ? (ssize_t)(pos - stream->pos) : ret;
02a54da28f376dd66d7939d8546a196a0045b486Timo Siraineni_stream_seekable_stat(struct istream_private *stream, bool exact)
02a54da28f376dd66d7939d8546a196a0045b486Timo Sirainen struct seekable_istream *sstream = (struct seekable_istream *)stream;
02a54da28f376dd66d7939d8546a196a0045b486Timo Sirainen /* we've already reached EOF and know the size */
02a54da28f376dd66d7939d8546a196a0045b486Timo Sirainen /* we want to know the full size of the file, so read until
02a54da28f376dd66d7939d8546a196a0045b486Timo Sirainen we're finished */
02a54da28f376dd66d7939d8546a196a0045b486Timo Sirainen } while ((ret = i_stream_seekable_read(stream)) > 0);
0df9428baed48afaff90b4d4f03792d2fd756a43Timo Sirainen i_panic("i_stream_stat() used for non-blocking "
02a54da28f376dd66d7939d8546a196a0045b486Timo Sirainen i_stream_skip(&stream->istream, stream->pos - stream->skip);
02a54da28f376dd66d7939d8546a196a0045b486Timo Sirainen /* using a file backed buffer, we can use real fstat() */
02a54da28f376dd66d7939d8546a196a0045b486Timo Sirainen if (i_stream_stat(sstream->fd_input, exact, &st) < 0)
04870054863757edf048c81dcce3c5e7dec453cdTimo Sirainen /* buffer is completely in memory */
e593e507ee5ea3869271a631874c5c4b5c7a294dTimo Sirainen stream->statbuf.st_size = sstream->membuf->used;
02a54da28f376dd66d7939d8546a196a0045b486Timo Siraineni_streams_merge(struct istream *input[], size_t max_buffer_size,
ec5fec7eab19e134a2607b7e224b3e14a1771ee0Timo Sirainen int (*fd_callback)(const char **path_r, void *context),
02a54da28f376dd66d7939d8546a196a0045b486Timo Sirainen const unsigned char *data;
02a54da28f376dd66d7939d8546a196a0045b486Timo Sirainen unsigned int count;
02a54da28f376dd66d7939d8546a196a0045b486Timo Sirainen /* if any of the streams isn't blocking, set ourself also nonblocking */
02a54da28f376dd66d7939d8546a196a0045b486Timo Sirainen for (count = 0; input[count] != NULL; count++) {
02a54da28f376dd66d7939d8546a196a0045b486Timo Sirainen sstream->membuf = buffer_create_dynamic(default_pool, BUF_INITIAL_SIZE);
02a54da28f376dd66d7939d8546a196a0045b486Timo Sirainen sstream->istream.max_buffer_size = max_buffer_size;
02a54da28f376dd66d7939d8546a196a0045b486Timo Sirainen sstream->input = i_new(struct istream *, count + 1);
02a54da28f376dd66d7939d8546a196a0045b486Timo Sirainen memcpy(sstream->input, input, sizeof(*input) * count);
02a54da28f376dd66d7939d8546a196a0045b486Timo Sirainen /* initialize our buffer from first stream's pending data */
02a54da28f376dd66d7939d8546a196a0045b486Timo Sirainen data = i_stream_get_data(sstream->cur_input, &size);
02a54da28f376dd66d7939d8546a196a0045b486Timo Sirainen sstream->istream.iostream.close = i_stream_seekable_close;
02a54da28f376dd66d7939d8546a196a0045b486Timo Sirainen sstream->istream.iostream.destroy = i_stream_seekable_destroy;
02a54da28f376dd66d7939d8546a196a0045b486Timo Sirainen sstream->istream.iostream.set_max_buffer_size =
02a54da28f376dd66d7939d8546a196a0045b486Timo Sirainen sstream->istream.read = i_stream_seekable_read;
02a54da28f376dd66d7939d8546a196a0045b486Timo Sirainen sstream->istream.stat = i_stream_seekable_stat;
02a54da28f376dd66d7939d8546a196a0045b486Timo Sirainen return i_stream_create(&sstream->istream, NULL, -1);
02a54da28f376dd66d7939d8546a196a0045b486Timo Sirainenstatic bool inputs_are_seekable(struct istream *input[])
02a54da28f376dd66d7939d8546a196a0045b486Timo Sirainen unsigned int count;
02a54da28f376dd66d7939d8546a196a0045b486Timo Sirainen for (count = 0; input[count] != NULL; count++) {
e5b723864630e40c9028808ef417dd3d6fbf495bTimo Siraineni_stream_create_seekable(struct istream *input[],
e5b723864630e40c9028808ef417dd3d6fbf495bTimo Sirainen int (*fd_callback)(const char **path_r, void *context),
e5b723864630e40c9028808ef417dd3d6fbf495bTimo Sirainen /* If all input streams are seekable, use concat istream instead */
e5b723864630e40c9028808ef417dd3d6fbf495bTimo Sirainen return i_streams_merge(input, max_buffer_size, fd_callback, context);
e5b723864630e40c9028808ef417dd3d6fbf495bTimo Sirainenstatic int seekable_fd_callback(const char **path_r, void *context)
02a54da28f376dd66d7939d8546a196a0045b486Timo Sirainen fd = safe_mkstemp(path, 0600, (uid_t)-1, (gid_t)-1);
0df9428baed48afaff90b4d4f03792d2fd756a43Timo Sirainen i_error("safe_mkstemp(%s) failed: %m", str_c(path));
0df9428baed48afaff90b4d4f03792d2fd756a43Timo Sirainen /* we just want the fd, unlink it */
0df9428baed48afaff90b4d4f03792d2fd756a43Timo Sirainen /* shouldn't happen.. */
02a54da28f376dd66d7939d8546a196a0045b486Timo Sirainen i_error("unlink(%s) failed: %m", str_c(path));
714e2da5096fb52b8845d3c79f9bb26225a606c9Timo Siraineni_stream_create_seekable_path(struct istream *input[],
6882df5fbca4a09cdaa95f54d70bb31b5920528cTimo Sirainen stream = i_stream_create_seekable(input, max_buffer_size,