0%

Fabric中PBFT源码解读(4)

本篇博客是PBFT代码解读的第四篇博客。

VII. Commit消息的接收以及后续执行

本系列的第一篇博客介绍了源代码的获取和测试用例的选择;第二篇博客介绍了Primary节点对repBatch的处理,以及对PrePrepare消息的发送;第三篇博客介绍了Preprepare消息的接收以及Prepare消息的发送、Prepare消息的接收以及Commit消息的发送。 强烈建议读者先阅读上述三篇博客,再来阅读当前这篇。

为方便读者查阅,本系列博客的链接整理如下:

第三篇博客的第VI节介绍了节点2中对Commit消息的发送,本节介绍节点(以节点3为例)对Commit消息的接收,基于接收到的Commit消息判断是否进入了Committed状态,并决定是否执行reqBatch请求。 结合IV.B小节和第VI节的内容可知,节点2中的Commit消息最终是发送到了节点3events通道中。 此外,IV.B小节中提到每个节点会启动一个协程运行eventloop函数。为唤起读者的记忆,eventLoop函数的代码再次摘录如下。

1
2
3
4
5
6
7
8
9
10
// [pbft-core_mock_test.go] makePBFTNetwork -> [events.go] Start -> eventLoop
func (em *managerImpl) eventLoop() {
for {
select {
case next := <-em.events:
em.Inject(next)
...
}
}
}
eventLoop函数进一步调用Inject函数、SendEvent函数、pbftCore.ProcessEvent函数。以上都和节点0接收reqBatch消息(节点1接收PrePrepare消息)时的流程相同,以下从pbftCore.ProcessEvent函数开始介绍。
1
2
3
4
5
6
7
8
9
10
// [pbft-core.go] ProcessEvent
func (instance *pbftCore) ProcessEvent(e events.Event) events.Event {
...
switch et := e.(type) {
...
case *Commit: // L346
err = instance.recvCommit(et)
...
}
}
ProcessEvent函数中的switch判断为Commit case,继而调用recvCommit函数,如L346行所示。recvCommit函数代码如下所示:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
// [pbft-core.go] ProcessEvent -> recvCommit
func (instance *pbftCore) recvCommit(commit *Commit) error {
...
cert := instance.getCert(commit.View, commit.SequenceNumber)
...
cert.commit = append(cert.commit, commit)

if instance.committed(commit.BatchDigest, commit.View, commit.SequenceNumber) { // L850
...
instance.executeOutstanding() // L855
...
}
...
}

// [pbft-core.go] ProcessEvent -> recvCommit -> committed
func (instance *pbftCore) committed(digest string, v uint64, n uint64) bool {
...
return quorum >= instance.intersectionQuorum() // L542
}
recvCommit函数和第三篇博客的第VI节中的maybeSendCommit函数加上recvPrepare函数的代码很相似。其首先对当前是否进入了committed状态做一个判断(L850行),然后基于收到的消息调用executeOutstanding函数进行后续操作(L855行)。 这里需要注意的是committed函数的实现,和prepared的不同。committed函数中要求收到的commit消息数量达到2f+1,而前面的prepared函数中之要求收到的prepare数量达到2f。 此外,在发送/接收commit消息时,是需要Primary节点也发送commit消息的,且primary节点发出的commit消息也会被正常计数;而在发送/接收prepare消息时,不需要primary节点发送prepare消息,即使其发送了,在计数时也会被忽略。

L855行的executeOutstanding函数内部又调用了executeOne函数,后者主要调用了simpleConsumer.execute函数。simpleConsumer.execute函数的代码实现如下所示。该函数的主体部分也比较简单,对executions变量做了一次递增;然后构造了一个execDoneEvent类型的消息,并将其输入到events通道中。

1
2
3
4
5
6
7
8
9
// [pbft-core.go] ProcessEvent -> recvCommit
func (sc *simpleConsumer) execute(seqNo uint64, reqBatch *RequestBatch) {
for _, req := range reqBatch.GetBatch() {
...
sc.executions++
...
go func() { sc.pe.manager.Queue() <- execDoneEvent{} }()
}
}
前面讲到很多次,events中消息的处理是由pbftCore.ProcessEvent完成的,对应于execDoneEvent的部分如下所示。这里只是对执行再次做了一下同步检查,并对可能正在处理的view切换过程进行处理。
1
2
3
4
5
6
7
8
9
10
11
12
// [pbft-core.go] ProcessEvent
func (instance *pbftCore) ProcessEvent(e events.Event) events.Event {
switch et := e.(type) {
...
case execDoneEvent:
instance.execDoneSync()
...
return instance.processNewView()
...
}
...
}

VIII. 测试的优雅结束

运行一个测试用例,观察其如何优雅的结束也是非常有趣的。本节将介绍TestNetwork这个测试用例是如何结束的,这对于我们自己写测试用例也是非常有帮助的。 回顾第二篇博客中IV.B小节介绍的testnet.process函数,其代码主体如下所示。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
// [mock_network_test.go] process
func (net *testnet) process() error {
retry := true
...
for {
...
select {
case msg, ok := <-net.msgs: // L221
...
retry = true
...
default: // L231
if !retry { // L232
return nil
}
var busy []int // L236
for i, ep := range net.endpoints {
if ep.isBusy() {
busy = append(busy, i)
}
}
if len(busy) == 0 {
retry = false // L243
continue
} // L245
...
select {
case msg, ok := <-net.msgs: // L249
...
retry = true
...
continue
case <-time.After(100 * time.Millisecond):
continue
}
}
}
}
一般而言,在PBFT流程正常运行时,会进入到L221行执行。但在测试刚启动时,由于节点0还没来得及将消息放入到net.msgs管道中,因而第一次是会进入到L231行执行。如IV.B小节所述,此时会阻塞在L249行,以等待节点0将消息放入到net.msgs管道中。待节点0net.msgs管道中接收到消息,将进行下一次循环。

PBFT流程执行到结束时,会进入到L231行执行。L236行到L245行会对当前的PBFT节点状态进行判断,如果所有的节点都不是busy,那么retry值会被赋值为false(L243行)。然后执行continue语句进入到下一次循环。此时,还是会进入到default case,并通过L232行的判断语句返回。 这里比较巧妙的是,只有连续两次进入到default case时(L231行),且前一次的retry赋值为false,当前的process函数才会返回。 为什么是连续两次呢?可以看到,只要进入到L221行,retry就会被重置为true

为检验我们对于process函数的分析是否正确,笔者试着将default case的代码删掉。发现PBFT流程还是可以正常运行,但无法从当前测试用例中结束。这便验证了我们的分析。

process函数返回到TestNetwork函数中,后者的相关代码摘录如下。容易看出,TestNetwork对各个节点对reqBatch的执行情况进行了检查,若检查通过,则优雅地结束当前测试用例。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
// [pbft-core_test.go]
func TestNetwork(t *testing.T) {
...
err := net.process()
...
for _, pep := range net.pbftEndpoints {
if pep.sc.executions <= 0 {
t.Errorf("...")
continue
}
if pep.sc.executions != 1 {
t.Errorf("...")
continue
}
if !reflect.DeepEqual(pep.sc.lastExecution, hash(reqBatch.GetBatch()[0])) {
t.Errorf("...")
}
}
}