本篇博客是PBFT
代码解读的第五篇博客。
IX. 节点对reqBatch的执行流程
本系列的第一篇博客介绍了源代码的获取和测试用例的选择;第二篇博客介绍了Primary
节点对repBatch
的处理,以及对PrePrepare
消息的发送;第三篇博客介绍了Preprepare
消息的接收以及Prepare
消息的发送、Prepare
消息的接收以及Commit
消息的发送。第四篇博客介绍了Commit消息的接收以及测试用例的结束。 强烈建议读者先阅读上述四篇博客,再来阅读当前这篇。
为方便读者查阅,本系列博客的链接整理如下:
在第四篇博客中第VII节的最后,我们介绍到节点在接收到execDoneEvent
类型的Event
后,将调用instance.execDoneSync()
对该event
对应的reqBatch
进行执行。为方便后面的表述,我们假设reqBatch
代表了一批交易。 为方便读者理解,我们将相关代码再次摘录如下: 1
2
3
4
5
6
7
8
9
10// [pbft-core.go] ProcessEvent
func (instance *pbftCore) ProcessEvent(e events.Event) events.Event {
switch et := e.(type) {
...
case execDoneEvent:
instance.execDoneSync()
...
}
...
}execDoneSync
函数,后者的代码主体如下所示: 1
2
3
4
5
6
7
8
9
10
11// [pbft-core.go] ProcessEvent->execDoneSync
func (instance *pbftCore) execDoneSync() {
if instance.currentExec != nil { // L993
...
instance.lastExec = *instance.currentExec
...
} ...
instance.currentExec = nil // L1005
instance.executeOutstanding() // L1007
}execDoneSync
函数的主要功能是调用了executeOutstanding
函数(L1007行),后者实现了交易的真正执行。这里需要解释一下的是L993行到L1005行的代码。这些代码主要是对上一次的交易执行进行一个收尾工作,包括将lastExec
字段赋值为currentExec
,然后将currentExec
置为nil
。这也是为什么当前函数命名为execDoneSync
,它实现了对上一次执行的“同步/收尾”。 我们继续看executeOutstanding
函数的代码: 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29// [pbft-core.go] ProcessEvent->execDoneSync->executeOutstanding
func (instance *pbftCore) executeOutstanding() {
if instance.currentExec != nil { // L913
...
return
}
...
for idx := range instance.certStore { // L919
if instance.executeOne(idx) { // L920
break // L921
}
} // L923
...
}
type pbftCore struct {
...
certStore map[msgID]*msgCert
...
}
type msgCert struct {
digest string
prePrepare *PrePrepare
sentPrepare bool
prepare []*Prepare
sentCommit bool
commit []*Commit
}executeOutstanding
函数的L913行首先判断了前面的“收尾工作”是否正确完成了,没有的话就直接返回了。L919行至L923行对certStore
中的键值对进行遍历。certStore
是一个map[msgID]*msgCert
类型的变量,msgCert
类型保存了一次PBFT
共识的所有prePrepare
、prepare
、commit
等消息。因此,certStore
实际上保存了是所有的共识数据。每一条共识数据都对应了一条repBatch
。 回到executeOutstanding
函数的L919行,其将certStore
中的所有共识数据进行遍历,针对每一条共识数据执行executeOne
函数(L920行)。
这里,可能有读者会问,我们前面不是针对某一个特定的reqBatch
去执行交易的吗?不就只有这一个repBatch
吗?为什么这里又要遍历所有的共识数据呢? 这是因为不同repBatch
的共识完成时间可能是乱序的。比如说当前完成共识的repBatch
为repBatch_1
,但可能repBatch_2
先于repBatch_1
完成共识了。由于repBatch_2
依赖于repBatch_1
,必须等repBatch_1
执行后才能执行。那么repBatch_2
就只会先保存在certStore
中。等到当前的for
循环遍历时,才可能执行repBatch_2
。
此外,这里需要注意的是,当executeOne
函数的返回结果为true
时,当前循环就结束了,继而从executeOutstanding
函数、execDoneSync
函数中返回了。也就是说:executeOne
函数每次只执行一个reqBatch
。这也是为什么我在上一段中说“可能”执行repBatch_2
的原因,因为如果当前执行了repBatch_1
,repBatch_2
又得等到下一次才能执行。 那么,repBatch_2
到底何时才能执行,我们接下来就会介绍到。
executeOne
函数的主体代码如下所示: 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26// [pbft-core.go] ProcessEvent->execDoneSync->executeOutstanding->executeOne
func (instance *pbftCore) executeOne(idx msgID) bool {
cert := instance.certStore[idx] // L931
if idx.n != instance.lastExec+1 || cert == nil || cert.prePrepare == nil { // L933
return false
}
...
digest := cert.digest // L944
reqBatch := instance.reqBatchStore[digest] // L945
if !instance.committed(digest, idx.v, idx.n) { // L947
return false
}
...
currentExec := idx.n
instance.currentExec = ¤tExec // L953
...
if digest == "" { // L956
...
instance.execDoneSync()
} else {
logger.Infof("...")
instance.consumer.execute(idx.n, reqBatch) // L964
}
}executeOne
函数首先从certStore
中取出cert
(L931行)。 L933行进行了三项检查:
- 当前要执行的
repBatch
的编号必须正好是已执行的repBatch
的编号加1,也即需要保证:按序执行 - 共识数据
cert
必须是存在的 prePrepare
数据也必须是存在的
L944行和L945获取将要执行的repBatch
的数据。 L947行又进行了一次检查。这是因为instance.certStore
中是保存了所有的共识数据,包括已经完成共识的和正在进行共识的。而我们前面提到的for
循环只是简单的将instance.certStore
中的所有共识数据进行遍历。因此,对于每一条数据都应该检查一下其是否已经完成了共识,即达到了committed
状态。
L953行将currentExec
设置为idx.n
。后面在执行完成后将currentExec
再次置为nil
需要等到下次execDoneSync
函数执行时,这一点在前面也已经进行了解释。
L956行对空请求进行处理,L960行对非空请求进行处理。
- 我们先来看非空请求的处理。可以看到,其再次调用了
execute
函数,即回到了上一篇博客中第VII节的最后部分。也就是说,在执行了一次repBatch
的最后,又发起了一次执行repBatch
的调用。这也就回答了前面的一个问题“repBatch_2
到底何时才能执行”:在repBatch_1
执行结束后,再次调用execute
函数,来执行repBatch_2
。 - 空请求的处理,主要是直接调用了
execDoneSync
函数。与非空请求的处理进行对比,这里省去了通过managerImpl.events
通道进行消息转发的过程。这是因为空请求不涉及到对区块链状态的改变,因而可以简化调用流程。
看到这里,读者可能会有一个最后的疑问:说好的repBatch
的执行呢?调用来调用去,没见到具体的代码执行过程啊。 没错!这是因为我们这里分析的是一个测试用例,在该测试用例中定义了一个simpleConsumer
类来模拟代码执行。该simpleConsumer
类我们在前面几篇博客中已经反复提及到了。真实的PBFT
集群运行时,是使用obcBatch
类及其方法来执行代码的。obcBatch.execute
函数的相关代码如下所示: 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15// [batch.go] execute
func (op *obcBatch) execute(seqNo uint64, reqBatch *RequestBatch) {
var txs []*pb.Transaction
for _, req := range reqBatch.GetBatch() {
tx := &pb.Transaction{}
if err := proto.Unmarshal(req.Payload, tx); err != nil {
...
}
...
txs = append(txs, tx)
op.deduplicator.Execute(req)
}
...
op.stack.Execute(meta, txs) // L213
}obcBatch.execute
函数将reqBatch
中每一条交易反序列化到txs
中,并在L213行执行这些交易。