0%

btcd源码解析——peer节点之间的区块数据同步 (2) —— headersFirstMode模式

上一篇博客btcd源码解析——peer节点之间的区块数据同步 (1)中,我们介绍了peer A是如何发起数据同步的请求的,当时讲到存在两种模式:headersFirstMode模式和非headersFirstMode模式。本篇博客,我们将介绍headersFirstMode模式下数据同步的细节。

I. headersFirstMode模式下的数据同步过程

总的来说,headersFirstMode模式下数据同步可分为以下六步,如下图所示: 数据同步过程中涉及的数据结构如下图所示: 以下分六个步骤介绍headersFirstMode模式下的数据同步过程

A. peer A 发送"获取区块头"的请求

该模式下将首先调用PushGetHeadersMsg函数发送”获取区块头“的请求,该函数代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// OnVersion [server.go] -> NewPeer [manager.go] -> blockHandler -> handleNewPeerMsg -> startSync -> PushGetHeadersMsg [peer.go]
func (p *Peer) PushGetHeadersMsg(locator blockchain.BlockLocator,
stopHash *chainhash.Hash) error {
...
msg := wire.NewMsgGetHeaders() // L917
msg.HashStop = *stopHash
for _, hash := range locator {
err := msg.AddBlockLocatorHash(hash)
if err != nil {
return err
}
} // L924
p.QueueMessage(msg, nil) // L925
...
}
L917-L924行构造了用于获取”获取区块头“的请求消息 (msg),并通过L925行的QueueMessage函数将该请求发送出去。 QueueMessage函数只是对QueueMessageWithEncoding函数进行了调用,两者的代码分别如下所示:
1
2
3
4
5
6
// OnVersion [server.go] -> NewPeer [manager.go] -> blockHandler ->
// handleNewPeerMsg -> startSync -> PushGetHeadersMsg [peer.go] ->
// QueueMessage
func (p *Peer) QueueMessage(msg wire.Message, doneChan chan<- struct{}){
p.QueueMessageWithEncoding(msg, doneChan, wire.BaseEncoding)
}
1
2
3
4
5
6
7
8
// OnVersion [server.go] -> NewPeer [manager.go] -> blockHandler ->
// handleNewPeerMsg -> startSync -> PushGetHeadersMsg [peer.go] ->
// QueueMessage -> QueueMessageWithEncoding
func (p *Peer) QueueMessageWithEncoding(...){
...
p.outputQueue <- outMsg{msg: msg, encoding: encoding, doneChan:
doneChan} // L1832
}
L1832行代码将数据以outMsg的形式发送到outputQueue管道中,该管道另一端连接Peer.queueHandler()函数。queueHandler()函数在Peer.start [peer.go] 中的L2101行被调用,函数定义的代码如下所示:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
// queueHandler [peer.go]
func (p *Peer) queueHandler() {
...
waiting := false
...
queuePacket := func(...) bool { // L1571
if !waiting {
p.sendQueue <- msg
} else {
list.PushBack(msg)
}
// we are always waiting now.
return true
} // L1579
out:
for {
select {
case msg := <-p.outputQueue:
waiting = queuePacket(msg, pendingMsgs, waiting) // L1584
...
}
}
...
}
L1584通过调用queuePacket函数将msg发送出去,该函数定义在L1571-1579之间。当waitingfalse时,该函数将msg通过sendQueue管道发送出去,sendQueue管道另一端连接着Peer.outHanlder函数。outHanlder()函数在Peer.start [peer.go] 中的L2102行被调用,函数定义的代码如下所示:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
// queueHandler [peer.go] -> outHandler
func (p *Peer) outHandler() {
out:
for {
select {
case msg := <-p.sendQueue:
switch m := msg.msg.(type) {
...
err := p.writeMessage(msg.msg, msg.encoding) // L1732
...
}
}
...
}
outHandler主要在L1732行调用writeMessage函数,后者又主要调用WriteMessageWithEncodingN函数。 WriteMessageWithEncodingN函数主要是将两部分数据写入到连接中:

  • 消息头: messageHeader [message.go]。需要注意的是,messageHeader中存在一个command字段,该字段通过调用msg.Command()函数赋值。在当前场景下,msg的实际类型是MsgGetHeaders,从而Command函数返回值为CmdGetHeaders (见msggetheaders.go文件中L116行)
  • 消息体: payload

B. peer B 响应"获取区块头"的请求

作为P2P连接中数据的响应端,peer B运行了一个inHandler协程,用于接收其他peer发来的消息,该inHandler协程在Peer.start函数中启动,启动代码如下所示:

1
2
3
4
5
6
7
8
// start [peer.go]
func (p *Peer) start() error {
...
go p.inHandler()
go p.queueHandler()
go p.outHandler()
...
}
inHandler函数代码如下所示:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
// start [peer.go] -> inHandler
func (p *Peer) inHandler() {
...
out:
for atomic.LoadInt32(&p.disconnect) == 0 {
...
rmsg, buf, err := p.readMessage(p.wireEncoding) // L1334
...
switch msg := rmsg.(type) {
...
case *wire.MsgGetAddr:
...
case *wire.MsgHeaders: // L1442
if p.cfg.Listeners.OnHeaders != nil {
p.cfg.Listeners.OnHeaders(p, msg) // L1444
}
...
case *wire.MsgGetHeaders: // L1462
if p.cfg.Listeners.OnGetHeaders != nil {
p.cfg.Listeners.OnGetHeaders(p, msg) // L1464
}
...
}
...
}
...
}
L1334行使用readMessage函数从peer连接中读取数据,该函数进一步调用ReadMessageWithEncodingN函数,ReadMessageWithEncodingN定义如下:
1
2
3
4
5
6
7
8
9
10
11
// start [peer.go] -> inHandler -> readMessage -> ReadMessageWithEncodingN [message.go]
func ReadMessageWithEncodingN(r io.Reader, ...) (...) {
...
n, hdr, err := readMessageHeader(r)
...
command := hdr.command // L363
...
msg, err := makeEmptyMessage(command) // L371
...
return totalBytes, msg, payload, nil
}
L363行读取消息头中的command字段。回忆一下,该字段在第I.A小节中被赋值为CmdGetHeaders,因此在L371行生成的msg将是MsgGetHeaders类型。

回到Peer.inHandler函数中,L1462行的case得到调用,并调用L1464行的OnGetHeaders函数。该函数被赋值为serverPeer.OnGetHeaders, 函数定义如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
// start [peer.go] -> inHandler -> OnGetHeaders [server.go]
func (sp *serverPeer) OnGetHeaders(_ *peer.Peer, msg
*wire.MsgGetHeaders) {
chain := sp.server.chain
headers := chain.LocateHeaders(msg.BlockLocatorHashes, &msg.HashStop
...
blockHeaders := make([]*wire.BlockHeader, len(headers))
for i := range headers {
blockHeaders[i] = &headers[i]
}
sp.QueueMessage(&wire.MsgHeaders{Headers: blockHeaders}, nil) // L794
}
OnGetHeaders函数只是简单得将headers封装成MsgHeaders,然后通过函数QueueMessage发送出去。QueueMessage函数我们在I.A节已经介绍过,其一步步经过QueueMessageWithEncoding函数, outputQueue管道, queuePacket函数, sendQueue管道, writeMessage函数, 最终在WriteMessageWithEncodingN函数中将数据发送出去。 和第I.A小节不同的是,在当前场景下,WriteMessageWithEncodingN函数中消息头hdrcommand被赋值为CmdHeaders.

C. peer A 处理 "获取区块头"的返回数据

和第I.B小节类似的,inHandler函数使用readMessage函数从peer连接中读取数据,该函数进一步调用ReadMessageWithEncodingN函数。 所不同的是,这一次ReadMessageWithEncodingN函数中L363行返回CmdHeaders;相应地,L371行生成的msg将是MsgHeaders类型。 回到Peer.inHandler函数中,L1442行的case得到调用,并调用L1444行的OnHeaders函数。该函数被赋值为serverPeer.OnHeaders, 函数定义如下所示:

1
2
3
4
// start [peer.go] -> inHandler -> OnHeaders [server.go]
func (sp *serverPeer) OnHeaders(_ *peer.Peer, msg *wire.MsgHeaders) {
sp.server.syncManager.QueueHeaders(msg, sp.Peer)
}
该函数调用QueueHeaders函数,后者定义如下:
1
2
3
4
5
6
7
// start [peer.go] -> inHandler -> OnHeaders [server.go] -> 
// QueueHeaders [manager.go]
func (sm *SyncManager) QueueHeaders(headers *wire.MsgHeaders, peer
*peerpkg.Peer) {
...
sm.msgChan <- &headersMsg{headers: headers, peer: peer} // L1492
}
该函数在L1942行将headers数据封装成headersMsg发送到msgChan通道中,该通道的另外一端连接SyncManager.blockHandler函数。上一篇博客 btcd源码解析——peer节点之间的区块数据同步 (1) 中,我们已经讨论过msgChan管道在newPeerMsg类型时的处理方式,现在再来看一下在headersMsg类型下的处理方式:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
// start [peer.go] -> inHandler -> OnHeaders [server.go] -> 
// QueueHeaders [manager.go] -> blockHandler
func (sm *SyncManager) blockHandler() {
...
out:
for {
select {
case m := <-sm.msgChan:
switch msg := m.(type) {
...
case *headersMsg: // L1295
sm.handleHeadersMsg(msg) // L1296
...
}
...
}
}
...
}
handleHeadersMsg函数的定义如下所示:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
// start [peer.go] -> inHandler -> OnHeaders [server.go] -> 
// QueueHeaders [manager.go] -> blockHandler -> handleHeadersMsg
func (sm *SyncManager) handleHeadersMsg(hmsg *headersMsg) {
...
if !sm.headersFirstMode { // L902
...
peer.Disconnect()
return
} // L907
...
receivedCheckpoint := false // L916
var finalHash *chainhash.Hash
for _, blockHeader := range msg.Headers { // L918
blockHash := blockHeader.BlockHash()
finalHash = &blockHash
...
prevNodeEl := sm.headerList.Back() // L923
...
node := headerNode{hash: &blockHash}
prevNode := prevNodeEl.Value.(*headerNode)
if prevNode.hash.IsEqual(&blockHeader.PrevBlock) { // L935
node.height = prevNode.height + 1
e := sm.headerList.PushBack(&node)
if sm.startHeader == nil {
sm.startHeader = e
}
} else {
...
} // L947
...
if node.height == sm.nextCheckpoint.Height { // L950
if node.hash.IsEqual(sm.nextCheckpoint.Hash) {
receivedCheckpoint = true // L952
...
} else {
...
}
break
}
} // L968
...
if receivedCheckpoint { // L972
...
sm.fetchHeaderBlocks() // L981
return
}
...
locator := blockchain.BlockLocator([]*chainhash.Hash{finalHash}) // L988
err := peer.PushGetHeadersMsg(locator, sm.nextCheckpoint.Hash) // L989
...
}
L902首先判断当前模式是否为headersFirstMode模式:只有在headersFirstMode模式下才能收到headers,否则出错。 L916行定义了receivedCheckpoint变量,该变量用来表示是否收到了CheckPoint处的header。 L918-L968行代码对收到的header挨个进行处理,处理过程分为两部分:

  1. L923-L947行对header进行检查,检查通过后加入到headerList的队尾。检查的内容是:当前headerPrevBlock是否正好是当前headerList的队尾元素 (L935)。
  2. L950-L968行判断当前header是否是下一个checkpoint,若是, 则对receivedCheckpoint赋值。基于receivedCheckpoint的值,后续进行不同的处理。当receivedCheckpoint值为true时 (L972),开始获取blocks数据 (L981),fetchHeaderBlocks函数将在第I.D小节介绍;否则,以获取到的最后一个header作为locator (L988),开始新一轮的“区块头获取”(L989).

D. peer A 发送 "获取区块"的请求

fetchHeaderBlocks函数定义如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
// fetchHeaderBlocks [manager.go]
func (sm *SyncManager) fetchHeaderBlocks() {
...
gdmsg := wire.NewMsgGetDataSizeHint(uint(sm.headerList.Len())) // L847
...
for e := sm.startHeader; e != nil; e = e.Next() { // L849
...
node, ok := e.Value.(*headerNode)
...
iv := wire.NewInvVect(wire.InvTypeBlock, node.hash) // L856
haveInv, err := sm.haveInventory(iv) // L857
...
if !haveInv {
...
gdmsg.AddInvVect(iv) // L876
...
}
...
} // L883
if len(gdmsg.InvList) > 0 {
sm.syncPeer.QueueMessage(gdmsg, nil) // L885
}
}
“获取数据”的请求是通过MsgGetData这个数据结构来承载的,该数据结构定义如下:
1
2
3
4
// fetchHeaderBlocks [manager.go] -> MsgGetData [msggetdata.go]
type MsgGetData struct {
InvList []*InvVect
}
容易看出,MsgGetData包含了InvVectslice. InvVect描述了一份数据,包括数据类型和数据哈希,其定义如下所示:
1
2
3
4
5
// fetchHeaderBlocks [manager.go] -> MsgGetData [msggetdata.go] -> InvVect [invvect.go]
type InvVect struct {
Type InvType // Type of data
Hash chainhash.Hash // Hash of the data
}
回到SyncManager.fetchHeaderBlocks函数中,L847行首先定义了一个MsgGetData类型的变量 (gdmsg). L849-L883遍历处理每一个header。基于每一个header,L856行调用NewInvVect函数新建了一个InvVect变量 (iv),NewInvVect函数中将数据类型设置为wire.InvTypeBlock. L876行将iv加入到gdmsg变量中. 如果gdmsg变量中InvList的长度大于0, 则调用QueueMessage函数将“获取数据”的请求发送出去。 QueueMessage函数在第I.A小节已经介绍过,其依次经由QueueMessageWithEncoding函数, outputQueue管道, Peer.queueHandler函数, queuePacket函数, sendQueue管道, writeMessage函数,和WriteMessageWithEncodingN函数,将数据发送到peer B中。 WriteMessageWithEncodingN函数在写消息头 (hdr)时,需要对hdr.command赋值。由于该msgMsgGetData类型,hdr.command将被赋值为CmdGetData.

E. peer B 响应 "获取区块"的请求

和I.B小节类似的,inHandler函数使用readMessage函数从peer连接中读取数据,该函数进一步调用ReadMessageWithEncodingN函数。 所不同的是,这一次ReadMessageWithEncodingN函数中L363行返回CmdGetData;相应地,L371行生成的msg将是MsgGetData类型。 回到Peer.inHandler函数中,我们重新将Peer.inHandler的代码贴在下面:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
// inHandler [peer.go]
func (p *Peer) inHandler() {
...
out:
for atomic.LoadInt32(&p.disconnect) == 0 {
...
rmsg, buf, err := p.readMessage(p.wireEncoding) // L1334
...
switch msg := rmsg.(type) {
...
case *wire.MsgGetData: // L1452
if p.cfg.Listeners.OnGetData != nil {
p.cfg.Listeners.OnGetData(p, msg) // L1454
}
...
}
...
}
...
}

L1452行的case得到调用,并调用L1454行的OnHeaders函数。该函数被赋值为serverPeer.OnGetData, 函数定义如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
// inHandler [peer.go] -> OnGetData [server.go]
func (sp *serverPeer) OnGetData(_ *peer.Peer, msg *wire.MsgGetData) {
...
for i, iv := range msg.InvList { // L672
switch iv.Type {
...
case wire.InvTypeBlock: // L689
err = sp.server.pushBlockMsg(sp, &iv.Hash, c, waitChan, wire.BaseEncoding) // L690
...
}
} // L715
...
}
L672-L715对InvList中的每一个iv进行处理。I.D节中介绍过iv.Type被赋值为wire.InvTypeBlock,因此L689的case被执行,pushBlockMsg函数被调用。pushBlockMsg函数如下所示:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// inHandler [peer.go] -> OnGetData [server.go] -> pushBlockMsg
func (s *server) pushBlockMsg(...) error {
...
var blockBytes []byte // L1441
err := sp.server.db.View(func(dbTx database.Tx) error {
var err error
blockBytes, err = dbTx.FetchBlock(hash)
return err
}) // L1446
...
var msgBlock wire.MsgBlock // L1458
err = msgBlock.Deserialize(bytes.NewReader(blockBytes)) // L1459
...
sp.QueueMessageWithEncoding(&msgBlock, dc, encoding) // L1483
...
}
L1441-L1446行从数据库中读取数据存在blockBytes中,在L1459反序列化为msgBlock变量,最后通过QueueMessageWithEncoding函数发送出去 (L1483). QueueMessageWithEncoding函数我们在I.A节已经介绍过,其一步步经过outputQueue管道, queuePacket函数, sendQueue管道, writeMessage函数, 最终在WriteMessageWithEncodingN函数中将数据发送出去。 和I.A节不同的是,在当前场景下,WriteMessageWithEncodingN函数中消息头hdrcommand被赋值为CmdBlock.

需要注意的是,peer B在返回区块数据给peer A时,是逐个区块发送数据的,即每个区块调用一次pushBlockMsgQueueMessageWithEncoding函数。

F. peer A 处理 "获取区块"的返回数据

和I.B小节类似的,inHandler函数使用readMessage函数从peer连接中读取数据,该函数进一步调用ReadMessageWithEncodingN函数。 所不同的是,这一次ReadMessageWithEncodingN函数中L363行返回CmdBlock;相应地,L371行生成的msg将是MsgBlock类型。 回到Peer.inHandler函数中,我们重新将Peer.inHandler的代码贴在下面:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
// inHandler [peer.go]
func (p *Peer) inHandler() {
...
out:
for atomic.LoadInt32(&p.disconnect) == 0 {
...
rmsg, buf, err := p.readMessage(p.wireEncoding) // L1334
...
switch msg := rmsg.(type) {
...
case *wire.MsgBlock: // L1432
if p.cfg.Listeners.OnBlock != nil {
p.cfg.Listeners.OnBlock(p, msg, buf) // L1434
}
...
}
...
}
...
}

L1432行的case得到调用,并调用L1434行的OnBlock函数。该函数被赋值为serverPeer.OnBlock, 函数定义如下所示:

1
2
3
4
5
6
7
8
// inHandler [peer.go] -> OnBlock [server.go]
func (sp *serverPeer) OnBlock(_ *peer.Peer, msg *wire.MsgBlock, buf []byte) {
...
block := btcutil.NewBlockFromBlockAndBytes(msg, buf)
...
sp.server.syncManager.QueueBlock(block, sp.Peer, sp.blockProcessed)
...
}
OnBlock利用QueueBlock把数据发送出去,QueueBlock函数的定义如下:
1
2
3
4
5
6
// inHandler [peer.go] -> OnBlock [server.go] -> QueueBlock [manager.go]
func (sm *SyncManager) QueueBlock(block *btcutil.Block, peer *peerpkg.Peer, done
chan struct{}) {
...
sm.msgChan <- &blockMsg{block: block, peer: peer, reply: done}
}
QueueBlock函数利用msgChan管道传输数据,msgChan的另一端连接着blockHandler函数,该部分已经在上一篇博客 btcd源码解析——peer节点之间的区块数据同步 (1)讲述过,我们重新回顾一下代码:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// inHandler [peer.go] -> OnBlock [server.go] -> QueueBlock [manager.go] -> blockHandler
func (sm *SyncManager) blockHandler() {
...
out:
for {
select {
case m := <-sm.msgChan:
switch msg := m.(type) {
...
case *blockMsg:
sm.handleBlockMsg(msg) // L1289
msg.reply <- struct{}{}
...
}
}
...
}
L1289行的handleBlockMsg函数被调用,该函数的主要功能就是将block的数据存储下来。 由于block的数据存储过程比较复杂,我们在后面会再写一篇博客介绍:从P2P连接中接收到的block数据是如何存储的。

II. 小结

至此,我们介绍完了headersFirstMode模式下节点之间的区块数据同步的过程,下一篇博客会介绍 “非headersFirstMode模式下的区块数据同步的过程”。