journal-remote-parse.c revision 9ff48d0982fcb97923955685fe9fa4e0e67cb238
6bedfcbb2970e06a4d3280c8fb62083d252ede73Lennart Poettering/*-*- Mode: C; c-basic-offset: 8; indent-tabs-mode: nil -*-*/
6bedfcbb2970e06a4d3280c8fb62083d252ede73Lennart Poettering
6bedfcbb2970e06a4d3280c8fb62083d252ede73Lennart Poettering/***
6bedfcbb2970e06a4d3280c8fb62083d252ede73Lennart Poettering This file is part of systemd.
6bedfcbb2970e06a4d3280c8fb62083d252ede73Lennart Poettering
6bedfcbb2970e06a4d3280c8fb62083d252ede73Lennart Poettering Copyright 2014 Zbigniew Jędrzejewski-Szmek
6bedfcbb2970e06a4d3280c8fb62083d252ede73Lennart Poettering
6bedfcbb2970e06a4d3280c8fb62083d252ede73Lennart Poettering systemd is free software; you can redistribute it and/or modify it
6bedfcbb2970e06a4d3280c8fb62083d252ede73Lennart Poettering under the terms of the GNU Lesser General Public License as published by
6bedfcbb2970e06a4d3280c8fb62083d252ede73Lennart Poettering the Free Software Foundation; either version 2.1 of the License, or
6bedfcbb2970e06a4d3280c8fb62083d252ede73Lennart Poettering (at your option) any later version.
6bedfcbb2970e06a4d3280c8fb62083d252ede73Lennart Poettering
6bedfcbb2970e06a4d3280c8fb62083d252ede73Lennart Poettering systemd is distributed in the hope that it will be useful, but
6bedfcbb2970e06a4d3280c8fb62083d252ede73Lennart Poettering WITHOUT ANY WARRANTY; without even the implied warranty of
6bedfcbb2970e06a4d3280c8fb62083d252ede73Lennart Poettering MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
6bedfcbb2970e06a4d3280c8fb62083d252ede73Lennart Poettering Lesser General Public License for more details.
6bedfcbb2970e06a4d3280c8fb62083d252ede73Lennart Poettering
6bedfcbb2970e06a4d3280c8fb62083d252ede73Lennart Poettering You should have received a copy of the GNU Lesser General Public License
6bedfcbb2970e06a4d3280c8fb62083d252ede73Lennart Poettering along with systemd; If not, see <http://www.gnu.org/licenses/>.
6bedfcbb2970e06a4d3280c8fb62083d252ede73Lennart Poettering***/
6bedfcbb2970e06a4d3280c8fb62083d252ede73Lennart Poettering
6bedfcbb2970e06a4d3280c8fb62083d252ede73Lennart Poettering#include "journal-remote-parse.h"
6bedfcbb2970e06a4d3280c8fb62083d252ede73Lennart Poettering#include "journald-native.h"
6bedfcbb2970e06a4d3280c8fb62083d252ede73Lennart Poettering
11c3a36649e5e5e77db499c92f3cdcbd619efd3aThomas Hindoe Paaboel Andersen#define LINE_CHUNK 8*1024u
11c3a36649e5e5e77db499c92f3cdcbd619efd3aThomas Hindoe Paaboel Andersen
11c3a36649e5e5e77db499c92f3cdcbd619efd3aThomas Hindoe Paaboel Andersenvoid source_free(RemoteSource *source) {
6bedfcbb2970e06a4d3280c8fb62083d252ede73Lennart Poettering if (!source)
6bedfcbb2970e06a4d3280c8fb62083d252ede73Lennart Poettering return;
6bedfcbb2970e06a4d3280c8fb62083d252ede73Lennart Poettering
6bedfcbb2970e06a4d3280c8fb62083d252ede73Lennart Poettering if (source->fd >= 0 && !source->passive_fd) {
c7f1808add4d971229ba5311cf66e659122aa338Lennart Poettering log_debug("Closing fd:%d (%s)", source->fd, source->name);
c7f1808add4d971229ba5311cf66e659122aa338Lennart Poettering safe_close(source->fd);
6bedfcbb2970e06a4d3280c8fb62083d252ede73Lennart Poettering }
6bedfcbb2970e06a4d3280c8fb62083d252ede73Lennart Poettering
6bedfcbb2970e06a4d3280c8fb62083d252ede73Lennart Poettering free(source->name);
6ad623a3f77e087e308f334525fd4046811f2a9aLennart Poettering free(source->buf);
6bedfcbb2970e06a4d3280c8fb62083d252ede73Lennart Poettering iovw_free_contents(&source->iovw);
6bedfcbb2970e06a4d3280c8fb62083d252ede73Lennart Poettering
28cb17ef0281efc3a46e5d0e702b0b0ddeaafaa4Filipe Brandenburger log_debug("Writer ref count %u", source->writer->n_ref);
6bedfcbb2970e06a4d3280c8fb62083d252ede73Lennart Poettering writer_unref(source->writer);
6bedfcbb2970e06a4d3280c8fb62083d252ede73Lennart Poettering
6bedfcbb2970e06a4d3280c8fb62083d252ede73Lennart Poettering sd_event_source_unref(source->event);
6bedfcbb2970e06a4d3280c8fb62083d252ede73Lennart Poettering
6bedfcbb2970e06a4d3280c8fb62083d252ede73Lennart Poettering free(source);
6bedfcbb2970e06a4d3280c8fb62083d252ede73Lennart Poettering}
6bedfcbb2970e06a4d3280c8fb62083d252ede73Lennart Poettering
6bedfcbb2970e06a4d3280c8fb62083d252ede73Lennart Poettering/**
6bedfcbb2970e06a4d3280c8fb62083d252ede73Lennart Poettering * Initialize zero-filled source with given values. On success, takes
6bedfcbb2970e06a4d3280c8fb62083d252ede73Lennart Poettering * ownerhship of fd and writer, otherwise does not touch them.
6bedfcbb2970e06a4d3280c8fb62083d252ede73Lennart Poettering */
6bedfcbb2970e06a4d3280c8fb62083d252ede73Lennart PoetteringRemoteSource* source_new(int fd, bool passive_fd, char *name, Writer *writer) {
6bedfcbb2970e06a4d3280c8fb62083d252ede73Lennart Poettering
6bedfcbb2970e06a4d3280c8fb62083d252ede73Lennart Poettering RemoteSource *source;
6bedfcbb2970e06a4d3280c8fb62083d252ede73Lennart Poettering
6bedfcbb2970e06a4d3280c8fb62083d252ede73Lennart Poettering log_debug("Creating source for %sfd:%d (%s)",
6bedfcbb2970e06a4d3280c8fb62083d252ede73Lennart Poettering passive_fd ? "passive " : "", fd, name);
6bedfcbb2970e06a4d3280c8fb62083d252ede73Lennart Poettering
6bedfcbb2970e06a4d3280c8fb62083d252ede73Lennart Poettering assert(fd >= 0);
6bedfcbb2970e06a4d3280c8fb62083d252ede73Lennart Poettering
6bedfcbb2970e06a4d3280c8fb62083d252ede73Lennart Poettering source = new0(RemoteSource, 1);
6bedfcbb2970e06a4d3280c8fb62083d252ede73Lennart Poettering if (!source)
6bedfcbb2970e06a4d3280c8fb62083d252ede73Lennart Poettering return NULL;
6bedfcbb2970e06a4d3280c8fb62083d252ede73Lennart Poettering
6bedfcbb2970e06a4d3280c8fb62083d252ede73Lennart Poettering source->fd = fd;
6bedfcbb2970e06a4d3280c8fb62083d252ede73Lennart Poettering source->passive_fd = passive_fd;
6bedfcbb2970e06a4d3280c8fb62083d252ede73Lennart Poettering source->name = name;
6bedfcbb2970e06a4d3280c8fb62083d252ede73Lennart Poettering source->writer = writer;
6bedfcbb2970e06a4d3280c8fb62083d252ede73Lennart Poettering
6bedfcbb2970e06a4d3280c8fb62083d252ede73Lennart Poettering return source;
6bedfcbb2970e06a4d3280c8fb62083d252ede73Lennart Poettering}
6bedfcbb2970e06a4d3280c8fb62083d252ede73Lennart Poettering
6bedfcbb2970e06a4d3280c8fb62083d252ede73Lennart Poetteringstatic int get_line(RemoteSource *source, char **line, size_t *size) {
6bedfcbb2970e06a4d3280c8fb62083d252ede73Lennart Poettering ssize_t n, remain;
6bedfcbb2970e06a4d3280c8fb62083d252ede73Lennart Poettering char *c = NULL;
6bedfcbb2970e06a4d3280c8fb62083d252ede73Lennart Poettering char *newbuf = NULL;
6bedfcbb2970e06a4d3280c8fb62083d252ede73Lennart Poettering size_t newsize = 0;
6bedfcbb2970e06a4d3280c8fb62083d252ede73Lennart Poettering
6bedfcbb2970e06a4d3280c8fb62083d252ede73Lennart Poettering assert(source);
6bedfcbb2970e06a4d3280c8fb62083d252ede73Lennart Poettering assert(source->state == STATE_LINE);
6bedfcbb2970e06a4d3280c8fb62083d252ede73Lennart Poettering assert(source->filled <= source->size);
6bedfcbb2970e06a4d3280c8fb62083d252ede73Lennart Poettering assert(source->buf == NULL || source->size > 0);
6bedfcbb2970e06a4d3280c8fb62083d252ede73Lennart Poettering assert(source->fd >= 0);
6bedfcbb2970e06a4d3280c8fb62083d252ede73Lennart Poettering
6bedfcbb2970e06a4d3280c8fb62083d252ede73Lennart Poettering while (true) {
6bedfcbb2970e06a4d3280c8fb62083d252ede73Lennart Poettering if (source->buf)
6bedfcbb2970e06a4d3280c8fb62083d252ede73Lennart Poettering c = memchr(source->buf + source->scanned, '\n',
6bedfcbb2970e06a4d3280c8fb62083d252ede73Lennart Poettering source->filled - source->scanned);
6bedfcbb2970e06a4d3280c8fb62083d252ede73Lennart Poettering if (c != NULL)
6bedfcbb2970e06a4d3280c8fb62083d252ede73Lennart Poettering break;
6bedfcbb2970e06a4d3280c8fb62083d252ede73Lennart Poettering
6bedfcbb2970e06a4d3280c8fb62083d252ede73Lennart Poettering source->scanned = source->filled;
6bedfcbb2970e06a4d3280c8fb62083d252ede73Lennart Poettering if (source->scanned >= DATA_SIZE_MAX) {
6bedfcbb2970e06a4d3280c8fb62083d252ede73Lennart Poettering log_error("Entry is bigger than %u bytes.", DATA_SIZE_MAX);
6bedfcbb2970e06a4d3280c8fb62083d252ede73Lennart Poettering return -E2BIG;
436dd70f5331ec541ebdd84e106abaa63203f6f1Hristo Venev }
436dd70f5331ec541ebdd84e106abaa63203f6f1Hristo Venev
if (source->passive_fd)
/* we have to wait for some data to come to us */
return -EWOULDBLOCK;
if (source->size - source->filled < LINE_CHUNK &&
!GREEDY_REALLOC(source->buf, source->size,
MIN(source->filled + LINE_CHUNK, DATA_SIZE_MAX)))
return log_oom();
assert(source->size - source->filled >= LINE_CHUNK ||
source->size == DATA_SIZE_MAX);
// FIXME: the buffer probably needs to be bigger than DATA_SIZE_MAX
// to accomodate such big fields.
n = read(source->fd, source->buf + source->filled,
source->size - source->filled);
if (n < 0) {
if (errno != EAGAIN && errno != EWOULDBLOCK)
log_error("read(%d, ..., %zd): %m", source->fd,
source->size - source->filled);
return -errno;
} else if (n == 0)
return 0;
source->filled += n;
}
*line = source->buf;
*size = c + 1 - source->buf;
/* Check if something remains */
remain = source->buf + source->filled - c - 1;
assert(remain >= 0);
if (remain) {
newsize = MAX(remain, LINE_CHUNK);
newbuf = malloc(newsize);
if (!newbuf)
return log_oom();
memcpy(newbuf, c + 1, remain);
}
source->buf = newbuf;
source->size = newsize;
source->filled = remain;
source->scanned = 0;
return 1;
}
int push_data(RemoteSource *source, const char *data, size_t size) {
assert(source);
assert(source->state != STATE_EOF);
if (!GREEDY_REALLOC(source->buf, source->size,
source->filled + size)) {
log_error("Failed to store received data of size %zu "
"(in addition to existing %zu bytes with %zu filled): %s",
size, source->size, source->filled, strerror(ENOMEM));
return -ENOMEM;
}
memcpy(source->buf + source->filled, data, size);
source->filled += size;
return 0;
}
static int fill_fixed_size(RemoteSource *source, void **data, size_t size) {
int n;
char *newbuf = NULL;
size_t newsize = 0, remain;
assert(source);
assert(source->state == STATE_DATA_START ||
source->state == STATE_DATA ||
source->state == STATE_DATA_FINISH);
assert(size <= DATA_SIZE_MAX);
assert(source->filled <= source->size);
assert(source->scanned <= source->filled);
assert(source->buf != NULL || source->size == 0);
assert(source->buf == NULL || source->size > 0);
assert(source->fd >= 0);
assert(data);
while(source->filled < size) {
if (source->passive_fd)
/* we have to wait for some data to come to us */
return -EWOULDBLOCK;
if (!GREEDY_REALLOC(source->buf, source->size, size))
return log_oom();
n = read(source->fd, source->buf + source->filled,
source->size - source->filled);
if (n < 0) {
if (errno != EAGAIN && errno != EWOULDBLOCK)
log_error("read(%d, ..., %zd): %m", source->fd,
source->size - source->filled);
return -errno;
} else if (n == 0)
return 0;
source->filled += n;
}
*data = source->buf;
/* Check if something remains */
assert(size <= source->filled);
remain = source->filled - size;
if (remain) {
newsize = MAX(remain, LINE_CHUNK);
newbuf = malloc(newsize);
if (!newbuf)
return log_oom();
memcpy(newbuf, source->buf + size, remain);
}
source->buf = newbuf;
source->size = newsize;
source->filled = remain;
source->scanned = 0;
return 1;
}
static int get_data_size(RemoteSource *source) {
int r;
_cleanup_free_ void *data = NULL;
assert(source);
assert(source->state == STATE_DATA_START);
assert(source->data_size == 0);
r = fill_fixed_size(source, &data, sizeof(uint64_t));
if (r <= 0)
return r;
source->data_size = le64toh( *(uint64_t *) data );
if (source->data_size > DATA_SIZE_MAX) {
log_error("Stream declares field with size %zu > DATA_SIZE_MAX = %u",
source->data_size, DATA_SIZE_MAX);
return -EINVAL;
}
if (source->data_size == 0)
log_warning("Binary field with zero length");
return 1;
}
static int get_data_data(RemoteSource *source, void **data) {
int r;
assert(source);
assert(data);
assert(source->state == STATE_DATA);
r = fill_fixed_size(source, data, source->data_size);
if (r <= 0)
return r;
return 1;
}
static int get_data_newline(RemoteSource *source) {
int r;
_cleanup_free_ char *data = NULL;
assert(source);
assert(source->state == STATE_DATA_FINISH);
r = fill_fixed_size(source, (void**) &data, 1);
if (r <= 0)
return r;
assert(data);
if (*data != '\n') {
log_error("expected newline, got '%c'", *data);
return -EINVAL;
}
return 1;
}
static int process_dunder(RemoteSource *source, char *line, size_t n) {
const char *timestamp;
int r;
assert(line);
assert(n > 0);
assert(line[n-1] == '\n');
/* XXX: is it worth to support timestamps in extended format?
* We don't produce them, but who knows... */
timestamp = startswith(line, "__CURSOR=");
if (timestamp)
/* ignore __CURSOR */
return 1;
timestamp = startswith(line, "__REALTIME_TIMESTAMP=");
if (timestamp) {
long long unsigned x;
line[n-1] = '\0';
r = safe_atollu(timestamp, &x);
if (r < 0)
log_warning("Failed to parse __REALTIME_TIMESTAMP: '%s'", timestamp);
else
source->ts.realtime = x;
return r < 0 ? r : 1;
}
timestamp = startswith(line, "__MONOTONIC_TIMESTAMP=");
if (timestamp) {
long long unsigned x;
line[n-1] = '\0';
r = safe_atollu(timestamp, &x);
if (r < 0)
log_warning("Failed to parse __MONOTONIC_TIMESTAMP: '%s'", timestamp);
else
source->ts.monotonic = x;
return r < 0 ? r : 1;
}
timestamp = startswith(line, "__");
if (timestamp) {
log_notice("Unknown dunder line %s", line);
return 1;
}
/* no dunder */
return 0;
}
int process_data(RemoteSource *source) {
int r;
switch(source->state) {
case STATE_LINE: {
char *line, *sep;
size_t n;
assert(source->data_size == 0);
r = get_line(source, &line, &n);
if (r < 0)
return r;
if (r == 0) {
source->state = STATE_EOF;
return r;
}
assert(n > 0);
assert(line[n-1] == '\n');
if (n == 1) {
log_debug("Received empty line, event is ready");
free(line);
return 1;
}
r = process_dunder(source, line, n);
if (r != 0) {
free(line);
return r < 0 ? r : 0;
}
/* MESSAGE=xxx\n
or
COREDUMP\n
LLLLLLLL0011223344...\n
*/
sep = memchr(line, '=', n);
if (sep)
/* chomp newline */
n--;
else
/* replace \n with = */
line[n-1] = '=';
log_debug("Received: %.*s", (int) n, line);
r = iovw_put(&source->iovw, line, n);
if (r < 0) {
log_error("Failed to put line in iovect");
free(line);
return r;
}
if (!sep)
source->state = STATE_DATA_START;
return 0; /* continue */
}
case STATE_DATA_START:
assert(source->data_size == 0);
r = get_data_size(source);
log_debug("get_data_size() -> %d", r);
if (r < 0)
return r;
if (r == 0) {
source->state = STATE_EOF;
return 0;
}
source->state = source->data_size > 0 ?
STATE_DATA : STATE_DATA_FINISH;
return 0; /* continue */
case STATE_DATA: {
void *data;
assert(source->data_size > 0);
r = get_data_data(source, &data);
log_debug("get_data_data() -> %d", r);
if (r < 0)
return r;
if (r == 0) {
source->state = STATE_EOF;
return 0;
}
assert(data);
r = iovw_put(&source->iovw, data, source->data_size);
if (r < 0) {
log_error("failed to put binary buffer in iovect");
return r;
}
source->state = STATE_DATA_FINISH;
return 0; /* continue */
}
case STATE_DATA_FINISH:
r = get_data_newline(source);
log_debug("get_data_newline() -> %d", r);
if (r < 0)
return r;
if (r == 0) {
source->state = STATE_EOF;
return 0;
}
source->data_size = 0;
source->state = STATE_LINE;
return 0; /* continue */
default:
assert_not_reached("wtf?");
}
}
int process_source(RemoteSource *source, bool compress, bool seal) {
int r;
assert(source);
assert(source->writer);
r = process_data(source);
if (r <= 0)
return r;
/* We have a full event */
log_debug("Received a full event from source@%p fd:%d (%s)",
source, source->fd, source->name);
if (!source->iovw.count) {
log_warning("Entry with no payload, skipping");
goto freeing;
}
assert(source->iovw.iovec);
assert(source->iovw.count);
r = writer_write(source->writer, &source->iovw, &source->ts, compress, seal);
if (r < 0)
log_error("Failed to write entry of %zu bytes: %s",
iovw_size(&source->iovw), strerror(-r));
else
r = 1;
freeing:
iovw_free_contents(&source->iovw);
return r;
}