上一篇博客btcd源码解析——peer节点之间的区块数据同步(2)——headersFirstMode模式介绍了headersFirstMode
模式下,peer
节点之间的数据同步过程。本篇博客将介绍非headersFirstMode
模式下的数据同步过程。 因为本篇博客中的许多函数都已经在上一篇博客中进行了讲解,且在此不会过多赘述,强烈建议读者在阅读完上一篇博客之后再来阅读本篇博客。
I. 非headersFirstMode模式下的数据同步过程
总的来说,非headersFirstMode
模式下的数据同步可分为以下六步,如下图所示:
对比上一篇博客,容易发现两种模式的大致流程是相同的。
A. peer A 发送“获取区块哈希”的请求
回顾我们在博客btcd源码解析——peer节点之间的区块数据同步 (1)中提及的startSync
函数。我们将相关的代码贴在下面: 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22// startSync [manager.go]
func (sm *SyncManager) startSync() {
...
if bestPeer != nil {
...
locator, err := sm.chain.LatestBlockLocator()
...
if sm.nextCheckpoint != nil &&
best.Height < sm.nextCheckpoint.Height &&
sm.chainParams != &chaincfg.RegressionNetParams {
bestPeer.PushGetHeadersMsg(locator, sm.nextCheckpoint.Hash) // L338
sm.headersFirstMode = true // L339
...
} else {
bestPeer.PushGetBlocksMsg(locator, &zeroHash) // L344
}
sm.syncPeer = bestPeer
} else {
...
}
}headersFirstMode
模式下的数据同步,L344行代码引入了本篇博客的内容——非headersFirstMode
模式下的数据同步。对比L344行和L338行代码,容易发现,非headersFirstMode
模式直接请求下载block
数据。需要注意的是,这里的函数名和变量名很容易让人误解,尽管函数名和变量名中都暗示是获取的区块数据,但其实获取到的是区块的hash
,这一点也可以在下一节中看出。 L344行调用的PushGetBlocksMsg
函数代码如下所示: 1
2
3
4
5
6
7
8
9
10
11
12
13// startSync [manager.go] -> PushGetBlocksMsg [peer.go]
func (p *Peer) PushGetBlocksMsg(locator blockchain.BlockLocator, stopHash *chainhash.Hash) error {
...
msg := wire.NewMsgGetBlocks(stopHash) // L873
for _, hash := range locator { // L874
err := msg.AddBlockLocatorHash(hash) // L875
if err != nil {
return err
}
} // L879
p.QueueMessage(msg, nil) // L880
...
}MsgGetBlocks
变量 (msg
),该变量承载了用于发送到peer B
的数据。 L874-L879行将locator
中的每一个hash
加入到msg
中的BlockLocatorHashes
字段中,其中locator
变量我们也已经在上一篇博客中进行了讲解,这里不再赘述。 L880行将该msg
通过QueueMessage
函数发送出去,QueueMessage
函数也已经在上一篇博客的I.A小节进行了讲述,其一步步经过QueueMessageWithEncoding
函数, outputQueue
管道, queuePacket
函数, sendQueue
管道, writeMessage
函数, 最终在WriteMessageWithEncodingN
函数中将数据发送出去。 与上一篇博客的I.A小节不同的是,在当前场景下,WriteMessageWithEncodingN
函数中消息头hdr
的command
被赋值为CmdGetBlocks
,因为当前的msg
是一个MsgGetBlocks
类型的变量。
B. peer B 响应“获取区块哈希”的请求
和上一篇博客的I.B小节类似,作为P2P连接中数据的响应端,peer B
运行了一个inHandler
协程,用于接收其他peer
发来的消息,该inHandler
协程在Peer.start
函数中启动,启动代码如下所示: 1
2
3
4
5
6
7
8// start [peer.go]
func (p *Peer) start() error {
...
go p.inHandler()
go p.queueHandler()
go p.outHandler()
...
}inHandler
函数代码如下所示: 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 // start [peer.go] -> inHandler
func (p *Peer) inHandler() {
...
out:
for atomic.LoadInt32(&p.disconnect) == 0 {
...
rmsg, buf, err := p.readMessage(p.wireEncoding) // L1334
...
switch msg := rmsg.(type) {
...
case *wire.MsgGetBlocks: // L1457
if p.cfg.Listeners.OnGetBlocks != nil {
p.cfg.Listeners.OnGetBlocks(p, msg) // L1459
}
...
}
...
}
...
}readMessage
函数从peer
连接中读取数据,该函数进一步调用ReadMessageWithEncodingN
函数。ReadMessageWithEncodingN
函数的代码如下: 1
2
3
4
5
6
7
8
9
10
11
12// start [peer.go] -> inHandler -> readMessage ->
// ReadMessageWithEncodingN [message.go]
func ReadMessageWithEncodingN(r io.Reader, ...) (...) {
...
n, hdr, err := readMessageHeader(r)
...
command := hdr.command // L363
...
msg, err := makeEmptyMessage(command) // L371
...
return totalBytes, msg, payload, nil
}CmdGetBlocks
;相应地,L371行生成的msg
将是MsgGetBlocks
类型。 回到inHandler
函数中,L1457行的case
被执行,并调用L1459行的OnGetBlocks
行数,该函数被赋值为serverPeer.OnGetBlocks
,函数定义如下: 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18// start [peer.go] -> inHandler -> OnGetBlocks [server.go]
func (sp *serverPeer) OnGetBlocks(_ *peer.Peer, msg *wire.MsgGetBlocks) {
...
chain := sp.server.chain
hashList := chain.LocateBlocks(msg.BlockLocatorHashes, &msg.HashStop,
wire.MaxBlocksPerMsg) // L744
...
invMsg := wire.NewMsgInv() // L747
for i := range hashList { // L748
iv := wire.NewInvVect(wire.InvTypeBlock, &hashList[i]) // L749
invMsg.AddInvVect(iv) // L750
} // L751
...
if len(invMsg.InvList) > 0 {
...
sp.QueueMessage(invMsg, nil) // L764
}
}BlockLocatorHashes
和HashStop
字段返回相应区块的hash
,赋值给hashList
变量。 L747行新建了一个MsgInv
类型的变量 (invMsg
),该变量用于承载响应数据。MsgInv
的定义和上一篇博客I.D小节的MsgGetData
类型的定义是一样的,如下所示: 1
2
3
4// MsgInv [msginv.go]
type MsgInv struct {
InvList []*InvVect
}OnGetBlocks
函数中,L748-L751行针对hashList
中的每一个hash
值生成一个InvVect
变量,并添加到invMsg
变量中。 L764行将invMsg
变量通过QueueMessage
函数发送给peer A
. QueueMessage
函数也已经在上一篇博客的I.A小节进行了讲述,其一步步经过QueueMessageWithEncoding
函数, outputQueue
管道, queuePacket
函数, sendQueue
管道, writeMessage
函数, 最终在WriteMessageWithEncodingN
函数中将数据发送出去。 与上一篇博客的I.A小节不同的是,在当前场景下,WriteMessageWithEncodingN
函数中消息头hdr
的command
被赋值为CmdInv
,因为当前的msg
是一个MsgInv
类型的变量。
C. peer A 处理“获取区块哈希”的返回数据,并发送“获取区块”的请求
和上一小节类似,peer A
也通过inHandler
协程接收其他peer
发来的消息。 inHandler
函数使用readMessage
函数从peer
连接中读取数据,该函数进一步调用ReadMessageWithEncodingN
函数。 所不同的是,这一次ReadMessageWithEncodingN
函数中L363行返回CmdInv
;相应地,L371行生成的msg
将是MsgInv
类型。 因此,Peer.inHandler
函数中,L1437行的case
得到执行;相应地,OnInv
函数得到调用。该函数被赋值为serverPeer.OnInv
,函数定义如下所示: 1
2
3
4
5
6// OnInv [server.go]
func (sp *serverPeer) OnInv(_ *peer.Peer, msg *wire.MsgInv) {
...
sp.server.syncManager.QueueInv(msg, sp.Peer) // L613
...
}QueueInv
函数对接收到的msg
进行处理,QueueInv
函数定义如下: 1
2
3
4
5// OnInv [server.go] -> QueueInv [manager.go]
func (sm *SyncManager) QueueInv(inv *wire.MsgInv, peer *peerpkg.Peer) {
...
sm.msgChan <- &invMsg{inv: inv, peer: peer}
}QueneInv
函数将inv
(msg
)通过msgChan
管道发送出去,管道的另一端连接着SyncManager.blockHandler
函数。该部分我们也已经在上一篇博客中介绍过了,这里我们再回顾一下。SyncManager.blockHandler
函数的定义如下所示: 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18// OnInv [server.go] -> QueueInv [manager.go] -> blockHandler
func (sm *SyncManager) blockHandler() {
...
out:
for {
select {
case m := <-sm.msgChan:
switch msg := m.(type) {
...
case *invMsg: // L1292
sm.handleInvMsg(msg) // L1293
...
}
...
}
}
...
}case
得到执行。相应地,L1293行的handleInvMsg
函数得到调用。handleInvMsg
函数定义如下所示: 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61// OnInv [server.go] -> QueueInv [manager.go] -> blockHandler -> handleInvMsg
func (sm *SyncManager) handleInvMsg(imsg *invMsg) {
...
lastBlock := -1 // L1060
invVects := imsg.inv.InvList
for i := len(invVects) - 1; i >= 0; i-- { // L1062
if invVects[i].Type == wire.InvTypeBlock {
lastBlock = i
break
}
} // L1067
invVects := imsg.inv.InvList
...
for i, iv := range invVects { // L1097
...
if sm.headersFirstMode { // L1113
continue
} // L1115
...
haveInv, err := sm.haveInventory(iv) // L1118
...
if !haveInv {
...
state.requestQueue = append(state.requestQueue, iv) // L1143
continue
}
if iv.Type == wire.InvTypeBlock { // L1147
...
if i == lastBlock { // L1178
...
locator := sm.chain.BlockLocatorFromHash(&iv.Hash)
peer.PushGetBlocksMsg(locator, &zeroHash)
}
} // L1185
} // L1186
...
gdmsg := wire.NewMsgGetData() // L1191
requestQueue := state.requestQueue
for len(requestQueue) != 0 { // L1193
iv := requestQueue[0]
requestQueue[0] = nil
requestQueue = requestQueue[1:]
switch iv.Type {
...
case wire.InvTypeBlock:
...
if _, exists := sm.requestedBlocks[iv.Hash]; !exists {
...
gdmsg.AddInvVect(iv) // L1233
}
}
...
} // L1241
state.requestQueue = requestQueue
if len(gdmsg.InvList) > 0 {
peer.QueueMessage(gdmsg, nil) // L1244
}
}lastBlock
变量,该变量记录了imsg
中最后一个block
的序号(L1062-L1067行进行了变量更新)。 L1093-L1186行对接收到的imsg
中的每一个iv进行处理:
- 首先,L1113-L1115行表明如果当前模式是
headersFisrtMode
模式,则忽略当前的iv
。 - 其次,L1147判断当前iv是否是
InvTypeBlock
类型。如果是,则进行后续处理。L1178行判断是否当前inv
是imsg
中最后一个block
的inv
,如果是,则继续下一轮的区块获取过程。 - 然后,L1191行定义了一个
MsgGetData
类型的变量 (gdmsg
), 该变量承载了用于获取区块的请求。MsgGetData
数据类型在上一篇博客中也已经介绍过了,这里不再赘述。L1193-L1141行将符合条件的iv添加到gdmsg
变量中。 - 最后,L1244行通过
QueueMessage
函数将gdmsg
变量发送出去。QueueMessage
函数也已经在上一篇博客的I.A小节进行了讲述。再啰嗦一遍,其一步步经过QueueMessageWithEncoding
函数,outputQueue
管道,queuePacket
函数, sendQueue管道,writeMessage
函数, 最终在WriteMessageWithEncodingN
函数中将数据发送出去。
与上一篇博客的I.A小节不同的是,在当前场景下,WriteMessageWithEncodingN
函数中消息头hdr
的command
被赋值为CmdGetData
,因为当前的msg
是一个MsgGetData
类型的变量。
D. peer B 响应“获取区块”的请求
该过程完全和上一篇博客的I.E小节相同,此处不再赘述。
E. peer A 处理“获取区块”的返回数据
该过程完全和上一篇博客的I.F小节相同,此处不再赘述。
II. 小结
至此,我们介绍完了非headersFirstMode
模式下节点之间的区块数据同步的过程。
和上一篇博客headersFirstMode
模式比较,两种模式在后半部分的处理过程完全一致。前半部分的处理中,非headersFirstMode
模式相对来说要简单一些。具体而言,headersFirstMode
模式中的peer A
在处理第一步请求的返回数据和发起第二步请求较为复杂,我们在上一篇博客中分为了两个小节 (I.C小节和I.D小节)进行讲述;相反,非headersFirstMode
模式下peer A
在处理第一步请求的返回数据和发起第二步请求时较为简单,我们在本片博客中只用了一个小节 (I.C小节) 进行了讲述。