ostream-multiplex.c revision 0bb5678d8e52e006c68203594ce48485a8682b79
76b43e4417bab52e913da39b5f5bc2a130d3f149Timo Sirainen/* Copyright (c) 2017 Dovecot authors, see the included COPYING file */
8fcff4c5b52f24d9c681805fdf06b486f1d0fcbeTimo Sirainen
8fcff4c5b52f24d9c681805fdf06b486f1d0fcbeTimo Sirainen#include "lib.h"
8fcff4c5b52f24d9c681805fdf06b486f1d0fcbeTimo Sirainen#include "ioloop.h"
8fcff4c5b52f24d9c681805fdf06b486f1d0fcbeTimo Sirainen#include "array.h"
3343a61404603b21c246783a7963b77833095f31Timo Sirainen#include "ostream-private.h"
8fcff4c5b52f24d9c681805fdf06b486f1d0fcbeTimo Sirainen#include "ostream-multiplex.h"
8fcff4c5b52f24d9c681805fdf06b486f1d0fcbeTimo Sirainen
8fcff4c5b52f24d9c681805fdf06b486f1d0fcbeTimo Sirainen/* all multiplex packets are [1 byte cid][4 byte length][data] */
8fcff4c5b52f24d9c681805fdf06b486f1d0fcbeTimo Sirainen
0a601ada15c7fe82f0db895fc2068b71b3a5243cTimo Sirainenstruct multiplex_ostream;
8fcff4c5b52f24d9c681805fdf06b486f1d0fcbeTimo Sirainen
b5e6f6f27c1461f0f9f202615eeb738a645188c3Timo Sirainenstruct multiplex_ochannel {
8fcff4c5b52f24d9c681805fdf06b486f1d0fcbeTimo Sirainen struct ostream_private ostream;
8fcff4c5b52f24d9c681805fdf06b486f1d0fcbeTimo Sirainen struct multiplex_ostream *mstream;
8fcff4c5b52f24d9c681805fdf06b486f1d0fcbeTimo Sirainen uint8_t cid;
8fcff4c5b52f24d9c681805fdf06b486f1d0fcbeTimo Sirainen buffer_t *buf;
b5e6f6f27c1461f0f9f202615eeb738a645188c3Timo Sirainen time_t last_sent;
8fcff4c5b52f24d9c681805fdf06b486f1d0fcbeTimo Sirainen bool closed:1;
b5e6f6f27c1461f0f9f202615eeb738a645188c3Timo Sirainen};
04ab375449dd97eed50ada88dd0df2abab01cfeeTimo Sirainen
b5e6f6f27c1461f0f9f202615eeb738a645188c3Timo Sirainenstruct multiplex_ostream {
04ab375449dd97eed50ada88dd0df2abab01cfeeTimo Sirainen struct ostream *parent;
04ab375449dd97eed50ada88dd0df2abab01cfeeTimo Sirainen
04ab375449dd97eed50ada88dd0df2abab01cfeeTimo Sirainen /* channel 0 is main channel */
b5e6f6f27c1461f0f9f202615eeb738a645188c3Timo Sirainen uint8_t cur_channel;
b5e6f6f27c1461f0f9f202615eeb738a645188c3Timo Sirainen unsigned int remain;
b5e6f6f27c1461f0f9f202615eeb738a645188c3Timo Sirainen buffer_t *wbuf;
b5e6f6f27c1461f0f9f202615eeb738a645188c3Timo Sirainen size_t bufsize;
3f190f4cbb9233a3a6830956cb5c7ae56a577b79Timo Sirainen ARRAY(struct multiplex_ochannel *) channels;
3f190f4cbb9233a3a6830956cb5c7ae56a577b79Timo Sirainen
8fcff4c5b52f24d9c681805fdf06b486f1d0fcbeTimo Sirainen bool destroyed:1;
b5e6f6f27c1461f0f9f202615eeb738a645188c3Timo Sirainen};
8fcff4c5b52f24d9c681805fdf06b486f1d0fcbeTimo Sirainen
8fcff4c5b52f24d9c681805fdf06b486f1d0fcbeTimo Sirainenstatic struct multiplex_ochannel *
8fcff4c5b52f24d9c681805fdf06b486f1d0fcbeTimo Sirainenget_channel(struct multiplex_ostream *mstream, uint8_t cid)
8fcff4c5b52f24d9c681805fdf06b486f1d0fcbeTimo Sirainen{
b5e6f6f27c1461f0f9f202615eeb738a645188c3Timo Sirainen struct multiplex_ochannel **channelp;
8fcff4c5b52f24d9c681805fdf06b486f1d0fcbeTimo Sirainen i_assert(mstream != NULL);
8fcff4c5b52f24d9c681805fdf06b486f1d0fcbeTimo Sirainen array_foreach_modifiable(&mstream->channels, channelp) {
8fcff4c5b52f24d9c681805fdf06b486f1d0fcbeTimo Sirainen if (*channelp != NULL && (*channelp)->cid == cid)
8fcff4c5b52f24d9c681805fdf06b486f1d0fcbeTimo Sirainen return *channelp;
b5e6f6f27c1461f0f9f202615eeb738a645188c3Timo Sirainen }
b5e6f6f27c1461f0f9f202615eeb738a645188c3Timo Sirainen return NULL;
b5e6f6f27c1461f0f9f202615eeb738a645188c3Timo Sirainen}
b5e6f6f27c1461f0f9f202615eeb738a645188c3Timo Sirainen
8fcff4c5b52f24d9c681805fdf06b486f1d0fcbeTimo Sirainenstatic void propagate_error(struct multiplex_ostream *mstream, int stream_errno)
8fcff4c5b52f24d9c681805fdf06b486f1d0fcbeTimo Sirainen{
8fcff4c5b52f24d9c681805fdf06b486f1d0fcbeTimo Sirainen struct multiplex_ochannel **channelp;
b5e6f6f27c1461f0f9f202615eeb738a645188c3Timo Sirainen array_foreach_modifiable(&mstream->channels, channelp)
8fcff4c5b52f24d9c681805fdf06b486f1d0fcbeTimo Sirainen if (*channelp != NULL)
b5e6f6f27c1461f0f9f202615eeb738a645188c3Timo Sirainen (*channelp)->ostream.ostream.stream_errno = stream_errno;
8fcff4c5b52f24d9c681805fdf06b486f1d0fcbeTimo Sirainen}
b5e6f6f27c1461f0f9f202615eeb738a645188c3Timo Sirainen
b5e6f6f27c1461f0f9f202615eeb738a645188c3Timo Sirainenstatic struct multiplex_ochannel *get_next_channel(struct multiplex_ostream *mstream)
0d16525a729011f4fced989a3da74d755ea49e6dTimo Sirainen{
b5e6f6f27c1461f0f9f202615eeb738a645188c3Timo Sirainen time_t oldest = ioloop_time;
8fcff4c5b52f24d9c681805fdf06b486f1d0fcbeTimo Sirainen struct multiplex_ochannel *channel = NULL;
8fcff4c5b52f24d9c681805fdf06b486f1d0fcbeTimo Sirainen struct multiplex_ochannel **channelp;
8fcff4c5b52f24d9c681805fdf06b486f1d0fcbeTimo Sirainen array_foreach_modifiable(&mstream->channels, channelp)
b5e6f6f27c1461f0f9f202615eeb738a645188c3Timo Sirainen if (*channelp != NULL && (*channelp)->last_sent <= oldest &&
8fcff4c5b52f24d9c681805fdf06b486f1d0fcbeTimo Sirainen (*channelp)->buf->used > 0)
b5e6f6f27c1461f0f9f202615eeb738a645188c3Timo Sirainen channel = *channelp;
8fcff4c5b52f24d9c681805fdf06b486f1d0fcbeTimo Sirainen return channel;
8fcff4c5b52f24d9c681805fdf06b486f1d0fcbeTimo Sirainen}
b5e6f6f27c1461f0f9f202615eeb738a645188c3Timo Sirainen
04ab375449dd97eed50ada88dd0df2abab01cfeeTimo Sirainenstatic ssize_t
b5e6f6f27c1461f0f9f202615eeb738a645188c3Timo Siraineno_stream_multiplex_sendv(struct multiplex_ostream *mstream)
8ed8c821ba8aab0b4ed0375f87d48737ef0e0d8eTimo Sirainen{
8ed8c821ba8aab0b4ed0375f87d48737ef0e0d8eTimo Sirainen struct multiplex_ochannel *channel;
8ed8c821ba8aab0b4ed0375f87d48737ef0e0d8eTimo Sirainen ssize_t ret = 0;
8ed8c821ba8aab0b4ed0375f87d48737ef0e0d8eTimo Sirainen if (mstream->bufsize <= mstream->wbuf->used + 5)
8ed8c821ba8aab0b4ed0375f87d48737ef0e0d8eTimo Sirainen return -2;
04ab375449dd97eed50ada88dd0df2abab01cfeeTimo Sirainen
8ed8c821ba8aab0b4ed0375f87d48737ef0e0d8eTimo Sirainen while((channel = get_next_channel(mstream)) != NULL) {
8ed8c821ba8aab0b4ed0375f87d48737ef0e0d8eTimo Sirainen size_t tmp = mstream->bufsize - mstream->wbuf->used - 5;
8ed8c821ba8aab0b4ed0375f87d48737ef0e0d8eTimo Sirainen /* ensure it fits into 32 bit int */
8ed8c821ba8aab0b4ed0375f87d48737ef0e0d8eTimo Sirainen size_t amt = I_MIN(UINT_MAX, I_MIN(tmp, channel->buf->used));
8ed8c821ba8aab0b4ed0375f87d48737ef0e0d8eTimo Sirainen if (tmp == 0)
8ed8c821ba8aab0b4ed0375f87d48737ef0e0d8eTimo Sirainen break;
8ed8c821ba8aab0b4ed0375f87d48737ef0e0d8eTimo Sirainen uint32_t len = cpu32_to_be(amt);
8ed8c821ba8aab0b4ed0375f87d48737ef0e0d8eTimo Sirainen buffer_append(mstream->wbuf, &channel->cid, 1);
8fcff4c5b52f24d9c681805fdf06b486f1d0fcbeTimo Sirainen buffer_append(mstream->wbuf, &len, 4);
8fcff4c5b52f24d9c681805fdf06b486f1d0fcbeTimo Sirainen buffer_append(mstream->wbuf, channel->buf->data, amt);
ed63764502561bbeb12fe03878fb33a82b89bf27Timo Sirainen buffer_delete(channel->buf, 0, amt);
ed63764502561bbeb12fe03878fb33a82b89bf27Timo Sirainen channel->last_sent = ioloop_time;
8ed8c821ba8aab0b4ed0375f87d48737ef0e0d8eTimo Sirainen }
8ed8c821ba8aab0b4ed0375f87d48737ef0e0d8eTimo Sirainen
8fcff4c5b52f24d9c681805fdf06b486f1d0fcbeTimo Sirainen if (mstream->wbuf->used > 0) {
8fcff4c5b52f24d9c681805fdf06b486f1d0fcbeTimo Sirainen ret = o_stream_send(mstream->parent, mstream->wbuf->data,
8fcff4c5b52f24d9c681805fdf06b486f1d0fcbeTimo Sirainen mstream->wbuf->used);
0a601ada15c7fe82f0db895fc2068b71b3a5243cTimo Sirainen if (ret < 0) {
0a601ada15c7fe82f0db895fc2068b71b3a5243cTimo Sirainen propagate_error(mstream, mstream->parent->stream_errno);
0a601ada15c7fe82f0db895fc2068b71b3a5243cTimo Sirainen return ret;
0a601ada15c7fe82f0db895fc2068b71b3a5243cTimo Sirainen }
0a601ada15c7fe82f0db895fc2068b71b3a5243cTimo Sirainen o_stream_flush(mstream->parent);
0a601ada15c7fe82f0db895fc2068b71b3a5243cTimo Sirainen buffer_delete(mstream->wbuf, 0, ret);
c5d71ca49b93e18ffbf197d89239d63678e881d7Timo Sirainen }
0a601ada15c7fe82f0db895fc2068b71b3a5243cTimo Sirainen return ret;
0a601ada15c7fe82f0db895fc2068b71b3a5243cTimo Sirainen}
0a601ada15c7fe82f0db895fc2068b71b3a5243cTimo Sirainen
0a601ada15c7fe82f0db895fc2068b71b3a5243cTimo Sirainenstatic ssize_t
0a601ada15c7fe82f0db895fc2068b71b3a5243cTimo Siraineno_stream_multiplex_ochannel_sendv(struct ostream_private *stream,
0a601ada15c7fe82f0db895fc2068b71b3a5243cTimo Sirainen const struct const_iovec *iov, unsigned int iov_count)
0a601ada15c7fe82f0db895fc2068b71b3a5243cTimo Sirainen{
0a601ada15c7fe82f0db895fc2068b71b3a5243cTimo Sirainen struct multiplex_ochannel *channel = (struct multiplex_ochannel*)stream;
0a601ada15c7fe82f0db895fc2068b71b3a5243cTimo Sirainen ssize_t ret;
0a601ada15c7fe82f0db895fc2068b71b3a5243cTimo Sirainen size_t total = 0;
0a601ada15c7fe82f0db895fc2068b71b3a5243cTimo Sirainen if (channel->mstream->bufsize <= channel->buf->used)
0a601ada15c7fe82f0db895fc2068b71b3a5243cTimo Sirainen return -2;
0a601ada15c7fe82f0db895fc2068b71b3a5243cTimo Sirainen
0a601ada15c7fe82f0db895fc2068b71b3a5243cTimo Sirainen for(unsigned int i=0; i < iov_count; i++) {
0a601ada15c7fe82f0db895fc2068b71b3a5243cTimo Sirainen /* copy data to buffer */
0a601ada15c7fe82f0db895fc2068b71b3a5243cTimo Sirainen size_t tmp = channel->mstream->bufsize - channel->buf->used;
0a601ada15c7fe82f0db895fc2068b71b3a5243cTimo Sirainen if (tmp == 0)
b5e6f6f27c1461f0f9f202615eeb738a645188c3Timo Sirainen break;
8fcff4c5b52f24d9c681805fdf06b486f1d0fcbeTimo Sirainen buffer_append(channel->buf, iov[i].iov_base,
0a601ada15c7fe82f0db895fc2068b71b3a5243cTimo Sirainen I_MIN(tmp, iov[i].iov_len));
0a601ada15c7fe82f0db895fc2068b71b3a5243cTimo Sirainen total += I_MIN(tmp, iov[i].iov_len);
0a601ada15c7fe82f0db895fc2068b71b3a5243cTimo Sirainen }
d5cebe7f98e63d4e2822863ef2faa4971e8b3a5dTimo Sirainen
cd56a23e21f1df3f79648cf07e2f4385e2fadebbTimo Sirainen stream->ostream.offset += total;
04ab375449dd97eed50ada88dd0df2abab01cfeeTimo Sirainen
b5e6f6f27c1461f0f9f202615eeb738a645188c3Timo Sirainen if ((ret = o_stream_multiplex_sendv(channel->mstream)) < 0)
b5e6f6f27c1461f0f9f202615eeb738a645188c3Timo Sirainen return ret;
04ab375449dd97eed50ada88dd0df2abab01cfeeTimo Sirainen
b5e6f6f27c1461f0f9f202615eeb738a645188c3Timo Sirainen return total;
424236b2b88a5a7bbde5cf6a6b32189ca3437629Timo Sirainen}
04ab375449dd97eed50ada88dd0df2abab01cfeeTimo Sirainen
cd56a23e21f1df3f79648cf07e2f4385e2fadebbTimo Sirainenstatic void
04ab375449dd97eed50ada88dd0df2abab01cfeeTimo Siraineno_stream_multiplex_ochannel_close(struct iostream_private *stream, bool close_parent)
04ab375449dd97eed50ada88dd0df2abab01cfeeTimo Sirainen{
8fcff4c5b52f24d9c681805fdf06b486f1d0fcbeTimo Sirainen struct multiplex_ochannel *const *channelp;
f1901fd21906911f7be075c965ac882f6a87b4c3Timo Sirainen struct multiplex_ochannel *channel = (struct multiplex_ochannel*)stream;
1b5366b2234892f8930a29351da06b193e385150Timo Sirainen o_stream_flush(&channel->ostream.ostream);
1b5366b2234892f8930a29351da06b193e385150Timo Sirainen
1b5366b2234892f8930a29351da06b193e385150Timo Sirainen channel->closed = TRUE;
f1901fd21906911f7be075c965ac882f6a87b4c3Timo Sirainen if (close_parent) {
f1901fd21906911f7be075c965ac882f6a87b4c3Timo Sirainen array_foreach(&channel->mstream->channels, channelp)
1b5366b2234892f8930a29351da06b193e385150Timo Sirainen if (*channelp !=NULL && !(*channelp)->closed)
f1901fd21906911f7be075c965ac882f6a87b4c3Timo Sirainen return;
1b5366b2234892f8930a29351da06b193e385150Timo Sirainen o_stream_close(channel->mstream->parent);
1b5366b2234892f8930a29351da06b193e385150Timo Sirainen }
f1901fd21906911f7be075c965ac882f6a87b4c3Timo Sirainen}
f1901fd21906911f7be075c965ac882f6a87b4c3Timo Sirainen
7501b9f694460101b41d1d708ebc3ec2b0400b1cTimo Sirainenstatic void o_stream_multiplex_try_destroy(struct multiplex_ostream *mstream)
ccc895c0358108d2304239063e940b7d75f364abTimo Sirainen{
7501b9f694460101b41d1d708ebc3ec2b0400b1cTimo Sirainen struct multiplex_ochannel **channelp;
b5e6f6f27c1461f0f9f202615eeb738a645188c3Timo Sirainen /* can't do anything until they are all closed */
690bafa70767e3f6e98bbfd62ad4a26be2387ea9Timo Sirainen array_foreach_modifiable(&mstream->channels, channelp)
690bafa70767e3f6e98bbfd62ad4a26be2387ea9Timo Sirainen if (*channelp != NULL)
6060b7c8edf8fce73470d0df6a2479b69b01c537Timo Sirainen return;
7501b9f694460101b41d1d708ebc3ec2b0400b1cTimo Sirainen o_stream_flush(mstream->parent);
7501b9f694460101b41d1d708ebc3ec2b0400b1cTimo Sirainen o_stream_unref(&mstream->parent);
1b5366b2234892f8930a29351da06b193e385150Timo Sirainen array_free(&mstream->channels);
1b5366b2234892f8930a29351da06b193e385150Timo Sirainen buffer_free(&mstream->wbuf);
1b5366b2234892f8930a29351da06b193e385150Timo Sirainen i_free(mstream);
1b5366b2234892f8930a29351da06b193e385150Timo Sirainen}
1b5366b2234892f8930a29351da06b193e385150Timo Sirainen
1b5366b2234892f8930a29351da06b193e385150Timo Sirainenstatic void o_stream_multiplex_ochannel_destroy(struct iostream_private *stream)
1b5366b2234892f8930a29351da06b193e385150Timo Sirainen{
1b5366b2234892f8930a29351da06b193e385150Timo Sirainen struct multiplex_ochannel **channelp;
1b5366b2234892f8930a29351da06b193e385150Timo Sirainen struct multiplex_ochannel *channel = (struct multiplex_ochannel*)stream;
1b5366b2234892f8930a29351da06b193e385150Timo Sirainen o_stream_multiplex_ochannel_close(stream, TRUE);
1b5366b2234892f8930a29351da06b193e385150Timo Sirainen if (channel->buf != NULL)
1b5366b2234892f8930a29351da06b193e385150Timo Sirainen buffer_free(&channel->buf);
1b5366b2234892f8930a29351da06b193e385150Timo Sirainen /* delete the channel */
1b5366b2234892f8930a29351da06b193e385150Timo Sirainen array_foreach_modifiable(&channel->mstream->channels, channelp) {
1b5366b2234892f8930a29351da06b193e385150Timo Sirainen if (*channelp != NULL && (*channelp)->cid == channel->cid) {
1b5366b2234892f8930a29351da06b193e385150Timo Sirainen *channelp = NULL;
1b5366b2234892f8930a29351da06b193e385150Timo Sirainen break;
b5e6f6f27c1461f0f9f202615eeb738a645188c3Timo Sirainen }
f1901fd21906911f7be075c965ac882f6a87b4c3Timo Sirainen }
e5afebd2df1d4990f7bec2a839260ff2e6d78168Timo Sirainen o_stream_multiplex_try_destroy(channel->mstream);
ccc895c0358108d2304239063e940b7d75f364abTimo Sirainen}
ff7056842f14fd3b30a2d327dfab165b9d15dd30Timo Sirainen
f1901fd21906911f7be075c965ac882f6a87b4c3Timo Sirainenstatic struct ostream *
f1901fd21906911f7be075c965ac882f6a87b4c3Timo Siraineno_stream_add_channel_real(struct multiplex_ostream *mstream, uint8_t cid)
f1901fd21906911f7be075c965ac882f6a87b4c3Timo Sirainen{
f1901fd21906911f7be075c965ac882f6a87b4c3Timo Sirainen struct multiplex_ochannel *channel = i_new(struct multiplex_ochannel, 1);
690bafa70767e3f6e98bbfd62ad4a26be2387ea9Timo Sirainen channel->cid = cid;
f1901fd21906911f7be075c965ac882f6a87b4c3Timo Sirainen channel->buf = buffer_create_dynamic(default_pool, 256);
f1901fd21906911f7be075c965ac882f6a87b4c3Timo Sirainen channel->mstream = mstream;
ccc895c0358108d2304239063e940b7d75f364abTimo Sirainen channel->ostream.sendv = o_stream_multiplex_ochannel_sendv;
f1901fd21906911f7be075c965ac882f6a87b4c3Timo Sirainen channel->ostream.iostream.close = o_stream_multiplex_ochannel_close;
b5e6f6f27c1461f0f9f202615eeb738a645188c3Timo Sirainen channel->ostream.iostream.destroy = o_stream_multiplex_ochannel_destroy;
690bafa70767e3f6e98bbfd62ad4a26be2387ea9Timo Sirainen channel->ostream.fd = o_stream_get_fd(mstream->parent);
690bafa70767e3f6e98bbfd62ad4a26be2387ea9Timo Sirainen array_append(&channel->mstream->channels, &channel, 1);
6060b7c8edf8fce73470d0df6a2479b69b01c537Timo Sirainen
f1901fd21906911f7be075c965ac882f6a87b4c3Timo Sirainen return o_stream_create(&channel->ostream, mstream->parent,
f1901fd21906911f7be075c965ac882f6a87b4c3Timo Sirainen mstream->bufsize);
690bafa70767e3f6e98bbfd62ad4a26be2387ea9Timo Sirainen}
3343a61404603b21c246783a7963b77833095f31Timo Sirainen
73b50eecfc31750a312e2f940023f522eb07178cTimo Sirainenstruct ostream *o_stream_multiplex_add_channel(struct ostream *stream, uint8_t cid)
73b50eecfc31750a312e2f940023f522eb07178cTimo Sirainen{
3343a61404603b21c246783a7963b77833095f31Timo Sirainen struct multiplex_ochannel *chan =
b5e6f6f27c1461f0f9f202615eeb738a645188c3Timo Sirainen (struct multiplex_ochannel *)stream->real_stream;
3343a61404603b21c246783a7963b77833095f31Timo Sirainen i_assert(get_channel(chan->mstream, cid) == NULL);
f1901fd21906911f7be075c965ac882f6a87b4c3Timo Sirainen
f1901fd21906911f7be075c965ac882f6a87b4c3Timo Sirainen return o_stream_add_channel_real(chan->mstream, cid);
f1901fd21906911f7be075c965ac882f6a87b4c3Timo Sirainen}
f1901fd21906911f7be075c965ac882f6a87b4c3Timo Sirainen
/* Create a multiplexer on top of `parent` and return its main channel
   (channel id 0). `bufsize` limits the frame buffer and each channel's
   buffer. A reference to `parent` is taken. */
struct ostream *o_stream_create_multiplex(struct ostream *parent, size_t bufsize)
{
	struct multiplex_ostream *mstream = i_new(struct multiplex_ostream, 1);

	o_stream_ref(parent);
	mstream->parent = parent;
	mstream->bufsize = bufsize;
	mstream->wbuf = buffer_create_dynamic(default_pool, 256);
	i_array_init(&mstream->channels, 8);

	return o_stream_add_channel_real(mstream, 0);
}
uint8_t o_stream_multiplex_get_channel_id(struct ostream *stream)
{
struct multiplex_ochannel *channel =
(struct multiplex_ochannel *)stream->real_stream;
return channel->cid;
}