0%

Fabric中PBFT源码解读(5)

本篇博客是PBFT代码解读的第五篇博客。

IX. 节点对reqBatch的执行流程

本系列的第一篇博客介绍了源代码的获取和测试用例的选择;第二篇博客介绍了Primary节点对repBatch的处理,以及对PrePrepare消息的发送;第三篇博客介绍了Preprepare消息的接收以及Prepare消息的发送、Prepare消息的接收以及Commit消息的发送。第四篇博客介绍了Commit消息的接收以及测试用例的结束。 强烈建议读者先阅读上述四篇博客,再来阅读当前这篇。

为方便读者查阅,本系列博客的链接整理如下:

第四篇博客中第VII节的最后,我们介绍到节点在接收到execDoneEvent类型的Event后,将调用instance.execDoneSync()对该event对应的reqBatch进行执行。为方便后面的表述,我们假设reqBatch代表了一批交易。 为方便读者理解,我们将相关代码再次摘录如下:

1
2
3
4
5
6
7
8
9
10
// [pbft-core.go] ProcessEvent
func (instance *pbftCore) ProcessEvent(e events.Event) events.Event {
switch et := e.(type) {
...
case execDoneEvent:
instance.execDoneSync()
...
}
...
}
容易看到,这里主要是调用了execDoneSync函数,后者的代码主体如下所示:
1
2
3
4
5
6
7
8
9
10
11
// [pbft-core.go] ProcessEvent->execDoneSync
func (instance *pbftCore) execDoneSync() {
if instance.currentExec != nil { // L993
...
instance.lastExec = *instance.currentExec
...
} ...
instance.currentExec = nil // L1005

instance.executeOutstanding() // L1007
}
execDoneSync函数的主要功能是调用了executeOutstanding函数(L1007行),后者实现了交易的真正执行。这里需要解释一下的是L993行到L1005行的代码。这些代码主要是对上一次的交易执行进行一个收尾工作,包括将lastExec字段赋值为currentExec,然后将currentExec置为nil。这也是为什么当前函数命名为execDoneSync,它实现了对上一次执行的“同步/收尾”。 我们继续看executeOutstanding函数的代码:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
// [pbft-core.go] ProcessEvent->execDoneSync->executeOutstanding
func (instance *pbftCore) executeOutstanding() {
if instance.currentExec != nil { // L913
...
return
}
...
for idx := range instance.certStore { // L919
if instance.executeOne(idx) { // L920
break // L921
}
} // L923
...
}

type pbftCore struct {
...
certStore map[msgID]*msgCert
...
}

type msgCert struct {
digest string
prePrepare *PrePrepare
sentPrepare bool
prepare []*Prepare
sentCommit bool
commit []*Commit
}
executeOutstanding函数的L913行首先判断了前面的“收尾工作”是否正确完成了,没有的话就直接返回了。L919行至L923行对certStore中的键值对进行遍历。certStore是一个map[msgID]*msgCert类型的变量,msgCert类型保存了一次PBFT共识的所有prePreparepreparecommit等消息。因此,certStore实际上保存了是所有的共识数据。每一条共识数据都对应了一条repBatch。 回到executeOutstanding函数的L919行,其将certStore中的所有共识数据进行遍历,针对每一条共识数据执行executeOne函数(L920行)。

这里,可能有读者会问,我们前面不是针对某一个特定的reqBatch去执行交易的吗?不就只有这一个repBatch吗?为什么这里又要遍历所有的共识数据呢? 这是因为不同repBatch的共识完成时间可能是乱序的。比如说当前完成共识的repBatchrepBatch_1,但可能repBatch_2先于repBatch_1完成共识了。由于repBatch_2依赖于repBatch_1,必须等repBatch_1执行后才能执行。那么repBatch_2就只会先保存在certStore中。等到当前的for循环遍历时,才可能执行repBatch_2

此外,这里需要注意的是,当executeOne函数的返回结果为true时,当前循环就结束了,继而从executeOutstanding函数、execDoneSync函数中返回了。也就是说:executeOne函数每次只执行一个reqBatch。这也是为什么我在上一段中说“可能”执行repBatch_2的原因,因为如果当前执行了repBatch_1repBatch_2又得等到下一次才能执行。 那么,repBatch_2到底何时才能执行,我们接下来就会介绍到。

executeOne函数的主体代码如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
// [pbft-core.go] ProcessEvent->execDoneSync->executeOutstanding->executeOne
func (instance *pbftCore) executeOne(idx msgID) bool {
cert := instance.certStore[idx] // L931

if idx.n != instance.lastExec+1 || cert == nil || cert.prePrepare == nil { // L933
return false
}
...
digest := cert.digest // L944
reqBatch := instance.reqBatchStore[digest] // L945

if !instance.committed(digest, idx.v, idx.n) { // L947
return false
}
...
currentExec := idx.n
instance.currentExec = &currentExec // L953
...
if digest == "" { // L956
...
instance.execDoneSync()
} else {
logger.Infof("...")
instance.consumer.execute(idx.n, reqBatch) // L964
}
}
executeOne函数首先从certStore中取出cert(L931行)。 L933行进行了三项检查:

  • 当前要执行的repBatch的编号必须正好是已执行的repBatch的编号加1,也即需要保证:按序执行
  • 共识数据cert必须是存在的
  • prePrepare数据也必须是存在的

L944行和L945获取将要执行的repBatch的数据。 L947行又进行了一次检查。这是因为instance.certStore中是保存了所有的共识数据,包括已经完成共识的和正在进行共识的。而我们前面提到的for循环只是简单的将instance.certStore中的所有共识数据进行遍历。因此,对于每一条数据都应该检查一下其是否已经完成了共识,即达到了committed状态。

L953行将currentExec设置为idx.n。后面在执行完成后将currentExec再次置为nil需要等到下次execDoneSync函数执行时,这一点在前面也已经进行了解释。

L956行对空请求进行处理,L960行对非空请求进行处理。

  • 我们先来看非空请求的处理。可以看到,其再次调用了execute函数,即回到了上一篇博客中第VII节的最后部分。也就是说,在执行了一次repBatch的最后,又发起了一次执行repBatch的调用。这也就回答了前面的一个问题“repBatch_2到底何时才能执行”:在repBatch_1执行结束后,再次调用execute函数,来执行repBatch_2
  • 空请求的处理,主要是直接调用了execDoneSync函数。与非空请求的处理进行对比,这里省去了通过managerImpl.events通道进行消息转发的过程。这是因为空请求不涉及到对区块链状态的改变,因而可以简化调用流程。

看到这里,读者可能会有一个最后的疑问:说好的repBatch的执行呢?调用来调用去,没见到具体的代码执行过程啊。 没错!这是因为我们这里分析的是一个测试用例,在该测试用例中定义了一个simpleConsumer类来模拟代码执行。该simpleConsumer类我们在前面几篇博客中已经反复提及到了。真实的PBFT集群运行时,是使用obcBatch类及其方法来执行代码的。obcBatch.execute函数的相关代码如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// [batch.go] execute
func (op *obcBatch) execute(seqNo uint64, reqBatch *RequestBatch) {
var txs []*pb.Transaction
for _, req := range reqBatch.GetBatch() {
tx := &pb.Transaction{}
if err := proto.Unmarshal(req.Payload, tx); err != nil {
...
}
...
txs = append(txs, tx)
op.deduplicator.Execute(req)
}
...
op.stack.Execute(meta, txs) // L213
}
容易看出,obcBatch.execute函数将reqBatch中每一条交易反序列化到txs中,并在L213行执行这些交易。