0%

btcd源码解析——节点P2P连接建立的过程 (2)

我们接着上一篇博客,继续讲解P2P主动连接的管理被动接受连接的过程。

III. 与其他peer建立P2P连接

...

B. 主动连接的管理

前面介绍到Start [connmanager.go]函数中调用的connHandler函数,主要用来管理主动连接。此外,在III.A节中我们提及requests管道的接收工作就是在connHandler函数中完成的。本小节我们就来看看connHandler函数的具体细节,先看看其代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
// Start [connmanager.go] -> connHandler
func (cm *ConnManager) connHandler() {
...
var (
...
pending = make(map[uint64]*ConnReq) // L233
...
conns = make(map[uint64]*ConnReq, cm.cfg.TargetOutbound) // L236
)

out:
for {
select {
case req := <.cm.requests: // L242
switch msg :=req.(type) {

case registerPending: // L245
connReq := msg.c
connReq.updateState(ConnPending)
pending[msg.c.id] = connReq
close(msg.done)

case handleConnected: // L251
connReq := msg.c

connReq.updateState(ConnEstablished) // L254
connReq.conn = msg.conn
conns[connReq.id] = connReq
log.Debugf("Connected to %v", connReq)
connReq.retryCount = 0
cm.failedAttempts = 0

delete(pending, connReq.id) // L270

if cm.cfg.OnConnection != nil { // L272
go cm.cfg.OnConnection(connReq, msg.conn) // L273
}

case handleDisconnected:
...
case handleFailed:
...
}
case <-cm.quit:
break out
}
}
...
}
L242行代码接收requests管道中发过来的数据,判断数据类型后,交由不同的case处理。下面主要介绍registerPendinghandleConneted两种类型的数据

  • L245行代码处理registerPending类型的信息,其主要将conn变量的状态更新后加入到pending变量中
  • L251行代码处理handleConnected类型的信息,其首先也是对conn变量的状态进行了更新,将conn加入到conns变量中,并从pending中删除;然后利用OnConnection函数进行连接后的处理。OnConnection函数也是在server.go文件的newServer中赋值的,代码如下所示:
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    // Start [connmanager.go] -> connHandler -> newServer [server.go]
    func newServer(...) (*server, error) {
    ...
    cmgr, err := connmgr.New(&connmgr.Config{ // L2818
    ...
    OnConnection: s.outboundPeerConnected,
    ...
    GetNewAddress: newAddressFunc,
    })
    ...
    ...
    }
    outboundPeerConnected函数的定义如下所示:
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    //  Start [connmanager.go] -> connHandler -> newServer [server.go] -> outboundPeerConnected
    func (s *server) outboundPeerConnected(c *connmgr.ConnReq, conn net.Conn) {
    sp := newServerPeer(s, c.Permanent) // L2024
    p, err := peer.NewOutboundPeer(newPeerConfig(sp), c.Addr.String())
    ...
    sp.Peer = p
    sp.connReq = c
    sp.isWhitelisted = isWhitelisted(conn.RemoteAddr()) // L2032
    sp.AssociateConnection(conn) // L2033
    go s.peerDoneHandler(sp) // L2034
    s.addrManager.Attempt(sp.NA()) // L2035
    }
    L2024-L2032代码创建了一个serverPeer变量,该变量包含了一个Peer变量;L2033行代码将sp变量与conn进行了绑定,并在AssociateConnection函数中启动了Peer变量,代码如下所示:
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    // Start [connmanager.go] -> connHandler -> newServer [server.go] -> outboundPeerConnected -> AssociateConnection
    func (p *Peer) AssociateConnection(conn net.Conn) {
    ...
    p.conn = conn // L2118
    ...
    go func() {
    if err := p.start(); err != nil { // L2137
    ...
    }
    } ()
    }
    L2118行代码将conn赋值给Peer变量中相应的字段,L2137行代码的start函数用来处理P2P连接中的数据传输,start函数代码如下所示:
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    // Start [connmanager.go] -> connHandler -> newServer [server.go] -> outboundPeerConnected -> AssociateConnection -> start [peer.go]
    func (p *Peer) start() error {
    ...
    negotiateErr := make(chan error, 1) // L2075
    go func() {
    if p.inbound {
    negotiateErr <- p.negotiateInboundProtocol() // L2078
    } else {
    negotiateErr <- p.negotiateOutboundProtocol() // L2080
    }
    }() // L2082
    ...
    go p.stallHandler() // L2099
    go p.inHandler()
    go p.queueHandler()
    go p.outHandler()
    go p.pingHandler() // L2103
    ...
    }
    L2075-L2082行代码主要协商双方的协议版本,判断双方版本是否兼容。L2099-L2103行代码启动了五个协程,用于发送和接收数据 (包括区块的同步、交易的发送等),具体代码细节,我们将在下一篇博客中分析。这里需要特别提一下的是negotiateInboundProtocolnegotiateOutboundProtocol函数,该函数虽然名为“协商版本函数”,但该函数完成的工作却远远不止协商版本这么简单,以下以negotiateOutboundProtocol函数为例进行简单介绍。

1. negotiateOutboundProtocol函数

negotiateOutboundProtocol函数代码如下所示:

1
2
3
4
5
6
7
8
// negotiateOutboundProtocol [peer.go]
func (p *Peer) negotiateOutboundProtocol() error {
if err := p.writeLocalVersionMsg(); err != nil { // L2064
return err
}

return p.readRemoteVersionMsg() // L2068
}
L2064行代码用于给对方peer发送版本信息,L2068接收对方peer发来的版本信息,readRemoteVersionMsg函数代码如下:
1
2
3
4
5
6
7
8
9
10
11
// negotiateOutboundProtocol [peer.go] -> readRemoteVersionMsg
func (p *Peer) readRemoteVersionMsg() error {
...
p.versionKnown = true // L1911
...
if p.cfg.Listeners.OnVersion != nil {
rejectMsg := p.cfg.Listeners.OnVersion(p, msg) // L1949
...
}
...
}
L1911行代码对versionKnown进行了赋值,该值将在III.B.2小节使用到,后文再说。L1949行代码调用OnVersion函数,该函数是MessageListeners结构中的变量,其在server.gonewPeerConfig函数中被赋值为sp.OnVersionsp.OnVersion函数的代码如下:
1
2
3
4
5
6
7
// negotiateOutboundProtocol [peer.go] -> readRemoteVersionMsg -> OnVersion[server.go]
func (sp *serverPeer) OnVersion(_ *peer.Peer, msg *wire.MsgVersion)
*wire.MsgReject {
...
sp.server.AddPeer(sp) // L501
...
}
L501行代码调用了AddPeer函数,该函数向管道newPeers中发送数据,代码如下所示:
1
2
3
4
// negotiateOutboundProtocol [peer.go] -> readRemoteVersionMsg -> OnVersion[server.go] -> AddPeer
func (s *server) AddPeer(sp *serverPeer) { // L2162
s.newPeers <- sp
}
newPeers管道的另一端连接着server.peerHandler函数,代码如下所示:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
// negotiateOutboundProtocol [peer.go] -> readRemoteVersionMsg -> OnVersion[server.go] -> AddPeer -> peerHandler
func (s *server) peerHandler() {
...
out:
for {
select {
// New peers connected to the server.
case p := <-s.newPeers:
s.handleAddPeerMsg(state, p) // L2100
...
case p := <-s.donePeers: // L2103
s.handleDonePeerMsg(state, p)
...
case <-s.quit:
...
}
}
...
}
L2100调用handleAddPeerMsg函数,代码如下所示:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// negotiateOutboundProtocol [peer.go] -> readRemoteVersionMsg -> OnVersion[server.go] -> AddPeer -> peerHandler -> handleAddPeerMsg
func (s *server) handleAddPeerMsg(state *peerState, sp *serverPeer) bool {
...
if sp.Inbound() { // L1643
state.inboundPeers[sp.ID()] = sp
} else {
state.outboundGroups[addrmgr.GroupKey(sp.NA())]++
if sp.persistent {
state.persistentPeers[sp.ID()] = sp
} else {
state.outboundPeers[sp.ID()] = sp
}
} // L1652
...
}
该函数在L1643-L1652行代码将peer变量(sp)登记到state状态中,便于后面的进一步管理,如在断开连接时清理peer相关数据。

2. peerDoneHandler函数

回到outboundPeerConnected函数中,L2034行代码调用了peerDoneHandler函数。该函数主要用于在peer断开连接时利用管道发出一些信号,代码如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
// Start [connmanager.go] -> connHandler -> newServer [server.go] -> outboundPeerConnected -> AssociateConnection -> peerDoneHandler
func (s *server) peerDoneHandler(sp *serverPeer) {
sp.WaitForDisconnect() // L2041
s.donePeers <- sp // L2042

...
if sp.VersionKnown() { // L2045
s.syncManager.DonePeer(sp.Peer) // L2046
...
}
close(sp.quit) // L2056
}
L2041行代码等待关闭连接的信号,L2042通过donePeers管道向server发送信号,L2056行通过quit管道向server发送信号,这两个管道的另一端也都连接着server.peerHandler函数,如III.B.1小节所示。 此外,L2045spversionKnown字段进行了判断,该字段在readRemoteVersionMsg函数的L1911行被赋值为true. L2046syncManager发送信号,通知syncManager停止数据的同步。

C. 被动接受连接

下面我们介绍节点被动接受连接的过程。 我们再将Start函数中被动接收连接相关的代码贴在这儿:

1
2
3
4
5
6
7
8
9
10
11
// Start [connmanager.go]
func (cm *ConnManager) Start() {
...
if cm.cfg.OnAccept != nil { // L522
for _, listener := range cm.cfg.Listeners {
cm.wg.Add(1)
go cm.listenerHandler(listener) // L525
}
}
...
}
L522首先判断OnAccept字段是否为nil. OnAccept字段也是在server.go文件的newServer中赋值的,代码如下所示:
1
2
3
4
5
6
7
8
9
10
11
// newServer [server.go]
func newServer(...) (*server, error) {
...
cmgr, err := connmgr.New(&connmgr.Config{ // L2818
...
OnAccept: s.inboundPeerConnected,
...
GetNewAddress: newAddressFunc,
})
...
}
回到Start函数中L525行的代码,继续看listenerHandler函数,其代码如下所示:
1
2
3
4
5
6
7
8
9
10
// Start [connmanager.go] -> listenHandler
func (cm *ConnManager) listenHandler(listener net.Listener) {
...
for atomic.LoadInt32(&cm.stop) == 0 {
conn, err := listener.Accept() // L494
...
go cm.cfg.OnAccept(conn) // L502
}
...
}
L494行通过调用golangnet包的Accept方法接收连接请求,并将该请求作为参数传递给OnAccept函数。前面已经介绍过,OnAccept字段被赋值为inboundPeerConnected函数,后者的代码如下所示:
1
2
3
4
5
6
7
8
// inboundPeerConnected [server.go]
func (s *server) inboundPeerConnected(conn net.Conn) {
sp := newServerPeer(s, false) // L2011
sp.isWhitelisted = isWhitelisted(conn.RemoteAddr())
sp.Peer = peer.NewInboundPeer(newPeerConfig(sp)) // L2013
sp.AssociateConnection(conn) // L2014
go s.peerDoneHandler(sp) // L2015
}
inboundPeerConnected函数的代码和outboundPeerConnected函数的代码大同小异,此处不再赘述。

IV. 总结

至此,我们完成了P2P连接建立的源码解析。 其主要分为两个部分:peer地址的管理和peer连接的建立。 其中peer连接建立又可分为两种:主动发起连接和被动接收连接。 被动接受连接的代码比较简单。只需要listenerHandler一个协程就可以完成;主动发起连接的代码相对复杂,需要NewConnReqconnHandler两个协程来完成。 在此,我们没有介绍P2P连接的断开过程,这部分代码后面有机会再来分析。