This repository was archived by the owner on Dec 10, 2020. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathrplx_sync.go
69 lines (54 loc) · 1.94 KB
/
rplx_sync.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
package rplx
import (
"context"
"go.uber.org/zap"
)
// Sync is GRPC function, fired on incoming sync message
func (rplx *Rplx) Sync(ctx context.Context, req *SyncRequest) (*SyncResponse, error) {
rplx.logger.Debug("get SyncRequest", zap.Int("variables", len(req.Variables)), zap.String("from node", req.NodeID), zap.Any("vars", req.Variables))
rplx.metrics.variablesGot.WithLabelValues(req.NodeID).Add(float64(len(req.Variables)))
go rplx.sync(req)
return &SyncResponse{Code: 0}, nil
}
func (rplx *Rplx) sync(req *SyncRequest) {
for name, v := range req.Variables {
rplx.variablesMx.Lock()
localVar, ok := rplx.variables[name]
if !ok {
localVar = newVariable(name)
rplx.variables[name] = localVar
}
varWasUpdated := false
var remoteNodeInstance *node
rplx.nodesMx.RLock()
if remoteNodeAddr, ok := rplx.nodesIDToAddr[req.NodeID]; ok {
remoteNodeInstance = rplx.nodes[remoteNodeAddr]
}
rplx.nodesMx.RUnlock()
for nodeID, n := range v.NodesValues {
// Если мы получили данные с нашим remoteNodeID, пропускаем
if nodeID == rplx.nodeID {
continue
}
if localVar.updateItem(nodeID, n.Value, n.Version) {
varWasUpdated = true
// Если нода, от которой пришли данные, есть у нас в списке - куда мы шлем обновления,
// то для данной переменной для данной ноды запишем, что у нас самая свежая версия
if remoteNodeInstance != nil {
remoteNodeInstance.replicatedVersionsMx.Lock()
remoteNodeInstance.replicatedVersions[name+"@"+nodeID] = n.Version
remoteNodeInstance.replicatedVersionsMx.Unlock()
}
}
}
if localVar.ttlVersion < v.TTLVersion {
localVar.ttl = v.TTL
localVar.ttlVersion = v.TTLVersion
varWasUpdated = true
}
rplx.variablesMx.Unlock()
if varWasUpdated {
go rplx.sendToReplication(localVar)
}
}
}