istream-seekable.c revision 7efee0bb408b0d5253e41997857bdda57855cdc7
c25356d5978632df6203437e1953bcb29e0c736fTimo Sirainen/* Copyright (C) 2005 Timo Sirainen */
c25356d5978632df6203437e1953bcb29e0c736fTimo Sirainen
7fb70daba4e571eab5b64f496d20b9e37e31141bTimo Sirainen#include "lib.h"
e7ca5f820d6a1a8fe549a2966ac707a60e055ef4Timo Sirainen#include "buffer.h"
7fb70daba4e571eab5b64f496d20b9e37e31141bTimo Sirainen#include "hex-binary.h"
7fb70daba4e571eab5b64f496d20b9e37e31141bTimo Sirainen#include "randgen.h"
7fb70daba4e571eab5b64f496d20b9e37e31141bTimo Sirainen#include "write-full.h"
7fb70daba4e571eab5b64f496d20b9e37e31141bTimo Sirainen#include "istream-internal.h"
7fb70daba4e571eab5b64f496d20b9e37e31141bTimo Sirainen#include "istream-seekable.h"
7fb70daba4e571eab5b64f496d20b9e37e31141bTimo Sirainen
7fb70daba4e571eab5b64f496d20b9e37e31141bTimo Sirainen#include <sys/stat.h>
7fb70daba4e571eab5b64f496d20b9e37e31141bTimo Sirainen#include <fcntl.h>
7fb70daba4e571eab5b64f496d20b9e37e31141bTimo Sirainen#include <unistd.h>
7fb70daba4e571eab5b64f496d20b9e37e31141bTimo Sirainen#include <time.h>
7fb70daba4e571eab5b64f496d20b9e37e31141bTimo Sirainen
7fb70daba4e571eab5b64f496d20b9e37e31141bTimo Sirainen#define BUF_INITIAL_SIZE (1024*32)
7fb70daba4e571eab5b64f496d20b9e37e31141bTimo Sirainen
7fb70daba4e571eab5b64f496d20b9e37e31141bTimo Sirainenstruct seekable_istream {
7fb70daba4e571eab5b64f496d20b9e37e31141bTimo Sirainen struct _istream istream;
7fb70daba4e571eab5b64f496d20b9e37e31141bTimo Sirainen pool_t pool;
7fb70daba4e571eab5b64f496d20b9e37e31141bTimo Sirainen
7fb70daba4e571eab5b64f496d20b9e37e31141bTimo Sirainen size_t max_buffer_size;
7fb70daba4e571eab5b64f496d20b9e37e31141bTimo Sirainen char *temp_prefix;
7fb70daba4e571eab5b64f496d20b9e37e31141bTimo Sirainen uoff_t write_peak;
7fb70daba4e571eab5b64f496d20b9e37e31141bTimo Sirainen
7fb70daba4e571eab5b64f496d20b9e37e31141bTimo Sirainen buffer_t *buffer;
7fb70daba4e571eab5b64f496d20b9e37e31141bTimo Sirainen struct istream **input, *cur_input;
7fb70daba4e571eab5b64f496d20b9e37e31141bTimo Sirainen struct istream *fd_input;
7fb70daba4e571eab5b64f496d20b9e37e31141bTimo Sirainen unsigned int cur_idx;
2f122b4db3f0d4eeb59ff9d306e54b2009d72cf9Timo Sirainen int fd;
7fb70daba4e571eab5b64f496d20b9e37e31141bTimo Sirainen};
7fb70daba4e571eab5b64f496d20b9e37e31141bTimo Sirainen
7fb70daba4e571eab5b64f496d20b9e37e31141bTimo Sirainenstatic void _close(struct _iostream *stream __attr_unused__)
7fb70daba4e571eab5b64f496d20b9e37e31141bTimo Sirainen{
2f122b4db3f0d4eeb59ff9d306e54b2009d72cf9Timo Sirainen struct seekable_istream *sstream = (struct seekable_istream *)stream;
7fb70daba4e571eab5b64f496d20b9e37e31141bTimo Sirainen unsigned int i;
2f122b4db3f0d4eeb59ff9d306e54b2009d72cf9Timo Sirainen
7fb70daba4e571eab5b64f496d20b9e37e31141bTimo Sirainen sstream->fd = -1;
2f122b4db3f0d4eeb59ff9d306e54b2009d72cf9Timo Sirainen if (sstream->fd_input != NULL)
2f122b4db3f0d4eeb59ff9d306e54b2009d72cf9Timo Sirainen i_stream_close(sstream->fd_input);
2f122b4db3f0d4eeb59ff9d306e54b2009d72cf9Timo Sirainen for (i = 0; sstream->input[i] != NULL; i++)
2f122b4db3f0d4eeb59ff9d306e54b2009d72cf9Timo Sirainen i_stream_close(sstream->input[i]);
7fb70daba4e571eab5b64f496d20b9e37e31141bTimo Sirainen}
7fb70daba4e571eab5b64f496d20b9e37e31141bTimo Sirainen
7fb70daba4e571eab5b64f496d20b9e37e31141bTimo Sirainenstatic void _destroy(struct _iostream *stream)
7fb70daba4e571eab5b64f496d20b9e37e31141bTimo Sirainen{
7fb70daba4e571eab5b64f496d20b9e37e31141bTimo Sirainen struct seekable_istream *sstream = (struct seekable_istream *)stream;
7fb70daba4e571eab5b64f496d20b9e37e31141bTimo Sirainen unsigned int i;
7fb70daba4e571eab5b64f496d20b9e37e31141bTimo Sirainen
7fb70daba4e571eab5b64f496d20b9e37e31141bTimo Sirainen if (sstream->buffer != NULL)
7fb70daba4e571eab5b64f496d20b9e37e31141bTimo Sirainen buffer_free(sstream->buffer);
7fb70daba4e571eab5b64f496d20b9e37e31141bTimo Sirainen if (sstream->fd_input != NULL)
7fb70daba4e571eab5b64f496d20b9e37e31141bTimo Sirainen i_stream_unref(sstream->fd_input);
7fb70daba4e571eab5b64f496d20b9e37e31141bTimo Sirainen for (i = 0; sstream->input[i] != NULL; i++)
7fb70daba4e571eab5b64f496d20b9e37e31141bTimo Sirainen i_stream_unref(sstream->input[i]);
7fb70daba4e571eab5b64f496d20b9e37e31141bTimo Sirainen
7fb70daba4e571eab5b64f496d20b9e37e31141bTimo Sirainen p_free(sstream->pool, sstream->temp_prefix);
7fb70daba4e571eab5b64f496d20b9e37e31141bTimo Sirainen pool_unref(sstream->pool);
7fb70daba4e571eab5b64f496d20b9e37e31141bTimo Sirainen}
7fb70daba4e571eab5b64f496d20b9e37e31141bTimo Sirainen
7fb70daba4e571eab5b64f496d20b9e37e31141bTimo Sirainenstatic void _set_max_buffer_size(struct _iostream *stream, size_t max_size)
7fb70daba4e571eab5b64f496d20b9e37e31141bTimo Sirainen{
7fb70daba4e571eab5b64f496d20b9e37e31141bTimo Sirainen struct seekable_istream *sstream = (struct seekable_istream *)stream;
7fb70daba4e571eab5b64f496d20b9e37e31141bTimo Sirainen unsigned int i;
7fb70daba4e571eab5b64f496d20b9e37e31141bTimo Sirainen
7fb70daba4e571eab5b64f496d20b9e37e31141bTimo Sirainen sstream->max_buffer_size = max_size;
7fb70daba4e571eab5b64f496d20b9e37e31141bTimo Sirainen if (sstream->fd_input != NULL)
7fb70daba4e571eab5b64f496d20b9e37e31141bTimo Sirainen i_stream_set_max_buffer_size(sstream->fd_input, max_size);
2f122b4db3f0d4eeb59ff9d306e54b2009d72cf9Timo Sirainen for (i = 0; sstream->input[i] != NULL; i++)
2f122b4db3f0d4eeb59ff9d306e54b2009d72cf9Timo Sirainen i_stream_set_max_buffer_size(sstream->input[i], max_size);
2f122b4db3f0d4eeb59ff9d306e54b2009d72cf9Timo Sirainen}
2f122b4db3f0d4eeb59ff9d306e54b2009d72cf9Timo Sirainen
7fb70daba4e571eab5b64f496d20b9e37e31141bTimo Sirainenstatic int copy_to_temp_file(struct seekable_istream *sstream)
7fb70daba4e571eab5b64f496d20b9e37e31141bTimo Sirainen{
7fb70daba4e571eab5b64f496d20b9e37e31141bTimo Sirainen unsigned char randbuf[8];
7fb70daba4e571eab5b64f496d20b9e37e31141bTimo Sirainen const char *path;
7fb70daba4e571eab5b64f496d20b9e37e31141bTimo Sirainen struct stat st;
7fb70daba4e571eab5b64f496d20b9e37e31141bTimo Sirainen int fd;
7fb70daba4e571eab5b64f496d20b9e37e31141bTimo Sirainen
7fb70daba4e571eab5b64f496d20b9e37e31141bTimo Sirainen /* create a temporary file */
7fb70daba4e571eab5b64f496d20b9e37e31141bTimo Sirainen for (;;) {
e7ca5f820d6a1a8fe549a2966ac707a60e055ef4Timo Sirainen random_fill_weak(randbuf, sizeof(randbuf));
7fb70daba4e571eab5b64f496d20b9e37e31141bTimo Sirainen path = t_strconcat(sstream->temp_prefix, ".",
2f122b4db3f0d4eeb59ff9d306e54b2009d72cf9Timo Sirainen dec2str(time(NULL)), ".",
e622f05170aecc10e71f117d4d7baf55c5d12b77Timo Sirainen dec2str(getpid()), ".",
7fb70daba4e571eab5b64f496d20b9e37e31141bTimo Sirainen binary_to_hex(randbuf, sizeof(randbuf)),
7fb70daba4e571eab5b64f496d20b9e37e31141bTimo Sirainen NULL);
2f122b4db3f0d4eeb59ff9d306e54b2009d72cf9Timo Sirainen if (stat(path, &st) == 0)
2f122b4db3f0d4eeb59ff9d306e54b2009d72cf9Timo Sirainen continue;
7fb70daba4e571eab5b64f496d20b9e37e31141bTimo Sirainen
7fb70daba4e571eab5b64f496d20b9e37e31141bTimo Sirainen if (errno != ENOENT) {
7fb70daba4e571eab5b64f496d20b9e37e31141bTimo Sirainen i_error("stat(%s) failed: %m", path);
7fb70daba4e571eab5b64f496d20b9e37e31141bTimo Sirainen return -1;
7fb70daba4e571eab5b64f496d20b9e37e31141bTimo Sirainen }
7fb70daba4e571eab5b64f496d20b9e37e31141bTimo Sirainen
7fb70daba4e571eab5b64f496d20b9e37e31141bTimo Sirainen fd = open(path, O_RDWR | O_EXCL | O_CREAT, 0600);
7fb70daba4e571eab5b64f496d20b9e37e31141bTimo Sirainen if (fd != -1)
7fb70daba4e571eab5b64f496d20b9e37e31141bTimo Sirainen break;
7fb70daba4e571eab5b64f496d20b9e37e31141bTimo Sirainen
2f122b4db3f0d4eeb59ff9d306e54b2009d72cf9Timo Sirainen if (errno != EEXIST) {
2f122b4db3f0d4eeb59ff9d306e54b2009d72cf9Timo Sirainen i_error("open(%s) failed: %m", path);
2f122b4db3f0d4eeb59ff9d306e54b2009d72cf9Timo Sirainen return -1;
2f122b4db3f0d4eeb59ff9d306e54b2009d72cf9Timo Sirainen }
2f122b4db3f0d4eeb59ff9d306e54b2009d72cf9Timo Sirainen }
2f122b4db3f0d4eeb59ff9d306e54b2009d72cf9Timo Sirainen
2f122b4db3f0d4eeb59ff9d306e54b2009d72cf9Timo Sirainen /* we just want the fd, unlink it */
2f122b4db3f0d4eeb59ff9d306e54b2009d72cf9Timo Sirainen if (unlink(path) < 0) {
7fb70daba4e571eab5b64f496d20b9e37e31141bTimo Sirainen /* shouldn't happen.. */
e7ca5f820d6a1a8fe549a2966ac707a60e055ef4Timo Sirainen i_error("unlink(%s) failed: %m", path);
7fb70daba4e571eab5b64f496d20b9e37e31141bTimo Sirainen (void)close(fd);
7fb70daba4e571eab5b64f496d20b9e37e31141bTimo Sirainen return -1;
7fb70daba4e571eab5b64f496d20b9e37e31141bTimo Sirainen }
7fb70daba4e571eab5b64f496d20b9e37e31141bTimo Sirainen
7fb70daba4e571eab5b64f496d20b9e37e31141bTimo Sirainen /* copy our currently read buffer to it */
7fb70daba4e571eab5b64f496d20b9e37e31141bTimo Sirainen if (write_full(fd, sstream->buffer->data, sstream->buffer->used) < 0) {
7fb70daba4e571eab5b64f496d20b9e37e31141bTimo Sirainen i_error("write_full(%s) failed: %m", path);
7fb70daba4e571eab5b64f496d20b9e37e31141bTimo Sirainen (void)close(fd);
7fb70daba4e571eab5b64f496d20b9e37e31141bTimo Sirainen return -1;
7fb70daba4e571eab5b64f496d20b9e37e31141bTimo Sirainen }
7fb70daba4e571eab5b64f496d20b9e37e31141bTimo Sirainen sstream->write_peak = sstream->buffer->used;
7fb70daba4e571eab5b64f496d20b9e37e31141bTimo Sirainen
2f122b4db3f0d4eeb59ff9d306e54b2009d72cf9Timo Sirainen buffer_free(sstream->buffer);
7fb70daba4e571eab5b64f496d20b9e37e31141bTimo Sirainen sstream->buffer = NULL;
7fb70daba4e571eab5b64f496d20b9e37e31141bTimo Sirainen
7fb70daba4e571eab5b64f496d20b9e37e31141bTimo Sirainen sstream->fd = fd;
7fb70daba4e571eab5b64f496d20b9e37e31141bTimo Sirainen sstream->fd_input =
7fb70daba4e571eab5b64f496d20b9e37e31141bTimo Sirainen i_stream_create_file(fd, sstream->pool,
1b21ddb1b3f7f916627db312046bcded07627ee8Timo Sirainen sstream->max_buffer_size, TRUE);
7fb70daba4e571eab5b64f496d20b9e37e31141bTimo Sirainen return 0;
7fb70daba4e571eab5b64f496d20b9e37e31141bTimo Sirainen}
static ssize_t read_more(struct seekable_istream *sstream)
{
size_t size;
ssize_t ret;
if (sstream->cur_input == NULL) {
sstream->istream.istream.eof = TRUE;
return -1;
}
while ((ret = i_stream_read(sstream->cur_input)) < 0) {
if (!sstream->cur_input->eof) {
/* error */
sstream->istream.istream.stream_errno =
sstream->cur_input->stream_errno;
return -1;
}
/* go to next stream */
sstream->cur_input = sstream->input[sstream->cur_idx++];
if (sstream->cur_input == NULL) {
/* last one, EOF */
sstream->istream.istream.eof = TRUE;
return -1;
}
/* see if stream has pending data */
(void)i_stream_get_data(sstream->cur_input, &size);
if (size != 0)
return size;
}
return ret;
}
static int read_from_buffer(struct seekable_istream *sstream, ssize_t *ret)
{
struct _istream *stream = &sstream->istream;
const unsigned char *data;
size_t size, pos, offset;
if (stream->istream.v_offset +
(stream->pos - stream->skip) >= sstream->buffer->used) {
/* need to read more */
if (sstream->buffer->used >= sstream->max_buffer_size)
return FALSE;
/* read more to buffer */
*ret = read_more(sstream);
if (*ret <= 0)
return TRUE;
/* we should have more now. */
data = i_stream_get_data(sstream->cur_input, &size);
buffer_append(sstream->buffer, data, size);
i_stream_skip(sstream->cur_input, size);
}
offset = stream->istream.v_offset;
stream->buffer = CONST_PTR_OFFSET(sstream->buffer->data, offset);
pos = sstream->buffer->used - offset;
*ret = pos - stream->pos;
stream->pos = pos;
return TRUE;
}
static ssize_t _read(struct _istream *stream)
{
struct seekable_istream *sstream = (struct seekable_istream *)stream;
const unsigned char *data;
size_t size, pos;
ssize_t ret;
stream->buffer = CONST_PTR_OFFSET(stream->buffer, stream->skip);
stream->pos -= stream->skip;
stream->skip = 0;
if (sstream->buffer != NULL) {
if (read_from_buffer(sstream, &ret))
return ret;
/* copy everything to temp file and use it as the stream */
if (copy_to_temp_file(sstream) < 0) {
i_stream_close(&stream->istream);
return -1;
}
i_assert(sstream->buffer == NULL);
}
while (stream->istream.v_offset + stream->pos >= sstream->write_peak) {
/* need to read more */
ret = read_more(sstream);
if (ret <= 0)
return ret;
/* save to our file */
data = i_stream_get_data(sstream->cur_input, &size);
if (write_full(sstream->fd, data, size) < 0) {
i_error("write_full(%s...) failed: %m",
sstream->temp_prefix);
i_stream_close(&stream->istream);
return -1;
}
i_stream_sync(sstream->fd_input);
i_stream_skip(sstream->cur_input, size);
sstream->write_peak += size;
}
i_stream_seek(sstream->fd_input, stream->istream.v_offset);
ret = i_stream_read(sstream->fd_input);
if (ret <= 0) {
stream->istream.eof = sstream->fd_input->eof;
stream->istream.stream_errno =
sstream->fd_input->stream_errno;
}
stream->buffer = i_stream_get_data(sstream->fd_input, &pos);
stream->pos -= stream->skip;
stream->skip = 0;
ret = pos > stream->pos ? (ssize_t)(pos - stream->pos) :
(ret == 0 ? 0 : -1);
stream->pos = pos;
return ret;
}
static void _seek(struct _istream *stream, uoff_t v_offset)
{
stream->istream.stream_errno = 0;
stream->istream.v_offset = v_offset;
stream->skip = stream->pos = 0;
}
static const struct stat *_stat(struct _istream *stream)
{
struct seekable_istream *sstream = (struct seekable_istream *)stream;
uoff_t old_offset;
ssize_t ret;
if (sstream->buffer != NULL) {
old_offset = stream->istream.v_offset;
do {
i_stream_skip(&stream->istream, stream->skip);
} while ((ret = _read(stream)) > 0);
if (ret == 0) {
i_panic("i_stream_stat() used for non-blocking "
"seekable stream");
}
i_stream_seek(&stream->istream, old_offset);
}
if (sstream->fd_input != NULL)
return i_stream_stat(sstream->fd_input);
stream->statbuf.st_size = sstream->buffer->used;
return &stream->statbuf;
}
struct istream *
i_stream_create_seekable(struct istream *input[], pool_t pool,
size_t max_buffer_size, const char *temp_prefix)
{
struct seekable_istream *sstream;
const unsigned char *data;
unsigned int count;
size_t size;
for (count = 0; input[count] != NULL; count++)
i_stream_ref(input[count]);
i_assert(count != 0);
pool_ref(pool);
sstream = p_new(pool, struct seekable_istream, 1);
sstream->pool = pool;
sstream->temp_prefix = p_strdup(pool, temp_prefix);
sstream->buffer = buffer_create_dynamic(pool, BUF_INITIAL_SIZE);
sstream->max_buffer_size = max_buffer_size;
sstream->input = p_new(pool, struct istream *, count + 1);
memcpy(sstream->input, input, sizeof(*input) * count);
sstream->cur_input = sstream->input[0];
/* initialize our buffer from first stream's pending data */
data = i_stream_get_data(sstream->cur_input, &size);
buffer_append(sstream->buffer, data, size);
i_stream_skip(sstream->cur_input, size);
sstream->istream.iostream.close = _close;
sstream->istream.iostream.destroy = _destroy;
sstream->istream.iostream.set_max_buffer_size = _set_max_buffer_size;
sstream->istream.read = _read;
sstream->istream.seek = _seek;
sstream->istream.stat = _stat;
return _i_stream_create(&sstream->istream, pool, -1, 0);
}