ostream-multiplex.c revision 2707567dbba16966d76a986fba09f639659a24e1
183bea41fa640dc8117f3eb45ff935cd81377a84Timo Sirainen/* Copyright (c) 2017 Dovecot authors, see the included COPYING file */
4bc96ba6f1d67a90a75fa131bcd2cd508ea5a05aTimo Sirainen/* all multiplex packets are [1 byte cid][4 byte length][data] */
029cfcdce65b284d5230adf1c920a5f526b03b5cTimo Sirainen /* channel 0 is main channel */
029cfcdce65b284d5230adf1c920a5f526b03b5cTimo Sirainen unsigned int remain;
90c23747727c85f80e4e8eed7968f0edbeac7ac5Timo Sirainenget_channel(struct multiplex_ostream *mstream, uint8_t cid)
90c23747727c85f80e4e8eed7968f0edbeac7ac5Timo Sirainen array_foreach_modifiable(&mstream->channels, channelp) {
90c23747727c85f80e4e8eed7968f0edbeac7ac5Timo Sirainen if (*channelp != NULL && (*channelp)->cid == cid)
90c23747727c85f80e4e8eed7968f0edbeac7ac5Timo Sirainenstatic void propagate_error(struct multiplex_ostream *mstream, int stream_errno)
4bc96ba6f1d67a90a75fa131bcd2cd508ea5a05aTimo Sirainen array_foreach_modifiable(&mstream->channels, channelp)
4bc96ba6f1d67a90a75fa131bcd2cd508ea5a05aTimo Sirainen (*channelp)->ostream.ostream.stream_errno = stream_errno;
4bc96ba6f1d67a90a75fa131bcd2cd508ea5a05aTimo Sirainenstatic struct multiplex_ochannel *get_next_channel(struct multiplex_ostream *mstream)
eddd9bf1a1369aea4a2715f6be1137da6d17d293Timo Sirainen array_foreach_modifiable(&mstream->channels, channelp)
b397665e90fa0fc7c6a9156fdd6cf28b571e8e39Timo Sirainen if (*channelp != NULL && (*channelp)->last_sent <= oldest &&
4bc96ba6f1d67a90a75fa131bcd2cd508ea5a05aTimo Siraineno_stream_multiplex_sendv(struct multiplex_ostream *mstream)
4bc96ba6f1d67a90a75fa131bcd2cd508ea5a05aTimo Sirainen if (mstream->bufsize <= mstream->wbuf->used + 5)
4bc96ba6f1d67a90a75fa131bcd2cd508ea5a05aTimo Sirainen while((channel = get_next_channel(mstream)) != NULL) {
4bc96ba6f1d67a90a75fa131bcd2cd508ea5a05aTimo Sirainen size_t tmp = mstream->bufsize - mstream->wbuf->used - 5;
4bc96ba6f1d67a90a75fa131bcd2cd508ea5a05aTimo Sirainen /* ensure it fits into 32 bit int */
4bc96ba6f1d67a90a75fa131bcd2cd508ea5a05aTimo Sirainen size_t amt = I_MIN(UINT_MAX, I_MIN(tmp, channel->buf->used));
2a6af811ea3de3cf9e2f15e446674dd21b0705f3Timo Sirainen buffer_append(mstream->wbuf, &channel->cid, 1);
2a6af811ea3de3cf9e2f15e446674dd21b0705f3Timo Sirainen buffer_append(mstream->wbuf, channel->buf->data, amt);
4bc96ba6f1d67a90a75fa131bcd2cd508ea5a05aTimo Sirainen ret = o_stream_send(mstream->parent, mstream->wbuf->data,
4bc96ba6f1d67a90a75fa131bcd2cd508ea5a05aTimo Sirainen propagate_error(mstream, mstream->parent->stream_errno);
0f66f12eb4cdbf47670975044c88d8f388bf92dfTimo Siraineno_stream_multiplex_ochannel_sendv(struct ostream_private *stream,
4bc96ba6f1d67a90a75fa131bcd2cd508ea5a05aTimo Sirainen const struct const_iovec *iov, unsigned int iov_count)
029cfcdce65b284d5230adf1c920a5f526b03b5cTimo Sirainen struct multiplex_ochannel *channel = (struct multiplex_ochannel*)stream;
4bc96ba6f1d67a90a75fa131bcd2cd508ea5a05aTimo Sirainen if (channel->mstream->bufsize <= channel->buf->used)
029cfcdce65b284d5230adf1c920a5f526b03b5cTimo Sirainen for(unsigned int i=0; i < iov_count; i++) {
4bc96ba6f1d67a90a75fa131bcd2cd508ea5a05aTimo Sirainen /* copy data to buffer */
4bc96ba6f1d67a90a75fa131bcd2cd508ea5a05aTimo Sirainen size_t tmp = channel->mstream->bufsize - channel->buf->used;
4bc96ba6f1d67a90a75fa131bcd2cd508ea5a05aTimo Sirainen if ((ret = o_stream_multiplex_sendv(channel->mstream)) < 0)
85144b5f0bc763de14c7d87291a90ef74ac241a2Timo Siraineno_stream_multiplex_ochannel_close(struct iostream_private *stream, bool close_parent)
85144b5f0bc763de14c7d87291a90ef74ac241a2Timo Sirainen struct multiplex_ochannel *channel = (struct multiplex_ochannel*)stream;
85144b5f0bc763de14c7d87291a90ef74ac241a2Timo Sirainen array_foreach(&channel->mstream->channels, channelp)
85144b5f0bc763de14c7d87291a90ef74ac241a2Timo Sirainenstatic void o_stream_multiplex_try_destroy(struct multiplex_ostream *mstream)
0bf3eac1110a902e7ec7e695c64e8e46c114e623Timo Sirainen /* can't do anything until they are all closed */
0bf3eac1110a902e7ec7e695c64e8e46c114e623Timo Sirainen array_foreach_modifiable(&mstream->channels, channelp)
85144b5f0bc763de14c7d87291a90ef74ac241a2Timo Sirainenstatic void o_stream_multiplex_ochannel_destroy(struct iostream_private *stream)
85144b5f0bc763de14c7d87291a90ef74ac241a2Timo Sirainen struct multiplex_ochannel *channel = (struct multiplex_ochannel*)stream;
4bc96ba6f1d67a90a75fa131bcd2cd508ea5a05aTimo Sirainen o_stream_multiplex_ochannel_close(stream, TRUE);
f153a2cec0319f549388d28f8cfd4d50229d1132Timo Sirainen /* delete the channel */
f153a2cec0319f549388d28f8cfd4d50229d1132Timo Sirainen array_foreach_modifiable(&channel->mstream->channels, channelp) {
dffa503fd4ce31334346e539496084c80a2d8d37Timo Sirainen if (*channelp != NULL && (*channelp)->cid == channel->cid) {
85144b5f0bc763de14c7d87291a90ef74ac241a2Timo Sirainen o_stream_multiplex_try_destroy(channel->mstream);
85144b5f0bc763de14c7d87291a90ef74ac241a2Timo Sirainenstatic struct ostream *
85144b5f0bc763de14c7d87291a90ef74ac241a2Timo Siraineno_stream_add_channel_real(struct multiplex_ostream *mstream, uint8_t cid)
85144b5f0bc763de14c7d87291a90ef74ac241a2Timo Sirainen struct multiplex_ochannel *channel = i_new(struct multiplex_ochannel, 1);
4bc96ba6f1d67a90a75fa131bcd2cd508ea5a05aTimo Sirainen channel->buf = buffer_create_dynamic(default_pool, 256);
4bc96ba6f1d67a90a75fa131bcd2cd508ea5a05aTimo Sirainen channel->ostream.sendv = o_stream_multiplex_ochannel_sendv;
4bc96ba6f1d67a90a75fa131bcd2cd508ea5a05aTimo Sirainen channel->ostream.iostream.close = o_stream_multiplex_ochannel_close;
4bc96ba6f1d67a90a75fa131bcd2cd508ea5a05aTimo Sirainen channel->ostream.iostream.destroy = o_stream_multiplex_ochannel_destroy;
4bc96ba6f1d67a90a75fa131bcd2cd508ea5a05aTimo Sirainen channel->ostream.fd = o_stream_get_fd(mstream->parent);
4bc96ba6f1d67a90a75fa131bcd2cd508ea5a05aTimo Sirainen array_append(&channel->mstream->channels, &channel, 1);
4bc96ba6f1d67a90a75fa131bcd2cd508ea5a05aTimo Sirainen return o_stream_create(&channel->ostream, mstream->parent,
63e2edd14ae7b1dc4a08e2e659501dbf519462f9Timo Sirainenstruct ostream *o_stream_multiplex_add_channel(struct ostream *stream, uint8_t cid)
63e2edd14ae7b1dc4a08e2e659501dbf519462f9Timo Sirainen (struct multiplex_ochannel *)stream->real_stream;
63e2edd14ae7b1dc4a08e2e659501dbf519462f9Timo Sirainen i_assert(get_channel(chan->mstream, cid) == NULL);
4bc96ba6f1d67a90a75fa131bcd2cd508ea5a05aTimo Sirainen return o_stream_add_channel_real(chan->mstream, cid);
4bc96ba6f1d67a90a75fa131bcd2cd508ea5a05aTimo Sirainenstruct ostream *o_stream_create_multiplex(struct ostream *parent, size_t bufsize)
4bc96ba6f1d67a90a75fa131bcd2cd508ea5a05aTimo Sirainen mstream->wbuf = buffer_create_dynamic(default_pool, 256);
4bc96ba6f1d67a90a75fa131bcd2cd508ea5a05aTimo Sirainenuint8_t o_stream_multiplex_get_channel_id(struct ostream *stream)