本篇博客是PBFT
代码解读的第三篇博客。
V. Preprepare
消息的接收以及Prepare
消息的发送
本系列的第一篇博客介绍了源代码的获取和测试用例的选择,第二篇博客介绍了Primary
节点对repBatch
的处理,以及对PrePrepare
消息的发送。强烈建议读者先阅读上述两篇博客,再来阅读当前这篇。
为方便读者查阅,本系列博客的链接整理如下:
IV.B小节介绍了节点0
中Preprepare
消息的发送过程,本小节介绍其他节点(以节点1
为例)接收Preprepare
消息的过程和发送Prepare
消息的过程。 由IV.B小节的最后介绍可知,节点0
的Preprepare
消息最终发送到了节点1
的events
通道中。 此外,IV.B小节中提到每个节点会启动一个协程运行eventloop
函数。为唤起读者的记忆,eventLoop
函数的代码再次摘录如下。 1
2
3
4
5
6
7
8
9
10// [pbft-core_mock_test.go] makePBFTNetwork -> [events.go] Start -> eventLoop
func (em *managerImpl) eventLoop() {
for {
select {
case next := <-em.events:
em.Inject(next)
...
}
}
}eventLoop
函数进一步调用Inject
函数、SendEvent
函数、pbftCore.ProcessEvent
函数。以上都和节点0
接收reqBatch
消息时的流程相同,以下从pbftCore.ProcessEvent
函数开始介绍。 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28// [pbft-core.go] ProcessEvent
func (instance *pbftCore) ProcessEvent(e events.Event) events.Event {
...
switch et := e.(type) {
...
case *PrePrepare:
err = instance.recvPrePrepare(et)
...
}
}
// [pbft-core.go] ProcessEvent -> recvPrePrepare
func (instance *pbftCore) recvPrePrepare(preprep *PrePrepare) error {
...
if !instance.inWV(preprep.View, preprep.SequenceNumber) { // L715
...
return nil
}
...
cert := instance.getCert(preprep.View, preprep.SequenceNumber) // L732
...
if instance.primary(instance.view) != instance.id && instance.prePrepared(preprep.BatchDigest, preprep.View, preprep.SequenceNumber) && !cert.sentPrepare { // L758
...
cert.sentPrepare = true // L766
...
}
...
}
ProcessEvent
函数中的switch
判断后进入PrePrepare case
,继而调用recvPrePrepare
函数。recvPrePrepare
函数主要保存preprepare
消息,并发送prepare
消息。 recvPrePrepare
函数首先做了一些检查,这里挑选了一个比较有意思的:L715行检查收到的preprep
消息是否是当前View
中,且SequenceNumber
在一定的区间范围内。 L758行的判断语句比较长,主要包括了三部分:
- 判断当前节点是否是
Primary
。PBFT
的Prepare
阶段不需要Primary
节点发送Prepare
消息。 - 判断当前是否已经到达了
preprepared
的状态。OSDI‘99
的论文中并未定义preprepared
的状态原语,这里是代码实现中的额外定义。关于preprepared
状态的检查在后面再进行介绍。 - 判断是否已经发送过了
prepare
消息了。sentPrepare
是一个bool
类型的变量,其在L766行赋值为true
。
以下介绍对preprepared
状态的检查。prePrepared
函数定义如下: 1
2
3
4
5
6
7
8
9
10
11
12// [pbft-core.go] ProcessEvent -> recvPrePrepare -> prePrepared
func (instance *pbftCore) prePrepared(digest string, v uint64, n uint64) bool {
...
cert := instance.certStore[msgID{v, n}] // L483
if cert != nil {
p := cert.prePrepare
if p != nil && p.View == v && p.SequenceNumber == n && p.BatchDigest == digest { // L486
return true
}
}
...
}prePrepared
函数中主要对instance
的字典类型字段certStore
进行了查找,查找该字典中是否已经存储了相应的prePrepare
内容(L483行),并且查询得到的prePrepare
内容与参数中的digest
,v
,n
保持一致(L486行)。 其中L483行是读certStore
字段,那么在此之前应该对certStore
字段进行了写操作。该写操作发生在recvPrePrepare
函数的L734行,相关代码如下所示。 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19// [pbft-core.go] recvPrePrepare
func (instance *pbftCore) recvPrePrepare(preprep *PrePrepare) error {
...
cert := instance.getCert(preprep.View, preprep.SequenceNumber) // L732
...
}
// [pbft-core.go] recvPrePrepare -> getCert
func (instance *pbftCore) getCert(v uint64, n uint64) (cert *msgCert) {
idx := msgID{v, n}
cert, ok := instance.certStore[idx]
if ok {
return
}
cert = &msgCert{}
instance.certStore[idx] = cert
return
}recvPrePrepare
函数的L732行调用了getCert
函数,后者首先查看certStore
中是否已经保存了相应的idx
,如果没有,则生成一个idx
并存储在certStore
中。 回到recvPrePrepare
函数的L758行,相关代码如下所示。当第一次收到preprepare
消息时,该判断语句值为true
。首先在L760行定义了代表prepare
消息的prep
变量。该变量将在L768行和L769行分别发送给自己和其他节点。L769行构建了代表prepare
消息的Message_Prepare
变量,并通过innerBroadcast
函数发送出去。innerBroadcast
函数的调用过程和IV.B小节中的类似。
这里的代码实现和OSDI‘99
论文中的描述有些出入,OSDI’99
论文中prepare
消息不发送给自己;相应地,后面prepare
消息的计数也就不包括自己的。这里的代码实现中,L768行将prepare
消息发给了自己;并且,后面的prepare
消息的计数也包括了自己的。 1
2
3
4
5
6
7
8
9
10
11
12
13
14// [pbft-core.go] ProcessEvent -> recvPrePrepare
func (instance *pbftCore) recvPrePrepare(preprep *PrePrepare) error {
...
if instance.primary(instance.view) != instance.id && instance.prePrepared(preprep.BatchDigest, preprep.View, preprep.SequenceNumber) && !cert.sentPrepare { // L758
...
prep := &Prepare{ // L760
...
}
...
instance.recvPrepare(prep) // L768
return instance.innerBroadcast(&Message{Payload: &Message_Prepare{Prepare: prep}}) // L769
}
...
}prepare
消息的广播吗?广播不就已经包括了当前节点(即节点0
)了吗?为什么还要使用L768行给自己发送一次呢。这是因为innerBroadcast
的代码实现中去除了对当前节点的消息发送。IV.B小节已经介绍了innerBroadcast
函数中对其他函数的调用链路,即innerBroadcast
->simpleConsumer.broadcast
->Broadcast
->broadcastFilter
->internalQueueMessage
,internalQueueMessage
函数中的消息通过net.msgs
通道被testnet.process
函数读取。process
函数进一步调用processMessageFromChannel
,deliverFilter
。也就是说,prepare
消息的广播是通过deliverFilter
函数完成的。deliverFilter
函数的相关代码摘录如下: 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19// [mock_network_test.go]
func (net *testnet) deliverFilter(msg taggedMsg) {
...
if msg.dst == -1 {
...
for id, ep := range net.endpoints {
...
lid := id
...
go func() {
...
if msg.src == lid {
...
return
}
}
}
}...
}msg.src
和消息的接受者lid
相同时,直接从协程中返回。
VI. Prepare
消息的接收以及Commit
消息的发送
第V节介绍了节点1中对Prepare
消息的发送,本节介绍节点(以节点2
为例)对Prepare
的接收,基于接收到的Prepare
消息判断是否进入了Prepared
状态,并决定是否发送Commit
消息。 结合IV.B小节和第V节可知,节点1
中的Prepare
消息最终是发送到了节点2
的events
通道中。 此外,IV.B小节中提到每个节点会启动一个协程运行eventloop
函数。为唤起读者的记忆,eventLoop
函数的代码再次摘录如下。 1
2
3
4
5
6
7
8
9
10// [pbft-core_mock_test.go] makePBFTNetwork -> [events.go] Start -> eventLoop
func (em *managerImpl) eventLoop() {
for {
select {
case next := <-em.events:
em.Inject(next)
...
}
}
}eventLoop
函数进一步调用Inject
函数、SendEvent
函数、pbftCore.ProcessEvent
函数。以上都和节点0
接收reqBatch
消息(节点1
接收PrePrepare
消息)时的流程相同,以下从pbftCore.ProcessEvent
函数开始介绍。 1
2
3
4
5
6
7
8
9
10// [pbft-core.go] ProcessEvent
func (instance *pbftCore) ProcessEvent(e events.Event) events.Event {
...
switch et := e.(type) {
...
case *Prepare: // L344
err = instance.recvPrepare(et)
...
}
}ProcessEvent
函数中switch
选择为Prepare case
,并调用recvPrepare
函数,对应于L344行代码。recvPrepare
函数和第V节中介绍的recvPrePrepare
函数很类似,关键代码摘录如下: 1
2
3
4
5
6
7
8
9
10
11
12
13
14// [pbft-core.go] ProcessEvent -> recvPrepare
func (instance *pbftCore) recvPrepare(prep *Prepare) error {
...
if instance.primary(prep.View) == prep.ReplicaId { // L779
...
return nil
}
...
cert := instance.getCert(prep.View, prep.SequenceNumber) // L794
...
cert.prepare = append(cert.prepare, prep) // L802
...
return instance.maybeSendCommit(prep.BatchDigest, prep.View, prep.SequenceNumber) // L805
}recvPrepare
函数首先在L779行做了一次判断:若prepare
消息的发送方是Primary
,则直接返回。这和OSDI'99
论文中是保持一致的,即不需要也不考虑Primary
发送的prepare
消息。L794行和L802行将新收到的prepare
消息存储到cert.prepare
字段中。然后便是在L805行调用maybeSendCommit
函数。相关代码如下所示: 1
2
3
4
5
6
7
8
9
10
11
12
13
14// [pbft-core.go] ProcessEvent -> recvPrepare -> maybeSendCommit
func (instance *pbftCore) maybeSendCommit(digest string, v uint64, n uint64) error {
cert := instance.getCert(v, n)
if instance.prepared(digest, v, n) && !cert.sentCommit { // L811
...
commit := &Commit{
...
}
cert.sentCommit = true
instance.recvCommit(commit)
return instance.innerBroadcast(&Message{&Message_Commit{commit}})
}
return nil
}maybeSendCommit
函数和recvPrePrepare
函数很类似,最大的不同就是L811行中prepared
原语的判断。该prepared
原语在OSDI'99
论文中也有定义。prepared
函数的代码实现如下: 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19// [pbft-core.go] ProcessEvent -> recvPrepare -> maybeSendCommit -> prepared
func (instance *pbftCore) prepared(digest string, v uint64, n uint64) bool {
...
quorum := 0 // L504
cert := instance.certStore[msgID{v, n}]
...
for _, p := range cert.prepare {
if p.View == v && p.SequenceNumber == n && p.BatchDigest == digest {
quorum++
}
} // L514
...
return quorum >= instance.intersectionQuorum()-1 // L519
}
// [pbft-core.go] ProcessEvent -> recvPrepare -> maybeSendCommit -> prepared -> intersectionQuorum
func (instance *pbftCore) intersectionQuorum() int {
return (instance.N + instance.f + 2) / 2
}prepared
函数的实现还是比较容易理解的,首先在L504行到L514行对收到的prepare
消息进行计数,计数值为quorum
。然后将quorum
值与intersectionQuorum-1
值进行比较(L519行)。intersectionQuorum
值的计算由intersectionQuorum
函数完成。当N>3f+1时,intersectionQuorum-1
值为2f。这里需要特别强调,prepared
状态的判断只需要prepare
消息数量达到2f即可;作为对比,下一小节中committed
状态的判断需要commit
消息数量达到2f+1.
回到maybeSendCommit
函数的L811行,当节点2
的prepare
计数超过quorum
值时,标记当前节点进入prepared
状态,从而开始广播commit
消息。