/* test-iostream-pump.c revision 84b746f56209d4a85af73cd26850e75f519ae0b0 */
/* Copyright (c) 2016-2017 Dovecot authors, see the included COPYING file */
d9e404180ff26dbbaea68534a5f176765022b76bTimo Sirainen
d9e404180ff26dbbaea68534a5f176765022b76bTimo Sirainen#include "test-lib.h"
d9e404180ff26dbbaea68534a5f176765022b76bTimo Sirainen#include "istream.h"
d9e404180ff26dbbaea68534a5f176765022b76bTimo Sirainen#include "ostream.h"
d9e404180ff26dbbaea68534a5f176765022b76bTimo Sirainen#include "buffer.h"
d9e404180ff26dbbaea68534a5f176765022b76bTimo Sirainen#include "str.h"
d9e404180ff26dbbaea68534a5f176765022b76bTimo Sirainen#include "ioloop.h"
d9e404180ff26dbbaea68534a5f176765022b76bTimo Sirainen#include "iostream-pump.h"
d9e404180ff26dbbaea68534a5f176765022b76bTimo Sirainen#include "istream-failure-at.h"
d9e404180ff26dbbaea68534a5f176765022b76bTimo Sirainen#include "ostream-failure-at.h"
d9e404180ff26dbbaea68534a5f176765022b76bTimo Sirainen
d9e404180ff26dbbaea68534a5f176765022b76bTimo Sirainen#include <unistd.h>
d9e404180ff26dbbaea68534a5f176765022b76bTimo Sirainen#include <sys/types.h>
d9e404180ff26dbbaea68534a5f176765022b76bTimo Sirainen#include <sys/socket.h>
d9e404180ff26dbbaea68534a5f176765022b76bTimo Sirainen
/* Payload fed into the input stream by test_iostream_setup(). The array
   is sized by its initializer, so sizeof(data) also counts the trailing
   NUL byte. */
static unsigned char data[] = "hello, world";
d9e404180ff26dbbaea68534a5f176765022b76bTimo Sirainen
d9e404180ff26dbbaea68534a5f176765022b76bTimo Sirainenstatic
d9e404180ff26dbbaea68534a5f176765022b76bTimo Sirainenvoid completed(bool success, int *u0)
d9e404180ff26dbbaea68534a5f176765022b76bTimo Sirainen{
d9e404180ff26dbbaea68534a5f176765022b76bTimo Sirainen /* to somehow discern between error and success .. */
d9e404180ff26dbbaea68534a5f176765022b76bTimo Sirainen (*u0) -= (success ? 1 : 2);
d9e404180ff26dbbaea68534a5f176765022b76bTimo Sirainen io_loop_stop(current_ioloop);
d9e404180ff26dbbaea68534a5f176765022b76bTimo Sirainen}
d9e404180ff26dbbaea68534a5f176765022b76bTimo Sirainen
d9e404180ff26dbbaea68534a5f176765022b76bTimo Sirainenstatic
d9e404180ff26dbbaea68534a5f176765022b76bTimo Sirainenvoid failed(int *u0)
d9e404180ff26dbbaea68534a5f176765022b76bTimo Sirainen{
d9e404180ff26dbbaea68534a5f176765022b76bTimo Sirainen *u0 = -1; /* ensure failure */
d9e404180ff26dbbaea68534a5f176765022b76bTimo Sirainen io_loop_stop(current_ioloop);
d9e404180ff26dbbaea68534a5f176765022b76bTimo Sirainen}
d9e404180ff26dbbaea68534a5f176765022b76bTimo Sirainen
d9e404180ff26dbbaea68534a5f176765022b76bTimo Sirainenstruct nonblock_ctx {
d9e404180ff26dbbaea68534a5f176765022b76bTimo Sirainen struct istream *in;
d9e404180ff26dbbaea68534a5f176765022b76bTimo Sirainen struct ostream *out;
d9e404180ff26dbbaea68534a5f176765022b76bTimo Sirainen uoff_t pos, max_size;
d9e404180ff26dbbaea68534a5f176765022b76bTimo Sirainen};
d9e404180ff26dbbaea68534a5f176765022b76bTimo Sirainen
d9e404180ff26dbbaea68534a5f176765022b76bTimo Sirainenstatic void pump_nonblocking_timeout(struct nonblock_ctx *ctx)
d9e404180ff26dbbaea68534a5f176765022b76bTimo Sirainen{
d9e404180ff26dbbaea68534a5f176765022b76bTimo Sirainen switch (ctx->pos % 4) {
d9e404180ff26dbbaea68534a5f176765022b76bTimo Sirainen case 0:
d9e404180ff26dbbaea68534a5f176765022b76bTimo Sirainen break;
d9e404180ff26dbbaea68534a5f176765022b76bTimo Sirainen case 1:
d9e404180ff26dbbaea68534a5f176765022b76bTimo Sirainen /* allow more input */
5cba8fd913459269b521513e16e3afcbbd030ac5Timo Sirainen if (ctx->pos/4 == ctx->max_size+1)
5cba8fd913459269b521513e16e3afcbbd030ac5Timo Sirainen test_istream_set_allow_eof(ctx->in, TRUE);
d9e404180ff26dbbaea68534a5f176765022b76bTimo Sirainen else
d9e404180ff26dbbaea68534a5f176765022b76bTimo Sirainen test_istream_set_size(ctx->in, ctx->pos/4);
d9e404180ff26dbbaea68534a5f176765022b76bTimo Sirainen i_stream_set_input_pending(ctx->in, TRUE);
d9e404180ff26dbbaea68534a5f176765022b76bTimo Sirainen break;
d9e404180ff26dbbaea68534a5f176765022b76bTimo Sirainen case 2:
9a4e08ad8a4cd4ee70e0c47e3e2eb1ee9f9a818dTimo Sirainen break;
d9e404180ff26dbbaea68534a5f176765022b76bTimo Sirainen case 3: {
5cba8fd913459269b521513e16e3afcbbd030ac5Timo Sirainen /* allow more output. give always one byte less than the
9a4e08ad8a4cd4ee70e0c47e3e2eb1ee9f9a818dTimo Sirainen input size so there's something in internal buffer. */
9a4e08ad8a4cd4ee70e0c47e3e2eb1ee9f9a818dTimo Sirainen size_t size = ctx->pos/4;
9a4e08ad8a4cd4ee70e0c47e3e2eb1ee9f9a818dTimo Sirainen if (size > 0)
9a4e08ad8a4cd4ee70e0c47e3e2eb1ee9f9a818dTimo Sirainen test_ostream_set_max_output_size(ctx->out, size-1);
9a4e08ad8a4cd4ee70e0c47e3e2eb1ee9f9a818dTimo Sirainen break;
9a4e08ad8a4cd4ee70e0c47e3e2eb1ee9f9a818dTimo Sirainen }
9a4e08ad8a4cd4ee70e0c47e3e2eb1ee9f9a818dTimo Sirainen }
9a4e08ad8a4cd4ee70e0c47e3e2eb1ee9f9a818dTimo Sirainen ctx->pos++;
9a4e08ad8a4cd4ee70e0c47e3e2eb1ee9f9a818dTimo Sirainen}
9a4e08ad8a4cd4ee70e0c47e3e2eb1ee9f9a818dTimo Sirainen
9a4e08ad8a4cd4ee70e0c47e3e2eb1ee9f9a818dTimo Sirainenstatic
d9e404180ff26dbbaea68534a5f176765022b76bTimo Sirainenconst char *run_pump(struct istream *in, struct ostream *out, int *counter, buffer_t *out_buffer)
d9e404180ff26dbbaea68534a5f176765022b76bTimo Sirainen{
d9e404180ff26dbbaea68534a5f176765022b76bTimo Sirainen struct iostream_pump *pump;
d9e404180ff26dbbaea68534a5f176765022b76bTimo Sirainen struct ioloop *ioloop = io_loop_create();
d9e404180ff26dbbaea68534a5f176765022b76bTimo Sirainen io_loop_set_current(ioloop);
d9e404180ff26dbbaea68534a5f176765022b76bTimo Sirainen struct nonblock_ctx ctx = { in, out, 0, 0 };
d9e404180ff26dbbaea68534a5f176765022b76bTimo Sirainen struct timeout *to2 = NULL;
d6b3cfd855c0eebed68be50d3111de1b5a6afeb0Timo Sirainen
d9e404180ff26dbbaea68534a5f176765022b76bTimo Sirainen if (!in->blocking) {
d9e404180ff26dbbaea68534a5f176765022b76bTimo Sirainen test_assert(i_stream_get_size(in, TRUE, &ctx.max_size) > 0);
d9e404180ff26dbbaea68534a5f176765022b76bTimo Sirainen test_istream_set_size(in, 0);
d9e404180ff26dbbaea68534a5f176765022b76bTimo Sirainen test_istream_set_allow_eof(in, FALSE);
d9e404180ff26dbbaea68534a5f176765022b76bTimo Sirainen test_ostream_set_max_output_size(out, 0);
d9e404180ff26dbbaea68534a5f176765022b76bTimo Sirainen to2 = timeout_add_short(0, pump_nonblocking_timeout, &ctx);
d9e404180ff26dbbaea68534a5f176765022b76bTimo Sirainen }
d9e404180ff26dbbaea68534a5f176765022b76bTimo Sirainen
d9e404180ff26dbbaea68534a5f176765022b76bTimo Sirainen pump = iostream_pump_create(in, out);
i_stream_unref(&in);
o_stream_unref(&out);
iostream_pump_set_completion_callback(pump, completed, counter);
iostream_pump_start(pump);
alarm(5);
struct timeout *to = timeout_add(3000, failed, counter);
io_loop_run(current_ioloop);
timeout_remove(&to);
timeout_remove(&to2);
alarm(0);
test_assert(*counter == 0);
if (!ctx.in->blocking && ctx.in->stream_errno != 0 &&
ctx.out->stream_errno == 0) {
/* input failed, finish flushing output */
test_ostream_set_max_output_size(ctx.out, (size_t)-1);
test_assert(o_stream_flush(ctx.out) > 0);
} else {
test_assert(o_stream_flush(ctx.out) != 0);
}
const char *ret = t_strdup(str_c(out_buffer));
iostream_pump_unref(&pump);
io_loop_destroy(&ioloop);
return ret;
}
/* Builds the stream pair shared by all test cases.
   block:         value stored in the input stream's `blocking` flag; it
                  also selects between test_ostream_create() and
                  test_ostream_create_nonblocking() for the output.
   in_r:          receives a test istream over `data`.
   out_r:         receives a test ostream writing to *out_buffer_r.
   out_buffer_r:  receives a dynamic buffer from a data stack pool. */
static void
test_iostream_setup(bool block, struct istream **in_r,
		    struct ostream **out_r, buffer_t **out_buffer_r)
{
	buffer_t *sink = buffer_create_dynamic(pool_datastack_create(), 128);
	struct istream *input = test_istream_create_data(data, sizeof(data));

	input->blocking = block;

	*out_buffer_r = sink;
	*in_r = input;
	*out_r = block ?
		test_ostream_create(sink) :
		test_ostream_create_nonblocking(sink, 1);
}
/* Plain pump run with no injected failure: the whole payload must reach
   the output buffer and completion must be reported as success. */
static void test_iostream_pump_simple(bool block)
{
	const char *mode = block ? "" : "non-";
	struct istream *in;
	struct ostream *out;
	buffer_t *buffer;
	/* completed() subtracts 1 on success */
	int counter = 1;

	test_begin(t_strdup_printf("iostream_pump (%sblocking)", mode));

	test_iostream_setup(block, &in, &out, &buffer);
	test_assert(strcmp(run_pump(in, out, &counter, buffer), "hello, world") == 0);

	test_end();
}
/* Input wrapped in a failure-at istream configured with position 0:
   nothing is expected in the output buffer and completion must be
   reported as failure. */
static void test_iostream_pump_failure_start_read(bool block)
{
	const char *mode = block ? "" : "non-";
	struct istream *plain_in, *in;
	struct ostream *out;
	buffer_t *buffer;
	/* completed() subtracts 2 on failure */
	int counter = 2;

	test_begin(t_strdup_printf("iostream_pump failure start-read (%sblocking)", mode));

	test_iostream_setup(block, &plain_in, &out, &buffer);
	in = i_stream_create_failure_at(plain_in, 0, "test pump fail");
	/* only the wrapper's reference is passed on */
	i_stream_unref(&plain_in);

	test_assert(strcmp(run_pump(in, out, &counter, buffer), "") == 0);

	test_end();
}
/* Input wrapped in a failure-at istream configured with position 4:
   exactly "hell" is expected in the output buffer and completion must
   be reported as failure. */
static void test_iostream_pump_failure_mid_read(bool block)
{
	const char *mode = block ? "" : "non-";
	struct istream *plain_in, *in;
	struct ostream *out;
	buffer_t *buffer;
	/* completed() subtracts 2 on failure */
	int counter = 2;

	test_begin(t_strdup_printf("iostream_pump failure mid-read (%sblocking)", mode));

	test_iostream_setup(block, &plain_in, &out, &buffer);
	in = i_stream_create_failure_at(plain_in, 4, "test pump fail");
	/* only the wrapper's reference is passed on */
	i_stream_unref(&plain_in);

	test_assert(strcmp(run_pump(in, out, &counter, buffer), "hell") == 0);

	test_end();
}
/* Input wrapped in a failure-at-EOF istream: the whole payload is
   expected in the output buffer, but completion must still be reported
   as failure.
   The test label used to read "failure mid-read", duplicating the name
   of test_iostream_pump_failure_mid_read(); it now says "end-read" so
   the two tests can be told apart in the test output. */
static
void test_iostream_pump_failure_end_read(bool block)
{
	test_begin(t_strdup_printf("iostream_pump failure end-read (%sblocking)", block ? "" : "non-"));
	int counter;
	struct istream *in_2;
	struct ostream *out;
	buffer_t *buffer;
	test_iostream_setup(block, &in_2, &out, &buffer);
	struct istream *in = i_stream_create_failure_at_eof(in_2, "test pump fail");
	/* only the wrapper's reference is passed on to run_pump() */
	i_stream_unref(&in_2);
	/* completed() subtracts 2 on failure */
	counter = 2;
	test_assert(strcmp(run_pump(in, out, &counter, buffer), "hello, world") == 0);
	test_end();
}
/* Output wrapped in a failure-at ostream configured with position 0:
   nothing is expected in the output buffer and completion must be
   reported as failure. */
static void test_iostream_pump_failure_start_write(bool block)
{
	const char *mode = block ? "" : "non-";
	struct istream *in;
	struct ostream *plain_out, *out;
	buffer_t *buffer;
	/* completed() subtracts 2 on failure */
	int counter = 2;

	test_begin(t_strdup_printf("iostream_pump failure start-write (%sblocking)", mode));

	test_iostream_setup(block, &in, &plain_out, &buffer);
	out = o_stream_create_failure_at(plain_out, 0, "test pump fail");
	/* only the wrapper's reference is passed on */
	o_stream_unref(&plain_out);

	test_assert(strcmp(run_pump(in, out, &counter, buffer), "") == 0);

	test_end();
}
/* Output wrapped in a failure-at ostream configured with position 4:
   completion must be reported as failure. The expected buffer contents
   differ by mode: empty when blocking, "hel" when non-blocking. */
static void test_iostream_pump_failure_mid_write(bool block)
{
	const char *mode = block ? "" : "non-";
	struct istream *in;
	struct ostream *plain_out, *out;
	buffer_t *buffer;
	/* completed() subtracts 2 on failure */
	int counter = 2;

	test_begin(t_strdup_printf("iostream_pump failure mid-write (%sblocking)", mode));

	test_iostream_setup(block, &in, &plain_out, &buffer);
	out = o_stream_create_failure_at(plain_out, 4, "test pump fail");
	/* only the wrapper's reference is passed on */
	o_stream_unref(&plain_out);

	/* "hel" because the last byte is only in internal buffer */
	test_assert(strcmp(run_pump(in, out, &counter, buffer), block ? "" : "hel") == 0);

	test_end();
}
/* Output wrapped in a failure-at-flush ostream: the whole payload is
   expected in the output buffer, but completion must be reported as
   failure. Only runs in blocking mode; the non-blocking call returns
   immediately without starting a test. */
static void test_iostream_pump_failure_end_write(bool block)
{
	struct istream *in;
	struct ostream *plain_out, *out;
	buffer_t *buffer;
	/* completed() subtracts 2 on failure */
	int counter = 2;

	/* we'll get flushes constantly */
	if (!block)
		return;

	test_begin("iostream_pump failure end-write (blocking)");

	test_iostream_setup(block, &in, &plain_out, &buffer);
	out = o_stream_create_failure_at_flush(plain_out, "test pump fail");
	/* only the wrapper's reference is passed on */
	o_stream_unref(&plain_out);

	test_assert(strcmp(run_pump(in, out, &counter, buffer), "hello, world") == 0);

	test_end();
}
/* Test entry point: runs every iostream_pump test case twice inside one
   data stack frame, first with blocking streams and then with
   non-blocking ones. */
void test_iostream_pump(void)
{
	T_BEGIN {
		for (int pass = 0; pass < 2; pass++) {
			/* pass 0: blocking, pass 1: non-blocking */
			bool block = (pass == 0);

			test_iostream_pump_simple(block);
			test_iostream_pump_failure_start_read(block);
			test_iostream_pump_failure_mid_read(block);
			test_iostream_pump_failure_end_read(block);
			test_iostream_pump_failure_start_write(block);
			test_iostream_pump_failure_mid_write(block);
			test_iostream_pump_failure_end_write(block);
		}
	} T_END;
}