istream-seekable.c revision 2454dfa32c93c20a8522c6ed42fe057baaac9f9a
2454dfa32c93c20a8522c6ed42fe057baaac9f9aStephan Bosch/* Copyright (c) 2005-2017 Dovecot authors, see the included COPYING file */
fcca16701767c6b92227a9ee125de69d257882f6Timo Sirainen int (*fd_callback)(const char **path_r, void *context);
2201e2cc1b3f744dac61c2bf8095bcb6b5719540Timo Sirainen unsigned int cur_idx;
e2ce8d4a6ac5d82a906178148453e7613fab9ba0Timo Sirainenstatic void i_stream_seekable_close(struct iostream_private *stream,
2201e2cc1b3f744dac61c2bf8095bcb6b5719540Timo Sirainen struct seekable_istream *sstream = (struct seekable_istream *)stream;
a6f281d078ed03d555802c1a8e15fefce80132dcTimo Sirainenstatic void unref_streams(struct seekable_istream *sstream)
a6f281d078ed03d555802c1a8e15fefce80132dcTimo Sirainen unsigned int i;
a94936bafd127680184da114c6a177b37ff656e5Timo Sirainenstatic void i_stream_seekable_destroy(struct iostream_private *stream)
2201e2cc1b3f744dac61c2bf8095bcb6b5719540Timo Sirainen struct seekable_istream *sstream = (struct seekable_istream *)stream;
a94936bafd127680184da114c6a177b37ff656e5Timo Siraineni_stream_seekable_set_max_buffer_size(struct iostream_private *stream,
2201e2cc1b3f744dac61c2bf8095bcb6b5719540Timo Sirainen struct seekable_istream *sstream = (struct seekable_istream *)stream;
2201e2cc1b3f744dac61c2bf8095bcb6b5719540Timo Sirainen unsigned int i;
2201e2cc1b3f744dac61c2bf8095bcb6b5719540Timo Sirainen i_stream_set_max_buffer_size(sstream->fd_input, max_size);
2201e2cc1b3f744dac61c2bf8095bcb6b5719540Timo Sirainen i_stream_set_max_buffer_size(sstream->input[i], max_size);
2201e2cc1b3f744dac61c2bf8095bcb6b5719540Timo Sirainenstatic int copy_to_temp_file(struct seekable_istream *sstream)
7bcb308d0e13dfa48b483b0addccd889a77bb598Timo Sirainen struct istream_private *stream = &sstream->istream;
7bcb308d0e13dfa48b483b0addccd889a77bb598Timo Sirainen const unsigned char *buffer;
fcca16701767c6b92227a9ee125de69d257882f6Timo Sirainen fd = sstream->fd_callback(&path, sstream->context);
2201e2cc1b3f744dac61c2bf8095bcb6b5719540Timo Sirainen /* copy our currently read buffer to it */
5702c81e2d788449c3bc207eb9c19e539458ad9eTimo Sirainen if (write_full(fd, sstream->membuf->data, sstream->membuf->used) < 0) {
9e406b04bb5bed7d73aeed375c40c6a3fea1a2cbTimo Sirainen i_error("istream-seekable: write_full(%s) failed: %m", path);
bace943c67e6cd14ce6c994f533d82a3caad5bf1Timo Sirainen i_stream_create_fd_autoclose(&fd, sstream->istream.max_buffer_size);
ac6bba612af5207c24f6f02497d64b0ea03e7bbdTimo Sirainen i_stream_set_name(sstream->fd_input, t_strdup_printf(
ac6bba612af5207c24f6f02497d64b0ea03e7bbdTimo Sirainen "(seekable temp-istream for: %s)", i_stream_get_name(&stream->istream)));
7bcb308d0e13dfa48b483b0addccd889a77bb598Timo Sirainen /* read back the data we just had in our buffer */
7bcb308d0e13dfa48b483b0addccd889a77bb598Timo Sirainen i_stream_seek(sstream->fd_input, stream->istream.v_offset);
7bcb308d0e13dfa48b483b0addccd889a77bb598Timo Sirainen buffer = i_stream_get_data(sstream->fd_input, &size);
7f74811b78f8915e73dffc88bb49009e98b6846dTimo Sirainen if ((ret = i_stream_read(sstream->fd_input)) <= 0) {
7bcb308d0e13dfa48b483b0addccd889a77bb598Timo Sirainen i_error("istream-seekable: Couldn't read back "
7f74811b78f8915e73dffc88bb49009e98b6846dTimo Sirainen "in-memory input %s: %s",
2201e2cc1b3f744dac61c2bf8095bcb6b5719540Timo Sirainenstatic ssize_t read_more(struct seekable_istream *sstream)
8d56f3334e22619abf56833d290bb1f49ac6722cTimo Sirainen while ((ret = i_stream_read(sstream->cur_input)) == -1) {
2c42748505ef4aed83ff59b34e50ed5606900c86Timo Sirainen io_stream_set_error(&sstream->istream.iostream,
2c42748505ef4aed83ff59b34e50ed5606900c86Timo Sirainen "read(%s) failed: %s",
2201e2cc1b3f744dac61c2bf8095bcb6b5719540Timo Sirainen /* go to next stream */
2201e2cc1b3f744dac61c2bf8095bcb6b5719540Timo Sirainen sstream->cur_input = sstream->input[sstream->cur_idx++];
2201e2cc1b3f744dac61c2bf8095bcb6b5719540Timo Sirainen /* last one, EOF */
d10cb4d7a80571af21f776c65604442bf09b1765Timo Sirainen sstream->size = sstream->istream.istream.v_offset;
5069adb2f5b3609fff9a0a705c6edeae56e0030aTimo Sirainen /* see if stream has pending data */
3785910c303507db5f629684e6dde2cc7f83668eTimo Sirainen size = i_stream_get_data_size(sstream->cur_input);
c09f9f95db314e7482c95e502e1c56ed6c555797Timo Sirainenstatic bool read_from_buffer(struct seekable_istream *sstream, ssize_t *ret_r)
252db51b6c0a605163326b3ea5d09e9936ca3b29Timo Sirainen struct istream_private *stream = &sstream->istream;
2201e2cc1b3f744dac61c2bf8095bcb6b5719540Timo Sirainen const unsigned char *data;
5702c81e2d788449c3bc207eb9c19e539458ad9eTimo Sirainen if (stream->istream.v_offset + stream->pos >= sstream->membuf->used) {
2201e2cc1b3f744dac61c2bf8095bcb6b5719540Timo Sirainen /* need to read more */
5702c81e2d788449c3bc207eb9c19e539458ad9eTimo Sirainen if (sstream->membuf->used >= stream->max_buffer_size)
34830cefe1757de0ffca67acdc529d5bc8b06b66Timo Sirainen /* read more to buffer */
2201e2cc1b3f744dac61c2bf8095bcb6b5719540Timo Sirainen /* we should have more now. */
2201e2cc1b3f744dac61c2bf8095bcb6b5719540Timo Sirainen data = i_stream_get_data(sstream->cur_input, &size);
5702c81e2d788449c3bc207eb9c19e539458ad9eTimo Sirainen stream->buffer = CONST_PTR_OFFSET(sstream->membuf->data, offset);
34830cefe1757de0ffca67acdc529d5bc8b06b66Timo Sirainenstatic int i_stream_seekable_write_failed(struct seekable_istream *sstream)
34830cefe1757de0ffca67acdc529d5bc8b06b66Timo Sirainen struct istream_private *stream = &sstream->istream;
34830cefe1757de0ffca67acdc529d5bc8b06b66Timo Sirainen buffer_create_dynamic(default_pool, sstream->write_peak);
5702c81e2d788449c3bc207eb9c19e539458ad9eTimo Sirainen data = buffer_append_space_unsafe(sstream->membuf, sstream->write_peak);
34830cefe1757de0ffca67acdc529d5bc8b06b66Timo Sirainen if (pread_full(sstream->fd, data, sstream->write_peak, 0) < 0) {
9e406b04bb5bed7d73aeed375c40c6a3fea1a2cbTimo Sirainen i_error("istream-seekable: read(%s) failed: %m", sstream->temp_path);
a94936bafd127680184da114c6a177b37ff656e5Timo Sirainenstatic ssize_t i_stream_seekable_read(struct istream_private *stream)
2201e2cc1b3f744dac61c2bf8095bcb6b5719540Timo Sirainen struct seekable_istream *sstream = (struct seekable_istream *)stream;
2201e2cc1b3f744dac61c2bf8095bcb6b5719540Timo Sirainen const unsigned char *data;
f2b95f63ebdf77dba4dac938cf8c65c839f1067dTimo Sirainen stream->buffer = CONST_PTR_OFFSET(stream->buffer, stream->skip);
2201e2cc1b3f744dac61c2bf8095bcb6b5719540Timo Sirainen /* copy everything to temp file and use it as the stream */
34830cefe1757de0ffca67acdc529d5bc8b06b66Timo Sirainen i_assert(stream->istream.v_offset + stream->pos <= sstream->write_peak);
34830cefe1757de0ffca67acdc529d5bc8b06b66Timo Sirainen if (stream->istream.v_offset + stream->pos == sstream->write_peak) {
2201e2cc1b3f744dac61c2bf8095bcb6b5719540Timo Sirainen /* need to read more */
2201e2cc1b3f744dac61c2bf8095bcb6b5719540Timo Sirainen /* save to our file */
2201e2cc1b3f744dac61c2bf8095bcb6b5719540Timo Sirainen data = i_stream_get_data(sstream->cur_input, &size);
9e406b04bb5bed7d73aeed375c40c6a3fea1a2cbTimo Sirainen i_error("istream-seekable: write_full(%s) failed: %m",
34830cefe1757de0ffca67acdc529d5bc8b06b66Timo Sirainen if (i_stream_seekable_write_failed(sstream) < 0)
2201e2cc1b3f744dac61c2bf8095bcb6b5719540Timo Sirainen i_stream_seek(sstream->fd_input, stream->istream.v_offset);
7efee0bb408b0d5253e41997857bdda57855cdc7Timo Sirainen stream->buffer = i_stream_get_data(sstream->fd_input, &pos);
4da70fe8c9cb6e57b36103d78ab1e9c8654f76d9Timo Sirainen ret = pos > stream->pos ? (ssize_t)(pos - stream->pos) : ret;
a94936bafd127680184da114c6a177b37ff656e5Timo Siraineni_stream_seekable_stat(struct istream_private *stream, bool exact)
2201e2cc1b3f744dac61c2bf8095bcb6b5719540Timo Sirainen struct seekable_istream *sstream = (struct seekable_istream *)stream;
d10cb4d7a80571af21f776c65604442bf09b1765Timo Sirainen /* we've already reached EOF and know the size */
31fc5ca967261b131646ff9719eb1fb23888ed1aTimo Sirainen /* we want to know the full size of the file, so read until
31fc5ca967261b131646ff9719eb1fb23888ed1aTimo Sirainen we're finished */
31fc5ca967261b131646ff9719eb1fb23888ed1aTimo Sirainen } while ((ret = i_stream_seekable_read(stream)) > 0);
31fc5ca967261b131646ff9719eb1fb23888ed1aTimo Sirainen i_panic("i_stream_stat() used for non-blocking "
31fc5ca967261b131646ff9719eb1fb23888ed1aTimo Sirainen i_stream_skip(&stream->istream, stream->pos - stream->skip);
8255a22cccf3b0ccf38206c594941820ac1c9e00Timo Sirainen /* using a file backed buffer, we can use real fstat() */
01f4ee4a0243f3fe9af763e1a540cd5cff0d63f5Timo Sirainen if (i_stream_stat(sstream->fd_input, exact, &st) < 0)
8255a22cccf3b0ccf38206c594941820ac1c9e00Timo Sirainen /* buffer is completely in memory */
5702c81e2d788449c3bc207eb9c19e539458ad9eTimo Sirainen stream->statbuf.st_size = sstream->membuf->used;
be51dfea768ad502e08ebd02917138f7a0f8f625Timo Sirainenstatic void i_stream_seekable_seek(struct istream_private *stream,
be51dfea768ad502e08ebd02917138f7a0f8f625Timo Sirainen /* seeking backwards */
be51dfea768ad502e08ebd02917138f7a0f8f625Timo Sirainen /* we can't skip over data we haven't yet read and written to
be51dfea768ad502e08ebd02917138f7a0f8f625Timo Sirainen i_stream_default_seek_nonseekable(stream, v_offset, mark);
4d33a3133e8484ebd00f677f457cda82f1365b84Timo Siraineni_streams_merge(struct istream *input[], size_t max_buffer_size,
4d33a3133e8484ebd00f677f457cda82f1365b84Timo Sirainen int (*fd_callback)(const char **path_r, void *context),
2201e2cc1b3f744dac61c2bf8095bcb6b5719540Timo Sirainen const unsigned char *data;
2201e2cc1b3f744dac61c2bf8095bcb6b5719540Timo Sirainen unsigned int count;
ab7b5b9286104974c2a572a499ccf8b56c5d2955Timo Sirainen /* if any of the streams isn't blocking, set ourself also nonblocking */
ab7b5b9286104974c2a572a499ccf8b56c5d2955Timo Sirainen for (count = 0; input[count] != NULL; count++) {
5702c81e2d788449c3bc207eb9c19e539458ad9eTimo Sirainen sstream->membuf = buffer_create_dynamic(default_pool, BUF_INITIAL_SIZE);
597dba3488c648ffb375ee4a552bd52ac4346979Timo Sirainen sstream->istream.max_buffer_size = max_buffer_size;
fe363b433b8038a69b55169da9dca27892ad7d18Timo Sirainen sstream->input = i_new(struct istream *, count + 1);
2201e2cc1b3f744dac61c2bf8095bcb6b5719540Timo Sirainen memcpy(sstream->input, input, sizeof(*input) * count);
2201e2cc1b3f744dac61c2bf8095bcb6b5719540Timo Sirainen /* initialize our buffer from first stream's pending data */
2201e2cc1b3f744dac61c2bf8095bcb6b5719540Timo Sirainen data = i_stream_get_data(sstream->cur_input, &size);
a94936bafd127680184da114c6a177b37ff656e5Timo Sirainen sstream->istream.iostream.close = i_stream_seekable_close;
a94936bafd127680184da114c6a177b37ff656e5Timo Sirainen sstream->istream.iostream.destroy = i_stream_seekable_destroy;
a94936bafd127680184da114c6a177b37ff656e5Timo Sirainen sstream->istream.iostream.set_max_buffer_size =
a94936bafd127680184da114c6a177b37ff656e5Timo Sirainen sstream->istream.read = i_stream_seekable_read;
a94936bafd127680184da114c6a177b37ff656e5Timo Sirainen sstream->istream.stat = i_stream_seekable_stat;
be51dfea768ad502e08ebd02917138f7a0f8f625Timo Sirainen sstream->istream.seek = i_stream_seekable_seek;
9511a40d933181045343110c8101b75887062aaeTimo Sirainen return i_stream_create(&sstream->istream, NULL, -1);
c395e7d730eb4ee17e2b619acec487637a785110Timo Sirainenstatic bool inputs_are_seekable(struct istream *input[])
c395e7d730eb4ee17e2b619acec487637a785110Timo Sirainen unsigned int count;
c395e7d730eb4ee17e2b619acec487637a785110Timo Sirainen for (count = 0; input[count] != NULL; count++) {
4d33a3133e8484ebd00f677f457cda82f1365b84Timo Siraineni_stream_create_seekable(struct istream *input[],
4d33a3133e8484ebd00f677f457cda82f1365b84Timo Sirainen int (*fd_callback)(const char **path_r, void *context),
4d33a3133e8484ebd00f677f457cda82f1365b84Timo Sirainen /* If all input streams are seekable, use concat istream instead */
4d33a3133e8484ebd00f677f457cda82f1365b84Timo Sirainen return i_streams_merge(input, max_buffer_size, fd_callback, context);
6967fa47dde9f2726bd86019a50627dacf2d7509Timo Sirainenstatic int seekable_fd_callback(const char **path_r, void *context)
6967fa47dde9f2726bd86019a50627dacf2d7509Timo Sirainen fd = safe_mkstemp(path, 0600, (uid_t)-1, (gid_t)-1);
9e406b04bb5bed7d73aeed375c40c6a3fea1a2cbTimo Sirainen i_error("istream-seekable: safe_mkstemp(%s) failed: %m", str_c(path));
6967fa47dde9f2726bd86019a50627dacf2d7509Timo Sirainen /* we just want the fd, unlink it */
6967fa47dde9f2726bd86019a50627dacf2d7509Timo Sirainen /* shouldn't happen.. */
6967fa47dde9f2726bd86019a50627dacf2d7509Timo Siraineni_stream_create_seekable_path(struct istream *input[],
6967fa47dde9f2726bd86019a50627dacf2d7509Timo Sirainen stream = i_stream_create_seekable(input, max_buffer_size,