比特币源码分析-网络(三)

664 阅读7分钟

前两篇文章主要从整体逻辑上对代码进行了梳理,这篇文章将主要讲述网络模块主要的函数,以及其具体实现。

监听连接:ThreadSocketHandler

断开没有使用的节点,首先遍历节点数组vNodesCopy,如果节点标识断开连接(fDisconnect),或者没有任何引用、发送接收消息,则移除节点,关闭socket,添加到断开连接节点数组 (vNodesDisconnected)

// Disconnect unused nodes
std::vector<CNode *> vNodesCopy = vNodes;
   for (CNode *pnode : vNodesCopy) {
       if (pnode->fDisconnect) {
          // remove from vNodes
        	vNodes.erase(remove(vNodes.begin(), vNodes.end(), pnode),
                         vNodes.end());

          // release outbound grant (if any)
          pnode->grantOutbound.Release();

          // close socket and cleanup
          pnode->CloseSocketDisconnect();

          // hold in disconnected pool until all refs are released
          pnode->Release();
          vNodesDisconnected.push_back(pnode);
       }
   }

遍历断开连接节点数组,当节点没有引用,且能获取节点的相关锁,则移除节点,删除此节点。

// Delete disconnected nodes
std::list<CNode *> vNodesDisconnectedCopy = vNodesDisconnected;
   for (CNode *pnode : vNodesDisconnectedCopy) {
      // wait until threads are done using it
      if (pnode->GetRefCount() <= 0) {
         bool fDelete = false;
         {
             TRY_LOCK(pnode->cs_inventory, lockInv);
             if (lockInv) {
                TRY_LOCK(pnode->cs_vSend, lockSend);
                if (lockSend) {
                    fDelete = true;
                }
             }
         }
         if (fDelete) {
             vNodesDisconnected.remove(pnode);
             DeleteNode(pnode);
         }
     }
  }

重新设置节点数量。

size_t vNodesSize;
{
    LOCK(cs_vNodes);
    vNodesSize = vNodes.size();
}
if (vNodesSize != nPrevNodeCount) {
    nPrevNodeCount = vNodesSize;
    if (clientInterface) {
       clientInterface->NotifyNumConnectionsChanged(nPrevNodeCount);
    }
}

遍历监听SOCKET数组(vhListenSocket)、节点数组,为每个节点的socket 设置发送、接收fd_set

for (const ListenSocket &hListenSocket : vhListenSocket) {
    FD_SET(hListenSocket.socket, &fdsetRecv);
    hSocketMax = std::max(hSocketMax, hListenSocket.socket);
    have_fds = true;
}

调用select函数监听socket。

int nSelect = select(have_fds ? hSocketMax + 1 : 0, &fdsetRecv,
             &fdsetSend, &fdsetError, &timeout);

遍历监听socket数组,当接收到数据,且socket有效时,接受连接,新建节点,添加到节点数组

for (const ListenSocket &hListenSocket : vhListenSocket) {
    if (hListenSocket.socket != INVALID_SOCKET &&
        FD_ISSET(hListenSocket.socket, &fdsetRecv)) {
        AcceptConnection(hListenSocket);
    }
}

遍历节点数组,增加节点的引用次数。

std::vector<CNode *> vNodesCopy;
{
   LOCK(cs_vNodes);
   vNodesCopy = vNodes;
   for (CNode *pnode : vNodesCopy) {
       pnode->AddRef();
   }
}

遍历节点数组,接收网络数据,解析成消息,添加到节点的接收消息数组 (vRecvMsg)。当发送集合有数据时,把节点的发送消息(vSendMsg)发送出去:

for (CNode *pnode : vNodesCopy) {                                          
    if (interruptNet) {                                                    
        return;                                                            
    }                                                                                                                                         
    // Receive                                                                                                                                 
    bool recvSet = false;                                                  
    bool sendSet = false;                                                  
    bool errorSet = false;                                                 
    {                                                                      
        LOCK(pnode->cs_hSocket);                                           
        if (pnode->hSocket == INVALID_SOCKET) {                            
            continue;                                                      
        }                                                                  
        recvSet = FD_ISSET(pnode->hSocket, &fdsetRecv);                    
        sendSet = FD_ISSET(pnode->hSocket, &fdsetSend);                    
        errorSet = FD_ISSET(pnode->hSocket, &fdsetError);                  
    }                                                                      
    if (recvSet || errorSet) {                                             
        ....                                                        
    }                                                                                                                                         
    // Send                                                                                                                                    
    if (sendSet) {                                                         
        LOCK(pnode->cs_vSend);                                             
        size_t nBytes = SocketSendData(pnode);                             
        if (nBytes) {                                                      
            RecordBytesSent(nBytes);                                       
        }                                                                  
    }                                                                      

满足以下几点是不活跃的socke连接:

  • 没有任何发送、接收消息(nLastSend、nLastRecv)
  • 距离上次发送消息超过了90分钟(nLastSend),且距离上次把全部消息发送 出去也超过了90分钟(nLastSendEmpty)
  • 距离上次接收消息超过了90分钟(nLastRecv)
int64_t nTime = GetSystemTimeInSeconds();                              
if (nTime - pnode->nTimeConnected > 60) {                              
    if (pnode->nLastRecv == 0 || pnode->nLastSend == 0) {              
        LogPrint("net", "socket no message in first 60 seconds, %d "   
                        "%d from %d\n",                                
                 pnode->nLastRecv != 0, pnode->nLastSend != 0,         
                 pnode->id);                                           
        pnode->fDisconnect = true;                                     
    } else if (nTime - pnode->nLastSend > TIMEOUT_INTERVAL) {          
        LogPrintf("socket sending timeout: %is\n",                     
                  nTime - pnode->nLastSend);                           
        pnode->fDisconnect = true;                                     
    } else if (nTime - pnode->nLastRecv >                              
               (pnode->nVersion > BIP0031_VERSION ? TIMEOUT_INTERVAL   
                                                  : 90 * 60)) {        
        LogPrintf("socket receive timeout: %is\n",                     
                  nTime - pnode->nLastRecv);                           
        pnode->fDisconnect = true;                                     
    } else if (pnode->nPingNonceSent &&                                
               pnode->nPingUsecStart + TIMEOUT_INTERVAL * 1000000 <    
                   GetTimeMicros()) {                                  
        LogPrintf("ping timeout: %fs\n",                               
                  0.000001 *                                           
                      (GetTimeMicros() - pnode->nPingUsecStart));      
        pnode->fDisconnect = true;                                     
    } else if (!pnode->fSuccessfullyConnected) {                       
        LogPrintf("version handshake timeout from %d\n", pnode->id);   
        pnode->fDisconnect = true;                                     
    }                                                                  
}                                                                      

发送接收消息:ThreadMessageHandler

此线程为无限循环,每隔0.1秒循环一次:

while (!flagInterruptMsgProc) {
}

遍历节点数组,增加节点引用次数,检查节点是否需要同步:

std::vector<CNode *> vNodesCopy;      
{                                     
    LOCK(cs_vNodes);                  
    vNodesCopy = vNodes;              
    for (CNode *pnode : vNodesCopy) { 
        pnode->AddRef();              
    }                                 
}                                     

遍历节点数组, 处理接收的消息,发送消息。遍历节点数组,减少节点引用次数:

for (CNode *pnode : vNodesCopy) { 
}

接收消息

节点接收消息后,循环遍历节点的消息缓冲区,解析消息头、数据,校验消息的有效性,再处理消息数据,处理过程中如果发生异常,则节点发送拒绝消息,此节点停止发送、接收消息。 消息的有效性检查主要检查以下4项:

  1. 接收到的消息是否完整 ,类CNetMessage的complete()函数校验完整性,in_data为TRUE,且消 息大小与消息数据偏移相等,则消息是完整的。
  2. 消息头开始字符串是否与环境参数中的字符串相同。
  3. 消息头是否有效。
  4. 消息头校验和是否正确。

重新计算数据的校验和,检验与消息头中的校验和是否相等。

if (pnode->fDisconnect) {                                
    continue;                                            
}                                                        
                                                         
// Receive messages                                                                                           
bool fMoreNodeWork = GetNodeSignals().ProcessMessages(   
    *config, pnode, *this, flagInterruptMsgProc);        
fMoreWork |= (fMoreNodeWork && !pnode->fPauseSend);      
if (flagInterruptMsgProc) {                              
    return;                                              
}                                                        

发送消息

{                                                        
    LOCK(pnode->cs_sendProcessing);                      
    GetNodeSignals().SendMessages(*config, pnode, *this, 
                                  flagInterruptMsgProc); 
}                                                        
if (flagInterruptMsgProc) {                              
    return;                                              
}                                                        

调用这个函数:

bool SendMessages(const Config &config, CNode *pto, CConnman &connman,
                  const std::atomic<bool> &interruptMsgProc) {
                  
     ...
     ...
}

发送消息时,检验5个命令是否需要发送,分别是:ping、addr、getblocks、 inv、getdata。

当用户在RPC中请求ping命令(节点的fPingQueued为TRUE),或者距离上一次发送命令超过了30分钟,且发送消息队列为空,则发送ping命令。当节点的版本大于BIP0031_VERSION时,设置发送ping命令的开始时间,发送ping命令时带随机数,随机数保存在节点的nPingNonceSent中。低于此版本时,不设置开始时间,不带随机数。

随机选择一个节点,发送addr消息发送节点的发送地址数组中的地址,同时把地址插入到已知地址集中,每次最多发送1000个地址。发送完毕后,清空节点的发送地址数组。

当节点需要同步时,且不处于导入、重建索引,则发送getblocks消息获取区块。获取区块时,指定开始区块、结束区块。同时保存开始的区块索引到节点的 pindexLastGetBlocksBegin中,保存结束区块索引值到节点hashLastGetBlocksEnd中。当获取区块时,检查这2项,如果已经获取了最新的区块,则不再获取区块,避免重复发送getblocks消息。

遍历节点的发送Inventory数组,发送inv消息。发送的Inventory必须时尚未处理的,即不在节点已知Inventory集中(setInventoryKnown),发送后添加到已知 Inventory集中。发送inv消息时最多发送1000个inventory。如果是随机节点,且 inventory的类型时MSG_TX,则只发送1/4的inventory,先计算随机值,随机值的低 2bit位为0的inventory发送出去,不为0的inventory保留在节点的发送inventory数组 (vInventoryToSend)中。

如果节点存在当前时间早的延迟的inventory,则发送getdata消息。延迟的 inventory保存在节点的mapAskFor数组。发送的inventory必须是节点中没有的。每次发送getdata消息时最多发送1000个inventory。同时删除节点的mapAskFor中的一项。

主动与外部节点建立连接:ThreadOpenConnections

void CConnman::ThreadOpenConnections() {
	...
}

此线程也是无限循环,每隔500毫秒连接一次。

while (!interruptNet) {
 if (!interruptNet.sleep_for(std::chrono::milliseconds(500))) {  
     return;                                                     
 }                                                               
}

连接地址时注意以下几点:

  • 不连接无效的、地址数组中的外部连接地址中已有的、本地的地址。
  • 最多尝试连接100个地址。
  • 不连接受限制的地址。
  • 尝试连接次数达到30次时,才尝试连接距离上一次尝试连接10分钟以内的地址。
  • 尝试连接次数达到50次时,才尝试连接地址端不是参数默认端的地址。
while (!interruptNet) {                                                     
    CAddrInfo addr = addrman.Select(fFeeler);                                                          
    if (!addr.IsValid() || setConnected.count(addr.GetGroup()) ||           
        IsLocal(addr)) {                                                    
        break;                                                              
    }                                                                                                                                                                                                        
    nTries++;                                                               
    if (nTries > 100) {                                                     
        break;                                                              
    }                                                                                                                                               
    if (IsLimited(addr)) {                                                  
        continue;                                                           
    }                                                                                                                                                                                        
    if ((addr.nServices & REQUIRED_SERVICES) != REQUIRED_SERVICES) {        
        continue;                                                           
    }                                                                                                                                                 
    // only consider very recently tried nodes after 30 failed attempts     
    if (nANow - addr.nLastTry < 600 && nTries < 30) {                       
        continue;                                                           
    }                                                                                   
    if ((addr.nServices & nRelevantServices) != nRelevantServices &&        
        (nTries < 40 || nOutbound >= (nMaxOutbound >> 1))) {                
        continue;                                                           
    }                                                                                                                       
    if (addr.GetPort() != Params().GetDefaultPort() && nTries < 50) {       
        continue;                                                           
    }                                                                                                                                                
    addrConnect = addr;                                                     
    break;                                                                  
}                                                                           
                                                                            
if (addrConnect.IsValid()) {                                                
                                                                            
    if (fFeeler) {                                                          
        // Add small amount of random noise before connection to avoid      
        // synchronization.                                                 
        int randsleep = GetRandInt(FEELER_SLEEP_WINDOW * 1000);             
        if (!interruptNet.sleep_for(                                        
                std::chrono::milliseconds(randsleep))) {                    
            return;                                                         
        }                                                                   
        LogPrint("net", "Making feeler connection to %s\n",                 
                 addrConnect.ToString());                                   
    }                                                                       
                                                                            
    OpenNetworkConnection(addrConnect,                                      
                          (int)setConnected.size() >=                       
                              std::min(nMaxConnections - 1, 2),             
                          &grant, nullptr, false, fFeeler);                 
}                                                                           

获取主机ip:GetLocal

节点启用时,获取主机名称、IP地址,添加到地址、服务对应的数组中(mapLocalHost)。获取本地地址,保存到节点的本地地址(addrLocal),且向节点网络广播地址(AdvertizeLocal)。这样获取的地址大于监听(LOCAL_IF)。

bool GetLocal(CService &addr, const CNetAddr *paddrPeer) {                      
    if (!fListen) return false;                                                 
                                                                                
    int nBestScore = -1;                                                        
    int nBestReachability = -1;                                                 
    {                                                                           
        LOCK(cs_mapLocalHost);                                                  
        for (std::map<CNetAddr, LocalServiceInfo>::iterator it =                
                 mapLocalHost.begin();                                          
             it != mapLocalHost.end(); it++) {                                  
            int nScore = (*it).second.nScore;                                   
            int nReachability = (*it).first.GetReachabilityFrom(paddrPeer);     
            if (nReachability > nBestReachability ||                            
                (nReachability == nBestReachability && nScore > nBestScore)) {  
                addr = CService((*it).first, (*it).second.nPort);               
                nBestReachability = nReachability;                              
                nBestScore = nScore;                                            
            }                                                                   
        }                                                                       
    }                                                                           
    return nBestScore >= 0;                                                     
}                                                                               

系统定义了几种本地地址类型,优先采用值大的类型(GetLocal函数)

enum {
    // unknown
    LOCAL_NONE,
    // address a local interface listens on
    LOCAL_IF,
    // address explicit bound to
    LOCAL_BIND,
    // address reported by UPnP
    LOCAL_UPNP,
    // address explicitly specified (-externalip=)
    LOCAL_MANUAL,

    LOCAL_MAX
};

本文由 copernicus 团队 冉小龙 编写,转载无需授权!