forked from spacemeshos/go-spacemesh
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathstatustracker.go
169 lines (150 loc) · 5.1 KB
/
statustracker.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
package hare
import (
"context"
"github.com/spacemeshos/go-spacemesh/common/types"
"github.com/spacemeshos/go-spacemesh/log"
)
// statusTracker collects the status messages received for a single round.
// It offers helpers to validate the collected statuses, to derive the
// proposal set from them and to assemble the SVP.
type statusTracker struct {
	logger log.Log
	round  uint32
	malCh  chan<- *types.MalfeasanceGossip

	// statuses holds the recorded status message of every sender, keyed by
	// the sender's node ID.
	statuses map[types.NodeID]*Message

	// threshold is the eligibility count needed for a set to be provable.
	threshold int

	// maxCommittedRound is the max committed round (Ki) seen in the tracked
	// status messages.
	maxCommittedRound uint32

	// maxSet is the raw set that goes with maxCommittedRound.
	maxSet *Set

	// analyzed reports whether the messages have already been analyzed.
	analyzed bool

	eTracker *EligibilityTracker
	tally    *CountInfo
}
// newStatusTracker creates a statusTracker for the given round.
//
// The statuses map is pre-sized for expectedSize entries and the max
// committed round starts out as preRound. maxSet, analyzed and tally keep
// their zero values until messages are analyzed.
func newStatusTracker(logger log.Log, round uint32, mch chan<- *types.MalfeasanceGossip, et *EligibilityTracker, threshold, expectedSize int) *statusTracker {
	st := new(statusTracker)
	st.logger = logger
	st.round = round
	st.malCh = mch
	st.eTracker = et
	st.threshold = threshold
	st.maxCommittedRound = preRound
	st.statuses = make(map[types.NodeID]*Message, expectedSize)
	return st
}
// RecordStatus stores msg as the status message of its sender.
//
// Only the first message of a sender is stored. When a sender already has a
// recorded message, a warning is logged and the stored message is kept. If
// the metadata of the two messages additionally qualifies as an equivocation,
// the sender is tracked with the honest flag set to false and the
// equivocation is reported through the malfeasance channel.
func (st *statusTracker) RecordStatus(ctx context.Context, msg *Message) {
	sender := msg.SmesherID

	prev, seen := st.statuses[sender]
	if !seen {
		st.statuses[sender] = msg
		return
	}

	// This sender's status message was already handled.
	logger := st.logger.WithContext(ctx)
	logger.With().Warning("duplicate status message detected",
		log.Stringer("smesher", sender),
	)

	metadataOf := func(m *Message) types.HareMetadata {
		return types.HareMetadata{
			Layer:   m.Layer,
			Round:   m.Round,
			MsgHash: m.signedHash,
		}
	}
	prevMeta, currMeta := metadataOf(prev), metadataOf(msg)
	if !prevMeta.Equivocation(&currMeta) {
		return
	}

	logger.With().Warning("equivocation detected in status round",
		log.Stringer("smesher", sender),
		log.Object("prev", prev),
		log.Object("curr", &currMeta),
	)
	st.eTracker.Track(sender, msg.Round, msg.Eligibility.Count, false)

	prevProof := &types.HareProofMsg{
		InnerMsg:  prevMeta,
		SmesherID: sender,
		Signature: prev.Signature,
	}
	currProof := &types.HareProofMsg{
		InnerMsg:  currMeta,
		SmesherID: sender,
		Signature: msg.Signature,
	}
	err := reportEquivocation(ctx, sender, prevProof, currProof, &msg.Eligibility, st.malCh)
	if err != nil {
		logger.With().Warning("failed to report equivocation in status round",
			log.Stringer("smesher", sender),
			log.Err(err),
		)
	}
}
// AnalyzeStatusMessages filters the recorded status messages with isValid and
// tallies the eligibility of the senders for this round.
//
// Messages rejected by isValid are removed from the tracker. Among the
// remaining ones, the max committed round (Ki) and its raw set are remembered
// for ProposalSet. A committed round equal to preRound is treated as "no
// commit": it may seed the tracker while nothing else was seen, but it never
// replaces a real committed round that was already recorded.
//
// Afterwards the eligibility counts for the round are summed into the tally,
// the tracker is marked as analyzed, and the outcome is logged depending on
// whether the tally meets the threshold.
func (st *statusTracker) AnalyzeStatusMessages(isValid func(m *Message) bool) {
	for key, m := range st.statuses {
		if !isValid(m) { // only keep valid Messages
			delete(st.statuses, key)
			continue
		}

		// Track max Ki & matching raw set.
		//
		// NOTE(review): preRound is declared outside this file. If it is
		// numerically larger than real round numbers (TODO confirm), a plain
		// `>=` comparison lets a status without a commit overwrite a real
		// committed round, and the outcome then depends on the random map
		// iteration order. Comparing against the sentinel explicitly avoids
		// that regardless of its numeric value.
		nothingTracked := st.maxCommittedRound == preRound
		higherCommit := m.CommittedRound != preRound && m.CommittedRound >= st.maxCommittedRound
		if nothingTracked || higherCommit {
			st.maxCommittedRound = m.CommittedRound
			st.maxSet = NewSet(m.Values)
		}
	}

	// only valid messages are left now
	var ci CountInfo
	st.eTracker.ForEach(st.round, func(node types.NodeID, cr *Cred) {
		// only counts the eligibility count from the committed msgs
		_, recorded := st.statuses[node]
		switch {
		case recorded && cr.Honest:
			ci.IncHonest(cr.Count)
		case recorded:
			ci.IncDishonest(cr.Count)
		case !cr.Honest:
			// not recorded here, but flagged as not honest by the tracker
			ci.IncKnownEquivocator(cr.Count)
		}
	})
	st.tally = &ci
	st.analyzed = true

	if st.tally.Meet(st.threshold) {
		st.logger.With().Info("status round completed", log.Object("eligibility_count", st.tally))
	} else {
		st.logger.With().Warning("status round failed", log.Object("eligibility_count", st.tally))
	}
}
// IsSVPReady reports whether an SVP can be built: the messages must have been
// analyzed and the resulting tally must meet the threshold.
func (st *statusTracker) IsSVPReady() bool {
	if !st.analyzed {
		// the tally is only set once the messages were analyzed
		return false
	}
	return st.tally.Meet(st.threshold)
}
// ProposalSet returns the set to propose.
//
// While the max committed round is still preRound, the result is the union of
// the values of all recorded status messages, created with expectedSize.
// Otherwise it is the set tracked alongside the max committed round.
func (st *statusTracker) ProposalSet(expectedSize int) *Set {
	switch {
	case st.maxCommittedRound == preRound:
		return st.buildUnionSet(expectedSize)
	case st.maxSet == nil: // should be impossible to reach
		st.logger.Fatal("maxSet is unexpectedly nil")
	}
	return st.maxSet
}
// buildUnionSet returns the union of the values of all recorded status
// messages. The resulting set is created with expectedSize.
func (st *statusTracker) buildUnionSet(expectedSize int) *Set {
	union := NewEmptySet(expectedSize)
	for _, status := range st.statuses {
		values := NewSet(status.Values).ToSlice()
		for _, value := range values {
			union.Add(value) // assuming add is unique
		}
	}
	return union
}
// BuildSVP builds the SVP out of the recorded status messages and returns it.
// It returns nil when the SVP is not ready (see IsSVPReady) or when there are
// no status messages to aggregate.
func (st *statusTracker) BuildSVP() *AggregatedMessages {
	if !st.IsSVPReady() || len(st.statuses) == 0 {
		return nil
	}

	var svp AggregatedMessages
	for _, status := range st.statuses {
		svp.Messages = append(svp.Messages, *status)
	}
	// TODO: set aggregated signature
	return &svp
}