Ethereum p2p Server: Principles and Implementation
Blockchain decentralization depends on the underlying networking layer. Ethereum implements a p2p Server at the bottom of its stack, which can roughly be divided into three layers:
- The bottom layer: the routing table. It wraps the Kademlia routing logic — node data structures, distance computation and bookkeeping, node search, validation, and so on.
- The middle layer: the peer abstraction. `message` exposes the send interfaces, while `Server` provides peer detection, initialization, event subscription, peer status queries, start, stop, and related functions.
- The top layer: Ethereum's own peer and peerSet wrappers. Through a protocol's Run function, the upper layer obtains each peer as the middle layer starts it; a loop then captures stable peers and wraps them into a peerSet for use.
The underlying routing table
To keep things simple we only discuss the Node Discovery Protocol here. This layer maintains a set of buckets — 17 in total, each holding up to 16 nodes plus 10 replacement nodes. When a node is inserted, the distance between its hash and the local node's hash is computed first, and a bucket is chosen by that distance. On retrieval, the distance between the target and every bucket entry is computed one by one; see the closest function, quoted further below.
The distance satisfies: f(x, y) = 256 - 8*n - map(x[n+1] ^ y[n+1]), where n is the number of leading bytes the two hashes have in common and map is a decreasing mapping (in practice, the number of leading zero bits of the first differing byte).
In short: the more alike two hashes are, the smaller the value. For the details see the logdist function in node.go. Understanding the Kademlia algorithm helps here.
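To make the formula concrete, here is a minimal, self-contained reimplementation of the distance metric using only the standard library. This is an illustrative sketch written from the formula above, not the geth source; only the function name mirrors node.go.

```go
package main

import (
	"fmt"
	"math/bits"
)

// logdist computes the log2-style XOR distance between two 32-byte hashes:
// 256 minus the number of leading bits the hashes share. Identical hashes
// yield 0; hashes that differ in the very first bit yield 256.
func logdist(a, b [32]byte) int {
	lz := 0 // leading zero bits of a XOR b
	for i := range a {
		x := a[i] ^ b[i]
		if x == 0 {
			lz += 8 // whole byte identical: 8 more shared bits
			continue
		}
		lz += bits.LeadingZeros8(x) // shared bits inside the first differing byte
		break
	}
	return len(a)*8 - lz
}

func main() {
	var self, other [32]byte
	fmt.Println(logdist(self, self)) // identical hashes: distance 0
	other[0] = 0x80
	fmt.Println(logdist(self, other)) // first bit differs: distance 256
	other[0], other[31] = 0x00, 0x01
	fmt.Println(logdist(self, other)) // only the last bit differs: distance 1
}
```

The bucket index then follows directly from this value, as the bucket function below shows.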
```
.
├── database.go  // node database operations
├── node.go      // the node data structure
├── ntp.go       // time synchronization
├── table.go     // the routing table
└── udp.go       // network operations
```

The most important object here is table, whose public methods are:
- newTable — create an instance
- Self — return the local node
- ReadRandomNodes — read a few nodes at random
- Close — shut down
- Resolve — search the vicinity for a specific node
- Lookup — find the neighbors of a given node
Let's analyze these methods one by one:
newTable
- 1: Create the object instance (database client, localNode, etc.).
- 2: Load the bootstrap nodes and initialize the k-buckets.
- 3: Put the nodes into the buckets and spawn a goroutine that refreshes and validates them.
Loading the seed nodes
```go
func (tab *Table) loadSeedNodes(bond bool) {
	seeds := tab.db.querySeeds(seedCount, seedMaxAge)
	// merge the seed nodes from the database with the bootstrap nodes
	seeds = append(seeds, tab.nursery...)
	if bond {
		seeds = tab.bondall(seeds) // validate the nodes
	}
	for i := range seeds {
		seed := seeds[i]
		age := log.Lazy{Fn: func() interface{} { return time.Since(tab.db.bondTime(seed.ID)) }}
		log.Debug("Found seed node in database", "id", seed.ID, "addr", seed.addr(), "age", age)
		tab.add(seed) // put the node into a bucket
	}
}
```

When a node enters a bucket, IP and other restrictions are checked as well.
```go
func (tab *Table) add(new *Node) {
	tab.mutex.Lock()
	defer tab.mutex.Unlock()
	b := tab.bucket(new.sha) // pick the bucket for this node
	if !tab.bumpOrAdd(b, new) {
		// Node is not in table. Add it to the replacement list.
		tab.addReplacement(b, new)
	}
}
```

Bucket selection:
```go
func (tab *Table) bucket(sha common.Hash) *bucket {
	d := logdist(tab.self.sha, sha) // compute the hash distance
	if d <= bucketMinDistance {
		// per the algorithm, all hashes sharing a long enough prefix
		// with ours land in the first bucket
		return tab.buckets[0]
	}
	return tab.buckets[d-bucketMinDistance-1]
}
```

Resolve
Resolve looks up a node by its ID. It first searches the local buckets; if the node is not found there, it asks the surrounding nodes to search again.
```go
// Resolve searches for a specific node with the given ID.
// It returns nil if the node could not be found.
func (tab *Table) Resolve(targetID NodeID) *Node {
	// If the node is present in the local table, no
	// network interaction is required.
	hash := crypto.Keccak256Hash(targetID[:])
	tab.mutex.Lock()
	// find the closest node
	cl := tab.closest(hash, 1)
	tab.mutex.Unlock()
	if len(cl.entries) > 0 && cl.entries[0].ID == targetID {
		return cl.entries[0]
	}
	// Otherwise, do a network lookup.
	// not present locally: search the neighbors
	result := tab.Lookup(targetID)
	for _, n := range result {
		if n.ID == targetID {
			return n
		}
	}
	return nil
}
```

The function to understand here is closest: it walks every node of every bucket and finds the nearest ones.
```go
// closest returns the n nodes in the table that are closest to the
// given id. The caller must hold tab.mutex.
func (tab *Table) closest(target common.Hash, nresults int) *nodesByDistance {
	// This is a very wasteful way to find the closest nodes but
	// obviously correct. I believe that tree-based buckets would make
	// this easier to implement efficiently.
	close := &nodesByDistance{target: target}
	for _, b := range tab.buckets {
		for _, n := range b.entries {
			close.push(n, nresults)
		}
	}
	return close
}

func (h *nodesByDistance) push(n *Node, maxElems int) {
	ix := sort.Search(len(h.entries), func(i int) bool {
		return distcmp(h.target, h.entries[i].sha, n.sha) > 0
	})
	if len(h.entries) < maxElems {
		h.entries = append(h.entries, n)
	}
	if ix == len(h.entries) {
		// farther away than all nodes we already have.
		// if there was room for it, the node is now the last element.
	} else {
		// slide existing entries down to make room
		// this will overwrite the entry we just appended.
		// closer nodes go toward the front
		copy(h.entries[ix+1:], h.entries[ix:])
		h.entries[ix] = n
	}
}
```

ReadRandomNodes
The overall idea: copy the buckets out, take the head node of each bucket in turn, drop buckets that become empty, and keep cycling through the remaining buckets until the given slice is full or all buckets are drained. The return value is the number of nodes written into the slice.
```go
// ReadRandomNodes fills the given slice with random nodes from the
// table. It will not write the same node more than once. The nodes in
// the slice are copies and can be modified by the caller.
func (tab *Table) ReadRandomNodes(buf []*Node) (n int) {
	if !tab.isInitDone() {
		return 0
	}
	tab.mutex.Lock()
	defer tab.mutex.Unlock()
	// Find all non-empty buckets and get a fresh slice of their entries.
	var buckets [][]*Node
	// copy the nodes out
	for _, b := range tab.buckets {
		if len(b.entries) > 0 {
			buckets = append(buckets, b.entries[:])
		}
	}
	if len(buckets) == 0 {
		return 0
	}
	// Shuffle the buckets.
	for i := len(buckets) - 1; i > 0; i-- {
		j := tab.rand.Intn(len(buckets))
		buckets[i], buckets[j] = buckets[j], buckets[i]
	}
	// Move head of each bucket into buf, removing buckets that become empty.
	var i, j int
	for ; i < len(buf); i, j = i+1, (j+1)%len(buckets) {
		b := buckets[j]
		buf[i] = &(*b[0])  // take the first node
		buckets[j] = b[1:] // drop it from the copy
		if len(b) == 1 {
			// remove buckets that became empty
			buckets = append(buckets[:j], buckets[j+1:]...)
		}
		if len(buckets) == 0 {
			break
		}
	}
	return i + 1
}
```

Lookup
Lookup asks known nodes for their neighbors, and then recursively asks those neighbors for their own neighbors.
```go
for {
	// ask the alpha closest nodes that we haven't asked yet
	for i := 0; i < len(result.entries) && pendingQueries < alpha; i++ {
		n := result.entries[i]
		if !asked[n.ID] {
			asked[n.ID] = true
			pendingQueries++
			go func() {
				// Find potential neighbors to bond with
				r, err := tab.net.findnode(n.ID, n.addr(), targetID)
				if err != nil {
					// Bump the failure counter to detect and evacuate non-bonded entries
					fails := tab.db.findFails(n.ID) + 1
					tab.db.updateFindFails(n.ID, fails)
					log.Trace("Bumping findnode failure counter", "id", n.ID, "failcount", fails)
					if fails >= maxFindnodeFailures {
						log.Trace("Too many findnode failures, dropping", "id", n.ID, "failcount", fails)
						tab.delete(n)
					}
				}
				reply <- tab.bondall(r)
			}()
		}
	}
	if pendingQueries == 0 {
		// we have asked all closest nodes, stop the search
		break
	}
	// wait for the next reply
	for _, n := range <-reply { // this blocks until a reply arrives
		if n != nil && !seen[n.ID] {
			seen[n.ID] = true
			result.push(n, bucketSize)
		}
	}
	pendingQueries--
}
```

Bucket maintenance
桶初始化完成后會(huì)進(jìn)入一個(gè)循環(huán)邏輯,其中通過三個(gè)timer控制調(diào)整周期。
- revalidation timer: roughly every 10 s
- refresh timer: every 30 min
- persistence timer: every 30 s
Refresh logic: reload the seed nodes, look up our own neighbors, then pick three random targets and look up the nodes around each of them.
```go
func (tab *Table) doRefresh(done chan struct{}) {
	defer close(done)
	tab.loadSeedNodes(true)
	tab.lookup(tab.self.ID, false)
	for i := 0; i < 3; i++ {
		var target NodeID
		crand.Read(target[:])
		tab.lookup(target, false)
	}
}
```

Revalidation logic: take the node at the tail of a bucket and validate it; if it passes, move it to the front. (Validation means the local node sends it a ping; answering with a pong counts as passing.)
```go
last, bi := tab.nodeToRevalidate() // take the last node
if last == nil {
	// No non-empty bucket found.
	return
}
// Ping the selected node and wait for a pong.
err := tab.ping(last.ID, last.addr()) // validate over the network
tab.mutex.Lock()
defer tab.mutex.Unlock()
b := tab.buckets[bi]
if err == nil {
	// The node responded, move it to the front.
	log.Debug("Revalidated node", "b", bi, "id", last.ID)
	b.bump(last) // move to the front
	return
}
```

Peer/Server
Related files
```
.
├── dial.go     // a task generator/scheduler plus the three task types (naming here is a bit loose)
├── message.go  // data read/write interfaces and the exported Send/SendItems functions
├── peer.go     // the Peer wrapper, including message reading
├── rlpx.go     // the internal handshake protocol
└── server.go   // initialization, peer network maintenance, and the external interfaces
```

This layer keeps pulling nodes out of the routing table. An extracted node must pass identity verification and a protocol check before joining as a peer; if nobody then uses that peer, it is deleted, and a new batch of nodes is selected to repeat the process. Peers are created and destroyed continuously here — the point is to use all nodes evenly rather than depend on a fixed few. So let's start the analysis of the whole flow from Server.
```go
Peers()                             // the peer objects
PeerCount()                         // number of peers
AddPeer(node *discover.Node)        // add a node
RemovePeer(node *discover.Node)     // remove a node
SubscribeEvents(ch chan *PeerEvent) // subscribe to internal events (peer add/drop)
// the four above are external interfaces and do not affect internal logic
Start() // start the server
SetupConn(fd net.Conn, flags connFlag, dialDest *discover.Node)
// set up a connection; after passing two rounds of checks it joins as a peer
```

Start: initialization
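The SubscribeEvents interface follows a simple fan-out pattern: the server broadcasts an event on every peer add or drop, and subscribers receive it on their own channels. A minimal, self-contained model of that pattern is sketched below — the `feed` type and its `send` method are simplified stand-ins for go-ethereum's internal event feed, not its actual API.

```go
package main

import "fmt"

// PeerEvent models p2p.PeerEvent: the server emits one whenever a peer is
// added or dropped. (Simplified stand-in fields.)
type PeerEvent struct {
	Type string // "add" or "drop"
	Peer string // peer id
}

// feed is a minimal stand-in for the server's event feed: SubscribeEvents
// registers a channel, and send fans an event out to every subscriber.
type feed struct{ subs []chan *PeerEvent }

func (f *feed) SubscribeEvents(ch chan *PeerEvent) { f.subs = append(f.subs, ch) }

func (f *feed) send(ev *PeerEvent) {
	for _, ch := range f.subs {
		ch <- ev
	}
}

func main() {
	f := &feed{}
	ch := make(chan *PeerEvent, 4) // buffered so send never blocks in this demo
	f.SubscribeEvents(ch)

	// the real server would emit these from runPeer / delpeer handling
	f.send(&PeerEvent{Type: "add", Peer: "node-a"})
	f.send(&PeerEvent{Type: "drop", Peer: "node-a"})

	for i := 0; i < 2; i++ {
		ev := <-ch
		fmt.Println(ev.Type, ev.Peer)
	}
}
```

In the real server, runPeer publishes the add and drop events on this feed, as we will see below.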
Start does three things: it builds the routing table to establish the underlying network; it creates a DialState that drives the creation and teardown of local peers; and it listens on the local port to answer requests. Here we focus on how peers are maintained, which happens in the run function.
```go
func (srv *Server) Start() (err error) {
	// ************** initialization code omitted
	if !srv.NoDiscovery && srv.DiscoveryV5 {
		unhandled = make(chan discover.ReadPacket, 100)
		sconn = &sharedUDPConn{conn, unhandled}
	}
	// node table
	if !srv.NoDiscovery {
		// build the routing table
		cfg := discover.Config{
			PrivateKey:   srv.PrivateKey,
			AnnounceAddr: realaddr,
			NodeDBPath:   srv.NodeDatabase,
			NetRestrict:  srv.NetRestrict,
			Bootnodes:    srv.BootstrapNodes,
			Unhandled:    unhandled,
		}
		ntab, err := discover.ListenUDP(conn, cfg)
		if err != nil {
			return err
		}
		srv.ntab = ntab
	}
	if srv.DiscoveryV5 {
		// build the v5 routing table
		var (
			ntab *discv5.Network
			err  error
		)
		if sconn != nil {
			ntab, err = discv5.ListenUDP(srv.PrivateKey, sconn, realaddr, "", srv.NetRestrict) // srv.NodeDatabase)
		} else {
			ntab, err = discv5.ListenUDP(srv.PrivateKey, conn, realaddr, "", srv.NetRestrict) // srv.NodeDatabase)
		}
		if err != nil {
			return err
		}
		if err := ntab.SetFallbackNodes(srv.BootstrapNodesV5); err != nil {
			return err
		}
		srv.DiscV5 = ntab
	}
	dynPeers := srv.maxDialedConns()
	// newDialState builds the object that holds the actual peer maintenance code
	dialer := newDialState(srv.StaticNodes, srv.BootstrapNodes, srv.ntab, dynPeers, srv.NetRestrict)
	// load the handshake protocol
	srv.ourHandshake = &protoHandshake{Version: baseProtocolVersion, Name: srv.Name, ID: discover.PubkeyID(&srv.PrivateKey.PublicKey)}
	for _, p := range srv.Protocols {
		srv.ourHandshake.Caps = append(srv.ourHandshake.Caps, p.cap())
	}
	// listen/dial
	// listen on the local port
	if srv.ListenAddr != "" {
		if err := srv.startListening(); err != nil {
			return err
		}
	}
	if srv.NoDial && srv.ListenAddr == "" {
		srv.log.Warn("P2P server will be useless, neither dialing nor listening")
	}
	srv.loopWG.Add(1)
	// the important line: spawn a goroutine that maintains the peers
	go srv.run(dialer)
	srv.running = true
	return nil
}
```

run: creating peers
This function defines two queues:

```go
runningTasks []task // tasks currently executing
queuedTasks  []task // tasks not yet executed
```

and three anonymous functions:
```go
// remove a task from the running list
delTask := func(t task) {
	for i := range runningTasks {
		if runningTasks[i] == t {
			runningTasks = append(runningTasks[:i], runningTasks[i+1:]...)
			break
		}
	}
}
// start a batch of tasks
startTasks := func(ts []task) (rest []task) {
	i := 0
	for ; len(runningTasks) < maxActiveDialTasks && i < len(ts); i++ {
		t := ts[i]
		srv.log.Trace("New dial task", "task", t)
		go func() { t.Do(srv); taskdone <- t }()
		runningTasks = append(runningTasks, t)
	}
	return ts[i:]
}
// start the queued tasks, then call dialstate.newTasks to generate
// a new batch and load it into the queue
scheduleTasks := func() {
	// Start from queue first.
	queuedTasks = append(queuedTasks[:0], startTasks(queuedTasks)...)
	// Query dialer for new tasks and start as many as possible now.
	if len(runningTasks) < maxActiveDialTasks {
		nt := dialstate.newTasks(len(runningTasks)+len(queuedTasks), peers, time.Now())
		queuedTasks = append(queuedTasks, startTasks(nt)...)
	}
}
```

It then runs a loop, with a branch of logic per channel:
```go
for {
	// schedule and generate tasks
	scheduleTasks()
	select {
	case <-srv.quit:
		// exit
		break running
	case n := <-srv.addstatic:
		// add a node; it eventually becomes a dialTask and is queued by newTasks
		srv.log.Debug("Adding static node", "node", n)
		dialstate.addStatic(n)
	case n := <-srv.removestatic:
		// delete the node directly; it is no longer maintained and will die soon
		dialstate.removeStatic(n)
		if p, ok := peers[n.ID]; ok {
			p.Disconnect(DiscRequested)
		}
	case op := <-srv.peerOp:
		// the Peers and PeerCount external interfaces; they only read peer info
		op(peers)
		srv.peerOpDone <- struct{}{}
	case t := <-taskdone:
		// when a task finishes, handle it according to its type
		srv.log.Trace("Dial task done", "task", t)
		dialstate.taskDone(t, time.Now())
		delTask(t)
	case c := <-srv.posthandshake:
		// identity verification passed
		if trusted[c.id] {
			// Ensure that the trusted flag is set before checking against MaxPeers.
			c.flags |= trustedConn
		}
		select {
		case c.cont <- srv.encHandshakeChecks(peers, inboundCount, c):
		case <-srv.quit:
			break running
		}
	case c := <-srv.addpeer:
		// identity and protocol checks passed; join the set
		err := srv.protoHandshakeChecks(peers, inboundCount, c)
		if err == nil {
			// The handshakes are done and it passed all checks.
			p := newPeer(c, srv.Protocols)
			// If message events are enabled, pass the peerFeed
			// to the peer
			if srv.EnableMsgEvents {
				p.events = &srv.peerFeed
			}
			name := truncateName(c.name)
			srv.log.Debug("Adding p2p peer", "name", name, "addr", c.fd.RemoteAddr(), "peers", len(peers)+1)
			// this is where the top layer captures the peer; without outside
			// interest the peer is destroyed again very quickly
			go srv.runPeer(p)
			peerAdd++ // (debug counter added for this article)
			fmt.Printf("--count %d--- add %d-- del %d--\n", len(peers), peerAdd, peerDel)
			peers[c.id] = p
			if p.Inbound() {
				inboundCount++
			}
		}
		// The dialer logic relies on the assumption that
		// dial tasks complete after the peer has been added or
		// discarded. Unblock the task last.
		select {
		case c.cont <- err:
		case <-srv.quit:
			break running
		}
	case pd := <-srv.delpeer:
		// remove the peer
		d := common.PrettyDuration(mclock.Now() - pd.created)
		pd.log.Debug("Removing p2p peer", "duration", d, "peers", len(peers)-1, "req", pd.requested, "err", pd.err)
		delete(peers, pd.ID())
		peerDel++ // (debug counter added for this article)
		fmt.Printf("--count %d--- add %d-- del %d--\n", len(peers), peerAdd, peerDel)
		if pd.Inbound() {
			inboundCount--
		}
	}
}
```

Keep the code above in mind as we look at each piece in turn:
scheduleTasks
scheduleTasks schedules and generates tasks. One kind of generated task is the dialTask, whose structure is:
```go
type dialTask struct {
	flags        connFlag
	dest         *discover.Node
	lastResolved time.Time
	resolveDelay time.Duration
}

func (t *dialTask) Do(srv *Server) {
	if t.dest.Incomplete() {
		if !t.resolve(srv) {
			return
		}
	}
	err := t.dial(srv, t.dest) // this ends up calling setupConn
	if err != nil {
		log.Trace("Dial error", "task", t, "err", err)
		// Try resolving the ID of static nodes if dialing failed.
		if _, ok := err.(*dialError); ok && t.flags&staticDialedConn != 0 {
			if t.resolve(srv) {
				t.dial(srv, t.dest)
			}
		}
	}
}
```

dial eventually calls into setupConn. The function is fairly long, so only the key lines are kept here:
```go
func (srv *Server) setupConn(c *conn, flags connFlag, dialDest *discover.Node) error {
	// identity verification: obtain the remote identity and related information
	if c.id, err = c.doEncHandshake(srv.PrivateKey, dialDest); err != nil { /* ... */ }
	// push the connection onto a channel, which eventually triggers
	// the posthandshake branch of the loop
	err = srv.checkpoint(c, srv.posthandshake)
	// protocol verification
	phs, err := c.doProtoHandshake(srv.ourHandshake)
	c.caps, c.name = phs.Caps, phs.Name
	// push the connection onto a channel, which eventually triggers
	// the addpeer branch of the loop
	err = srv.checkpoint(c, srv.addpeer)
}
```

The posthandshake branch only does validation; the addpeer branch does a lot more, most importantly executing the runPeer function.
```go
func (srv *Server) runPeer(p *Peer) {
	// broadcast peer add
	srv.peerFeed.Send(&PeerEvent{
		Type: PeerEventTypeAdd,
		Peer: p.ID(),
	})
	// run the protocol
	remoteRequested, err := p.run()
	// broadcast peer drop
	srv.peerFeed.Send(&PeerEvent{
		Type:  PeerEventTypeDrop,
		Peer:  p.ID(),
		Error: err.Error(),
	})
	// remove the peer
	srv.delpeer <- peerDrop{p, err, remoteRequested}
}

func (p *Peer) run() (remoteRequested bool, err error) {
	// *************
	writeStart <- struct{}{}
	p.startProtocols(writeStart, writeErr)
	// *************
	// this blocking call is what keeps the peer alive
	p.wg.Wait()
}

func (p *Peer) startProtocols(writeStart <-chan struct{}, writeErr chan<- error) {
	p.wg.Add(len(p.running))
	for _, proto := range p.running {
		proto := proto
		proto.closed = p.closed
		proto.wstart = writeStart
		proto.werr = writeErr
		var rw MsgReadWriter = proto
		if p.events != nil {
			rw = newMsgEventer(rw, p.events, p.ID(), proto.Name)
		}
		p.log.Trace(fmt.Sprintf("Starting protocol %s/%d", proto.Name, proto.Version))
		go func() {
			// everything else is preparation for this one line: this is how
			// Ethereum's p2p layer exposes the peer object to the upper layer
			err := proto.Run(p, rw)
			if err == nil {
				p.log.Trace(fmt.Sprintf("Protocol %s/%d returned", proto.Name, proto.Version))
				err = errProtocolReturned
			} else if err != io.EOF {
				p.log.Trace(fmt.Sprintf("Protocol %s/%d failed", proto.Name, proto.Version), "err", err)
			}
			p.protoErr <- err
			p.wg.Done()
		}()
	}
}
```

This gives us the whole thread: scheduleTasks generates dialTasks; a dialTask, while executing, feeds the posthandshake and addpeer channels in turn; addpeer exposes the Peer object to the upper layer and, on completion, feeds delpeer, which finally removes the Peer.
Task generation
具體看代碼中的注釋
```go
func (s *dialstate) newTasks(nRunning int, peers map[discover.NodeID]*Peer, now time.Time) []task {
	if s.start.IsZero() {
		s.start = now
	}
	var newtasks []task
	// a helper function that queues a dial task
	addDial := func(flag connFlag, n *discover.Node) bool {
		if err := s.checkDial(n, peers); err != nil {
			log.Trace("Skipping dial candidate", "id", n.ID, "addr", &net.TCPAddr{IP: n.IP, Port: int(n.TCP)}, "err", err)
			return false
		}
		s.dialing[n.ID] = flag // exclude nodes that are already being tried
		newtasks = append(newtasks, &dialTask{flags: flag, dest: n})
		return true
	}
	// Compute number of dynamic dials necessary at this point.
	needDynDials := s.maxDynDials // maximum number of connections
	for _, p := range peers {     // subtract peers that are already connected
		if p.rw.is(dynDialedConn) {
			needDynDials--
		}
	}
	for _, flag := range s.dialing { // subtract dials already in flight
		if flag&dynDialedConn != 0 {
			needDynDials--
		}
	}
	// externally added (static) nodes do not count against needDynDials,
	// so that manually added nodes always take effect
	for id, t := range s.static {
		err := s.checkDial(t.dest, peers)
		switch err {
		case errNotWhitelisted, errSelf:
			log.Warn("Removing static dial candidate", "id", t.dest.ID, "addr", &net.TCPAddr{IP: t.dest.IP, Port: int(t.dest.TCP)}, "err", err)
			delete(s.static, t.dest.ID)
		case nil:
			s.dialing[id] = t.flags
			newtasks = append(newtasks, t)
		}
	}
	// If we don't have any peers whatsoever, try to dial a random bootnode. This
	// scenario is useful for the testnet (and private networks) where the discovery
	// table might be full of mostly bad peers, making it hard to find good ones.
	// bootnodes are checked early because they are more likely to be
	// reliable than discovered nodes
	if len(peers) == 0 && len(s.bootnodes) > 0 && needDynDials > 0 &&
		now.Sub(s.start) > fallbackInterval {
		bootnode := s.bootnodes[0]
		s.bootnodes = append(s.bootnodes[:0], s.bootnodes[1:]...)
		s.bootnodes = append(s.bootnodes, bootnode)
		if addDial(dynDialedConn, bootnode) {
			needDynDials--
		}
	}
	// fill up to half of the remaining slots with random routing-table nodes
	randomCandidates := needDynDials / 2
	if randomCandidates > 0 {
		n := s.ntab.ReadRandomNodes(s.randomNodes)
		for i := 0; i < randomCandidates && i < n; i++ {
			if addDial(dynDialedConn, s.randomNodes[i]) {
				needDynDials--
			}
		}
	}
	// pull from lookupBuf
	i := 0
	for ; i < len(s.lookupBuf) && needDynDials > 0; i++ {
		if addDial(dynDialedConn, s.lookupBuf[i]) {
			needDynDials--
		}
	}
	s.lookupBuf = s.lookupBuf[:copy(s.lookupBuf, s.lookupBuf[i:])]
	// still not enough: ask the routing table to discover more nodes
	if len(s.lookupBuf) < needDynDials && !s.lookupRunning {
		s.lookupRunning = true
		newtasks = append(newtasks, &discoverTask{})
	}
	// wait
	if nRunning == 0 && len(newtasks) == 0 && s.hist.Len() > 0 {
		t := &waitExpireTask{s.hist.min().exp.Sub(now)}
		newtasks = append(newtasks, t)
	}
	return newtasks
}
```

Message sending
The other piece is the Send and SendItems functions in message.go: anything that implements MsgWriter can be written to through them. Arguably they could have been folded into peer, but the upper layer's broadcast code does call these two functions directly.
```go
func Send(w MsgWriter, msgcode uint64, data interface{}) error {
	size, r, err := rlp.EncodeToReader(data)
	if err != nil {
		return err
	}
	return w.WriteMsg(Msg{Code: msgcode, Size: uint32(size), Payload: r})
}

func SendItems(w MsgWriter, msgcode uint64, elems ...interface{}) error {
	return Send(w, msgcode, elems)
}
```

The Ethereum upper layer
Peer/PeerSet
File: go-ethereum/eth/peer.go
Two structs are defined here, peer and peerSet. peer wraps the underlying p2p.Peer and adds business-level methods such as SendTransactions and SendNewBlock; peerSet is a collection of peers.
```go
type peer struct {
	id string

	*p2p.Peer
	rw p2p.MsgReadWriter

	version  int         // Protocol version negotiated
	forkDrop *time.Timer // Timed connection dropper if forks aren't validated in time

	head common.Hash
	td   *big.Int
	lock sync.RWMutex

	knownTxs    *set.Set // Set of transaction hashes known to be known by this peer
	knownBlocks *set.Set // Set of block hashes known to be known by this peer
}

type peerSet struct {
	peers  map[string]*peer
	lock   sync.RWMutex
	closed bool
}
```

Peer registration and unregistration
File: go-ethereum/eth/handler.go. After checking a peer, manager.handle registers it with the peerSet, marking it as usable. On error, the peerSet unregisters the peer and returns the error, and the Server finally destroys it.
```go
manager.SubProtocols = make([]p2p.Protocol, 0, len(ProtocolVersions))
for i, version := range ProtocolVersions {
	// Skip protocol version if incompatible with the mode of operation
	if mode == downloader.FastSync && version < eth63 {
		continue
	}
	// Compatible; initialise the sub-protocol
	version := version // Closure for the run
	manager.SubProtocols = append(manager.SubProtocols, p2p.Protocol{
		Name:    ProtocolName,
		Version: version,
		Length:  ProtocolLengths[i],
		Run: func(p *p2p.Peer, rw p2p.MsgReadWriter) error {
			peer := manager.newPeer(int(version), p, rw)
			select {
			case manager.newPeerCh <- peer:
				manager.wg.Add(1)
				defer manager.wg.Done()
				// on success this enters a for loop; on failure the returned
				// error causes the peer to be destroyed
				return manager.handle(peer)
			case <-manager.quitSync:
				return p2p.DiscQuitting
			}
		},
		NodeInfo: func() interface{} {
			return manager.NodeInfo()
		},
		PeerInfo: func(id discover.NodeID) interface{} {
			if p := manager.peers.Peer(fmt.Sprintf("%x", id[:8])); p != nil {
				return p.Info()
			}
			return nil
		},
	})
}
```

References
Source code: https://github.com/ethereum/go-ethereum/tree/master/p2p
Kademlia: https://en.wikipedia.org/wiki/Kademlia
Reposted from: (魂祭心) https://my.oschina.net/hunjixin/blog/1803029