5a580c3a38ced62d4bcc95b8ac7c4f2935b5d294Timo Sirainen/* Copyright (c) 2017-2018 Dovecot authors, see the included COPYING file */
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch/* all multiplex packets are [1 byte cid][4 byte length][data] */
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch /* channel 0 is main channel */
7384b4e78eaab44693c985192276e31322155e32Stephan Boschget_channel(struct multiplex_ostream *mstream, uint8_t cid)
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch array_foreach_modifiable(&mstream->channels, channelp) {
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch if (*channelp != NULL && (*channelp)->cid == cid)
7384b4e78eaab44693c985192276e31322155e32Stephan Boschstatic void propagate_error(struct multiplex_ostream *mstream, int stream_errno)
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch array_foreach_modifiable(&mstream->channels, channelp)
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch (*channelp)->ostream.ostream.stream_errno = stream_errno;
7384b4e78eaab44693c985192276e31322155e32Stephan Boschstatic struct multiplex_ochannel *get_next_channel(struct multiplex_ostream *mstream)
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch array_foreach_modifiable(&mstream->channels, channelp)
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch if (*channelp != NULL && (*channelp)->last_sent <= oldest &&
7384b4e78eaab44693c985192276e31322155e32Stephan Boscho_stream_multiplex_sendv(struct multiplex_ostream *mstream)
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch if (mstream->bufsize <= mstream->wbuf->used + 5)
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch while((channel = get_next_channel(mstream)) != NULL) {
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch size_t tmp = mstream->bufsize - mstream->wbuf->used - 5;
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch /* ensure it fits into 32 bit int */
eb325a5a90c1d2655e74972bde0de6a699d2c864Stephan Bosch size_t amt = I_MIN(UINT_MAX, I_MIN(tmp, channel->buf->used));
eb325a5a90c1d2655e74972bde0de6a699d2c864Stephan Bosch buffer_append(mstream->wbuf, &channel->cid, 1);
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch buffer_append(mstream->wbuf, channel->buf->data, amt);
35e962a9186b4e9b2001628c1d7b55c24b33ce84Timo Sirainen ret = o_stream_send(mstream->parent, mstream->wbuf->data,
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch propagate_error(mstream, mstream->parent->stream_errno);
7384b4e78eaab44693c985192276e31322155e32Stephan Boscho_stream_multiplex_ochannel_sendv(struct ostream_private *stream,
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch const struct const_iovec *iov, unsigned int iov_count)
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch struct multiplex_ochannel *channel = (struct multiplex_ochannel*)stream;
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch if (channel->mstream->bufsize <= channel->buf->used)
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch for(unsigned int i=0; i < iov_count; i++) {
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch /* copy data to buffer */
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch size_t tmp = channel->mstream->bufsize - channel->buf->used;
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch if ((ret = o_stream_multiplex_sendv(channel->mstream)) < 0)
7384b4e78eaab44693c985192276e31322155e32Stephan Boscho_stream_multiplex_ochannel_close(struct iostream_private *stream, bool close_parent)
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch struct multiplex_ochannel *channel = (struct multiplex_ochannel*)stream;
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch array_foreach(&channel->mstream->channels, channelp)
636d0f43138468f8efe685a681326b123f660e49Timo Sirainenstatic void o_stream_multiplex_try_destroy(struct multiplex_ostream *mstream)
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch /* can't do anything until they are all closed */
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch array_foreach_modifiable(&mstream->channels, channelp)
636d0f43138468f8efe685a681326b123f660e49Timo Sirainenstatic void o_stream_multiplex_ochannel_destroy(struct iostream_private *stream)
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch struct multiplex_ochannel *channel = (struct multiplex_ochannel*)stream;
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch /* delete the channel */
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch array_foreach_modifiable(&channel->mstream->channels, channelp) {
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch if (*channelp != NULL && (*channelp)->cid == channel->cid) {
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch o_stream_multiplex_try_destroy(channel->mstream);
1a9a35a6b307f8d5b25345af55e40a99162b4072Timo Sirainenstatic struct ostream *
1a9a35a6b307f8d5b25345af55e40a99162b4072Timo Siraineno_stream_add_channel_real(struct multiplex_ostream *mstream, uint8_t cid)
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch struct multiplex_ochannel *channel = i_new(struct multiplex_ochannel, 1);
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch channel->buf = buffer_create_dynamic(default_pool, 256);
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch channel->ostream.sendv = o_stream_multiplex_ochannel_sendv;
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch channel->ostream.iostream.close = o_stream_multiplex_ochannel_close;
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch channel->ostream.iostream.destroy = o_stream_multiplex_ochannel_destroy;
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch channel->ostream.fd = o_stream_get_fd(mstream->parent);
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch array_append(&channel->mstream->channels, &channel, 1);
7384b4e78eaab44693c985192276e31322155e32Stephan Bosch return o_stream_create(&channel->ostream, mstream->parent,
7384b4e78eaab44693c985192276e31322155e32Stephan Boschstruct ostream *o_stream_multiplex_add_channel(struct ostream *stream, uint8_t cid)
415e16c3dc185578695b7d88e561a52de6c8b1b1Timo Sirainen (struct multiplex_ochannel *)stream->real_stream;
ba1c847d0af4afe4787ed470d0c818e948e184e2Timo Sirainen i_assert(get_channel(chan->mstream, cid) == NULL);
415e16c3dc185578695b7d88e561a52de6c8b1b1Timo Sirainen return o_stream_add_channel_real(chan->mstream, cid);
415e16c3dc185578695b7d88e561a52de6c8b1b1Timo Sirainenstruct ostream *o_stream_create_multiplex(struct ostream *parent, size_t bufsize)
35e962a9186b4e9b2001628c1d7b55c24b33ce84Timo Sirainen mstream->wbuf = buffer_create_dynamic(default_pool, 256);
ba1c847d0af4afe4787ed470d0c818e948e184e2Timo Sirainenuint8_t o_stream_multiplex_get_channel_id(struct ostream *stream)