本篇博客是继续对Hyperledger Fabric
中的PBFT
代码进行解读,重点关注状态同步机制的实现。
I.写在前面
A. 前置阅读
本篇博客是建立在之前一些博客的基础上。一切重复的知识介绍,笔者在这篇博客中就不再赘述了。 因此,强烈建议读者先去阅读一下以下这几篇博客。
- Fabric中PBFT源码解读(1)
- Fabric中PBFT源码解读(2)
- Fabric中PBFT源码解读(3)
- Fabric中PBFT源码解读(4)
- Fabric中PBFT源码解读(5)
- Fabric中PBFT源码解读——Checkpoint机制
B. instance.skipInProgress字段
在前面一篇博客Fabric中PBFT源码解读——Checkpoint机制中,我们介绍到在weakCheckpointSetOutOfRange
函数中,若发现当前节点的状态数据落后了,则将instance.skipInProgress
字段设置为true
。相关代码摘录如下: 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17// [pbft-core.go] recvCheckpoint -> weakCheckpointSetOutOfRange
func (instance *pbftCore) weakCheckpointSetOutOfRange(chkpt *Checkpoint) bool {
...
if chkpt.SequenceNumber < H { // L1063
...
} else {
...
if len(instance.hChkpts) >= instance.f+1 { // L1073
...
if m := chkptSeqNumArray[len(chkptSeqNumArray)-(instance.f+1)]; m > H {
...
instance.skipInProgress = true // L1094
...
}
}
}
}instance.skipInProgress
字段主要作了一种标记:当前节点需要进行状态同步。在后面的合适时机,节点将基于该标记真正发起状态同步操作。 本篇博客将介绍Fabric
PBFT
代码中与状态同步相关的代码。
II. 标记需要进行“状态同步”
如上一小节所述,节点的状态同步分为了两个步骤:(1)通过instance.skipInProgress
字段标记需要进行“状态同步”;(2)在合适的时机,检查instance.skipInProgress
字段,若为true
,则启动状态同步操作。 除了上一小节中所说的weakCheckpointSetOutOfRange
函数可能会对skipInProgress
字段进行标记,另外一个地方也可能会对该字段进行标记,如下所示: 1
2
3
4
5
6
7
8
9
10// [pbft-core.go] stateTransfer
func (instance *pbftCore) stateTransfer(optional *stateUpdateTarget) {
if !instance.skipInProgress {
...
instance.skipInProgress = true
...
}
instance.retryStateTransfer(optional)
}stateTransfer
只是对retryStateTransfer
函数做了一次封装。后面我们会介绍到:启动状态同步的真实操作就是通过调用retryStateTransfer
函数实现的。因此,对于stateTransfer
函数中的操作,我们可以理解为:其在一个函数中同时完成了节点状态同步的两个步骤,从而可以立即启动节点状态的同步。
III. 启动“状态同步”操作
启动“状态同步”操作的时机主要有两个:一是在ProcessEvent
函数中,二是在witnessCheckpointWeakCert
函数中。相关代码分别如下所示: 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22// [pbft-core.go] ProcessEvent
func (instance *pbftCore) ProcessEvent(e events.Event) events.Event {
...
switch et := e.(type) {
...
case execDoneEvent:
instance.execDoneSync()
if instance.skipInProgress {
instance.retryStateTransfer(nil)
}
...
}
...
}
// [pbft-core.go] witnessCheckpointWeakCert
func (instance *pbftCore) witnessCheckpointWeakCert(chkpt *Checkpoint) {
...
if instance.skipInProgress {
...
instance.retryStateTransfer(target)
}ProcessEvent
函数中,每当一个请求执行完毕后,就会判断一下是不是需要进行状态同步。witnessCheckpointWeakCert
函数是在recvCheckpoint
函数中检测到有f+1个相同的Checkpoint
消息时(也即表明此Checkpoint
大概率是稳定的Checkpoint
)被调用。witnessCheckpointWeakCert
函数首先构建一个“状态同步”的目标target
,然后判断一下是不是需要进行状态同步。
容易看出,无论是哪一种情形,都是通过调用retryStateTransfer
函数进行状态同步的。
另一方面,调用retryStateTransfer
函数的代码还包括以下两处:1)ProcessEvent
函数的stateUpdatedEvent case
和 2)stateTransfer
函数。stateTransfer
函数的相关代码已经在第II节中介绍了,ProcessEvent
函数的相关代码如下所示: 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21// [pbft-core.go] ProcessEvent
func (instance *pbftCore) ProcessEvent(e events.Event) events.Event {
...
switch et := e.(type) {
...
case stateUpdatedEvent:
...
if et.target == nil || update.seqNo < instance.h {
...
if instance.highStateTarget == nil {
...
} else if update.seqNo < instance.highStateTarget.seqNo {
...
instance.retryStateTransfer(nil)
}...
instance.skipInProgress = false
...
}
}
...
}stateUpdatedEvent
。相应地,在ProcessEvent
中处理该类型event
时,将会再次检查当前的low-water mark
和已完成的状态更新操作的seqNo
。若后者小于前者,则说明需要再次进行状态更新操作,也即调用retryStateTransfer
函数。否则,将skipInProgress
字段置为false
。
综上,调用retryStateTransfer
函数进行状态同步的时机包括四个:1)ProcessEvent
函数的execDoneEvent case
;2)ProcessEvent
函数的stateUpdatedEvent case
;3)witnessCheckpointWeakCert
函数;4)stateTransfer
函数。
IV. retryStateTransfer函数的实现细节
retryStateTransfer
函数的主体代码如下所示: 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25// [pbft-core.go] retryStateTransfer
func (instance *pbftCore) retryStateTransfer(optional *stateUpdateTarget) {
if instance.currentExec != nil { // L886
logger.Debugf(...)
return
}
if instance.stateTransferring { // L891
logger.Debugf(...)
return
}
target := optional // L896
if target == nil {
if instance.highStateTarget == nil { // L898
...
return
}
target = instance.highStateTarget // L902
}
instance.stateTransferring = true // L905
...
instance.consumer.skipTo(target.seqNo, target.id, target.replicas) // L908
}
retryStateTransfer
函数首先会判断当前节点是否正在执行一个请求(L886行)。如果是,则停止状态更新,直接返回。- L891行会判断当前节点是否正在进行状态更新,若是,也停止状态更新,直接返回。判断当前节点是否正在进行状态更新依赖于
stateTransferring
字段,该字段在L905行被赋值为true
,表明正式进行状态更新。 - L896行定义的
target
变量,用于保存将要同步的目标状态。该变量受两个参数的影响,一个是retryStateTransfer
函数传入的参数optional
,另一个是instance
保存的字段highStateTarget
。highStateTarget
的优先级比optional
高,因此优先将target
变量赋值为highStateTarget
值(L902行)。但若两者都为nil
(L898行),则停止状态更新,直接返回。 - L908行调用
skipTo
函数进行状态的更新。skipTo
函数其实是innerStack
接口中定义的一个函数,依赖于使用PBFT
模块进行共识的其他模块的实现。具体而言,在Fabric
中共有三种innerStack
接口的实现:1)consensus/pbft/pbft.go
文件中obcGeneric
类,该类与区块链深入绑定,是真正用于区块链应用的实现类;2)consensus/pbft/mock_utilities_test.go
文件中的omniProto
类,该类主要用于测试,且基本上是一些空实现;3)consensus/pbft/pbft-core_mock_test.go
文件中simpleConsumer
类,该类也主要用于测试。和之前的几篇博客一样,本博客主要关注simpleConsumer
类中的实现。
V. simpleConsumer.skipTo函数的实现细节
simpleConsumer.skipTo
函数的主体代码如下所示: 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15// [pbft-core_mock_test.go] skipTo函数
func (sc *simpleConsumer) skipTo(seqNo uint64, id []byte, replicas []uint64) {
...
sc.executions = seqNo // L117
go func() { // L118
sc.pe.manager.Queue() <- stateUpdatedEvent{ // L119
chkpt: &checkpointMessage{
seqNo: seqNo,
id: id,
},
target: &pb.BlockchainInfo{},
}
}()
sc.pbftNet.debugMsg("TEST: skipping to %d\n", seqNo) // L127
}simpleConsumer.skipTo
函数的实现部分也比较简单:
- 由于状态同步完成后的执行请求数将达到
seqNo
,因此L117行将executions
字段设置为seqNo - L118行启动了一个协程,其中新建了一个
stateUpdatedEvent
类型的变量,并发送到chan Event
通道中。这一部分的具体运行流程已经在博客Fabric中PBFT源码解读(2)进行了详细介绍,感兴趣的读者可以前往查阅。 正如本博客第III节中所述,stateUpdatedEvent
类型的event
将可能再次开启下一轮的状态同步。
读者可能会困惑:这里并没有进行数据传输操作啊,怎么实现状态同步呢? 这是因为,我们这里只是pbft-core_mock_test.go
测试文件中的代码,具体的数据传输操作是跟使用PBFT
共识的应用紧密相关的。因此,这里只是打印了一个log
,如L127行。