Skip to content

Commit c71b715

Browse files
Merge pull request #4873 from harmony-one/refactor/proto_id
Refactor Stream Protocol ID, Add New Protocol for Epoch Sync to P2P Host
2 parents 420cd94 + f210b3a commit c71b715

File tree

9 files changed

+102
-117
lines changed

9 files changed

+102
-117
lines changed

api/service/stagedstreamsync/downloader.go

+43-17
Original file line numberDiff line numberDiff line change
@@ -42,27 +42,28 @@ type (
4242
)
4343

4444
// NewDownloader creates a new downloader
45-
func NewDownloader(host p2p.Host, bc core.BlockChain, nodeConfig *nodeconfig.ConfigType, consensus *consensus.Consensus, dbDir string, isBeaconNode bool, config Config) *Downloader {
46-
config.fixValues()
45+
func NewDownloader(host p2p.Host,
46+
bc core.BlockChain,
47+
nodeConfig *nodeconfig.ConfigType,
48+
consensus *consensus.Consensus,
49+
dbDir string,
50+
isBeaconNode bool,
51+
config Config) *Downloader {
4752

48-
sp := streamSyncProtocol.NewProtocol(streamSyncProtocol.Config{
49-
Chain: bc,
50-
Host: host.GetP2PHost(),
51-
Discovery: host.GetDiscovery(),
52-
ShardID: nodeconfig.ShardID(bc.ShardID()),
53-
Network: config.Network,
54-
BeaconNode: isBeaconNode,
55-
Validator: nodeConfig.Role() == nodeconfig.Validator,
56-
Explorer: nodeConfig.Role() == nodeconfig.ExplorerNode,
57-
MaxAdvertiseWaitTime: config.MaxAdvertiseWaitTime,
58-
SmSoftLowCap: config.SmSoftLowCap,
59-
SmHardLowCap: config.SmHardLowCap,
60-
SmHiCap: config.SmHiCap,
61-
DiscBatch: config.SmDiscBatch,
62-
})
53+
config.fixValues()
6354

55+
protoCfg := protocolConfig(host, bc, nodeConfig, isBeaconNode, config)
56+
sp := streamSyncProtocol.NewProtocol(*protoCfg)
6457
host.AddStreamProtocol(sp)
6558

59+
// beacon nodes support epoch chain as well
60+
if isBeaconNode {
61+
epochProtoCfg := protocolConfig(host, bc, nodeConfig, isBeaconNode, config)
62+
epochProtoCfg.EpochChain = true
63+
epochChainProtocol := streamSyncProtocol.NewProtocol(*epochProtoCfg)
64+
host.AddStreamProtocol(epochChainProtocol)
65+
}
66+
6667
logger := utils.Logger().With().
6768
Str("module", "StagedStreamSync").
6869
Uint32("ShardID", bc.ShardID()).
@@ -101,6 +102,31 @@ func NewDownloader(host p2p.Host, bc core.BlockChain, nodeConfig *nodeconfig.Con
101102
}
102103
}
103104

105+
// protocolConfig returns protocol config
106+
func protocolConfig(host p2p.Host,
107+
bc core.BlockChain,
108+
nodeConfig *nodeconfig.ConfigType,
109+
isBeaconNode bool,
110+
config Config) *streamSyncProtocol.Config {
111+
112+
return &streamSyncProtocol.Config{
113+
Chain: bc,
114+
Host: host.GetP2PHost(),
115+
Discovery: host.GetDiscovery(),
116+
ShardID: nodeconfig.ShardID(bc.ShardID()),
117+
Network: config.Network,
118+
BeaconNode: isBeaconNode,
119+
Validator: nodeConfig.Role() == nodeconfig.Validator,
120+
Explorer: nodeConfig.Role() == nodeconfig.ExplorerNode,
121+
EpochChain: !isBeaconNode && bc.ShardID() == shard.BeaconChainShardID,
122+
MaxAdvertiseWaitTime: config.MaxAdvertiseWaitTime,
123+
SmSoftLowCap: config.SmSoftLowCap,
124+
SmHardLowCap: config.SmHardLowCap,
125+
SmHiCap: config.SmHiCap,
126+
DiscBatch: config.SmDiscBatch,
127+
}
128+
}
129+
104130
// Start starts the downloader
105131
func (d *Downloader) Start() {
106132
go func() {

hmy/downloader/downloader.go

+2
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ import (
1818
"github.com/harmony-one/harmony/p2p"
1919
"github.com/harmony-one/harmony/p2p/stream/common/streammanager"
2020
"github.com/harmony-one/harmony/p2p/stream/protocols/sync"
21+
"github.com/harmony-one/harmony/shard"
2122
)
2223

2324
type (
@@ -56,6 +57,7 @@ func NewDownloader(host p2p.Host, bc core.BlockChain, nodeConfig *nodeconfig.Con
5657
BeaconNode: isBeaconNode,
5758
Validator: nodeConfig.Role() == nodeconfig.Validator,
5859
Explorer: nodeConfig.Role() == nodeconfig.ExplorerNode,
60+
EpochChain: !isBeaconNode && bc.ShardID() == shard.BeaconChainShardID,
5961
SmSoftLowCap: config.SmSoftLowCap,
6062
SmHardLowCap: config.SmHardLowCap,
6163
SmHiCap: config.SmHiCap,

p2p/stream/common/streammanager/interface_test.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ var _ StreamManager = &streamManager{}
1818

1919
var (
2020
myPeerID = makePeerID(0)
21-
testProtoID = sttypes.ProtoID("harmony/sync/unitest/0/1.0.0/1")
21+
testProtoID = sttypes.ProtoID("harmony/sync/unitest/0/1.0.0")
2222
)
2323

2424
const (

p2p/stream/common/streammanager/streammanager.go

+3-19
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@ import (
1010
"github.com/harmony-one/abool"
1111
"github.com/harmony-one/harmony/internal/utils"
1212
sttypes "github.com/harmony-one/harmony/p2p/stream/types"
13-
"github.com/harmony-one/harmony/shard"
1413
"github.com/libp2p/go-libp2p/core/network"
1514
libp2p_peer "github.com/libp2p/go-libp2p/core/peer"
1615
"github.com/libp2p/go-libp2p/core/protocol"
@@ -113,12 +112,6 @@ func newStreamManager(pid sttypes.ProtoID, host host, pf peerFinder, handleStrea
113112

114113
protoSpec, _ := sttypes.ProtoIDToProtoSpec(pid)
115114

116-
// if it is a beacon node or shard node, print the peer id and proto id
117-
if protoSpec.IsBeaconValidator || protoSpec.ShardID != shard.BeaconChainShardID {
118-
fmt.Println("My peer id: ", host.ID().String())
119-
fmt.Println("My proto id: ", pid)
120-
}
121-
122115
return &streamManager{
123116
myProtoID: pid,
124117
myProtoSpec: protoSpec,
@@ -476,13 +469,12 @@ func (sm *streamManager) discoverAndSetupStream(discCtx context.Context) (int, e
476469
func (sm *streamManager) discover(ctx context.Context) (<-chan libp2p_peer.AddrInfo, error) {
477470
numStreams := sm.streams.size()
478471

479-
protoID := sm.targetProtoID()
480472
discBatch := sm.config.DiscBatch
481473
if sm.config.HiCap-numStreams < sm.config.DiscBatch {
482474
discBatch = sm.config.HiCap - numStreams
483475
}
484476
sm.logger.Debug().
485-
Interface("protoID", protoID).
477+
Interface("protoID", sm.myProtoID).
486478
Int("numStreams", numStreams).
487479
Int("discBatch", discBatch).
488480
Msg("[StreamManager] discovering")
@@ -495,15 +487,7 @@ func (sm *streamManager) discover(ctx context.Context) (<-chan libp2p_peer.AddrI
495487
<-time.After(discTimeout)
496488
cancel()
497489
}()
498-
return sm.pf.FindPeers(ctx2, protoID, discBatch)
499-
}
500-
501-
func (sm *streamManager) targetProtoID() string {
502-
targetSpec := sm.myProtoSpec
503-
if targetSpec.ShardID == shard.BeaconChainShardID { // for beacon chain, only connect to beacon nodes
504-
targetSpec.IsBeaconValidator = true
505-
}
506-
return string(targetSpec.ToProtoID())
490+
return sm.pf.FindPeers(ctx2, string(sm.myProtoID), discBatch)
507491
}
508492

509493
func (sm *streamManager) setupStreamWithPeer(ctx context.Context, pid libp2p_peer.ID) error {
@@ -513,7 +497,7 @@ func (sm *streamManager) setupStreamWithPeer(ctx context.Context, pid libp2p_pee
513497
nCtx, cancel := context.WithTimeout(ctx, connectTimeout)
514498
defer cancel()
515499

516-
st, err := sm.host.NewStream(nCtx, pid, protocol.ID(sm.targetProtoID()))
500+
st, err := sm.host.NewStream(nCtx, pid, protocol.ID(sm.myProtoID))
517501
if err != nil {
518502
return err
519503
}

p2p/stream/common/streammanager/streammanager_test.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -264,7 +264,7 @@ func TestStreamSet_numStreamsWithMinProtoID(t *testing.T) {
264264
pid1 = testProtoID
265265
numPid1 = 5
266266

267-
pid2 = sttypes.ProtoID("harmony/sync/unitest/0/1.0.1/1")
267+
pid2 = sttypes.ProtoID("harmony/sync/unitest/0/1.0.1")
268268
numPid2 = 10
269269
)
270270

p2p/stream/protocols/sync/protocol.go

+28-37
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,8 @@ package sync
33
import (
44
"context"
55
"errors"
6+
"fmt"
67
"math/rand"
7-
"strconv"
88
"time"
99

1010
"github.com/ethereum/go-ethereum/event"
@@ -17,7 +17,6 @@ import (
1717
"github.com/harmony-one/harmony/p2p/stream/common/requestmanager"
1818
"github.com/harmony-one/harmony/p2p/stream/common/streammanager"
1919
sttypes "github.com/harmony-one/harmony/p2p/stream/types"
20-
"github.com/harmony-one/harmony/shard"
2120
"github.com/hashicorp/go-version"
2221
libp2p_host "github.com/libp2p/go-libp2p/core/host"
2322
libp2p_network "github.com/libp2p/go-libp2p/core/network"
@@ -27,7 +26,8 @@ import (
2726

2827
const (
2928
// serviceSpecifier is the specifier for the service.
30-
serviceSpecifier = "sync"
29+
SyncServiceSpecifier = "sync"
30+
EpochSyncServiceSpecifier = "epochsync"
3131
)
3232

3333
var (
@@ -71,6 +71,7 @@ type (
7171
BeaconNode bool
7272
Validator bool
7373
Explorer bool
74+
EpochChain bool
7475

7576
MaxAdvertiseWaitTime int
7677
// stream manager config
@@ -106,6 +107,12 @@ func NewProtocol(config Config) *Protocol {
106107

107108
sp.rm = requestmanager.NewRequestManager(sp.sm)
108109

110+
// if it is not epoch chain, print the peer id and proto id
111+
if !config.EpochChain {
112+
fmt.Println("My peer id: ", config.Host.ID().String())
113+
fmt.Println("My proto id: ", sp.ProtoID())
114+
}
115+
109116
sp.logger = utils.Logger().With().Str("Protocol", string(sp.ProtoID())).Logger()
110117
return sp
111118
}
@@ -115,10 +122,7 @@ func (p *Protocol) Start() {
115122
p.sm.Start()
116123
p.rm.Start()
117124
p.rl.Start()
118-
// If it's not EpochChain, advertise
119-
if p.config.BeaconNode || p.chain.ShardID() != shard.BeaconChainShardID {
120-
go p.advertiseLoop()
121-
}
125+
go p.advertiseLoop()
122126
}
123127

124128
// Close close the protocol
@@ -130,19 +134,18 @@ func (p *Protocol) Close() {
130134
close(p.closeC)
131135
}
132136

133-
// Specifier return the specifier for the protocol
134-
func (p *Protocol) Specifier() string {
135-
return serviceSpecifier + "/" + strconv.Itoa(int(p.config.ShardID))
136-
}
137-
138137
// ProtoID return the ProtoID of the sync protocol
139138
func (p *Protocol) ProtoID() sttypes.ProtoID {
140139
return p.protoIDByVersion(MyVersion)
141140
}
142141

143-
// ShardProtoID returns the ProtoID of the sync protocol for shard nodes
144-
func (p *Protocol) ShardProtoID() sttypes.ProtoID {
145-
return p.protoIDByVersionForShardNodes(MyVersion)
142+
// ServiceID returns the service ID of the sync protocol
143+
func (p *Protocol) ServiceID() string {
144+
serviceID := SyncServiceSpecifier
145+
if p.config.EpochChain {
146+
serviceID = EpochSyncServiceSpecifier
147+
}
148+
return serviceID
146149
}
147150

148151
// Version returns the sync protocol version
@@ -155,6 +158,11 @@ func (p *Protocol) IsBeaconValidator() bool {
155158
return p.config.BeaconNode && p.config.Validator
156159
}
157160

161+
// IsEpochChain returns true if it is a epoch chain
162+
func (p *Protocol) IsEpochChain() bool {
163+
return p.config.EpochChain
164+
}
165+
158166
// IsValidator returns true if it is a validator node
159167
func (p *Protocol) IsValidator() bool {
160168
return p.config.Validator
@@ -171,7 +179,7 @@ func (p *Protocol) Match(targetID protocol.ID) bool {
171179
if err != nil {
172180
return false
173181
}
174-
if target.Service != serviceSpecifier {
182+
if target.Service != p.ServiceID() {
175183
return false
176184
}
177185
if target.NetworkType != p.config.Network {
@@ -347,11 +355,6 @@ func (p *Protocol) supportedProtoIDs() []sttypes.ProtoID {
347355
pids := make([]sttypes.ProtoID, 0, len(vs))
348356
for _, v := range vs {
349357
pids = append(pids, p.protoIDByVersion(v))
350-
// beacon node needs to inform shard nodes about it supports them as well for EpochChain
351-
// basically beacon node can accept connection from shard nodes to share last epoch blocks
352-
if p.IsBeaconValidator() {
353-
pids = append(pids, p.protoIDByVersionForShardNodes(v))
354-
}
355358
}
356359
return pids
357360
}
@@ -362,22 +365,10 @@ func (p *Protocol) supportedVersions() []*version.Version {
362365

363366
func (p *Protocol) protoIDByVersion(v *version.Version) sttypes.ProtoID {
364367
spec := sttypes.ProtoSpec{
365-
Service: serviceSpecifier,
366-
NetworkType: p.config.Network,
367-
ShardID: p.config.ShardID,
368-
Version: v,
369-
IsBeaconValidator: (p.config.Validator || p.config.Explorer) && p.config.BeaconNode,
370-
}
371-
return spec.ToProtoID()
372-
}
373-
374-
func (p *Protocol) protoIDByVersionForShardNodes(v *version.Version) sttypes.ProtoID {
375-
spec := sttypes.ProtoSpec{
376-
Service: serviceSpecifier,
377-
NetworkType: p.config.Network,
378-
ShardID: p.config.ShardID,
379-
Version: v,
380-
IsBeaconValidator: false,
368+
Service: p.ServiceID(),
369+
NetworkType: p.config.Network,
370+
ShardID: p.config.ShardID,
371+
Version: v,
381372
}
382373
return spec.ToProtoID()
383374
}

p2p/stream/protocols/sync/protocol_test.go

+7-6
Original file line numberDiff line numberDiff line change
@@ -16,13 +16,14 @@ func TestProtocol_Match(t *testing.T) {
1616
targetID protocol.ID
1717
exp bool
1818
}{
19-
{"harmony/sync/unitest/0/1.0.1/1", true},
20-
{"harmony/sync/unitest/0/1.0.1/0", true},
19+
{"harmony/sync/unitest/0/1.0", true},
20+
{"harmony/sync/unitest/1/1.0.1", false},
21+
{"harmony/sync/unitest/0/1.2-alpha", true},
2122
{"h123456", false},
22-
{"harmony/sync/unitest/0/0.9.9/1", false},
23-
{"harmony/epoch/unitest/0/1.0.1/1", false},
24-
{"harmony/sync/mainnet/0/1.0.1/1", false},
25-
{"harmony/sync/unitest/1/1.0.1/1", false},
23+
{"harmony/sync/unitest/0/0.9.9", false},
24+
{"harmony/epochsync/unitest/0/1.0", false},
25+
{"harmony/sync/mainnet/0/1.0.1", false},
26+
{"harmony/sync/unitest/1/1.0.1", false},
2627
}
2728

2829
for i, test := range tests {

p2p/stream/types/interface.go

+1-2
Original file line numberDiff line numberDiff line change
@@ -11,10 +11,9 @@ import (
1111
type Protocol interface {
1212
p2ptypes.LifeCycle
1313

14-
Specifier() string
1514
Version() *version.Version
1615
ProtoID() ProtoID
17-
// ShardProtoID() ProtoID
16+
ServiceID() string
1817
IsBeaconValidator() bool
1918
Match(id protocol.ID) bool
2019
HandleStream(st libp2p_network.Stream)

0 commit comments

Comments
 (0)