bcb4e51a409d94ae670de96afb8483a4f7855294Stephan Bosch/* Copyright (c) 2011-2018 Dovecot authors, see the included COPYING file */
d9e404180ff26dbbaea68534a5f176765022b76bTimo Sirainen#define INDEXER_MASTER_HANDSHAKE "VERSION\tindexer-master-worker\t1\t0\n"
d9e404180ff26dbbaea68534a5f176765022b76bTimo Sirainen#define INDEXER_WORKER_NAME "indexer-worker-master"
d9e404180ff26dbbaea68534a5f176765022b76bTimo Sirainenworker_connection_create(const char *socket_path,
d9e404180ff26dbbaea68534a5f176765022b76bTimo Sirainen conn->request_queue = aqueue_init(&conn->request_contexts.arr);
2a4723165754cf9a93d7d91a9fb7949176ddd38bTimo Sirainenstatic void worker_connection_unref(struct worker_connection *conn)
d9e404180ff26dbbaea68534a5f176765022b76bTimo Sirainenstatic void worker_connection_disconnect(struct worker_connection *conn)
d9e404180ff26dbbaea68534a5f176765022b76bTimo Sirainen unsigned int i, count = aqueue_count(conn->request_queue);
d9e404180ff26dbbaea68534a5f176765022b76bTimo Sirainen i_error("close(%s) failed: %m", conn->socket_path);
d9e404180ff26dbbaea68534a5f176765022b76bTimo Sirainen /* cancel any pending requests */
2ee34f0b0b15523389fe3774788acbd412176253Timo Sirainen "discarding %u requests for %s",
2a4723165754cf9a93d7d91a9fb7949176ddd38bTimo Sirainen /* conn->callback() can try to destroy us */
d9e404180ff26dbbaea68534a5f176765022b76bTimo Sirainen for (i = 0; i < count; i++) {
d9e404180ff26dbbaea68534a5f176765022b76bTimo Sirainenvoid worker_connection_destroy(struct worker_connection **_conn)
d9e404180ff26dbbaea68534a5f176765022b76bTimo Sirainenworker_connection_input_line(struct worker_connection *conn, const char *line)
d9e404180ff26dbbaea68534a5f176765022b76bTimo Sirainen i_error("Input from worker without pending requests: %s", line);
d9e404180ff26dbbaea68534a5f176765022b76bTimo Sirainen i_error("Invalid input from worker: %s", line);
ff9eb4a57ff1a151182cdfbfc19f5a4bed54ecafTimo Sirainen /* the request is finished */
d9e404180ff26dbbaea68534a5f176765022b76bTimo Sirainenstatic void worker_connection_input(struct worker_connection *conn)
d9e404180ff26dbbaea68534a5f176765022b76bTimo Sirainen if ((line = i_stream_next_line(conn->input)) == NULL)
d9e404180ff26dbbaea68534a5f176765022b76bTimo Sirainen if (!version_string_verify(line, INDEXER_WORKER_NAME,
d9e404180ff26dbbaea68534a5f176765022b76bTimo Sirainen i_error("Indexer worker not compatible with this master "
d9e404180ff26dbbaea68534a5f176765022b76bTimo Sirainen "(mixed old and new binaries?)");
d9e404180ff26dbbaea68534a5f176765022b76bTimo Sirainen if ((line = i_stream_next_line(conn->input)) == NULL)
d9e404180ff26dbbaea68534a5f176765022b76bTimo Sirainen if (str_to_uint(line, &conn->process_limit) < 0 ||
d9e404180ff26dbbaea68534a5f176765022b76bTimo Sirainen i_error("Indexer worker sent invalid handshake: %s",
d9e404180ff26dbbaea68534a5f176765022b76bTimo Sirainen while ((line = i_stream_next_line(conn->input)) != NULL) {
d9e404180ff26dbbaea68534a5f176765022b76bTimo Sirainen if (worker_connection_input_line(conn, line) < 0) {
d9e404180ff26dbbaea68534a5f176765022b76bTimo Sirainenint worker_connection_connect(struct worker_connection *conn)
d9e404180ff26dbbaea68534a5f176765022b76bTimo Sirainen conn->fd = net_connect_unix(conn->socket_path);
d9e404180ff26dbbaea68534a5f176765022b76bTimo Sirainen i_error("connect(%s) failed: %m", conn->socket_path);
d9e404180ff26dbbaea68534a5f176765022b76bTimo Sirainen conn->io = io_add(conn->fd, IO_READ, worker_connection_input, conn);
e93184a9055c2530366dfe617e07199603c399ddMartti Rannanjärvi conn->input = i_stream_create_fd(conn->fd, (size_t)-1);
e93184a9055c2530366dfe617e07199603c399ddMartti Rannanjärvi conn->output = o_stream_create_fd(conn->fd, (size_t)-1);
e2a88d59c0d47d63ce1ad5b1fd95e487124a3fd4Timo Sirainen o_stream_set_no_error_handling(conn->output, TRUE);
e2a88d59c0d47d63ce1ad5b1fd95e487124a3fd4Timo Sirainen o_stream_nsend_str(conn->output, INDEXER_MASTER_HANDSHAKE);
d9e404180ff26dbbaea68534a5f176765022b76bTimo Sirainenbool worker_connection_is_connected(struct worker_connection *conn)
d9e404180ff26dbbaea68534a5f176765022b76bTimo Sirainenbool worker_connection_get_process_limit(struct worker_connection *conn,
d9e404180ff26dbbaea68534a5f176765022b76bTimo Sirainen unsigned int *limit_r)
d9e404180ff26dbbaea68534a5f176765022b76bTimo Sirainenvoid worker_connection_request(struct worker_connection *conn,
d9e404180ff26dbbaea68534a5f176765022b76bTimo Sirainen i_assert(worker_connection_is_connected(conn));
294f579cd3803e2d9997231fdc46523c23774a8fTimo Sirainen i_assert(request->index || request->optimize);
294f579cd3803e2d9997231fdc46523c23774a8fTimo Sirainen conn->request_username = i_strdup(request->username);
d03a871a77f8ec36f48f5fea98d810e51b186fdbTimo Sirainen str_append_tabescaped(str, request->username);
a8dcd4e2332c73087e9b148d34259230a77edb28Timo Sirainen str_append_tabescaped(str, request->session_id);
294f579cd3803e2d9997231fdc46523c23774a8fTimo Sirainen str_printfa(str, "\t%u\t", request->max_recent_msgs);
e2a88d59c0d47d63ce1ad5b1fd95e487124a3fd4Timo Sirainen o_stream_nsend(conn->output, str_data(str), str_len(str));
d9e404180ff26dbbaea68534a5f176765022b76bTimo Sirainenbool worker_connection_is_busy(struct worker_connection *conn)