/* Copyright (c) 2002-2018 Dovecot authors, see the included COPYING file
 */
08c0bffa1c9ccb1ac67689093f3c99cd38cfd3a4Aki Tuomi#include "lib.h"
08c0bffa1c9ccb1ac67689093f3c99cd38cfd3a4Aki Tuomi#include "buffer.h"
08c0bffa1c9ccb1ac67689093f3c99cd38cfd3a4Aki Tuomi#include "str.h"
08c0bffa1c9ccb1ac67689093f3c99cd38cfd3a4Aki Tuomi#include "iostream-pump.h"
08c0bffa1c9ccb1ac67689093f3c99cd38cfd3a4Aki Tuomi#include "istream.h"
08c0bffa1c9ccb1ac67689093f3c99cd38cfd3a4Aki Tuomi#include "ostream.h"
08c0bffa1c9ccb1ac67689093f3c99cd38cfd3a4Aki Tuomi#include <unistd.h>
08c0bffa1c9ccb1ac67689093f3c99cd38cfd3a4Aki Tuomi
08c0bffa1c9ccb1ac67689093f3c99cd38cfd3a4Aki Tuomi#undef iostream_pump_set_completion_callback
08c0bffa1c9ccb1ac67689093f3c99cd38cfd3a4Aki Tuomi
08c0bffa1c9ccb1ac67689093f3c99cd38cfd3a4Aki Tuomistruct iostream_pump {
08c0bffa1c9ccb1ac67689093f3c99cd38cfd3a4Aki Tuomi struct istream *input;
08c0bffa1c9ccb1ac67689093f3c99cd38cfd3a4Aki Tuomi struct ostream *output;
08c0bffa1c9ccb1ac67689093f3c99cd38cfd3a4Aki Tuomi
08c0bffa1c9ccb1ac67689093f3c99cd38cfd3a4Aki Tuomi struct io *io;
08c0bffa1c9ccb1ac67689093f3c99cd38cfd3a4Aki Tuomi
08c0bffa1c9ccb1ac67689093f3c99cd38cfd3a4Aki Tuomi unsigned int ref;
08c0bffa1c9ccb1ac67689093f3c99cd38cfd3a4Aki Tuomi
08c0bffa1c9ccb1ac67689093f3c99cd38cfd3a4Aki Tuomi iostream_pump_callback_t *callback;
08c0bffa1c9ccb1ac67689093f3c99cd38cfd3a4Aki Tuomi void *context;
fe4a5467c998dfd79a071416068ca59d5a6a388fTimo Sirainen
fe4a5467c998dfd79a071416068ca59d5a6a388fTimo Sirainen bool waiting_output;
08c0bffa1c9ccb1ac67689093f3c99cd38cfd3a4Aki Tuomi bool completed;
08c0bffa1c9ccb1ac67689093f3c99cd38cfd3a4Aki Tuomi};
08c0bffa1c9ccb1ac67689093f3c99cd38cfd3a4Aki Tuomi
08c0bffa1c9ccb1ac67689093f3c99cd38cfd3a4Aki Tuomistatic
08c0bffa1c9ccb1ac67689093f3c99cd38cfd3a4Aki Tuomivoid iostream_pump_copy(struct iostream_pump *pump)
08c0bffa1c9ccb1ac67689093f3c99cd38cfd3a4Aki Tuomi{
08c0bffa1c9ccb1ac67689093f3c99cd38cfd3a4Aki Tuomi enum ostream_send_istream_result res;
08c0bffa1c9ccb1ac67689093f3c99cd38cfd3a4Aki Tuomi
84b746f56209d4a85af73cd26850e75f519ae0b0Timo Sirainen o_stream_cork(pump->output);
08c0bffa1c9ccb1ac67689093f3c99cd38cfd3a4Aki Tuomi size_t old_size = o_stream_get_max_buffer_size(pump->output);
08c0bffa1c9ccb1ac67689093f3c99cd38cfd3a4Aki Tuomi o_stream_set_max_buffer_size(pump->output,
08c0bffa1c9ccb1ac67689093f3c99cd38cfd3a4Aki Tuomi I_MIN(IO_BLOCK_SIZE,
08c0bffa1c9ccb1ac67689093f3c99cd38cfd3a4Aki Tuomi o_stream_get_max_buffer_size(pump->output)));
08c0bffa1c9ccb1ac67689093f3c99cd38cfd3a4Aki Tuomi res = o_stream_send_istream(pump->output, pump->input);
08c0bffa1c9ccb1ac67689093f3c99cd38cfd3a4Aki Tuomi o_stream_set_max_buffer_size(pump->output, old_size);
84b746f56209d4a85af73cd26850e75f519ae0b0Timo Sirainen o_stream_uncork(pump->output);
08c0bffa1c9ccb1ac67689093f3c99cd38cfd3a4Aki Tuomi
08c0bffa1c9ccb1ac67689093f3c99cd38cfd3a4Aki Tuomi switch(res) {
08c0bffa1c9ccb1ac67689093f3c99cd38cfd3a4Aki Tuomi case OSTREAM_SEND_ISTREAM_RESULT_ERROR_INPUT:
39435f00a35a276d329283179b3e7e0351482939Timo Sirainen io_remove(&pump->io);
39435f00a35a276d329283179b3e7e0351482939Timo Sirainen pump->callback(IOSTREAM_PUMP_STATUS_INPUT_ERROR, pump->context);
39435f00a35a276d329283179b3e7e0351482939Timo Sirainen return;
08c0bffa1c9ccb1ac67689093f3c99cd38cfd3a4Aki Tuomi case OSTREAM_SEND_ISTREAM_RESULT_ERROR_OUTPUT:
08c0bffa1c9ccb1ac67689093f3c99cd38cfd3a4Aki Tuomi io_remove(&pump->io);
39435f00a35a276d329283179b3e7e0351482939Timo Sirainen pump->callback(IOSTREAM_PUMP_STATUS_OUTPUT_ERROR, pump->context);
08c0bffa1c9ccb1ac67689093f3c99cd38cfd3a4Aki Tuomi return;
08c0bffa1c9ccb1ac67689093f3c99cd38cfd3a4Aki Tuomi case OSTREAM_SEND_ISTREAM_RESULT_WAIT_OUTPUT:
fe4a5467c998dfd79a071416068ca59d5a6a388fTimo Sirainen pump->waiting_output = TRUE;
08c0bffa1c9ccb1ac67689093f3c99cd38cfd3a4Aki Tuomi io_remove(&pump->io);
08c0bffa1c9ccb1ac67689093f3c99cd38cfd3a4Aki Tuomi return;
08c0bffa1c9ccb1ac67689093f3c99cd38cfd3a4Aki Tuomi case OSTREAM_SEND_ISTREAM_RESULT_FINISHED:
fe4a5467c998dfd79a071416068ca59d5a6a388fTimo Sirainen pump->waiting_output = FALSE;
08c0bffa1c9ccb1ac67689093f3c99cd38cfd3a4Aki Tuomi io_remove(&pump->io);
08c0bffa1c9ccb1ac67689093f3c99cd38cfd3a4Aki Tuomi /* flush it */
08c0bffa1c9ccb1ac67689093f3c99cd38cfd3a4Aki Tuomi switch (o_stream_flush(pump->output)) {
08c0bffa1c9ccb1ac67689093f3c99cd38cfd3a4Aki Tuomi case -1:
39435f00a35a276d329283179b3e7e0351482939Timo Sirainen pump->callback(IOSTREAM_PUMP_STATUS_OUTPUT_ERROR, pump->context);
08c0bffa1c9ccb1ac67689093f3c99cd38cfd3a4Aki Tuomi break;
08c0bffa1c9ccb1ac67689093f3c99cd38cfd3a4Aki Tuomi case 0:
fe4a5467c998dfd79a071416068ca59d5a6a388fTimo Sirainen pump->waiting_output = TRUE;
08c0bffa1c9ccb1ac67689093f3c99cd38cfd3a4Aki Tuomi pump->completed = TRUE;
08c0bffa1c9ccb1ac67689093f3c99cd38cfd3a4Aki Tuomi break;
08c0bffa1c9ccb1ac67689093f3c99cd38cfd3a4Aki Tuomi default:
39435f00a35a276d329283179b3e7e0351482939Timo Sirainen pump->callback(IOSTREAM_PUMP_STATUS_INPUT_EOF, pump->context);
08c0bffa1c9ccb1ac67689093f3c99cd38cfd3a4Aki Tuomi break;
08c0bffa1c9ccb1ac67689093f3c99cd38cfd3a4Aki Tuomi }
08c0bffa1c9ccb1ac67689093f3c99cd38cfd3a4Aki Tuomi return;
08c0bffa1c9ccb1ac67689093f3c99cd38cfd3a4Aki Tuomi case OSTREAM_SEND_ISTREAM_RESULT_WAIT_INPUT:
fe4a5467c998dfd79a071416068ca59d5a6a388fTimo Sirainen pump->waiting_output = FALSE;
08c0bffa1c9ccb1ac67689093f3c99cd38cfd3a4Aki Tuomi return;
08c0bffa1c9ccb1ac67689093f3c99cd38cfd3a4Aki Tuomi }
08c0bffa1c9ccb1ac67689093f3c99cd38cfd3a4Aki Tuomi i_unreached();
08c0bffa1c9ccb1ac67689093f3c99cd38cfd3a4Aki Tuomi}
08c0bffa1c9ccb1ac67689093f3c99cd38cfd3a4Aki Tuomi
08c0bffa1c9ccb1ac67689093f3c99cd38cfd3a4Aki Tuomistatic
08c0bffa1c9ccb1ac67689093f3c99cd38cfd3a4Aki Tuomiint iostream_pump_flush(struct iostream_pump *pump)
08c0bffa1c9ccb1ac67689093f3c99cd38cfd3a4Aki Tuomi{
08c0bffa1c9ccb1ac67689093f3c99cd38cfd3a4Aki Tuomi int ret;
08c0bffa1c9ccb1ac67689093f3c99cd38cfd3a4Aki Tuomi if ((ret = o_stream_flush(pump->output)) <= 0) {
08c0bffa1c9ccb1ac67689093f3c99cd38cfd3a4Aki Tuomi if (ret < 0)
39435f00a35a276d329283179b3e7e0351482939Timo Sirainen pump->callback(IOSTREAM_PUMP_STATUS_OUTPUT_ERROR, pump->context);
08c0bffa1c9ccb1ac67689093f3c99cd38cfd3a4Aki Tuomi return ret;
08c0bffa1c9ccb1ac67689093f3c99cd38cfd3a4Aki Tuomi }
fe4a5467c998dfd79a071416068ca59d5a6a388fTimo Sirainen pump->waiting_output = FALSE;
08c0bffa1c9ccb1ac67689093f3c99cd38cfd3a4Aki Tuomi if (pump->completed) {
39435f00a35a276d329283179b3e7e0351482939Timo Sirainen pump->callback(IOSTREAM_PUMP_STATUS_INPUT_EOF, pump->context);
08c0bffa1c9ccb1ac67689093f3c99cd38cfd3a4Aki Tuomi return 1;
08c0bffa1c9ccb1ac67689093f3c99cd38cfd3a4Aki Tuomi }
08c0bffa1c9ccb1ac67689093f3c99cd38cfd3a4Aki Tuomi
08c0bffa1c9ccb1ac67689093f3c99cd38cfd3a4Aki Tuomi if (pump->io == NULL) {
08c0bffa1c9ccb1ac67689093f3c99cd38cfd3a4Aki Tuomi pump->io = io_add_istream(pump->input, iostream_pump_copy, pump);
08c0bffa1c9ccb1ac67689093f3c99cd38cfd3a4Aki Tuomi io_set_pending(pump->io);
08c0bffa1c9ccb1ac67689093f3c99cd38cfd3a4Aki Tuomi }
08c0bffa1c9ccb1ac67689093f3c99cd38cfd3a4Aki Tuomi return ret;
08c0bffa1c9ccb1ac67689093f3c99cd38cfd3a4Aki Tuomi}
08c0bffa1c9ccb1ac67689093f3c99cd38cfd3a4Aki Tuomi
08c0bffa1c9ccb1ac67689093f3c99cd38cfd3a4Aki Tuomistruct iostream_pump *
08c0bffa1c9ccb1ac67689093f3c99cd38cfd3a4Aki Tuomiiostream_pump_create(struct istream *input, struct ostream *output)
08c0bffa1c9ccb1ac67689093f3c99cd38cfd3a4Aki Tuomi{
08c0bffa1c9ccb1ac67689093f3c99cd38cfd3a4Aki Tuomi i_assert(input != NULL &&
08c0bffa1c9ccb1ac67689093f3c99cd38cfd3a4Aki Tuomi output != NULL);
08c0bffa1c9ccb1ac67689093f3c99cd38cfd3a4Aki Tuomi
08c0bffa1c9ccb1ac67689093f3c99cd38cfd3a4Aki Tuomi /* ref streams */
08c0bffa1c9ccb1ac67689093f3c99cd38cfd3a4Aki Tuomi i_stream_ref(input);
08c0bffa1c9ccb1ac67689093f3c99cd38cfd3a4Aki Tuomi o_stream_ref(output);
08c0bffa1c9ccb1ac67689093f3c99cd38cfd3a4Aki Tuomi
08c0bffa1c9ccb1ac67689093f3c99cd38cfd3a4Aki Tuomi /* create pump */
08c0bffa1c9ccb1ac67689093f3c99cd38cfd3a4Aki Tuomi struct iostream_pump *pump = i_new(struct iostream_pump, 1);
08c0bffa1c9ccb1ac67689093f3c99cd38cfd3a4Aki Tuomi pump->input = input;
08c0bffa1c9ccb1ac67689093f3c99cd38cfd3a4Aki Tuomi pump->output = output;
08c0bffa1c9ccb1ac67689093f3c99cd38cfd3a4Aki Tuomi
08c0bffa1c9ccb1ac67689093f3c99cd38cfd3a4Aki Tuomi pump->ref = 1;
08c0bffa1c9ccb1ac67689093f3c99cd38cfd3a4Aki Tuomi
08c0bffa1c9ccb1ac67689093f3c99cd38cfd3a4Aki Tuomi return pump;
08c0bffa1c9ccb1ac67689093f3c99cd38cfd3a4Aki Tuomi}
08c0bffa1c9ccb1ac67689093f3c99cd38cfd3a4Aki Tuomi
08c0bffa1c9ccb1ac67689093f3c99cd38cfd3a4Aki Tuomivoid iostream_pump_start(struct iostream_pump *pump)
08c0bffa1c9ccb1ac67689093f3c99cd38cfd3a4Aki Tuomi{
08c0bffa1c9ccb1ac67689093f3c99cd38cfd3a4Aki Tuomi i_assert(pump != NULL);
08c0bffa1c9ccb1ac67689093f3c99cd38cfd3a4Aki Tuomi i_assert(pump->callback != NULL);
08c0bffa1c9ccb1ac67689093f3c99cd38cfd3a4Aki Tuomi
08c0bffa1c9ccb1ac67689093f3c99cd38cfd3a4Aki Tuomi /* add flush handler */
08c0bffa1c9ccb1ac67689093f3c99cd38cfd3a4Aki Tuomi o_stream_set_flush_callback(pump->output, iostream_pump_flush, pump);
08c0bffa1c9ccb1ac67689093f3c99cd38cfd3a4Aki Tuomi
08c0bffa1c9ccb1ac67689093f3c99cd38cfd3a4Aki Tuomi /* make IO objects */
08c0bffa1c9ccb1ac67689093f3c99cd38cfd3a4Aki Tuomi pump->io = io_add_istream(pump->input, iostream_pump_copy, pump);
08c0bffa1c9ccb1ac67689093f3c99cd38cfd3a4Aki Tuomi
08c0bffa1c9ccb1ac67689093f3c99cd38cfd3a4Aki Tuomi /* make sure we do first read right away */
08c0bffa1c9ccb1ac67689093f3c99cd38cfd3a4Aki Tuomi io_set_pending(pump->io);
08c0bffa1c9ccb1ac67689093f3c99cd38cfd3a4Aki Tuomi}
08c0bffa1c9ccb1ac67689093f3c99cd38cfd3a4Aki Tuomi
08c0bffa1c9ccb1ac67689093f3c99cd38cfd3a4Aki Tuomistruct istream *iostream_pump_get_input(struct iostream_pump *pump)
08c0bffa1c9ccb1ac67689093f3c99cd38cfd3a4Aki Tuomi{
08c0bffa1c9ccb1ac67689093f3c99cd38cfd3a4Aki Tuomi i_assert(pump != NULL);
08c0bffa1c9ccb1ac67689093f3c99cd38cfd3a4Aki Tuomi return pump->input;
08c0bffa1c9ccb1ac67689093f3c99cd38cfd3a4Aki Tuomi}
08c0bffa1c9ccb1ac67689093f3c99cd38cfd3a4Aki Tuomi
08c0bffa1c9ccb1ac67689093f3c99cd38cfd3a4Aki Tuomistruct ostream *iostream_pump_get_output(struct iostream_pump *pump)
08c0bffa1c9ccb1ac67689093f3c99cd38cfd3a4Aki Tuomi{
08c0bffa1c9ccb1ac67689093f3c99cd38cfd3a4Aki Tuomi i_assert(pump != NULL);
08c0bffa1c9ccb1ac67689093f3c99cd38cfd3a4Aki Tuomi return pump->output;
08c0bffa1c9ccb1ac67689093f3c99cd38cfd3a4Aki Tuomi}
08c0bffa1c9ccb1ac67689093f3c99cd38cfd3a4Aki Tuomi
08c0bffa1c9ccb1ac67689093f3c99cd38cfd3a4Aki Tuomivoid iostream_pump_set_completion_callback(struct iostream_pump *pump,
08c0bffa1c9ccb1ac67689093f3c99cd38cfd3a4Aki Tuomi iostream_pump_callback_t *callback, void *context)
08c0bffa1c9ccb1ac67689093f3c99cd38cfd3a4Aki Tuomi{
08c0bffa1c9ccb1ac67689093f3c99cd38cfd3a4Aki Tuomi i_assert(pump != NULL);
08c0bffa1c9ccb1ac67689093f3c99cd38cfd3a4Aki Tuomi pump->callback = callback;
08c0bffa1c9ccb1ac67689093f3c99cd38cfd3a4Aki Tuomi pump->context = context;
08c0bffa1c9ccb1ac67689093f3c99cd38cfd3a4Aki Tuomi}
08c0bffa1c9ccb1ac67689093f3c99cd38cfd3a4Aki Tuomi
08c0bffa1c9ccb1ac67689093f3c99cd38cfd3a4Aki Tuomivoid iostream_pump_ref(struct iostream_pump *pump)
08c0bffa1c9ccb1ac67689093f3c99cd38cfd3a4Aki Tuomi{
08c0bffa1c9ccb1ac67689093f3c99cd38cfd3a4Aki Tuomi i_assert(pump != NULL && pump->ref > 0);
08c0bffa1c9ccb1ac67689093f3c99cd38cfd3a4Aki Tuomi pump->ref++;
08c0bffa1c9ccb1ac67689093f3c99cd38cfd3a4Aki Tuomi}
08c0bffa1c9ccb1ac67689093f3c99cd38cfd3a4Aki Tuomi
08c0bffa1c9ccb1ac67689093f3c99cd38cfd3a4Aki Tuomivoid iostream_pump_unref(struct iostream_pump **pump_r)
08c0bffa1c9ccb1ac67689093f3c99cd38cfd3a4Aki Tuomi{
08c0bffa1c9ccb1ac67689093f3c99cd38cfd3a4Aki Tuomi i_assert(pump_r != NULL && *pump_r != NULL);
08c0bffa1c9ccb1ac67689093f3c99cd38cfd3a4Aki Tuomi struct iostream_pump *pump = *pump_r;
08c0bffa1c9ccb1ac67689093f3c99cd38cfd3a4Aki Tuomi *pump_r = NULL;
08c0bffa1c9ccb1ac67689093f3c99cd38cfd3a4Aki Tuomi
08c0bffa1c9ccb1ac67689093f3c99cd38cfd3a4Aki Tuomi i_assert(pump->ref > 0);
08c0bffa1c9ccb1ac67689093f3c99cd38cfd3a4Aki Tuomi if (--pump->ref == 0) {
08c0bffa1c9ccb1ac67689093f3c99cd38cfd3a4Aki Tuomi iostream_pump_stop(pump);
08c0bffa1c9ccb1ac67689093f3c99cd38cfd3a4Aki Tuomi o_stream_unref(&pump->output);
08c0bffa1c9ccb1ac67689093f3c99cd38cfd3a4Aki Tuomi i_stream_unref(&pump->input);
08c0bffa1c9ccb1ac67689093f3c99cd38cfd3a4Aki Tuomi i_free(pump);
08c0bffa1c9ccb1ac67689093f3c99cd38cfd3a4Aki Tuomi }
08c0bffa1c9ccb1ac67689093f3c99cd38cfd3a4Aki Tuomi}
08c0bffa1c9ccb1ac67689093f3c99cd38cfd3a4Aki Tuomi
08c0bffa1c9ccb1ac67689093f3c99cd38cfd3a4Aki Tuomivoid iostream_pump_stop(struct iostream_pump *pump)
08c0bffa1c9ccb1ac67689093f3c99cd38cfd3a4Aki Tuomi{
08c0bffa1c9ccb1ac67689093f3c99cd38cfd3a4Aki Tuomi i_assert(pump != NULL);
08c0bffa1c9ccb1ac67689093f3c99cd38cfd3a4Aki Tuomi
08c0bffa1c9ccb1ac67689093f3c99cd38cfd3a4Aki Tuomi o_stream_unset_flush_callback(pump->output);
08c0bffa1c9ccb1ac67689093f3c99cd38cfd3a4Aki Tuomi
5f1d689131a75c39f064cbd4202373e7edf78f18Josef 'Jeff' Sipek io_remove(&pump->io);
08c0bffa1c9ccb1ac67689093f3c99cd38cfd3a4Aki Tuomi}
08c0bffa1c9ccb1ac67689093f3c99cd38cfd3a4Aki Tuomi
fe4a5467c998dfd79a071416068ca59d5a6a388fTimo Sirainenbool iostream_pump_is_waiting_output(struct iostream_pump *pump)
fe4a5467c998dfd79a071416068ca59d5a6a388fTimo Sirainen{
fe4a5467c998dfd79a071416068ca59d5a6a388fTimo Sirainen return pump->waiting_output;
fe4a5467c998dfd79a071416068ca59d5a6a388fTimo Sirainen}
fe4a5467c998dfd79a071416068ca59d5a6a388fTimo Sirainen
08c0bffa1c9ccb1ac67689093f3c99cd38cfd3a4Aki Tuomivoid iostream_pump_switch_ioloop(struct iostream_pump *pump)
08c0bffa1c9ccb1ac67689093f3c99cd38cfd3a4Aki Tuomi{
08c0bffa1c9ccb1ac67689093f3c99cd38cfd3a4Aki Tuomi i_assert(pump != NULL);
3bc5b80ae9075e5d940616553f81e8acdd4bd88fTimo Sirainen if (pump->io != NULL)
3bc5b80ae9075e5d940616553f81e8acdd4bd88fTimo Sirainen pump->io = io_loop_move_io(&pump->io);
08c0bffa1c9ccb1ac67689093f3c99cd38cfd3a4Aki Tuomi o_stream_switch_ioloop(pump->output);
08c0bffa1c9ccb1ac67689093f3c99cd38cfd3a4Aki Tuomi i_stream_switch_ioloop(pump->input);
08c0bffa1c9ccb1ac67689093f3c99cd38cfd3a4Aki Tuomi}