本篇博客是PBFT
代码解读的第四篇博客。
VII. Commit
消息的接收以及后续执行
本系列的第一篇博客介绍了源代码的获取和测试用例的选择;第二篇博客介绍了Primary
节点对repBatch
的处理,以及对PrePrepare
消息的发送;第三篇博客介绍了Preprepare
消息的接收以及Prepare
消息的发送、Prepare
消息的接收以及Commit
消息的发送。 强烈建议读者先阅读上述三篇博客,再来阅读当前这篇。
为方便读者查阅,本系列博客的链接整理如下:
第三篇博客的第VI节介绍了节点2
中对Commit
消息的发送,本节介绍节点(以节点3
为例)对Commit
消息的接收,基于接收到的Commit
消息判断是否进入了Committed
状态,并决定是否执行reqBatch
请求。 结合IV.B小节和第VI节的内容可知,节点2
中的Commit
消息最终是发送到了节点3
的events
通道中。 此外,IV.B小节中提到每个节点会启动一个协程运行eventloop
函数。为唤起读者的记忆,eventLoop
函数的代码再次摘录如下。 1
2
3
4
5
6
7
8
9
10// [pbft-core_mock_test.go] makePBFTNetwork -> [events.go] Start -> eventLoop
func (em *managerImpl) eventLoop() {
for {
select {
case next := <-em.events:
em.Inject(next)
...
}
}
}eventLoop
函数进一步调用Inject
函数、SendEvent
函数、pbftCore.ProcessEvent
函数。以上都和节点0
接收reqBatch
消息(节点1
接收PrePrepare
消息)时的流程相同,以下从pbftCore.ProcessEvent
函数开始介绍。 1
2
3
4
5
6
7
8
9
10// [pbft-core.go] ProcessEvent
func (instance *pbftCore) ProcessEvent(e events.Event) events.Event {
...
switch et := e.(type) {
...
case *Commit: // L346
err = instance.recvCommit(et)
...
}
}ProcessEvent
函数中的switch
判断为Commit case
,继而调用recvCommit
函数,如L346行所示。recvCommit
函数代码如下所示: 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20// [pbft-core.go] ProcessEvent -> recvCommit
func (instance *pbftCore) recvCommit(commit *Commit) error {
...
cert := instance.getCert(commit.View, commit.SequenceNumber)
...
cert.commit = append(cert.commit, commit)
if instance.committed(commit.BatchDigest, commit.View, commit.SequenceNumber) { // L850
...
instance.executeOutstanding() // L855
...
}
...
}
// [pbft-core.go] ProcessEvent -> recvCommit -> committed
func (instance *pbftCore) committed(digest string, v uint64, n uint64) bool {
...
return quorum >= instance.intersectionQuorum() // L542
}recvCommit
函数和第三篇博客的第VI节中的maybeSendCommit
函数加上recvPrepare
函数的代码很相似。其首先对当前是否进入了committed
状态做一个判断(L850行),然后基于收到的消息调用executeOutstanding
函数进行后续操作(L855行)。 这里需要注意的是committed
函数的实现,和prepared
的不同。committed
函数中要求收到的commit
消息数量达到2f+1,而前面的prepared
函数中之要求收到的prepare
数量达到2f。 此外,在发送/接收commit
消息时,是需要Primary
节点也发送commit
消息的,且primary
节点发出的commit
消息也会被正常计数;而在发送/接收prepare
消息时,不需要primary
节点发送prepare
消息,即使其发送了,在计数时也会被忽略。
L855行的executeOutstanding
函数内部又调用了executeOne
函数,后者主要调用了simpleConsumer.execute
函数。simpleConsumer.execute
函数的代码实现如下所示。该函数的主体部分也比较简单,对executions
变量做了一次递增;然后构造了一个execDoneEvent
类型的消息,并将其输入到events
通道中。 1
2
3
4
5
6
7
8
9// [pbft-core.go] ProcessEvent -> recvCommit
func (sc *simpleConsumer) execute(seqNo uint64, reqBatch *RequestBatch) {
for _, req := range reqBatch.GetBatch() {
...
sc.executions++
...
go func() { sc.pe.manager.Queue() <- execDoneEvent{} }()
}
}events
中消息的处理是由pbftCore.ProcessEvent
完成的,对应于execDoneEvent
的部分如下所示。这里只是对执行再次做了一下同步检查,并对可能正在处理的view
切换过程进行处理。 1
2
3
4
5
6
7
8
9
10
11
12// [pbft-core.go] ProcessEvent
func (instance *pbftCore) ProcessEvent(e events.Event) events.Event {
switch et := e.(type) {
...
case execDoneEvent:
instance.execDoneSync()
...
return instance.processNewView()
...
}
...
}
VIII. 测试的优雅结束
运行一个测试用例,观察其如何优雅的结束也是非常有趣的。本节将介绍TestNetwork
这个测试用例是如何结束的,这对于我们自己写测试用例也是非常有帮助的。 回顾第二篇博客中IV.B小节介绍的testnet.process
函数,其代码主体如下所示。 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38// [mock_network_test.go] process
func (net *testnet) process() error {
retry := true
...
for {
...
select {
case msg, ok := <-net.msgs: // L221
...
retry = true
...
default: // L231
if !retry { // L232
return nil
}
var busy []int // L236
for i, ep := range net.endpoints {
if ep.isBusy() {
busy = append(busy, i)
}
}
if len(busy) == 0 {
retry = false // L243
continue
} // L245
...
select {
case msg, ok := <-net.msgs: // L249
...
retry = true
...
continue
case <-time.After(100 * time.Millisecond):
continue
}
}
}
}PBFT
流程正常运行时,会进入到L221行执行。但在测试刚启动时,由于节点0
还没来得及将消息放入到net.msgs
管道中,因而第一次是会进入到L231行执行。如IV.B小节所述,此时会阻塞在L249行,以等待节点0
将消息放入到net.msgs
管道中。待节点0
从net.msgs
管道中接收到消息,将进行下一次循环。
当PBFT
流程执行到结束时,会进入到L231行执行。L236行到L245行会对当前的PBFT
节点状态进行判断,如果所有的节点都不是busy
,那么retry
值会被赋值为false
(L243行)。然后执行continue
语句进入到下一次循环。此时,还是会进入到default case
,并通过L232行的判断语句返回。 这里比较巧妙的是,只有连续两次进入到default case
时(L231行),且前一次的retry
赋值为false
,当前的process
函数才会返回。 为什么是连续两次呢?可以看到,只要进入到L221行,retry
就会被重置为true
。
为检验我们对于process
函数的分析是否正确,笔者试着将default case
的代码删掉。发现PBFT
流程还是可以正常运行,但无法从当前测试用例中结束。这便验证了我们的分析。
从process
函数返回到TestNetwork
函数中,后者的相关代码摘录如下。容易看出,TestNetwork
对各个节点对reqBatch
的执行情况进行了检查,若检查通过,则优雅地结束当前测试用例。 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19// [pbft-core_test.go]
func TestNetwork(t *testing.T) {
...
err := net.process()
...
for _, pep := range net.pbftEndpoints {
if pep.sc.executions <= 0 {
t.Errorf("...")
continue
}
if pep.sc.executions != 1 {
t.Errorf("...")
continue
}
if !reflect.DeepEqual(pep.sc.lastExecution, hash(reqBatch.GetBatch()[0])) {
t.Errorf("...")
}
}
}