ioloop.c revision e2588872c1fe79642589b805aaab9fbb6750771b
1968N/A/* Copyright (c) 2002-2016 Dovecot authors, see the included COPYING file */
1968N/A
1968N/A#include "lib.h"
1968N/A#include "array.h"
1968N/A#include "time-util.h"
1968N/A#include "istream-private.h"
1968N/A#include "ioloop-private.h"
1968N/A
1968N/A#include <unistd.h>
1968N/A
1968N/A#define timer_is_larger(tvp, uvp) \
1968N/A ((tvp)->tv_sec > (uvp)->tv_sec || \
1968N/A ((tvp)->tv_sec == (uvp)->tv_sec && \
1968N/A (tvp)->tv_usec > (uvp)->tv_usec))
1968N/A
1968N/Atime_t ioloop_time = 0;
1968N/Astruct timeval ioloop_timeval;
1968N/A
1968N/Astruct ioloop *current_ioloop = NULL;
1968N/Astatic ARRAY(io_switch_callback_t *) io_switch_callbacks = ARRAY_INIT;
1968N/A
1968N/Astatic void io_loop_initialize_handler(struct ioloop *ioloop)
1968N/A{
2616N/A unsigned int initial_fd_count;
1968N/A
1968N/A initial_fd_count = ioloop->max_fd_count > 0 &&
1968N/A ioloop->max_fd_count < IOLOOP_INITIAL_FD_COUNT ?
1968N/A ioloop->max_fd_count : IOLOOP_INITIAL_FD_COUNT;
1968N/A io_loop_handler_init(ioloop, initial_fd_count);
1968N/A}
1968N/A
1968N/Astatic struct io_file *
1968N/Aio_add_file(int fd, enum io_condition condition,
1968N/A unsigned int source_linenum,
1968N/A io_callback_t *callback, void *context)
2510N/A{
1968N/A struct io_file *io;
2028N/A
2028N/A i_assert(callback != NULL);
2028N/A i_assert((condition & IO_NOTIFY) == 0);
2028N/A
2301N/A io = i_new(struct io_file, 1);
2028N/A io->io.condition = condition;
1968N/A io->io.callback = callback;
1968N/A io->io.context = context;
1968N/A io->io.ioloop = current_ioloop;
1968N/A io->io.source_linenum = source_linenum;
1968N/A io->refcount = 1;
2028N/A io->fd = fd;
2028N/A
1968N/A if (io->io.ioloop->cur_ctx != NULL) {
2028N/A io->io.ctx = io->io.ioloop->cur_ctx;
2843N/A io_loop_context_ref(io->io.ctx);
2028N/A }
1968N/A
1968N/A if (io->io.ioloop->handler_context == NULL)
1968N/A io_loop_initialize_handler(io->io.ioloop);
1968N/A if (fd != -1)
1968N/A io_loop_handle_add(io);
2028N/A else {
1968N/A /* we're adding an istream whose only way to get notified
2616N/A is to call i_stream_set_input_pending() */
2301N/A }
1968N/A
2028N/A if (io->io.ioloop->io_files != NULL) {
1968N/A io->io.ioloop->io_files->prev = io;
1968N/A io->next = io->io.ioloop->io_files;
1968N/A }
1968N/A io->io.ioloop->io_files = io;
2028N/A return io;
2028N/A}
2028N/A
2028N/A#undef io_add
2028N/Astruct io *io_add(int fd, enum io_condition condition,
2028N/A unsigned int source_linenum,
1968N/A io_callback_t *callback, void *context)
2301N/A{
1968N/A struct io_file *io;
1968N/A
1968N/A i_assert(fd >= 0);
1968N/A io = io_add_file(fd, condition, source_linenum, callback, context);
1968N/A return &io->io;
1968N/A}
1968N/A
1968N/A#undef io_add_istream
1968N/Astruct io *io_add_istream(struct istream *input, unsigned int source_linenum,
1968N/A io_callback_t *callback, void *context)
1968N/A{
1968N/A struct io_file *io;
1968N/A
1968N/A io = io_add_file(i_stream_get_fd(input), IO_READ, source_linenum,
1968N/A callback, context);
1968N/A io->istream = input;
1968N/A i_stream_ref(io->istream);
1968N/A i_stream_set_io(io->istream, &io->io);
1968N/A return &io->io;
1968N/A}
1968N/A
1968N/Astatic void io_file_unlink(struct io_file *io)
1968N/A{
2301N/A if (io->prev != NULL)
2301N/A io->prev->next = io->next;
2301N/A else
2301N/A io->io.ioloop->io_files = io->next;
2301N/A
2301N/A if (io->next != NULL)
2301N/A io->next->prev = io->prev;
2301N/A
2301N/A /* if we got here from an I/O handler callback, make sure we
2301N/A don't try to handle this one next. */
2301N/A if (io->io.ioloop->next_io_file == io)
2301N/A io->io.ioloop->next_io_file = io->next;
2301N/A}
1968N/A
1968N/Astatic void io_remove_full(struct io **_io, bool closed)
1968N/A{
1968N/A struct io *io = *_io;
1968N/A
1968N/A i_assert(io->callback != NULL);
1968N/A
1968N/A *_io = NULL;
1968N/A
1968N/A /* make sure the callback doesn't get called anymore.
1968N/A kqueue code relies on this. */
1968N/A io->callback = NULL;
1968N/A
1968N/A if (io->pending) {
1968N/A i_assert(io->ioloop->io_pending_count > 0);
1968N/A io->ioloop->io_pending_count--;
1978N/A }
1968N/A
1968N/A if (io->ctx != NULL)
2510N/A io_loop_context_unref(&io->ctx);
2028N/A
2301N/A if ((io->condition & IO_NOTIFY) != 0)
2142N/A io_loop_notify_remove(io);
2301N/A else {
2028N/A struct io_file *io_file = (struct io_file *)io;
2028N/A struct istream *istream = io_file->istream;
2028N/A
2301N/A if (istream != NULL) {
1978N/A /* remove io before it's freed */
2510N/A i_stream_unset_io(istream, io);
2510N/A }
2510N/A
2301N/A io_file_unlink(io_file);
2132N/A if (io_file->fd != -1)
2028N/A io_loop_handle_remove(io_file, closed);
2301N/A else
2132N/A i_free(io);
2028N/A
2301N/A /* remove io from the ioloop before unreferencing the istream,
2301N/A because a destroyed istream may automatically close the
2301N/A fd. */
2301N/A if (istream != NULL)
2028N/A i_stream_unref(&istream);
2028N/A }
2028N/A}
2843N/A
2843N/Avoid io_remove(struct io **io)
2843N/A{
2843N/A io_remove_full(io, FALSE);
2028N/A}
2028N/A
1968N/Avoid io_remove_closed(struct io **io)
1968N/A{
1978N/A i_assert(((*io)->condition & IO_NOTIFY) == 0);
1978N/A
1968N/A io_remove_full(io, TRUE);
1968N/A}
1968N/A
1968N/Avoid io_set_pending(struct io *io)
1968N/A{
1968N/A i_assert((io->condition & IO_NOTIFY) == 0);
1968N/A
1968N/A if (!io->pending) {
1968N/A io->pending = TRUE;
1968N/A io->ioloop->io_pending_count++;
1968N/A }
1968N/A}
1968N/A
1968N/Astatic void timeout_update_next(struct timeout *timeout, struct timeval *tv_now)
1968N/A{
2028N/A if (tv_now == NULL) {
1968N/A if (gettimeofday(&timeout->next_run, NULL) < 0)
1968N/A i_fatal("gettimeofday(): %m");
2301N/A } else {
2301N/A timeout->next_run.tv_sec = tv_now->tv_sec;
2301N/A timeout->next_run.tv_usec = tv_now->tv_usec;
2301N/A }
2301N/A
2301N/A /* we don't want microsecond accuracy or this function will be
2301N/A called all the time - millisecond is more than enough */
2301N/A timeout->next_run.tv_usec -= timeout->next_run.tv_usec % 1000;
2301N/A
2301N/A timeout->next_run.tv_sec += timeout->msecs/1000;
2301N/A timeout->next_run.tv_usec += (timeout->msecs%1000)*1000;
2301N/A
2301N/A if (timeout->next_run.tv_usec > 1000000) {
2301N/A timeout->next_run.tv_sec++;
2301N/A timeout->next_run.tv_usec -= 1000000;
2301N/A }
2301N/A}
2301N/A
2301N/Astatic struct timeout *
2301N/Atimeout_add_common(unsigned int source_linenum,
2301N/A timeout_callback_t *callback, void *context)
2301N/A{
2301N/A struct timeout *timeout;
2301N/A
2301N/A timeout = i_new(struct timeout, 1);
2301N/A timeout->item.idx = UINT_MAX;
2301N/A timeout->source_linenum = source_linenum;
2301N/A timeout->ioloop = current_ioloop;
2301N/A
2301N/A timeout->callback = callback;
2301N/A timeout->context = context;
2301N/A
2301N/A if (timeout->ioloop->cur_ctx != NULL) {
2301N/A timeout->ctx = timeout->ioloop->cur_ctx;
2301N/A io_loop_context_ref(timeout->ctx);
2301N/A }
2301N/A
2301N/A return timeout;
2301N/A}
2301N/A
2301N/A#undef timeout_add
2301N/Astruct timeout *timeout_add(unsigned int msecs, unsigned int source_linenum,
2301N/A timeout_callback_t *callback, void *context)
2301N/A{
2301N/A struct timeout *timeout;
2301N/A
2301N/A timeout = timeout_add_common(source_linenum, callback, context);
2301N/A timeout->msecs = msecs;
2301N/A
2301N/A if (msecs > 0) {
2301N/A /* start this timeout in the next run cycle */
2301N/A array_append(&timeout->ioloop->timeouts_new, &timeout, 1);
2301N/A } else {
2301N/A /* trigger zero timeouts as soon as possible */
2301N/A timeout_update_next(timeout, timeout->ioloop->running ?
2301N/A NULL : &ioloop_timeval);
2301N/A priorityq_add(timeout->ioloop->timeouts, &timeout->item);
2301N/A }
2301N/A return timeout;
2301N/A}
2301N/A
2301N/A#undef timeout_add_short
2028N/Astruct timeout *
2028N/Atimeout_add_short(unsigned int msecs, unsigned int source_linenum,
2028N/A timeout_callback_t *callback, void *context)
2028N/A{
2028N/A return timeout_add(msecs, source_linenum, callback, context);
2028N/A}
2028N/A
2028N/A#undef timeout_add_absolute
2028N/Astruct timeout *
2028N/Atimeout_add_absolute(const struct timeval *time,
2028N/A unsigned int source_linenum,
2028N/A timeout_callback_t *callback, void *context)
2028N/A{
2028N/A struct timeout *timeout;
2028N/A
2843N/A timeout = timeout_add_common(source_linenum, callback, context);
2028N/A timeout->one_shot = TRUE;
2028N/A timeout->next_run = *time;
2028N/A
2028N/A priorityq_add(timeout->ioloop->timeouts, &timeout->item);
2028N/A return timeout;
2028N/A}
2028N/A
2028N/Astatic struct timeout *
2028N/Atimeout_copy(const struct timeout *old_to)
2028N/A{
2028N/A struct timeout *new_to;
2028N/A
2028N/A new_to = timeout_add_common
2028N/A (old_to->source_linenum, old_to->callback, old_to->context);
2028N/A new_to->one_shot = old_to->one_shot;
2028N/A new_to->msecs = old_to->msecs;
2028N/A new_to->next_run = old_to->next_run;
2073N/A
2073N/A if (old_to->item.idx != UINT_MAX)
2028N/A priorityq_add(new_to->ioloop->timeouts, &new_to->item);
2028N/A else if (!new_to->one_shot) {
2028N/A i_assert(new_to->msecs > 0);
2028N/A array_append(&new_to->ioloop->timeouts_new, &new_to, 1);
2028N/A }
2028N/A
2028N/A return new_to;
1968N/A}
2142N/A
2142N/Astatic void timeout_free(struct timeout *timeout)
2142N/A{
2142N/A if (timeout->ctx != NULL)
2142N/A io_loop_context_unref(&timeout->ctx);
2142N/A i_free(timeout);
2142N/A}
2142N/A
2142N/Avoid timeout_remove(struct timeout **_timeout)
2142N/A{
2142N/A struct timeout *timeout = *_timeout;
2142N/A struct ioloop *ioloop = timeout->ioloop;
2142N/A
2142N/A *_timeout = NULL;
2142N/A if (timeout->item.idx != UINT_MAX)
2142N/A priorityq_remove(timeout->ioloop->timeouts, &timeout->item);
2142N/A else if (!timeout->one_shot && timeout->msecs > 0) {
2142N/A struct timeout *const *to_idx;
2142N/A array_foreach(&ioloop->timeouts_new, to_idx) {
2142N/A if (*to_idx == timeout) {
2142N/A array_delete(&ioloop->timeouts_new,
2142N/A array_foreach_idx(&ioloop->timeouts_new, to_idx), 1);
2142N/A break;
2142N/A }
2142N/A }
2142N/A }
2142N/A timeout_free(timeout);
2142N/A}
2142N/A
2142N/Astatic void ATTR_NULL(2)
2142N/Atimeout_reset_timeval(struct timeout *timeout, struct timeval *tv_now)
2142N/A{
2142N/A if (timeout->item.idx == UINT_MAX)
2142N/A return;
2142N/A
2142N/A timeout_update_next(timeout, tv_now);
2142N/A if (timeout->msecs <= 1) {
2142N/A /* if we came here from io_loop_handle_timeouts(),
2142N/A next_run must be larger than tv_now or we could go to
2142N/A infinite loop. +1000 to get 1 ms further, another +1000 to
2142N/A account for timeout_update_next()'s truncation. */
2142N/A timeout->next_run.tv_usec += 2000;
2142N/A if (timeout->next_run.tv_usec >= 1000000) {
2142N/A timeout->next_run.tv_sec++;
2142N/A timeout->next_run.tv_usec -= 1000000;
2142N/A }
2142N/A }
2142N/A i_assert(tv_now == NULL ||
2142N/A timeout->next_run.tv_sec > tv_now->tv_sec ||
2142N/A (timeout->next_run.tv_sec == tv_now->tv_sec &&
2142N/A timeout->next_run.tv_usec > tv_now->tv_usec));
2142N/A priorityq_remove(timeout->ioloop->timeouts, &timeout->item);
2142N/A priorityq_add(timeout->ioloop->timeouts, &timeout->item);
2142N/A}
2142N/A
2142N/Avoid timeout_reset(struct timeout *timeout)
2142N/A{
2142N/A i_assert(!timeout->one_shot);
2142N/A timeout_reset_timeval(timeout, NULL);
2142N/A}
2142N/A
2142N/Astatic int timeout_get_wait_time(struct timeout *timeout, struct timeval *tv_r,
2028N/A struct timeval *tv_now)
2028N/A{
2028N/A int ret;
2028N/A
2028N/A if (tv_now->tv_sec == 0) {
2028N/A if (gettimeofday(tv_now, NULL) < 0)
2028N/A i_fatal("gettimeofday(): %m");
2028N/A }
2028N/A tv_r->tv_sec = tv_now->tv_sec;
2028N/A tv_r->tv_usec = tv_now->tv_usec;
2028N/A
2028N/A i_assert(tv_r->tv_sec > 0);
2028N/A i_assert(timeout->next_run.tv_sec > 0);
2028N/A
2028N/A tv_r->tv_sec = timeout->next_run.tv_sec - tv_r->tv_sec;
2028N/A tv_r->tv_usec = timeout->next_run.tv_usec - tv_r->tv_usec;
2028N/A if (tv_r->tv_usec < 0) {
2028N/A tv_r->tv_sec--;
2028N/A tv_r->tv_usec += 1000000;
1968N/A }
2028N/A
2028N/A if (tv_r->tv_sec < 0 || (tv_r->tv_sec == 0 && tv_r->tv_usec < 1000)) {
2028N/A tv_r->tv_sec = 0;
2028N/A tv_r->tv_usec = 0;
2028N/A return 0;
2028N/A }
2028N/A if (tv_r->tv_sec > INT_MAX/1000-1)
2028N/A tv_r->tv_sec = INT_MAX/1000-1;
2028N/A
2028N/A /* round wait times up to next millisecond */
2028N/A ret = tv_r->tv_sec * 1000 + (tv_r->tv_usec + 999) / 1000;
2028N/A i_assert(ret > 0 && tv_r->tv_sec >= 0 && tv_r->tv_usec >= 0);
2028N/A return ret;
2028N/A}
2028N/A
2028N/Aint io_loop_get_wait_time(struct ioloop *ioloop, struct timeval *tv_r)
2028N/A{
2028N/A struct timeval tv_now;
2028N/A struct priorityq_item *item;
2028N/A struct timeout *timeout;
2028N/A int msecs;
2028N/A
2028N/A item = priorityq_peek(ioloop->timeouts);
2028N/A timeout = (struct timeout *)item;
1968N/A if (timeout == NULL) {
1968N/A /* no timeouts. use INT_MAX msecs for timeval and
2028N/A return -1 for poll/epoll infinity. */
1968N/A tv_r->tv_sec = INT_MAX / 1000;
2028N/A tv_r->tv_usec = 0;
1968N/A ioloop->next_max_time = (1ULL << (TIME_T_MAX_BITS-1)) - 1;
1968N/A return -1;
1968N/A }
1968N/A
1968N/A tv_now.tv_sec = 0;
1968N/A msecs = timeout_get_wait_time(timeout, tv_r, &tv_now);
1968N/A ioloop->next_max_time = (tv_now.tv_sec + msecs/1000) + 1;
2510N/A
1968N/A /* update ioloop_timeval - this is meant for io_loop_handle_timeouts()'s
1968N/A ioloop_wait_usecs calculation. normally after this we go to the
1968N/A ioloop and after that we update ioloop_timeval immediately again. */
2028N/A ioloop_timeval = tv_now;
2028N/A ioloop_time = tv_now.tv_sec;
2028N/A return msecs;
2028N/A}
2028N/A
2028N/Astatic int timeout_cmp(const void *p1, const void *p2)
2028N/A{
2028N/A const struct timeout *to1 = p1, *to2 = p2;
2028N/A
2073N/A return timeval_cmp(&to1->next_run, &to2->next_run);
2028N/A}
2028N/A
2028N/Astatic void io_loop_default_time_moved(time_t old_time, time_t new_time)
2028N/A{
2028N/A if (old_time > new_time) {
2028N/A i_warning("Time moved backwards by %ld seconds.",
2028N/A (long)(old_time - new_time));
2028N/A }
2028N/A}
2028N/A
2028N/Astatic void io_loop_timeouts_start_new(struct ioloop *ioloop)
1968N/A{
1968N/A struct timeout *const *to_idx;
2028N/A
2028N/A if (array_count(&ioloop->timeouts_new) == 0)
2028N/A return;
1968N/A
1968N/A io_loop_time_refresh();
1968N/A
1968N/A array_foreach(&ioloop->timeouts_new, to_idx) {
1968N/A struct timeout *timeout= *to_idx;
1968N/A i_assert(timeout->next_run.tv_sec == 0 &&
1968N/A timeout->next_run.tv_usec == 0);
1968N/A i_assert(!timeout->one_shot);
1968N/A i_assert(timeout->msecs > 0);
1968N/A timeout_update_next(timeout, &ioloop_timeval);
1968N/A priorityq_add(ioloop->timeouts, &timeout->item);
1968N/A }
1968N/A array_clear(&ioloop->timeouts_new);
2028N/A}
2028N/A
2028N/Astatic void io_loop_timeouts_update(struct ioloop *ioloop, long diff_secs)
2028N/A{
1968N/A struct priorityq_item *const *items;
1968N/A unsigned int i, count;
1968N/A
1968N/A count = priorityq_count(ioloop->timeouts);
1968N/A items = priorityq_items(ioloop->timeouts);
1968N/A for (i = 0; i < count; i++) {
1968N/A struct timeout *to = (struct timeout *)items[i];
1968N/A
2028N/A to->next_run.tv_sec += diff_secs;
1968N/A }
1968N/A}
1968N/A
1968N/Astatic void io_loops_timeouts_update(long diff_secs)
1968N/A{
1968N/A struct ioloop *ioloop;
1968N/A
1968N/A for (ioloop = current_ioloop; ioloop != NULL; ioloop = ioloop->prev)
1968N/A io_loop_timeouts_update(ioloop, diff_secs);
1968N/A}
1968N/A
2510N/Astatic void io_loop_handle_timeouts_real(struct ioloop *ioloop)
2510N/A{
2510N/A struct priorityq_item *item;
1968N/A struct timeval tv, tv_call, prev_ioloop_timeval = ioloop_timeval;
2510N/A data_stack_frame_t t_id;
1968N/A
1968N/A if (gettimeofday(&ioloop_timeval, NULL) < 0)
1968N/A i_fatal("gettimeofday(): %m");
1968N/A
1968N/A /* Don't bother comparing usecs. */
2028N/A if (unlikely(ioloop_time > ioloop_timeval.tv_sec)) {
2453N/A /* time moved backwards */
2510N/A io_loops_timeouts_update(-(long)(ioloop_time -
2453N/A ioloop_timeval.tv_sec));
2453N/A ioloop->time_moved_callback(ioloop_time,
1968N/A ioloop_timeval.tv_sec);
1968N/A /* the callback may have slept, so check the time again. */
1968N/A if (gettimeofday(&ioloop_timeval, NULL) < 0)
2028N/A i_fatal("gettimeofday(): %m");
1968N/A } else {
1968N/A if (unlikely(ioloop_timeval.tv_sec >
1968N/A ioloop->next_max_time)) {
1968N/A io_loops_timeouts_update(ioloop_timeval.tv_sec -
1968N/A ioloop->next_max_time);
1968N/A /* time moved forwards */
1968N/A ioloop->time_moved_callback(ioloop->next_max_time,
1968N/A ioloop_timeval.tv_sec);
2510N/A }
2510N/A ioloop->ioloop_wait_usecs +=
1968N/A timeval_diff_usecs(&ioloop_timeval, &prev_ioloop_timeval);
2028N/A }
2028N/A
2028N/A ioloop_time = ioloop_timeval.tv_sec;
2028N/A tv_call = ioloop_timeval;
2028N/A
2028N/A while ((item = priorityq_peek(ioloop->timeouts)) != NULL) {
2510N/A struct timeout *timeout = (struct timeout *)item;
1968N/A
2028N/A /* use tv_call to make sure we don't get to infinite loop in
2028N/A case callbacks update ioloop_timeval. */
2028N/A if (timeout_get_wait_time(timeout, &tv, &tv_call) > 0)
2510N/A break;
2510N/A
2510N/A if (timeout->one_shot) {
2510N/A /* remove timeout from queue */
2510N/A priorityq_remove(timeout->ioloop->timeouts, &timeout->item);
2028N/A } else {
2028N/A /* update timeout's next_run and reposition it in the queue */
2028N/A timeout_reset_timeval(timeout, &tv_call);
2028N/A }
2028N/A
2028N/A if (timeout->ctx != NULL)
2028N/A io_loop_context_activate(timeout->ctx);
2028N/A t_id = t_push_named("ioloop timeout handler %p",
2028N/A (void *)timeout->callback);
2132N/A timeout->callback(timeout->context);
2132N/A if (t_pop() != t_id) {
2132N/A i_panic("Leaked a t_pop() call in timeout handler %p",
2132N/A (void *)timeout->callback);
2132N/A }
2132N/A if (ioloop->cur_ctx != NULL)
2132N/A io_loop_context_deactivate(ioloop->cur_ctx);
2132N/A }
2132N/A}
2132N/A
2132N/Avoid io_loop_handle_timeouts(struct ioloop *ioloop)
2132N/A{
2028N/A T_BEGIN {
2028N/A io_loop_handle_timeouts_real(ioloop);
2028N/A } T_END;
2028N/A}
2028N/A
2028N/Avoid io_loop_call_io(struct io *io)
2028N/A{
2028N/A struct ioloop *ioloop = io->ioloop;
2028N/A data_stack_frame_t t_id;
2028N/A
2028N/A if (io->pending) {
2028N/A i_assert(ioloop->io_pending_count > 0);
2028N/A ioloop->io_pending_count--;
2028N/A io->pending = FALSE;
2028N/A }
2028N/A
2028N/A if (io->ctx != NULL)
2028N/A io_loop_context_activate(io->ctx);
2028N/A t_id = t_push_named("ioloop handler %p",
2310N/A (void *)io->callback);
2028N/A io->callback(io->context);
2028N/A if (t_pop() != t_id) {
2028N/A i_panic("Leaked a t_pop() call in I/O handler %p",
2028N/A (void *)io->callback);
2028N/A }
2028N/A if (ioloop->cur_ctx != NULL)
2028N/A io_loop_context_deactivate(ioloop->cur_ctx);
2028N/A}
2028N/A
2028N/Avoid io_loop_run(struct ioloop *ioloop)
2028N/A{
2028N/A if (ioloop->handler_context == NULL)
2028N/A io_loop_initialize_handler(ioloop);
2028N/A
2028N/A if (ioloop->cur_ctx != NULL)
2028N/A io_loop_context_unref(&ioloop->cur_ctx);
2028N/A
2028N/A /* recursive io_loop_run() isn't allowed for the same ioloop.
2028N/A it can break backends. */
2028N/A i_assert(!ioloop->iolooping);
2028N/A ioloop->iolooping = TRUE;
2028N/A
2028N/A ioloop->running = TRUE;
2028N/A while (ioloop->running)
1968N/A io_loop_handler_run(ioloop);
2028N/A ioloop->iolooping = FALSE;
2028N/A}
2028N/A
2073N/Astatic void io_loop_call_pending(struct ioloop *ioloop)
2073N/A{
2073N/A struct io_file *io;
2073N/A
2073N/A while (ioloop->io_pending_count > 0) {
2028N/A io = ioloop->io_files;
2028N/A do {
2028N/A ioloop->next_io_file = io->next;
2028N/A if (io->io.pending)
2028N/A io_loop_call_io(&io->io);
2028N/A if (ioloop->io_pending_count == 0)
2028N/A break;
2028N/A io = ioloop->next_io_file;
2028N/A } while (io != NULL);
2028N/A }
2028N/A}
2028N/A
2028N/Avoid io_loop_handler_run(struct ioloop *ioloop)
2028N/A{
2028N/A io_loop_timeouts_start_new(ioloop);
2028N/A io_loop_handler_run_internal(ioloop);
2028N/A io_loop_call_pending(ioloop);
2028N/A}
2028N/A
2028N/Avoid io_loop_stop(struct ioloop *ioloop)
2028N/A{
2028N/A ioloop->running = FALSE;
1968N/A}
2028N/A
2028N/Avoid io_loop_set_running(struct ioloop *ioloop)
2028N/A{
2028N/A ioloop->running = TRUE;
2028N/A}
2510N/A
2510N/Avoid io_loop_set_max_fd_count(struct ioloop *ioloop, unsigned int max_fds)
2510N/A{
2510N/A ioloop->max_fd_count = max_fds;
2028N/A}
2028N/A
2028N/Abool io_loop_is_running(struct ioloop *ioloop)
1968N/A{
2028N/A return ioloop->running;
2028N/A}
2028N/A
2028N/Avoid io_loop_time_refresh(void)
2028N/A{
2453N/A if (gettimeofday(&ioloop_timeval, NULL) < 0)
2510N/A i_fatal("gettimeofday(): %m");
2453N/A ioloop_time = ioloop_timeval.tv_sec;
2453N/A}
2028N/A
2028N/Astruct ioloop *io_loop_create(void)
2028N/A{
2028N/A struct ioloop *ioloop;
2028N/A
2028N/A /* initialize time */
2028N/A if (gettimeofday(&ioloop_timeval, NULL) < 0)
2028N/A i_fatal("gettimeofday(): %m");
2028N/A ioloop_time = ioloop_timeval.tv_sec;
2028N/A
1968N/A ioloop = i_new(struct ioloop, 1);
1968N/A ioloop->timeouts = priorityq_init(timeout_cmp, 32);
2028N/A i_array_init(&ioloop->timeouts_new, 8);
1968N/A
1968N/A ioloop->time_moved_callback = current_ioloop != NULL ?
1968N/A current_ioloop->time_moved_callback :
1968N/A io_loop_default_time_moved;
2028N/A
1968N/A ioloop->prev = current_ioloop;
1968N/A io_loop_set_current(ioloop);
2028N/A return ioloop;
1968N/A}
2028N/A
1968N/Avoid io_loop_destroy(struct ioloop **_ioloop)
1968N/A{
2028N/A struct ioloop *ioloop = *_ioloop;
1968N/A struct timeout *const *to_idx;
1968N/A struct priorityq_item *item;
2028N/A
2510N/A *_ioloop = NULL;
1968N/A
2028N/A /* ->prev won't work unless loops are destroyed in create order */
1968N/A i_assert(ioloop == current_ioloop);
1968N/A io_loop_set_current(current_ioloop->prev);
2028N/A
2028N/A if (ioloop->notify_handler_context != NULL)
2028N/A io_loop_notify_handler_deinit(ioloop);
2028N/A
1968N/A while (ioloop->io_files != NULL) {
2028N/A struct io_file *io = ioloop->io_files;
2028N/A struct io *_io = &io->io;
1968N/A
2028N/A i_warning("I/O leak: %p (line %u, fd %d)",
2028N/A (void *)io->io.callback,
2028N/A io->io.source_linenum, io->fd);
2028N/A io_remove(&_io);
2073N/A }
2028N/A i_assert(ioloop->io_pending_count == 0);
2028N/A
2028N/A array_foreach(&ioloop->timeouts_new, to_idx) {
2028N/A struct timeout *to = *to_idx;
2028N/A
2028N/A i_warning("Timeout leak: %p (line %u)", (void *)to->callback,
2028N/A to->source_linenum);
1968N/A timeout_free(to);
1968N/A }
1968N/A array_free(&ioloop->timeouts_new);
1968N/A
1968N/A while ((item = priorityq_pop(ioloop->timeouts)) != NULL) {
1968N/A struct timeout *to = (struct timeout *)item;
2028N/A
2028N/A i_warning("Timeout leak: %p (line %u)", (void *)to->callback,
2028N/A to->source_linenum);
2028N/A timeout_free(to);
2028N/A }
2028N/A priorityq_deinit(&ioloop->timeouts);
2028N/A
2028N/A if (ioloop->handler_context != NULL)
2028N/A io_loop_handler_deinit(ioloop);
2028N/A
2028N/A if (ioloop->cur_ctx != NULL)
1968N/A io_loop_context_deactivate(ioloop->cur_ctx);
1968N/A
1968N/A i_free(ioloop);
2028N/A}
2028N/A
1968N/Avoid io_loop_set_time_moved_callback(struct ioloop *ioloop,
1968N/A io_loop_time_moved_callback_t *callback)
2028N/A{
2028N/A ioloop->time_moved_callback = callback;
2028N/A}
1968N/A
1968N/Astatic void io_switch_callbacks_free(void)
2510N/A{
2510N/A array_free(&io_switch_callbacks);
2510N/A}
2510N/A
1968N/Avoid io_loop_set_current(struct ioloop *ioloop)
1968N/A{
2028N/A io_switch_callback_t *const *callbackp;
1968N/A struct ioloop *prev_ioloop = current_ioloop;
1968N/A
1968N/A current_ioloop = ioloop;
2028N/A if (array_is_created(&io_switch_callbacks)) {
2028N/A array_foreach(&io_switch_callbacks, callbackp)
2028N/A (*callbackp)(prev_ioloop);
2028N/A }
1968N/A}
2028N/A
2453N/Avoid io_loop_add_switch_callback(io_switch_callback_t *callback)
2510N/A{
2453N/A if (!array_is_created(&io_switch_callbacks)) {
2453N/A i_array_init(&io_switch_callbacks, 4);
1968N/A lib_atexit(io_switch_callbacks_free);
1968N/A }
1968N/A array_append(&io_switch_callbacks, &callback, 1);
2028N/A}
1968N/A
1968N/Avoid io_loop_remove_switch_callback(io_switch_callback_t *callback)
1968N/A{
1968N/A io_switch_callback_t *const *callbackp;
1968N/A unsigned int idx;
1968N/A
1968N/A array_foreach(&io_switch_callbacks, callbackp) {
2510N/A if (*callbackp == callback) {
2510N/A idx = array_foreach_idx(&io_switch_callbacks, callbackp);
2510N/A array_delete(&io_switch_callbacks, idx, 1);
2510N/A return;
2510N/A }
2510N/A }
2510N/A i_unreached();
2510N/A}
2510N/A
2510N/Astruct ioloop_context *io_loop_context_new(struct ioloop *ioloop)
2510N/A{
2510N/A struct ioloop_context *ctx;
2510N/A
2510N/A ctx = i_new(struct ioloop_context, 1);
2510N/A ctx->refcount = 2;
2510N/A ctx->ioloop = ioloop;
2510N/A i_array_init(&ctx->callbacks, 4);
2510N/A
2510N/A if (ioloop->cur_ctx != NULL)
2510N/A io_loop_context_unref(&ioloop->cur_ctx);
2510N/A ioloop->cur_ctx = ctx;
2510N/A return ctx;
2510N/A}
2510N/A
2510N/Avoid io_loop_context_ref(struct ioloop_context *ctx)
2510N/A{
2510N/A i_assert(ctx->refcount > 0);
2510N/A
2510N/A ctx->refcount++;
2510N/A}
2510N/A
2510N/Avoid io_loop_context_unref(struct ioloop_context **_ctx)
2510N/A{
2510N/A struct ioloop_context *ctx = *_ctx;
2510N/A
2510N/A *_ctx = NULL;
2510N/A
2510N/A i_assert(ctx->refcount > 0);
2693N/A if (--ctx->refcount > 0)
2693N/A return;
2693N/A
2693N/A /* cur_ctx itself keeps a reference */
2693N/A i_assert(ctx->ioloop->cur_ctx != ctx);
2510N/A
2693N/A array_free(&ctx->callbacks);
2510N/A i_free(ctx);
2510N/A}
2510N/A
2510N/A#undef io_loop_context_add_callbacks
2510N/Avoid io_loop_context_add_callbacks(struct ioloop_context *ctx,
2510N/A io_callback_t *activate,
2693N/A io_callback_t *deactivate, void *context)
2510N/A{
2510N/A struct ioloop_context_callback cb;
2510N/A
2510N/A memset(&cb, 0, sizeof(cb));
2693N/A cb.activate = activate;
2510N/A cb.deactivate = deactivate;
2693N/A cb.context = context;
2510N/A
2510N/A array_append(&ctx->callbacks, &cb, 1);
2510N/A}
2510N/A
2510N/A#undef io_loop_context_remove_callbacks
2510N/Avoid io_loop_context_remove_callbacks(struct ioloop_context *ctx,
2510N/A io_callback_t *activate,
2510N/A io_callback_t *deactivate, void *context)
2510N/A{
2510N/A struct ioloop_context_callback *cb;
2510N/A
2510N/A array_foreach_modifiable(&ctx->callbacks, cb) {
2510N/A if (cb->context == context &&
2510N/A cb->activate == activate && cb->deactivate == deactivate) {
2510N/A /* simply mark it as deleted, since we could get
2510N/A here from activate/deactivate loop */
2510N/A cb->activate = NULL;
2510N/A cb->deactivate = NULL;
2616N/A cb->context = NULL;
2616N/A return;
2510N/A }
2616N/A }
2616N/A i_panic("io_loop_context_remove_callbacks() context not found");
2510N/A}
2510N/A
2510N/Astatic void
2510N/Aio_loop_context_remove_deleted_callbacks(struct ioloop_context *ctx)
2510N/A{
2510N/A const struct ioloop_context_callback *cbs;
2510N/A unsigned int i, count;
2510N/A
2510N/A cbs = array_get(&ctx->callbacks, &count);
2510N/A for (i = 0; i < count; ) {
2510N/A if (cbs[i].activate != NULL)
2510N/A i++;
2510N/A else {
2510N/A array_delete(&ctx->callbacks, i, 1);
2510N/A cbs = array_get(&ctx->callbacks, &count);
2510N/A }
2510N/A }
2510N/A}
2510N/A
2510N/Avoid io_loop_context_activate(struct ioloop_context *ctx)
2510N/A{
2510N/A struct ioloop_context_callback *cb;
2510N/A
2510N/A i_assert(ctx->ioloop->cur_ctx == NULL);
2510N/A
2510N/A ctx->ioloop->cur_ctx = ctx;
2510N/A io_loop_context_ref(ctx);
2510N/A array_foreach_modifiable(&ctx->callbacks, cb) {
2510N/A i_assert(!cb->activated);
2510N/A if (cb->activate != NULL)
2510N/A cb->activate(cb->context);
2510N/A cb->activated = TRUE;
2510N/A }
2510N/A}
2510N/A
2510N/Avoid io_loop_context_deactivate(struct ioloop_context *ctx)
2510N/A{
2510N/A struct ioloop_context_callback *cb;
2510N/A
2510N/A i_assert(ctx->ioloop->cur_ctx != NULL);
2510N/A
2510N/A array_foreach_modifiable(&ctx->callbacks, cb) {
2510N/A if (!cb->activated) {
2510N/A /* we just added this callback. don't deactivate it
2510N/A before it gets first activated. */
2510N/A } else {
2510N/A if (cb->deactivate != NULL)
2510N/A cb->deactivate(cb->context);
2510N/A cb->activated = FALSE;
2510N/A }
2510N/A }
2510N/A ctx->ioloop->cur_ctx = NULL;
2510N/A io_loop_context_remove_deleted_callbacks(ctx);
2510N/A io_loop_context_unref(&ctx);
2510N/A}
2510N/A
2510N/Astruct ioloop_context *io_loop_get_current_context(struct ioloop *ioloop)
2510N/A{
2510N/A return ioloop->cur_ctx;
2510N/A}
2510N/A
2510N/Astruct io *io_loop_move_io(struct io **_io)
2510N/A{
2510N/A struct io *old_io = *_io;
2510N/A struct io_file *old_io_file, *new_io_file;
2510N/A
2510N/A i_assert((old_io->condition & IO_NOTIFY) == 0);
2510N/A
2510N/A if (old_io->ioloop == current_ioloop)
2510N/A return old_io;
2510N/A
2510N/A old_io_file = (struct io_file *)old_io;
1968N/A new_io_file = io_add_file(old_io_file->fd, old_io->condition,
1968N/A old_io->source_linenum,
1968N/A old_io->callback, old_io->context);
1968N/A if (old_io_file->istream != NULL) {
1968N/A /* reference before io_remove() */
2028N/A new_io_file->istream = old_io_file->istream;
1968N/A i_stream_ref(new_io_file->istream);
2028N/A }
2132N/A if (old_io->pending)
2132N/A io_set_pending(&new_io_file->io);
1968N/A io_remove(_io);
2132N/A if (new_io_file->istream != NULL) {
2132N/A /* update istream io after it was removed with io_remove() */
2132N/A i_stream_set_io(new_io_file->istream, &new_io_file->io);
2132N/A }
2132N/A return &new_io_file->io;
2132N/A}
2028N/A
2028N/Astruct timeout *io_loop_move_timeout(struct timeout **_timeout)
2028N/A{
2028N/A struct timeout *new_to, *old_to = *_timeout;
1968N/A
1968N/A if (old_to->ioloop == current_ioloop)
1968N/A return old_to;
1968N/A
1968N/A new_to = timeout_copy(old_to);
2028N/A timeout_remove(_timeout);
2028N/A return new_to;
2028N/A}
1968N/A
2028N/Abool io_loop_have_ios(struct ioloop *ioloop)
2028N/A{
2028N/A return ioloop->io_files != NULL;
2028N/A}
2132N/A
2132N/Abool io_loop_have_immediate_timeouts(struct ioloop *ioloop)
2132N/A{
2132N/A struct timeval tv;
2132N/A
2132N/A return io_loop_get_wait_time(ioloop, &tv) == 0;
2132N/A}
2132N/A
2028N/Auint64_t io_loop_get_wait_usecs(struct ioloop *ioloop)
2132N/A{
2132N/A return ioloop->ioloop_wait_usecs;
2132N/A}
2132N/A
2132N/Aenum io_condition io_loop_find_fd_conditions(struct ioloop *ioloop, int fd)
1968N/A{
2132N/A enum io_condition conditions = 0;
2132N/A struct io_file *io;
2132N/A
2132N/A i_assert(fd >= 0);
2132N/A
2132N/A for (io = ioloop->io_files; io != NULL; io = io->next) {
1968N/A if (io->fd == fd)
1968N/A conditions |= io->io.condition;
1968N/A }
1968N/A return conditions;
1968N/A}
1968N/A