istream-seekable.c revision 3e55775bc7a37ebc05e06c04cafb32eee9888e87
/* Copyright (c) 2005-2017 Dovecot authors, see the included COPYING file */
f89cb43088c8b46d12d66ac924724b53ab14ce66Timo Sirainen
f89cb43088c8b46d12d66ac924724b53ab14ce66Timo Sirainen#include "lib.h"
f89cb43088c8b46d12d66ac924724b53ab14ce66Timo Sirainen#include "buffer.h"
f89cb43088c8b46d12d66ac924724b53ab14ce66Timo Sirainen#include "str.h"
f89cb43088c8b46d12d66ac924724b53ab14ce66Timo Sirainen#include "memarea.h"
f89cb43088c8b46d12d66ac924724b53ab14ce66Timo Sirainen#include "read-full.h"
f89cb43088c8b46d12d66ac924724b53ab14ce66Timo Sirainen#include "write-full.h"
f89cb43088c8b46d12d66ac924724b53ab14ce66Timo Sirainen#include "safe-mkstemp.h"
f89cb43088c8b46d12d66ac924724b53ab14ce66Timo Sirainen#include "istream-private.h"
f89cb43088c8b46d12d66ac924724b53ab14ce66Timo Sirainen#include "istream-concat.h"
f89cb43088c8b46d12d66ac924724b53ab14ce66Timo Sirainen#include "istream-seekable.h"
f89cb43088c8b46d12d66ac924724b53ab14ce66Timo Sirainen
f89cb43088c8b46d12d66ac924724b53ab14ce66Timo Sirainen#include <unistd.h>
beffc30d933c5e134c45cc871852a8427eba7e70Timo Sirainen
f89cb43088c8b46d12d66ac924724b53ab14ce66Timo Sirainen#define BUF_INITIAL_SIZE (1024*32)
f89cb43088c8b46d12d66ac924724b53ab14ce66Timo Sirainen
f89cb43088c8b46d12d66ac924724b53ab14ce66Timo Sirainenstruct seekable_istream {
f89cb43088c8b46d12d66ac924724b53ab14ce66Timo Sirainen struct istream_private istream;
f89cb43088c8b46d12d66ac924724b53ab14ce66Timo Sirainen
f89cb43088c8b46d12d66ac924724b53ab14ce66Timo Sirainen char *temp_path;
f89cb43088c8b46d12d66ac924724b53ab14ce66Timo Sirainen uoff_t write_peak;
f89cb43088c8b46d12d66ac924724b53ab14ce66Timo Sirainen uoff_t size;
f89cb43088c8b46d12d66ac924724b53ab14ce66Timo Sirainen size_t buffer_peak;
f89cb43088c8b46d12d66ac924724b53ab14ce66Timo Sirainen
f89cb43088c8b46d12d66ac924724b53ab14ce66Timo Sirainen int (*fd_callback)(const char **path_r, void *context);
f89cb43088c8b46d12d66ac924724b53ab14ce66Timo Sirainen void *context;
f89cb43088c8b46d12d66ac924724b53ab14ce66Timo Sirainen
f89cb43088c8b46d12d66ac924724b53ab14ce66Timo Sirainen struct istream **input, *cur_input;
f89cb43088c8b46d12d66ac924724b53ab14ce66Timo Sirainen struct istream *fd_input;
f89cb43088c8b46d12d66ac924724b53ab14ce66Timo Sirainen unsigned int cur_idx;
f89cb43088c8b46d12d66ac924724b53ab14ce66Timo Sirainen int fd;
f89cb43088c8b46d12d66ac924724b53ab14ce66Timo Sirainen bool free_context;
f89cb43088c8b46d12d66ac924724b53ab14ce66Timo Sirainen};
f89cb43088c8b46d12d66ac924724b53ab14ce66Timo Sirainen
f89cb43088c8b46d12d66ac924724b53ab14ce66Timo Sirainenstatic void i_stream_seekable_close(struct iostream_private *stream,
f89cb43088c8b46d12d66ac924724b53ab14ce66Timo Sirainen bool close_parent ATTR_UNUSED)
f89cb43088c8b46d12d66ac924724b53ab14ce66Timo Sirainen{
f89cb43088c8b46d12d66ac924724b53ab14ce66Timo Sirainen struct seekable_istream *sstream = (struct seekable_istream *)stream;
f89cb43088c8b46d12d66ac924724b53ab14ce66Timo Sirainen
f89cb43088c8b46d12d66ac924724b53ab14ce66Timo Sirainen sstream->fd = -1;
f89cb43088c8b46d12d66ac924724b53ab14ce66Timo Sirainen if (sstream->fd_input != NULL)
f89cb43088c8b46d12d66ac924724b53ab14ce66Timo Sirainen i_stream_close(sstream->fd_input);
f89cb43088c8b46d12d66ac924724b53ab14ce66Timo Sirainen}
f89cb43088c8b46d12d66ac924724b53ab14ce66Timo Sirainen
f89cb43088c8b46d12d66ac924724b53ab14ce66Timo Sirainenstatic void unref_streams(struct seekable_istream *sstream)
f89cb43088c8b46d12d66ac924724b53ab14ce66Timo Sirainen{
f89cb43088c8b46d12d66ac924724b53ab14ce66Timo Sirainen unsigned int i;
f89cb43088c8b46d12d66ac924724b53ab14ce66Timo Sirainen
f89cb43088c8b46d12d66ac924724b53ab14ce66Timo Sirainen for (i = 0; sstream->input[i] != NULL; i++)
f89cb43088c8b46d12d66ac924724b53ab14ce66Timo Sirainen i_stream_unref(&sstream->input[i]);
f89cb43088c8b46d12d66ac924724b53ab14ce66Timo Sirainen}
f89cb43088c8b46d12d66ac924724b53ab14ce66Timo Sirainen
f89cb43088c8b46d12d66ac924724b53ab14ce66Timo Sirainenstatic void i_stream_seekable_destroy(struct iostream_private *stream)
beffc30d933c5e134c45cc871852a8427eba7e70Timo Sirainen{
f89cb43088c8b46d12d66ac924724b53ab14ce66Timo Sirainen struct seekable_istream *sstream = (struct seekable_istream *)stream;
f89cb43088c8b46d12d66ac924724b53ab14ce66Timo Sirainen
4ce6338bf945cccfff9e4ce7cc6aa2246851b84aTimo Sirainen i_stream_free_buffer(&sstream->istream);
f89cb43088c8b46d12d66ac924724b53ab14ce66Timo Sirainen i_stream_unref(&sstream->fd_input);
beffc30d933c5e134c45cc871852a8427eba7e70Timo Sirainen unref_streams(sstream);
beffc30d933c5e134c45cc871852a8427eba7e70Timo Sirainen
beffc30d933c5e134c45cc871852a8427eba7e70Timo Sirainen if (sstream->free_context)
beffc30d933c5e134c45cc871852a8427eba7e70Timo Sirainen i_free(sstream->context);
f89cb43088c8b46d12d66ac924724b53ab14ce66Timo Sirainen i_free(sstream->temp_path);
4ce6338bf945cccfff9e4ce7cc6aa2246851b84aTimo Sirainen i_free(sstream->input);
f89cb43088c8b46d12d66ac924724b53ab14ce66Timo Sirainen}
f89cb43088c8b46d12d66ac924724b53ab14ce66Timo Sirainen
f89cb43088c8b46d12d66ac924724b53ab14ce66Timo Sirainenstatic void
beffc30d933c5e134c45cc871852a8427eba7e70Timo Siraineni_stream_seekable_set_max_buffer_size(struct iostream_private *stream,
f89cb43088c8b46d12d66ac924724b53ab14ce66Timo Sirainen size_t max_size)
f89cb43088c8b46d12d66ac924724b53ab14ce66Timo Sirainen{
f89cb43088c8b46d12d66ac924724b53ab14ce66Timo Sirainen struct seekable_istream *sstream = (struct seekable_istream *)stream;
f89cb43088c8b46d12d66ac924724b53ab14ce66Timo Sirainen unsigned int i;
beffc30d933c5e134c45cc871852a8427eba7e70Timo Sirainen
f89cb43088c8b46d12d66ac924724b53ab14ce66Timo Sirainen sstream->istream.max_buffer_size = max_size;
beffc30d933c5e134c45cc871852a8427eba7e70Timo Sirainen if (sstream->fd_input != NULL)
4ce6338bf945cccfff9e4ce7cc6aa2246851b84aTimo Sirainen i_stream_set_max_buffer_size(sstream->fd_input, max_size);
f89cb43088c8b46d12d66ac924724b53ab14ce66Timo Sirainen for (i = 0; sstream->input[i] != NULL; i++)
f89cb43088c8b46d12d66ac924724b53ab14ce66Timo Sirainen i_stream_set_max_buffer_size(sstream->input[i], max_size);
f89cb43088c8b46d12d66ac924724b53ab14ce66Timo Sirainen}
f89cb43088c8b46d12d66ac924724b53ab14ce66Timo Sirainen
f89cb43088c8b46d12d66ac924724b53ab14ce66Timo Sirainenstatic int copy_to_temp_file(struct seekable_istream *sstream)
f89cb43088c8b46d12d66ac924724b53ab14ce66Timo Sirainen{
beffc30d933c5e134c45cc871852a8427eba7e70Timo Sirainen struct istream_private *stream = &sstream->istream;
f89cb43088c8b46d12d66ac924724b53ab14ce66Timo Sirainen const char *path;
f89cb43088c8b46d12d66ac924724b53ab14ce66Timo Sirainen const unsigned char *buffer;
f89cb43088c8b46d12d66ac924724b53ab14ce66Timo Sirainen size_t size;
7026c16186f543e11af12b8b87f396006db93297Timo Sirainen int fd;
7026c16186f543e11af12b8b87f396006db93297Timo Sirainen
f89cb43088c8b46d12d66ac924724b53ab14ce66Timo Sirainen fd = sstream->fd_callback(&path, sstream->context);
7026c16186f543e11af12b8b87f396006db93297Timo Sirainen if (fd == -1)
f89cb43088c8b46d12d66ac924724b53ab14ce66Timo Sirainen return -1;
beffc30d933c5e134c45cc871852a8427eba7e70Timo Sirainen
beffc30d933c5e134c45cc871852a8427eba7e70Timo Sirainen /* copy our currently read buffer to it */
beffc30d933c5e134c45cc871852a8427eba7e70Timo Sirainen i_assert(stream->pos <= sstream->buffer_peak);
beffc30d933c5e134c45cc871852a8427eba7e70Timo Sirainen if (write_full(fd, stream->buffer, sstream->buffer_peak) < 0) {
beffc30d933c5e134c45cc871852a8427eba7e70Timo Sirainen if (!ENOSPACE(errno))
beffc30d933c5e134c45cc871852a8427eba7e70Timo Sirainen i_error("istream-seekable: write_full(%s) failed: %m", path);
beffc30d933c5e134c45cc871852a8427eba7e70Timo Sirainen i_close_fd(&fd);
beffc30d933c5e134c45cc871852a8427eba7e70Timo Sirainen return -1;
beffc30d933c5e134c45cc871852a8427eba7e70Timo Sirainen }
beffc30d933c5e134c45cc871852a8427eba7e70Timo Sirainen sstream->temp_path = i_strdup(path);
beffc30d933c5e134c45cc871852a8427eba7e70Timo Sirainen sstream->write_peak = sstream->buffer_peak;
beffc30d933c5e134c45cc871852a8427eba7e70Timo Sirainen
beffc30d933c5e134c45cc871852a8427eba7e70Timo Sirainen sstream->fd = fd;
beffc30d933c5e134c45cc871852a8427eba7e70Timo Sirainen sstream->fd_input = i_stream_create_fd_autoclose(&fd,
beffc30d933c5e134c45cc871852a8427eba7e70Timo Sirainen I_MAX(stream->pos, sstream->istream.max_buffer_size));
beffc30d933c5e134c45cc871852a8427eba7e70Timo Sirainen i_stream_set_name(sstream->fd_input, t_strdup_printf(
beffc30d933c5e134c45cc871852a8427eba7e70Timo Sirainen "(seekable temp-istream for: %s)", i_stream_get_name(&stream->istream)));
beffc30d933c5e134c45cc871852a8427eba7e70Timo Sirainen
f89cb43088c8b46d12d66ac924724b53ab14ce66Timo Sirainen /* read back the data we just had in our buffer */
beffc30d933c5e134c45cc871852a8427eba7e70Timo Sirainen for (;;) {
f89cb43088c8b46d12d66ac924724b53ab14ce66Timo Sirainen buffer = i_stream_get_data(sstream->fd_input, &size);
beffc30d933c5e134c45cc871852a8427eba7e70Timo Sirainen if (size >= stream->pos)
f89cb43088c8b46d12d66ac924724b53ab14ce66Timo Sirainen break;
f89cb43088c8b46d12d66ac924724b53ab14ce66Timo Sirainen
f89cb43088c8b46d12d66ac924724b53ab14ce66Timo Sirainen ssize_t ret;
beffc30d933c5e134c45cc871852a8427eba7e70Timo Sirainen if ((ret = i_stream_read(sstream->fd_input)) <= 0) {
f89cb43088c8b46d12d66ac924724b53ab14ce66Timo Sirainen i_assert(ret != 0);
f89cb43088c8b46d12d66ac924724b53ab14ce66Timo Sirainen i_assert(ret != -2);
f89cb43088c8b46d12d66ac924724b53ab14ce66Timo Sirainen i_error("istream-seekable: Couldn't read back "
f89cb43088c8b46d12d66ac924724b53ab14ce66Timo Sirainen "in-memory input %s: %s",
f89cb43088c8b46d12d66ac924724b53ab14ce66Timo Sirainen i_stream_get_name(&stream->istream),
f89cb43088c8b46d12d66ac924724b53ab14ce66Timo Sirainen i_stream_get_error(sstream->fd_input));
f89cb43088c8b46d12d66ac924724b53ab14ce66Timo Sirainen i_stream_destroy(&sstream->fd_input);
f89cb43088c8b46d12d66ac924724b53ab14ce66Timo Sirainen i_close_fd(&sstream->fd);
f89cb43088c8b46d12d66ac924724b53ab14ce66Timo Sirainen return -1;
f89cb43088c8b46d12d66ac924724b53ab14ce66Timo Sirainen }
f89cb43088c8b46d12d66ac924724b53ab14ce66Timo Sirainen }
f89cb43088c8b46d12d66ac924724b53ab14ce66Timo Sirainen /* Set the max buffer size only after we've already read everything
f89cb43088c8b46d12d66ac924724b53ab14ce66Timo Sirainen into memory. For example with istream-data it's possible that
beffc30d933c5e134c45cc871852a8427eba7e70Timo Sirainen more data exists in buffer than max_buffer_size. */
f89cb43088c8b46d12d66ac924724b53ab14ce66Timo Sirainen i_stream_set_max_buffer_size(sstream->fd_input,
f89cb43088c8b46d12d66ac924724b53ab14ce66Timo Sirainen sstream->istream.max_buffer_size);
f89cb43088c8b46d12d66ac924724b53ab14ce66Timo Sirainen stream->buffer = buffer;
beffc30d933c5e134c45cc871852a8427eba7e70Timo Sirainen i_stream_free_buffer(&sstream->istream);
4ce6338bf945cccfff9e4ce7cc6aa2246851b84aTimo Sirainen return 0;
f89cb43088c8b46d12d66ac924724b53ab14ce66Timo Sirainen}
f89cb43088c8b46d12d66ac924724b53ab14ce66Timo Sirainen
f89cb43088c8b46d12d66ac924724b53ab14ce66Timo Sirainenstatic ssize_t read_more(struct seekable_istream *sstream)
c09f9f95db314e7482c95e502e1c56ed6c555797Timo Sirainen{
f89cb43088c8b46d12d66ac924724b53ab14ce66Timo Sirainen size_t size;
f89cb43088c8b46d12d66ac924724b53ab14ce66Timo Sirainen ssize_t ret;
f89cb43088c8b46d12d66ac924724b53ab14ce66Timo Sirainen
beffc30d933c5e134c45cc871852a8427eba7e70Timo Sirainen if (sstream->cur_input == NULL) {
f89cb43088c8b46d12d66ac924724b53ab14ce66Timo Sirainen sstream->istream.istream.eof = TRUE;
f89cb43088c8b46d12d66ac924724b53ab14ce66Timo Sirainen return -1;
f89cb43088c8b46d12d66ac924724b53ab14ce66Timo Sirainen }
beffc30d933c5e134c45cc871852a8427eba7e70Timo Sirainen
beffc30d933c5e134c45cc871852a8427eba7e70Timo Sirainen while ((ret = i_stream_read(sstream->cur_input)) == -1) {
f89cb43088c8b46d12d66ac924724b53ab14ce66Timo Sirainen if (sstream->cur_input->stream_errno != 0) {
beffc30d933c5e134c45cc871852a8427eba7e70Timo Sirainen io_stream_set_error(&sstream->istream.iostream,
beffc30d933c5e134c45cc871852a8427eba7e70Timo Sirainen "read(%s) failed: %s",
f89cb43088c8b46d12d66ac924724b53ab14ce66Timo Sirainen i_stream_get_name(sstream->cur_input),
4ce6338bf945cccfff9e4ce7cc6aa2246851b84aTimo Sirainen i_stream_get_error(sstream->cur_input));
4ce6338bf945cccfff9e4ce7cc6aa2246851b84aTimo Sirainen sstream->istream.istream.eof = TRUE;
4ce6338bf945cccfff9e4ce7cc6aa2246851b84aTimo Sirainen sstream->istream.istream.stream_errno =
beffc30d933c5e134c45cc871852a8427eba7e70Timo Sirainen sstream->cur_input->stream_errno;
beffc30d933c5e134c45cc871852a8427eba7e70Timo Sirainen return -1;
f89cb43088c8b46d12d66ac924724b53ab14ce66Timo Sirainen }
beffc30d933c5e134c45cc871852a8427eba7e70Timo Sirainen
f89cb43088c8b46d12d66ac924724b53ab14ce66Timo Sirainen /* go to next stream */
f89cb43088c8b46d12d66ac924724b53ab14ce66Timo Sirainen sstream->cur_input = sstream->input[sstream->cur_idx++];
f89cb43088c8b46d12d66ac924724b53ab14ce66Timo Sirainen if (sstream->cur_input == NULL) {
f89cb43088c8b46d12d66ac924724b53ab14ce66Timo Sirainen /* last one, EOF */
beffc30d933c5e134c45cc871852a8427eba7e70Timo Sirainen sstream->size = sstream->istream.istream.v_offset;
f89cb43088c8b46d12d66ac924724b53ab14ce66Timo Sirainen sstream->istream.istream.eof = TRUE;
f89cb43088c8b46d12d66ac924724b53ab14ce66Timo Sirainen unref_streams(sstream);
f89cb43088c8b46d12d66ac924724b53ab14ce66Timo Sirainen return -1;
f89cb43088c8b46d12d66ac924724b53ab14ce66Timo Sirainen }
f89cb43088c8b46d12d66ac924724b53ab14ce66Timo Sirainen
f89cb43088c8b46d12d66ac924724b53ab14ce66Timo Sirainen /* see if stream has pending data */
f89cb43088c8b46d12d66ac924724b53ab14ce66Timo Sirainen size = i_stream_get_data_size(sstream->cur_input);
f89cb43088c8b46d12d66ac924724b53ab14ce66Timo Sirainen if (size != 0)
f89cb43088c8b46d12d66ac924724b53ab14ce66Timo Sirainen return size;
f89cb43088c8b46d12d66ac924724b53ab14ce66Timo Sirainen }
f89cb43088c8b46d12d66ac924724b53ab14ce66Timo Sirainen return ret;
f89cb43088c8b46d12d66ac924724b53ab14ce66Timo Sirainen}
f89cb43088c8b46d12d66ac924724b53ab14ce66Timo Sirainen
f89cb43088c8b46d12d66ac924724b53ab14ce66Timo Sirainenstatic bool read_from_buffer(struct seekable_istream *sstream, ssize_t *ret_r)
f89cb43088c8b46d12d66ac924724b53ab14ce66Timo Sirainen{
f89cb43088c8b46d12d66ac924724b53ab14ce66Timo Sirainen struct istream_private *stream = &sstream->istream;
f89cb43088c8b46d12d66ac924724b53ab14ce66Timo Sirainen const unsigned char *data;
f89cb43088c8b46d12d66ac924724b53ab14ce66Timo Sirainen size_t size, avail_size;
f89cb43088c8b46d12d66ac924724b53ab14ce66Timo Sirainen
f89cb43088c8b46d12d66ac924724b53ab14ce66Timo Sirainen if (stream->pos < sstream->buffer_peak) {
f89cb43088c8b46d12d66ac924724b53ab14ce66Timo Sirainen /* This could be the first read() or we could have already
f89cb43088c8b46d12d66ac924724b53ab14ce66Timo Sirainen seeked backwards. */
f89cb43088c8b46d12d66ac924724b53ab14ce66Timo Sirainen i_assert(stream->pos == 0 && stream->skip == 0);
f89cb43088c8b46d12d66ac924724b53ab14ce66Timo Sirainen stream->skip = stream->istream.v_offset;
f89cb43088c8b46d12d66ac924724b53ab14ce66Timo Sirainen stream->pos = sstream->buffer_peak;
f89cb43088c8b46d12d66ac924724b53ab14ce66Timo Sirainen size = stream->pos - stream->skip;
f89cb43088c8b46d12d66ac924724b53ab14ce66Timo Sirainen } else {
f89cb43088c8b46d12d66ac924724b53ab14ce66Timo Sirainen /* need to read more */
f89cb43088c8b46d12d66ac924724b53ab14ce66Timo Sirainen i_assert(stream->pos == sstream->buffer_peak);
f89cb43088c8b46d12d66ac924724b53ab14ce66Timo Sirainen size = sstream->cur_input == NULL ? 0 :
f89cb43088c8b46d12d66ac924724b53ab14ce66Timo Sirainen i_stream_get_data_size(sstream->cur_input);
f89cb43088c8b46d12d66ac924724b53ab14ce66Timo Sirainen if (size == 0) {
f89cb43088c8b46d12d66ac924724b53ab14ce66Timo Sirainen /* read more to buffer */
f89cb43088c8b46d12d66ac924724b53ab14ce66Timo Sirainen *ret_r = read_more(sstream);
f89cb43088c8b46d12d66ac924724b53ab14ce66Timo Sirainen if (*ret_r == 0 || *ret_r == -1)
f89cb43088c8b46d12d66ac924724b53ab14ce66Timo Sirainen return TRUE;
f89cb43088c8b46d12d66ac924724b53ab14ce66Timo Sirainen }
f89cb43088c8b46d12d66ac924724b53ab14ce66Timo Sirainen
f89cb43088c8b46d12d66ac924724b53ab14ce66Timo Sirainen /* we should have more now. */
f89cb43088c8b46d12d66ac924724b53ab14ce66Timo Sirainen data = i_stream_get_data(sstream->cur_input, &size);
f89cb43088c8b46d12d66ac924724b53ab14ce66Timo Sirainen i_assert(size > 0);
f89cb43088c8b46d12d66ac924724b53ab14ce66Timo Sirainen
beffc30d933c5e134c45cc871852a8427eba7e70Timo Sirainen /* change skip to 0 temporarily so i_stream_try_alloc() won't try to
beffc30d933c5e134c45cc871852a8427eba7e70Timo Sirainen compress the buffer. */
f89cb43088c8b46d12d66ac924724b53ab14ce66Timo Sirainen size_t old_skip = stream->skip;
f89cb43088c8b46d12d66ac924724b53ab14ce66Timo Sirainen stream->skip = 0;
f89cb43088c8b46d12d66ac924724b53ab14ce66Timo Sirainen bool have_space = i_stream_try_alloc(stream, size, &avail_size);
f89cb43088c8b46d12d66ac924724b53ab14ce66Timo Sirainen stream->skip = old_skip;
f89cb43088c8b46d12d66ac924724b53ab14ce66Timo Sirainen if (!have_space)
f89cb43088c8b46d12d66ac924724b53ab14ce66Timo Sirainen return FALSE;
f89cb43088c8b46d12d66ac924724b53ab14ce66Timo Sirainen
7026c16186f543e11af12b8b87f396006db93297Timo Sirainen if (size > avail_size)
7026c16186f543e11af12b8b87f396006db93297Timo Sirainen size = avail_size;
f89cb43088c8b46d12d66ac924724b53ab14ce66Timo Sirainen memcpy(stream->w_buffer + stream->pos, data, size);
f89cb43088c8b46d12d66ac924724b53ab14ce66Timo Sirainen stream->pos += size;
f89cb43088c8b46d12d66ac924724b53ab14ce66Timo Sirainen sstream->buffer_peak += size;
f89cb43088c8b46d12d66ac924724b53ab14ce66Timo Sirainen i_stream_skip(sstream->cur_input, size);
f89cb43088c8b46d12d66ac924724b53ab14ce66Timo Sirainen }
f89cb43088c8b46d12d66ac924724b53ab14ce66Timo Sirainen
f89cb43088c8b46d12d66ac924724b53ab14ce66Timo Sirainen *ret_r = size;
f89cb43088c8b46d12d66ac924724b53ab14ce66Timo Sirainen i_assert(*ret_r > 0);
f89cb43088c8b46d12d66ac924724b53ab14ce66Timo Sirainen return TRUE;
f89cb43088c8b46d12d66ac924724b53ab14ce66Timo Sirainen}
f89cb43088c8b46d12d66ac924724b53ab14ce66Timo Sirainen
f89cb43088c8b46d12d66ac924724b53ab14ce66Timo Sirainenstatic int i_stream_seekable_write_failed(struct seekable_istream *sstream)
f89cb43088c8b46d12d66ac924724b53ab14ce66Timo Sirainen{
f89cb43088c8b46d12d66ac924724b53ab14ce66Timo Sirainen struct istream_private *stream = &sstream->istream;
f89cb43088c8b46d12d66ac924724b53ab14ce66Timo Sirainen void *data;
f89cb43088c8b46d12d66ac924724b53ab14ce66Timo Sirainen
f89cb43088c8b46d12d66ac924724b53ab14ce66Timo Sirainen i_assert(sstream->fd != -1);
f89cb43088c8b46d12d66ac924724b53ab14ce66Timo Sirainen
f89cb43088c8b46d12d66ac924724b53ab14ce66Timo Sirainen stream->max_buffer_size = (size_t)-1;
f89cb43088c8b46d12d66ac924724b53ab14ce66Timo Sirainen data = i_stream_alloc(stream, sstream->write_peak);
f89cb43088c8b46d12d66ac924724b53ab14ce66Timo Sirainen
f89cb43088c8b46d12d66ac924724b53ab14ce66Timo Sirainen if (pread_full(sstream->fd, data, sstream->write_peak, 0) < 0) {
f89cb43088c8b46d12d66ac924724b53ab14ce66Timo Sirainen i_error("istream-seekable: read(%s) failed: %m", sstream->temp_path);
ab7b5b9286104974c2a572a499ccf8b56c5d2955Timo Sirainen memarea_unref(&stream->memarea);
f89cb43088c8b46d12d66ac924724b53ab14ce66Timo Sirainen return -1;
ab7b5b9286104974c2a572a499ccf8b56c5d2955Timo Sirainen }
ab7b5b9286104974c2a572a499ccf8b56c5d2955Timo Sirainen i_stream_destroy(&sstream->fd_input);
f89cb43088c8b46d12d66ac924724b53ab14ce66Timo Sirainen i_close_fd(&sstream->fd);
f89cb43088c8b46d12d66ac924724b53ab14ce66Timo Sirainen
ab7b5b9286104974c2a572a499ccf8b56c5d2955Timo Sirainen i_free_and_null(sstream->temp_path);
f89cb43088c8b46d12d66ac924724b53ab14ce66Timo Sirainen return 0;
f89cb43088c8b46d12d66ac924724b53ab14ce66Timo Sirainen}
ab7b5b9286104974c2a572a499ccf8b56c5d2955Timo Sirainen
ab7b5b9286104974c2a572a499ccf8b56c5d2955Timo Sirainenstatic ssize_t i_stream_seekable_read(struct istream_private *stream)
ab7b5b9286104974c2a572a499ccf8b56c5d2955Timo Sirainen{
ab7b5b9286104974c2a572a499ccf8b56c5d2955Timo Sirainen struct seekable_istream *sstream = (struct seekable_istream *)stream;
f89cb43088c8b46d12d66ac924724b53ab14ce66Timo Sirainen const unsigned char *data;
f89cb43088c8b46d12d66ac924724b53ab14ce66Timo Sirainen size_t size, pos;
f89cb43088c8b46d12d66ac924724b53ab14ce66Timo Sirainen ssize_t ret;
f89cb43088c8b46d12d66ac924724b53ab14ce66Timo Sirainen
f89cb43088c8b46d12d66ac924724b53ab14ce66Timo Sirainen if (sstream->fd == -1) {
f89cb43088c8b46d12d66ac924724b53ab14ce66Timo Sirainen if (read_from_buffer(sstream, &ret))
f89cb43088c8b46d12d66ac924724b53ab14ce66Timo Sirainen return ret;
f89cb43088c8b46d12d66ac924724b53ab14ce66Timo Sirainen
f89cb43088c8b46d12d66ac924724b53ab14ce66Timo Sirainen /* copy everything to temp file and use it as the stream */
f89cb43088c8b46d12d66ac924724b53ab14ce66Timo Sirainen if (copy_to_temp_file(sstream) < 0) {
4ce6338bf945cccfff9e4ce7cc6aa2246851b84aTimo Sirainen stream->max_buffer_size = (size_t)-1;
f89cb43088c8b46d12d66ac924724b53ab14ce66Timo Sirainen if (!read_from_buffer(sstream, &ret))
f89cb43088c8b46d12d66ac924724b53ab14ce66Timo Sirainen i_unreached();
f89cb43088c8b46d12d66ac924724b53ab14ce66Timo Sirainen return ret;
f89cb43088c8b46d12d66ac924724b53ab14ce66Timo Sirainen }
f89cb43088c8b46d12d66ac924724b53ab14ce66Timo Sirainen i_assert(sstream->fd != -1);
f89cb43088c8b46d12d66ac924724b53ab14ce66Timo Sirainen }
f89cb43088c8b46d12d66ac924724b53ab14ce66Timo Sirainen
f89cb43088c8b46d12d66ac924724b53ab14ce66Timo Sirainen stream->buffer = CONST_PTR_OFFSET(stream->buffer, stream->skip);
f89cb43088c8b46d12d66ac924724b53ab14ce66Timo Sirainen stream->pos -= stream->skip;
f89cb43088c8b46d12d66ac924724b53ab14ce66Timo Sirainen stream->skip = 0;
f89cb43088c8b46d12d66ac924724b53ab14ce66Timo Sirainen
da2aa032ccfa8e7e4a4380ef738014549f4d2c2dTimo Sirainen i_assert(stream->istream.v_offset + stream->pos <= sstream->write_peak);
ab7b5b9286104974c2a572a499ccf8b56c5d2955Timo Sirainen if (stream->istream.v_offset + stream->pos == sstream->write_peak) {
ab7b5b9286104974c2a572a499ccf8b56c5d2955Timo Sirainen /* need to read more */
9511a40d933181045343110c8101b75887062aaeTimo Sirainen if (sstream->cur_input == NULL ||
f89cb43088c8b46d12d66ac924724b53ab14ce66Timo Sirainen i_stream_get_data_size(sstream->cur_input) == 0) {
ret = read_more(sstream);
if (ret == -1 || ret == 0)
return ret;
}
/* save to our file */
data = i_stream_get_data(sstream->cur_input, &size);
ret = write(sstream->fd, data, size);
if (ret <= 0) {
if (ret < 0 && !ENOSPACE(errno)) {
i_error("istream-seekable: write_full(%s) failed: %m",
sstream->temp_path);
}
if (i_stream_seekable_write_failed(sstream) < 0)
return -1;
if (!read_from_buffer(sstream, &ret))
i_unreached();
return ret;
}
i_stream_sync(sstream->fd_input);
i_stream_skip(sstream->cur_input, ret);
sstream->write_peak += ret;
}
i_stream_seek(sstream->fd_input, stream->istream.v_offset);
ret = i_stream_read(sstream->fd_input);
if (ret <= 0) {
stream->istream.eof = sstream->fd_input->eof;
stream->istream.stream_errno =
sstream->fd_input->stream_errno;
} else {
ret = -2;
}
stream->buffer = i_stream_get_data(sstream->fd_input, &pos);
stream->pos -= stream->skip;
stream->skip = 0;
ret = pos > stream->pos ? (ssize_t)(pos - stream->pos) : ret;
stream->pos = pos;
return ret;
}
/* istream stat callback.

   If the total size is already known it is returned directly. Otherwise
   the whole stream is read to the end, the original offset is restored and
   the inputs are unreferenced. The size then comes from the temp file
   stream's stat, or from the buffer position when everything is in memory.

   Panics if a read returns 0, i.e. the stream is non-blocking.
   Returns 0 on success, -1 on a stream error or failed stat. */
static int
i_stream_seekable_stat(struct istream_private *stream, bool exact)
{
	struct seekable_istream *sstream = (struct seekable_istream *)stream;
	const struct stat *file_st;
	uoff_t saved_offset;
	ssize_t ret;

	if (sstream->size != (uoff_t)-1) {
		/* we've already reached EOF and know the size */
		stream->statbuf.st_size = sstream->size;
		return 0;
	}

	/* we want to know the full size of the file, so read until
	   we're finished */
	saved_offset = stream->istream.v_offset;
	for (;;) {
		i_stream_skip(&stream->istream,
			      stream->pos - stream->skip);
		ret = i_stream_seekable_read(stream);
		if (ret <= 0)
			break;
	}
	if (ret == 0) {
		i_panic("i_stream_stat() used for non-blocking "
			"seekable stream %s offset %"PRIuUOFF_T,
			i_stream_get_name(sstream->cur_input),
			sstream->cur_input->v_offset);
	}
	i_stream_skip(&stream->istream, stream->pos - stream->skip);
	i_stream_seek(&stream->istream, saved_offset);
	unref_streams(sstream);

	if (stream->istream.stream_errno != 0)
		return -1;

	if (sstream->fd_input == NULL) {
		/* buffer is completely in memory */
		i_assert(sstream->fd == -1);
		stream->statbuf.st_size = stream->pos;
		return 0;
	}

	/* using a file backed buffer, we can use real fstat() */
	if (i_stream_stat(sstream->fd_input, exact, &file_st) < 0)
		return -1;
	stream->statbuf = *file_st;
	return 0;
}
/* istream seek callback. Seeking to the current offset or backwards just
   moves the offset and empties the visible buffer; seeking forwards is
   passed to i_stream_default_seek_nonseekable(). */
static void i_stream_seekable_seek(struct istream_private *stream,
				   uoff_t v_offset, bool mark)
{
	if (v_offset > stream->istream.v_offset) {
		/* we can't skip over data we haven't yet read and written to
		   our buffer/temp file */
		i_stream_default_seek_nonseekable(stream, v_offset, mark);
		return;
	}

	/* seeking backwards */
	stream->istream.v_offset = v_offset;
	stream->skip = stream->pos = 0;
}
struct istream *
i_streams_merge(struct istream *input[], size_t max_buffer_size,
int (*fd_callback)(const char **path_r, void *context),
void *context) ATTR_NULL(4)
{
struct seekable_istream *sstream;
const unsigned char *data;
unsigned int count;
size_t size;
bool blocking = TRUE;
i_assert(max_buffer_size > 0);
/* if any of the streams isn't blocking, set ourself also nonblocking */
for (count = 0; input[count] != NULL; count++) {
if (!input[count]->blocking)
blocking = FALSE;
i_stream_ref(input[count]);
}
i_assert(count != 0);
sstream = i_new(struct seekable_istream, 1);
sstream->fd_callback = fd_callback;
sstream->context = context;
sstream->istream.max_buffer_size = max_buffer_size;
sstream->fd = -1;
sstream->size = (uoff_t)-1;
sstream->input = i_new(struct istream *, count + 1);
memcpy(sstream->input, input, sizeof(*input) * count);
sstream->cur_input = sstream->input[0];
sstream->istream.iostream.close = i_stream_seekable_close;
sstream->istream.iostream.destroy = i_stream_seekable_destroy;
sstream->istream.iostream.set_max_buffer_size =
i_stream_seekable_set_max_buffer_size;
sstream->istream.read = i_stream_seekable_read;
sstream->istream.stat = i_stream_seekable_stat;
sstream->istream.seek = i_stream_seekable_seek;
sstream->istream.istream.readable_fd = FALSE;
sstream->istream.istream.blocking = blocking;
sstream->istream.istream.seekable = TRUE;
(void)i_stream_create(&sstream->istream, NULL, -1);
/* initialize our buffer from first stream's pending data */
data = i_stream_get_data(sstream->cur_input, &size);
if (size > 0) {
memcpy(i_stream_alloc(&sstream->istream, size), data, size);
sstream->buffer_peak = size;
i_stream_skip(sstream->cur_input, size);
}
return &sstream->istream.istream;
}
/* Returns TRUE if every stream in the NULL-terminated array is seekable
   (also TRUE for an empty array), FALSE otherwise. */
static bool inputs_are_seekable(struct istream *input[])
{
	struct istream **inputp = input;

	while (*inputp != NULL) {
		if (!(*inputp)->seekable)
			return FALSE;
		inputp++;
	}
	return TRUE;
}
/* Create a seekable stream out of the NULL-terminated input array.
   max_buffer_size must be larger than 0. When all inputs are seekable by
   themselves a concat stream is returned; otherwise the inputs are merged
   into a stream that buffers the data, using fd_callback(context) to get a
   temp file when needed. */
struct istream *
i_stream_create_seekable(struct istream *input[],
			 size_t max_buffer_size,
			 int (*fd_callback)(const char **path_r, void *context),
			 void *context)
{
	i_assert(max_buffer_size > 0);

	if (!inputs_are_seekable(input)) {
		return i_streams_merge(input, max_buffer_size,
				       fd_callback, context);
	}
	/* If all input streams are seekable, use concat istream instead */
	return i_stream_create_concat(input);
}
/* fd_callback used by i_stream_create_seekable_path(): context is the temp
   path prefix. Creates a temp file with safe_mkstemp() and unlinks it right
   away so only the descriptor remains.

   Returns the descriptor and stores the path that was used into *path_r, or
   returns -1 if creating or unlinking the file failed. */
static int seekable_fd_callback(const char **path_r, void *context)
{
	const char *prefix = context;
	const char *created_path;
	string_t *path;
	int fd;

	path = t_str_new(128);
	str_append(path, prefix);
	fd = safe_mkstemp(path, 0600, (uid_t)-1, (gid_t)-1);
	created_path = str_c(path);
	if (fd == -1) {
		i_error("istream-seekable: safe_mkstemp(%s) failed: %m", created_path);
		return -1;
	}

	/* we just want the fd, unlink it */
	if (i_unlink(created_path) < 0) {
		/* shouldn't happen.. */
		i_close_fd(&fd);
		return -1;
	}

	*path_r = created_path;
	return fd;
}
/* Like i_stream_create_seekable(), but temp files are created by
   seekable_fd_callback() using temp_path_prefix. temp_path_prefix must not
   be NULL and max_buffer_size must be larger than 0.

   When all inputs are already seekable a concat stream is returned and the
   prefix isn't used. Otherwise a copy of the prefix is given to the new
   stream as its context and is freed together with the stream. */
struct istream *
i_stream_create_seekable_path(struct istream *input[],
			      size_t max_buffer_size,
			      const char *temp_path_prefix)
{
	struct istream *merged;
	struct seekable_istream *seekable;

	i_assert(temp_path_prefix != NULL);
	i_assert(max_buffer_size > 0);

	/* Checked here already, so that the stream created below is known
	   to be a struct seekable_istream and the cast is valid. */
	if (inputs_are_seekable(input))
		return i_stream_create_concat(input);

	merged = i_stream_create_seekable(input, max_buffer_size,
					  seekable_fd_callback,
					  i_strdup(temp_path_prefix));
	seekable = (struct seekable_istream *)merged->real_stream;
	/* the strdup()ed prefix belongs to the stream now */
	seekable->free_context = TRUE;
	return merged;
}