Lines Matching refs:dir
47 static void director_hosts_purge_removed(struct director *dir);
55 director_user_kill_finish_delayed(struct director *dir, struct user *user,
58 static bool director_is_self_ip_set(struct director *dir)
60 if (net_ip_compare(&dir->self_ip, &net_ip4_any))
63 if (net_ip_compare(&dir->self_ip, &net_ip6_any))
69 static void director_find_self_ip(struct director *dir)
74 hosts = array_get(&dir->dir_hosts, &count);
77 dir->self_ip = hosts[i]->ip;
84 void director_find_self(struct director *dir)
86 if (dir->self_host != NULL)
89 if (!director_is_self_ip_set(dir))
90 director_find_self_ip(dir);
92 dir->self_host = director_host_lookup(dir, &dir->self_ip,
93 dir->self_port);
94 if (dir->self_host == NULL) {
96 net_ip2addr(&dir->self_ip), dir->self_port);
98 dir->self_host->self = TRUE;
101 static unsigned int director_find_self_idx(struct director *dir)
106 i_assert(dir->self_host != NULL);
108 hosts = array_get(&dir->dir_hosts, &count);
110 if (hosts[i] == dir->self_host)
117 director_has_outgoing_connection(struct director *dir,
122 array_foreach(&dir->connections, connp) {
131 director_log_connect(struct director *dir, struct director_host *host,
146 net_ip2addr(&dir->self_ip), str_c(str), reason);
149 int director_connect_host(struct director *dir, struct director_host *host,
155 if (director_has_outgoing_connection(dir, host))
158 director_log_connect(dir, host, reason);
159 port = dir->test_port != 0 ? dir->test_port : host->port;
160 fd = net_connect_ip(&host->ip, port, &dir->self_ip);
170 (void)director_connection_init_out(dir, fd, host);
175 director_get_preferred_right_host(struct director *dir)
180 hosts = array_get(&dir->dir_hosts, &count);
186 self_idx = director_find_self_idx(dir);
196 static void director_quick_reconnect_retry(struct director *dir)
198 director_connect(dir, "Alone in director ring - trying to connect to others");
201 static bool director_wait_for_others(struct director *dir)
207 if (dir->ring_first_alone != 0 &&
208 ioloop_time - dir->ring_first_alone > DIRECTOR_RING_MIN_WAIT_SECS)
211 if (dir->ring_first_alone == 0)
212 dir->ring_first_alone = ioloop_time;
214 array_foreach(&dir->dir_hosts, hostp) {
218 timeout_remove(&dir->to_reconnect);
219 dir->to_reconnect = timeout_add(DIRECTOR_QUICK_RECONNECT_TIMEOUT_MSECS,
220 director_quick_reconnect_retry, dir);
224 void director_connect(struct director *dir, const char *reason)
229 self_idx = director_find_self_idx(dir);
233 hosts = array_get(&dir->dir_hosts, &count);
252 if (director_connect_host(dir, hosts[idx], reason) == 0) {
258 if (count > 1 && director_wait_for_others(dir))
266 if (dir->left != NULL) {
270 director_connection_get_name(dir->left));
271 director_connection_deinit(&dir->left,
274 dir->ring_min_version = DIRECTOR_VERSION_MINOR;
275 if (!dir->ring_handshaked)
276 director_set_ring_handshaked(dir);
277 else if (!dir->ring_synced)
278 director_set_ring_synced(dir);
281 void director_set_ring_handshaked(struct director *dir)
283 i_assert(!dir->ring_handshaked);
285 timeout_remove(&dir->to_handshake_warning);
286 if (dir->ring_handshake_warning_sent) {
289 dir->ring_handshake_warning_sent = FALSE;
293 dir->ring_handshaked = TRUE;
294 director_set_ring_synced(dir);
297 static void director_reconnect_timeout(struct director *dir)
300 director_get_preferred_right_host(dir);
302 cur_host = dir->right == NULL ? NULL :
303 director_connection_get_host(dir->right);
308 (void)director_connect_host(dir, preferred_host,
316 void director_set_ring_synced(struct director *dir)
320 i_assert(!dir->ring_synced);
321 i_assert((dir->left != NULL && dir->right != NULL) ||
322 (dir->left == NULL && dir->right == NULL));
324 timeout_remove(&dir->to_handshake_warning);
325 if (dir->ring_handshake_warning_sent) {
328 (int)(ioloop_time - dir->ring_last_sync_time),
329 mail_hosts_hash(dir->mail_hosts));
330 dir->ring_handshake_warning_sent = FALSE;
333 host = dir->right == NULL ? NULL :
334 director_connection_get_host(dir->right);
336 timeout_remove(&dir->to_reconnect);
337 if (host != director_get_preferred_right_host(dir)) {
339 dir->to_reconnect =
341 director_reconnect_timeout, dir);
344 if (dir->left != NULL)
345 director_connection_set_synced(dir->left, TRUE);
346 if (dir->right != NULL)
347 director_connection_set_synced(dir->right, TRUE);
348 timeout_remove(&dir->to_sync);
349 dir->ring_synced = TRUE;
350 dir->ring_last_sync_time = ioloop_time;
354 director_hosts_purge_removed(dir);
355 mail_hosts_set_synced(dir->mail_hosts);
356 director_set_state_changed(dir);
359 void director_sync_send(struct director *dir, struct director_host *host,
365 if (host == dir->self_host) {
366 dir->last_sync_sent_ring_change_counter = dir->ring_change_counter;
367 dir->last_sync_start_time = ioloop_timeval;
374 director_connection_get_minor_version(dir->right) > 0) {
380 director_connection_send(dir->right, str_c(str));
384 if (dir->left != NULL)
385 director_connection_ping(dir->left);
386 director_connection_ping(dir->right);
390 director_has_any_outgoing_connections(struct director *dir)
394 array_foreach(&dir->connections, connp) {
401 bool director_resend_sync(struct director *dir)
403 if (dir->ring_synced) {
408 if (dir->right == NULL) {
411 if (dir->to_reconnect == NULL &&
412 !director_has_any_outgoing_connections(dir)) {
414 director_connect(dir, "Right side connection lost");
416 } else if (dir->left != NULL) {
418 dir->self_host->last_sync_timestamp = ioloop_time;
419 director_sync_send(dir, dir->self_host, dir->sync_seq,
421 mail_hosts_hash(dir->mail_hosts));
422 if (dir->to_sync != NULL)
423 timeout_reset(dir->to_sync);
429 static void director_sync_timeout(struct director *dir)
431 i_assert(!dir->ring_synced);
433 if (director_resend_sync(dir))
434 i_error("Ring SYNC seq=%u appears to have got lost, resending", dir->sync_seq);
437 void director_set_ring_unsynced(struct director *dir)
439 if (dir->ring_synced) {
440 dir->ring_synced = FALSE;
441 dir->ring_last_sync_time = ioloop_time;
444 if (dir->to_sync == NULL) {
445 dir->to_sync = timeout_add(DIRECTOR_SYNC_TIMEOUT_MSECS,
446 director_sync_timeout, dir);
448 timeout_reset(dir->to_sync);
452 static void director_sync(struct director *dir)
455 dir->sync_seq++;
456 if (dir->right == NULL && dir->left == NULL) {
461 director_set_ring_unsynced(dir);
463 if (dir->sync_frozen) {
464 dir->sync_pending = TRUE;
467 if (dir->right == NULL) {
468 i_assert(!dir->ring_synced ||
469 (dir->left == NULL && dir->right == NULL));
471 dir->sync_seq);
476 dir->sync_seq, dir->right == NULL ? "(nowhere)" :
477 director_connection_get_name(dir->right));
482 if (dir->left != NULL)
483 director_connection_set_synced(dir->left, FALSE);
484 director_connection_set_synced(dir->right, FALSE);
485 director_sync_send(dir, dir->self_host, dir->sync_seq,
487 mail_hosts_hash(dir->mail_hosts));
490 void director_sync_freeze(struct director *dir)
494 i_assert(!dir->sync_frozen);
495 i_assert(!dir->sync_pending);
497 array_foreach(&dir->connections, connp)
499 dir->sync_frozen = TRUE;
502 void director_sync_thaw(struct director *dir)
506 i_assert(dir->sync_frozen);
508 dir->sync_frozen = FALSE;
509 if (dir->sync_pending) {
510 dir->sync_pending = FALSE;
511 director_sync(dir);
513 array_foreach(&dir->connections, connp)
527 added_host->dir->ring_change_counter++;
530 director_update_send(added_host->dir, src, cmd);
533 static void director_hosts_purge_removed(struct director *dir)
538 timeout_remove(&dir->to_remove_dirs);
540 hosts = array_get(&dir->dir_hosts, &count);
545 hosts = array_get(&dir->dir_hosts, &count);
555 struct director *dir = removed_host->dir;
574 if (dir->to_remove_dirs == NULL) {
575 dir->to_remove_dirs =
577 director_hosts_purge_removed, dir);
585 director_update_send_version(dir, src,
589 conns = array_get(&dir->connections, &count);
597 conns = array_get(&dir->connections, &count);
600 if (dir->right == NULL)
601 director_connect(dir, "Reconnecting after director was removed");
602 director_sync(dir);
606 director_send_host(struct director *dir, struct director_host *src,
614 orig_src = dir->self_host;
622 if (dir->ring_min_version >= DIRECTOR_VERSION_TAGS_V2) {
626 dir->ring_min_version < DIRECTOR_VERSION_TAGS_V2) {
627 if (dir->ring_min_version < DIRECTOR_VERSION_TAGS) {
634 director_remove_host(dir, NULL, NULL, host);
637 if (dir->ring_min_version >= DIRECTOR_VERSION_UPDOWN) {
646 director_update_send(dir, src, str_c(str));
649 void director_resend_hosts(struct director *dir)
653 array_foreach(mail_hosts_get(dir->mail_hosts), hostp)
654 director_send_host(dir, dir->self_host, NULL, *hostp);
657 void director_update_host(struct director *dir, struct director_host *src,
662 director_set_state_changed(dir);
668 mail_hosts_hash(dir->mail_hosts));
670 director_send_host(dir, src, orig_src, host);
674 if (dir->right != NULL || dir->left != NULL)
676 director_sync(dir);
679 void director_remove_host(struct director *dir, struct director_host *src,
687 orig_src = dir->self_host;
691 director_update_send(dir, src, t_strdup_printf(
699 director_sync(dir);
702 void director_flush_host(struct director *dir, struct director_host *src,
709 orig_src = dir->self_host;
713 director_update_send(dir, src, t_strdup_printf(
718 director_sync(dir);
721 void director_update_user(struct director *dir, struct director_host *src,
729 array_foreach(&dir->connections, connp) {
744 void director_update_user_weak(struct director *dir, struct director_host *src,
755 orig_src = dir->self_host;
763 if (src != dir->self_host && dir->left != NULL && dir->right != NULL &&
764 director_connection_get_host(dir->left) ==
765 director_connection_get_host(dir->right)) {
771 if (dir->right == src_conn)
772 director_connection_send(dir->left, cmd);
773 else if (dir->left == src_conn)
774 director_connection_send(dir->right, cmd);
778 director_update_send(dir, src, cmd);
785 struct director *dir = ctx->dir;
823 director_user_kill_finish_delayed(dir, user, result == 1);
828 director_flush_user(struct director *dir, struct user *user)
847 if (*dir->set->director_flush_socket == '\0' ||
850 director_user_kill_finish_delayed(dir, user, FALSE);
857 if (var_expand(s_sock, dir->set->director_flush_socket, tab, &error) <= 0) {
859 dir->set->director_flush_socket, error);
860 director_user_kill_finish_delayed(dir, user, FALSE);
907 static void director_user_move_finished(struct director *dir)
909 i_assert(dir->users_moving_count > 0);
910 dir->users_moving_count--;
912 director_set_state_changed(dir);
917 struct director *dir = user->kill_ctx->dir;
930 director_user_move_finished(dir);
943 director_user_kill_finish_delayed(struct director *dir, struct user *user,
959 timeout_add(dir->set->director_user_kick_delay * 1000,
964 director_finish_user_kill(struct director *dir, struct user *user, bool self)
976 if (dir->right == NULL) {
978 director_flush_user(dir, user);
981 director_connection_send(dir->right, t_strdup_printf(
1021 i_assert(ctx->dir->users_kicking_count > 0);
1022 ctx->dir->users_kicking_count--;
1023 if (ctx->dir->kick_callback != NULL)
1024 ctx->dir->kick_callback(ctx->dir);
1033 director_user_move_finished(ctx->dir);
1039 director_finish_user_kill(ctx->dir, user, ctx->kill_is_self_initiated);
1068 void director_kill_user(struct director *dir, struct director_host *src,
1086 ctx->dir = dir;
1096 dir->users_moving_count++;
1105 dir->users_kicking_count++;
1106 ipc_client_cmd(dir->ipc_proxy, cmd,
1114 director_finish_user_kill(ctx->dir, user,
1119 void director_move_user(struct director *dir, struct director_host *src,
1173 orig_src = dir->self_host;
1176 director_update_send(dir, src, t_strdup_printf(
1182 director_kill_user(dir, src, user, host->tag, old_host, FALSE);
1189 struct director *dir = context;
1197 i_assert(dir->users_kicking_count > 0);
1198 dir->users_kicking_count--;
1199 if (dir->kick_callback != NULL)
1200 dir->kick_callback(dir);
1203 void director_kick_user(struct director *dir, struct director_host *src,
1210 dir->users_kicking_count++;
1211 ipc_client_cmd(dir->ipc_proxy, str_c(cmd),
1212 director_kick_user_callback, dir);
1215 orig_src = dir->self_host;
1223 director_update_send_version(dir, src, DIRECTOR_VERSION_USER_KICK, str_c(cmd));
1226 void director_kick_user_alt(struct director *dir, struct director_host *src,
1236 dir->users_kicking_count++;
1237 ipc_client_cmd(dir->ipc_proxy, str_c(cmd),
1238 director_kick_user_callback, dir);
1241 orig_src = dir->self_host;
1251 director_update_send_version(dir, src, DIRECTOR_VERSION_USER_KICK_ALT, str_c(cmd));
1254 void director_kick_user_hash(struct director *dir, struct director_host *src,
1263 dir->users_kicking_count++;
1264 ipc_client_cmd(dir->ipc_proxy, cmd,
1265 director_kick_user_callback, dir);
1268 orig_src = dir->self_host;
1274 director_update_send_version(dir, src, DIRECTOR_VERSION_USER_KICK, cmd);
1278 director_send_user_killed_everywhere(struct director *dir,
1284 orig_src = dir->self_host;
1287 director_update_send(dir, src, t_strdup_printf(
1294 director_user_tag_killed(struct director *dir, struct mail_tag *tag,
1308 director_finish_user_kill(dir, user, TRUE);
1320 director_send_user_killed_everywhere(dir, dir->self_host, NULL,
1324 director_user_killed_everywhere(dir, dir->self_host,
1330 void director_user_killed(struct director *dir, unsigned int username_hash)
1334 array_foreach(mail_hosts_get_tags(dir->mail_hosts), tagp)
1335 director_user_tag_killed(dir, *tagp, username_hash);
1339 director_user_tag_killed_everywhere(struct director *dir,
1364 director_flush_user(dir, user);
1365 director_send_user_killed_everywhere(dir, src, orig_src, username_hash);
1368 void director_user_killed_everywhere(struct director *dir,
1375 array_foreach(mail_hosts_get_tags(dir->mail_hosts), tagp) {
1376 director_user_tag_killed_everywhere(dir, *tagp, src, orig_src,
1381 static void director_state_callback_timeout(struct director *dir)
1383 timeout_remove(&dir->to_callback);
1384 dir->state_change_callback(dir);
1387 void director_set_state_changed(struct director *dir)
1391 if (dir->to_callback == NULL) {
1392 dir->to_callback =
1393 timeout_add(0, director_state_callback_timeout, dir);
1397 void director_update_send(struct director *dir, struct director_host *src,
1400 director_update_send_version(dir, src, 0, cmd);
1403 void director_update_send_version(struct director *dir,
1411 array_foreach(&dir->connections, connp) {
1439 struct director *dir;
1441 dir = i_new(struct director, 1);
1442 dir->set = set;
1443 dir->self_port = listen_port;
1444 dir->self_ip = *listen_ip;
1445 dir->state_change_callback = callback;
1446 dir->kick_callback = kick_callback;
1447 i_array_init(&dir->dir_hosts, 16);
1448 i_array_init(&dir->pending_requests, 16);
1449 i_array_init(&dir->connections, 8);
1450 dir->mail_hosts = mail_hosts_init(set->director_user_expire,
1453 dir->ipc_proxy = ipc_client_init(DIRECTOR_IPC_PROXY_PATH);
1454 dir->ring_min_version = DIRECTOR_VERSION_MINOR;
1455 return dir;
1460 struct director *dir = *_dir;
1466 while (array_count(&dir->connections) > 0) {
1467 connp = array_idx(&dir->connections, 0);
1472 mail_hosts_deinit(&dir->mail_hosts);
1473 mail_hosts_deinit(&dir->orig_config_hosts);
1475 ipc_client_deinit(&dir->ipc_proxy);
1476 timeout_remove(&dir->to_reconnect);
1477 timeout_remove(&dir->to_handshake_warning);
1478 timeout_remove(&dir->to_request);
1479 timeout_remove(&dir->to_sync);
1480 timeout_remove(&dir->to_remove_dirs);
1481 timeout_remove(&dir->to_callback);
1482 while (array_count(&dir->dir_hosts) > 0) {
1483 hostp = array_idx(&dir->dir_hosts, 0);
1487 array_free(&dir->pending_requests);
1488 array_free(&dir->dir_hosts);
1489 array_free(&dir->connections);
1490 i_free(dir);
1508 struct director *dir;
1515 director_iterate_users_init(struct director *dir, bool iter_until_current_tail)
1518 iter->dir = dir;
1531 tags = mail_hosts_get_tags(iter->dir->mail_hosts);
1558 director_get_username_hash(struct director *dir, const char *username,
1563 if (mail_user_hash(username, dir->set->director_username_hash, hash_r,
1567 dir->set->director_username_hash, error);