service_cluster_apply_state_updates.cpp
#include "service_common.h"
//====================================================
// We need to update multiple different keys specific to a single partition (topology, leadership, ISR),
// and we monitor them via separate watches even when we use transactions to update them atomically.
// This is because monitoring the whole TANK/clusters/<name>/
// consul keyspace for updates is very expensive. Consul cannot return deltas/changes, so if we were to do that we'd get back
// *all* keys with their values+metadata when *any* key is modified. To that end, we monitor different namespaces
// in TANK/clusters/<name> so that when a key in one of the monitored keyspaces is updated, only that namespace's KVs are returned.
// This does mean, though, that updates may come out of order, because we can't guarantee the order in which we will receive them (it depends
// on which consul requests were scheduled first, in what order consul may choose to respond to long-polling requests, etc.).
// In addition, consul state updates may be the result of e.g. curl use, where someone manipulates the TANK namespace directly.
//
// Because of that, we need to be very careful about how we issue updates and how we react to them.
// We expect updates to be issued in the "correct order"; sometimes that won't be the case,
// but we assume it holds _most of the time_.
//
// The logical order of updates to a partition is
// 1. Nodes
// 2. RF change
// 3. Topology (i.e list of replicas)
// 4. Leadership (i.e leader of the replica)
// 5. ISR (list of in-sync replicas)
// We can batch multiple updates to a partition's namespaces using a transaction. However, because of the above semantics
// we can't guarantee that we will also receive them all together, or in order.
// We can defer processing updates for as long as we are getting responses from consul (see schedule_cluster_updates_apply() impl.),
// but that is not sufficient, so we need to _forego updates of multiple partition namespaces_ and instead only update one such partition/namespace
// at a time (see gen_partition_nodes_updates), then react to the update (see apply_cluster_state_updates), generate another update, and so on.
//
// We generate updates almost always as a reaction to a received update.
// Cluster Leaders are responsible for updating almost all keyspaces; the only exception is that partition leaders
// are responsible for the ISRs of the partitions they manage.
//
// SPECIFICS:
// - ISRs can *only* be updated by the partition leader, i.e. only the partition leader may persist_isr(). This is especially important for repair (see `REPAIR`)
// - partition replicas and partition leaders can *only* be updated by the cluster leader (for repairs, the partition leader should apply them locally and then update)
// - Minimal filtering for consul state updates when reconciling them with memory state (see `FILTER`)
// - REPAIRs when reconciling consul state with memory state are only possible for ISRs (see `REPAIR`)
// - All other repairs happen _after_ consul state was reconciled with memory state. See `DEFERRED REPAIRS`
void Service::schedule_cluster_updates_apply(const char *src) {
if (next_cluster_state_apply != 1) {
// not forced; wait for another 100 ms before apply_cluster_state_updates()
next_cluster_state_apply = now_ms + 100;
}
}
// apply_cluster_state_updates() ASAP
void Service::force_cluster_updates_apply() {
next_cluster_state_apply = 1;
}
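// A minimal, self-contained sketch (assumed names and dispatch point; the actual check
// lives in the service's event loop, which is not shown in this file) of how the two
// functions above are expected to interact with the timer state:
#if 0
#include <cstdint>

static uint64_t now_ms;                   // advanced by the event loop
static uint64_t next_cluster_state_apply; // 0: nothing pending, 1: forced, else: deadline in ms

static void maybe_apply_cluster_state_updates() {
        // a "forced" value of 1 always satisfies the comparison, so it fires on the next tick
        if (next_cluster_state_apply && now_ms >= next_cluster_state_apply) {
                next_cluster_state_apply = 0;
                // apply_cluster_state_updates();
        }
}
#endif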
void Service::conclude_bootstrap_updates() {
static constexpr bool trace{false};
if (0 == (consul_state.flags & unsigned(ConsulState::Flags::BootstrapStateUpdatesProcessed))) {
if (trace) {
SLog(ansifmt::bold, "** Processed bootstrap updates, will enable_listener() **", ansifmt::reset, "\n");
}
consul_state.flags |= unsigned(ConsulState::Flags::BootstrapStateUpdatesProcessed);
enable_listener();
// we need to acquire the node ID again; this time we will include the endpoint
// so that other nodes will know we are ready
//
// XXX: we _cannot_ attempt to become the cluster leader before registering this node with a valid endpoint,
// because other nodes would ignore the consul leadership update while this node isn't available() yet.
// i.e. if the cluster leadership update for this node arrives before the request for acquiring this node's (id, ep),
// then this node's ep will be unset, so the other nodes will ignore the update
schedule_consul_req(consul_state.get_req(consul_request::Type::AcquireNodeID), true);
}
}
// By consolidating all logic here as opposed to implementing bits and pieces everywhere we have a better chance of
// doing this right. It also makes it possible to unit-test behavior, whereas that wasn't possible earlier
//
// INVARIANTS
// - The ISR set can only contain nodes from the partition's replicas set that are available()
// - Cannot have any nodes in the ISR that are not also present in the replicas set
// - A partition's leader must exist in the replicas set
// - If a partition leader is set, it must also exist in the ISR
// - The cluster leader must be available()
// - A partition leader can only be missing or N/A if the partition is no longer active
void Service::apply_cluster_state_updates() {
enum {
trace = false,
};
TANK_EXPECT(cluster_state.local_node.ref);
TANK_EXPECT(cluster_state.local_node.id);
TANK_EXPECT(consul_state.reg_completed());
cluster_partitions_dirty.clear();
if (cluster_state.updates.nodes.empty() and
cluster_state.updates.pm.empty() and
cluster_state.updates.tm.empty() and
not cluster_state.updates.cluster_leader.defined) {
conclude_bootstrap_updates();
return;
}
const auto before = Timings::Microseconds::Tick();
auto & dirty_nodes = reusable.dirty_nodes;
auto & dirty_topics = reusable.dirty_topics;
auto & dirty_partitions = reusable.dirty_partitions;
auto & v = reusable.part_bool_hashmap;
auto & pv = reusable.pv;
auto & nodes_replicas_updates = reusable.nodes_replicas_updates;
auto & stream_start = reusable.stream_start;
auto & stream_stop = reusable.stream_stop;
[[maybe_unused]] const auto self = cluster_state.local_node.ref;
auto leader_self = cluster_state.leader_self();
bool promoted_to_cluster_leader{false};
bool rebuild_all_available_nodes{false};
bool track_all_partitions_dirty{false};
if (trace) {
SLog(ansifmt::bold, ansifmt::color_brown, "::CLUSTER STATE UPDATE::", ansifmt::reset,
ansifmt::color_brown, " nodes = ", cluster_state.updates.nodes.size(), ", topics = ", cluster_state.updates.tm.size(),
", partitions = ", cluster_state.updates.pm.size(),
", cluster_leader {defined: ", cluster_state.updates.cluster_leader.defined, ", value:", cluster_state.updates.cluster_leader.nid, "}", ansifmt::reset, "\n");
}
dirty_nodes.clear();
dirty_topics.clear();
dirty_partitions.clear();
v.clear();
pv.clear();
nodes_replicas_updates.clear();
stream_start.clear();
stream_stop.clear();
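// note: the clear() calls above reset reusable, service-lifetime buffers (reusable.*)
// rather than reallocating them on every apply_cluster_state_updates() invocation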
#pragma mark NODES
for (auto &it : cluster_state.updates.nodes) {
auto node = it.first;
auto [ep, avail] = it.second;
const auto was_avail = node->available();
if (node == cluster_state.local_node.ref) {
// This is us, so explicitly do this
if ((!ep || !avail) and
(consul_state.flags & unsigned(ConsulState::Flags::BootstrapStateUpdatesProcessed))) {
// This is odd; someone deleted this from /nodes?
if (trace) {
SLog(ansifmt::bgcolor_magenta, "This node is not available according to consul. Updating State", ansifmt::reset, "\n");
}
schedule_consul_req(consul_state.get_req(consul_request::Type::AcquireNodeID), true);
}
avail = true;
ep = tank_listen_ep;
}
const auto becoming_avail = ep && avail;
node->available_ = avail;
node->ep = ep;
if (node == cluster_state.local_node.ref) {
TANK_EXPECT(node->available());
}
if (becoming_avail != was_avail) {
if (trace) {
SLog(ansifmt::bgcolor_magenta, "Node ", node->id, "@", node->ep, " availability(", was_avail, ") => ", becoming_avail, ansifmt::reset, "\n");
}
rebuild_all_available_nodes = true;
dirty_nodes.insert(node);
invalidate_replicated_partitions_from_peer_cache(node);
if (not becoming_avail) {
// so that we will check for a new leader if possible
for (auto p : node->replica_for) {
dirty_partitions.insert(p);
}
// INVARIANT: no longer available nodes cannot be in ISRs
for (auto it = node->isr.list.next; it != &node->isr.list;) {
auto isr_e = switch_list_entry(isr_entry, node_ll, it);
auto part = isr_e->partition();
auto next = it->next;
if (part->cluster.leader.node == self) {
// REPAIR: only the partition leader can repair its ISR
TANK_EXPECT(isr_e->node() == node);
if (trace) {
SLog(ansifmt::bgcolor_magenta, "Removed node(N/A) from ISR of ", part->owner->name(), "/", part->idx, ansifmt::reset, "\n");
}
isr_dispose(isr_e);
persist_isr(part, __LINE__);
} else if (trace) {
SLog(ansifmt::bgcolor_magenta, "Node is no longer available, but will not remove from ISR -- only partition leader can remove it", ansifmt::reset, "\n");
}
it = next;
}
} else if (leader_self) {
// if we are the cluster leader, and a new/unknown node becomes available, we may want to use it to satisfy RF of partitions
// so we will, for now, make them all dirty
// TODO: this is not optimal but it will have to do
track_all_partitions_dirty = true;
}
}
}
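// The ISR walk above uses the usual "capture next before dispose" idiom for intrusive
// doubly-linked lists, since isr_dispose() unlinks the entry we are standing on.
// A self-contained sketch of the idiom (hypothetical minimal list type, not this codebase's):
#if 0
struct ll {
        ll *prev, *next;
};

static void unlink(ll *n) {
        n->prev->next = n->next;
        n->next->prev = n->prev;
}

static void drop_all(ll &head) {
        for (auto it = head.next; it != &head;) {
                auto next = it->next; // capture before unlink() invalidates it->next

                unlink(it);
                it = next;
        }
}
#endif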
if (rebuild_all_available_nodes) {
// XXX: maybe in-place updates make more sense but how often are we going to rebuild?
cluster_state.all_available_nodes.clear();
for (auto n : cluster_state.all_nodes) {
if (n->available()) {
cluster_state.all_available_nodes.emplace_back(n);
}
}
}
#pragma mark Cluster Leader
if (std::exchange(cluster_state.updates.cluster_leader.defined, false)) {
auto n = cluster_state.updates.cluster_leader.nid;
// INVARIANT: the cluster leader must be a valid available() node
if (n) {
auto node = cluster_state.find_node(n);
if (!node) {
// FILTER:
if (trace) {
SLog("CLUSTER_LEADER: Cluster Leader cannot change to ", n, " because node is not defined\n");
}
n = 0;
} else if (!node->available()) {
// FILTER:
if (trace) {
SLog("CLUSTER_LEADER: Cluster Leader cannot change to ", n, " because node is not available\n");
}
n = 0;
}
}
if (auto p = cluster_state.leader_id; p != n) {
if (trace) {
SLog(ansifmt::bgcolor_magenta, "CLUSTER_LEADER: Cluster Leader update from ", p, " => ", n,
"( this node:", cluster_state.local_node.id, ")", ansifmt::reset, "\n");
}
cluster_state.leader_id = n;
if (n == cluster_state.local_node.id) {
promoted_to_cluster_leader = true;
leader_self = true;
// when we become the cluster leader, we need to reconsider everything
track_all_partitions_dirty = true;
if (trace) {
SLog(ansifmt::bgcolor_magenta, "CLUSTER_LEADER: This node PROMOTED to cluster leader", ansifmt::reset, "\n");
}
} else if (p == cluster_state.local_node.id) {
if (trace) {
SLog(ansifmt::bgcolor_magenta, "CLUSTER_LEADER: This node DEMOTED from cluster leader", ansifmt::reset, "\n");
}
}
}
if (!n) {
// no cluster leader? take over if we can
if (consul_state.bootstrap_state_updates_processed()) {
if (trace) {
SLog("Will try to assume Cluster Leadership ASAP\n");
}
try_become_cluster_leader(__LINE__);
} else if (trace) {
SLog("Cannot try_become_cluster_leader() because bootstrap updates not processed\n");
}
}
}
if (track_all_partitions_dirty) {
if (trace) {
SLog(ansifmt::bgcolor_magenta, "Will track all partitions as dirty", ansifmt::reset, "\n");
}
for (auto &it : topics) {
auto t = it.second.get();
auto l = t->partitions_;
if (!l) {
continue;
}
for (auto &it : *l) {
dirty_partitions.insert(it.get());
}
}
}
#pragma mark TOPICS
for (auto &it : cluster_state.updates.tm) {
auto t = it.first;
auto state = it.second.get();
#if 0
if (trace && (state->state.defined || state->total_enabled.defined || state->rf.defined)) {
SLog("Topic ", t->name(), " state{defined:", state->state.defined, ", value:", state->state.value,
"} total_enabled{defined:", state->total_enabled.defined, ", value:", state->total_enabled.value,
"} rf{defined:", state->rf.defined, ", value:", state->rf.value, "}\n");
}
#endif
if (state->state.defined) {
if (t->enabled != state->state.value) {
if (trace) {
SLog(ansifmt::bgcolor_magenta, "TOPICS:Switched state of ", t->name(), " from ", t->enabled, " to ", state->state.value, ansifmt::reset, "\n");
}
t->enabled = state->state.value;
dirty_topics.insert(t);
if (auto l = t->partitions_) {
for (auto &it : *l) {
auto *const p = it.get();
p->cluster.flags |= unsigned(topic_partition::Cluster::Flags::GC_ISR);
dirty_partitions.insert(p);
invalidate_replicated_partitions_from_peer_cache_by_partition(p);
}
}
}
}
if (state->total_enabled.defined) {
if (const auto cur = t->total_enabled_partitions; cur != state->total_enabled.value) {
if (trace) {
SLog(ansifmt::bgcolor_magenta, "TOPICS:For ", t->name(), " total_enabled_partitions(", cur, ") => ", state->total_enabled.value, ansifmt::reset, "\n");
}
if (const auto update = state->total_enabled.value; update < cur) {
for (auto i = update; i < cur; ++i) {
auto p = t->partitions_->at(i).get();
TANK_EXPECT(p);
p->cluster.flags |= unsigned(topic_partition::Cluster::Flags::GC_ISR);
dirty_partitions.insert(p);
invalidate_replicated_partitions_from_peer_cache_by_partition(p);
}
} else if (update > cur) {
for (auto i = cur; i < update; ++i) {
auto *const p = t->partitions_->at(i).get();
TANK_EXPECT(p);
p->cluster.flags |= unsigned(topic_partition::Cluster::Flags::GC_ISR);
dirty_partitions.insert(p);
invalidate_replicated_partitions_from_peer_cache_by_partition(p);
}
}
t->total_enabled_partitions = state->total_enabled.value;
}
}
if (state->rf.defined) {
if (t->cluster.rf_ != state->rf.value) {
if (trace) {
SLog(ansifmt::bgcolor_magenta, "TOPICS:For ", t->name(), " RF(", t->cluster.rf_, ") => ", state->rf.value, ansifmt::reset, "\n");
}
t->cluster.rf_ = state->rf.value;
if (auto l = t->partitions_) {
for (auto &it : *l) {
dirty_partitions.insert(it.get());
}
}
}
}
}
// XXX: the order in which we reconcile partition updates is important
// (replicas first, leader second, ISR third).
// Notice that we *only* clean up the ISR (see GC_ISR) after we have
// applied partition state, because we need to know who the leader is, and we process
// leadership state updates after we process topology updates.
#pragma mark PARTITIONS
for (auto &it : cluster_state.updates.pm) {
auto p = it.first;
auto state = it.second.get();
bool dirty{false};
#if 0
if (trace && (state->leader.defined || state->replicas.updated || state->isr_update)) {
SLog("Partition ", p->owner->name(), "/", p->idx,
" leader{defined:", state->leader.defined, ", id:", state->leader.id,
"} replicas:{updated:", state->replicas.updated,
", nodes:", state->replicas.nodes.size(),
"} isr_updates:", state->isr_update ? state->isr_update->size() : 0, "\n");
}
#endif
#pragma mark PARTITION/REPLICAS
if (std::exchange(state->replicas.updated, false)) {
// reconcile RS
std::vector<cluster_node *> new_set;
const auto & updates = state->replicas.nodes;
size_t ui = 0, ei = 0;
const auto usize = updates.size();
const auto esize = p->cluster.replicas.nodes.size();
if (trace) {
SLog("REPLICAS set was updated\n");
}
while (ui < usize && ei < esize) {
auto un = updates[ui];
auto en = p->cluster.replicas.nodes[ei];
if (un->id == en->id) {
if (trace) {
SLog("RS:Retaining replica of ", p->owner->name(), "/", p->idx, " ", un->id, "@", un->ep, "\n");
}
new_set.emplace_back(un);
++ui;
++ei;
} else if (en->id < un->id) {
if (trace) {
SLog(ansifmt::bgcolor_magenta, "RS:Dropping replica of ", p->owner->name(), "/", p->idx, " ", en->id, "@", en->ep, ansifmt::reset, "\n");
}
if (en == self && p->log_open()) {
if (trace) {
SLog(ansifmt::bgcolor_magenta, "RS:Also closing log", ansifmt::reset, "\n");
}
close_partition_log(p);
}
nodes_replicas_updates.emplace_back(std::make_pair(en, std::make_pair(p, false)));
dirty = true;
++ei;
} else {
if (trace) {
SLog(ansifmt::bgcolor_magenta, "RS:New replica for ", p->owner->name(), "/", p->idx, " ", un->id, "@", un->ep, ansifmt::reset, "\n");
}
new_set.emplace_back(un);
nodes_replicas_updates.emplace_back(std::make_pair(un, std::make_pair(p, true)));
dirty = true;
++ui;
}
}
while (ui < usize) {
auto n = updates[ui++];
if (trace) {
SLog(ansifmt::bgcolor_magenta, "RS:New replica for ", p->owner->name(), "/", p->idx, " ", n->id, "@", n->ep, ansifmt::reset, "\n");
}
// force ISR cleanup later
dirty = true;
new_set.emplace_back(n);
nodes_replicas_updates.emplace_back(std::make_pair(n, std::make_pair(p, true)));
}
while (ei < esize) {
auto en = p->cluster.replicas.nodes[ei++];
if (trace) {
SLog(ansifmt::bgcolor_magenta, "RS:Dropping replica of ", p->owner->name(), "/", p->idx, " ", en->id, "@", en->ep, ansifmt::reset, "\n");
}
if (en == self && p->log_open()) {
if (trace) {
SLog(ansifmt::bgcolor_magenta, "RS:Also closing log", ansifmt::reset, "\n");
}
close_partition_log(p);
}
nodes_replicas_updates.emplace_back(std::make_pair(en, std::make_pair(p, false)));
dirty = true;
}
TANK_EXPECT(std::is_sorted(new_set.begin(), new_set.end(), [](const auto a, const auto b) noexcept { return a->id < b->id; }));
// we also need to track the partition as dirty if no nodes are assigned as replicas, e.g. when
// the partition is first defined
if (dirty || 0 == usize) {
p->cluster.flags |= unsigned(topic_partition::Cluster::Flags::GC_ISR);
p->cluster.replicas.nodes = new_set;
dirty_partitions.insert(p);
invalidate_replicated_partitions_from_peer_cache_by_partition(p);
}
}
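// The RS reconciliation above is a two-pointer diff over two id-sorted sequences.
// A generic, self-contained sketch of the same technique (hypothetical types; not part of the build):
#if 0
#include <cstdint>
#include <vector>

struct rs_diff {
        std::vector<uint32_t> added, removed, retained;
};

// both inputs must be sorted in ascending order
static rs_diff diff_sorted(const std::vector<uint32_t> &current, const std::vector<uint32_t> &updated) {
        rs_diff res;
        size_t  ci = 0, ui = 0;

        while (ci < current.size() && ui < updated.size()) {
                if (current[ci] == updated[ui]) {
                        res.retained.emplace_back(current[ci]);
                        ++ci;
                        ++ui;
                } else if (current[ci] < updated[ui]) {
                        res.removed.emplace_back(current[ci++]); // in current only: drop
                } else {
                        res.added.emplace_back(updated[ui++]); // in updated only: add
                }
        }
        while (ci < current.size()) {
                res.removed.emplace_back(current[ci++]);
        }
        while (ui < updated.size()) {
                res.added.emplace_back(updated[ui++]);
        }
        return res;
}
#endif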
#pragma mark PARTITION/LEADER
if (state->leader.defined) {
auto l = state->leader.id ? cluster_state.find_node(state->leader.id) : nullptr;
// FILTER:
// We cannot allow a leader that is not in this partition's RS.
// If we did (TODO:), process_peer_consume_resp() would get a boundary check fault
if (l && l != p->cluster.leader.node && !p->cluster.replicas.count(l->id)) {
if (trace) {
SLog(ansifmt::bgcolor_magenta, "PARTITION-FILTER: For ", p->owner->name(), "/", p->idx, " will *ignore* leadership promotion to ", l->id, "@", l->ep,
": not in the RS. Will assume <NO LEADER> instead", ansifmt::reset, "\n");
}
l = nullptr;
}
if (auto prev = p->cluster.leader.node; prev != l) {
if (trace) {
if (prev) {
if (l) {
SLog(ansifmt::bgcolor_magenta, "PARTITION:For ", p->owner->name(), "/", p->idx, " leader changed from ",
prev->id, "@", prev->ep, " to ", l->id, "@", l->ep, ansifmt::reset, "\n");
} else {
SLog(ansifmt::bgcolor_magenta, "PARTITION:For ", p->owner->name(), "/", p->idx, " leader changed from ",
prev->id, "@", prev->ep, " to <NO LEADER>", ansifmt::reset, "\n");
}
} else if (l) {
SLog(ansifmt::bgcolor_magenta, "PARTITION:For ", p->owner->name(), "/", p->idx, " leader changed from <NO LEADER> to ",
l->id, "@", l->ep, ansifmt::reset, "\n");
}
}
if (prev) {
// update prev->leadership
invalidate_replicated_partitions_from_peer_cache(prev);
TANK_EXPECT(!p->cluster.leader.leadership_ll.empty());
TANK_EXPECT(prev->leadership.partitions_cnt);
p->cluster.leader.leadership_ll.detach_and_reset();
p->cluster.leader.node = nullptr;
prev->leadership.partitions_cnt--;
prev->leadership.dirty = true;
prev->leadership.local_replication_list.reset(nullptr);
if (trace && prev == self) {
SLog(ansifmt::bgcolor_magenta, "Resigned leadership of partition; will wakeup_all_consumers()", ansifmt::reset, "\n");
}
wakeup_all_consumers(p);
}
if (l) {
// update l->leadership
invalidate_replicated_partitions_from_peer_cache(l);
TANK_EXPECT(p->cluster.leader.leadership_ll.empty());
p->cluster.leader.node = l;
p->cluster.leader.leadership_ll.reset();
l->leadership.partitions_cnt++;
l->leadership.dirty = true;
l->leadership.local_replication_list.reset(nullptr);
l->leadership.list.push_back(&p->cluster.leader.leadership_ll);
if (leader_self and p->enabled() and not l->available()) {
// INVARIANT: a partition leader must be available
// someone is likely messing with us, or this is bogus consul state
//
// XXX: we can't use ForceChooseLeader because it is not inconceivable that
// we will also require an RS update for this 'session', which means that
// we won't get to consider a new leader (see SEMANTICS)
//
// To that end, we will need to move this functionality into gen_partition_nodes_updates().
// It means it will be somewhat more expensive there, but it is otherwise necessary.
if (trace) {
SLog(ansifmt::bgcolor_magenta, "PARTITION:For partition ", p->owner->name(), "/", p->idx, " new leader ", l->id, "@", l->ep,
" is N/A. We will choose another as soon as possible", ansifmt::reset, "\n");
}
} else if (l == self) {
if (trace) {
SLog(ansifmt::bgcolor_magenta, "PARTITION:Became leader of partition ", p->owner->name(), "/", p->idx, ansifmt::reset, "\n");
}
// When we assume partition leadership, we need to always GC the partition's ISR
p->cluster.flags |= unsigned(topic_partition::Cluster::Flags::GC_ISR);
// IMPORTANT:
// we need to immediately bump the hwmark here to the last assigned seq. num,
// otherwise consume requests may fail with a boundary check fault
//
// UPDATE: 2021-10-14
// _do_ we really need to do that? maybe we don't;
// we'd like to _defer_ this for as long as possible,
// especially considering that open_partition_log() will set_hwmark()
#if 0
auto log = partition_log(p);
if (trace) {
SLog(ansifmt::bold, ansifmt::bgcolor_magenta, "**Bumping HW from ", partition_hwmark(p),
" to ", log->lastAssignedSeqNum, ansifmt::reset, "\n");
}
set_hwmark(p, log->lastAssignedSeqNum);
#endif
}
} else if (p->require_leader()) {
if (trace) {
SLog(ansifmt::color_magenta, "PARTITION:New leader of ", p->owner->name(), "/", p->idx, " is unknown. Will need to select another as soon as possible", ansifmt::reset, "\n");
}
}
// we will track the partition as dirty, and we will check
// for replication streams later
dirty = true;
}
}
#pragma mark PARTITION/ISR
if (const auto v = state->isr_update.get()) {
std::unordered_set<cluster_node *> set;
if (trace) {
SLog("ISRs list of ", p->owner->name(), "/", p->idx, ":", values_repr(v->data(), v->size()), "\n");
}
for (const auto id : *v) {
// FILTER: only *available* nodes that are also in the RS may be included in the ISR
if (auto node = cluster_state.find_node(id); node && node->available() && p->cluster.replicas.count(id)) {
set.insert(node);
}
}
for (auto it = p->cluster.isr.list.next; it != &p->cluster.isr.list;) {
auto next = it->next;
auto isr_e = switch_list_entry(isr_entry, partition_ll, it);
auto node = isr_e->node();
if (set.erase(node)) {
// retain
} else {
if (trace) {
SLog(ansifmt::bgcolor_magenta, "ISR:Removing ", node->id, "@", node->ep,
" from ISR of ", p->owner->name(), "/", p->idx, ansifmt::reset, "\n");
}
isr_dispose(isr_e);
dirty = true;
}
it = next;
}
if (!set.empty()) {
#ifdef HWM_UPDATE_BASED_ON_ACKS
auto next = p->cluster.isr.next_partition_isr_node_id();
for (auto node : set) {
if (trace) {
SLog(ansifmt::bgcolor_magenta, "ISR:Adding ", node->id, "@", node->ep,
" to ISR of ", p->owner->name(), "/", p->idx, ansifmt::reset, "\n");
}
TANK_EXPECT(node);
isr_bind(p, node, next++, __LINE__);
dirty = true;
}
#else
for (auto node : set) {
if (trace) {
SLog(ansifmt::bgcolor_magenta, "ISR:Adding ", node->id, "@", node->ep,
" to ISR of ", p->owner->name(), "/", p->idx, ansifmt::reset, "\n");
}
TANK_EXPECT(node);
isr_bind(p, node, __LINE__);
dirty = true;
}
#endif
}
}
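// The ISR reconciliation above exploits unordered_set::erase(key) returning the number of
// erased elements: erase() == 1 means "in both, retain", and whatever survives in the set
// afterwards is new and must be added. A generic sketch (hypothetical types; not part of the build):
#if 0
#include <cstdint>
#include <unordered_set>
#include <vector>

struct isr_reconcile_res {
        std::vector<uint32_t> removed; // in current, but no longer desired
        std::vector<uint32_t> added;   // desired, but not in current
};

// `desired` is taken by value on purpose: it is consumed as a scratch set
static isr_reconcile_res reconcile(const std::vector<uint32_t> &current, std::unordered_set<uint32_t> desired) {
        isr_reconcile_res res;

        for (const auto id : current) {
                if (!desired.erase(id)) {
                        res.removed.emplace_back(id);
                }
        }
        res.added.assign(desired.begin(), desired.end()); // leftovers were not in current
        return res;
}
#endif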
if (dirty) {
TANK_EXPECT(p);
dirty_partitions.insert(p);
}
}
if (trace) {
SLog("dirty_partitions:", dirty_partitions.size(), ", leader_self = ", leader_self, "\n");
}
#pragma mark DEFERRED UPDATES
// deferred nodes replica_for updates
// this is for performance and simplicity reasons (as opposed to updating affected nodes while reconciling partition replicas ^^)
// XXX: make sure those are applied before we run DEFERRED REPAIRS
std::sort(nodes_replicas_updates.begin(), nodes_replicas_updates.end(), [](const auto &a, const auto &b) noexcept {
return a.first < b.first;
});
for (const auto *p = nodes_replicas_updates.data(), *const e = p + nodes_replicas_updates.size(); p < e;) {
auto n = p->first;
v.clear();
pv.clear();
do {
v.emplace_back(p->second);
} while (++p < e and p->first == n);
std::sort(v.begin(), v.end(),
[](const auto &a, const auto &b) noexcept { return a.first < b.first; });
const auto en = n->replica_for.size();
const auto un = v.size();
uint32_t ei = 0, ui = 0;
auto & replica_for = n->replica_for;
while (ei < en and ui < un) {
auto ep = replica_for[ei];
auto up = v[ui].first;
if (ep < up) {
// retain
pv.emplace_back(ep);
++ei;
} else if (up < ep) {
if (v[ui].second) {
if (trace) {
SLog(">> Node ", n->id, "@", n->ep, " now replica of ", up->owner->name(), "/", up->idx, "\n");
}
pv.emplace_back(up);
}
++ui;
} else {
if (!v[ui].second) {
// drop
if (trace) {
SLog(">> Node ", n->id, "@", n->ep, " no longer replica of ", up->owner->name(), "/", up->idx, "\n");
}
} else {
// retain
pv.emplace_back(ep);
}
++ei;
++ui;
}
}
while (ei < en) {
pv.emplace_back(replica_for[ei++]);
}
while (ui < un) {
if (v[ui].second) {
auto up = v[ui].first;
if (trace) {
SLog(">> Node ", n->id, "@", n->ep, " now replica of ", up->owner->name(), "/", up->idx, "\n");
}
pv.emplace_back(up);
}
++ui;
}
replica_for = pv;
TANK_EXPECT(std::is_sorted(replica_for.begin(), replica_for.end()));
if (trace) {
SLog(">> Node ", n->id, "@", n->ep, " replica_for.size() = ", replica_for.size(), "\n");
}
}
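// Grouping runs of equal keys in a key-sorted vector, as the loop above does for
// nodes_replicas_updates; a generic sketch of the idiom (hypothetical names; not part of the build):
#if 0
#include <utility>
#include <vector>

template <typename K, typename V, typename F>
static void for_each_group(const std::vector<std::pair<K, V>> &sorted, F &&f) {
        for (auto p = sorted.data(), e = p + sorted.size(); p < e;) {
                const auto key   = p->first;
                const auto begin = p;

                do {
                        ++p;
                } while (p < e && p->first == key);

                f(key, begin, p); // [begin, p) all share `key`
        }
}
#endif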
// now that we have reconciled consul state updates with in-memory state,
// we can repair or react to them depending on whether this node is the cluster leader or the leader of any of the dirty partitions
#pragma mark DEFERRED REPAIRS
for (auto p : dirty_partitions) {
auto part_leader = p->cluster.leader.node;
const auto partition_leader_self = part_leader == self;
const auto req_leader = p->require_leader();
#pragma mark REPAIR_ISR
if (p->enabled() and
(p->cluster.flags & unsigned(topic_partition::Cluster::Flags::GC_ISR))) {
// REPAIR: self either just became the leader of this partition, or the RS of the partition was modified.
// We are going to make sure that the ISR is consistent with the RS.
//
// UPDATE:
// It is important to GC the local ISR even if we are not the partition leader.
// This is because there is a race-based edge case we need to account for,
// where a Produce request may fail with Insufficient Replicas.
// DESCRIPTION:
// When we switched RF from 2 to 1, the cluster leader updated the partition RS
// from 2 nodes to 1, and all nodes received and applied the updates.
// However, the ISR update arrived *after* the RS update, so for the however many milliseconds
// it took to receive the subsequent ISR update, the partition leader had
// topic.cluster.rf_ = 1, partition.cluster.replicas.nodes.size = 1, partition.cluster.isr.size = 2.
// If we get a produce request with ack == 0 while in this limbo state,
// we take that to mean that the client requires (ISR size) acks, i.e. rf = 2. But then
// (rf > topic->cluster.rf_) is true, so the request fails with InsufficientReplicas
// (indeed, the ISR update arrived a few ms after the produce request failed).
//
// This happens rarely, but it _does_ happen.
// We could have the cluster leader GC/persist the ISR in the same update session,
// but:
// 1. the cluster leader and the partition leader may be different nodes
// 2. we should respect the semantics (only the partition leader can persist the ISR)
// 3. we can't guarantee the logical order of updates will be respected
//
// To that end:
// - *all* nodes will repair the ISR, not just the partition leader
// - when the RF is updated we will make sure to GC the ISR
//
// However, this is *STILL* an issue:
// if we update the RF and get a produce request *before* the cluster leader gets to process the RF change
// (in order to generate RS updates for all affected partitions), then at the time we get the produce request
// topic.cluster.rf_ = 1, partition.cluster.replicas.nodes.size = 2, partition.cluster.isr.size = 2,
// which means we didn't get to GC the ISR here locally.
// To that end, topic::compute_required_peers_acks() will need to account for the topic RF as well.
//
//
// ** We can still only persist iff we are the partition leader **
bool any{false};
if (trace && false) {
SLog(ansifmt::color_magenta, "Will need to GC ISR of ", p->owner->name(), "/", p->idx, ansifmt::reset, "\n");
}
for (auto it = p->cluster.isr.list.next; it != &p->cluster.isr.list;) {
auto isr_e = switch_list_entry(isr_entry, partition_ll, it);
auto next = it->next;
auto node = isr_e->node();
if (!node->available()) {
if (trace) {
SLog(ansifmt::bgcolor_red, "REPAIR:Node ", node->id, "@", node->ep, " is in ", p->owner->name(), "/", p->idx,
" ISR, but node is not available. Will erase from ISR", ansifmt::reset, "\n");
}
isr_dispose(isr_e);
any = true;
} else if (!p->cluster.replicas.count(node->id)) {
if (trace) {
SLog(ansifmt::bgcolor_red, "REPAIR:Node ", node->id, "@", node->ep, " is in ", p->owner->name(), "/", p->idx,
" ISR, but node is not in RS. Will erase from ISR", ansifmt::reset, "\n");
}
isr_dispose(isr_e);
any = true;
}
it = next;
}
if (any && partition_leader_self) {
// we *only* persist if we are the partition leader
persist_isr(p, __LINE__);
}
}
if (part_leader == self) {
// only the partition leader may update+persist ISR
if (not p->enabled()) {
// REPAIR:
// (only partition leader may update ISR on consul)
if (not p->cluster.isr.list.empty()) {
if (trace) {
SLog(ansifmt::bgcolor_red, "REPAIR:Partition ", p->owner->name(), "/", p->idx,
" is disabled and this node is the partition leader: will reset ISR of size ",
p->cluster.isr.size(), ansifmt::reset, "\n");
}
while (not p->cluster.isr.list.empty()) {
auto isr_e = switch_list_entry(isr_entry, partition_ll, p->cluster.isr.list.next);
isr_dispose(isr_e);
}
persist_isr(p, __LINE__);
}
} else {
if (req_leader and
p->cluster.replicas.count(self->id) and
not p->cluster.isr.find(self)) {
// REPAIR: partition requires a leader, we are that leader, we are in the RS
// but for some reason we are not in the ISR
// Looks like the cluster leader promoted us to the partition leader
if (trace) {
SLog(ansifmt::bgcolor_red, "REPAIR:Partition ", p->owner->name(), "/", p->idx,
" requires a leader, and leader is self, but leader is missing from the ISR and is in the RS. Will add self to it", ansifmt::reset, "\n");
}
isr_bind(p, self, __LINE__);
persist_isr(p, __LINE__);
}
}
}
#pragma mark STREAMS
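// Select the node (if any) this node should replicate the partition from: the partition
// must be enabled and have an available leader other than ourselves, and we must be in its RS;
// otherwise there is no replication source.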
auto *const peer = p->enabled() and
p->cluster.leader.node and
not partition_leader_self and
p->cluster.leader.node->available() and
self->is_replica_for(p)
? p->cluster.leader.node
: nullptr;
if (auto stream = p->cluster.rs) {
auto src = stream->src;
if (src != peer) {
if (trace) {
if (peer) {
SLog(ansifmt::bgcolor_red, "STREAMS: Stream source of ", p->owner->name(), "/", p->idx,
" changed from ", src->id, "@", src->ep, " to ", peer->id, "@", peer->ep, ansifmt::reset, "\n");
} else {
SLog(ansifmt::bgcolor_red, "STREAMS: Stream source of ", p->owner->name(), "/", p->idx,
" changed from ", src->id, "@", src->ep, " to <NO NODE>", ansifmt::reset, "\n");
}
}
stream_stop.emplace_back(p, src);
if (peer) {
if (trace) {
SLog(ansifmt::bgcolor_red, "STREAMS: Will (re)start replication of ", p->owner->name(), "/", p->idx,
" from ", peer->id, "@", peer->ep, ansifmt::reset, "\n");
}
stream_start.emplace_back(p, peer);
}
}
} else if (peer) {
if (trace) {
SLog(ansifmt::bgcolor_red, "STREAMS: This node is a replica of ", p->owner->name(), "/", p->idx,
", but no active rep.stream. Will replicate from ", peer->id, "@", peer->ep, ansifmt::reset, "\n");
}
stream_start.emplace_back(p, peer);
}
// IMPORTANT: unconditionally clear GC_ISR now that this partition has been processed
p->cluster.flags &= ~(unsigned(topic_partition::Cluster::Flags::GC_ISR));
if (!p->enabled()) {
if (p->log_open()) {
if (trace) {
SLog(ansifmt::bgcolor_red, "*CLOSING* partition ", p->owner->name(), "/", p->idx, ansifmt::reset, "\n");
}
close_partition_log(p);
}
if (p->safe_to_reset()) {
if (trace) {
SLog(ansifmt::bgcolor_red, "*RESETTING* partition ", p->owner->name(), "/", p->idx, ansifmt::reset, "\n");
}
reset_partition_log(p);
}
}
if (leader_self) {
cluster_partitions_dirty.emplace_back(p);
}
}
conclude_bootstrap_updates();
#pragma mark FINALIZE
if (trace) {
SLog("cluster_partitions_dirty: ", cluster_partitions_dirty.size(),
", stream_stop: ", stream_stop.size(),
", stream_start:", stream_start.size(),
", promoted_to_cluster_leader: ", promoted_to_cluster_leader, "\n");
}
if (not stream_start.empty() or not stream_stop.empty()) {
replicate_partitions(&stream_start, &stream_stop);
}