本篇博客是继续对Hyperledger Fabric
中的PBFT
代码进行解读,重点关注checkpoint
机制的实现。
I. 写在前面
A. 前置阅读
本篇博客是建立在之前一些博客的基础上。一切重复的知识介绍,笔者在这篇博客中就不再赘述了。 因此,强烈建议读者先去阅读一下以下这几篇博客。
B. 对TestCheckpoint函数的测试
和博客Fabric中PBFT源码解读(1)中的类似,本篇博客主要介绍pbft-core_test.go
中TestCheckpoint
函数对PBFT
功能的测试。该功能主要指PBFT
中的Checkpoint
。 TestCheckpoint
函数主要分为两部分,第一部分(L297~L330行)是对简单的Checkpoint
生成进行测试,第二部分(L332~L361行)测试Water mark
对ReqBatch
执行的影响。但第二部分的测试代码貌似有点问题。而且整个TestCheckpoint
函数中定义的两个WaitGroup
类型的变量execWait
和finishWait
也挺令人费解的。。。。。。
因此,本篇博客主要关注第一部分的测试,并且忽略execWait
和finishWait
变量。
总体而言,Checkpoint
主要会影响1)节点中Water mark
的更新和2)集群中View
的切换。由于View
的切换比较复杂,笔者会专门用一篇博客进行介绍。因此,本篇博客主要介绍Checkpoint
生成以及Water mark
的更新。
II. 对TestCheckpoint函数运行流程的解读
TestCheckpoint
函数的主体代码如下所示: 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22// [pbft-core_test.go] TestCheckpoint
func TestCheckpoint(t *testing.T) {
...
validatorCount := 4
config := loadConfig()
config.Set("general.K", 2) // L299
config.Set("general.logmultiplier", 2) // L300
net := makePBFTNetwork(validatorCount, config)
defer net.stop()
execReqBatch := func(tag int64) {
net.pbftEndpoints[0].manager.Queue() <- createPbftReqBatch(tag, uint64(generateBroadcaster(validatorCount)))
net.process()
}
// execWait is 0, and execute will proceed
execReqBatch(1) // L310
execReqBatch(2) // L311
...
net.process()
...
}TestNetwork
大同小异,主要的不同包括general.K
和general.logmultiplier
两个参数的设置,以及对reqBatch
的请求发送进行了一个函数封装(也即execReqBatch
函数)。
A. Checkpoint和Water mark的概念解释
为解释general.K
和general.logmultiplier
两个参数,我们首先介绍一下PBTF节点中的日志存储。PBTF
中的每一条执行请求都会以日志的形式记录在节点中。但这些日志不能无限增长,必须采用某种方式进行日志的缩减(也即OSDI'99论文中Section 4.3 Garbage Collection中所介绍的)。为实现日志删减,PBFT
中定义了Checkpoint
的概念,并将在2f+1个节点中都达成了的Checkpoint
称为Stable checkpoint
。基于Stable checkpoint
,PBFT
中还定义了Water mark
的概念。Water mark
可以被看作是一个序列号窗口,包括low-water mark
和high-water mark
。其中low-water mark
为Stable checkpoint
中包含的最高的请求序列号。
因此,Water mark
随着Checkpoint
往前动态推移。同时,Water mark
还规定了当前能够被接收执行的请求的序列号。具体而言,由于low-water mark
表示了已被Stable checkpoint
确认的序列号,因而低于该low-water mark
的请求将不予执行。同时,高于high-water mark
的请求也将不予执行。下图展示了Checkpoint
和Water mark
的示意图。PBFT
中要求Water mark
的大小必须为Checkpoint
大小的整数倍。如下图中,Checkpoint
的大小为6,Water mark
的大小为18。
回到TestCheckpoint
函数中,general.K
定义了Checkpoint
的大小,general.logmultiplier
定义了Water mark
中可容纳的Checkpoint
的个数。也即Water mark
的大小为general.K
*general.logmultiplier
。
TestCheckpoint
函数在L299行和L300行分别定义了general.K
和general.logmultiplier
的值为2。也即:每隔两个请求,节点将生成一个Checkpoint
。
另一方面,L310行和L311行恰好发起了两个执行请求。因此,预计测试结果中将生成一个Checkpoint
。 以下,我们再此回顾请求的执行过程,重点关注函数调用中与Checkpoint
和Water mark
相关的部分。
B. sendPrePrepare函数中对sequence的检查
回顾之前博客中的内容,L310行中发出的请求将由sendPrePrepare
函数接收并处理,该函数及相关函数的代码如下所示: 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16// [pbft-core.go] sendPrePrepare
func (instance *pbftCore) sendPrePrepare(reqBatch *RequestBatch, digest string) {
...
if !instance.inWV(instance.view, n) || n > instance.h+instance.L/2 { // L643
...
return
}
}
func (instance *pbftCore) inWV(v uint64, n uint64) bool {
return instance.view == v && instance.inW(n)
}
func (instance *pbftCore) inW(n uint64) bool {
return n-instance.h > 0 && n-instance.h <= instance.L
}sendPrePrepare
函数主要在L643行对请求的sequence
n
进行了两部分检查。
第一部分主要调用了inWV
函数,后者又主要调用了inW
函数。这里需要提及的事,Fabric
对PBFT
的具体实现中,只定义了low-water mark
的值和Water mark
的大小。前者即为instance.h
,后者为instance.L
。因此,在inW
函数中,实际上便对请求的序列号进行了low-water mark
和high-water mark
的检查。
第二部分的检查对n
再次进行了检查。当请求的序列号超过当前water mark
的中位值时,也不会发送当前请求。这一点其实是有点让人费解的,我们姑且认为其是做了一个更为保守的检查操作吧。
C. Commit请求时生成Checkpoint
在前面的博客中介绍到:在recvCommit
函数中如果判断已经达到了committed
的状态,将调用executeOutstanding
函数。为唤起读者的记忆,相关代码摘录如下: 1
2
3
4
5
6
7
8// [pbft-core.go] recvCommit
func (instance *pbftCore) recvCommit(commit *Commit) error {
...
if instance.committed(commit.BatchDigest, commit.View, commit.SequenceNumber) {
...
instance.executeOutstanding()
...
}executeOutstanding
函数进一步调用pbftCore.executeOne
函数、simpleConsumer.execute
函数。execute
函数中向Event
管道中传入了execDoneEvent
类型的消息,如下代码所示: 1
2
3
4
5
6
7// [pbft-core_mock_test.go]execute
func (sc *simpleConsumer) execute(seqNo uint64, reqBatch *RequestBatch) {
for _, req := range reqBatch.GetBatch() {
...
go func() { sc.pe.manager.Queue() <- execDoneEvent{} }()
}
}managerImpl.eventLoop
函数中被接收,后者接着调用managerImpl.Inject
函数、SendEvent
函数将该消息传送到pbftCore.ProcessEvent
函数中。pbftCore.ProcessEvent
函数中的相关代码如下所示: 1
2
3
4
5
6
7
8
9
10// [pbft-core_mock_test.go]execute -> .... -> [pbft-core.go] ProcessEvent
func (instance *pbftCore) ProcessEvent(e events.Event) events.Event {
...
switch et := e.(type) {
...
case execDoneEvent:
instance.execDoneSync() // L386
...
...
}ProcessEvent
函数在L386行调用了execDoneSync
函数,后者的相关代码如下: 1
2
3
4
5
6
7
8// [pbft-core_mock_test.go]execute -> .... -> [pbft-core.go] ProcessEvent -> execDoneSync
func (instance *pbftCore) execDoneSync() {
if instance.currentExec != nil {
if instance.lastExec%instance.K == 0 { // L996
instance.Checkpoint(instance.lastExec, instance.consumer.getState())
}
} ...
}lastExec
是K
的整数倍,即被认定为需要进行Checkpoint
,从而调用pbftCore.Checkpoint
函数。Checkpoint
函数的主体代码如下所示: 1
2
3
4
5
6
7
8
9
10
11
12// [pbft-core_mock_test.go]execute -> .... -> [pbft-core.go] ProcessEvent -> execDoneSync -> Checkpoint
func (instance *pbftCore) Checkpoint(seqNo uint64, id []byte) {
...
chkpt := &Checkpoint{ // L980
SequenceNumber: seqNo,
ReplicaId: instance.id,
Id: idAsString,
} // L984
...
instance.recvCheckpoint(chkpt) // L988
instance.innerBroadcast(&Message{Payload: &Message_Checkpoint{Checkpoint: chkpt}}) // L989
}Checkpoint
类型的变量chkpt
,L988行将该变量发送给了自己,L989行将该变量广播给了其他节点。 这里我们先看innerBroadcast
函数。这里的innerBroadcast
函数调用和前面博客中的innerBroadcast
调用大同小异,最终都将转发到其他节点中的pbftCore.ProcessEvent
中来接收。pbftCore.ProcessEvent
函数的相关代码如下: 1
2
3
4
5
6
7
8
9// [pbft-core.go] ProcessEvent
func (instance *pbftCore) ProcessEvent(e events.Event) events.Event {
...
switch et := e.(type) {
...
case *Checkpoint:
return instance.recvCheckpoint(et)
...
}ProcessEvent
在接收到Checkpoint
类型的消息后也是调用了recvCheckpoint
函数。以下我们重点来关注recvCheckpoint
函数。
D. recvCheckpoint函数的实现细节(1)
recvCheckpoint
函数的主体代码如下所示: 1
2
3
4
5
6
7
8// [pbft-core.go] recvCheckpoint
func (instance *pbftCore) recvCheckpoint(chkpt *Checkpoint) events.Event {
...
if instance.weakCheckpointSetOutOfRange(chkpt) {
return nil
}
...
}
recvCheckpoint
函数首先调用了weakCheckpointSetOutOfRange
函数。后者主要用来判断,当前checkpoint
是否超过了high-water mark
。并结合已经存储的超过high-water mark
的checkpoint
个数,来判断当前节点的状态是否确实落后了。我们首先来看一下weakCheckpointSetOutOfRange
函数。 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35// [pbft-core.go] recvCheckpoint -> weakCheckpointSetOutOfRange
func (instance *pbftCore) weakCheckpointSetOutOfRange(chkpt *Checkpoint) bool {
H := instance.h + instance.L
...
if chkpt.SequenceNumber < H { // L1063
...
delete(instance.hChkpts, chkpt.ReplicaId) // L1065
} else {
...
instance.hChkpts[chkpt.ReplicaId] = chkpt.SequenceNumber // L1069
...
if len(instance.hChkpts) >= instance.f+1 { // L1073
chkptSeqNumArray := make([]uint64, len(instance.hChkpts)) // L1074
index := 0
for replicaID, hChkpt := range instance.hChkpts {
chkptSeqNumArray[index] = hChkpt
index++
if hChkpt < H {
delete(instance.hChkpts, replicaID)
}
} // L1082
sort.Sort(sortableUint64Slice(chkptSeqNumArray)) // L1083
...
if m := chkptSeqNumArray[len(chkptSeqNumArray)-(instance.f+1)]; m > H { // L1088
...
instance.moveWatermarks(m) // L1092
...
instance.skipInProgress = true // L1094
...
return true // L1100
}
}
}
return false // L1105
}weakCheckpointSetOutOfRange
函数看起来比较复杂,但整体逻辑还是比较简单的。主要分为两个部分:
- 1)L1063行的判断为真时,说明
chkpt中SequenceNumber
并未超过high-water mark
,当然也就不会得到节点状态确实落后的结论,从而在L1105行返回false
。如下所述,只有当超过f+1个节点发来的chkpt
都超过了high-water mark
,才说明当前节点确实落后了。此外,需要提一下L1065行的操作。L1065行使用了一个hChkpts
字段,该字段中主要保存了所有超过high-water mark
的checkpoint
。因此,当一个节点发来的chkpt
中的SequenceNumber
并未超过high-water mark
时,将尝试对hChkpts
字段进行删除更新操作。 - 2)L1063行的判断为假时,将进一步判断:超过
high-water mark
的checkpoint
数量是否已经超过f+1个。首先在L1069行将当前chkpt
保存在hChkpts字段中。L1073行对hChkpts
字段中保存的checkpoint
数量进行了一个初步判断。L1074至L1082行对hChkpts
字段中保存的内容进行了一次重新组织,包括将其中已经小于high-water mark
的值删除,并将其余值的hChkpt
(其实是SequenceNumber
)保存在一个新的chkptSeqNumArray
变量中。L1083行对chkptSeqNumArray
变量进行了排序。L1088行对chkptSeqNumArray
中处于倒数第f+1位置的值进行了判断:判断其是否超过了high-water mark。这里的逻辑是这样的:由于chkptSeqNumArray
中的值已经是递增有序的了,当倒数第f+1位置的值都大于high-water mark
,则说明其中至少有f+1个位置的值都大于high-water mark
。
乍一看可能会觉得L1074至L1088行的代码有些冗余:不就相当于在删除了小于high-water mark
的checkpoint
后又判断了一次这些checkpoint
的数目嘛。其实这段代码还有一个很重要的作用,就是找到倒数第f+1个checkpoint
的SequenceNumber
值,也即m
。该m
在L1092行用于更新Water mark
。
剩下来的就比较简单了,在L1092行对Water mark
进行了更新,在L1094行将skipInProgress
变量设置为true
,并返回true。 先说一下skipInProgress
变量,其主要被用来作为状态同步的依据。当该值为true
时,节点将在后面进行状态同步。
下面再来看一下L1092行的moveWatermarks
函数,其函数主体如下所示: 1
2
3
4
5
6
7
8
9// [pbft-core.go] recvCheckpoint -> weakCheckpointSetOutOfRange -> moveWatermarks
func (instance *pbftCore) moveWatermarks(n uint64) {
// round down n to previous low watermark
h := n / instance.K * instance.K // L1010
...
instance.h = h // L1049
...
instance.resubmitRequestBatches() // L1054
}moveWatermarks
函数主要干了四件事:
- 计算出新的
low-water mark
。L1010行乍一看是行没用的代码,但其中实现了一些巧妙的运算。由于我们传入的n是某一个Checkpoint
中的最大SequenceNumber
。该SequenceNumber
一般可以表示为instance.K
*m
+(instance.K
-1)的形式。通过L1010行的计算,得到instance.K
*m
,该值即为该checkpoint
的low-water mark
。举例来说,假设instance.K
为6,Checkpoint
中包含的SequenceNumber
区间将可能是[0, 5], [6, 11], [12, 17] ...对于[12, 17]的Checkpoint
,其传入的n
为17,经过L1010行的计算17/6*6得到值12,即为该Checkpoint
的low-water mark
. - 删除小于新的
low-water mark
的数据。由于这些代码比较长(从L1011行到L1048行),考虑到篇幅,笔者没有在上面代码中列出。这些删除操作包括删除certStore
、checkpointStore
、pset
、qset
、chkpts
中存储的各种数据。 - 更新节点的
low-water mark
。L1049实现了该操作。 - 重新提交被积压的请求。如L1054行所示。
E. recvCheckpoint函数的实现细节(2)
重新回到recvCheckpoint
函数,相关代码如下所示: 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60// [pbft-core.go] recvCheckpoint
func (instance *pbftCore) recvCheckpoint(chkpt *Checkpoint) events.Event {
...
if instance.weakCheckpointSetOutOfRange(chkpt) { // L1147
return nil // L1148
}
if !instance.inW(chkpt.SequenceNumber) { // L1151
...
return nil
}
instance.checkpointStore[*chkpt] = true // L1161
...
diffValues := make(map[string]struct{}) // L1164
diffValues[chkpt.Id] = struct{}{}
matching := 0
for testChkpt := range instance.checkpointStore {
if testChkpt.SequenceNumber == chkpt.SequenceNumber {
if testChkpt.Id == chkpt.Id {
matching++
} else {
if _, ok := diffValues[testChkpt.Id]; !ok {
diffValues[testChkpt.Id] = struct{}{}
}
}
}
} // L1178
...
if count := len(diffValues); count > instance.f+1 { // L1183
logger.Panicf("...")
}
if matching == instance.f+1 { // L1188
...
instance.witnessCheckpointWeakCert(chkpt) // L1197
} // L1198
if matching < instance.intersectionQuorum() { // L1200
return nil
}
...
if _, ok := instance.chkpts[chkpt.SequenceNumber]; !ok { // L1213
...
if instance.skipInProgress { // L1216
logSafetyBound := instance.h + instance.L/2
...
if chkpt.SequenceNumber >= logSafetyBound { // L1220
...
instance.moveWatermarks(chkpt.SequenceNumber) // L1222
}
}
return nil
}
...
instance.moveWatermarks(chkpt.SequenceNumber) // L1231
return instance.processNewView() // L1233
}weakCheckpointSetOutOfRange
函数返回为true
,说明当前节点的数据状态落后太多了,需要首先进行状态同步。也就没有必要再进行这一次的Checkpoint
的操作了,直接在L1148行返回。 以下介绍weakCheckpointSetOutOfRange
函数返回为false
的情形。
- L1151行对当前
chkpt
的SequenceNumber
进行了检查,如果不在watermark
范围内,则直接返回。 - L1161行将接收到的
Checkpoint
存储到checkpointStore
中。checkpointStore
中存储了所有接收到的Checkpoint
。 - 接下来的L1164到L1178行基于
checkpointStore
变量,构建了两个新变量diffValues
和matching
。其中matching变量统计了和当前chkpt
的SequenceNumber
和Id
都相同的checkpoint
的数量,diffValues
变量保存了和当前chkpt
的SequenceNumber
相同但Id
不同的Checkpoint
的Id
值。 - L1183行对
diffValues
进行了检查。如果diffValues
中保存的Id
数量超过f+1个,说明和chkpt
的SequenceNumber
及Id
都相同的checkpoint
的数量最多不超过2f个,因而chkpt
永远不可能成为stable checkpoint
。更重要的是,一般而言:SequenceNumber
相同的Checkpoint
对应的Id
(Id
表示一个节点当前的数据状态)应该也是相同的。这里有超过f+1个Checkpoint
的Id
(也即数据状态)不同,其中至少有一个诚实节点和当前节点的数据状态不同,说明集群中出现了问题,因此直接采用panic
函数报错。 - L1188行判断
matching
值是否为f+1。若是,则调用witnessCheckpointWeakCert
函数。witnessCheckpointWeakCert
函数主要是用来辅助节点的状态更新的。因此,整体L1188至L1198行的逻辑是这样的:当当前节点收到f+1个具有相同SequenceNumber
和Id
的Checkpoint
时,该Checkpoint
大概率代表了一个稳定的数据状态,调用witnessCheckpointWeakCert
函数尝试将当前节点的状态更新到该数据状态。笔者这里思考过一个问题:为什么是判断等于f+1,而不是大于等于f+1。笔者的理解是:对witnessCheckpointWeakCert
函数的调用一次就够了,没必要在大于等于f+1的每次都调用该函数。 - L1200行对
matching
值再次进行了判断。若未达到Quorum
(即2f+1),则说明chkpt
对应的Checkpoint
未达到稳定,直接返回;否则,说明chkpt
对应的Checkpoint
达到稳定,继续下面的运行。 - L1213行对
chkpt
再次进行了检查。首先,我们需要解释一下instance.chkpts
和instance.checkpointStore
两个字段的不同。前者存储的是节点自己生成的Checkpoint
,后者存储的是接收到的Checkpoint
(也包含了自己发送给自己的)。因此,后者是前者的超集。另一方面,即使chkpt
对应的Checkpoint
已经满足了Stable
的要求,其收到的2f+1个Checkpoint
可能全部是从别的节点接收到的。也即出现L1213行中判断ok
值为false
的情况。在此情况下,往往说明当前节点的数据状态落后于其他节点了。 代码在此处做了一个小优化。既然当前节点的数据状态已经落后了,如果当前节点正好处于数据同步的过程中(即L1216行instance.skipInProgress
值为true
),并且chkpt
的SequenceNumber
超过了当前Water mark
的中位值(即L1220行),那就直接更新Water mark
值到这个中位值(即L1222行)。更新Water mark
值的目的在于:当当前轮次的状态同步过程结束后,会再次检查Water mark
,若Water mark
中的low-water mark
仍然高于当前更新轮次的seqNo
,则开启新一轮的状态同步。这样的优化加速了状态同步的进程。 关于“状态数据的同步”,笔者将在后面专门用一篇博客进行介绍。 - 代码执行到L1231行的话,说明:
chkpt
已经达到了稳定条件(即:接收到超过2f+1个Checkpoint
消息),并且该节点自己也生成了其对应的Checkpoint
。那么,就可以直接更新Water mark
了。 - L1233行调用
processNewView
进行View
切换相关的处理。关于View
切换,笔者在后面也会再写一篇博客进行介绍。
III. 小结
本篇博客主要介绍了Fabric
中的PBFT
是如何生成Checkpoint
的,以及在达到Stable Checkpoint
时,如何更新节点的Water mark
。 此外,跟Checkpoint
最相关的另一个机制是View
的切换,这一点我们将在后面的博客中进行介绍。