istream-seekable.c revision 64ae2f0ba5b94092f9edd439905c3272719c23c3
76b43e4417bab52e913da39b5f5bc2a130d3f149Timo Sirainen/* Copyright (c) 2005-2009 Dovecot authors, see the included COPYING file */
0cb2e8eb55e70f8ebe1e8349bdf49e4cbe5d8834Timo Sirainen
0cb2e8eb55e70f8ebe1e8349bdf49e4cbe5d8834Timo Sirainen#include "lib.h"
e05a4c4136fec723f019bee8383103080203f127Timo Sirainen#include "buffer.h"
0cb2e8eb55e70f8ebe1e8349bdf49e4cbe5d8834Timo Sirainen#include "close-keep-errno.h"
fdc557286bc9f92c5f3bb49096ff6e2bcec0ea79Timo Sirainen#include "hex-binary.h"
0cb2e8eb55e70f8ebe1e8349bdf49e4cbe5d8834Timo Sirainen#include "randgen.h"
24fc71a693331ffe77e2b6d81c70aca6fa055e47Timo Sirainen#include "write-full.h"
d6badc27cd6e8d3398877b6766cb0aaeef3a7800Timo Sirainen#include "istream-internal.h"
24fc71a693331ffe77e2b6d81c70aca6fa055e47Timo Sirainen#include "istream-concat.h"
24fc71a693331ffe77e2b6d81c70aca6fa055e47Timo Sirainen#include "istream-seekable.h"
24fc71a693331ffe77e2b6d81c70aca6fa055e47Timo Sirainen
fdc557286bc9f92c5f3bb49096ff6e2bcec0ea79Timo Sirainen#include <sys/stat.h>
e05a4c4136fec723f019bee8383103080203f127Timo Sirainen#include <fcntl.h>
e05a4c4136fec723f019bee8383103080203f127Timo Sirainen#include <unistd.h>
e05a4c4136fec723f019bee8383103080203f127Timo Sirainen#include <time.h>
09c3a491f4f6ccebe290c7709bdc0d79a187610bTimo Sirainen
24fc71a693331ffe77e2b6d81c70aca6fa055e47Timo Sirainen#define BUF_INITIAL_SIZE (1024*32)
1efb8b95e87ca3940cc74f9553a3c8ca5d85ead3Timo Sirainen
24fc71a693331ffe77e2b6d81c70aca6fa055e47Timo Sirainenstruct seekable_istream {
24fc71a693331ffe77e2b6d81c70aca6fa055e47Timo Sirainen struct istream_private istream;
0cb2e8eb55e70f8ebe1e8349bdf49e4cbe5d8834Timo Sirainen
0cb2e8eb55e70f8ebe1e8349bdf49e4cbe5d8834Timo Sirainen char *temp_prefix;
24fc71a693331ffe77e2b6d81c70aca6fa055e47Timo Sirainen uoff_t write_peak;
d5cebe7f98e63d4e2822863ef2faa4971e8b3a5dTimo Sirainen
e05a4c4136fec723f019bee8383103080203f127Timo Sirainen buffer_t *buffer;
0cb2e8eb55e70f8ebe1e8349bdf49e4cbe5d8834Timo Sirainen struct istream **input, *cur_input;
0cb2e8eb55e70f8ebe1e8349bdf49e4cbe5d8834Timo Sirainen struct istream *fd_input;
0cb2e8eb55e70f8ebe1e8349bdf49e4cbe5d8834Timo Sirainen unsigned int cur_idx;
d6badc27cd6e8d3398877b6766cb0aaeef3a7800Timo Sirainen int fd;
d6badc27cd6e8d3398877b6766cb0aaeef3a7800Timo Sirainen};
d6badc27cd6e8d3398877b6766cb0aaeef3a7800Timo Sirainen
0cb2e8eb55e70f8ebe1e8349bdf49e4cbe5d8834Timo Sirainenstatic void i_stream_seekable_close(struct iostream_private *stream)
0cb2e8eb55e70f8ebe1e8349bdf49e4cbe5d8834Timo Sirainen{
0cb2e8eb55e70f8ebe1e8349bdf49e4cbe5d8834Timo Sirainen struct seekable_istream *sstream = (struct seekable_istream *)stream;
01937f71b3ae0d5b30b813372f44a3e7e86c89dcTimo Sirainen unsigned int i;
01937f71b3ae0d5b30b813372f44a3e7e86c89dcTimo Sirainen
d6badc27cd6e8d3398877b6766cb0aaeef3a7800Timo Sirainen sstream->fd = -1;
0cb2e8eb55e70f8ebe1e8349bdf49e4cbe5d8834Timo Sirainen if (sstream->fd_input != NULL)
0cb2e8eb55e70f8ebe1e8349bdf49e4cbe5d8834Timo Sirainen i_stream_close(sstream->fd_input);
53ea5cb26b07671cfd3eaecd9383f80fb6332ff6Timo Sirainen for (i = 0; sstream->input[i] != NULL; i++)
d6badc27cd6e8d3398877b6766cb0aaeef3a7800Timo Sirainen i_stream_close(sstream->input[i]);
d6badc27cd6e8d3398877b6766cb0aaeef3a7800Timo Sirainen}
d6badc27cd6e8d3398877b6766cb0aaeef3a7800Timo Sirainen
53ea5cb26b07671cfd3eaecd9383f80fb6332ff6Timo Sirainenstatic void i_stream_seekable_destroy(struct iostream_private *stream)
8a3d609fdd84f5938c82e8e7eeb84a24ab41b317Timo Sirainen{
0cb2e8eb55e70f8ebe1e8349bdf49e4cbe5d8834Timo Sirainen struct seekable_istream *sstream = (struct seekable_istream *)stream;
0cb2e8eb55e70f8ebe1e8349bdf49e4cbe5d8834Timo Sirainen unsigned int i;
0cb2e8eb55e70f8ebe1e8349bdf49e4cbe5d8834Timo Sirainen
0cb2e8eb55e70f8ebe1e8349bdf49e4cbe5d8834Timo Sirainen if (sstream->buffer != NULL)
0cb2e8eb55e70f8ebe1e8349bdf49e4cbe5d8834Timo Sirainen buffer_free(&sstream->buffer);
d6badc27cd6e8d3398877b6766cb0aaeef3a7800Timo Sirainen if (sstream->fd_input != NULL)
d6badc27cd6e8d3398877b6766cb0aaeef3a7800Timo Sirainen i_stream_unref(&sstream->fd_input);
01937f71b3ae0d5b30b813372f44a3e7e86c89dcTimo Sirainen for (i = 0; sstream->input[i] != NULL; i++)
01937f71b3ae0d5b30b813372f44a3e7e86c89dcTimo Sirainen i_stream_unref(&sstream->input[i]);
d6badc27cd6e8d3398877b6766cb0aaeef3a7800Timo Sirainen
d6badc27cd6e8d3398877b6766cb0aaeef3a7800Timo Sirainen i_free(sstream->temp_prefix);
d6badc27cd6e8d3398877b6766cb0aaeef3a7800Timo Sirainen}
d6badc27cd6e8d3398877b6766cb0aaeef3a7800Timo Sirainen
d6badc27cd6e8d3398877b6766cb0aaeef3a7800Timo Sirainenstatic void
d6badc27cd6e8d3398877b6766cb0aaeef3a7800Timo Siraineni_stream_seekable_set_max_buffer_size(struct iostream_private *stream,
d6badc27cd6e8d3398877b6766cb0aaeef3a7800Timo Sirainen size_t max_size)
d6badc27cd6e8d3398877b6766cb0aaeef3a7800Timo Sirainen{
d6badc27cd6e8d3398877b6766cb0aaeef3a7800Timo Sirainen struct seekable_istream *sstream = (struct seekable_istream *)stream;
d6badc27cd6e8d3398877b6766cb0aaeef3a7800Timo Sirainen unsigned int i;
d6badc27cd6e8d3398877b6766cb0aaeef3a7800Timo Sirainen
a2f250a332dfc1e6cd4ffd196c621eb9dbf7b8a1Timo Sirainen sstream->istream.max_buffer_size = max_size;
d6badc27cd6e8d3398877b6766cb0aaeef3a7800Timo Sirainen if (sstream->fd_input != NULL)
013e3b3942e9550fde619a0b3ce6bdd04edc4268Timo Sirainen i_stream_set_max_buffer_size(sstream->fd_input, max_size);
013e3b3942e9550fde619a0b3ce6bdd04edc4268Timo Sirainen for (i = 0; sstream->input[i] != NULL; i++)
013e3b3942e9550fde619a0b3ce6bdd04edc4268Timo Sirainen i_stream_set_max_buffer_size(sstream->input[i], max_size);
a2f250a332dfc1e6cd4ffd196c621eb9dbf7b8a1Timo Sirainen}
db87d16551d1081ada01f787ea21aa3ed1402c31Timo Sirainen
db87d16551d1081ada01f787ea21aa3ed1402c31Timo Sirainenstatic int copy_to_temp_file(struct seekable_istream *sstream)
db87d16551d1081ada01f787ea21aa3ed1402c31Timo Sirainen{
db87d16551d1081ada01f787ea21aa3ed1402c31Timo Sirainen unsigned char randbuf[8];
9293bf90039454f47e94e4ba3722a775cfa7d25cTimo Sirainen const char *path;
9293bf90039454f47e94e4ba3722a775cfa7d25cTimo Sirainen struct stat st;
db87d16551d1081ada01f787ea21aa3ed1402c31Timo Sirainen int fd;
d6badc27cd6e8d3398877b6766cb0aaeef3a7800Timo Sirainen
8e371a3ce32bd64288786855b8ce0cb63f19f7d1Timo Sirainen /* create a temporary file */
72898747f93d746d6d110bec483a86c5fe3b4698Timo Sirainen for (;;) {
72898747f93d746d6d110bec483a86c5fe3b4698Timo Sirainen random_fill_weak(randbuf, sizeof(randbuf));
72898747f93d746d6d110bec483a86c5fe3b4698Timo Sirainen path = t_strconcat(sstream->temp_prefix,
72898747f93d746d6d110bec483a86c5fe3b4698Timo Sirainen dec2str(time(NULL)), ".",
d6badc27cd6e8d3398877b6766cb0aaeef3a7800Timo Sirainen dec2str(getpid()), ".",
d6badc27cd6e8d3398877b6766cb0aaeef3a7800Timo Sirainen binary_to_hex(randbuf, sizeof(randbuf)),
d6badc27cd6e8d3398877b6766cb0aaeef3a7800Timo Sirainen NULL);
4d2211dac61c615c5bdfd501ea54d46c89d41b0fTimo Sirainen if (stat(path, &st) == 0)
d6badc27cd6e8d3398877b6766cb0aaeef3a7800Timo Sirainen continue;
ccec5f82349eae44087900c0e64ed1fd5a1a6fcaTimo Sirainen
4d2211dac61c615c5bdfd501ea54d46c89d41b0fTimo Sirainen if (errno != ENOENT) {
4d2211dac61c615c5bdfd501ea54d46c89d41b0fTimo Sirainen i_error("stat(%s) failed: %m", path);
d6badc27cd6e8d3398877b6766cb0aaeef3a7800Timo Sirainen return -1;
d6badc27cd6e8d3398877b6766cb0aaeef3a7800Timo Sirainen }
d6badc27cd6e8d3398877b6766cb0aaeef3a7800Timo Sirainen
d6badc27cd6e8d3398877b6766cb0aaeef3a7800Timo Sirainen fd = open(path, O_RDWR | O_EXCL | O_CREAT, 0600);
d6badc27cd6e8d3398877b6766cb0aaeef3a7800Timo Sirainen if (fd != -1)
d6badc27cd6e8d3398877b6766cb0aaeef3a7800Timo Sirainen break;
d6badc27cd6e8d3398877b6766cb0aaeef3a7800Timo Sirainen
ccec5f82349eae44087900c0e64ed1fd5a1a6fcaTimo Sirainen if (errno != EEXIST) {
cb5ff22eba16bc08b3fa2fc34bfc0f02d387b436Timo Sirainen i_error("creat(%s) failed: %m", path);
cb5ff22eba16bc08b3fa2fc34bfc0f02d387b436Timo Sirainen return -1;
ccec5f82349eae44087900c0e64ed1fd5a1a6fcaTimo Sirainen }
cb5ff22eba16bc08b3fa2fc34bfc0f02d387b436Timo Sirainen }
cb5ff22eba16bc08b3fa2fc34bfc0f02d387b436Timo Sirainen
4d2211dac61c615c5bdfd501ea54d46c89d41b0fTimo Sirainen /* we just want the fd, unlink it */
cb5ff22eba16bc08b3fa2fc34bfc0f02d387b436Timo Sirainen if (unlink(path) < 0) {
d6badc27cd6e8d3398877b6766cb0aaeef3a7800Timo Sirainen /* shouldn't happen.. */
d6badc27cd6e8d3398877b6766cb0aaeef3a7800Timo Sirainen i_error("unlink(%s) failed: %m", path);
0cb2e8eb55e70f8ebe1e8349bdf49e4cbe5d8834Timo Sirainen close_keep_errno(fd);
0cb2e8eb55e70f8ebe1e8349bdf49e4cbe5d8834Timo Sirainen return -1;
0cb2e8eb55e70f8ebe1e8349bdf49e4cbe5d8834Timo Sirainen }
0cb2e8eb55e70f8ebe1e8349bdf49e4cbe5d8834Timo Sirainen
d6badc27cd6e8d3398877b6766cb0aaeef3a7800Timo Sirainen /* copy our currently read buffer to it */
0cb2e8eb55e70f8ebe1e8349bdf49e4cbe5d8834Timo Sirainen if (write_full(fd, sstream->buffer->data, sstream->buffer->used) < 0) {
d6badc27cd6e8d3398877b6766cb0aaeef3a7800Timo Sirainen i_error("write_full(%s) failed: %m", path);
0cb2e8eb55e70f8ebe1e8349bdf49e4cbe5d8834Timo Sirainen close_keep_errno(fd);
return -1;
}
sstream->write_peak = sstream->buffer->used;
buffer_free(&sstream->buffer);
sstream->fd = fd;
sstream->fd_input =
i_stream_create_fd(fd, sstream->istream.max_buffer_size, TRUE);
return 0;
}
static ssize_t read_more(struct seekable_istream *sstream)
{
size_t size;
ssize_t ret;
if (sstream->cur_input == NULL) {
sstream->istream.istream.eof = TRUE;
return -1;
}
while ((ret = i_stream_read(sstream->cur_input)) < 0) {
if (!sstream->cur_input->eof) {
/* full / error */
sstream->istream.istream.stream_errno =
sstream->cur_input->stream_errno;
return ret;
}
/* go to next stream */
sstream->cur_input = sstream->input[sstream->cur_idx++];
if (sstream->cur_input == NULL) {
/* last one, EOF */
sstream->istream.istream.eof = TRUE;
return -1;
}
/* see if stream has pending data */
(void)i_stream_get_data(sstream->cur_input, &size);
if (size != 0)
return size;
}
return ret;
}
static bool read_from_buffer(struct seekable_istream *sstream, ssize_t *ret_r)
{
struct istream_private *stream = &sstream->istream;
const unsigned char *data;
size_t size, pos, offset;
if (stream->istream.v_offset +
(stream->pos - stream->skip) >= sstream->buffer->used) {
/* need to read more */
if (sstream->buffer->used >= stream->max_buffer_size)
return FALSE;
/* read more to buffer */
*ret_r = read_more(sstream);
if (*ret_r <= 0)
return TRUE;
/* we should have more now. */
data = i_stream_get_data(sstream->cur_input, &size);
buffer_append(sstream->buffer, data, size);
i_stream_skip(sstream->cur_input, size);
}
offset = stream->istream.v_offset;
stream->buffer = CONST_PTR_OFFSET(sstream->buffer->data, offset);
pos = sstream->buffer->used - offset;
*ret_r = pos - stream->pos;
i_assert(*ret_r > 0);
stream->pos = pos;
return TRUE;
}
static ssize_t i_stream_seekable_read(struct istream_private *stream)
{
struct seekable_istream *sstream = (struct seekable_istream *)stream;
const unsigned char *data;
size_t size, pos;
ssize_t ret;
stream->buffer = CONST_PTR_OFFSET(stream->buffer, stream->skip);
stream->pos -= stream->skip;
stream->skip = 0;
if (sstream->buffer != NULL) {
if (read_from_buffer(sstream, &ret))
return ret;
/* copy everything to temp file and use it as the stream */
if (copy_to_temp_file(sstream) < 0) {
i_assert(errno != 0);
stream->istream.stream_errno = errno;
i_stream_close(&stream->istream);
return -1;
}
i_assert(sstream->buffer == NULL);
}
while (stream->istream.v_offset + stream->pos >= sstream->write_peak) {
/* need to read more */
ret = read_more(sstream);
if (ret <= 0)
return ret;
/* save to our file */
data = i_stream_get_data(sstream->cur_input, &size);
if (write_full(sstream->fd, data, size) < 0) {
i_assert(errno != 0);
stream->istream.stream_errno = errno;
i_error("write_full(%s...) failed: %m",
sstream->temp_prefix);
i_stream_close(&stream->istream);
return -1;
}
i_stream_sync(sstream->fd_input);
i_stream_skip(sstream->cur_input, size);
sstream->write_peak += size;
}
i_stream_seek(sstream->fd_input, stream->istream.v_offset);
ret = i_stream_read(sstream->fd_input);
if (ret <= 0) {
stream->istream.eof = sstream->fd_input->eof;
stream->istream.stream_errno =
sstream->fd_input->stream_errno;
}
stream->buffer = i_stream_get_data(sstream->fd_input, &pos);
stream->pos -= stream->skip;
stream->skip = 0;
ret = pos > stream->pos ? (ssize_t)(pos - stream->pos) : ret;
stream->pos = pos;
return ret;
}
static void i_stream_seekable_seek(struct istream_private *stream,
uoff_t v_offset, bool mark ATTR_UNUSED)
{
stream->istream.v_offset = v_offset;
stream->skip = stream->pos = 0;
}
static const struct stat *
i_stream_seekable_stat(struct istream_private *stream, bool exact)
{
struct seekable_istream *sstream = (struct seekable_istream *)stream;
uoff_t old_offset;
ssize_t ret;
if (sstream->buffer != NULL) {
/* we want to know the full size of the file, so read until
we're finished */
old_offset = stream->istream.v_offset;
do {
i_stream_skip(&stream->istream,
stream->pos - stream->skip);
} while ((ret = i_stream_seekable_read(stream)) > 0);
if (ret == 0) {
i_panic("i_stream_stat() used for non-blocking "
"seekable stream");
}
i_stream_skip(&stream->istream, stream->pos - stream->skip);
i_stream_seek(&stream->istream, old_offset);
}
if (sstream->fd_input != NULL) {
/* using a file backed buffer, we can use real fstat() */
return i_stream_stat(sstream->fd_input, exact);
} else {
/* buffer is completely in memory */
i_assert(sstream->buffer != NULL);
stream->statbuf.st_size = sstream->buffer->used;
return &stream->statbuf;
}
}
struct istream *
i_stream_create_seekable(struct istream *input[],
size_t max_buffer_size, const char *temp_prefix)
{
struct seekable_istream *sstream;
const unsigned char *data;
unsigned int count;
size_t size;
bool blocking = TRUE;
/* If all input streams are seekable, use concat istream instead */
for (count = 0; input[count] != NULL; count++) {
if (!input[count]->seekable)
break;
}
if (input[count] == NULL)
return i_stream_create_concat(input);
/* if any of the streams isn't blocking, set ourself also nonblocking */
for (count = 0; input[count] != NULL; count++) {
if (!input[count]->blocking)
blocking = FALSE;
i_stream_ref(input[count]);
}
i_assert(count != 0);
sstream = i_new(struct seekable_istream, 1);
sstream->temp_prefix = i_strdup(temp_prefix);
sstream->buffer = buffer_create_dynamic(default_pool, BUF_INITIAL_SIZE);
sstream->istream.max_buffer_size = max_buffer_size;
sstream->input = i_new(struct istream *, count + 1);
memcpy(sstream->input, input, sizeof(*input) * count);
sstream->cur_input = sstream->input[0];
/* initialize our buffer from first stream's pending data */
data = i_stream_get_data(sstream->cur_input, &size);
buffer_append(sstream->buffer, data, size);
i_stream_skip(sstream->cur_input, size);
sstream->istream.iostream.close = i_stream_seekable_close;
sstream->istream.iostream.destroy = i_stream_seekable_destroy;
sstream->istream.iostream.set_max_buffer_size =
i_stream_seekable_set_max_buffer_size;
sstream->istream.read = i_stream_seekable_read;
sstream->istream.seek = i_stream_seekable_seek;
sstream->istream.stat = i_stream_seekable_stat;
sstream->istream.istream.blocking = blocking;
sstream->istream.istream.seekable = TRUE;
return i_stream_create(&sstream->istream, NULL, -1);
}