ostream-multiplex.c revision 0bb5678d8e52e006c68203594ce48485a8682b79
76b43e4417bab52e913da39b5f5bc2a130d3f149Timo Sirainen/* Copyright (c) 2017 Dovecot authors, see the included COPYING file */
8fcff4c5b52f24d9c681805fdf06b486f1d0fcbeTimo Sirainen/* all multiplex packets are [1 byte cid][4 byte length][data] */
04ab375449dd97eed50ada88dd0df2abab01cfeeTimo Sirainen /* channel 0 is main channel */
b5e6f6f27c1461f0f9f202615eeb738a645188c3Timo Sirainen unsigned int remain;
8fcff4c5b52f24d9c681805fdf06b486f1d0fcbeTimo Sirainenget_channel(struct multiplex_ostream *mstream, uint8_t cid)
8fcff4c5b52f24d9c681805fdf06b486f1d0fcbeTimo Sirainen array_foreach_modifiable(&mstream->channels, channelp) {
8fcff4c5b52f24d9c681805fdf06b486f1d0fcbeTimo Sirainen if (*channelp != NULL && (*channelp)->cid == cid)
8fcff4c5b52f24d9c681805fdf06b486f1d0fcbeTimo Sirainenstatic void propagate_error(struct multiplex_ostream *mstream, int stream_errno)
b5e6f6f27c1461f0f9f202615eeb738a645188c3Timo Sirainen array_foreach_modifiable(&mstream->channels, channelp)
b5e6f6f27c1461f0f9f202615eeb738a645188c3Timo Sirainen (*channelp)->ostream.ostream.stream_errno = stream_errno;
b5e6f6f27c1461f0f9f202615eeb738a645188c3Timo Sirainenstatic struct multiplex_ochannel *get_next_channel(struct multiplex_ostream *mstream)
8fcff4c5b52f24d9c681805fdf06b486f1d0fcbeTimo Sirainen array_foreach_modifiable(&mstream->channels, channelp)
b5e6f6f27c1461f0f9f202615eeb738a645188c3Timo Sirainen if (*channelp != NULL && (*channelp)->last_sent <= oldest &&
b5e6f6f27c1461f0f9f202615eeb738a645188c3Timo Siraineno_stream_multiplex_sendv(struct multiplex_ostream *mstream)
8ed8c821ba8aab0b4ed0375f87d48737ef0e0d8eTimo Sirainen if (mstream->bufsize <= mstream->wbuf->used + 5)
8ed8c821ba8aab0b4ed0375f87d48737ef0e0d8eTimo Sirainen while((channel = get_next_channel(mstream)) != NULL) {
8ed8c821ba8aab0b4ed0375f87d48737ef0e0d8eTimo Sirainen size_t tmp = mstream->bufsize - mstream->wbuf->used - 5;
8ed8c821ba8aab0b4ed0375f87d48737ef0e0d8eTimo Sirainen /* ensure it fits into 32 bit int */
8ed8c821ba8aab0b4ed0375f87d48737ef0e0d8eTimo Sirainen size_t amt = I_MIN(UINT_MAX, I_MIN(tmp, channel->buf->used));
8ed8c821ba8aab0b4ed0375f87d48737ef0e0d8eTimo Sirainen buffer_append(mstream->wbuf, &channel->cid, 1);
8fcff4c5b52f24d9c681805fdf06b486f1d0fcbeTimo Sirainen buffer_append(mstream->wbuf, channel->buf->data, amt);
8fcff4c5b52f24d9c681805fdf06b486f1d0fcbeTimo Sirainen ret = o_stream_send(mstream->parent, mstream->wbuf->data,
0a601ada15c7fe82f0db895fc2068b71b3a5243cTimo Sirainen propagate_error(mstream, mstream->parent->stream_errno);
0a601ada15c7fe82f0db895fc2068b71b3a5243cTimo Siraineno_stream_multiplex_ochannel_sendv(struct ostream_private *stream,
0a601ada15c7fe82f0db895fc2068b71b3a5243cTimo Sirainen const struct const_iovec *iov, unsigned int iov_count)
0a601ada15c7fe82f0db895fc2068b71b3a5243cTimo Sirainen struct multiplex_ochannel *channel = (struct multiplex_ochannel*)stream;
0a601ada15c7fe82f0db895fc2068b71b3a5243cTimo Sirainen if (channel->mstream->bufsize <= channel->buf->used)
0a601ada15c7fe82f0db895fc2068b71b3a5243cTimo Sirainen for(unsigned int i=0; i < iov_count; i++) {
0a601ada15c7fe82f0db895fc2068b71b3a5243cTimo Sirainen /* copy data to buffer */
0a601ada15c7fe82f0db895fc2068b71b3a5243cTimo Sirainen size_t tmp = channel->mstream->bufsize - channel->buf->used;
b5e6f6f27c1461f0f9f202615eeb738a645188c3Timo Sirainen if ((ret = o_stream_multiplex_sendv(channel->mstream)) < 0)
04ab375449dd97eed50ada88dd0df2abab01cfeeTimo Siraineno_stream_multiplex_ochannel_close(struct iostream_private *stream, bool close_parent)
f1901fd21906911f7be075c965ac882f6a87b4c3Timo Sirainen struct multiplex_ochannel *channel = (struct multiplex_ochannel*)stream;
f1901fd21906911f7be075c965ac882f6a87b4c3Timo Sirainen array_foreach(&channel->mstream->channels, channelp)
7501b9f694460101b41d1d708ebc3ec2b0400b1cTimo Sirainenstatic void o_stream_multiplex_try_destroy(struct multiplex_ostream *mstream)
b5e6f6f27c1461f0f9f202615eeb738a645188c3Timo Sirainen /* can't do anything until they are all closed */
690bafa70767e3f6e98bbfd62ad4a26be2387ea9Timo Sirainen array_foreach_modifiable(&mstream->channels, channelp)
1b5366b2234892f8930a29351da06b193e385150Timo Sirainenstatic void o_stream_multiplex_ochannel_destroy(struct iostream_private *stream)
1b5366b2234892f8930a29351da06b193e385150Timo Sirainen struct multiplex_ochannel *channel = (struct multiplex_ochannel*)stream;
1b5366b2234892f8930a29351da06b193e385150Timo Sirainen o_stream_multiplex_ochannel_close(stream, TRUE);
1b5366b2234892f8930a29351da06b193e385150Timo Sirainen /* delete the channel */
1b5366b2234892f8930a29351da06b193e385150Timo Sirainen array_foreach_modifiable(&channel->mstream->channels, channelp) {
1b5366b2234892f8930a29351da06b193e385150Timo Sirainen if (*channelp != NULL && (*channelp)->cid == channel->cid) {
e5afebd2df1d4990f7bec2a839260ff2e6d78168Timo Sirainen o_stream_multiplex_try_destroy(channel->mstream);
f1901fd21906911f7be075c965ac882f6a87b4c3Timo Sirainenstatic struct ostream *
f1901fd21906911f7be075c965ac882f6a87b4c3Timo Siraineno_stream_add_channel_real(struct multiplex_ostream *mstream, uint8_t cid)
f1901fd21906911f7be075c965ac882f6a87b4c3Timo Sirainen struct multiplex_ochannel *channel = i_new(struct multiplex_ochannel, 1);
f1901fd21906911f7be075c965ac882f6a87b4c3Timo Sirainen channel->buf = buffer_create_dynamic(default_pool, 256);
ccc895c0358108d2304239063e940b7d75f364abTimo Sirainen channel->ostream.sendv = o_stream_multiplex_ochannel_sendv;
f1901fd21906911f7be075c965ac882f6a87b4c3Timo Sirainen channel->ostream.iostream.close = o_stream_multiplex_ochannel_close;
b5e6f6f27c1461f0f9f202615eeb738a645188c3Timo Sirainen channel->ostream.iostream.destroy = o_stream_multiplex_ochannel_destroy;
690bafa70767e3f6e98bbfd62ad4a26be2387ea9Timo Sirainen channel->ostream.fd = o_stream_get_fd(mstream->parent);
690bafa70767e3f6e98bbfd62ad4a26be2387ea9Timo Sirainen array_append(&channel->mstream->channels, &channel, 1);
f1901fd21906911f7be075c965ac882f6a87b4c3Timo Sirainen return o_stream_create(&channel->ostream, mstream->parent,
73b50eecfc31750a312e2f940023f522eb07178cTimo Sirainenstruct ostream *o_stream_multiplex_add_channel(struct ostream *stream, uint8_t cid)
b5e6f6f27c1461f0f9f202615eeb738a645188c3Timo Sirainen (struct multiplex_ochannel *)stream->real_stream;
3343a61404603b21c246783a7963b77833095f31Timo Sirainen i_assert(get_channel(chan->mstream, cid) == NULL);