iostream-pump.c revision 3bc5b80ae9075e5d940616553f81e8acdd4bd88f
/* Copyright (c) 2002-2017 Dovecot authors, see the included COPYING file
 */
6b0f9cd2ee601121cb7fe1d9ad8ebce782aa8f39Stephen Gallagher#include "lib.h"
1008001f34abb42df75f840db17f14a83f0c21d4Stephen Gallagher#include "buffer.h"
b355dcb54194f498921743ca33304eac35d89718Stephen Gallagher#include "str.h"
b355dcb54194f498921743ca33304eac35d89718Stephen Gallagher#include "iostream-pump.h"
6b0f9cd2ee601121cb7fe1d9ad8ebce782aa8f39Stephen Gallagher#include "istream.h"
6b0f9cd2ee601121cb7fe1d9ad8ebce782aa8f39Stephen Gallagher#include "ostream.h"
6b0f9cd2ee601121cb7fe1d9ad8ebce782aa8f39Stephen Gallagher#include <unistd.h>
6b0f9cd2ee601121cb7fe1d9ad8ebce782aa8f39Stephen Gallagher
/* NOTE(review): presumably iostream-pump.h wraps this name in a macro;
   it is undefined here so that the function itself can be defined
   below - confirm against the header. */
#undef iostream_pump_set_completion_callback
7a14e8f66c0e932fe2954d792614a3b61d444bd1Jakub Hrozek
7797e361155f7ce937085fd98e360469d7baf1b6Jakub Hrozekstruct iostream_pump {
64a424ec1b268427822c646f7781e26e56c197f6Jakub Hrozek struct istream *input;
2ea6196484055397cc4bc011c5960f790431fa9dStephen Gallagher struct ostream *output;
1008001f34abb42df75f840db17f14a83f0c21d4Stephen Gallagher
6b0f9cd2ee601121cb7fe1d9ad8ebce782aa8f39Stephen Gallagher struct io *io;
6b0f9cd2ee601121cb7fe1d9ad8ebce782aa8f39Stephen Gallagher
6b0f9cd2ee601121cb7fe1d9ad8ebce782aa8f39Stephen Gallagher unsigned int ref;
7a14e8f66c0e932fe2954d792614a3b61d444bd1Jakub Hrozek
6b0f9cd2ee601121cb7fe1d9ad8ebce782aa8f39Stephen Gallagher iostream_pump_callback_t *callback;
6b0f9cd2ee601121cb7fe1d9ad8ebce782aa8f39Stephen Gallagher void *context;
6b0f9cd2ee601121cb7fe1d9ad8ebce782aa8f39Stephen Gallagher bool completed;
6b0f9cd2ee601121cb7fe1d9ad8ebce782aa8f39Stephen Gallagher};
65a9065538fd85e6ead925d344e6b421900eb8c2Jakub Hrozek
65a9065538fd85e6ead925d344e6b421900eb8c2Jakub Hrozekstatic
65a9065538fd85e6ead925d344e6b421900eb8c2Jakub Hrozekvoid iostream_pump_copy(struct iostream_pump *pump)
65a9065538fd85e6ead925d344e6b421900eb8c2Jakub Hrozek{
65a9065538fd85e6ead925d344e6b421900eb8c2Jakub Hrozek enum ostream_send_istream_result res;
2ea6196484055397cc4bc011c5960f790431fa9dStephen Gallagher
6b0f9cd2ee601121cb7fe1d9ad8ebce782aa8f39Stephen Gallagher o_stream_cork(pump->output);
6b0f9cd2ee601121cb7fe1d9ad8ebce782aa8f39Stephen Gallagher size_t old_size = o_stream_get_max_buffer_size(pump->output);
6b0f9cd2ee601121cb7fe1d9ad8ebce782aa8f39Stephen Gallagher o_stream_set_max_buffer_size(pump->output,
6b0f9cd2ee601121cb7fe1d9ad8ebce782aa8f39Stephen Gallagher I_MIN(IO_BLOCK_SIZE,
6b0f9cd2ee601121cb7fe1d9ad8ebce782aa8f39Stephen Gallagher o_stream_get_max_buffer_size(pump->output)));
6b0f9cd2ee601121cb7fe1d9ad8ebce782aa8f39Stephen Gallagher res = o_stream_send_istream(pump->output, pump->input);
6b0f9cd2ee601121cb7fe1d9ad8ebce782aa8f39Stephen Gallagher o_stream_set_max_buffer_size(pump->output, old_size);
6b0f9cd2ee601121cb7fe1d9ad8ebce782aa8f39Stephen Gallagher o_stream_uncork(pump->output);
6b0f9cd2ee601121cb7fe1d9ad8ebce782aa8f39Stephen Gallagher
6b0f9cd2ee601121cb7fe1d9ad8ebce782aa8f39Stephen Gallagher switch(res) {
6b0f9cd2ee601121cb7fe1d9ad8ebce782aa8f39Stephen Gallagher case OSTREAM_SEND_ISTREAM_RESULT_ERROR_INPUT:
6b0f9cd2ee601121cb7fe1d9ad8ebce782aa8f39Stephen Gallagher case OSTREAM_SEND_ISTREAM_RESULT_ERROR_OUTPUT:
2ea6196484055397cc4bc011c5960f790431fa9dStephen Gallagher io_remove(&pump->io);
65a9065538fd85e6ead925d344e6b421900eb8c2Jakub Hrozek pump->callback(FALSE, pump->context);
6b0f9cd2ee601121cb7fe1d9ad8ebce782aa8f39Stephen Gallagher return;
6b0f9cd2ee601121cb7fe1d9ad8ebce782aa8f39Stephen Gallagher case OSTREAM_SEND_ISTREAM_RESULT_WAIT_OUTPUT:
6b0f9cd2ee601121cb7fe1d9ad8ebce782aa8f39Stephen Gallagher io_remove(&pump->io);
6b0f9cd2ee601121cb7fe1d9ad8ebce782aa8f39Stephen Gallagher return;
6b0f9cd2ee601121cb7fe1d9ad8ebce782aa8f39Stephen Gallagher case OSTREAM_SEND_ISTREAM_RESULT_FINISHED:
6b0f9cd2ee601121cb7fe1d9ad8ebce782aa8f39Stephen Gallagher io_remove(&pump->io);
6b0f9cd2ee601121cb7fe1d9ad8ebce782aa8f39Stephen Gallagher /* flush it */
6b0f9cd2ee601121cb7fe1d9ad8ebce782aa8f39Stephen Gallagher switch (o_stream_flush(pump->output)) {
6b0f9cd2ee601121cb7fe1d9ad8ebce782aa8f39Stephen Gallagher case -1:
6b0f9cd2ee601121cb7fe1d9ad8ebce782aa8f39Stephen Gallagher pump->callback(FALSE, pump->context);
6b0f9cd2ee601121cb7fe1d9ad8ebce782aa8f39Stephen Gallagher break;
1008001f34abb42df75f840db17f14a83f0c21d4Stephen Gallagher case 0:
1008001f34abb42df75f840db17f14a83f0c21d4Stephen Gallagher pump->completed = TRUE;
1008001f34abb42df75f840db17f14a83f0c21d4Stephen Gallagher break;
6b0f9cd2ee601121cb7fe1d9ad8ebce782aa8f39Stephen Gallagher default:
1008001f34abb42df75f840db17f14a83f0c21d4Stephen Gallagher pump->callback(TRUE, pump->context);
1008001f34abb42df75f840db17f14a83f0c21d4Stephen Gallagher break;
1008001f34abb42df75f840db17f14a83f0c21d4Stephen Gallagher }
6b0f9cd2ee601121cb7fe1d9ad8ebce782aa8f39Stephen Gallagher return;
6b0f9cd2ee601121cb7fe1d9ad8ebce782aa8f39Stephen Gallagher case OSTREAM_SEND_ISTREAM_RESULT_WAIT_INPUT:
52261fe16203dec6e6f69177c6d0a810b47d073fStephen Gallagher return;
6b0f9cd2ee601121cb7fe1d9ad8ebce782aa8f39Stephen Gallagher }
65a9065538fd85e6ead925d344e6b421900eb8c2Jakub Hrozek i_unreached();
65a9065538fd85e6ead925d344e6b421900eb8c2Jakub Hrozek}
65a9065538fd85e6ead925d344e6b421900eb8c2Jakub Hrozek
65a9065538fd85e6ead925d344e6b421900eb8c2Jakub Hrozekstatic
65a9065538fd85e6ead925d344e6b421900eb8c2Jakub Hrozekint iostream_pump_flush(struct iostream_pump *pump)
2ea6196484055397cc4bc011c5960f790431fa9dStephen Gallagher{
6b0f9cd2ee601121cb7fe1d9ad8ebce782aa8f39Stephen Gallagher int ret;
6b0f9cd2ee601121cb7fe1d9ad8ebce782aa8f39Stephen Gallagher if ((ret = o_stream_flush(pump->output)) <= 0) {
6b0f9cd2ee601121cb7fe1d9ad8ebce782aa8f39Stephen Gallagher if (ret < 0)
6b0f9cd2ee601121cb7fe1d9ad8ebce782aa8f39Stephen Gallagher pump->callback(FALSE, pump->context);
6b0f9cd2ee601121cb7fe1d9ad8ebce782aa8f39Stephen Gallagher return ret;
6b0f9cd2ee601121cb7fe1d9ad8ebce782aa8f39Stephen Gallagher }
6b0f9cd2ee601121cb7fe1d9ad8ebce782aa8f39Stephen Gallagher if (pump->completed) {
6b0f9cd2ee601121cb7fe1d9ad8ebce782aa8f39Stephen Gallagher pump->callback(TRUE, pump->context);
6b0f9cd2ee601121cb7fe1d9ad8ebce782aa8f39Stephen Gallagher return 1;
6b0f9cd2ee601121cb7fe1d9ad8ebce782aa8f39Stephen Gallagher }
6b0f9cd2ee601121cb7fe1d9ad8ebce782aa8f39Stephen Gallagher
6b0f9cd2ee601121cb7fe1d9ad8ebce782aa8f39Stephen Gallagher if (pump->io == NULL) {
6b0f9cd2ee601121cb7fe1d9ad8ebce782aa8f39Stephen Gallagher pump->io = io_add_istream(pump->input, iostream_pump_copy, pump);
52261fe16203dec6e6f69177c6d0a810b47d073fStephen Gallagher io_set_pending(pump->io);
6b0f9cd2ee601121cb7fe1d9ad8ebce782aa8f39Stephen Gallagher }
6b0f9cd2ee601121cb7fe1d9ad8ebce782aa8f39Stephen Gallagher return ret;
65a9065538fd85e6ead925d344e6b421900eb8c2Jakub Hrozek}
ea929f1b022fc2cb77dec89b0e12accef983ec85Jakub Hrozek
6b0f9cd2ee601121cb7fe1d9ad8ebce782aa8f39Stephen Gallagherstruct iostream_pump *
6b0f9cd2ee601121cb7fe1d9ad8ebce782aa8f39Stephen Gallagheriostream_pump_create(struct istream *input, struct ostream *output)
6b0f9cd2ee601121cb7fe1d9ad8ebce782aa8f39Stephen Gallagher{
1008001f34abb42df75f840db17f14a83f0c21d4Stephen Gallagher i_assert(input != NULL &&
6b0f9cd2ee601121cb7fe1d9ad8ebce782aa8f39Stephen Gallagher output != NULL);
6b0f9cd2ee601121cb7fe1d9ad8ebce782aa8f39Stephen Gallagher
1008001f34abb42df75f840db17f14a83f0c21d4Stephen Gallagher /* ref streams */
1008001f34abb42df75f840db17f14a83f0c21d4Stephen Gallagher i_stream_ref(input);
6b0f9cd2ee601121cb7fe1d9ad8ebce782aa8f39Stephen Gallagher o_stream_ref(output);
1008001f34abb42df75f840db17f14a83f0c21d4Stephen Gallagher
1008001f34abb42df75f840db17f14a83f0c21d4Stephen Gallagher /* create pump */
6b0f9cd2ee601121cb7fe1d9ad8ebce782aa8f39Stephen Gallagher struct iostream_pump *pump = i_new(struct iostream_pump, 1);
1008001f34abb42df75f840db17f14a83f0c21d4Stephen Gallagher pump->input = input;
6b0f9cd2ee601121cb7fe1d9ad8ebce782aa8f39Stephen Gallagher pump->output = output;
6b0f9cd2ee601121cb7fe1d9ad8ebce782aa8f39Stephen Gallagher
1008001f34abb42df75f840db17f14a83f0c21d4Stephen Gallagher pump->ref = 1;
1008001f34abb42df75f840db17f14a83f0c21d4Stephen Gallagher
1008001f34abb42df75f840db17f14a83f0c21d4Stephen Gallagher return pump;
6b0f9cd2ee601121cb7fe1d9ad8ebce782aa8f39Stephen Gallagher}
1008001f34abb42df75f840db17f14a83f0c21d4Stephen Gallagher
1008001f34abb42df75f840db17f14a83f0c21d4Stephen Gallaghervoid iostream_pump_start(struct iostream_pump *pump)
1008001f34abb42df75f840db17f14a83f0c21d4Stephen Gallagher{
6b0f9cd2ee601121cb7fe1d9ad8ebce782aa8f39Stephen Gallagher i_assert(pump != NULL);
1008001f34abb42df75f840db17f14a83f0c21d4Stephen Gallagher i_assert(pump->callback != NULL);
6b0f9cd2ee601121cb7fe1d9ad8ebce782aa8f39Stephen Gallagher
6b0f9cd2ee601121cb7fe1d9ad8ebce782aa8f39Stephen Gallagher /* add flush handler */
1008001f34abb42df75f840db17f14a83f0c21d4Stephen Gallagher o_stream_set_flush_callback(pump->output, iostream_pump_flush, pump);
1008001f34abb42df75f840db17f14a83f0c21d4Stephen Gallagher
6b0f9cd2ee601121cb7fe1d9ad8ebce782aa8f39Stephen Gallagher /* make IO objects */
1008001f34abb42df75f840db17f14a83f0c21d4Stephen Gallagher pump->io = io_add_istream(pump->input, iostream_pump_copy, pump);
1008001f34abb42df75f840db17f14a83f0c21d4Stephen Gallagher
6b0f9cd2ee601121cb7fe1d9ad8ebce782aa8f39Stephen Gallagher /* make sure we do first read right away */
1008001f34abb42df75f840db17f14a83f0c21d4Stephen Gallagher io_set_pending(pump->io);
6b0f9cd2ee601121cb7fe1d9ad8ebce782aa8f39Stephen Gallagher}
6b0f9cd2ee601121cb7fe1d9ad8ebce782aa8f39Stephen Gallagher
1008001f34abb42df75f840db17f14a83f0c21d4Stephen Gallagherstruct istream *iostream_pump_get_input(struct iostream_pump *pump)
1008001f34abb42df75f840db17f14a83f0c21d4Stephen Gallagher{
6b0f9cd2ee601121cb7fe1d9ad8ebce782aa8f39Stephen Gallagher i_assert(pump != NULL);
1008001f34abb42df75f840db17f14a83f0c21d4Stephen Gallagher return pump->input;
1008001f34abb42df75f840db17f14a83f0c21d4Stephen Gallagher}
6b0f9cd2ee601121cb7fe1d9ad8ebce782aa8f39Stephen Gallagher
6b0f9cd2ee601121cb7fe1d9ad8ebce782aa8f39Stephen Gallagherstruct ostream *iostream_pump_get_output(struct iostream_pump *pump)
6b0f9cd2ee601121cb7fe1d9ad8ebce782aa8f39Stephen Gallagher{
6b0f9cd2ee601121cb7fe1d9ad8ebce782aa8f39Stephen Gallagher i_assert(pump != NULL);
6b0f9cd2ee601121cb7fe1d9ad8ebce782aa8f39Stephen Gallagher return pump->output;
6b0f9cd2ee601121cb7fe1d9ad8ebce782aa8f39Stephen Gallagher}
6b0f9cd2ee601121cb7fe1d9ad8ebce782aa8f39Stephen Gallagher
6b0f9cd2ee601121cb7fe1d9ad8ebce782aa8f39Stephen Gallaghervoid iostream_pump_set_completion_callback(struct iostream_pump *pump,
65a9065538fd85e6ead925d344e6b421900eb8c2Jakub Hrozek iostream_pump_callback_t *callback, void *context)
6b0f9cd2ee601121cb7fe1d9ad8ebce782aa8f39Stephen Gallagher{
6b0f9cd2ee601121cb7fe1d9ad8ebce782aa8f39Stephen Gallagher i_assert(pump != NULL);
6b0f9cd2ee601121cb7fe1d9ad8ebce782aa8f39Stephen Gallagher pump->callback = callback;
6b0f9cd2ee601121cb7fe1d9ad8ebce782aa8f39Stephen Gallagher pump->context = context;
6b0f9cd2ee601121cb7fe1d9ad8ebce782aa8f39Stephen Gallagher}
65a9065538fd85e6ead925d344e6b421900eb8c2Jakub Hrozek
6b0f9cd2ee601121cb7fe1d9ad8ebce782aa8f39Stephen Gallaghervoid iostream_pump_ref(struct iostream_pump *pump)
6b0f9cd2ee601121cb7fe1d9ad8ebce782aa8f39Stephen Gallagher{
6b0f9cd2ee601121cb7fe1d9ad8ebce782aa8f39Stephen Gallagher i_assert(pump != NULL && pump->ref > 0);
6b0f9cd2ee601121cb7fe1d9ad8ebce782aa8f39Stephen Gallagher pump->ref++;
6b0f9cd2ee601121cb7fe1d9ad8ebce782aa8f39Stephen Gallagher}
65a9065538fd85e6ead925d344e6b421900eb8c2Jakub Hrozek
6b0f9cd2ee601121cb7fe1d9ad8ebce782aa8f39Stephen Gallaghervoid iostream_pump_unref(struct iostream_pump **pump_r)
6b0f9cd2ee601121cb7fe1d9ad8ebce782aa8f39Stephen Gallagher{
6b0f9cd2ee601121cb7fe1d9ad8ebce782aa8f39Stephen Gallagher i_assert(pump_r != NULL && *pump_r != NULL);
6b0f9cd2ee601121cb7fe1d9ad8ebce782aa8f39Stephen Gallagher struct iostream_pump *pump = *pump_r;
6b0f9cd2ee601121cb7fe1d9ad8ebce782aa8f39Stephen Gallagher *pump_r = NULL;
6b0f9cd2ee601121cb7fe1d9ad8ebce782aa8f39Stephen Gallagher
6b0f9cd2ee601121cb7fe1d9ad8ebce782aa8f39Stephen Gallagher i_assert(pump->ref > 0);
6b0f9cd2ee601121cb7fe1d9ad8ebce782aa8f39Stephen Gallagher if (--pump->ref == 0) {
6b0f9cd2ee601121cb7fe1d9ad8ebce782aa8f39Stephen Gallagher iostream_pump_stop(pump);
6b0f9cd2ee601121cb7fe1d9ad8ebce782aa8f39Stephen Gallagher o_stream_unref(&pump->output);
6b0f9cd2ee601121cb7fe1d9ad8ebce782aa8f39Stephen Gallagher i_stream_unref(&pump->input);
6b0f9cd2ee601121cb7fe1d9ad8ebce782aa8f39Stephen Gallagher i_free(pump);
6b0f9cd2ee601121cb7fe1d9ad8ebce782aa8f39Stephen Gallagher }
6b0f9cd2ee601121cb7fe1d9ad8ebce782aa8f39Stephen Gallagher}
6b0f9cd2ee601121cb7fe1d9ad8ebce782aa8f39Stephen Gallagher
6b0f9cd2ee601121cb7fe1d9ad8ebce782aa8f39Stephen Gallaghervoid iostream_pump_stop(struct iostream_pump *pump)
6b0f9cd2ee601121cb7fe1d9ad8ebce782aa8f39Stephen Gallagher{
6b0f9cd2ee601121cb7fe1d9ad8ebce782aa8f39Stephen Gallagher i_assert(pump != NULL);
6b0f9cd2ee601121cb7fe1d9ad8ebce782aa8f39Stephen Gallagher
6b0f9cd2ee601121cb7fe1d9ad8ebce782aa8f39Stephen Gallagher o_stream_unset_flush_callback(pump->output);
6b0f9cd2ee601121cb7fe1d9ad8ebce782aa8f39Stephen Gallagher
6b0f9cd2ee601121cb7fe1d9ad8ebce782aa8f39Stephen Gallagher io_remove(&pump->io);
6b0f9cd2ee601121cb7fe1d9ad8ebce782aa8f39Stephen Gallagher}
6b0f9cd2ee601121cb7fe1d9ad8ebce782aa8f39Stephen Gallagher
6b0f9cd2ee601121cb7fe1d9ad8ebce782aa8f39Stephen Gallaghervoid iostream_pump_switch_ioloop(struct iostream_pump *pump)
6b0f9cd2ee601121cb7fe1d9ad8ebce782aa8f39Stephen Gallagher{
1008001f34abb42df75f840db17f14a83f0c21d4Stephen Gallagher i_assert(pump != NULL);
1008001f34abb42df75f840db17f14a83f0c21d4Stephen Gallagher if (pump->io != NULL)
6b0f9cd2ee601121cb7fe1d9ad8ebce782aa8f39Stephen Gallagher pump->io = io_loop_move_io(&pump->io);
6b0f9cd2ee601121cb7fe1d9ad8ebce782aa8f39Stephen Gallagher o_stream_switch_ioloop(pump->output);
6b0f9cd2ee601121cb7fe1d9ad8ebce782aa8f39Stephen Gallagher i_stream_switch_ioloop(pump->input);
6b0f9cd2ee601121cb7fe1d9ad8ebce782aa8f39Stephen Gallagher}
6b0f9cd2ee601121cb7fe1d9ad8ebce782aa8f39Stephen Gallagher