0%

Fabric中PBFT源码解读(3)

本篇博客是PBFT代码解读的第三篇博客。

V. Preprepare消息的接收以及Prepare消息的发送

本系列的第一篇博客介绍了源代码的获取和测试用例的选择,第二篇博客介绍了Primary节点对repBatch的处理,以及对PrePrepare消息的发送。强烈建议读者先阅读上述两篇博客,再来阅读当前这篇。

为方便读者查阅,本系列博客的链接整理如下:

IV.B小节介绍了节点0Preprepare消息的发送过程,本小节介绍其他节点(以节点1为例)接收Preprepare消息的过程和发送Prepare消息的过程。 由IV.B小节的最后介绍可知,节点0Preprepare消息最终发送到了节点1events通道中。 此外,IV.B小节中提到每个节点会启动一个协程运行eventloop函数。为唤起读者的记忆,eventLoop函数的代码再次摘录如下。

1
2
3
4
5
6
7
8
9
10
// [pbft-core_mock_test.go] makePBFTNetwork -> [events.go] Start -> eventLoop
func (em *managerImpl) eventLoop() {
for {
select {
case next := <-em.events:
em.Inject(next)
...
}
}
}
eventLoop函数进一步调用Inject函数、SendEvent函数、pbftCore.ProcessEvent函数。以上都和节点0接收reqBatch消息时的流程相同,以下从pbftCore.ProcessEvent函数开始介绍。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
// [pbft-core.go] ProcessEvent
func (instance *pbftCore) ProcessEvent(e events.Event) events.Event {
...
switch et := e.(type) {
...
case *PrePrepare:
err = instance.recvPrePrepare(et)
...
}
}

// [pbft-core.go] ProcessEvent -> recvPrePrepare
func (instance *pbftCore) recvPrePrepare(preprep *PrePrepare) error {
...
if !instance.inWV(preprep.View, preprep.SequenceNumber) { // L715
...
return nil
}
...
cert := instance.getCert(preprep.View, preprep.SequenceNumber) // L732
...
if instance.primary(instance.view) != instance.id && instance.prePrepared(preprep.BatchDigest, preprep.View, preprep.SequenceNumber) && !cert.sentPrepare { // L758
...
cert.sentPrepare = true // L766
...
}
...
}

ProcessEvent函数中的switch判断后进入PrePrepare case,继而调用recvPrePrepare函数。recvPrePrepare函数主要保存preprepare消息,并发送prepare消息。 recvPrePrepare函数首先做了一些检查,这里挑选了一个比较有意思的:L715行检查收到的preprep消息是否是当前View中,且SequenceNumber在一定的区间范围内。 L758行的判断语句比较长,主要包括了三部分:

  1. 判断当前节点是否是PrimaryPBFTPrepare阶段不需要Primary节点发送Prepare消息。
  2. 判断当前是否已经到达了preprepared的状态。OSDI‘99的论文中并未定义preprepared的状态原语,这里是代码实现中的额外定义。关于preprepared状态的检查在后面再进行介绍。
  3. 判断是否已经发送过了prepare消息了。sentPrepare是一个bool类型的变量,其在L766行赋值为true

以下介绍对preprepared状态的检查。prePrepared函数定义如下:

1
2
3
4
5
6
7
8
9
10
11
12
// [pbft-core.go] ProcessEvent -> recvPrePrepare -> prePrepared
func (instance *pbftCore) prePrepared(digest string, v uint64, n uint64) bool {
...
cert := instance.certStore[msgID{v, n}] // L483
if cert != nil {
p := cert.prePrepare
if p != nil && p.View == v && p.SequenceNumber == n && p.BatchDigest == digest { // L486
return true
}
}
...
}
prePrepared函数中主要对instance的字典类型字段certStore进行了查找,查找该字典中是否已经存储了相应的prePrepare内容(L483行),并且查询得到的prePrepare内容与参数中的digestvn保持一致(L486行)。 其中L483行是读certStore字段,那么在此之前应该对certStore字段进行了写操作。该写操作发生在recvPrePrepare函数的L734行,相关代码如下所示。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
// [pbft-core.go] recvPrePrepare
func (instance *pbftCore) recvPrePrepare(preprep *PrePrepare) error {
...
cert := instance.getCert(preprep.View, preprep.SequenceNumber) // L732
...
}

// [pbft-core.go] recvPrePrepare -> getCert
func (instance *pbftCore) getCert(v uint64, n uint64) (cert *msgCert) {
idx := msgID{v, n}
cert, ok := instance.certStore[idx]
if ok {
return
}

cert = &msgCert{}
instance.certStore[idx] = cert
return
}
容易看到recvPrePrepare函数的L732行调用了getCert函数,后者首先查看certStore中是否已经保存了相应的idx,如果没有,则生成一个idx并存储在certStore中。 回到recvPrePrepare函数的L758行,相关代码如下所示。当第一次收到preprepare消息时,该判断语句值为true。首先在L760行定义了代表prepare消息的prep变量。该变量将在L768行和L769行分别发送给自己和其他节点。L769行构建了代表prepare消息的Message_Prepare变量,并通过innerBroadcast函数发送出去。innerBroadcast函数的调用过程和IV.B小节中的类似。

这里的代码实现和OSDI‘99论文中的描述有些出入,OSDI’99论文中prepare消息不发送给自己;相应地,后面prepare消息的计数也就不包括自己的。这里的代码实现中,L768行将prepare消息发给了自己;并且,后面的prepare消息的计数也包括了自己的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
// [pbft-core.go] ProcessEvent -> recvPrePrepare
func (instance *pbftCore) recvPrePrepare(preprep *PrePrepare) error {
...
if instance.primary(instance.view) != instance.id && instance.prePrepared(preprep.BatchDigest, preprep.View, preprep.SequenceNumber) && !cert.sentPrepare { // L758
...
prep := &Prepare{ // L760
...
}
...
instance.recvPrepare(prep) // L768
return instance.innerBroadcast(&Message{Payload: &Message_Prepare{Prepare: prep}}) // L769
}
...
}
此外,有个疑惑是L769行不是实现了prepare消息的广播吗?广播不就已经包括了当前节点(即节点0)了吗?为什么还要使用L768行给自己发送一次呢。这是因为innerBroadcast的代码实现中去除了对当前节点的消息发送。IV.B小节已经介绍了innerBroadcast函数中对其他函数的调用链路,即innerBroadcast->simpleConsumer.broadcast->Broadcast->broadcastFilter->internalQueueMessageinternalQueueMessage函数中的消息通过net.msgs通道被testnet.process函数读取。process函数进一步调用processMessageFromChanneldeliverFilter。也就是说,prepare消息的广播是通过deliverFilter函数完成的。deliverFilter函数的相关代码摘录如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
// [mock_network_test.go]
func (net *testnet) deliverFilter(msg taggedMsg) {
...
if msg.dst == -1 {
...
for id, ep := range net.endpoints {
...
lid := id
...
go func() {
...
if msg.src == lid {
...
return
}
}
}
}...
}
从代码中容易发现,当消息的发送着msg.src和消息的接受者lid相同时,直接从协程中返回。

VI. Prepare消息的接收以及Commit消息的发送

第V节介绍了节点1中对Prepare消息的发送,本节介绍节点(以节点2为例)对Prepare的接收,基于接收到的Prepare消息判断是否进入了Prepared状态,并决定是否发送Commit消息。 结合IV.B小节和第V节可知,节点1中的Prepare消息最终是发送到了节点2events通道中。 此外,IV.B小节中提到每个节点会启动一个协程运行eventloop函数。为唤起读者的记忆,eventLoop函数的代码再次摘录如下。

1
2
3
4
5
6
7
8
9
10
// [pbft-core_mock_test.go] makePBFTNetwork -> [events.go] Start -> eventLoop
func (em *managerImpl) eventLoop() {
for {
select {
case next := <-em.events:
em.Inject(next)
...
}
}
}
eventLoop函数进一步调用Inject函数、SendEvent函数、pbftCore.ProcessEvent函数。以上都和节点0接收reqBatch消息(节点1接收PrePrepare消息)时的流程相同,以下从pbftCore.ProcessEvent函数开始介绍。
1
2
3
4
5
6
7
8
9
10
// [pbft-core.go] ProcessEvent
func (instance *pbftCore) ProcessEvent(e events.Event) events.Event {
...
switch et := e.(type) {
...
case *Prepare: // L344
err = instance.recvPrepare(et)
...
}
}
此时的ProcessEvent函数中switch选择为Prepare case,并调用recvPrepare函数,对应于L344行代码。recvPrepare函数和第V节中介绍的recvPrePrepare函数很类似,关键代码摘录如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
// [pbft-core.go] ProcessEvent -> recvPrepare
func (instance *pbftCore) recvPrepare(prep *Prepare) error {
...
if instance.primary(prep.View) == prep.ReplicaId { // L779
...
return nil
}
...
cert := instance.getCert(prep.View, prep.SequenceNumber) // L794
...
cert.prepare = append(cert.prepare, prep) // L802
...
return instance.maybeSendCommit(prep.BatchDigest, prep.View, prep.SequenceNumber) // L805
}
recvPrepare函数首先在L779行做了一次判断:若prepare消息的发送方是Primary,则直接返回。这和OSDI'99论文中是保持一致的,即不需要也不考虑Primary发送的prepare消息。L794行和L802行将新收到的prepare消息存储到cert.prepare字段中。然后便是在L805行调用maybeSendCommit函数。相关代码如下所示:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
// [pbft-core.go] ProcessEvent -> recvPrepare -> maybeSendCommit
func (instance *pbftCore) maybeSendCommit(digest string, v uint64, n uint64) error {
cert := instance.getCert(v, n)
if instance.prepared(digest, v, n) && !cert.sentCommit { // L811
...
commit := &Commit{
...
}
cert.sentCommit = true
instance.recvCommit(commit)
return instance.innerBroadcast(&Message{&Message_Commit{commit}})
}
return nil
}
容易发现,maybeSendCommit函数和recvPrePrepare函数很类似,最大的不同就是L811行中prepared原语的判断。该prepared原语在OSDI'99论文中也有定义。prepared函数的代码实现如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
// [pbft-core.go] ProcessEvent -> recvPrepare -> maybeSendCommit -> prepared
func (instance *pbftCore) prepared(digest string, v uint64, n uint64) bool {
...
quorum := 0 // L504
cert := instance.certStore[msgID{v, n}]
...
for _, p := range cert.prepare {
if p.View == v && p.SequenceNumber == n && p.BatchDigest == digest {
quorum++
}
} // L514
...
return quorum >= instance.intersectionQuorum()-1 // L519
}

// [pbft-core.go] ProcessEvent -> recvPrepare -> maybeSendCommit -> prepared -> intersectionQuorum
func (instance *pbftCore) intersectionQuorum() int {
return (instance.N + instance.f + 2) / 2
}
prepared函数的实现还是比较容易理解的,首先在L504行到L514行对收到的prepare消息进行计数,计数值为quorum。然后将quorum值与intersectionQuorum-1值进行比较(L519行)。intersectionQuorum值的计算由intersectionQuorum函数完成。当N>3f+1时,intersectionQuorum-1值为2f。这里需要特别强调,prepared状态的判断只需要prepare消息数量达到2f即可;作为对比,下一小节中committed状态的判断需要commit消息数量达到2f+1.

回到maybeSendCommit函数的L811行,当节点2prepare计数超过quorum值时,标记当前节点进入prepared状态,从而开始广播commit消息。