sd-event.c revision fd38203a2a7bfbdc6cb5fd4dc54378e70f7d6778
/*-*- Mode: C; c-basic-offset: 8; indent-tabs-mode: nil -*-*/
/***
This file is part of systemd.
Copyright 2013 Lennart Poettering
under the terms of the GNU Lesser General Public License as published by
the Free Software Foundation; either version 2.1 of the License, or
(at your option) any later version.
systemd is distributed in the hope that it will be useful, but
WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public License
along with systemd; If not, see <http://www.gnu.org/licenses/>.
***/
#include "macro.h"
#include "refcnt.h"
#include "prioq.h"
#include "hashmap.h"
#include "util.h"
#include "time-util.h"
#include "sd-event.h"
#define EPOLL_QUEUE_MAX 64
typedef enum EventSourceType {
struct sd_event_source {
void *userdata;
bool pending:1;
int priority;
unsigned pending_index;
unsigned prepare_index;
unsigned pending_iteration;
unsigned prepare_iteration;
union {
struct {
int fd;
bool registered:1;
} io;
struct {
unsigned prioq_index;
} time;
struct {
struct signalfd_siginfo siginfo;
int sig;
} signal;
struct {
int options;
} child;
struct {
} defer;
};
};
struct sd_event {
int epoll_fd;
int signal_fd;
int realtime_fd;
int monotonic_fd;
unsigned n_unmuted_child_sources;
unsigned iteration;
unsigned processed_children;
bool quit;
};
static int pending_prioq_compare(const void *a, const void *b) {
const sd_event_source *x = a, *y = b;
/* Unmuted ones first */
return -1;
return 1;
/* Lower priority values first */
return -1;
return 1;
/* Older entries first */
if (x->pending_iteration < y->pending_iteration)
return -1;
if (x->pending_iteration > y->pending_iteration)
return 1;
/* Stability for the rest */
if (x < y)
return -1;
if (y > x)
return 1;
return 0;
}
static int prepare_prioq_compare(const void *a, const void *b) {
const sd_event_source *x = a, *y = b;
/* Move most recently prepared ones last, so that we can stop
* preparing as soon as we hit one that has already been
* prepared in the current iteration */
if (x->prepare_iteration < y->prepare_iteration)
return -1;
if (x->prepare_iteration > y->prepare_iteration)
return 1;
/* Unmuted ones first */
return -1;
return 1;
/* Lower priority values first */
return -1;
return 1;
/* Stability for the rest */
if (x < y)
return -1;
if (y > x)
return 1;
return 0;
}
static int time_prioq_compare(const void *a, const void *b) {
const sd_event_source *x = a, *y = b;
/* Unmuted ones first */
return -1;
return 1;
/* Move the pending ones to the end */
return -1;
return 1;
/* Order by time */
return -1;
return -1;
/* Stability for the rest */
if (x < y)
return -1;
if (y > x)
return 1;
return 0;
}
static void event_free(sd_event *e) {
assert(e);
if (e->epoll_fd >= 0)
if (e->signal_fd >= 0)
if (e->realtime_fd >= 0)
if (e->monotonic_fd >= 0)
prioq_free(e->pending);
prioq_free(e->prepare);
prioq_free(e->monotonic);
prioq_free(e->realtime);
free(e->signal_sources);
free(e);
}
sd_event *e;
int r;
if (!ret)
return -EINVAL;
if (!e)
return -ENOMEM;
e->n_ref = REFCNT_INIT;
if (!e->pending) {
r = -ENOMEM;
goto fail;
}
if (e->epoll_fd < 0) {
r = -errno;
goto fail;
}
*ret = e;
return 0;
fail:
event_free(e);
return r;
}
if (!e)
return NULL;
return e;
}
if (!e)
return NULL;
if (REFCNT_DEC(e->n_ref) <= 0)
event_free(e);
return NULL;
}
static int source_io_unregister(sd_event_source *s) {
int r;
assert(s);
if (!s->io.registered)
return 0;
if (r < 0)
return -errno;
s->io.registered = false;
return 0;
}
struct epoll_event ev = {};
int r;
assert(s);
assert(m != SD_EVENT_MUTED);
if (m == SD_EVENT_ONESHOT)
if (s->io.registered)
else
if (r < 0)
return -errno;
s->io.registered = true;
return 0;
}
static void source_free(sd_event_source *s) {
assert(s);
if (s->event) {
switch (s->type) {
case SOURCE_IO:
break;
case SOURCE_MONOTONIC:
break;
case SOURCE_REALTIME:
break;
case SOURCE_SIGNAL:
if (s->event->signal_sources)
}
break;
case SOURCE_CHILD:
if (s->mute != SD_EVENT_MUTED) {
s->event->n_unmuted_child_sources--;
}
}
break;
}
if (s->pending)
if (s->prepare)
sd_event_unref(s->event);
}
free(s);
}
static int source_set_pending(sd_event_source *s, bool b) {
int r;
assert(s);
if (s->pending == b)
return 0;
s->pending = b;
if (b) {
if (r < 0) {
s->pending = false;
return r;
}
} else
return 0;
}
sd_event_source *s;
assert(e);
if (!s)
return NULL;
s->n_ref = REFCNT_INIT;
s->event = sd_event_ref(e);
s->mute = SD_EVENT_UNMUTED;
return s;
}
int sd_event_add_io(
sd_event *e,
int fd,
void *userdata,
sd_event_source **ret) {
sd_event_source *s;
int r;
if (!e)
return -EINVAL;
if (fd < 0)
return -EINVAL;
return -EINVAL;
if (!callback)
return -EINVAL;
if (!ret)
return -EINVAL;
s = source_new(e, SOURCE_IO);
if (!s)
return -ENOMEM;
if (r < 0) {
source_free(s);
return -errno;
}
*ret = s;
return 0;
}
static int event_setup_timer_fd(
sd_event *e,
int *timer_fd,
struct epoll_event ev = {};
int r, fd;
assert(e);
return 0;
if (fd < 0)
return -errno;
if (r < 0) {
return -errno;
}
return 0;
}
static int event_add_time_internal(
sd_event *e,
int *timer_fd,
void *userdata,
sd_event_source **ret) {
sd_event_source *s;
int r;
if (!e)
return -EINVAL;
if (!callback)
return -EINVAL;
if (!ret)
return -EINVAL;
if (!*prioq) {
if (!*prioq)
return -ENOMEM;
}
if (*timer_fd < 0) {
if (r < 0)
return r;
}
s = source_new(e, type);
if (!s)
return -ENOMEM;
if (r < 0) {
source_free(s);
return r;
}
*ret = s;
return 0;
}
int sd_event_add_monotonic(sd_event *e, uint64_t usec, sd_time_handler_t callback, void *userdata, sd_event_source **ret) {
return event_add_time_internal(e, SOURCE_MONOTONIC, &e->monotonic_fd, CLOCK_MONOTONIC, &e->monotonic, usec, callback, userdata, ret);
}
int sd_event_add_realtime(sd_event *e, uint64_t usec, sd_time_handler_t callback, void *userdata, sd_event_source **ret) {
return event_add_time_internal(e, SOURCE_REALTIME, &e->realtime_fd, CLOCK_REALTIME, &e->realtime, usec, callback, userdata, ret);
}
static int event_update_signal_fd(sd_event *e) {
struct epoll_event ev = {};
bool add_to_epoll;
int r;
assert(e);
add_to_epoll = e->signal_fd < 0;
if (r < 0)
return -errno;
e->signal_fd = r;
if (!add_to_epoll)
return 0;
if (r < 0) {
e->signal_fd = -1;
return -errno;
}
return 0;
}
int sd_event_add_signal(sd_event *e, int sig, sd_signal_handler_t callback, void *userdata, sd_event_source **ret) {
sd_event_source *s;
int r;
if (!e)
return -EINVAL;
if (sig <= 0)
return -EINVAL;
return -EINVAL;
if (!callback)
return -EINVAL;
if (!ret)
return -EINVAL;
if (!e->signal_sources) {
if (!e->signal_sources)
return -ENOMEM;
} else if (e->signal_sources[sig])
return -EBUSY;
s = source_new(e, SOURCE_SIGNAL);
if (!s)
return -ENOMEM;
e->signal_sources[sig] = s;
r = event_update_signal_fd(e);
if (r < 0) {
source_free(s);
return r;
}
}
*ret = s;
return 0;
}
int sd_event_add_child(sd_event *e, pid_t pid, int options, sd_child_handler_t callback, void *userdata, sd_event_source **ret) {
sd_event_source *s;
int r;
if (!e)
return -EINVAL;
if (pid <= 1)
return -EINVAL;
return -EINVAL;
if (!callback)
return -EINVAL;
if (!ret)
return -EINVAL;
if (r < 0)
return r;
return -EBUSY;
s = source_new(e, SOURCE_CHILD);
if (!s)
return -ENOMEM;
if (r < 0) {
source_free(s);
return r;
}
e->n_unmuted_child_sources ++;
r = event_update_signal_fd(e);
if (r < 0) {
source_free(s);
return -errno;
}
}
*ret = s;
return 0;
}
int sd_event_add_defer(sd_event *e, sd_defer_handler_t callback, void *userdata, sd_event_source **ret) {
sd_event_source *s;
int r;
if (!e)
return -EINVAL;
if (!ret)
return -EINVAL;
s = source_new(e, SOURCE_DEFER);
if (!s)
return -ENOMEM;
r = source_set_pending(s, true);
if (r < 0) {
source_free(s);
return r;
}
*ret = s;
return 0;
}
if (!s)
return NULL;
return s;
}
if (!s)
return NULL;
if (REFCNT_DEC(s->n_ref) <= 0)
source_free(s);
return NULL;
}
int sd_event_source_get_pending(sd_event_source *s) {
if (!s)
return -EINVAL;
return s->pending;
}
int sd_event_source_get_io_fd(sd_event_source *s) {
if (!s)
return -EINVAL;
return -EDOM;
}
if (!s)
return -EINVAL;
return -EDOM;
if (!events)
return -EINVAL;
return 0;
}
int r;
if (!s)
return -EINVAL;
return -EDOM;
return -EINVAL;
return 0;
if (s->mute != SD_EVENT_MUTED) {
if (r < 0)
return r;
}
return 0;
}
if (!s)
return -EINVAL;
return -EDOM;
if (!revents)
return -EINVAL;
if (!s->pending)
return -ENODATA;
return 0;
}
int sd_event_source_get_signal(sd_event_source *s) {
if (!s)
return -EINVAL;
if (s->type != SOURCE_SIGNAL)
return -EDOM;
}
if (!s)
return -EINVAL;
return s->priority;
}
if (!s)
return -EINVAL;
return 0;
if (s->pending)
if (s->prepare)
return 0;
}
if (!s)
return -EINVAL;
if (!m)
return -EINVAL;
*m = s->mute;
return 0;
}
int r;
if (!s)
return -EINVAL;
return -EINVAL;
if (s->mute == m)
return 0;
if (m == SD_EVENT_MUTED) {
switch (s->type) {
case SOURCE_IO:
r = source_io_unregister(s);
if (r < 0)
return r;
s->mute = m;
break;
case SOURCE_MONOTONIC:
s->mute = m;
break;
case SOURCE_REALTIME:
s->mute = m;
break;
case SOURCE_SIGNAL:
s->mute = m;
}
break;
case SOURCE_CHILD:
s->mute = m;
s->event->n_unmuted_child_sources--;
}
break;
case SOURCE_DEFER:
s->mute = m;
break;
}
} else {
switch (s->type) {
case SOURCE_IO:
if (r < 0)
return r;
s->mute = m;
break;
case SOURCE_MONOTONIC:
s->mute = m;
break;
case SOURCE_REALTIME:
s->mute = m;
break;
case SOURCE_SIGNAL:
s->mute = m;
}
break;
case SOURCE_CHILD:
s->mute = m;
if (s->mute == SD_EVENT_MUTED) {
s->event->n_unmuted_child_sources++;
}
}
break;
case SOURCE_DEFER:
s->mute = m;
break;
}
}
if (s->pending)
if (s->prepare)
return 0;
}
if (!s)
return -EINVAL;
if (!usec)
return -EINVAL;
return -EDOM;
return 0;
}
if (!s)
return -EINVAL;
return -EDOM;
return 0;
if (s->type == SOURCE_REALTIME)
else
return 0;
}
int r;
if (!s)
return -EINVAL;
return 0;
return 0;
}
if (r < 0)
return r;
if (callback) {
if (r < 0)
return r;
} else
return 0;
}
void* sd_event_source_get_userdata(sd_event_source *s) {
if (!s)
return NULL;
return s->userdata;
}
static int event_arm_timer(
sd_event *e,
int timer_fd,
struct itimerspec its = {};
sd_event_source *s;
int r;
assert_se(e);
s = prioq_peek(prioq);
if (!s || s->mute == SD_EVENT_MUTED)
return 0;
return 0;
/* We don' want to disarm here, just mean some time looooong ago. */
} else
if (r < 0)
return r;
return 0;
}
assert(e);
assert(s);
/*
If this is a oneshot event source, then we added it to the
epoll with EPOLLONESHOT, hence we know it's not registered
anymore. We can save a syscall here...
*/
if (s->mute == SD_EVENT_ONESHOT)
s->io.registered = false;
return source_set_pending(s, true);
}
uint64_t x;
assert(e);
return -EIO;
if (ss < 0) {
return 0;
return -errno;
}
if (ss != sizeof(x))
return -EIO;
return 0;
}
sd_event_source *s;
int r;
assert(e);
for (;;) {
s = prioq_peek(prioq);
if (!s ||
s->mute == SD_EVENT_MUTED ||
s->pending)
break;
r = source_set_pending(s, true);
if (r < 0)
return r;
if (r < 0)
return r;
}
return 0;
}
static int process_child(sd_event *e) {
sd_event_source *s;
Iterator i;
int r;
assert(e);
/*
So, this is ugly. We iteratively invoke waitid() with P_PID
+ WNOHANG for each PID we wait for, instead of using
P_ALL. This is because we only want to get child
information of very specific child processes, and not all
of them. We might not have processed the SIGCHLD even of a
previous invocation and we don't want to maintain a
unbounded *per-child* event queue, hence we really don't
want anything flushed out of the kernel's queue that we
don't care about. Since this is O(n) this means that if you
have a lot of processes you probably want to handle SIGCHLD
yourself.
*/
HASHMAP_FOREACH(s, e->child_sources, i) {
if (s->pending)
continue;
if (s->mute == SD_EVENT_MUTED)
continue;
if (r < 0)
return -errno;
r = source_set_pending(s, true);
if (r < 0)
return r;
}
}
e->processed_children = e->iteration;
return 0;
}
struct signalfd_siginfo si;
bool read_one = false;
int r;
return -EIO;
for (;;) {
sd_event_source *s;
if (ss < 0) {
return read_one;
return -errno;
}
return -EIO;
read_one = true;
r = process_child(e);
if (r < 0)
return r;
continue;
} else {
if (!s)
return -EIO;
}
r = source_set_pending(s, true);
if (r < 0)
return r;
}
return 0;
}
static int source_dispatch(sd_event_source *s) {
int r;
assert(s);
r = source_set_pending(s, false);
if (r < 0)
return r;
if (s->mute == SD_EVENT_ONESHOT) {
r = sd_event_source_set_mute(s, SD_EVENT_MUTED);
if (r < 0)
return r;
}
switch (s->type) {
case SOURCE_IO:
break;
case SOURCE_MONOTONIC:
break;
case SOURCE_REALTIME:
break;
case SOURCE_SIGNAL:
break;
case SOURCE_CHILD:
break;
case SOURCE_DEFER:
break;
}
return r;
}
static int event_prepare(sd_event *e) {
int r;
assert(e);
for (;;) {
sd_event_source *s;
s = prioq_peek(e->prepare);
break;
s->prepare_iteration = e->iteration;
if (r < 0)
return r;
if (r < 0)
return r;
}
return 0;
}
sd_event_source *p;
int r, i, m;
if (!e)
return -EINVAL;
if (e->quit)
return -ESTALE;
e->iteration++;
r = event_prepare(e);
if (r < 0)
return r;
if (r < 0)
return r;
if (r < 0)
return r;
/* On the first iteration, there might be already some
* zombies for us to care for, hence, don't wait */
timeout = 0;
else {
p = prioq_peek(e->pending);
if (p && p->mute != SD_EVENT_MUTED)
timeout = 0;
}
m = epoll_wait(e->epoll_fd, ev_queue, EPOLL_QUEUE_MAX, timeout == (uint64_t) -1 ? -1 : (int) ((timeout + USEC_PER_MSEC - 1) / USEC_PER_MSEC));
if (m < 0)
return m;
dual_timestamp_get(&n);
for (i = 0; i < m; i++) {
else
if (r < 0)
return r;
}
if (r < 0)
return r;
if (r < 0)
return r;
/* On the first iteration, make sure we really process
* all children which might already be zombies. */
r = process_child(e);
if (r < 0)
return r;
}
p = prioq_peek(e->pending);
if (!p || p->mute == SD_EVENT_MUTED)
return 0;
return source_dispatch(p);
}
int sd_event_loop(sd_event *e) {
int r;
if (!e)
return -EINVAL;
while (!e->quit) {
if (r < 0)
return r;
}
return 0;
}
int sd_event_quit(sd_event *e) {
if (!e)
return EINVAL;
return e->quit;
}
int sd_event_request_quit(sd_event *e) {
if (!e)
return -EINVAL;
e->quit = true;
return 0;
}
if (!s)
return NULL;
return s->event;
}