ostream-multiplex.c revision 6139e5f5cae82fa04b45e8366c24facf246e6292
/* Copyright (c) 2017 Dovecot authors, see the included COPYING file */
6406210b71fd4a97800f32f3613eea9b6a6a12ceMark de Reeper
6406210b71fd4a97800f32f3613eea9b6a6a12ceMark de Reeper#include "lib.h"
6406210b71fd4a97800f32f3613eea9b6a6a12ceMark de Reeper#include "ioloop.h"
0595fb660c93faf1fdbaad7e1300eb342b5baf31Mark de Reeper#include "array.h"
0595fb660c93faf1fdbaad7e1300eb342b5baf31Mark de Reeper#include "ostream-private.h"
6406210b71fd4a97800f32f3613eea9b6a6a12ceMark de Reeper#include "ostream-multiplex.h"
6406210b71fd4a97800f32f3613eea9b6a6a12ceMark de Reeper
/* All multiplex packets have the form
   [1 byte cid][4 byte big-endian length][data] */
6406210b71fd4a97800f32f3613eea9b6a6a12ceMark de Reeper
6406210b71fd4a97800f32f3613eea9b6a6a12ceMark de Reeperstruct multiplex_ostream;
6406210b71fd4a97800f32f3613eea9b6a6a12ceMark de Reeper
6406210b71fd4a97800f32f3613eea9b6a6a12ceMark de Reeperstruct multiplex_ochannel {
6406210b71fd4a97800f32f3613eea9b6a6a12ceMark de Reeper struct ostream_private ostream;
6406210b71fd4a97800f32f3613eea9b6a6a12ceMark de Reeper struct multiplex_ostream *mstream;
6406210b71fd4a97800f32f3613eea9b6a6a12ceMark de Reeper uint8_t cid;
6406210b71fd4a97800f32f3613eea9b6a6a12ceMark de Reeper buffer_t *buf;
6406210b71fd4a97800f32f3613eea9b6a6a12ceMark de Reeper time_t last_sent;
6406210b71fd4a97800f32f3613eea9b6a6a12ceMark de Reeper bool closed:1;
6406210b71fd4a97800f32f3613eea9b6a6a12ceMark de Reeper};
6406210b71fd4a97800f32f3613eea9b6a6a12ceMark de Reeper
6406210b71fd4a97800f32f3613eea9b6a6a12ceMark de Reeperstruct multiplex_ostream {
6406210b71fd4a97800f32f3613eea9b6a6a12ceMark de Reeper struct ostream *parent;
6406210b71fd4a97800f32f3613eea9b6a6a12ceMark de Reeper
6406210b71fd4a97800f32f3613eea9b6a6a12ceMark de Reeper /* channel 0 is main channel */
6406210b71fd4a97800f32f3613eea9b6a6a12ceMark de Reeper uint8_t cur_channel;
6406210b71fd4a97800f32f3613eea9b6a6a12ceMark de Reeper unsigned int remain;
6406210b71fd4a97800f32f3613eea9b6a6a12ceMark de Reeper buffer_t *wbuf;
6406210b71fd4a97800f32f3613eea9b6a6a12ceMark de Reeper size_t bufsize;
6406210b71fd4a97800f32f3613eea9b6a6a12ceMark de Reeper ARRAY(struct multiplex_ochannel *) channels;
6406210b71fd4a97800f32f3613eea9b6a6a12ceMark de Reeper
6406210b71fd4a97800f32f3613eea9b6a6a12ceMark de Reeper bool destroyed:1;
6406210b71fd4a97800f32f3613eea9b6a6a12ceMark de Reeper};
0595fb660c93faf1fdbaad7e1300eb342b5baf31Mark de Reeper
6406210b71fd4a97800f32f3613eea9b6a6a12ceMark de Reeperstatic struct multiplex_ochannel *
6406210b71fd4a97800f32f3613eea9b6a6a12ceMark de Reeperget_channel(struct multiplex_ostream *mstream, uint8_t cid)
6406210b71fd4a97800f32f3613eea9b6a6a12ceMark de Reeper{
0595fb660c93faf1fdbaad7e1300eb342b5baf31Mark de Reeper struct multiplex_ochannel **channelp;
6406210b71fd4a97800f32f3613eea9b6a6a12ceMark de Reeper i_assert(mstream != NULL);
6406210b71fd4a97800f32f3613eea9b6a6a12ceMark de Reeper array_foreach_modifiable(&mstream->channels, channelp) {
6406210b71fd4a97800f32f3613eea9b6a6a12ceMark de Reeper if (*channelp != NULL && (*channelp)->cid == cid)
6406210b71fd4a97800f32f3613eea9b6a6a12ceMark de Reeper return *channelp;
6406210b71fd4a97800f32f3613eea9b6a6a12ceMark de Reeper }
6406210b71fd4a97800f32f3613eea9b6a6a12ceMark de Reeper return NULL;
6406210b71fd4a97800f32f3613eea9b6a6a12ceMark de Reeper}
6406210b71fd4a97800f32f3613eea9b6a6a12ceMark de Reeper
6406210b71fd4a97800f32f3613eea9b6a6a12ceMark de Reeperstatic void propagate_error(struct multiplex_ostream *mstream, int stream_errno)
6406210b71fd4a97800f32f3613eea9b6a6a12ceMark de Reeper{
6406210b71fd4a97800f32f3613eea9b6a6a12ceMark de Reeper struct multiplex_ochannel **channelp;
6406210b71fd4a97800f32f3613eea9b6a6a12ceMark de Reeper array_foreach_modifiable(&mstream->channels, channelp)
6406210b71fd4a97800f32f3613eea9b6a6a12ceMark de Reeper if (*channelp != NULL)
6406210b71fd4a97800f32f3613eea9b6a6a12ceMark de Reeper (*channelp)->ostream.ostream.stream_errno = stream_errno;
6406210b71fd4a97800f32f3613eea9b6a6a12ceMark de Reeper}
6406210b71fd4a97800f32f3613eea9b6a6a12ceMark de Reeper
6406210b71fd4a97800f32f3613eea9b6a6a12ceMark de Reeperstatic struct multiplex_ochannel *get_next_channel(struct multiplex_ostream *mstream)
6406210b71fd4a97800f32f3613eea9b6a6a12ceMark de Reeper{
6406210b71fd4a97800f32f3613eea9b6a6a12ceMark de Reeper time_t oldest = ioloop_time;
6406210b71fd4a97800f32f3613eea9b6a6a12ceMark de Reeper struct multiplex_ochannel *channel = NULL;
6406210b71fd4a97800f32f3613eea9b6a6a12ceMark de Reeper struct multiplex_ochannel **channelp;
6406210b71fd4a97800f32f3613eea9b6a6a12ceMark de Reeper array_foreach_modifiable(&mstream->channels, channelp)
6406210b71fd4a97800f32f3613eea9b6a6a12ceMark de Reeper if (*channelp != NULL && (*channelp)->last_sent <= oldest &&
6406210b71fd4a97800f32f3613eea9b6a6a12ceMark de Reeper (*channelp)->buf->used > 0)
6406210b71fd4a97800f32f3613eea9b6a6a12ceMark de Reeper channel = *channelp;
6406210b71fd4a97800f32f3613eea9b6a6a12ceMark de Reeper return channel;
6406210b71fd4a97800f32f3613eea9b6a6a12ceMark de Reeper}
6406210b71fd4a97800f32f3613eea9b6a6a12ceMark de Reeper
6406210b71fd4a97800f32f3613eea9b6a6a12ceMark de Reeperstatic ssize_t
6406210b71fd4a97800f32f3613eea9b6a6a12ceMark de Reepero_stream_multiplex_sendv(struct multiplex_ostream *mstream)
6406210b71fd4a97800f32f3613eea9b6a6a12ceMark de Reeper{
6406210b71fd4a97800f32f3613eea9b6a6a12ceMark de Reeper struct multiplex_ochannel *channel;
6406210b71fd4a97800f32f3613eea9b6a6a12ceMark de Reeper ssize_t ret = 0;
6406210b71fd4a97800f32f3613eea9b6a6a12ceMark de Reeper if (mstream->bufsize <= mstream->wbuf->used + 5)
6406210b71fd4a97800f32f3613eea9b6a6a12ceMark de Reeper return -2;
6406210b71fd4a97800f32f3613eea9b6a6a12ceMark de Reeper
6406210b71fd4a97800f32f3613eea9b6a6a12ceMark de Reeper while((channel = get_next_channel(mstream)) != NULL) {
6406210b71fd4a97800f32f3613eea9b6a6a12ceMark de Reeper size_t tmp = mstream->bufsize - mstream->wbuf->used - 5;
6406210b71fd4a97800f32f3613eea9b6a6a12ceMark de Reeper /* ensure it fits into 32 bit int */
6406210b71fd4a97800f32f3613eea9b6a6a12ceMark de Reeper size_t amt = I_MIN(UINT_MAX, I_MIN(tmp, channel->buf->used));
0595fb660c93faf1fdbaad7e1300eb342b5baf31Mark de Reeper if (tmp == 0)
0595fb660c93faf1fdbaad7e1300eb342b5baf31Mark de Reeper break;
6406210b71fd4a97800f32f3613eea9b6a6a12ceMark de Reeper uint32_t len = cpu32_to_be(amt);
6406210b71fd4a97800f32f3613eea9b6a6a12ceMark de Reeper buffer_append(mstream->wbuf, &channel->cid, 1);
6406210b71fd4a97800f32f3613eea9b6a6a12ceMark de Reeper buffer_append(mstream->wbuf, &len, 4);
6406210b71fd4a97800f32f3613eea9b6a6a12ceMark de Reeper buffer_append(mstream->wbuf, channel->buf->data, amt);
6406210b71fd4a97800f32f3613eea9b6a6a12ceMark de Reeper buffer_delete(channel->buf, 0, amt);
6406210b71fd4a97800f32f3613eea9b6a6a12ceMark de Reeper channel->last_sent = ioloop_time;
6406210b71fd4a97800f32f3613eea9b6a6a12ceMark de Reeper }
6406210b71fd4a97800f32f3613eea9b6a6a12ceMark de Reeper
0595fb660c93faf1fdbaad7e1300eb342b5baf31Mark de Reeper if (mstream->wbuf->used > 0) {
6406210b71fd4a97800f32f3613eea9b6a6a12ceMark de Reeper ret = o_stream_send(mstream->parent, mstream->wbuf->data,
6406210b71fd4a97800f32f3613eea9b6a6a12ceMark de Reeper mstream->wbuf->used);
6406210b71fd4a97800f32f3613eea9b6a6a12ceMark de Reeper if (ret < 0) {
6406210b71fd4a97800f32f3613eea9b6a6a12ceMark de Reeper propagate_error(mstream, mstream->parent->stream_errno);
6406210b71fd4a97800f32f3613eea9b6a6a12ceMark de Reeper return ret;
6406210b71fd4a97800f32f3613eea9b6a6a12ceMark de Reeper }
6406210b71fd4a97800f32f3613eea9b6a6a12ceMark de Reeper buffer_delete(mstream->wbuf, 0, ret);
6406210b71fd4a97800f32f3613eea9b6a6a12ceMark de Reeper }
6406210b71fd4a97800f32f3613eea9b6a6a12ceMark de Reeper return ret;
6406210b71fd4a97800f32f3613eea9b6a6a12ceMark de Reeper}
0595fb660c93faf1fdbaad7e1300eb342b5baf31Mark de Reeper
6406210b71fd4a97800f32f3613eea9b6a6a12ceMark de Reeperstatic ssize_t
6406210b71fd4a97800f32f3613eea9b6a6a12ceMark de Reepero_stream_multiplex_ochannel_sendv(struct ostream_private *stream,
0595fb660c93faf1fdbaad7e1300eb342b5baf31Mark de Reeper const struct const_iovec *iov, unsigned int iov_count)
6406210b71fd4a97800f32f3613eea9b6a6a12ceMark de Reeper{
6406210b71fd4a97800f32f3613eea9b6a6a12ceMark de Reeper struct multiplex_ochannel *channel = (struct multiplex_ochannel*)stream;
ssize_t ret;
size_t total = 0;
if (channel->mstream->bufsize <= channel->buf->used)
return -2;
for(unsigned int i=0; i < iov_count; i++) {
/* copy data to buffer */
size_t tmp = channel->mstream->bufsize - channel->buf->used;
if (tmp == 0)
break;
buffer_append(channel->buf, iov[i].iov_base,
I_MIN(tmp, iov[i].iov_len));
total += I_MIN(tmp, iov[i].iov_len);
}
stream->ostream.offset += total;
if ((ret = o_stream_multiplex_sendv(channel->mstream)) < 0)
return ret;
return total;
}
static void
o_stream_multiplex_ochannel_close(struct iostream_private *stream, bool close_parent)
{
struct multiplex_ochannel *const *channelp;
struct multiplex_ochannel *channel = (struct multiplex_ochannel*)stream;
(void)o_stream_flush(&channel->ostream.ostream);
channel->closed = TRUE;
if (close_parent) {
array_foreach(&channel->mstream->channels, channelp)
if (*channelp !=NULL && !(*channelp)->closed)
return;
o_stream_close(channel->mstream->parent);
}
}
static void o_stream_multiplex_try_destroy(struct multiplex_ostream *mstream)
{
struct multiplex_ochannel **channelp;
/* can't do anything until they are all closed */
array_foreach_modifiable(&mstream->channels, channelp)
if (*channelp != NULL)
return;
o_stream_unref(&mstream->parent);
array_free(&mstream->channels);
buffer_free(&mstream->wbuf);
i_free(mstream);
}
static void o_stream_multiplex_ochannel_destroy(struct iostream_private *stream)
{
struct multiplex_ochannel **channelp;
struct multiplex_ochannel *channel = (struct multiplex_ochannel*)stream;
o_stream_multiplex_ochannel_close(stream, TRUE);
o_stream_unref(&channel->ostream.parent);
if (channel->buf != NULL)
buffer_free(&channel->buf);
/* delete the channel */
array_foreach_modifiable(&channel->mstream->channels, channelp) {
if (*channelp != NULL && (*channelp)->cid == channel->cid) {
*channelp = NULL;
break;
}
}
o_stream_multiplex_try_destroy(channel->mstream);
}
/* Allocate a channel with the given id, register it in mstream's channel
   array and wrap it into an ostream whose parent is the multiplexer's
   parent stream. */
static struct ostream *
o_stream_add_channel_real(struct multiplex_ostream *mstream, uint8_t cid)
{
	struct multiplex_ochannel *channel;

	channel = i_new(struct multiplex_ochannel, 1);
	channel->mstream = mstream;
	channel->cid = cid;
	channel->buf = buffer_create_dynamic(default_pool, 256);

	/* stream callbacks */
	channel->ostream.sendv = o_stream_multiplex_ochannel_sendv;
	channel->ostream.iostream.close = o_stream_multiplex_ochannel_close;
	channel->ostream.iostream.destroy = o_stream_multiplex_ochannel_destroy;
	channel->ostream.fd = o_stream_get_fd(mstream->parent);

	array_append(&mstream->channels, &channel, 1);

	return o_stream_create(&channel->ostream, mstream->parent,
			       mstream->bufsize);
}
/* Add a new channel with id cid to the multiplexer that the given channel
   stream belongs to. The id must not already be in use (asserted). */
struct ostream *o_stream_multiplex_add_channel(struct ostream *stream, uint8_t cid)
{
	struct multiplex_ochannel *existing =
		(struct multiplex_ochannel *)stream->real_stream;
	struct multiplex_ostream *mstream = existing->mstream;

	i_assert(get_channel(mstream, cid) == NULL);

	return o_stream_add_channel_real(mstream, cid);
}
/* Create a multiplexer on top of parent, taking a reference to it, and
   return the stream of its initial channel (id 0). bufsize is the limit
   used for the multiplexer's write buffer and for each channel's buffer. */
struct ostream *o_stream_create_multiplex(struct ostream *parent, size_t bufsize)
{
	struct multiplex_ostream *mstream = i_new(struct multiplex_ostream, 1);

	/* hold our own reference to the parent for the multiplexer's
	   lifetime */
	o_stream_ref(parent);
	mstream->parent = parent;

	mstream->bufsize = bufsize;
	mstream->wbuf = buffer_create_dynamic(default_pool, 256);
	i_array_init(&mstream->channels, 8);

	return o_stream_add_channel_real(mstream, 0);
}
uint8_t o_stream_multiplex_get_channel_id(struct ostream *stream)
{
struct multiplex_ochannel *channel =
(struct multiplex_ochannel *)stream->real_stream;
return channel->cid;
}