本篇博客是PBFT
代码解读的第二篇博客。
IV. Primary节点对repBatch的处理
本系列的第一篇博客介绍了源代码的获取和测试用例的选择。强烈建议读者先阅读上述博客后,再来阅读当前这篇。
为方便读者查阅,本系列博客的链接整理如下:
由上一篇博客Fabric中PBFT源码解读(1)的第III节可知,测试数据repBatch
传入到了Primary
节点(节点0
)的events通道中。本节介绍Primary
节点如何取出repBatch
测试数据,并基于该数据构建及广播PrePrepare
消息。 在介绍Primary
节点的操作之前,首先介绍两个非常重要的变量:通道变量events
和测试网络变量net
。
A. 通道变量events和测试网络变量net
需要注意的是,在TestNetwork
测试用例中,每个节点其实是用一个协程表示的,并通过pbftEndpoint
实例与该协程进行交互。节点之间的消息传输也并不是真得通过物理网络进行传输的,而是通过pbftEndpoint
实例中维护的Event
类型的通道变量events
进行传输。 所有节点的pbftEndpoint
实例的引用都保存在一个pbftNetwork
类型的变量net
中。
在上一篇博客第II节中介绍的makePBFTNetwork
函数包含了以下代码。其中L178行定义的pn
变量从TestNetwork
函数中返回后即赋值给net变量。该pn/net
变量的pbftEndpoints
字段即保存了对所有pbftEndpoint
实例的引用,如L179到L183行所示。 1
2
3
4
5
6
7
8
9
10
11// [pbft-core_mock_test.go]makePBFTNetwork
func makePBFTNetwork(N int, config *viper.Viper) *pbftNetwork {
...
pn := &pbftNetwork{testnet: makeTestnet(N, endpointFunc)} // L178
pn.pbftEndpoints = make([]*pbftEndpoint, len(pn.endpoints)) // L179
for i, ep := range pn.endpoints {
pn.pbftEndpoints[i] = ep.(*pbftEndpoint)
pn.pbftEndpoints[i].sc.pbftNet = pn
} // L183
return pn
}pbftEndpoint
实例内部也维护了对上述net
的引用(通过嵌套testEndpoint
类型)。节点之间的通信便利用该引用指向的net
实例进行转发。相关代码如下所示: 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61// [pbft-core_mock_test.go]pbftEndpoint
type pbftEndpoint struct { // L29
*testEndpoint
pbft *pbftCore
sc *simpleConsumer
manager events.Manager
}
// [mock_network_test.go]pbftEndpoint->testEndpoint
type testEndpoint struct { // L50
id uint64
net *testnet
}
// [mock_network_test.go]pbftEndpoint->testEndpoint->testnet
type testnet struct { // L41
debug bool
N int
closed chan struct{}
endpoints []endpoint
msgs chan taggedMsg
filterFn func(int, int, []byte) []byte
}
// [pbft-core_mock_test.go]makePBFTNetwork
func makePBFTNetwork(N int, config *viper.Viper) *pbftNetwork {
...
endpointFunc := func(id uint64, net *testnet) endpoint { // L158
tep := makeTestEndpoint(id, net) // L159
pe := &pbftEndpoint{
testEndpoint: tep, // L161
manager: events.NewManagerImpl(),
}
...
}
...
pn := &pbftNetwork{testnet: makeTestnet(N, endpointFunc)} // L178
...
}
// [pbft-core_mock_test.go]makePBFTNetwork->[mock_network_test.go]makeTestnet
func makeTestnet(N int, initFn func(id uint64, network *testnet) endpoint) *testnet {
net := &testnet{}
net.msgs = make(chan taggedMsg, 100)
net.closed = make(chan struct{})
net.endpoints = make([]endpoint, N)
for i := range net.endpoints {
net.endpoints[i] = initFn(uint64(i), net)
}
return net
}
// [pbft-core_mock_test.go]makePBFTNetwork->[mock_network_test.go]makeTestEndpoint
func makeTestEndpoint(id uint64, net *testnet) *testEndpoint {
ep := &testEndpoint{}
ep.id = id
ep.net = net
return ep
}TestNetwork
函数的L158行定义了一个回调函数变量endpointFunc
,并将该变量作为参数传入到makeTestnet
函数中,如L178行所示。makeTestnet
函数中定义了一个testnet
类型的变量net
。该变量从makeTestnet
函数中返回后,即赋值给makePBFTNetwork
函数中pn变量中的testnet
字段。 与此同时,net
变量作为参数传入到回调函数initFn/endpointFunc
中。回到endpointFunc
函数的定义,该函数在L159行将参数net
传入到makeTestEndpoint
函数中,后者将参数net
赋值给testEndpoint
的net
变量。而该testEndpoint
变量(也即tep
)在makePBFTNetwork
函数中被赋值给pbftEndpoint
类的testEndpoint
,如makePBFTNetwork
函数中的L161行所示。
综上所述,外部环境(客户端)在向PBFT
集群发送消息时,通过TestNetwork
中的net
实例发送到Primary
节点的events
通道中。 PBFT
集群中,一个节点向其他节点发送消息时,首先通过其内部保存的net
引用找到net
实例,然后利用该实例将消息转发到特定节点的events
通道中。
B. Preprepare消息的发送过程
在上一篇博客第III节的内容,其中在L262行将reqBatch
发送到节点0
的events
中。为方便查看,相关代码再次附在下面。该发送过程是借助于net
变量实现的。节点0
基于该reqBatch
构建Preprepare
消息,并将后者广播给其他节点。 本小节即分析节点0
是如何广播Preprepare
消息的。 1
2
3
4
5
6// [pbft-core_test.go] TestNetwork
func TestNetwork(t *testing.T) {
...
net.pbftEndpoints[0].manager.Queue() <- reqBatch // L262
...
}makePBFTNetwork
函数。实际上,在makePBFTNetwork
函数的endpointFunc
函数中,每个节点启动了一个协程go rountine
,来读取对events
通道中消息。相关代码如下所示: 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32// [pbft-core_mock_test.go] makePBFTNetwork
func makePBFTNetwork(N int, config *viper.Viper) *pbftNetwork { // L151
...
endpointFunc := func(id uint64, net *testnet) endpoint { // L158
tep := makeTestEndpoint(id, net)
pe := &pbftEndpoint{
testEndpoint: tep,
manager: events.NewManagerImpl(),
}
...
pe.manager.SetReceiver(pe.pbft) // L170
...
pe.manager.Start()
...
}
}
// [pbft-core_mock_test.go] makePBFTNetwork -> [events.go] Start
func (em *managerImpl) Start() {
go em.eventLoop()
}
// [pbft-core_mock_test.go] makePBFTNetwork -> [events.go] Start -> eventLoop
func (em *managerImpl) eventLoop() {
for {
select {
case next := <-em.events:
em.Inject(next)
...
}
}
}节点0
)内部维护了一个manager
变量,后者的Start
函数中启动了一个协程运行eventLoop
函数。在eventLoop
函数内部轮询events
通道,并将读取到的消息发送到Inject
函数中。Inject
函数的代码和其他相关代码如下所示: 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18// [pbft-core_mock_test.go] makePBFTNetwork -> [events.go] Start -> eventLoop -> Inject
func (em *managerImpl) Inject(event Event) {
if em.receiver != nil {
SendEvent(em.receiver, event)
}
}
// [pbft-core_mock_test.go] makePBFTNetwork -> [events.go] Start -> eventLoop -> Inject -> SendEvent
func SendEvent(receiver Receiver, event Event) {
next := event
for {
// If an event returns something non-nil, then process it as a new event
next = receiver.ProcessEvent(next) // L113
if next == nil {
break
}
}
}ProcessEvent
函数来处理从events
通道中读取到的消息,如SendEvent
中的L113行代码所示。此处的for
循环主要是考虑到消息嵌套的场景:若ProcessEvent
处理完一条消息后的结果仍然是一条消息,将继续调用ProcessEvent
函数处理。我们这里不出现该场景,因而先不讨论for
循环的问题。 SendEvent
函数中的Receiver
参数是一个接口。回顾makePBFTNetwork
函数中的L170行,其将该接口变量复制为pe.pbft
(这是一个pbftCore
类型变量)。因此,此处调用的ProcessEvent
函数的代码如下所示: 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30// [pbft-core_mock_test.go] makePBFTNetwork -> [events.go] Start -> eventLoop -> Inject -> SendEvent -> [pbft-core.go] ProcessEvent
func (instance *pbftCore) ProcessEvent(e events.Event) events.Event {
...
switch et := e.(type) {
...
case *RequestBatch:
err = instance.recvRequestBatch(et)
...
}
}
// [pbft-core_mock_test.go] makePBFTNetwork -> [events.go] Start -> eventLoop -> Inject -> SendEvent -> [pbft-core.go] ProcessEvent -> recvRequestBatch
func (instance *pbftCore) recvRequestBatch(reqBatch *RequestBatch) error {
...
if instance.primary(instance.view) == instance.id && instance.activeView {
instance.nullRequestTimer.Stop()
instance.sendPrePrepare(reqBatch, digest) // L623
}...
}
// [pbft-core_mock_test.go] makePBFTNetwork -> [events.go] Start -> eventLoop -> Inject -> SendEvent -> [pbft-core.go] ProcessEvent -> recvRequestBatch -> sendPrePrepare
func (instance *pbftCore) sendPrePrepare(reqBatch *RequestBatch, digest string) {
...
preprep := &PrePrepare{
...
}
...
instance.innerBroadcast(&Message{Payload: &Message_PrePrepare{PrePrepare: preprep}}) // L667
...
}ProcessEvent
函数进一步调用recvRequestBatch
函数,后者又进一步调用sendPrePrepare
函数。sendPrePrepare
中生成了一个PrePrepare
类型的变量prepare
,并以prepare
变量为参数调用innerBroadcast
函数。innerBroadcast
函数的代码如下所示: 1
2
3
4
5
6
7
8
9
10
11
12// [pbft-core_mock_test.go] makePBFTNetwork -> [events.go] Start -> eventLoop -> Inject -> SendEvent -> [pbft-core.go] ProcessEvent -> recvRequestBatch -> sendPrePrepare -> innerBroadcast
func (instance *pbftCore) innerBroadcast(msg *Message) error {
...
doByzantine := false
...
if doByzantine {
...
} else {
instance.consumer.broadcast(msgRaw) // L1313
}
...
}innerBroadcast
函数中定义了一个doByzantine
变量,该变量决定了当前节点是否进行Byzantine
攻击行为。此处,我们假设其不会进行Byzantine
攻击,因而进入到L1313行调用broadcast
函数。 L1313行中的consumer
的类型也是一个接口。回顾makePBFTNetwork
函数的定义(代码如下所示),该函数在L165行定义了一个simpleConsumer
类型的变量,将该变量赋值给pe
的sc
字段,并将后者作为参数传入到newPbftCore
函数中。newPbftCore
函数的代码也如下所示,由代码可知:newPbftCore
函数将pe.sc/simpleConsumer
类型变量赋值给了instance
的consumer
字段。 因而,innerBroadcast
函数中对broadcast
函数的调用,实际上是调用simpleConsumer
类型的broadcast
函数。 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21// [pbft-core_mock_test.go] makePBFTNetwork
func makePBFTNetwork(N int, config *viper.Viper) *pbftNetwork { // L151
...
endpointFunc := func(id uint64, net *testnet) endpoint {
...
pe.sc = &simpleConsumer{ // L165
pe: pe,
}
pe.pbft = newPbftCore(id, config, pe.sc, events.NewTimerFactoryImpl(pe.manager)) // L169
...
}
...
}
// [pbft-core_mock_test.go] makePBFTNetwork -> [pbft-core.go] newPbftCore
func newPbftCore(id uint64, config *viper.Viper, consumer innerStack, etf events.TimerFactory) *pbftCore {
...
instance.consumer = consumer
...
}simpleConsumer
类型的broadcast
函数的相关代码如下所示。沿着函数调用的链路可知,节点0
最终调用了internalQueueMessage
函数。后者将tm
消息传入通道变量queue
中。由broadcastFilter
函数的L149行代码可知,变量queue
实际上是net.msgs
。因而,broadcast
函数最终将消息放到了net
的msgs
通道中。 此外,在L149行的taggedMsg
消息构造时,传入-1
作为参数,该参数将赋值给taggedMsg
消息的dst
字段。该参数表明当前消息传输以广播形式进行。该参数在后面还会提及。 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28// [pbft-core_mock_test.go] makePBFTNetwork -> [events.go] Start -> eventLoop -> Inject -> SendEvent -> [pbft-core.go] ProcessEvent -> recvRequestBatch -> sendPrePrepare -> innerBroadcast -> [pbft-core_mock_test.go] broadcast
func (sc *simpleConsumer) broadcast(msgPayload []byte) {
sc.pe.Broadcast(&pb.Message{Payload: msgPayload}, pb.PeerEndpoint_VALIDATOR)
}
// [pbft-core_mock_test.go] makePBFTNetwork -> [events.go] Start -> eventLoop -> Inject -> SendEvent -> [pbft-core.go] ProcessEvent -> recvRequestBatch -> sendPrePrepare -> innerBroadcast -> [pbft-core_mock_test.go] broadcast ->[mock_network_test.go] Broadcast
func (ep *testEndpoint) Broadcast(msg *pb.Message, peerType pb.PeerEndpoint_Type) error {
ep.net.broadcastFilter(ep, msg.Payload)
return nil
}
// [pbft-core_mock_test.go] makePBFTNetwork -> [events.go] Start -> eventLoop -> Inject -> SendEvent -> [pbft-core.go] ProcessEvent -> recvRequestBatch -> sendPrePrepare -> innerBroadcast -> [pbft-core_mock_test.go] broadcast ->[mock_network_test.go] Broadcast -> broadcastFilter
func (net *testnet) broadcastFilter(ep *testEndpoint, payload []byte) {
...
if payload != nil {
...
internalQueueMessage(net.msgs, taggedMsg{int(ep.id), -1, payload}) // L149
...
}...
}
// [pbft-core_mock_test.go] makePBFTNetwork -> [events.go] Start -> eventLoop -> Inject -> SendEvent -> [pbft-core.go] ProcessEvent -> recvRequestBatch -> sendPrePrepare -> innerBroadcast -> [pbft-core_mock_test.go] broadcast ->[mock_network_test.go] Broadcast -> broadcastFilter -> internalQueueMessage
func internalQueueMessage(queue chan<- taggedMsg, tm taggedMsg) {
select {
case queue <- tm:
...
}
}Preprepare
消息的广播还是依靠net实例来完成。
前面介绍了消息是如何一步步传入到net.msgs
通道中,那么必然有一个地方实现了将消息从net.msgs
通道中读出来。 此时,我们重新回到TestNetwork
函数,其相关代码摘录如下: 1
2
3
4
5
6// [pbft-core_test.go]
func TestNetwork(t *testing.T) {
...
err := net.process() // L264
...
}TestNetwork
函数在L264行调用了process
函数,该函数通过轮询的方式反复从net
实例的msgs
通道中读取消息。相关代码如下所示: 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24// [mock_network_test.go] process
func (net *testnet) process() error {
...
for {
select {
case msg, ok := <-net.msgs: // L221
...
if !net.processMessageFromChannel(msg, ok) { // L224
...
}
...
default: // L231
...
select { // L248
case msg, ok := <-net.msgs: // L249
...
if !net.processMessageFromChannel(msg, ok) { // L251
...
}
...
}
}
}
}process
函数中L221行的case
和L231行的dafult case
很相像,可以说后者的代码包含了前者的代码。这样做的目的是default
用于检测当前测试已经完成,从而优雅地从当前process
函数中返回。关于default
中如何检测当前测试是否已经完成的代码,留到IV.F小节进行介绍。 回到节点0
对Preprepare
消息的广播,由于在process函数刚调用时,net.msgs
通道中没有消息,因而节点0
此时进入default case
执行。其在L248行又一次使用select
从msgs
通道中获取消息。由于该select
中未定义default case
,因而其一定会阻塞在L249行(或发生100毫秒的超时,超时的相关代码被省略了),直到从msgs
通道中获取到消息msg
,继而将msg
作为参数调用processMessageFromChannel
函数,如L251行代码所示。processMessageFromChannel
函数代码及相关代码如下所示: 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24// [mock_network_test.go] process -> processMessageFromChannel
func (net *testnet) processMessageFromChannel(msg taggedMsg, ok bool) bool {
...
net.deliverFilter(msg)
return true
}
// [mock_network_test.go] proces -> processMessageFromChannel -> deliverFilter
func (net *testnet) deliverFilter(msg taggedMsg) { // L156
...
senderHandle := net.endpoints[msg.src].getHandle() // L158
if msg.dst == -1 { // L159
...
for id, ep := range net.endpoints { // L163
...
lep := ep
go func() { // L167
...
lep.deliver(payload, senderHandle) // L184
...
}
}
}
}processMessageFromChannel
函数进一步调用了deliverFilter
函数。 deliverFilter
函数主要实现了对消息msg
的发送。前面我们提到,当msg
中的dst
值是-1
时,表示该消息将被广播到其他的所有节点,如L159行所示。L163行对所有的节点进行遍历,针对每个节点启动一个go rountine
,如L167行所示。 需要注意的是,每个ep/lep
都对应着一个节点的endpoint
。接下来便是借助于这些endpoint
,将消息发送给对应的节点。 回到L184行的代码。该go routine
中调用deliver
函数实现msg
的发送。该函数除了传入payload
参数之外,还传入了senderHandle
参数。 senderHandle
变量在L158行定义,该行代码调用了endpoint
中的getHandle
函数。getHandle
函数在testEndpoint
实现类中的定义如下所示: 1
2
3
4// [mock_network_test.go] process -> processMessageFromChannel -> deliverFilter -> getHandle
func (ep *testEndpoint) getHandle() *pb.PeerID {
return &pb.PeerID{Name: fmt.Sprintf("vp%d", ep.id)}
}getHandle
函数只是生成了一个PeerID
类型的变量,后者其实是对字符串进行了一次封装。 下面回到deliverFilter
函数中的L184行代码中的deliver
函数。deliver
其实是接口endpoint
中定义的一个函数,这里lep
变量的实际类型是pbftEndpoint
。 因而,该函数在pbftEndpoint
实现类中的定义如下所示: 1
2
3
4
5
6// [mock_network_test.go] process -> processMessageFromChannel -> deliverFilter -> deliver
func (pe *pbftEndpoint) deliver(msgPayload []byte, senderHandle *pb.PeerID) {
senderID, _ := getValidatorID(senderHandle) // L36
...
pe.manager.Queue() <- &pbftMessage{msg: msg, sender: senderID} // L44
}getValidatorID
函数获取当前消息传送的发起者ID
。然后构建pbftMessage
消息,并在L44行将该消息发送到特定节点的events
变量中。
综上所述,节点0
发送Preprepare
的流程大致可概括如下:1)节点0
将Preprepare
消息通过net
的msgs
通道发送给net
实例,2)然后net
实例基于其维护的各个节点的endpoint
,将Preprepare
消息发送到每个节点的events
通道中。