journal-upload-journal.c revision 722b6795655149a68277b3cffeba711e1d440e5a
#include <stdbool.h>
#include "util.h"
#include "log.h"
#include "utf8.h"
#include "journal-upload.h"
/**
* Write up to size bytes to buf. Return negative on error, and number of
* bytes written otherwise. The last case is a kind of an error too.
*/
/*
 * NOTE(review): the signature of the enclosing function is not visible in this
 * excerpt, and several statements (the calls that assign r, and the conditions
 * guarding the "return pos" / "return size" lines) appear to be missing.
 * The comments below describe only what the visible lines show.
 *
 * Visible structure: an endless loop dispatching on u->entry_state. Most cases
 * advance u->entry_state and fall through into the next case, so the state
 * records how far the current entry has been serialized between calls.
 */
int r;
while (true) {
switch(u->entry_state) {
case ENTRY_CURSOR: {
/* Discard the cursor string kept from an earlier entry; it is presumably
 * refilled by a call that is not visible here, since it is formatted as
 * "__CURSOR=%s\n" below. */
free(u->current_cursor);
u->current_cursor = NULL;
if (r < 0) {
return r;
}
"__CURSOR=%s\n", u->current_cursor);
/* not enough space */
return pos;
u->entry_state ++;
/* exactly one character short, but we don't need it */
return size;
}
pos += r;
} /* fall through */
case ENTRY_REALTIME: {
/* Same visible pattern as ENTRY_CURSOR: propagate a negative r, otherwise
 * advance the state and add r to pos. */
if (r < 0) {
return r;
}
/* not enough space */
return pos;
u->entry_state ++;
/* exactly one character short, but we don't need it */
return size;
}
pos += r;
} /* fall through */
case ENTRY_MONOTONIC: {
/* Same visible pattern as the two cases above. */
if (r < 0) {
return r;
}
/* not enough space */
return pos;
u->entry_state ++;
/* exactly one character short, but we don't need it */
return size;
}
pos += r;
} /* fall through */
case ENTRY_BOOT_ID: {
/* presumably holds the boot ID as 32 hex digits plus NUL - TODO confirm */
char sid[33];
if (r < 0) {
return r;
}
/* not enough space */
return pos;
u->entry_state ++;
/* exactly one character short, but we don't need it */
return size;
}
pos += r;
} /* fall through */
case ENTRY_NEW_FIELD: {
/* Start a new field: reset the offset within the field and ask the journal
 * for the next data item of the current entry. */
u->field_pos = 0;
r = sd_journal_enumerate_data(u->journal,
&u->field_data,
&u->field_length);
if (r < 0) {
log_error("Failed to move to next field in entry: %s",
strerror(-r));
return r;
} else if (r == 0) {
/* No further fields: continue with the entry's closing part. */
u->entry_state = ENTRY_OUTRO;
continue;
}
/* NOTE(review): as shown, a field that fails the printable check only
 * restarts the loop with the state unchanged, i.e. the next field is
 * fetched; a state assignment may be missing here - confirm. */
if (!utf8_is_printable_newline(u->field_data,
u->field_length, false)) {
continue;
}
u->entry_state ++;
} /* fall through */
case ENTRY_TEXT_FIELD:
case ENTRY_BINARY_FIELD: {
/* NOTE(review): the computation of done/tocopy and the copy call itself are
 * not fully visible; only the source pointer (field data advanced by
 * field_pos) and the length argument remain. */
bool done;
if (done)
else
(char*) u->field_data + u->field_pos,
tocopy);
if (done) {
/* Field finished: go fetch the next one. */
u->entry_state = ENTRY_NEW_FIELD;
continue;
} else {
/* Field not finished: report size to the caller (presumably the
 * buffer has been filled completely). */
return size;
}
}
case ENTRY_BINARY_FIELD_START: {
/* c presumably points at the name/value separator inside the field data
 * (the assignment is not visible); a missing separator or one at the very
 * start of the data is rejected as invalid. */
const char *c;
if (!c || c == u->field_data) {
log_error("Invalid field.");
return -EINVAL;
}
/* Offset of c from the start of the field data. */
len = c - (const char*)u->field_data;
/* need space for label + '\n' */
return pos;
u->entry_state ++;
} /* fall through */
case ENTRY_BINARY_FIELD_SIZE: {
/* need space for uint64_t */
return pos;
/* Account for the 8 bytes of the size value. */
pos += 8;
u->entry_state ++;
continue;
}
case ENTRY_OUTRO:
/* need space for '\n' */
return pos;
/* Entry complete: advance the state and count the entry as sent. */
u->entry_state ++;
u->entries_sent ++;
return pos;
default:
assert_not_reached("WTF?");
}
}
assert_not_reached("WTF?");
}
/*
 * NOTE(review): the signature of this function, the loop header that the
 * "break" statements below belong to, and the call that assigns w are not
 * visible in this excerpt. Given the CURL_READFUNC_ABORT return values this
 * looks like a libcurl read callback - confirm against the full file.
 */
int r;
sd_journal *j;
ssize_t w;
assert(u);
j = u->journal;
/* The previous entry was written completely: step to the next journal entry. */
if (u->entry_state == ENTRY_DONE) {
r = sd_journal_next(j);
if (r < 0) {
log_error("Failed to move to next entry in journal: %s",
strerror(-r));
return CURL_READFUNC_ABORT;
} else if (r == 0) {
/* No further entry is available; the message depends on whether
 * u->input_event is set. */
if (u->input_event)
log_debug("No more entries, waiting for journal.");
else {
log_info("No more entries, closing journal.");
}
/* Mark the upload as no longer running and leave the loop. */
u->uploading = false;
break;
}
/* A new entry is available: restart serialization at the cursor line. */
u->entry_state = ENTRY_CURSOR;
}
/* A negative w aborts the transfer; otherwise w is added to the fill count. */
if (w < 0)
return CURL_READFUNC_ABORT;
filled += w;
if (filled == 0) {
log_error("Buffer space is too small to write entry.");
return CURL_READFUNC_ABORT;
} else if (u->entry_state != ENTRY_DONE)
/* This means that all available space was used up */
break;
log_debug("Entry %zu (%s) has been uploaded.",
u->entries_sent, u->current_cursor);
}
/* Number of bytes accumulated in filled during this call. */
return filled;
}
/**
 * Close the journal attached to the uploader, if any, and reset the timeout.
 *
 * The handle is cleared after sd_journal_close() so that u->journal never
 * dangles: calling this function again is then a no-op instead of a double
 * close, and other code testing or using u->journal cannot touch a freed
 * journal object.
 *
 * @param u  uploader state; must not be NULL
 */
void close_journal_input(Uploader *u) {
        assert(u);

        if (u->journal) {
                log_debug("Closing journal input.");
                sd_journal_close(u->journal);
                /* The handle is invalid from here on; forget it. */
                u->journal = NULL;
        }

        u->timeout = 0;
}
/*
 * NOTE(review): the signature of this function and the call that assigns r
 * are not visible in this excerpt; "skip" is presumably a parameter.
 */
int r;
/* Propagate errors; if r is smaller than skip, there is nothing to upload. */
if (r < 0) {
return r;
} else if (r < skip)
return 0;
/* have data */
/* Begin serialization with the cursor line and start the upload, passing
 * journal_input_callback with u as its argument. */
u->entry_state = ENTRY_CURSOR;
return start_upload(u, journal_input_callback, u);
}
/**
 * Check whether the journal has data to upload and, if so, process it.
 *
 * When u->input_event is set, sd_journal_process() is called first: a negative
 * result is returned unchanged, and SD_JOURNAL_NOP yields 0 without any
 * further work. In every other case the work is handed to
 * process_journal_input(u, 1).
 *
 * @param u  uploader state
 * @return   negative value on error, 0 when nothing changed, otherwise
 *           whatever process_journal_input() returns
 */
int check_journal_input(Uploader *u) {
        /* Without an input event there is nothing to digest beforehand. */
        if (!u->input_event)
                return process_journal_input(u, 1);

        const int status = sd_journal_process(u->journal);
        if (status < 0)
                return status;

        return status == SD_JOURNAL_NOP ? 0 : process_journal_input(u, 1);
}
int fd,
void *userp) {
/* NOTE(review): the beginning of this signature and the declaration of u are
 * not visible in this excerpt; u is presumably derived from userp - confirm.
 * Judging by the log message below, this is dispatch_journal_input. */
assert(u);
/* An upload is already running: ignore this notification. */
if (u->uploading) {
log_warning("dispatch_journal_input called when uploading, ignoring.");
return 0;
}
log_debug("Detected journal input, checking for new data.");
return check_journal_input(u);
}
/**
 * Attach journal j to the uploader and prepare it as the upload source.
 *
 * @param u             uploader state; u->journal is set to j
 * @param j             journal handle to read from
 * @param cursor        if non-NULL, the journal is positioned with
 *                      sd_journal_seek_cursor()
 * @param after_cursor  not referenced in the lines visible here
 * @param follow        if true, the journal's file descriptor and event mask
 *                      are queried and u->timeout is set up
 * @return              a negative value from the failing journal call on
 *                      error; the success path is not visible here
 *
 * NOTE(review): several statements of this function appear to be missing in
 * this excerpt (local declarations, the arguments of two logging calls, the
 * body of an "else", and the final return). The comments describe only what
 * is shown.
 */
int open_journal_for_upload(Uploader *u,
sd_journal *j,
const char *cursor,
bool after_cursor,
bool follow) {
u->journal = j;
if (follow) {
/* Obtain the descriptor and event mask used to wait for journal changes. */
fd = sd_journal_get_fd(j);
if (fd < 0) {
return fd;
}
events = sd_journal_get_events(j);
/* A positive result of sd_journal_reliable_fd() selects a timeout of -1. */
r = sd_journal_reliable_fd(j);
assert(r >= 0);
if (r > 0)
u->timeout = -1;
else
/* NOTE(review): the statement belonging to this "else" and the call that
 * sets r for the check below are not visible. */
if (r < 0) {
return r;
}
log_debug("Listening for journal events on fd:%d, timeout %d",
} else
log_debug("Not listening for journal events.");
if (cursor) {
/* Position the journal at the requested cursor. */
r = sd_journal_seek_cursor(j, cursor);
if (r < 0) {
log_error("Failed to seek to cursor %s: %s",
return r;
}
}
}