0%

Fabric中PBFT源码解读(2)

本篇博客是PBFT代码解读的第二篇博客。

IV. Primary节点对repBatch的处理

本系列的第一篇博客介绍了源代码的获取和测试用例的选择。强烈建议读者先阅读上述博客后,再来阅读当前这篇。

为方便读者查阅,本系列博客的链接整理如下:

由上一篇博客Fabric中PBFT源码解读(1)的第III节可知,测试数据repBatch传入到了Primary节点(节点0)的events通道中。本节介绍Primary节点如何取出repBatch测试数据,并基于该数据构建及广播PrePrepare消息。 在介绍Primary节点的操作之前,首先介绍两个非常重要的变量:通道变量events和测试网络变量net

A. 通道变量events和测试网络变量net

需要注意的是,在TestNetwork测试用例中,每个节点其实是用一个协程表示的,并通过pbftEndpoint实例与该协程进行交互。节点之间的消息传输也并不是真得通过物理网络进行传输的,而是通过pbftEndpoint实例中维护的Event类型的通道变量events进行传输。 所有节点的pbftEndpoint实例的引用都保存在一个pbftNetwork类型的变量net中。

上一篇博客第II节中介绍的makePBFTNetwork函数包含了以下代码。其中L178行定义的pn变量从TestNetwork函数中返回后即赋值给net变量。该pn/net变量的pbftEndpoints字段即保存了对所有pbftEndpoint实例的引用,如L179到L183行所示。

1
2
3
4
5
6
7
8
9
10
11
// [pbft-core_mock_test.go]makePBFTNetwork
func makePBFTNetwork(N int, config *viper.Viper) *pbftNetwork {
...
pn := &pbftNetwork{testnet: makeTestnet(N, endpointFunc)} // L178
pn.pbftEndpoints = make([]*pbftEndpoint, len(pn.endpoints)) // L179
for i, ep := range pn.endpoints {
pn.pbftEndpoints[i] = ep.(*pbftEndpoint)
pn.pbftEndpoints[i].sc.pbftNet = pn
} // L183
return pn
}
并且每个pbftEndpoint实例内部也维护了对上述net的引用(通过嵌套testEndpoint类型)。节点之间的通信便利用该引用指向的net实例进行转发。相关代码如下所示:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
// [pbft-core_mock_test.go]pbftEndpoint
type pbftEndpoint struct { // L29
*testEndpoint
pbft *pbftCore
sc *simpleConsumer
manager events.Manager
}

// [mock_network_test.go]pbftEndpoint->testEndpoint
type testEndpoint struct { // L50
id uint64
net *testnet
}

// [mock_network_test.go]pbftEndpoint->testEndpoint->testnet
type testnet struct { // L41
debug bool
N int
closed chan struct{}
endpoints []endpoint
msgs chan taggedMsg
filterFn func(int, int, []byte) []byte
}

// [pbft-core_mock_test.go]makePBFTNetwork
func makePBFTNetwork(N int, config *viper.Viper) *pbftNetwork {
...
endpointFunc := func(id uint64, net *testnet) endpoint { // L158
tep := makeTestEndpoint(id, net) // L159
pe := &pbftEndpoint{
testEndpoint: tep, // L161
manager: events.NewManagerImpl(),
}
...
}
...
pn := &pbftNetwork{testnet: makeTestnet(N, endpointFunc)} // L178
...
}

// [pbft-core_mock_test.go]makePBFTNetwork->[mock_network_test.go]makeTestnet
func makeTestnet(N int, initFn func(id uint64, network *testnet) endpoint) *testnet {
net := &testnet{}
net.msgs = make(chan taggedMsg, 100)
net.closed = make(chan struct{})
net.endpoints = make([]endpoint, N)

for i := range net.endpoints {
net.endpoints[i] = initFn(uint64(i), net)
}

return net
}

// [pbft-core_mock_test.go]makePBFTNetwork->[mock_network_test.go]makeTestEndpoint
func makeTestEndpoint(id uint64, net *testnet) *testEndpoint {
ep := &testEndpoint{}
ep.id = id
ep.net = net
return ep
}
TestNetwork函数的L158行定义了一个回调函数变量endpointFunc,并将该变量作为参数传入到makeTestnet函数中,如L178行所示。makeTestnet函数中定义了一个testnet类型的变量net。该变量从makeTestnet函数中返回后,即赋值给makePBFTNetwork函数中pn变量中的testnet字段。 与此同时,net变量作为参数传入到回调函数initFn/endpointFunc中。回到endpointFunc函数的定义,该函数在L159行将参数net传入到makeTestEndpoint函数中,后者将参数net赋值给testEndpointnet变量。而该testEndpoint变量(也即tep)在makePBFTNetwork函数中被赋值给pbftEndpoint类的testEndpoint,如makePBFTNetwork函数中的L161行所示。

综上所述,外部环境(客户端)在向PBFT集群发送消息时,通过TestNetwork中的net实例发送到Primary节点的events通道中。 PBFT集群中,一个节点向其他节点发送消息时,首先通过其内部保存的net引用找到net实例,然后利用该实例将消息转发到特定节点的events通道中。

B. Preprepare消息的发送过程

上一篇博客第III节的内容,其中在L262行将reqBatch发送到节点0events中。为方便查看,相关代码再次附在下面。该发送过程是借助于net变量实现的。节点0基于该reqBatch构建Preprepare消息,并将后者广播给其他节点。 本小节即分析节点0是如何广播Preprepare消息的。

1
2
3
4
5
6
// [pbft-core_test.go] TestNetwork
func TestNetwork(t *testing.T) {
...
net.pbftEndpoints[0].manager.Queue() <- reqBatch // L262
...
}
上一篇博客的第II节中,我们介绍了makePBFTNetwork函数。实际上,在makePBFTNetwork函数的endpointFunc函数中,每个节点启动了一个协程go rountine,来读取对events通道中消息。相关代码如下所示:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
// [pbft-core_mock_test.go] makePBFTNetwork
func makePBFTNetwork(N int, config *viper.Viper) *pbftNetwork { // L151
...
endpointFunc := func(id uint64, net *testnet) endpoint { // L158
tep := makeTestEndpoint(id, net)
pe := &pbftEndpoint{
testEndpoint: tep,
manager: events.NewManagerImpl(),
}
...
pe.manager.SetReceiver(pe.pbft) // L170
...
pe.manager.Start()
...
}
}

// [pbft-core_mock_test.go] makePBFTNetwork -> [events.go] Start
func (em *managerImpl) Start() {
go em.eventLoop()
}

// [pbft-core_mock_test.go] makePBFTNetwork -> [events.go] Start -> eventLoop
func (em *managerImpl) eventLoop() {
for {
select {
case next := <-em.events:
em.Inject(next)
...
}
}
}
每个节点(包括节点0)内部维护了一个manager变量,后者的Start函数中启动了一个协程运行eventLoop函数。在eventLoop函数内部轮询events通道,并将读取到的消息发送到Inject函数中。Inject函数的代码和其他相关代码如下所示:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
// [pbft-core_mock_test.go] makePBFTNetwork -> [events.go] Start -> eventLoop -> Inject
func (em *managerImpl) Inject(event Event) {
if em.receiver != nil {
SendEvent(em.receiver, event)
}
}

// [pbft-core_mock_test.go] makePBFTNetwork -> [events.go] Start -> eventLoop -> Inject -> SendEvent
func SendEvent(receiver Receiver, event Event) {
next := event
for {
// If an event returns something non-nil, then process it as a new event
next = receiver.ProcessEvent(next) // L113
if next == nil {
break
}
}
}
容易看出,节点内部主要调用了ProcessEvent函数来处理从events通道中读取到的消息,如SendEvent中的L113行代码所示。此处的for循环主要是考虑到消息嵌套的场景:若ProcessEvent处理完一条消息后的结果仍然是一条消息,将继续调用ProcessEvent函数处理。我们这里不出现该场景,因而先不讨论for循环的问题。 SendEvent函数中的Receiver参数是一个接口。回顾makePBFTNetwork函数中的L170行,其将该接口变量复制为pe.pbft(这是一个pbftCore类型变量)。因此,此处调用的ProcessEvent函数的代码如下所示:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
// [pbft-core_mock_test.go] makePBFTNetwork -> [events.go] Start -> eventLoop -> Inject -> SendEvent -> [pbft-core.go] ProcessEvent
func (instance *pbftCore) ProcessEvent(e events.Event) events.Event {
...
switch et := e.(type) {
...
case *RequestBatch:
err = instance.recvRequestBatch(et)
...
}
}

// [pbft-core_mock_test.go] makePBFTNetwork -> [events.go] Start -> eventLoop -> Inject -> SendEvent -> [pbft-core.go] ProcessEvent -> recvRequestBatch
func (instance *pbftCore) recvRequestBatch(reqBatch *RequestBatch) error {
...
if instance.primary(instance.view) == instance.id && instance.activeView {
instance.nullRequestTimer.Stop()
instance.sendPrePrepare(reqBatch, digest) // L623
}...
}

// [pbft-core_mock_test.go] makePBFTNetwork -> [events.go] Start -> eventLoop -> Inject -> SendEvent -> [pbft-core.go] ProcessEvent -> recvRequestBatch -> sendPrePrepare
func (instance *pbftCore) sendPrePrepare(reqBatch *RequestBatch, digest string) {
...
preprep := &PrePrepare{
...
}
...
instance.innerBroadcast(&Message{Payload: &Message_PrePrepare{PrePrepare: preprep}}) // L667
...
}
ProcessEvent函数进一步调用recvRequestBatch函数,后者又进一步调用sendPrePrepare函数。sendPrePrepare中生成了一个PrePrepare类型的变量prepare,并以prepare变量为参数调用innerBroadcast函数。innerBroadcast函数的代码如下所示:
1
2
3
4
5
6
7
8
9
10
11
12
// [pbft-core_mock_test.go] makePBFTNetwork -> [events.go] Start -> eventLoop -> Inject -> SendEvent -> [pbft-core.go] ProcessEvent -> recvRequestBatch -> sendPrePrepare -> innerBroadcast
func (instance *pbftCore) innerBroadcast(msg *Message) error {
...
doByzantine := false
...
if doByzantine {
...
} else {
instance.consumer.broadcast(msgRaw) // L1313
}
...
}
innerBroadcast函数中定义了一个doByzantine变量,该变量决定了当前节点是否进行Byzantine攻击行为。此处,我们假设其不会进行Byzantine攻击,因而进入到L1313行调用broadcast函数。 L1313行中的consumer的类型也是一个接口。回顾makePBFTNetwork函数的定义(代码如下所示),该函数在L165行定义了一个simpleConsumer类型的变量,将该变量赋值给pesc字段,并将后者作为参数传入到newPbftCore函数中。newPbftCore函数的代码也如下所示,由代码可知:newPbftCore函数将pe.sc/simpleConsumer类型变量赋值给了instanceconsumer字段。 因而,innerBroadcast函数中对broadcast函数的调用,实际上是调用simpleConsumer类型的broadcast函数。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
// [pbft-core_mock_test.go] makePBFTNetwork
func makePBFTNetwork(N int, config *viper.Viper) *pbftNetwork { // L151
...
endpointFunc := func(id uint64, net *testnet) endpoint {
...
pe.sc = &simpleConsumer{ // L165
pe: pe,
}

pe.pbft = newPbftCore(id, config, pe.sc, events.NewTimerFactoryImpl(pe.manager)) // L169
...
}
...
}

// [pbft-core_mock_test.go] makePBFTNetwork -> [pbft-core.go] newPbftCore
func newPbftCore(id uint64, config *viper.Viper, consumer innerStack, etf events.TimerFactory) *pbftCore {
...
instance.consumer = consumer
...
}
simpleConsumer类型的broadcast函数的相关代码如下所示。沿着函数调用的链路可知,节点0最终调用了internalQueueMessage函数。后者将tm消息传入通道变量queue中。由broadcastFilter函数的L149行代码可知,变量queue实际上是net.msgs。因而,broadcast函数最终将消息放到了netmsgs通道中。 此外,在L149行的taggedMsg消息构造时,传入-1作为参数,该参数将赋值给taggedMsg消息的dst字段。该参数表明当前消息传输以广播形式进行。该参数在后面还会提及。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
// [pbft-core_mock_test.go] makePBFTNetwork -> [events.go] Start -> eventLoop -> Inject -> SendEvent -> [pbft-core.go] ProcessEvent -> recvRequestBatch -> sendPrePrepare -> innerBroadcast -> [pbft-core_mock_test.go] broadcast
func (sc *simpleConsumer) broadcast(msgPayload []byte) {
sc.pe.Broadcast(&pb.Message{Payload: msgPayload}, pb.PeerEndpoint_VALIDATOR)
}

// [pbft-core_mock_test.go] makePBFTNetwork -> [events.go] Start -> eventLoop -> Inject -> SendEvent -> [pbft-core.go] ProcessEvent -> recvRequestBatch -> sendPrePrepare -> innerBroadcast -> [pbft-core_mock_test.go] broadcast ->[mock_network_test.go] Broadcast
func (ep *testEndpoint) Broadcast(msg *pb.Message, peerType pb.PeerEndpoint_Type) error {
ep.net.broadcastFilter(ep, msg.Payload)
return nil
}

// [pbft-core_mock_test.go] makePBFTNetwork -> [events.go] Start -> eventLoop -> Inject -> SendEvent -> [pbft-core.go] ProcessEvent -> recvRequestBatch -> sendPrePrepare -> innerBroadcast -> [pbft-core_mock_test.go] broadcast ->[mock_network_test.go] Broadcast -> broadcastFilter
func (net *testnet) broadcastFilter(ep *testEndpoint, payload []byte) {
...
if payload != nil {
...
internalQueueMessage(net.msgs, taggedMsg{int(ep.id), -1, payload}) // L149
...
}...
}

// [pbft-core_mock_test.go] makePBFTNetwork -> [events.go] Start -> eventLoop -> Inject -> SendEvent -> [pbft-core.go] ProcessEvent -> recvRequestBatch -> sendPrePrepare -> innerBroadcast -> [pbft-core_mock_test.go] broadcast ->[mock_network_test.go] Broadcast -> broadcastFilter -> internalQueueMessage
func internalQueueMessage(queue chan<- taggedMsg, tm taggedMsg) {
select {
case queue <- tm:
...
}
}
到这里,大家会发现:兜兜转转,最终Preprepare消息的广播还是依靠net实例来完成。

前面介绍了消息是如何一步步传入到net.msgs通道中,那么必然有一个地方实现了将消息从net.msgs通道中读出来。 此时,我们重新回到TestNetwork函数,其相关代码摘录如下:

1
2
3
4
5
6
// [pbft-core_test.go]
func TestNetwork(t *testing.T) {
...
err := net.process() // L264
...
}
TestNetwork函数在L264行调用了process函数,该函数通过轮询的方式反复从net实例的msgs通道中读取消息。相关代码如下所示:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
// [mock_network_test.go] process
func (net *testnet) process() error {
...
for {
select {
case msg, ok := <-net.msgs: // L221
...
if !net.processMessageFromChannel(msg, ok) { // L224
...
}
...
default: // L231
...
select { // L248
case msg, ok := <-net.msgs: // L249
...
if !net.processMessageFromChannel(msg, ok) { // L251
...
}
...
}
}
}
}
process函数中L221行的case和L231行的dafult case很相像,可以说后者的代码包含了前者的代码。这样做的目的是default用于检测当前测试已经完成,从而优雅地从当前process函数中返回。关于default中如何检测当前测试是否已经完成的代码,留到IV.F小节进行介绍。 回到节点0Preprepare消息的广播,由于在process函数刚调用时,net.msgs通道中没有消息,因而节点0此时进入default case执行。其在L248行又一次使用selectmsgs通道中获取消息。由于该select中未定义default case,因而其一定会阻塞在L249行(或发生100毫秒的超时,超时的相关代码被省略了),直到从msgs通道中获取到消息msg,继而将msg作为参数调用processMessageFromChannel函数,如L251行代码所示。processMessageFromChannel函数代码及相关代码如下所示:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
// [mock_network_test.go] process -> processMessageFromChannel
func (net *testnet) processMessageFromChannel(msg taggedMsg, ok bool) bool {
...
net.deliverFilter(msg)
return true
}

// [mock_network_test.go] proces -> processMessageFromChannel -> deliverFilter
func (net *testnet) deliverFilter(msg taggedMsg) { // L156
...
senderHandle := net.endpoints[msg.src].getHandle() // L158
if msg.dst == -1 { // L159
...
for id, ep := range net.endpoints { // L163
...
lep := ep
go func() { // L167
...
lep.deliver(payload, senderHandle) // L184
...
}
}
}
}
processMessageFromChannel函数进一步调用了deliverFilter函数。 deliverFilter函数主要实现了对消息msg的发送。前面我们提到,当msg中的dst值是-1时,表示该消息将被广播到其他的所有节点,如L159行所示。L163行对所有的节点进行遍历,针对每个节点启动一个go rountine,如L167行所示。 需要注意的是,每个ep/lep都对应着一个节点的endpoint。接下来便是借助于这些endpoint,将消息发送给对应的节点。 回到L184行的代码。该go routine中调用deliver函数实现msg的发送。该函数除了传入payload参数之外,还传入了senderHandle参数。 senderHandle变量在L158行定义,该行代码调用了endpoint中的getHandle函数。getHandle函数在testEndpoint实现类中的定义如下所示:
1
2
3
4
// [mock_network_test.go] process -> processMessageFromChannel -> deliverFilter -> getHandle
func (ep *testEndpoint) getHandle() *pb.PeerID {
return &pb.PeerID{Name: fmt.Sprintf("vp%d", ep.id)}
}
容易看出getHandle函数只是生成了一个PeerID类型的变量,后者其实是对字符串进行了一次封装。 下面回到deliverFilter函数中的L184行代码中的deliver函数。deliver其实是接口endpoint中定义的一个函数,这里lep变量的实际类型是pbftEndpoint。 因而,该函数在pbftEndpoint实现类中的定义如下所示:
1
2
3
4
5
6
// [mock_network_test.go] process -> processMessageFromChannel -> deliverFilter -> deliver
func (pe *pbftEndpoint) deliver(msgPayload []byte, senderHandle *pb.PeerID) {
senderID, _ := getValidatorID(senderHandle) // L36
...
pe.manager.Queue() <- &pbftMessage{msg: msg, sender: senderID} // L44
}
L36行调用getValidatorID函数获取当前消息传送的发起者ID。然后构建pbftMessage消息,并在L44行将该消息发送到特定节点的events变量中。

综上所述,节点0发送Preprepare的流程大致可概括如下:1)节点0Preprepare消息通过netmsgs通道发送给net实例,2)然后net实例基于其维护的各个节点的endpoint,将Preprepare消息发送到每个节点的events通道中。