istream-seekable.c revision 3fe5eaeb32a8bbc31ce0673793b1c37f72d00d47
c25356d5978632df6203437e1953bcb29e0c736fTimo Sirainen/* Copyright (c) 2005-2017 Dovecot authors, see the included COPYING file */
8e371a3ce32bd64288786855b8ce0cb63f19f7d1Timo Sirainen int (*fd_callback)(const char **path_r, void *context);
a64adf62fa33f2463a86f990217b0c9078531a40Timo Sirainen unsigned int cur_idx;
2615df45a8027948a474abe5e817b34b0499c171Timo Sirainenstatic void i_stream_seekable_close(struct iostream_private *stream,
563273bdac80393af63b9520cbf4d24cc0efd028Timo Sirainen struct seekable_istream *sstream = (struct seekable_istream *)stream;
0cb2e8eb55e70f8ebe1e8349bdf49e4cbe5d8834Timo Sirainenstatic void unref_streams(struct seekable_istream *sstream)
2615df45a8027948a474abe5e817b34b0499c171Timo Sirainen unsigned int i;
2615df45a8027948a474abe5e817b34b0499c171Timo Sirainenstatic void i_stream_seekable_destroy(struct iostream_private *stream)
3e564425db51f3921ce4de11859777135fdedd15Timo Sirainen struct seekable_istream *sstream = (struct seekable_istream *)stream;
563273bdac80393af63b9520cbf4d24cc0efd028Timo Siraineni_stream_seekable_set_max_buffer_size(struct iostream_private *stream,
57a8c6a95e4bce3eeaba36985adb81c07dd683ffTimo Sirainen struct seekable_istream *sstream = (struct seekable_istream *)stream;
563273bdac80393af63b9520cbf4d24cc0efd028Timo Sirainen unsigned int i;
0cb2e8eb55e70f8ebe1e8349bdf49e4cbe5d8834Timo Sirainen i_stream_set_max_buffer_size(sstream->fd_input, max_size);
563273bdac80393af63b9520cbf4d24cc0efd028Timo Sirainen i_stream_set_max_buffer_size(sstream->input[i], max_size);
fdc557286bc9f92c5f3bb49096ff6e2bcec0ea79Timo Sirainenstatic int copy_to_temp_file(struct seekable_istream *sstream)
ff7056842f14fd3b30a2d327dfab165b9d15dd30Timo Sirainen struct istream_private *stream = &sstream->istream;
f1743785713e7632459d623d5df2108f4b93accbTimo Sirainen const unsigned char *buffer;
ccc895c0358108d2304239063e940b7d75f364abTimo Sirainen fd = sstream->fd_callback(&path, sstream->context);
c5ab90cfad9cc3e33bcb1baeb30ffc82a7b7053aTimo Sirainen /* copy our currently read buffer to it */
c5ab90cfad9cc3e33bcb1baeb30ffc82a7b7053aTimo Sirainen if (write_full(fd, sstream->membuf->data, sstream->membuf->used) < 0) {
8e371a3ce32bd64288786855b8ce0cb63f19f7d1Timo Sirainen i_error("istream-seekable: write_full(%s) failed: %m", path);
0cb2e8eb55e70f8ebe1e8349bdf49e4cbe5d8834Timo Sirainen sstream->fd_input = i_stream_create_fd_autoclose(&fd,
0cb2e8eb55e70f8ebe1e8349bdf49e4cbe5d8834Timo Sirainen I_MAX(stream->pos, sstream->istream.max_buffer_size));
88187ee880b4829443e0d55ea7d145d9d5880217Timo Sirainen i_stream_set_name(sstream->fd_input, t_strdup_printf(
88187ee880b4829443e0d55ea7d145d9d5880217Timo Sirainen "(seekable temp-istream for: %s)", i_stream_get_name(&stream->istream)));
d3442384ca53d4b18a493db7dd0b000f470419cfTimo Sirainen /* read back the data we just had in our buffer */
d3442384ca53d4b18a493db7dd0b000f470419cfTimo Sirainen i_stream_seek(sstream->fd_input, stream->istream.v_offset);
d3442384ca53d4b18a493db7dd0b000f470419cfTimo Sirainen buffer = i_stream_get_data(sstream->fd_input, &size);
d3442384ca53d4b18a493db7dd0b000f470419cfTimo Sirainen if ((ret = i_stream_read(sstream->fd_input)) <= 0) {
d756ebcfa96bd7cff02097c8f26df9df368b81b1Timo Sirainen i_error("istream-seekable: Couldn't read back "
d756ebcfa96bd7cff02097c8f26df9df368b81b1Timo Sirainen "in-memory input %s: %s",
4b41116563110d00330896a568eff1078c382827Timo Sirainen /* Set the max buffer size only after we've already read everything
4b41116563110d00330896a568eff1078c382827Timo Sirainen into memory. For example with istream-data it's possible that
5137d2d80255938a0f5fb8f3c1a21b34cf11ada3Timo Sirainen more data exists in buffer than max_buffer_size. */
5137d2d80255938a0f5fb8f3c1a21b34cf11ada3Timo Sirainen i_stream_set_max_buffer_size(sstream->fd_input,
ccec5f82349eae44087900c0e64ed1fd5a1a6fcaTimo Sirainenstatic ssize_t read_more(struct seekable_istream *sstream)
cb05ecbd96ddb5e53c1850d27434541138a3f284Timo Sirainen while ((ret = i_stream_read(sstream->cur_input)) == -1) {
61f5256ef248d35459b53534ae428bf6d016e1c5Timo Sirainen io_stream_set_error(&sstream->istream.iostream,
e3796bfd2bc0fd5ba664893d346df9334a5b3af0Timo Sirainen "read(%s) failed: %s",
923eb3dde28e4d8841c14fd6b4a69635b7070c3eTimo Sirainen /* go to next stream */
923eb3dde28e4d8841c14fd6b4a69635b7070c3eTimo Sirainen sstream->cur_input = sstream->input[sstream->cur_idx++];
ecdce39e5ef4b62eefa9f5818f17d153fd5d710aTimo Sirainen /* last one, EOF */
ecdce39e5ef4b62eefa9f5818f17d153fd5d710aTimo Sirainen sstream->size = sstream->istream.istream.v_offset;
b8835b8a21c617ceb82ddc5a176243faf36aa8f7Timo Sirainen /* see if stream has pending data */
b8835b8a21c617ceb82ddc5a176243faf36aa8f7Timo Sirainen size = i_stream_get_data_size(sstream->cur_input);
e015e2f7e7f48874495f9df8b0dd192b7ffcb5ccTimo Sirainenstatic bool read_from_buffer(struct seekable_istream *sstream, ssize_t *ret_r)
e015e2f7e7f48874495f9df8b0dd192b7ffcb5ccTimo Sirainen struct istream_private *stream = &sstream->istream;
f95b3d29bc56f139c18c880aa574a0ca72b0cffbTimo Sirainen const unsigned char *data;
c4b376dd6e0c423006d7ac83a39253bcaf8e7c47Timo Sirainen if (stream->istream.v_offset + stream->pos >= sstream->membuf->used) {
fdc557286bc9f92c5f3bb49096ff6e2bcec0ea79Timo Sirainen /* need to read more */
0cb2e8eb55e70f8ebe1e8349bdf49e4cbe5d8834Timo Sirainen if (sstream->membuf->used >= stream->max_buffer_size)
e3aeeb634245e80d4f643f8d2eea11d6b72336d8Timo Sirainen /* read more to buffer */
bd4d0a1a7c0626452b8d82f37e3ec07267ac9896Timo Sirainen /* we should have more now. */
0cb2e8eb55e70f8ebe1e8349bdf49e4cbe5d8834Timo Sirainen data = i_stream_get_data(sstream->cur_input, &size);
8e371a3ce32bd64288786855b8ce0cb63f19f7d1Timo Sirainen stream->buffer = CONST_PTR_OFFSET(sstream->membuf->data, offset);
fdc557286bc9f92c5f3bb49096ff6e2bcec0ea79Timo Sirainenstatic int i_stream_seekable_write_failed(struct seekable_istream *sstream)
04870054863757edf048c81dcce3c5e7dec453cdTimo Sirainen struct istream_private *stream = &sstream->istream;
f23ede27743c1aa03eacbfc634d6a10de9110c91Timo Sirainen buffer_create_dynamic(default_pool, sstream->write_peak);
32ee977e189266744ef69ac4e832fd3111d6f949Timo Sirainen data = buffer_append_space_unsafe(sstream->membuf, sstream->write_peak);
47001341950b8588c5f3a96b75864dab48e279aeTimo Sirainen if (pread_full(sstream->fd, data, sstream->write_peak, 0) < 0) {
47001341950b8588c5f3a96b75864dab48e279aeTimo Sirainen i_error("istream-seekable: read(%s) failed: %m", sstream->temp_path);
5fb3bff645380804c9db2510940c41db6b8fdb01Timo Sirainenstatic ssize_t i_stream_seekable_read(struct istream_private *stream)
5fb3bff645380804c9db2510940c41db6b8fdb01Timo Sirainen struct seekable_istream *sstream = (struct seekable_istream *)stream;
4bbd396aa6198c84f3f7763b6e8a63a26e97e141Timo Sirainen const unsigned char *data;
fdc557286bc9f92c5f3bb49096ff6e2bcec0ea79Timo Sirainen stream->buffer = CONST_PTR_OFFSET(stream->buffer, stream->skip);
fdc557286bc9f92c5f3bb49096ff6e2bcec0ea79Timo Sirainen /* copy everything to temp file and use it as the stream */
83bb013a99f0936995f9c7a1077822662d8fefdbTimo Sirainen i_assert(stream->istream.v_offset + stream->pos <= sstream->write_peak);
83bb013a99f0936995f9c7a1077822662d8fefdbTimo Sirainen if (stream->istream.v_offset + stream->pos == sstream->write_peak) {
83bb013a99f0936995f9c7a1077822662d8fefdbTimo Sirainen /* need to read more */
83bb013a99f0936995f9c7a1077822662d8fefdbTimo Sirainen /* save to our file */
83bb013a99f0936995f9c7a1077822662d8fefdbTimo Sirainen data = i_stream_get_data(sstream->cur_input, &size);
83bb013a99f0936995f9c7a1077822662d8fefdbTimo Sirainen i_error("istream-seekable: write_full(%s) failed: %m",
83bb013a99f0936995f9c7a1077822662d8fefdbTimo Sirainen if (i_stream_seekable_write_failed(sstream) < 0)
fdc557286bc9f92c5f3bb49096ff6e2bcec0ea79Timo Sirainen i_stream_seek(sstream->fd_input, stream->istream.v_offset);
fdc557286bc9f92c5f3bb49096ff6e2bcec0ea79Timo Sirainen stream->buffer = i_stream_get_data(sstream->fd_input, &pos);
fdc557286bc9f92c5f3bb49096ff6e2bcec0ea79Timo Sirainen ret = pos > stream->pos ? (ssize_t)(pos - stream->pos) : ret;
1eb17e61d3d38372674aa0c55caedb0185a985f5Timo Siraineni_stream_seekable_stat(struct istream_private *stream, bool exact)
b0a901f1dbe9e05ac1c92a0974af6bce0274f31aTimo Sirainen struct seekable_istream *sstream = (struct seekable_istream *)stream;
b0a901f1dbe9e05ac1c92a0974af6bce0274f31aTimo Sirainen /* we've already reached EOF and know the size */
0cb2e8eb55e70f8ebe1e8349bdf49e4cbe5d8834Timo Sirainen /* we want to know the full size of the file, so read until
0cb2e8eb55e70f8ebe1e8349bdf49e4cbe5d8834Timo Sirainen we're finished */
8e371a3ce32bd64288786855b8ce0cb63f19f7d1Timo Sirainen } while ((ret = i_stream_seekable_read(stream)) > 0);
8e371a3ce32bd64288786855b8ce0cb63f19f7d1Timo Sirainen i_panic("i_stream_stat() used for non-blocking "
8e371a3ce32bd64288786855b8ce0cb63f19f7d1Timo Sirainen i_stream_skip(&stream->istream, stream->pos - stream->skip);
0cb2e8eb55e70f8ebe1e8349bdf49e4cbe5d8834Timo Sirainen /* using a file backed buffer, we can use real fstat() */
fdc557286bc9f92c5f3bb49096ff6e2bcec0ea79Timo Sirainen if (i_stream_stat(sstream->fd_input, exact, &st) < 0)
e63bdfedcf61e1a9ee21990140cbd0d0638da7e1Timo Sirainen /* buffer is completely in memory */
3dd0679b6f24be0287cc42d7a60bbf59cdf8b637Timo Sirainen stream->statbuf.st_size = sstream->membuf->used;
3dd0679b6f24be0287cc42d7a60bbf59cdf8b637Timo Sirainenstatic void i_stream_seekable_seek(struct istream_private *stream,
8e371a3ce32bd64288786855b8ce0cb63f19f7d1Timo Sirainen /* seeking backwards */
0cb2e8eb55e70f8ebe1e8349bdf49e4cbe5d8834Timo Sirainen /* we can't skip over data we haven't yet read and written to
a423d985ba7261661475811c22b21b80ec765a71Timo Sirainen i_stream_default_seek_nonseekable(stream, v_offset, mark);
e3aeeb634245e80d4f643f8d2eea11d6b72336d8Timo Siraineni_streams_merge(struct istream *input[], size_t max_buffer_size,
e3aeeb634245e80d4f643f8d2eea11d6b72336d8Timo Sirainen int (*fd_callback)(const char **path_r, void *context),
b19a1420da0618a10edf67c2cfd13c8c8633057aTimo Sirainen const unsigned char *data;
e050e5c9b1688765f1fdfce9b7141f7b614383fdTimo Sirainen unsigned int count;
d756ebcfa96bd7cff02097c8f26df9df368b81b1Timo Sirainen /* if any of the streams isn't blocking, set ourself also nonblocking */
e015e2f7e7f48874495f9df8b0dd192b7ffcb5ccTimo Sirainen for (count = 0; input[count] != NULL; count++) {
8e371a3ce32bd64288786855b8ce0cb63f19f7d1Timo Sirainen sstream->membuf = buffer_create_dynamic(default_pool, BUF_INITIAL_SIZE);
8e371a3ce32bd64288786855b8ce0cb63f19f7d1Timo Sirainen sstream->istream.max_buffer_size = max_buffer_size;
8e371a3ce32bd64288786855b8ce0cb63f19f7d1Timo Sirainen sstream->input = i_new(struct istream *, count + 1);
d6badc27cd6e8d3398877b6766cb0aaeef3a7800Timo Sirainen memcpy(sstream->input, input, sizeof(*input) * count);
0cb2e8eb55e70f8ebe1e8349bdf49e4cbe5d8834Timo Sirainen /* initialize our buffer from first stream's pending data */
0cb2e8eb55e70f8ebe1e8349bdf49e4cbe5d8834Timo Sirainen data = i_stream_get_data(sstream->cur_input, &size);
0cb2e8eb55e70f8ebe1e8349bdf49e4cbe5d8834Timo Sirainen sstream->istream.iostream.close = i_stream_seekable_close;
ff7056842f14fd3b30a2d327dfab165b9d15dd30Timo Sirainen sstream->istream.iostream.destroy = i_stream_seekable_destroy;
a27e065f1a1f91c7fbdf7c2ea1c387441af0cbb3Timo Sirainen sstream->istream.iostream.set_max_buffer_size =
ebe6df72f1309135f02b6a4d2aef1e81a073f91cTimo Sirainen sstream->istream.read = i_stream_seekable_read;
910fa4e4204a73d3d24c03f3059dd24e727ca057Timo Sirainen sstream->istream.stat = i_stream_seekable_stat;
4bbd396aa6198c84f3f7763b6e8a63a26e97e141Timo Sirainen sstream->istream.seek = i_stream_seekable_seek;
unsigned int count;
return FALSE;
return TRUE;
struct istream *
void *context)
int fd;
return fd;
struct istream *
const char *temp_path_prefix)
return stream;