0%

Fabric中PBFT源码解读——状态同步

本篇博客是继续对Hyperledger Fabric中的PBFT代码进行解读,重点关注状态同步机制的实现。

I.写在前面

A. 前置阅读

本篇博客是建立在之前一些博客的基础上。一切重复的知识介绍,笔者在这篇博客中就不再赘述了。 因此,强烈建议读者先去阅读一下以下这几篇博客。

B. instance.skipInProgress字段

在前面一篇博客Fabric中PBFT源码解读——Checkpoint机制中,我们介绍到在weakCheckpointSetOutOfRange函数中,若发现当前节点的状态数据落后了,则将instance.skipInProgress字段设置为true。相关代码摘录如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// [pbft-core.go] recvCheckpoint -> weakCheckpointSetOutOfRange
func (instance *pbftCore) weakCheckpointSetOutOfRange(chkpt *Checkpoint) bool {
...
if chkpt.SequenceNumber < H { // L1063
...
} else {
...
if len(instance.hChkpts) >= instance.f+1 { // L1073
...
if m := chkptSeqNumArray[len(chkptSeqNumArray)-(instance.f+1)]; m > H {
...
instance.skipInProgress = true // L1094
...
}
}
}
}
instance.skipInProgress字段主要作了一种标记:当前节点需要进行状态同步。在后面的合适时机,节点将基于该标记真正发起状态同步操作。 本篇博客将介绍Fabric PBFT代码中与状态同步相关的代码。

II. 标记需要进行“状态同步”

如上一小节所述,节点的状态同步分为了两个步骤:(1)通过instance.skipInProgress字段标记需要进行“状态同步”;(2)在合适的时机,检查instance.skipInProgress字段,若为true,则启动状态同步操作。 除了上一小节中所说的weakCheckpointSetOutOfRange函数可能会对skipInProgress字段进行标记,另外一个地方也可能会对该字段进行标记,如下所示:

1
2
3
4
5
6
7
8
9
10
// [pbft-core.go] stateTransfer
func (instance *pbftCore) stateTransfer(optional *stateUpdateTarget) {
if !instance.skipInProgress {
...
instance.skipInProgress = true
...
}

instance.retryStateTransfer(optional)
}
可以看到stateTransfer只是对retryStateTransfer函数做了一次封装。后面我们会介绍到:启动状态同步的真实操作就是通过调用retryStateTransfer函数实现的。因此,对于stateTransfer函数中的操作,我们可以理解为:其在一个函数中同时完成了节点状态同步的两个步骤,从而可以立即启动节点状态的同步。

III. 启动“状态同步”操作

启动“状态同步”操作的时机主要有两个:一是在ProcessEvent函数中,二是在witnessCheckpointWeakCert函数中。相关代码分别如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
// [pbft-core.go] ProcessEvent
func (instance *pbftCore) ProcessEvent(e events.Event) events.Event {
...
switch et := e.(type) {
...
case execDoneEvent:
instance.execDoneSync()
if instance.skipInProgress {
instance.retryStateTransfer(nil)
}
...
}
...
}

// [pbft-core.go] witnessCheckpointWeakCert
func (instance *pbftCore) witnessCheckpointWeakCert(chkpt *Checkpoint) {
...
if instance.skipInProgress {
...
instance.retryStateTransfer(target)
}
ProcessEvent函数中,每当一个请求执行完毕后,就会判断一下是不是需要进行状态同步。witnessCheckpointWeakCert函数是在recvCheckpoint函数中检测到有f+1个相同的Checkpoint消息时(也即表明此Checkpoint大概率是稳定的Checkpoint)被调用。witnessCheckpointWeakCert函数首先构建一个“状态同步”的目标target,然后判断一下是不是需要进行状态同步。

容易看出,无论是哪一种情形,都是通过调用retryStateTransfer函数进行状态同步的。

另一方面,调用retryStateTransfer函数的代码还包括以下两处:1)ProcessEvent函数的stateUpdatedEvent case 和 2)stateTransfer函数。stateTransfer函数的相关代码已经在第II节中介绍了,ProcessEvent函数的相关代码如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
// [pbft-core.go] ProcessEvent
func (instance *pbftCore) ProcessEvent(e events.Event) events.Event {
...
switch et := e.(type) {
...
case stateUpdatedEvent:
...
if et.target == nil || update.seqNo < instance.h {
...
if instance.highStateTarget == nil {
...
} else if update.seqNo < instance.highStateTarget.seqNo {
...
instance.retryStateTransfer(nil)
}...
instance.skipInProgress = false
...
}
}
...
}
后面会介绍到:当一次状态更新操作完成后,会发出一个stateUpdatedEvent。相应地,在ProcessEvent中处理该类型event时,将会再次检查当前的low-water mark和已完成的状态更新操作的seqNo。若后者小于前者,则说明需要再次进行状态更新操作,也即调用retryStateTransfer函数。否则,将skipInProgress字段置为false

综上,调用retryStateTransfer函数进行状态同步的时机包括四个:1)ProcessEvent函数的execDoneEvent case;2)ProcessEvent函数的stateUpdatedEvent case;3)witnessCheckpointWeakCert函数;4)stateTransfer函数。

IV. retryStateTransfer函数的实现细节

retryStateTransfer函数的主体代码如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
// [pbft-core.go] retryStateTransfer
func (instance *pbftCore) retryStateTransfer(optional *stateUpdateTarget) {
if instance.currentExec != nil { // L886
logger.Debugf(...)
return
}

if instance.stateTransferring { // L891
logger.Debugf(...)
return
}

target := optional // L896
if target == nil {
if instance.highStateTarget == nil { // L898
...
return
}
target = instance.highStateTarget // L902
}

instance.stateTransferring = true // L905
...
instance.consumer.skipTo(target.seqNo, target.id, target.replicas) // L908
}

  1. retryStateTransfer函数首先会判断当前节点是否正在执行一个请求(L886行)。如果是,则停止状态更新,直接返回。
  2. L891行会判断当前节点是否正在进行状态更新,若是,也停止状态更新,直接返回。判断当前节点是否正在进行状态更新依赖于stateTransferring字段,该字段在L905行被赋值为true,表明正式进行状态更新。
  3. L896行定义的target变量,用于保存将要同步的目标状态。该变量受两个参数的影响,一个是retryStateTransfer函数传入的参数optional,另一个是instance保存的字段highStateTargethighStateTarget的优先级比optional高,因此优先将target变量赋值为highStateTarget值(L902行)。但若两者都为nil(L898行),则停止状态更新,直接返回。
  4. L908行调用skipTo函数进行状态的更新。skipTo函数其实是innerStack接口中定义的一个函数,依赖于使用PBFT模块进行共识的其他模块的实现。具体而言,在Fabric中共有三种innerStack接口的实现:1)consensus/pbft/pbft.go文件中obcGeneric类,该类与区块链深入绑定,是真正用于区块链应用的实现类;2)consensus/pbft/mock_utilities_test.go文件中的omniProto类,该类主要用于测试,且基本上是一些空实现;3)consensus/pbft/pbft-core_mock_test.go文件中simpleConsumer类,该类也主要用于测试。和之前的几篇博客一样,本博客主要关注simpleConsumer类中的实现。

V. simpleConsumer.skipTo函数的实现细节

simpleConsumer.skipTo函数的主体代码如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// [pbft-core_mock_test.go] skipTo函数
func (sc *simpleConsumer) skipTo(seqNo uint64, id []byte, replicas []uint64) {
...
sc.executions = seqNo // L117
go func() { // L118
sc.pe.manager.Queue() <- stateUpdatedEvent{ // L119
chkpt: &checkpointMessage{
seqNo: seqNo,
id: id,
},
target: &pb.BlockchainInfo{},
}
}()
sc.pbftNet.debugMsg("TEST: skipping to %d\n", seqNo) // L127
}
simpleConsumer.skipTo函数的实现部分也比较简单:

  1. 由于状态同步完成后的执行请求数将达到seqNo,因此L117行将executions字段设置为seqNo
  2. L118行启动了一个协程,其中新建了一个stateUpdatedEvent类型的变量,并发送到chan Event通道中。这一部分的具体运行流程已经在博客Fabric中PBFT源码解读(2)进行了详细介绍,感兴趣的读者可以前往查阅。 正如本博客第III节中所述,stateUpdatedEvent类型的event将可能再次开启下一轮的状态同步。

读者可能会困惑:这里并没有进行数据传输操作啊,怎么实现状态同步呢? 这是因为,我们这里只是pbft-core_mock_test.go测试文件中的代码,具体的数据传输操作是跟使用PBFT共识的应用紧密相关的。因此,这里只是打印了一个log,如L127行。

参考文献

  1. PBFT代码篇:fabric 中的 PBFT 实现
  2. Hyperledger Fabric的PBFT源码分析(一)
  3. Fabric中PBFT源码解读(1)
  4. Fabric中PBFT源码解读——Checkpoint机制