上一篇博客btcd源码解析——peer节点之间的区块数据同步 (1)中,我们介绍了peer A是如何发起数据同步的请求的,当时讲到存在两种模式:headersFirstMode
模式和非headersFirstMode
模式。本篇博客,我们将介绍headersFirstMode
模式下数据同步的细节。
I. headersFirstMode模式下的数据同步过程
总的来说,headersFirstMode
模式下数据同步可分为以下六步,如下图所示: 数据同步过程中涉及的数据结构如下图所示:
以下分六个步骤介绍
headersFirstMode
模式下的数据同步过程
A. peer A 发送"获取区块头"的请求
该模式下将首先调用PushGetHeadersMsg
函数发送”获取区块头“的请求,该函数代码如下: 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15// OnVersion [server.go] -> NewPeer [manager.go] -> blockHandler -> handleNewPeerMsg -> startSync -> PushGetHeadersMsg [peer.go]
func (p *Peer) PushGetHeadersMsg(locator blockchain.BlockLocator,
stopHash *chainhash.Hash) error {
...
msg := wire.NewMsgGetHeaders() // L917
msg.HashStop = *stopHash
for _, hash := range locator {
err := msg.AddBlockLocatorHash(hash)
if err != nil {
return err
}
} // L924
p.QueueMessage(msg, nil) // L925
...
}msg
),并通过L925行的QueueMessage
函数将该请求发送出去。 QueueMessage
函数只是对QueueMessageWithEncoding
函数进行了调用,两者的代码分别如下所示: 1
2
3
4
5
6// OnVersion [server.go] -> NewPeer [manager.go] -> blockHandler ->
// handleNewPeerMsg -> startSync -> PushGetHeadersMsg [peer.go] ->
// QueueMessage
func (p *Peer) QueueMessage(msg wire.Message, doneChan chan<- struct{}){
p.QueueMessageWithEncoding(msg, doneChan, wire.BaseEncoding)
}1
2
3
4
5
6
7
8// OnVersion [server.go] -> NewPeer [manager.go] -> blockHandler ->
// handleNewPeerMsg -> startSync -> PushGetHeadersMsg [peer.go] ->
// QueueMessage -> QueueMessageWithEncoding
func (p *Peer) QueueMessageWithEncoding(...){
...
p.outputQueue <- outMsg{msg: msg, encoding: encoding, doneChan:
doneChan} // L1832
}outMsg
的形式发送到outputQueue
管道中,该管道另一端连接Peer.queueHandler()
函数。queueHandler()
函数在Peer.start [peer.go]
中的L2101行被调用,函数定义的代码如下所示: 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24// queueHandler [peer.go]
func (p *Peer) queueHandler() {
...
waiting := false
...
queuePacket := func(...) bool { // L1571
if !waiting {
p.sendQueue <- msg
} else {
list.PushBack(msg)
}
// we are always waiting now.
return true
} // L1579
out:
for {
select {
case msg := <-p.outputQueue:
waiting = queuePacket(msg, pendingMsgs, waiting) // L1584
...
}
}
...
}queuePacket
函数将msg
发送出去,该函数定义在L1571-1579之间。当waiting
为false
时,该函数将msg
通过sendQueue
管道发送出去,sendQueue
管道另一端连接着Peer.outHanlder
函数。outHanlder()
函数在Peer.start [peer.go]
中的L2102行被调用,函数定义的代码如下所示: 1
2
3
4
5
6
7
8
9
10
11
12
13
14// queueHandler [peer.go] -> outHandler
func (p *Peer) outHandler() {
out:
for {
select {
case msg := <-p.sendQueue:
switch m := msg.msg.(type) {
...
err := p.writeMessage(msg.msg, msg.encoding) // L1732
...
}
}
...
}outHandler
主要在L1732行调用writeMessage
函数,后者又主要调用WriteMessageWithEncodingN
函数。 WriteMessageWithEncodingN
函数主要是将两部分数据写入到连接中:
- 消息头:
messageHeader [message.go]
。需要注意的是,messageHeader
中存在一个command
字段,该字段通过调用msg.Command()
函数赋值。在当前场景下,msg
的实际类型是MsgGetHeaders
,从而Command
函数返回值为CmdGetHeaders
(见msggetheaders.go
文件中L116行) - 消息体:
payload
B. peer B 响应"获取区块头"的请求
作为P2P
连接中数据的响应端,peer B
运行了一个inHandler
协程,用于接收其他peer
发来的消息,该inHandler
协程在Peer.start
函数中启动,启动代码如下所示: 1
2
3
4
5
6
7
8// start [peer.go]
func (p *Peer) start() error {
...
go p.inHandler()
go p.queueHandler()
go p.outHandler()
...
}inHandler
函数代码如下所示: 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27// start [peer.go] -> inHandler
func (p *Peer) inHandler() {
...
out:
for atomic.LoadInt32(&p.disconnect) == 0 {
...
rmsg, buf, err := p.readMessage(p.wireEncoding) // L1334
...
switch msg := rmsg.(type) {
...
case *wire.MsgGetAddr:
...
case *wire.MsgHeaders: // L1442
if p.cfg.Listeners.OnHeaders != nil {
p.cfg.Listeners.OnHeaders(p, msg) // L1444
}
...
case *wire.MsgGetHeaders: // L1462
if p.cfg.Listeners.OnGetHeaders != nil {
p.cfg.Listeners.OnGetHeaders(p, msg) // L1464
}
...
}
...
}
...
}readMessage
函数从peer
连接中读取数据,该函数进一步调用ReadMessageWithEncodingN
函数,ReadMessageWithEncodingN
定义如下: 1
2
3
4
5
6
7
8
9
10
11// start [peer.go] -> inHandler -> readMessage -> ReadMessageWithEncodingN [message.go]
func ReadMessageWithEncodingN(r io.Reader, ...) (...) {
...
n, hdr, err := readMessageHeader(r)
...
command := hdr.command // L363
...
msg, err := makeEmptyMessage(command) // L371
...
return totalBytes, msg, payload, nil
}command
字段。回忆一下,该字段在第I.A小节中被赋值为CmdGetHeaders
,因此在L371行生成的msg
将是MsgGetHeaders
类型。
回到Peer.inHandler
函数中,L1462行的case
得到调用,并调用L1464行的OnGetHeaders
函数。该函数被赋值为serverPeer.OnGetHeaders
, 函数定义如下所示: 1
2
3
4
5
6
7
8
9
10
11
12// start [peer.go] -> inHandler -> OnGetHeaders [server.go]
func (sp *serverPeer) OnGetHeaders(_ *peer.Peer, msg
*wire.MsgGetHeaders) {
chain := sp.server.chain
headers := chain.LocateHeaders(msg.BlockLocatorHashes, &msg.HashStop
...
blockHeaders := make([]*wire.BlockHeader, len(headers))
for i := range headers {
blockHeaders[i] = &headers[i]
}
sp.QueueMessage(&wire.MsgHeaders{Headers: blockHeaders}, nil) // L794
}OnGetHeaders
函数只是简单得将headers
封装成MsgHeaders
,然后通过函数QueueMessage
发送出去。QueueMessage
函数我们在I.A节已经介绍过,其一步步经过QueueMessageWithEncoding
函数, outputQueue
管道, queuePacket
函数, sendQueue
管道, writeMessage
函数, 最终在WriteMessageWithEncodingN
函数中将数据发送出去。 和第I.A小节不同的是,在当前场景下,WriteMessageWithEncodingN
函数中消息头hdr
的command
被赋值为CmdHeaders
.
C. peer A 处理 "获取区块头"的返回数据
和第I.B小节类似的,inHandler
函数使用readMessage
函数从peer
连接中读取数据,该函数进一步调用ReadMessageWithEncodingN
函数。 所不同的是,这一次ReadMessageWithEncodingN
函数中L363行返回CmdHeaders
;相应地,L371行生成的msg
将是MsgHeaders
类型。 回到Peer.inHandler
函数中,L1442行的case
得到调用,并调用L1444行的OnHeaders
函数。该函数被赋值为serverPeer.OnHeaders
, 函数定义如下所示: 1
2
3
4// start [peer.go] -> inHandler -> OnHeaders [server.go]
func (sp *serverPeer) OnHeaders(_ *peer.Peer, msg *wire.MsgHeaders) {
sp.server.syncManager.QueueHeaders(msg, sp.Peer)
}QueueHeaders
函数,后者定义如下: 1
2
3
4
5
6
7// start [peer.go] -> inHandler -> OnHeaders [server.go] ->
// QueueHeaders [manager.go]
func (sm *SyncManager) QueueHeaders(headers *wire.MsgHeaders, peer
*peerpkg.Peer) {
...
sm.msgChan <- &headersMsg{headers: headers, peer: peer} // L1492
}headers
数据封装成headersMsg
发送到msgChan
通道中,该通道的另外一端连接SyncManager.blockHandler
函数。上一篇博客 btcd源码解析——peer节点之间的区块数据同步 (1) 中,我们已经讨论过msgChan
管道在newPeerMsg
类型时的处理方式,现在再来看一下在headersMsg
类型下的处理方式: 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19// start [peer.go] -> inHandler -> OnHeaders [server.go] ->
// QueueHeaders [manager.go] -> blockHandler
func (sm *SyncManager) blockHandler() {
...
out:
for {
select {
case m := <-sm.msgChan:
switch msg := m.(type) {
...
case *headersMsg: // L1295
sm.handleHeadersMsg(msg) // L1296
...
}
...
}
}
...
}handleHeadersMsg
函数的定义如下所示: 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51// start [peer.go] -> inHandler -> OnHeaders [server.go] ->
// QueueHeaders [manager.go] -> blockHandler -> handleHeadersMsg
func (sm *SyncManager) handleHeadersMsg(hmsg *headersMsg) {
...
if !sm.headersFirstMode { // L902
...
peer.Disconnect()
return
} // L907
...
receivedCheckpoint := false // L916
var finalHash *chainhash.Hash
for _, blockHeader := range msg.Headers { // L918
blockHash := blockHeader.BlockHash()
finalHash = &blockHash
...
prevNodeEl := sm.headerList.Back() // L923
...
node := headerNode{hash: &blockHash}
prevNode := prevNodeEl.Value.(*headerNode)
if prevNode.hash.IsEqual(&blockHeader.PrevBlock) { // L935
node.height = prevNode.height + 1
e := sm.headerList.PushBack(&node)
if sm.startHeader == nil {
sm.startHeader = e
}
} else {
...
} // L947
...
if node.height == sm.nextCheckpoint.Height { // L950
if node.hash.IsEqual(sm.nextCheckpoint.Hash) {
receivedCheckpoint = true // L952
...
} else {
...
}
break
}
} // L968
...
if receivedCheckpoint { // L972
...
sm.fetchHeaderBlocks() // L981
return
}
...
locator := blockchain.BlockLocator([]*chainhash.Hash{finalHash}) // L988
err := peer.PushGetHeadersMsg(locator, sm.nextCheckpoint.Hash) // L989
...
}headersFirstMode
模式:只有在headersFirstMode
模式下才能收到headers
,否则出错。 L916行定义了receivedCheckpoint
变量,该变量用来表示是否收到了CheckPoint
处的header
。 L918-L968行代码对收到的header
挨个进行处理,处理过程分为两部分:
- L923-L947行对
header
进行检查,检查通过后加入到headerList
的队尾。检查的内容是:当前header
的PrevBlock
是否正好是当前headerList
的队尾元素 (L935)。 - L950-L968行判断当前
header
是否是下一个checkpoint
,若是, 则对receivedCheckpoint
赋值。基于receivedCheckpoint
的值,后续进行不同的处理。当receivedCheckpoint
值为true
时 (L972),开始获取blocks
数据 (L981),fetchHeaderBlocks
函数将在第I.D小节介绍;否则,以获取到的最后一个header
作为locator
(L988),开始新一轮的“区块头获取”(L989).
D. peer A 发送 "获取区块"的请求
fetchHeaderBlocks
函数定义如下: 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23// fetchHeaderBlocks [manager.go]
func (sm *SyncManager) fetchHeaderBlocks() {
...
gdmsg := wire.NewMsgGetDataSizeHint(uint(sm.headerList.Len())) // L847
...
for e := sm.startHeader; e != nil; e = e.Next() { // L849
...
node, ok := e.Value.(*headerNode)
...
iv := wire.NewInvVect(wire.InvTypeBlock, node.hash) // L856
haveInv, err := sm.haveInventory(iv) // L857
...
if !haveInv {
...
gdmsg.AddInvVect(iv) // L876
...
}
...
} // L883
if len(gdmsg.InvList) > 0 {
sm.syncPeer.QueueMessage(gdmsg, nil) // L885
}
}MsgGetData
这个数据结构来承载的,该数据结构定义如下: 1
2
3
4// fetchHeaderBlocks [manager.go] -> MsgGetData [msggetdata.go]
type MsgGetData struct {
InvList []*InvVect
}MsgGetData
包含了InvVect
的slice
. InvVect
描述了一份数据,包括数据类型和数据哈希,其定义如下所示: 1
2
3
4
5// fetchHeaderBlocks [manager.go] -> MsgGetData [msggetdata.go] -> InvVect [invvect.go]
type InvVect struct {
Type InvType // Type of data
Hash chainhash.Hash // Hash of the data
}SyncManager.fetchHeaderBlocks
函数中,L847行首先定义了一个MsgGetData
类型的变量 (gdmsg
). L849-L883遍历处理每一个header
。基于每一个header
,L856行调用NewInvVect
函数新建了一个InvVect
变量 (iv
),NewInvVect
函数中将数据类型设置为wire.InvTypeBlock
. L876行将iv
加入到gdmsg
变量中. 如果gdmsg
变量中InvList
的长度大于0, 则调用QueueMessage
函数将“获取数据”的请求发送出去。 QueueMessage
函数在第I.A小节已经介绍过,其依次经由QueueMessageWithEncoding
函数, outputQueue
管道, Peer.queueHandler
函数, queuePacket
函数, sendQueue
管道, writeMessage
函数,和WriteMessageWithEncodingN
函数,将数据发送到peer B
中。 WriteMessageWithEncodingN
函数在写消息头 (hdr
)时,需要对hdr.command
赋值。由于该msg
是MsgGetData
类型,hdr.command
将被赋值为CmdGetData
.
E. peer B 响应 "获取区块"的请求
和I.B小节类似的,inHandler
函数使用readMessage
函数从peer
连接中读取数据,该函数进一步调用ReadMessageWithEncodingN
函数。 所不同的是,这一次ReadMessageWithEncodingN
函数中L363行返回CmdGetData
;相应地,L371行生成的msg
将是MsgGetData
类型。 回到Peer.inHandler
函数中,我们重新将Peer.inHandler
的代码贴在下面: 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20// inHandler [peer.go]
func (p *Peer) inHandler() {
...
out:
for atomic.LoadInt32(&p.disconnect) == 0 {
...
rmsg, buf, err := p.readMessage(p.wireEncoding) // L1334
...
switch msg := rmsg.(type) {
...
case *wire.MsgGetData: // L1452
if p.cfg.Listeners.OnGetData != nil {
p.cfg.Listeners.OnGetData(p, msg) // L1454
}
...
}
...
}
...
}
L1452行的case
得到调用,并调用L1454行的OnHeaders
函数。该函数被赋值为serverPeer.OnGetData
, 函数定义如下所示: 1
2
3
4
5
6
7
8
9
10
11
12
13// inHandler [peer.go] -> OnGetData [server.go]
func (sp *serverPeer) OnGetData(_ *peer.Peer, msg *wire.MsgGetData) {
...
for i, iv := range msg.InvList { // L672
switch iv.Type {
...
case wire.InvTypeBlock: // L689
err = sp.server.pushBlockMsg(sp, &iv.Hash, c, waitChan, wire.BaseEncoding) // L690
...
}
} // L715
...
}InvList
中的每一个iv
进行处理。I.D节中介绍过iv.Type
被赋值为wire.InvTypeBlock
,因此L689的case
被执行,pushBlockMsg
函数被调用。pushBlockMsg
函数如下所示: 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16// inHandler [peer.go] -> OnGetData [server.go] -> pushBlockMsg
func (s *server) pushBlockMsg(...) error {
...
var blockBytes []byte // L1441
err := sp.server.db.View(func(dbTx database.Tx) error {
var err error
blockBytes, err = dbTx.FetchBlock(hash)
return err
}) // L1446
...
var msgBlock wire.MsgBlock // L1458
err = msgBlock.Deserialize(bytes.NewReader(blockBytes)) // L1459
...
sp.QueueMessageWithEncoding(&msgBlock, dc, encoding) // L1483
...
}blockBytes
中,在L1459反序列化为msgBlock
变量,最后通过QueueMessageWithEncoding
函数发送出去 (L1483). QueueMessageWithEncoding
函数我们在I.A节已经介绍过,其一步步经过outputQueue
管道, queuePacket
函数, sendQueue
管道, writeMessage
函数, 最终在WriteMessageWithEncodingN
函数中将数据发送出去。 和I.A节不同的是,在当前场景下,WriteMessageWithEncodingN
函数中消息头hdr
的command
被赋值为CmdBlock
.
需要注意的是,peer B
在返回区块数据给peer A
时,是逐个区块发送数据的,即每个区块调用一次pushBlockMsg
和QueueMessageWithEncoding
函数。
F. peer A 处理 "获取区块"的返回数据
和I.B小节类似的,inHandler
函数使用readMessage
函数从peer
连接中读取数据,该函数进一步调用ReadMessageWithEncodingN
函数。 所不同的是,这一次ReadMessageWithEncodingN
函数中L363行返回CmdBlock
;相应地,L371行生成的msg
将是MsgBlock
类型。 回到Peer.inHandler
函数中,我们重新将Peer.inHandler
的代码贴在下面: 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20// inHandler [peer.go]
func (p *Peer) inHandler() {
...
out:
for atomic.LoadInt32(&p.disconnect) == 0 {
...
rmsg, buf, err := p.readMessage(p.wireEncoding) // L1334
...
switch msg := rmsg.(type) {
...
case *wire.MsgBlock: // L1432
if p.cfg.Listeners.OnBlock != nil {
p.cfg.Listeners.OnBlock(p, msg, buf) // L1434
}
...
}
...
}
...
}
L1432行的case
得到调用,并调用L1434行的OnBlock
函数。该函数被赋值为serverPeer.OnBlock
, 函数定义如下所示: 1
2
3
4
5
6
7
8// inHandler [peer.go] -> OnBlock [server.go]
func (sp *serverPeer) OnBlock(_ *peer.Peer, msg *wire.MsgBlock, buf []byte) {
...
block := btcutil.NewBlockFromBlockAndBytes(msg, buf)
...
sp.server.syncManager.QueueBlock(block, sp.Peer, sp.blockProcessed)
...
}OnBlock
利用QueueBlock
把数据发送出去,QueueBlock
函数的定义如下: 1
2
3
4
5
6// inHandler [peer.go] -> OnBlock [server.go] -> QueueBlock [manager.go]
func (sm *SyncManager) QueueBlock(block *btcutil.Block, peer *peerpkg.Peer, done
chan struct{}) {
...
sm.msgChan <- &blockMsg{block: block, peer: peer, reply: done}
}QueueBlock
函数利用msgChan
管道传输数据,msgChan
的另一端连接着blockHandler
函数,该部分已经在上一篇博客 btcd源码解析——peer节点之间的区块数据同步 (1)讲述过,我们重新回顾一下代码: 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17// inHandler [peer.go] -> OnBlock [server.go] -> QueueBlock [manager.go] -> blockHandler
func (sm *SyncManager) blockHandler() {
...
out:
for {
select {
case m := <-sm.msgChan:
switch msg := m.(type) {
...
case *blockMsg:
sm.handleBlockMsg(msg) // L1289
msg.reply <- struct{}{}
...
}
}
...
}handleBlockMsg
函数被调用,该函数的主要功能就是将block
的数据存储下来。 由于block
的数据存储过程比较复杂,我们在后面会再写一篇博客介绍:从P2P连接中接收到的block
数据是如何存储的。
II. 小结
至此,我们介绍完了headersFirstMode
模式下节点之间的区块数据同步的过程,下一篇博客会介绍 “非headersFirstMode
模式下的区块数据同步的过程”。