journal-remote-parse.c revision dd87b1840c966fd25b81a7aa1071e8488c624db8
139b011ab81ccea1d51f09e0261a1c390115c6ffPatrik Flykt/*-*- Mode: C; c-basic-offset: 8; indent-tabs-mode: nil -*-*/
139b011ab81ccea1d51f09e0261a1c390115c6ffPatrik Flykt This file is part of systemd.
7bd8e95d44977833d0de3fc4e893eb3bc84351d6Patrik Flykt Copyright 2014 Zbigniew Jędrzejewski-Szmek
139b011ab81ccea1d51f09e0261a1c390115c6ffPatrik Flykt systemd is free software; you can redistribute it and/or modify it
139b011ab81ccea1d51f09e0261a1c390115c6ffPatrik Flykt under the terms of the GNU Lesser General Public License as published by
139b011ab81ccea1d51f09e0261a1c390115c6ffPatrik Flykt the Free Software Foundation; either version 2.1 of the License, or
139b011ab81ccea1d51f09e0261a1c390115c6ffPatrik Flykt (at your option) any later version.
139b011ab81ccea1d51f09e0261a1c390115c6ffPatrik Flykt systemd is distributed in the hope that it will be useful, but
139b011ab81ccea1d51f09e0261a1c390115c6ffPatrik Flykt WITHOUT ANY WARRANTY; without even the implied warranty of
139b011ab81ccea1d51f09e0261a1c390115c6ffPatrik Flykt MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
139b011ab81ccea1d51f09e0261a1c390115c6ffPatrik Flykt Lesser General Public License for more details.
139b011ab81ccea1d51f09e0261a1c390115c6ffPatrik Flykt You should have received a copy of the GNU Lesser General Public License
139b011ab81ccea1d51f09e0261a1c390115c6ffPatrik Flykt along with systemd; If not, see <http://www.gnu.org/licenses/>.
07630cea1f3a845c09309f197ac7c4f11edd3b62Lennart Poettering log_debug("Closing fd:%d (%s)", source->fd, source->name);
139b011ab81ccea1d51f09e0261a1c390115c6ffPatrik Flykt log_debug("Writer ref count %u", source->writer->n_ref);
76253e73f9c9c24fec755e485516f3b55d0707b4Dan Williams * Initialize zero-filled source with given values. On success, takes
f12abb48fc510b8b349c05e35ba048134debaf25Patrik Flykt * ownership of fd and writer, otherwise does not touch them.
346e13a25dc6f76d3bc9d8decd40dc4782b02d2aPatrik FlyktRemoteSource* source_new(int fd, bool passive_fd, char *name, Writer *writer) {
da6fe470e17fa02f3adedc779585caf8669252bdPatrik Flyktstatic char* realloc_buffer(RemoteSource *source, size_t size) {
da6fe470e17fa02f3adedc779585caf8669252bdPatrik Flykt b = GREEDY_REALLOC(source->buf, source->size, size);
a9aff3615b430f86bd0a824214d95f634efaf894Patrik Flyktstatic int get_line(RemoteSource *source, char **line, size_t *size) {
a9aff3615b430f86bd0a824214d95f634efaf894Patrik Flykt assert(source->buf == NULL || source->size > 0);
631bbe71298ec892f77f44f94feb612646fe6853Patrik Flykt while (true) {
631bbe71298ec892f77f44f94feb612646fe6853Patrik Flykt size_t start = MAX(source->scanned, source->offset);
3f0c075f8ef3344da5a6bda524540201f9204e61Patrik Flykt log_error("Entry is bigger than %u bytes.", DATA_SIZE_MAX);
c3e2adeaba8e043caed0ef139eeaea016bd152d0Patrik Flykt /* we have to wait for some data to come to us */
139b011ab81ccea1d51f09e0261a1c390115c6ffPatrik Flykt if (source->size - source->filled < LINE_CHUNK &&
139b011ab81ccea1d51f09e0261a1c390115c6ffPatrik Flykt MIN(source->filled + LINE_CHUNK, ENTRY_SIZE_MAX)))
139b011ab81ccea1d51f09e0261a1c390115c6ffPatrik Flykt assert(source->size - source->filled >= LINE_CHUNK ||
139b011ab81ccea1d51f09e0261a1c390115c6ffPatrik Flykt n = read(source->fd, source->buf + source->filled,
139b011ab81ccea1d51f09e0261a1c390115c6ffPatrik Flykt if (n < 0) {
d7c9c21f18704580f66a1ce73fb6b506fdf40732Patrik Flykt log_error("read(%d, ..., %zd): %m", source->fd,
139b011ab81ccea1d51f09e0261a1c390115c6ffPatrik Flykt } else if (n == 0)
0ae0e5cd96813bacad43a39920a043d8d20a67dbLennart Poettering *size = c + 1 - source->buf - source->offset;
d7c9c21f18704580f66a1ce73fb6b506fdf40732Patrik Flyktint push_data(RemoteSource *source, const char *data, size_t size) {
76253e73f9c9c24fec755e485516f3b55d0707b4Dan Williams if (!realloc_buffer(source, source->filled + size)) {
76253e73f9c9c24fec755e485516f3b55d0707b4Dan Williams log_error("Failed to store received data of size %zu "
139b011ab81ccea1d51f09e0261a1c390115c6ffPatrik Flykt "(in addition to existing %zu bytes with %zu filled): %s",
76253e73f9c9c24fec755e485516f3b55d0707b4Dan Williams size, source->size, source->filled, strerror(ENOMEM));
76253e73f9c9c24fec755e485516f3b55d0707b4Dan Williams memcpy(source->buf + source->filled, data, size);
139b011ab81ccea1d51f09e0261a1c390115c6ffPatrik Flyktstatic int fill_fixed_size(RemoteSource *source, void **data, size_t size) {
0ae0e5cd96813bacad43a39920a043d8d20a67dbLennart Poettering assert(source->state == STATE_DATA_START ||
cc22955cfefb4bd6e7a135f1ec95fb5a07ba9ce3Thomas Haller assert(source->buf != NULL || source->size == 0);
0ae0e5cd96813bacad43a39920a043d8d20a67dbLennart Poettering assert(source->buf == NULL || source->size > 0);
66eac1201a9c1596f5901f8dbbf24bda7e350878Dan Williams while (source->filled - source->offset < size) {
d7c9c21f18704580f66a1ce73fb6b506fdf40732Patrik Flykt /* we have to wait for some data to come to us */
fe4b2156256c5bdf52341576571ce9f095d9f085Tom Gundersen if (!realloc_buffer(source, source->offset + size))
fe4b2156256c5bdf52341576571ce9f095d9f085Tom Gundersen n = read(source->fd, source->buf + source->filled,
fe4b2156256c5bdf52341576571ce9f095d9f085Tom Gundersen log_error("read(%d, ..., %zd): %m", source->fd,
fe4b2156256c5bdf52341576571ce9f095d9f085Tom Gundersen } else if (n == 0)
ebe207d4acf38165adbc45298662982eecdb9e9fTom Gundersenstatic int get_data_size(RemoteSource *source) {
d7c9c21f18704580f66a1ce73fb6b506fdf40732Patrik Flykt r = fill_fixed_size(source, &data, sizeof(uint64_t));
bbfa43ca37df0718287c25a8e39ee7477ebf33f6Patrik Flykt source->data_size = le64toh( *(uint64_t *) data );
bbfa43ca37df0718287c25a8e39ee7477ebf33f6Patrik Flykt log_error("Stream declares field with size %zu > DATA_SIZE_MAX = %u",
0ae0e5cd96813bacad43a39920a043d8d20a67dbLennart Poetteringstatic int get_data_data(RemoteSource *source, void **data) {
da6fe470e17fa02f3adedc779585caf8669252bdPatrik Flykt r = fill_fixed_size(source, data, source->data_size);
da6fe470e17fa02f3adedc779585caf8669252bdPatrik Flyktstatic int get_data_newline(RemoteSource *source) {
da6fe470e17fa02f3adedc779585caf8669252bdPatrik Flykt r = fill_fixed_size(source, (void**) &data, 1);
da6fe470e17fa02f3adedc779585caf8669252bdPatrik Flykt log_error("expected newline, got '%c'", *data);
ea3b3a75abb3f8b853f7da454b9b8e258a120eeaPatrik Flyktstatic int process_dunder(RemoteSource *source, char *line, size_t n) {
3f0c075f8ef3344da5a6bda524540201f9204e61Patrik Flykt /* XXX: is it worth supporting timestamps in extended format?
3f0c075f8ef3344da5a6bda524540201f9204e61Patrik Flykt * We don't produce them, but who knows... */
f89087272b5561c9a3fc9d6a4e2a09f75f688fa7Thomas Haller /* ignore __CURSOR */
f89087272b5561c9a3fc9d6a4e2a09f75f688fa7Thomas Haller timestamp = startswith(line, "__REALTIME_TIMESTAMP=");
f89087272b5561c9a3fc9d6a4e2a09f75f688fa7Thomas Haller long long unsigned x;
f89087272b5561c9a3fc9d6a4e2a09f75f688fa7Thomas Haller log_warning("Failed to parse __REALTIME_TIMESTAMP: '%s'", timestamp);
4e3e6679e8f73b83d38e4b20d8b025e12991d1cbPatrik Flykt return r < 0 ? r : 1;
a9aff3615b430f86bd0a824214d95f634efaf894Patrik Flykt timestamp = startswith(line, "__MONOTONIC_TIMESTAMP=");
a9aff3615b430f86bd0a824214d95f634efaf894Patrik Flykt long long unsigned x;
f12abb48fc510b8b349c05e35ba048134debaf25Patrik Flykt log_warning("Failed to parse __MONOTONIC_TIMESTAMP: '%s'", timestamp);
f12abb48fc510b8b349c05e35ba048134debaf25Patrik Flykt return r < 0 ? r : 1;
139b011ab81ccea1d51f09e0261a1c390115c6ffPatrik Flykt /* no dunder */
a9aff3615b430f86bd0a824214d95f634efaf894Patrik Flykt if (r == 0) {
a9aff3615b430f86bd0a824214d95f634efaf894Patrik Flykt if (n == 1) {
a9aff3615b430f86bd0a824214d95f634efaf894Patrik Flykt log_debug("Received empty line, event is ready");
a9aff3615b430f86bd0a824214d95f634efaf894Patrik Flykt return r < 0 ? r : 0;
a9aff3615b430f86bd0a824214d95f634efaf894Patrik Flykt /* MESSAGE=xxx\n
bbfa43ca37df0718287c25a8e39ee7477ebf33f6Patrik Flykt LLLLLLLL0011223344...\n
a9aff3615b430f86bd0a824214d95f634efaf894Patrik Flykt /* chomp newline */
ed6ee21953dac9c78383da00bc4514ece6b75ab5Patrik Flykt /* replace \n with = */
926695f1b5f9395eeb416cc2f478a9cf75fdbeb4Thomas Hindoe Paaboel Andersen log_debug("Received: %.*s", (int) n, line);
7246333cb803b03440d3bd0bdaa233564d09b5aePatrik Flykt if (r < 0) {
3dc34fcc97b41f8b7b019027225b121dfbb9871dPatrik Flykt return 0; /* continue */
7246333cb803b03440d3bd0bdaa233564d09b5aePatrik Flykt // log_debug("get_data_size() -> %d", r);
7246333cb803b03440d3bd0bdaa233564d09b5aePatrik Flykt if (r == 0) {
a9aff3615b430f86bd0a824214d95f634efaf894Patrik Flykt return 0; /* continue */
3dc34fcc97b41f8b7b019027225b121dfbb9871dPatrik Flykt // log_debug("get_data_data() -> %d", r);
a34b57c0d43b8bf819ccd4f62c314b41b625454dPatrik Flykt if (r == 0) {
da6fe470e17fa02f3adedc779585caf8669252bdPatrik Flykt r = iovw_put(&source->iovw, data, source->data_size);
da6fe470e17fa02f3adedc779585caf8669252bdPatrik Flykt if (r < 0) {
da6fe470e17fa02f3adedc779585caf8669252bdPatrik Flykt log_error("failed to put binary buffer in iovect");
7246333cb803b03440d3bd0bdaa233564d09b5aePatrik Flykt return 0; /* continue */
346e13a25dc6f76d3bc9d8decd40dc4782b02d2aPatrik Flykt // log_debug("get_data_newline() -> %d", r);
346e13a25dc6f76d3bc9d8decd40dc4782b02d2aPatrik Flykt if (r == 0) {
a9aff3615b430f86bd0a824214d95f634efaf894Patrik Flykt return 0; /* continue */
a9aff3615b430f86bd0a824214d95f634efaf894Patrik Flyktint process_source(RemoteSource *source, bool compress, bool seal) {
a34b57c0d43b8bf819ccd4f62c314b41b625454dPatrik Flykt /* We have a full event */
a34b57c0d43b8bf819ccd4f62c314b41b625454dPatrik Flykt log_debug("Received a full event from source@%p fd:%d (%s)",
a34b57c0d43b8bf819ccd4f62c314b41b625454dPatrik Flykt log_warning("Entry with no payload, skipping");
a34b57c0d43b8bf819ccd4f62c314b41b625454dPatrik Flykt r = writer_write(source->writer, &source->iovw, &source->ts, compress, seal);
a34b57c0d43b8bf819ccd4f62c314b41b625454dPatrik Flykt log_error("Failed to write entry of %zu bytes: %s",
3dc34fcc97b41f8b7b019027225b121dfbb9871dPatrik Flykt /* possibly reset buffer position */
d1b0afe3653b4316a6361d204169620726d468a0Patrik Flykt source->offset = source->scanned = source->filled = 0;
d1b0afe3653b4316a6361d204169620726d468a0Patrik Flykt else if (source->offset > source->size - source->filled &&
3dc34fcc97b41f8b7b019027225b121dfbb9871dPatrik Flykt memcpy(source->buf, source->buf + source->offset, remain);
d1b0afe3653b4316a6361d204169620726d468a0Patrik Flykt while (target > 16 * LINE_CHUNK && remain < target / 2)
3dc34fcc97b41f8b7b019027225b121dfbb9871dPatrik Flykt log_warning("Failed to reallocate buffer to (smaller) size %zu",
d1b0afe3653b4316a6361d204169620726d468a0Patrik Flykt log_debug("Reallocated buffer from %zu to %zu bytes",