journal-remote-parse.c revision 8201af08fa09c2bd0f005fbe262f27e2c5bd2d86
/*-*- Mode: C; c-basic-offset: 8; indent-tabs-mode: nil -*-*/

/***
  This file is part of systemd.

  Copyright 2014 Zbigniew Jędrzejewski-Szmek

  systemd is free software; you can redistribute it and/or modify it
  under the terms of the GNU Lesser General Public License as published by
  the Free Software Foundation; either version 2.1 of the License, or
  (at your option) any later version.

  systemd is distributed in the hope that it will be useful, but
  WITHOUT ANY WARRANTY; without even the implied warranty of
  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
  Lesser General Public License for more details.

  You should have received a copy of the GNU Lesser General Public License
  along with systemd; If not, see <http://www.gnu.org/licenses/>.
***/
a6278b88305b237b02eabff0d870b57fe851822dLennart Poettering
a6278b88305b237b02eabff0d870b57fe851822dLennart Poettering#include "journal-remote-parse.h"
a6278b88305b237b02eabff0d870b57fe851822dLennart Poettering#include "journald-native.h"
a6278b88305b237b02eabff0d870b57fe851822dLennart Poettering
a6278b88305b237b02eabff0d870b57fe851822dLennart Poettering#define LINE_CHUNK 1024u
a6278b88305b237b02eabff0d870b57fe851822dLennart Poettering
a6278b88305b237b02eabff0d870b57fe851822dLennart Poetteringvoid source_free(RemoteSource *source) {
a6278b88305b237b02eabff0d870b57fe851822dLennart Poettering if (!source)
a6278b88305b237b02eabff0d870b57fe851822dLennart Poettering return;
a6278b88305b237b02eabff0d870b57fe851822dLennart Poettering
a6278b88305b237b02eabff0d870b57fe851822dLennart Poettering if (source->fd >= 0) {
a6278b88305b237b02eabff0d870b57fe851822dLennart Poettering log_debug("Closing fd:%d (%s)", source->fd, source->name);
a6278b88305b237b02eabff0d870b57fe851822dLennart Poettering safe_close(source->fd);
a6278b88305b237b02eabff0d870b57fe851822dLennart Poettering }
a6278b88305b237b02eabff0d870b57fe851822dLennart Poettering free(source->name);
a6278b88305b237b02eabff0d870b57fe851822dLennart Poettering free(source->buf);
a6278b88305b237b02eabff0d870b57fe851822dLennart Poettering iovw_free_contents(&source->iovw);
a6278b88305b237b02eabff0d870b57fe851822dLennart Poettering
a6278b88305b237b02eabff0d870b57fe851822dLennart Poettering sd_event_source_unref(source->event);
a6278b88305b237b02eabff0d870b57fe851822dLennart Poettering
a6278b88305b237b02eabff0d870b57fe851822dLennart Poettering free(source);
a6278b88305b237b02eabff0d870b57fe851822dLennart Poettering}
a6278b88305b237b02eabff0d870b57fe851822dLennart Poettering
/*
 * Extract the next '\n'-terminated line from the source's buffer, reading
 * more data from source->fd for as long as the buffered bytes contain no
 * newline.
 *
 * On success the buffer that was held by the source is handed out through
 * *line (the line starts at its beginning) and *size receives the length of
 * the line including the trailing '\n'. Bytes that followed the newline are
 * copied into a fresh buffer which becomes the source's new buffer. The
 * outputs are only written when the function returns 1.
 *
 * Returns:
 *   1            a line was extracted
 *   0            read() returned 0 before a newline was seen
 *   -E2BIG       DATA_SIZE_MAX bytes were scanned without finding a newline
 *   -EWOULDBLOCK no newline buffered and source->fd < 0 (data has to be
 *                pushed in with push_data())
 *   -errno       read() failed (EAGAIN/EWOULDBLOCK are not logged)
 *   log_oom()    an allocation failed
 */
static int get_line(RemoteSource *source, char **line, size_t *size) {
        ssize_t n;
        size_t line_len, remain;
        char *c = NULL;
        char *newbuf = NULL;
        size_t newsize = 0;

        assert(source);
        assert(source->state == STATE_LINE);
        assert(source->filled <= source->size);
        assert(source->buf == NULL || source->size > 0);
        assert(line);
        assert(size);

        while (true) {
                /* Only look at bytes that were not searched before. */
                if (source->buf)
                        c = memchr(source->buf + source->scanned, '\n',
                                   source->filled - source->scanned);
                if (c != NULL)
                        break;

                source->scanned = source->filled;
                if (source->scanned >= DATA_SIZE_MAX) {
                        log_error("Entry is bigger than %u bytes.", DATA_SIZE_MAX);
                        return -E2BIG;
                }

                if (source->fd < 0)
                        /* we have to wait for some data to come to us */
                        return -EWOULDBLOCK;

                /* Grow by one chunk at a time. filled is below DATA_SIZE_MAX
                 * here (checked above), so the request stays bounded; asking
                 * for DATA_SIZE_MAX up front would allocate the maximum for
                 * every single source. */
                if (source->size - source->filled < LINE_CHUNK &&
                    !GREEDY_REALLOC(source->buf, source->size,
                                    source->filled + LINE_CHUNK))
                        return log_oom();

                assert(source->size - source->filled >= LINE_CHUNK);

                /* Never ask for more than the space that is really there. */
                n = read(source->fd, source->buf + source->filled,
                         source->size - source->filled);
                if (n < 0) {
                        int saved_errno = errno;

                        if (saved_errno != EAGAIN && saved_errno != EWOULDBLOCK)
                                log_error("read(%d, ..., %zu): %m", source->fd,
                                          source->size - source->filled);
                        return -saved_errno;
                } else if (n == 0)
                        return 0;

                source->filled += n;
        }

        line_len = (size_t) (c + 1 - source->buf);
        assert(line_len <= source->filled);

        /* Check if something remains */
        remain = source->filled - line_len;
        if (remain > 0) {
                newsize = MAX(remain, LINE_CHUNK);
                newbuf = malloc(newsize);
                if (!newbuf)
                        /* source is untouched, so the call can be retried */
                        return log_oom();
                memcpy(newbuf, c + 1, remain);
        }

        /* Nothing can fail anymore: hand the old buffer to the caller. */
        *line = source->buf;
        *size = line_len;

        source->buf = newbuf;
        source->size = newsize;
        source->filled = remain;
        source->scanned = 0;

        return 1;
}
a6278b88305b237b02eabff0d870b57fe851822dLennart Poettering
a6278b88305b237b02eabff0d870b57fe851822dLennart Poetteringint push_data(RemoteSource *source, const char *data, size_t size) {
a6278b88305b237b02eabff0d870b57fe851822dLennart Poettering assert(source);
a6278b88305b237b02eabff0d870b57fe851822dLennart Poettering assert(source->state != STATE_EOF);
a6278b88305b237b02eabff0d870b57fe851822dLennart Poettering
a6278b88305b237b02eabff0d870b57fe851822dLennart Poettering if (!GREEDY_REALLOC(source->buf, source->size,
a6278b88305b237b02eabff0d870b57fe851822dLennart Poettering source->filled + size)) {
a6278b88305b237b02eabff0d870b57fe851822dLennart Poettering log_error("Failed to store received data of size %zu "
a6278b88305b237b02eabff0d870b57fe851822dLennart Poettering "(in addition to existing %zu bytes with %zu filled): %s",
b8bde11658366290521e3d03316378b482600323Jan Engelhardt size, source->size, source->filled, strerror(ENOMEM));
b8bde11658366290521e3d03316378b482600323Jan Engelhardt return -ENOMEM;
a6278b88305b237b02eabff0d870b57fe851822dLennart Poettering }
a6278b88305b237b02eabff0d870b57fe851822dLennart Poettering
a6278b88305b237b02eabff0d870b57fe851822dLennart Poettering memcpy(source->buf + source->filled, data, size);
5aded369782f28255bc6b494ca905d7acaea7a56Zbigniew Jędrzejewski-Szmek source->filled += size;
a6278b88305b237b02eabff0d870b57fe851822dLennart Poettering
a6278b88305b237b02eabff0d870b57fe851822dLennart Poettering return 0;
a6278b88305b237b02eabff0d870b57fe851822dLennart Poettering}
a6278b88305b237b02eabff0d870b57fe851822dLennart Poettering
/*
 * Make sure at least `size` bytes are buffered, reading from source->fd as
 * needed, then hand the buffer out.
 *
 * On success *data receives the buffer that was held by the source; its
 * first `size` bytes are the requested data. Any bytes buffered beyond
 * `size` are copied into a fresh buffer which becomes the source's new
 * buffer. *data is only written when the function returns 1, so a caller
 * that frees *data on its error path never frees memory the source still
 * references.
 *
 * Returns:
 *   1            `size` bytes are available through *data
 *   0            read() returned 0 before enough bytes arrived
 *   -EWOULDBLOCK not enough bytes buffered and source->fd < 0
 *   -errno       read() failed (EAGAIN/EWOULDBLOCK are not logged)
 *   log_oom()    an allocation failed
 */
static int fill_fixed_size(RemoteSource *source, void **data, size_t size) {
        ssize_t n;
        char *newbuf = NULL;
        size_t newsize = 0, remain;

        assert(source);
        assert(source->state == STATE_DATA_START ||
               source->state == STATE_DATA ||
               source->state == STATE_DATA_FINISH);
        assert(size <= DATA_SIZE_MAX);
        assert(source->filled <= source->size);
        assert(source->scanned <= source->filled);
        assert(source->buf != NULL || source->size == 0);
        assert(source->buf == NULL || source->size > 0);
        assert(data);

        while (source->filled < size) {
                if (source->fd < 0)
                        /* we have to wait for some data to come to us */
                        return -EWOULDBLOCK;

                if (!GREEDY_REALLOC(source->buf, source->size, size))
                        return log_oom();

                /* May read past `size`; the surplus is preserved below. */
                n = read(source->fd, source->buf + source->filled,
                         source->size - source->filled);
                if (n < 0) {
                        int saved_errno = errno;

                        if (saved_errno != EAGAIN && saved_errno != EWOULDBLOCK)
                                log_error("read(%d, ..., %zu): %m", source->fd,
                                          source->size - source->filled);
                        return -saved_errno;
                } else if (n == 0)
                        return 0;

                source->filled += n;
        }

        /* Check if something remains */
        assert(size <= source->filled);
        remain = source->filled - size;
        if (remain) {
                newsize = MAX(remain, LINE_CHUNK);
                newbuf = malloc(newsize);
                if (!newbuf)
                        /* source and *data are untouched at this point */
                        return log_oom();
                memcpy(newbuf, source->buf + size, remain);
        }

        /* Nothing can fail anymore: hand the old buffer to the caller. */
        *data = source->buf;

        source->buf = newbuf;
        source->size = newsize;
        source->filled = remain;
        source->scanned = 0;

        return 1;
}
/*
 * Read the 64-bit little-endian length that precedes a binary field and
 * store it in source->data_size.
 *
 * The declared length is validated as a full 64-bit value before it is
 * stored, so an oversized declaration cannot slip through by being
 * truncated first (source->data_size is presumably size_t-wide, judging
 * by the %zu the message used to print it with — verify against the
 * struct definition).
 *
 * Returns 1 when the length was read and accepted (a zero length only
 * triggers a warning), -EINVAL when it exceeds DATA_SIZE_MAX, and
 * otherwise whatever fill_fixed_size() returned (0 or a negative value).
 */
static int get_data_size(RemoteSource *source) {
        int r;
        uint64_t declared;
        _cleanup_free_ void *data = NULL;

        assert(source);
        assert(source->state == STATE_DATA_START);
        assert(source->data_size == 0);

        r = fill_fixed_size(source, &data, sizeof(uint64_t));
        if (r <= 0)
                return r;

        memcpy(&declared, data, sizeof(declared));
        declared = le64toh(declared);

        if (declared > DATA_SIZE_MAX) {
                log_error("Stream declares field with size %llu > %u == DATA_SIZE_MAX",
                          (unsigned long long) declared, DATA_SIZE_MAX);
                return -EINVAL;
        }

        source->data_size = declared;
        if (source->data_size == 0)
                log_warning("Binary field with zero length");

        return 1;
}
/*
 * Fetch the payload of a binary field: source->data_size bytes, handed
 * out through *data by fill_fixed_size().
 *
 * Returns 1 once the payload is complete; otherwise the zero or negative
 * value reported by fill_fixed_size().
 */
static int get_data_data(RemoteSource *source, void **data) {
        int r;

        assert(source);
        assert(data);
        assert(source->state == STATE_DATA);

        r = fill_fixed_size(source, data, source->data_size);

        return r > 0 ? 1 : r;
}
/*
 * Consume the single byte that follows a binary field's payload and check
 * that it is a newline.
 *
 * Returns 1 when the byte is '\n', -EINVAL when it is anything else, and
 * otherwise the zero or negative value reported by fill_fixed_size().
 */
static int get_data_newline(RemoteSource *source) {
        _cleanup_free_ char *terminator = NULL;
        int r;

        assert(source);
        assert(source->state == STATE_DATA_FINISH);

        r = fill_fixed_size(source, (void**) &terminator, 1);
        if (r <= 0)
                return r;

        assert(terminator);

        if (*terminator == '\n')
                return 1;

        log_error("expected newline, got '%c'", *terminator);
        return -EINVAL;
}
/*
 * Handle lines whose field name starts with a double underscore.
 *
 * `line` holds `n` bytes, the last of which is '\n'; it is not expected to
 * be NUL-terminated. Whenever the line has to be used as a C string, the
 * trailing newline is overwritten with '\0' first.
 *
 *   __CURSOR=               ignored
 *   __REALTIME_TIMESTAMP=   parsed into source->ts.realtime
 *   __MONOTONIC_TIMESTAMP=  parsed into source->ts.monotonic
 *   any other "__" line     logged and ignored
 *
 * Returns 1 when the line was a dunder line and has been dealt with, 0 when
 * it is not a dunder line, and the negative value from safe_atollu() when a
 * timestamp cannot be parsed.
 */
static int process_dunder(RemoteSource *source, char *line, size_t n) {
        const char *timestamp;
        int r;

        assert(source);
        assert(line);
        assert(n > 0);
        assert(line[n-1] == '\n');

        /* XXX: is it worth to support timestamps in extended format?
         * We don't produce them, but who knows... */

        timestamp = startswith(line, "__CURSOR=");
        if (timestamp)
                /* ignore __CURSOR */
                return 1;

        timestamp = startswith(line, "__REALTIME_TIMESTAMP=");
        if (timestamp) {
                long long unsigned x;

                line[n-1] = '\0';
                r = safe_atollu(timestamp, &x);
                if (r < 0)
                        log_warning("Failed to parse __REALTIME_TIMESTAMP: '%s'", timestamp);
                else
                        source->ts.realtime = x;
                return r < 0 ? r : 1;
        }

        timestamp = startswith(line, "__MONOTONIC_TIMESTAMP=");
        if (timestamp) {
                long long unsigned x;

                line[n-1] = '\0';
                r = safe_atollu(timestamp, &x);
                if (r < 0)
                        log_warning("Failed to parse __MONOTONIC_TIMESTAMP: '%s'", timestamp);
                else
                        source->ts.monotonic = x;
                return r < 0 ? r : 1;
        }

        timestamp = startswith(line, "__");
        if (timestamp) {
                /* Terminate before printing with %s: the buffer carries no
                 * NUL of its own, so logging it as-is would read past the
                 * end of the line. */
                line[n-1] = '\0';
                log_notice("Unknown dunder line %s", line);
                return 1;
        }

        /* no dunder */
        return 0;
}
/*
 * Advance the parser of one source by a single step, depending on
 * source->state:
 *
 *   STATE_LINE         read one text line; an empty line ends the entry,
 *                      a dunder line is handled by process_dunder(), a line
 *                      containing '=' is stored as a field, and a line
 *                      without '=' is taken as the name of a binary field
 *                      (its newline replaced by '=') and switches to
 *                      STATE_DATA_START
 *   STATE_DATA_START   read the length of the binary field
 *   STATE_DATA         read the payload of the binary field
 *   STATE_DATA_FINISH  read the newline that closes the binary field
 *
 * Returns 1 when an empty line was received (the entry is complete), 0 when
 * a step was made and parsing should continue or when the input ended (the
 * state is then STATE_EOF), and a negative value on error.
 */
int process_data(RemoteSource *source) {
        int r;

        assert(source);

        switch(source->state) {
        case STATE_LINE: {
                char *line, *sep;
                size_t n;

                assert(source->data_size == 0);

                r = get_line(source, &line, &n);
                if (r < 0)
                        return r;
                if (r == 0) {
                        source->state = STATE_EOF;
                        return r;
                }
                assert(n > 0);
                assert(line[n-1] == '\n');

                if (n == 1) {
                        log_debug("Received empty line, event is ready");
                        free(line);
                        return 1;
                }

                r = process_dunder(source, line, n);
                if (r != 0) {
                        /* dunder lines never become part of the entry */
                        free(line);
                        return r < 0 ? r : 0;
                }

                /* MESSAGE=xxx\n
                   or
                   COREDUMP\n
                   LLLLLLLL0011223344...\n
                */
                sep = memchr(line, '=', n);
                if (sep)
                        /* chomp newline */
                        n--;
                else
                        /* replace \n with = */
                        line[n-1] = '=';
                log_debug("Received: %.*s", (int) n, line);

                r = iovw_put(&source->iovw, line, n);
                if (r < 0) {
                        log_error("Failed to put line in iovect");
                        free(line);
                        return r;
                }

                if (!sep)
                        source->state = STATE_DATA_START;
                return 0; /* continue */
        }

        case STATE_DATA_START:
                assert(source->data_size == 0);

                r = get_data_size(source);
                log_debug("get_data_size() -> %d", r);
                if (r < 0)
                        return r;
                if (r == 0) {
                        source->state = STATE_EOF;
                        return 0;
                }

                /* a zero-length field has no payload to wait for */
                source->state = source->data_size > 0 ?
                        STATE_DATA : STATE_DATA_FINISH;

                return 0; /* continue */

        case STATE_DATA: {
                void *data;

                assert(source->data_size > 0);

                r = get_data_data(source, &data);
                log_debug("get_data_data() -> %d", r);
                if (r < 0)
                        return r;
                if (r == 0) {
                        source->state = STATE_EOF;
                        return 0;
                }

                assert(data);

                /* NOTE(review): the payload goes into its own iovec entry,
                 * separate from the "NAME=" entry stored in STATE_LINE —
                 * confirm that the writer treats the two as one field. */
                r = iovw_put(&source->iovw, data, source->data_size);
                if (r < 0) {
                        log_error("failed to put binary buffer in iovect");
                        /* the buffer was handed to us and was not stored */
                        free(data);
                        return r;
                }

                source->state = STATE_DATA_FINISH;

                return 0; /* continue */
        }

        case STATE_DATA_FINISH:
                r = get_data_newline(source);
                log_debug("get_data_newline() -> %d", r);
                if (r < 0)
                        return r;
                if (r == 0) {
                        source->state = STATE_EOF;
                        return 0;
                }

                source->data_size = 0;
                source->state = STATE_LINE;

                return 0; /* continue */

        default:
                assert_not_reached("wtf?");
        }
}
/*
 * Run one parsing step on `source` and, when that step completes an entry,
 * pass the collected fields to `writer`.
 *
 * Returns the zero or negative result of process_data() when no entry is
 * complete yet. Once an entry is complete the collected fields are always
 * released; the result is then 1 when the entry was written, the positive
 * value from process_data() when the entry had no fields and was skipped,
 * or the negative value from writer_write() on failure.
 */
int process_source(RemoteSource *source, Writer *writer, bool compress, bool seal) {
        int r;

        assert(source);
        assert(writer);

        r = process_data(source);
        if (r <= 0)
                return r;

        /* We have a full event */
        log_info("Received a full event from source@%p fd:%d (%s)",
                 source, source->fd, source->name);

        if (source->iovw.count == 0)
                log_warning("Entry with no payload, skipping");
        else {
                assert(source->iovw.iovec);

                r = writer_write(writer, &source->iovw, &source->ts, compress, seal);
                if (r >= 0)
                        r = 1;
                else
                        log_error("Failed to write entry of %zu bytes: %s",
                                  iovw_size(&source->iovw), strerror(-r));
        }

        iovw_free_contents(&source->iovw);
        return r;
}