0%

btcd源码解析——节点P2P连接建立的过程 (1)

从这一篇博客开始,我们将介绍btcd节点之间P2P连接建立的过程。考虑到内容太长,分为上下两篇博客来讲解。这两篇博客之后,我们将继续介绍节点之间数据的同步过程。 源码解析是基于btcd仓库c26ffa870fd817666a857af1bf6498fabba1ffe3commit id 版本。

I. 从节点的“启动”说起

btcd节点的启动主要由btcd.go文件中的btcdMain函数完成,其中P2P的连接过程又是由 server.Start()代码完成,如下所示:

1
2
3
4
5
6
7
8
// btcdMain [btcd.go]
func btcdMain(serverChan chan<- *server) error { // L43
...
server, err := newServer(...) // L149
...
server.Start() // L162
...
}
Start函数中和P2P连接相关的部分在go s.peerHandler()完成,如下代码所示:
1
2
3
4
5
6
// btcdMain [btcd.go] -> Start [server.go]
func (s *server) Start() { // L2291
...
go s.peerHandler() // L2305
...
}
在比特币P2P连接的语境中,一个节点就是一个peer. peerHandler函数包含三个最关键的启动工作:

  • 启动addrManager,进行peer地址的管理
  • 启动 syncManager,进行peer之间数据的同步
  • 启动 connManager,进行peer之间连接的管理

代码如下所示:

1
2
3
4
5
6
7
8
9
// btcdMain [btcd.go] -> Start [server.go] -> peerHandler
func (s *server) peerHandler() { // L2062
...
s.addrManager.Start() // L2068
s.syncManager.Start() // L2069
...
go s.connManager.Start() // L2093
...
}
其中跟P2P连接相关的主要是addrManagerconnManager的启动。具体而言:

  • addrManager负责对其他peer地址的管理,主要是一些本地的工作,不涉及直接的网络连接或传输;
  • connManager则主要负责与其他peer建立P2P连接,建立连接是需要对方peer的地址,这便依赖于addrManager中管理的地址。

II. P2P连接中peer地址的管理

A. AddrManager数据结构相关

P2P连接中peer地址的管理主要由addrManager完成,addrManager变量中包含了各种用于地址管理的信息,其数据结构如下所示:

1
2
3
4
5
6
7
8
9
10
// AddrManager [addrmanager.go]
type AddrManager struct { // L32
...
peersFile string // L34
...
addrIndex map[string]*KnownAddress // L38
addrNew [newBucketCount]map[string]*KnownAddress
addrTried [newBucketCount]map[string]*KnownAddress
...
}
其中最重要的四个字段是peersFile, addrIndex, addrNew, 和addrTried:

  • peersFile 对应于一个文件名,该文件主要保存序列化后的addrManager,用于节点重启时能快速建立连接。该文件路径名默认为$data-dir/data/mainnet/peers.json
  • addrIndex 缓存所有KnownAddressmap
  • addrNew 缓存所有新地址的map slice
  • addrTried 缓存所有已经尝试连接过的地址的list slice

准确来说,peersFile中保存的并不是直接序列化后的addrManager,因为addrManager中的一些信息是运行时信息,并不需要保存下来。因此源码中构造了专门用于序列化addrManager的数据结构,如下所示:

1
2
3
4
5
6
7
8
// serializedAddrManager [addrmanager.go]
type serializedAddrManager struct { // L64
Version int
Key [32]byte
Addresses []*serializedKnownAddress
NewBuckets [newBucketCount][]string // string is NetAddressKey
TriedBuckets [triedBucketCount][]string
}

B. 从peersFile中反序列化填充AddrManager变量

addrManagerStart函数中的loadPeers函数用来从peersFile中反序列化出peers的信息,并填充到addrManager中.

1
2
3
4
5
6
7
// Start [addrmanager.go]
func (a *AddrManager) Start() { // L567
...
a.loadPeers()
...
go a.addressHandler() // L580
}
loadPeers函数中主要做事的是deserializePeers函数,如下所示:
1
2
3
4
5
6
// Start [addrmanager.go] -> loadPeers
func (a *AddrManager) loadPeers() { // L423
...
err := a.deserializePeers(a.peersFile)
...
}
deserializePeers函数中的工作主要包括两个部分:

  1. peersFile文件中的数据反序列化成serializedAddrManager变量 (sam)
  2. sam中的AddressesNewBucketsTriedBuckets字段处理赋值给AddrManager变量 (a) 的addrIndex, addrNewaddrTried字段

deserializePeers函数代码如下所示, 其中:

  1. L444 - L456行代码将peersFile文件中的数据反序列化成sam变量,
  2. L471 - L502行代码将sam中的Addresses字段处理赋值给的addrIndex,
  3. L504 - L518将sam中的NewBuckets字段处理赋值给的addrNew,
  4. L519 - L531将sam中的TriedBuckets字段处理赋值给的addrTried
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
// Start [addrmanager.go] -> loadPeers -> deserializePeers
func (a *AddrManager) deserializePeers(filePath string) error { // L442
...
r, err := os.Open(filePath) // L444
...
var sam serializedAddrManager
dec := json.NewDecoder(r)
err = dec.Decode(&sam) // L456
...
for _, v := range sam.Addresses { // L471
ka := new(KnownAddress)
...
ka.na, err = a.DeserializeNetAddress(v.Addr, v.Services)
...
ka.srcAddr, err = a.DeserializeNetAddress(v.Src, v.SrcServices)
...
a.addrIndex[NetAddressKey(ka.na)] = ka
} // L502
...
for i := range sam.NewBuckets { // L504
for _, val := range sam.NewBuckets[i] {
ka, ok := a.addrIndex[val]
...
a.addrNew[i][val] = ka
}
} // L518
for i := range sam.TriedBuckets { // L519
for _, val := range sam.TriedBuckets[i] {
ka, ok := a.addrIndex[val]
...
a.addrTried[i].PushBack(ka)
}
} // L531
...
}

C. 将AddrManager变量序列化存储到peersFile中

II.B小节中Start()函数的L580行启动了一个addressHandler函数的协程,该函数每隔10分钟调用一次savePeers函数,将peers信息 (sam变量) 序列化并保存到peersFile文件中。addressHandler函数和savePeers函数的代码分别如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
// Start [addrmanager.go] -> addressHandler 
func (a *AddrManager) addressHandler() { // L567
...
for {
select {
case <- dumpAddressTicker.C:
a.savePeers()
...
}
}
...
}

1
2
3
4
5
6
7
8
9
10
// Start [addrmanager.go] -> addressHandler -> savePeers
func (a *AddrManager) savePeers() { // L361
...
w, err := os.Create(a.peersFile) // L408
...
enc := json.NewEncoder(w) // L413
...
enc.Encode(&sam) // L415
...
}

III. 与其他peer建立P2P连接

第I节的peerHandler函数中的L2093启动了一个新协程,该协程运行s.connManager.Start()代码启动了ConnManager管理器,代码如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// Start [connmanager.go]
func (cm *ConnManager) Start() {
...
go cm.connHandler() // L518
...
if cm.cfg.OnAccept != nil {
for _, listener := range cm.cfg.Listeners {
cm.wg.Add(1)
go cm.listenerHandler(listener) // L525
}
}

for i := atomic.LoadUnit64(&cm.connReqCount); i < uint64(cm.cfg.TargetOutbound); i++ {
go cm.NewConnReq() // L530
}
}
Start函数中最重要的代码包括三部分:

  1. L530行主动发起与其他peer节点的连接
  2. L518行对所有的主动连接进行管理
  3. L525行被动接受其他peer节点的连接

需要进一步说明的是,1主要是完成主动连接的建立过程,2主要完成主动连接建立之后的管理。1每建立一次连接后,都需要由2完成后续的管理工作,如登记到conns变量中。

A. 主动发起连接

节点主动发起连接的行为由NewConnReq函数完成,代码如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
// Start [connmanager.go] -> NewConnReq [server.go]
func (cm *ConnManager) NewConnReq() {
...
done := make(chan struct{}) // L376
select {
case cm.requests <- registerPending{c, done}:
case <-cm.quit:
return
}
...
select {
case <-done:
case <-cm.quit:
return
} // L398

addr, err := cm.cfg.GetNewAddress() // L400
...
c.Addr = addr

cm.Connect(c) // L402
}
其中需要解释的代码包括三个部分:

  1. L376-L398: 将当前连接请求登记到pending变量中,方便对该连接进行后续管理。登记过程是通过requests管道完成的,管道的另一端连接connHandler函数,后续在讲解connHandler函数的再做详细介绍;
  2. L400: 获取将要连接的peer地址,将在III.A.1节讲述
  3. L402: 完成实际的连接过程,将在III.A.2节讲述

1. 获取将要连接的peer地址

由上可知,获取将要连接的peer地址是由GetNewAddress函数实现的,该函数是connmanager.go文件中config数据结构的字段. 在初始化cmgr变量时,该字段被赋值为newAddressFunc,代码如下所示:

1
2
3
4
5
6
7
8
9
10
// newServer [server.go]
func newServer(...) (*server, error) {
...
cmgr, err := connmgr.New(&connmgr.Config{ // L2818
...
GetNewAddress: newAddressFunc,
})
...
...
}
进一步查看newAddressFunc函数的定义,其也定义在newServer函数中,代码如下所示:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// newServer [server.go]
func newServer(...) (*server, error) {
...
var newAddressFunc func() (net.Addr, error) // L2773
if !cfg.SimNet && len(cfg.ConnectPeers) == 0 {
newAddressFunc = func() (net.Addr, error) {
for tries := 0; tries < 100; tries++ {
addr := s.addrManager.GetAddress() // L2777
...
addrString := addrmgr.NetAddressKey(addr.NetAddress())
return addrStringToNetAddr(addrString)
}
...
}
}
...
}
其中最重要的代码在L2777行,利用addrManagerGetAddress函数获取可用的连接地址。GetAddress函数主要在addrTriedaddrNew两个列表中随机地挑选可用的地址,这就与第II节的内容联系起来了。

2. 完成实际的连接过程

实际的连接过程是由cm.Connect(c)完成的,其代码如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// Start [connmanager.go] -> NewConnReq [server.go] -> Connect [connmanager.go]
func (cm *ConnManager) Connect(c *ConnReq) {
...
conn, err := cm.cfg.Dial(c.Addr) // L444
if err != nil {
select {
case cm.requests <- handleFailed{c, err}: // L447
case <-cm.quit:
}
return
}
select {
case cm.requests <- handleConnected{c, conn}: // L454
case <-cm.quit:
}
}
L444行通过调用Dial函数进行连接,该函数主要对golangnet包的Dial方法进行了一些包装。 L447L454行分别用来处理连接失败和成功的情况,具体处理过程也是通过向requests通道传递数据来完成。