5a580c3a38ced62d4bcc95b8ac7c4f2935b5d294Timo Sirainen/* Copyright (c) 2017-2018 Dovecot authors, see the included COPYING file */
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch#include "lib.h"
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch#include "ioloop.h"
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch#include "array.h"
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch#include "ostream-private.h"
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch#include "ostream-multiplex.h"
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch/* all multiplex packets are [1 byte cid][4 byte length][data] */
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch
7384b4e78eaab44693c985192276e31322155e32Stephan Boschstruct multiplex_ostream;
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch
7384b4e78eaab44693c985192276e31322155e32Stephan Boschstruct multiplex_ochannel {
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch struct ostream_private ostream;
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch struct multiplex_ostream *mstream;
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch uint8_t cid;
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch buffer_t *buf;
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch time_t last_sent;
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch bool closed:1;
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch};
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch
7384b4e78eaab44693c985192276e31322155e32Stephan Boschstruct multiplex_ostream {
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch struct ostream *parent;
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch /* channel 0 is main channel */
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch uint8_t cur_channel;
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch unsigned int remain;
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch buffer_t *wbuf;
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch size_t bufsize;
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch ARRAY(struct multiplex_ochannel *) channels;
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch bool destroyed:1;
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch};
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch
7384b4e78eaab44693c985192276e31322155e32Stephan Boschstatic struct multiplex_ochannel *
7384b4e78eaab44693c985192276e31322155e32Stephan Boschget_channel(struct multiplex_ostream *mstream, uint8_t cid)
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch{
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch struct multiplex_ochannel **channelp;
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch i_assert(mstream != NULL);
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch array_foreach_modifiable(&mstream->channels, channelp) {
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch if (*channelp != NULL && (*channelp)->cid == cid)
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch return *channelp;
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch }
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch return NULL;
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch}
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch
7384b4e78eaab44693c985192276e31322155e32Stephan Boschstatic void propagate_error(struct multiplex_ostream *mstream, int stream_errno)
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch{
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch struct multiplex_ochannel **channelp;
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch array_foreach_modifiable(&mstream->channels, channelp)
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch if (*channelp != NULL)
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch (*channelp)->ostream.ostream.stream_errno = stream_errno;
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch}
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch
7384b4e78eaab44693c985192276e31322155e32Stephan Boschstatic struct multiplex_ochannel *get_next_channel(struct multiplex_ostream *mstream)
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch{
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch time_t oldest = ioloop_time;
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch struct multiplex_ochannel *channel = NULL;
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch struct multiplex_ochannel **channelp;
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch array_foreach_modifiable(&mstream->channels, channelp)
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch if (*channelp != NULL && (*channelp)->last_sent <= oldest &&
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch (*channelp)->buf->used > 0)
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch channel = *channelp;
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch return channel;
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch}
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch
7384b4e78eaab44693c985192276e31322155e32Stephan Boschstatic ssize_t
7384b4e78eaab44693c985192276e31322155e32Stephan Boscho_stream_multiplex_sendv(struct multiplex_ostream *mstream)
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch{
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch struct multiplex_ochannel *channel;
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch ssize_t ret = 0;
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch if (mstream->bufsize <= mstream->wbuf->used + 5)
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch return -2;
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch while((channel = get_next_channel(mstream)) != NULL) {
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch size_t tmp = mstream->bufsize - mstream->wbuf->used - 5;
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch /* ensure it fits into 32 bit int */
eb325a5a90c1d2655e74972bde0de6a699d2c864Stephan Bosch size_t amt = I_MIN(UINT_MAX, I_MIN(tmp, channel->buf->used));
eb325a5a90c1d2655e74972bde0de6a699d2c864Stephan Bosch if (tmp == 0)
eb325a5a90c1d2655e74972bde0de6a699d2c864Stephan Bosch break;
eb325a5a90c1d2655e74972bde0de6a699d2c864Stephan Bosch uint32_t len = cpu32_to_be(amt);
eb325a5a90c1d2655e74972bde0de6a699d2c864Stephan Bosch buffer_append(mstream->wbuf, &channel->cid, 1);
eb325a5a90c1d2655e74972bde0de6a699d2c864Stephan Bosch buffer_append(mstream->wbuf, &len, 4);
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch buffer_append(mstream->wbuf, channel->buf->data, amt);
56d1345c43bbd28c36b7faa85e4163bd9e874290Timo Sirainen buffer_delete(channel->buf, 0, amt);
30d917bcd48d70af0371baf27571cc198d621a62Timo Sirainen channel->last_sent = ioloop_time;
9d0aee99a8c80d71137aa9b8c216cc203bec7a9aTimo Sirainen }
9d0aee99a8c80d71137aa9b8c216cc203bec7a9aTimo Sirainen
35e962a9186b4e9b2001628c1d7b55c24b33ce84Timo Sirainen if (mstream->wbuf->used > 0) {
35e962a9186b4e9b2001628c1d7b55c24b33ce84Timo Sirainen ret = o_stream_send(mstream->parent, mstream->wbuf->data,
35e962a9186b4e9b2001628c1d7b55c24b33ce84Timo Sirainen mstream->wbuf->used);
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch if (ret < 0) {
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch propagate_error(mstream, mstream->parent->stream_errno);
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch return ret;
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch }
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch buffer_delete(mstream->wbuf, 0, ret);
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch }
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch return ret;
a8c4e79ff50fac21b05a7368b052583d410ca15cTimo Sirainen}
a8c4e79ff50fac21b05a7368b052583d410ca15cTimo Sirainen
70505f4839520ac67895992621c97d2480c22e7fTimo Sirainenstatic ssize_t
7384b4e78eaab44693c985192276e31322155e32Stephan Boscho_stream_multiplex_ochannel_sendv(struct ostream_private *stream,
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch const struct const_iovec *iov, unsigned int iov_count)
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch{
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch struct multiplex_ochannel *channel = (struct multiplex_ochannel*)stream;
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch ssize_t ret;
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch size_t total = 0;
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch if (channel->mstream->bufsize <= channel->buf->used)
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch return -2;
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch for(unsigned int i=0; i < iov_count; i++) {
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch /* copy data to buffer */
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch size_t tmp = channel->mstream->bufsize - channel->buf->used;
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch if (tmp == 0)
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch break;
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch buffer_append(channel->buf, iov[i].iov_base,
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch I_MIN(tmp, iov[i].iov_len));
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch total += I_MIN(tmp, iov[i].iov_len);
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch }
9dc01e0d10a61cab14867b26bf0d2d1dcf8ad978Timo Sirainen
9dc01e0d10a61cab14867b26bf0d2d1dcf8ad978Timo Sirainen stream->ostream.offset += total;
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch if ((ret = o_stream_multiplex_sendv(channel->mstream)) < 0)
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch return ret;
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch return total;
aacf2a69acc59e9382578d6f4e030788abc79706Timo Sirainen}
aacf2a69acc59e9382578d6f4e030788abc79706Timo Sirainen
7384b4e78eaab44693c985192276e31322155e32Stephan Boschstatic void
7384b4e78eaab44693c985192276e31322155e32Stephan Boscho_stream_multiplex_ochannel_close(struct iostream_private *stream, bool close_parent)
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch{
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch struct multiplex_ochannel *const *channelp;
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch struct multiplex_ochannel *channel = (struct multiplex_ochannel*)stream;
415e16c3dc185578695b7d88e561a52de6c8b1b1Timo Sirainen
415e16c3dc185578695b7d88e561a52de6c8b1b1Timo Sirainen channel->closed = TRUE;
415e16c3dc185578695b7d88e561a52de6c8b1b1Timo Sirainen if (close_parent) {
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch array_foreach(&channel->mstream->channels, channelp)
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch if (*channelp !=NULL && !(*channelp)->closed)
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch return;
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch o_stream_close(channel->mstream->parent);
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch }
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch}
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch
636d0f43138468f8efe685a681326b123f660e49Timo Sirainenstatic void o_stream_multiplex_try_destroy(struct multiplex_ostream *mstream)
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch{
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch struct multiplex_ochannel **channelp;
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch /* can't do anything until they are all closed */
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch array_foreach_modifiable(&mstream->channels, channelp)
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch if (*channelp != NULL)
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch return;
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch o_stream_unref(&mstream->parent);
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch array_free(&mstream->channels);
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch buffer_free(&mstream->wbuf);
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch i_free(mstream);
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch}
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch
636d0f43138468f8efe685a681326b123f660e49Timo Sirainenstatic void o_stream_multiplex_ochannel_destroy(struct iostream_private *stream)
636d0f43138468f8efe685a681326b123f660e49Timo Sirainen{
636d0f43138468f8efe685a681326b123f660e49Timo Sirainen struct multiplex_ochannel **channelp;
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch struct multiplex_ochannel *channel = (struct multiplex_ochannel*)stream;
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch o_stream_unref(&channel->ostream.parent);
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch if (channel->buf != NULL)
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch buffer_free(&channel->buf);
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch /* delete the channel */
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch array_foreach_modifiable(&channel->mstream->channels, channelp) {
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch if (*channelp != NULL && (*channelp)->cid == channel->cid) {
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch *channelp = NULL;
fb1be3de0159d6a10e916ad992e2bc53be64c6d5Timo Sirainen break;
fb1be3de0159d6a10e916ad992e2bc53be64c6d5Timo Sirainen }
fb1be3de0159d6a10e916ad992e2bc53be64c6d5Timo Sirainen }
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch o_stream_multiplex_try_destroy(channel->mstream);
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch}
1a9a35a6b307f8d5b25345af55e40a99162b4072Timo Sirainen
1a9a35a6b307f8d5b25345af55e40a99162b4072Timo Sirainenstatic struct ostream *
1a9a35a6b307f8d5b25345af55e40a99162b4072Timo Siraineno_stream_add_channel_real(struct multiplex_ostream *mstream, uint8_t cid)
1a9a35a6b307f8d5b25345af55e40a99162b4072Timo Sirainen{
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch struct multiplex_ochannel *channel = i_new(struct multiplex_ochannel, 1);
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch channel->cid = cid;
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch channel->buf = buffer_create_dynamic(default_pool, 256);
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch channel->mstream = mstream;
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch channel->ostream.sendv = o_stream_multiplex_ochannel_sendv;
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch channel->ostream.iostream.close = o_stream_multiplex_ochannel_close;
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch channel->ostream.iostream.destroy = o_stream_multiplex_ochannel_destroy;
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch channel->ostream.fd = o_stream_get_fd(mstream->parent);
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch array_append(&channel->mstream->channels, &channel, 1);
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch return o_stream_create(&channel->ostream, mstream->parent,
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch mstream->bufsize);
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch}
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch
7384b4e78eaab44693c985192276e31322155e32Stephan Boschstruct ostream *o_stream_multiplex_add_channel(struct ostream *stream, uint8_t cid)
415e16c3dc185578695b7d88e561a52de6c8b1b1Timo Sirainen{
415e16c3dc185578695b7d88e561a52de6c8b1b1Timo Sirainen struct multiplex_ochannel *chan =
415e16c3dc185578695b7d88e561a52de6c8b1b1Timo Sirainen (struct multiplex_ochannel *)stream->real_stream;
ba1c847d0af4afe4787ed470d0c818e948e184e2Timo Sirainen i_assert(get_channel(chan->mstream, cid) == NULL);
415e16c3dc185578695b7d88e561a52de6c8b1b1Timo Sirainen
415e16c3dc185578695b7d88e561a52de6c8b1b1Timo Sirainen return o_stream_add_channel_real(chan->mstream, cid);
415e16c3dc185578695b7d88e561a52de6c8b1b1Timo Sirainen}
415e16c3dc185578695b7d88e561a52de6c8b1b1Timo Sirainen
415e16c3dc185578695b7d88e561a52de6c8b1b1Timo Sirainenstruct ostream *o_stream_create_multiplex(struct ostream *parent, size_t bufsize)
415e16c3dc185578695b7d88e561a52de6c8b1b1Timo Sirainen{
56d1345c43bbd28c36b7faa85e4163bd9e874290Timo Sirainen struct multiplex_ostream *mstream;
415e16c3dc185578695b7d88e561a52de6c8b1b1Timo Sirainen
415e16c3dc185578695b7d88e561a52de6c8b1b1Timo Sirainen mstream = i_new(struct multiplex_ostream, 1);
415e16c3dc185578695b7d88e561a52de6c8b1b1Timo Sirainen mstream->parent = parent;
35e962a9186b4e9b2001628c1d7b55c24b33ce84Timo Sirainen mstream->bufsize = bufsize;
35e962a9186b4e9b2001628c1d7b55c24b33ce84Timo Sirainen mstream->wbuf = buffer_create_dynamic(default_pool, 256);
35e962a9186b4e9b2001628c1d7b55c24b33ce84Timo Sirainen i_array_init(&mstream->channels, 8);
d47b9f1bd7274c7b2d9049c2e1718d1cf89cc572Timo Sirainen o_stream_ref(parent);
d47b9f1bd7274c7b2d9049c2e1718d1cf89cc572Timo Sirainen
415e16c3dc185578695b7d88e561a52de6c8b1b1Timo Sirainen return o_stream_add_channel_real(mstream, 0);
ba1c847d0af4afe4787ed470d0c818e948e184e2Timo Sirainen}
ba1c847d0af4afe4787ed470d0c818e948e184e2Timo Sirainen
ba1c847d0af4afe4787ed470d0c818e948e184e2Timo Sirainenuint8_t o_stream_multiplex_get_channel_id(struct ostream *stream)
415e16c3dc185578695b7d88e561a52de6c8b1b1Timo Sirainen{
415e16c3dc185578695b7d88e561a52de6c8b1b1Timo Sirainen struct multiplex_ochannel *channel =
415e16c3dc185578695b7d88e561a52de6c8b1b1Timo Sirainen (struct multiplex_ochannel *)stream->real_stream;
415e16c3dc185578695b7d88e561a52de6c8b1b1Timo Sirainen return channel->cid;
}