iostream-pump.c revision fe4a5467c998dfd79a071416068ca59d5a6a388f
0cb2e8eb55e70f8ebe1e8349bdf49e4cbe5d8834Timo Sirainen/* Copyright (c) 2002-2017 Dovecot authors, see the included COPYING file
0cb2e8eb55e70f8ebe1e8349bdf49e4cbe5d8834Timo Sirainen */
0cb2e8eb55e70f8ebe1e8349bdf49e4cbe5d8834Timo Sirainen#include "lib.h"
0cb2e8eb55e70f8ebe1e8349bdf49e4cbe5d8834Timo Sirainen#include "buffer.h"
0add8c99ca65e56dbf613595fc37c41aafff3f7fTimo Sirainen#include "str.h"
55a210942dc7da58b2fd0b11bed8da6b030af5c1Timo Sirainen#include "iostream-pump.h"
0cb2e8eb55e70f8ebe1e8349bdf49e4cbe5d8834Timo Sirainen#include "istream.h"
0cb2e8eb55e70f8ebe1e8349bdf49e4cbe5d8834Timo Sirainen#include "ostream.h"
0cb2e8eb55e70f8ebe1e8349bdf49e4cbe5d8834Timo Sirainen#include <unistd.h>
0cb2e8eb55e70f8ebe1e8349bdf49e4cbe5d8834Timo Sirainen
0add8c99ca65e56dbf613595fc37c41aafff3f7fTimo Sirainen#undef iostream_pump_set_completion_callback
40ef82c46f6652412b068ebcdac7c3e74840a284Timo Sirainen
0cb2e8eb55e70f8ebe1e8349bdf49e4cbe5d8834Timo Sirainenstruct iostream_pump {
0cb2e8eb55e70f8ebe1e8349bdf49e4cbe5d8834Timo Sirainen struct istream *input;
0cb2e8eb55e70f8ebe1e8349bdf49e4cbe5d8834Timo Sirainen struct ostream *output;
1175f27441385a7011629f295f42708f9a3a4ffcTimo Sirainen
1175f27441385a7011629f295f42708f9a3a4ffcTimo Sirainen struct io *io;
1175f27441385a7011629f295f42708f9a3a4ffcTimo Sirainen
56f45b3f3ae20e5c933701f4657dda5ef1916855Timo Sirainen unsigned int ref;
56f45b3f3ae20e5c933701f4657dda5ef1916855Timo Sirainen
56f45b3f3ae20e5c933701f4657dda5ef1916855Timo Sirainen iostream_pump_callback_t *callback;
7797aa2479e99aeb71057b7a2584b2cb72e4d3f8Timo Sirainen void *context;
7797aa2479e99aeb71057b7a2584b2cb72e4d3f8Timo Sirainen
7797aa2479e99aeb71057b7a2584b2cb72e4d3f8Timo Sirainen bool waiting_output;
7797aa2479e99aeb71057b7a2584b2cb72e4d3f8Timo Sirainen bool completed;
0cb2e8eb55e70f8ebe1e8349bdf49e4cbe5d8834Timo Sirainen};
1175f27441385a7011629f295f42708f9a3a4ffcTimo Sirainen
0cb2e8eb55e70f8ebe1e8349bdf49e4cbe5d8834Timo Sirainenstatic
0cb2e8eb55e70f8ebe1e8349bdf49e4cbe5d8834Timo Sirainenvoid iostream_pump_copy(struct iostream_pump *pump)
0cb2e8eb55e70f8ebe1e8349bdf49e4cbe5d8834Timo Sirainen{
0cb2e8eb55e70f8ebe1e8349bdf49e4cbe5d8834Timo Sirainen enum ostream_send_istream_result res;
0cb2e8eb55e70f8ebe1e8349bdf49e4cbe5d8834Timo Sirainen
1175f27441385a7011629f295f42708f9a3a4ffcTimo Sirainen o_stream_cork(pump->output);
0cb2e8eb55e70f8ebe1e8349bdf49e4cbe5d8834Timo Sirainen size_t old_size = o_stream_get_max_buffer_size(pump->output);
0cb2e8eb55e70f8ebe1e8349bdf49e4cbe5d8834Timo Sirainen o_stream_set_max_buffer_size(pump->output,
1175f27441385a7011629f295f42708f9a3a4ffcTimo Sirainen I_MIN(IO_BLOCK_SIZE,
0cb2e8eb55e70f8ebe1e8349bdf49e4cbe5d8834Timo Sirainen o_stream_get_max_buffer_size(pump->output)));
0cb2e8eb55e70f8ebe1e8349bdf49e4cbe5d8834Timo Sirainen res = o_stream_send_istream(pump->output, pump->input);
0cb2e8eb55e70f8ebe1e8349bdf49e4cbe5d8834Timo Sirainen o_stream_set_max_buffer_size(pump->output, old_size);
0cb2e8eb55e70f8ebe1e8349bdf49e4cbe5d8834Timo Sirainen o_stream_uncork(pump->output);
0cb2e8eb55e70f8ebe1e8349bdf49e4cbe5d8834Timo Sirainen
0cb2e8eb55e70f8ebe1e8349bdf49e4cbe5d8834Timo Sirainen switch(res) {
0cb2e8eb55e70f8ebe1e8349bdf49e4cbe5d8834Timo Sirainen case OSTREAM_SEND_ISTREAM_RESULT_ERROR_INPUT:
0cb2e8eb55e70f8ebe1e8349bdf49e4cbe5d8834Timo Sirainen case OSTREAM_SEND_ISTREAM_RESULT_ERROR_OUTPUT:
659fe5d24825b160cae512538088020d97a60239Timo Sirainen io_remove(&pump->io);
0cb2e8eb55e70f8ebe1e8349bdf49e4cbe5d8834Timo Sirainen pump->callback(FALSE, pump->context);
0cb2e8eb55e70f8ebe1e8349bdf49e4cbe5d8834Timo Sirainen return;
0cb2e8eb55e70f8ebe1e8349bdf49e4cbe5d8834Timo Sirainen case OSTREAM_SEND_ISTREAM_RESULT_WAIT_OUTPUT:
659fe5d24825b160cae512538088020d97a60239Timo Sirainen pump->waiting_output = TRUE;
0cb2e8eb55e70f8ebe1e8349bdf49e4cbe5d8834Timo Sirainen io_remove(&pump->io);
d8b77aef97e89f1ccc5cbdaef77be9052279e35fTimo Sirainen return;
0cb2e8eb55e70f8ebe1e8349bdf49e4cbe5d8834Timo Sirainen case OSTREAM_SEND_ISTREAM_RESULT_FINISHED:
d8b77aef97e89f1ccc5cbdaef77be9052279e35fTimo Sirainen pump->waiting_output = FALSE;
0cb2e8eb55e70f8ebe1e8349bdf49e4cbe5d8834Timo Sirainen io_remove(&pump->io);
0cb2e8eb55e70f8ebe1e8349bdf49e4cbe5d8834Timo Sirainen /* flush it */
56f45b3f3ae20e5c933701f4657dda5ef1916855Timo Sirainen switch (o_stream_flush(pump->output)) {
56f45b3f3ae20e5c933701f4657dda5ef1916855Timo Sirainen case -1:
56f45b3f3ae20e5c933701f4657dda5ef1916855Timo Sirainen pump->callback(FALSE, pump->context);
56f45b3f3ae20e5c933701f4657dda5ef1916855Timo Sirainen break;
e06c0b65c16ccce69bbee009ead14d7d3d17a256Timo Sirainen case 0:
e06c0b65c16ccce69bbee009ead14d7d3d17a256Timo Sirainen pump->waiting_output = TRUE;
e06c0b65c16ccce69bbee009ead14d7d3d17a256Timo Sirainen pump->completed = TRUE;
e06c0b65c16ccce69bbee009ead14d7d3d17a256Timo Sirainen break;
e06c0b65c16ccce69bbee009ead14d7d3d17a256Timo Sirainen default:
e06c0b65c16ccce69bbee009ead14d7d3d17a256Timo Sirainen pump->callback(TRUE, pump->context);
e06c0b65c16ccce69bbee009ead14d7d3d17a256Timo Sirainen break;
e06c0b65c16ccce69bbee009ead14d7d3d17a256Timo Sirainen }
a928e7efabb1672b1476e597106d4b4b81ac6f3cTimo Sirainen return;
e06c0b65c16ccce69bbee009ead14d7d3d17a256Timo Sirainen case OSTREAM_SEND_ISTREAM_RESULT_WAIT_INPUT:
e06c0b65c16ccce69bbee009ead14d7d3d17a256Timo Sirainen pump->waiting_output = FALSE;
7797aa2479e99aeb71057b7a2584b2cb72e4d3f8Timo Sirainen return;
7797aa2479e99aeb71057b7a2584b2cb72e4d3f8Timo Sirainen }
5a07b37a9df398b5189c14872a600384208ab74bTimo Sirainen i_unreached();
5a07b37a9df398b5189c14872a600384208ab74bTimo Sirainen}
5a07b37a9df398b5189c14872a600384208ab74bTimo Sirainen
5a07b37a9df398b5189c14872a600384208ab74bTimo Sirainenstatic
5a07b37a9df398b5189c14872a600384208ab74bTimo Sirainenint iostream_pump_flush(struct iostream_pump *pump)
5a07b37a9df398b5189c14872a600384208ab74bTimo Sirainen{
7797aa2479e99aeb71057b7a2584b2cb72e4d3f8Timo Sirainen int ret;
7797aa2479e99aeb71057b7a2584b2cb72e4d3f8Timo Sirainen if ((ret = o_stream_flush(pump->output)) <= 0) {
5a07b37a9df398b5189c14872a600384208ab74bTimo Sirainen if (ret < 0)
5a07b37a9df398b5189c14872a600384208ab74bTimo Sirainen pump->callback(FALSE, pump->context);
5a07b37a9df398b5189c14872a600384208ab74bTimo Sirainen return ret;
5a07b37a9df398b5189c14872a600384208ab74bTimo Sirainen }
5a07b37a9df398b5189c14872a600384208ab74bTimo Sirainen pump->waiting_output = FALSE;
7797aa2479e99aeb71057b7a2584b2cb72e4d3f8Timo Sirainen if (pump->completed) {
7797aa2479e99aeb71057b7a2584b2cb72e4d3f8Timo Sirainen pump->callback(TRUE, pump->context);
1175f27441385a7011629f295f42708f9a3a4ffcTimo Sirainen return 1;
1175f27441385a7011629f295f42708f9a3a4ffcTimo Sirainen }
7797aa2479e99aeb71057b7a2584b2cb72e4d3f8Timo Sirainen
1175f27441385a7011629f295f42708f9a3a4ffcTimo Sirainen if (pump->io == NULL) {
a928e7efabb1672b1476e597106d4b4b81ac6f3cTimo Sirainen pump->io = io_add_istream(pump->input, iostream_pump_copy, pump);
1175f27441385a7011629f295f42708f9a3a4ffcTimo Sirainen io_set_pending(pump->io);
1175f27441385a7011629f295f42708f9a3a4ffcTimo Sirainen }
0cb2e8eb55e70f8ebe1e8349bdf49e4cbe5d8834Timo Sirainen return ret;
0cb2e8eb55e70f8ebe1e8349bdf49e4cbe5d8834Timo Sirainen}
0cb2e8eb55e70f8ebe1e8349bdf49e4cbe5d8834Timo Sirainen
0cb2e8eb55e70f8ebe1e8349bdf49e4cbe5d8834Timo Sirainenstruct iostream_pump *
0cb2e8eb55e70f8ebe1e8349bdf49e4cbe5d8834Timo Siraineniostream_pump_create(struct istream *input, struct ostream *output)
0cb2e8eb55e70f8ebe1e8349bdf49e4cbe5d8834Timo Sirainen{
0cb2e8eb55e70f8ebe1e8349bdf49e4cbe5d8834Timo Sirainen i_assert(input != NULL &&
0cb2e8eb55e70f8ebe1e8349bdf49e4cbe5d8834Timo Sirainen output != NULL);
0cb2e8eb55e70f8ebe1e8349bdf49e4cbe5d8834Timo Sirainen
0cb2e8eb55e70f8ebe1e8349bdf49e4cbe5d8834Timo Sirainen /* ref streams */
0cb2e8eb55e70f8ebe1e8349bdf49e4cbe5d8834Timo Sirainen i_stream_ref(input);
0cb2e8eb55e70f8ebe1e8349bdf49e4cbe5d8834Timo Sirainen o_stream_ref(output);
0cb2e8eb55e70f8ebe1e8349bdf49e4cbe5d8834Timo Sirainen
0cb2e8eb55e70f8ebe1e8349bdf49e4cbe5d8834Timo Sirainen /* create pump */
0cb2e8eb55e70f8ebe1e8349bdf49e4cbe5d8834Timo Sirainen struct iostream_pump *pump = i_new(struct iostream_pump, 1);
0cb2e8eb55e70f8ebe1e8349bdf49e4cbe5d8834Timo Sirainen pump->input = input;
0cb2e8eb55e70f8ebe1e8349bdf49e4cbe5d8834Timo Sirainen pump->output = output;
0cb2e8eb55e70f8ebe1e8349bdf49e4cbe5d8834Timo Sirainen
0cb2e8eb55e70f8ebe1e8349bdf49e4cbe5d8834Timo Sirainen pump->ref = 1;
0cb2e8eb55e70f8ebe1e8349bdf49e4cbe5d8834Timo Sirainen
0cb2e8eb55e70f8ebe1e8349bdf49e4cbe5d8834Timo Sirainen return pump;
0cb2e8eb55e70f8ebe1e8349bdf49e4cbe5d8834Timo Sirainen}
0cb2e8eb55e70f8ebe1e8349bdf49e4cbe5d8834Timo Sirainen
0cb2e8eb55e70f8ebe1e8349bdf49e4cbe5d8834Timo Sirainenvoid iostream_pump_start(struct iostream_pump *pump)
0cb2e8eb55e70f8ebe1e8349bdf49e4cbe5d8834Timo Sirainen{
0cb2e8eb55e70f8ebe1e8349bdf49e4cbe5d8834Timo Sirainen i_assert(pump != NULL);
0cb2e8eb55e70f8ebe1e8349bdf49e4cbe5d8834Timo Sirainen i_assert(pump->callback != NULL);
0cb2e8eb55e70f8ebe1e8349bdf49e4cbe5d8834Timo Sirainen
0cb2e8eb55e70f8ebe1e8349bdf49e4cbe5d8834Timo Sirainen /* add flush handler */
0cb2e8eb55e70f8ebe1e8349bdf49e4cbe5d8834Timo Sirainen o_stream_set_flush_callback(pump->output, iostream_pump_flush, pump);
0cb2e8eb55e70f8ebe1e8349bdf49e4cbe5d8834Timo Sirainen
0cb2e8eb55e70f8ebe1e8349bdf49e4cbe5d8834Timo Sirainen /* make IO objects */
0cb2e8eb55e70f8ebe1e8349bdf49e4cbe5d8834Timo Sirainen pump->io = io_add_istream(pump->input, iostream_pump_copy, pump);
0cb2e8eb55e70f8ebe1e8349bdf49e4cbe5d8834Timo Sirainen
0cb2e8eb55e70f8ebe1e8349bdf49e4cbe5d8834Timo Sirainen /* make sure we do first read right away */
0cb2e8eb55e70f8ebe1e8349bdf49e4cbe5d8834Timo Sirainen io_set_pending(pump->io);
93b29720c5141f787bd1861796867e4595c9d084Timo Sirainen}
93b29720c5141f787bd1861796867e4595c9d084Timo Sirainen
0cb2e8eb55e70f8ebe1e8349bdf49e4cbe5d8834Timo Sirainenstruct istream *iostream_pump_get_input(struct iostream_pump *pump)
0cb2e8eb55e70f8ebe1e8349bdf49e4cbe5d8834Timo Sirainen{
8a3d609fdd84f5938c82e8e7eeb84a24ab41b317Timo Sirainen i_assert(pump != NULL);
8a3d609fdd84f5938c82e8e7eeb84a24ab41b317Timo Sirainen return pump->input;
0cb2e8eb55e70f8ebe1e8349bdf49e4cbe5d8834Timo Sirainen}
0cb2e8eb55e70f8ebe1e8349bdf49e4cbe5d8834Timo Sirainen
0cb2e8eb55e70f8ebe1e8349bdf49e4cbe5d8834Timo Sirainenstruct ostream *iostream_pump_get_output(struct iostream_pump *pump)
0cb2e8eb55e70f8ebe1e8349bdf49e4cbe5d8834Timo Sirainen{
0cb2e8eb55e70f8ebe1e8349bdf49e4cbe5d8834Timo Sirainen i_assert(pump != NULL);
0cb2e8eb55e70f8ebe1e8349bdf49e4cbe5d8834Timo Sirainen return pump->output;
0cb2e8eb55e70f8ebe1e8349bdf49e4cbe5d8834Timo Sirainen}
0cb2e8eb55e70f8ebe1e8349bdf49e4cbe5d8834Timo Sirainen
0cb2e8eb55e70f8ebe1e8349bdf49e4cbe5d8834Timo Sirainenvoid iostream_pump_set_completion_callback(struct iostream_pump *pump,
0cb2e8eb55e70f8ebe1e8349bdf49e4cbe5d8834Timo Sirainen iostream_pump_callback_t *callback, void *context)
0cb2e8eb55e70f8ebe1e8349bdf49e4cbe5d8834Timo Sirainen{
0cb2e8eb55e70f8ebe1e8349bdf49e4cbe5d8834Timo Sirainen i_assert(pump != NULL);
0cb2e8eb55e70f8ebe1e8349bdf49e4cbe5d8834Timo Sirainen pump->callback = callback;
0cb2e8eb55e70f8ebe1e8349bdf49e4cbe5d8834Timo Sirainen pump->context = context;
}
void iostream_pump_ref(struct iostream_pump *pump)
{
i_assert(pump != NULL && pump->ref > 0);
pump->ref++;
}
void iostream_pump_unref(struct iostream_pump **pump_r)
{
i_assert(pump_r != NULL && *pump_r != NULL);
struct iostream_pump *pump = *pump_r;
*pump_r = NULL;
i_assert(pump->ref > 0);
if (--pump->ref == 0) {
iostream_pump_stop(pump);
o_stream_unref(&pump->output);
i_stream_unref(&pump->input);
i_free(pump);
}
}
void iostream_pump_stop(struct iostream_pump *pump)
{
i_assert(pump != NULL);
o_stream_unset_flush_callback(pump->output);
io_remove(&pump->io);
}
bool iostream_pump_is_waiting_output(struct iostream_pump *pump)
{
return pump->waiting_output;
}
void iostream_pump_switch_ioloop(struct iostream_pump *pump)
{
i_assert(pump != NULL);
if (pump->io != NULL)
pump->io = io_loop_move_io(&pump->io);
o_stream_switch_ioloop(pump->output);
i_stream_switch_ioloop(pump->input);
}