区块链由业余爱好开始转向正式工作,进而可以全身心的学习,努力不能停止。。。。
前面曾经分析学习过以太坊c++版本的代码,现在开始go语言版本的学习,虽然白皮书是同一份,实现的架构,原理相同,但是go语言跟c++版本实现流程还是很不一样的,刚刚接触go语言,借此机会学习下源码,熟悉下go语言
老习惯,学习一个新系统,总想着在脑中先搭建起一个整体的框架(当然初期,这个框架也是不完整的),形成一条主线,后面再分模块学习,再融合,再调整框架,最终丰富起整个系统的实现。
先有一个印象再脑中:
创建一个节点,整个节点是个组装者,他提供了service的接口,其他模块实现接口(例如ethereum),然后他在将相关模块进行组合。
好,开始。
目录
1. 系统入口
【文件位置】cmd/geth/main.go
func main() {
if err := app.Run(os.Args); err != nil {
fmt.Fprintln(os.Stderr, err)
os.Exit(1)
}
}
这里看着代码很简单,但是又很迷糊,直接Run的哪里呢?因为go语言执行特性,会先执行每个包中init()函数
这里又使用了cli框架 https://github.com/urfave/cli/tree/v2 在init()里面配置了action, command, flags, befer, after等,
然后action才是真正的入口
这里的三个函数 makeFullNode startNode node.Wait 是启动了整个系统
2. 总入口之makeFullNode
因为首先想着搭建一个框架印象出来,所以很多代码细节没有详细深入,待后续继续分析
func makeFullNode(ctx *cli.Context) *node.Node {
stack, cfg := makeConfigNode(ctx)
RegisterEthService(stack, &cfg.Eth)
if ctx.GlobalBool(utils.DashboardEnabledFlag.Name) {
utils.RegisterDashboardService(stack, &cfg.Dashboard, gitCommit)
}
// Whisper must be explicitly enabled by specifying at least 1 whisper flag or in dev mode
......
utils.RegisterShhService(stack, &cfg.Shh)
// Add the Ethereum Stats daemon if requested.
if cfg.Ethstats.URL != "" {
utils.RegisterEthStatsService(stack, cfg.Ethstats.URL)
}
return stack
}
这里的工作就是配置了一个node,把配置中的eth, dashboard, ssh ethstats 服务注册进去,这里服务注册有很重要的细节,主要看下eth的服务注册过程
【文件位置】cmd/utils/flags.go
// RegisterEthService adds an Ethereum client to the stack(node).
func RegisterEthService(stack *node.Node, cfg *eth.Config) {
var err error
// 这里是将服务的构造函数最为回调注册给了node
if cfg.SyncMode == downloader.LightSync {
err = stack.Register(func(ctx *node.ServiceContext) (node.Service, error) {
return les.New(ctx, cfg)
})
} else {
err = stack.Register(func(ctx *node.ServiceContext) (node.Service, error) {
fullNode, err := eth.New(ctx, cfg)
if fullNode != nil && cfg.LightServ > 0 {
ls, _ := les.NewLesServer(fullNode, cfg)
fullNode.AddLesServer(ls)
}
return fullNode, err
})
}
if err != nil {
Fatalf("Failed to register the Ethereum service: %v", err)
}
}
这里分les ful, 不管哪种情况,都是讲构造函数最为回调,注册给了stack,即node,后面启动node,会创建所有的service,即调用这里的回调来创建,可以看下stack.Register函数
【文件位置】node/service.go
定义构造函数回调函数结构
定义可以注册的service要实现的接口
【文件位置】node/node.go
这样,实现了上面Service接口的各个service,把自己的构造函数注册到node.serviceFuncs里面
3. 总入口之startNode
这里省略很多代码,直接跳到node自己的start里面
【文件位置】node/node.go
// Start create a live P2P node and starts running it.
func (n *Node) Start() error {
// 配置p2p.Config(p2p/server.go)的参数
// Initialize the p2p server. This creates the node key and
// discovery databases.
n.serverConfig = n.config.P2P
n.serverConfig.PrivateKey = n.config.NodeKey()
n.serverConfig.Name = n.config.NodeName()
n.serverConfig.Logger = n.log
if n.serverConfig.StaticNodes == nil {
n.serverConfig.StaticNodes = n.config.StaticNodes()
}
if n.serverConfig.TrustedNodes == nil {
n.serverConfig.TrustedNodes = n.config.TrustedNodes()
}
if n.serverConfig.NodeDatabase == "" {
n.serverConfig.NodeDatabase = n.config.NodeDB()
}
// 创建了P2P server,传入了config
running := &p2p.Server{Config: n.serverConfig}
// Otherwise copy and specialize the P2P configuration
services := make(map[reflect.Type]Service)
//这里的serviceFuncs,就是上面截图标红的变量,里面保存各个注册service的构造函数
for _, constructor := range n.serviceFuncs {
// Create a new context for the particular service
ctx := &ServiceContext{
config: n.config,
services: make(map[reflect.Type]Service),
EventMux: n.eventmux,
AccountManager: n.accman,
}
for kind, s := range services { // copy needed for threaded access
ctx.services[kind] = s
}
//这里开始创建各个service,这里要进入eth service 的new详细看流程的
// Construct and save the service
service, err := constructor(ctx)
if err != nil {
return err
}
kind := reflect.TypeOf(service)
if _, exists := services[kind]; exists {
return &DuplicateServiceError{Kind: kind}
}
services[kind] = service
}
// 这里是将service(因为事先了node的service接口)里面需要的Protocols注册给p2p server
// Gather the protocols and start the freshly assembled P2P server
for _, service := range services {
running.Protocols = append(running.Protocols, service.Protocols()...)
}
// 这里启动了p2p.Server,这里是走向P2P底层代码的入口,涉及到节点发现 处理等,要详细看
if err := running.Start(); err != nil {
return convertFileLockError(err)
}
// 这里是启动最初注册的所有service,这里是P2P上层应用的代码
// Start each of the services
var started []reflect.Type
for kind, service := range services {
// Start the next service, stopping all previous upon failure
if err := service.Start(running); err != nil {
for _, kind := range started {
services[kind].Stop()
}
running.Stop()
return err
}
// Mark the service started for potential cleanup
started = append(started, kind)
}
//启动RPC
// Lastly start the configured RPC interfaces
if err := n.startRPC(services); err != nil {
for _, service := range services {
service.Stop()
}
running.Stop()
return err
}
// Finish initializing the startup
n.services = services
n.server = running
n.stop = make(chan struct{})
return nil
}
顺着代码流程,看看每一步的注释,这里代码流程开始分叉,
1路学习services的new,start,这相当于P2P上层的应用,
1路学习p2p.server的start,这里是P2P底层节点维护操作,
最后在各种消息业务流程中,这两路又汇合配合
3.1 第一路,services相关流程
还是关注eth service的流程,首先是回调构造函数,创建service
// New creates a new Ethereum object (including the
// initialisation of the common Ethereum object)
func New(ctx *node.ServiceContext, config *Config) (*Ethereum, error) {
......
// Assemble the Ethereum object
chainDb, err := ctx.OpenDatabase("chaindata", config.DatabaseCache, config.DatabaseHandles, "eth/db/chaindata/")
if err != nil {
return nil, err
}
chainConfig, genesisHash, genesisErr := core.SetupGenesisBlockWithOverride(chainDb, config.Genesis, config.ConstantinopleOverride)
if _, ok := genesisErr.(*params.ConfigCompatError); genesisErr != nil && !ok {
return nil, genesisErr
}
log.Info("Initialised chain configuration", "config", chainConfig)
eth := &Ethereum{
config: config,
chainDb: chainDb,
eventMux: ctx.EventMux,
accountManager: ctx.AccountManager,
engine: CreateConsensusEngine(ctx, chainConfig, &config.Ethash, config.MinerNotify, config.MinerNoverify, chainDb),
shutdownChan: make(chan bool),
networkID: config.NetworkId,
gasPrice: config.MinerGasPrice,
etherbase: config.Etherbase,
bloomRequests: make(chan chan *bloombits.Retrieval),
bloomIndexer: NewBloomIndexer(chainDb, params.BloomBitsBlocks, params.BloomConfirms),
}
bcVersion := rawdb.ReadDatabaseVersion(chainDb)
var dbVer = "<nil>"
if bcVersion != nil {
dbVer = fmt.Sprintf("%d", *bcVersion)
}
log.Info("Initialising Ethereum protocol", "versions", ProtocolVersions, "network", config.NetworkId, "dbversion", dbVer)
if !config.SkipBcVersionCheck {
if bcVersion != nil && *bcVersion > core.BlockChainVersion {
return nil, fmt.Errorf("database version is v%d, Geth %s only supports v%d", *bcVersion, params.VersionWithMeta, core.BlockChainVersion)
} else if bcVersion == nil || *bcVersion < core.BlockChainVersion {
log.Warn("Upgrade blockchain database version", "from", dbVer, "to", core.BlockChainVersion)
rawdb.WriteDatabaseVersion(chainDb, core.BlockChainVersion)
}
}
var (
vmConfig = vm.Config{
EnablePreimageRecording: config.EnablePreimageRecording,
EWASMInterpreter: config.EWASMInterpreter,
EVMInterpreter: config.EVMInterpreter,
}
cacheConfig = &core.CacheConfig{
TrieCleanLimit: config.TrieCleanCache,
TrieCleanNoPrefetch: config.NoPrefetch,
TrieDirtyLimit: config.TrieDirtyCache,
TrieDirtyDisabled: config.NoPruning,
TrieTimeLimit: config.TrieTimeout,
}
)
eth.blockchain, err = core.NewBlockChain(chainDb, cacheConfig, chainConfig, eth.engine, vmConfig, eth.shouldPreserve)
if err != nil {
return nil, err
}
// Rewind the chain in case of an incompatible config upgrade.
if compat, ok := genesisErr.(*params.ConfigCompatError); ok {
log.Warn("Rewinding chain to upgrade configuration", "err", compat)
eth.blockchain.SetHead(compat.RewindTo)
rawdb.WriteChainConfig(chainDb, genesisHash, chainConfig)
}
eth.bloomIndexer.Start(eth.blockchain)
if config.TxPool.Journal != "" {
config.TxPool.Journal = ctx.ResolvePath(config.TxPool.Journal)
}
eth.txPool = core.NewTxPool(config.TxPool, chainConfig, eth.blockchain)
if eth.protocolManager, err = NewProtocolManager(chainConfig, config.SyncMode, config.NetworkId, eth.eventMux, eth.txPool, eth.engine, eth.blockchain, chainDb, config.Whitelist); err != nil {
return nil, err
}
eth.miner = miner.New(eth, chainConfig, eth.EventMux(), eth.engine, config.MinerRecommit, config.MinerGasFloor, config.MinerGasCeil, eth.isLocalBlock)
eth.miner.SetExtra(makeExtraData(config.MinerExtraData))
eth.APIBackend = &EthAPIBackend{ctx.ExtRPCEnabled(), eth, nil}
gpoParams := config.GPO
if gpoParams.Default == nil {
gpoParams.Default = config.MinerGasPrice
}
eth.APIBackend.gpo = gasprice.NewOracle(eth.APIBackend, gpoParams)
return eth, nil
}
代码很长,也很兴奋,通过各个调用函数的名称,看到了一条区块链的建立(当然细节还需要深入学习)。这里先只是看下 NewProtocolManager
/ NewProtocolManager returns a new Ethereum sub protocol manager. The Ethereum sub protocol manages peers capable
// with the Ethereum network.
func NewProtocolManager(config *params.ChainConfig, mode downloader.SyncMode, networkID uint64, mux *event.TypeMux, txpool txPool, engine consensus.Engine, blockchain *core.BlockChain, chaindb ethdb.Database, whitelist map[uint64]common.Hash) (*ProtocolManager, error) {
// Create the protocol manager with the base fields
manager := &ProtocolManager{
networkID: networkID,
eventMux: mux,
txpool: txpool,
blockchain: blockchain,
chainconfig: config,
peers: newPeerSet(), //维护的节点
whitelist: whitelist,
newPeerCh: make(chan *peer), // 4个用来同步的信道
noMorePeers: make(chan struct{}),
txsyncCh: make(chan *txsync),
quitSync: make(chan struct{}),
}
.......
//这里定义了一个SubProtocol,这个就是上面node.start里面,传递给p2p.server的protocol
// 注意这里的三个函数指针 RUN NodeInfo PeerInfo,很重要
// Initiate a sub-protocol for every implemented version we can handle
manager.SubProtocols = make([]p2p.Protocol, 0, len(ProtocolVersions))
for i, version := range ProtocolVersions {
// Skip protocol version if incompatible with the mode of operation
if mode == downloader.FastSync && version < eth63 {
continue
}
// Compatible; initialise the sub-protocol
version := version // Closure for the run
manager.SubProtocols = append(manager.SubProtocols, p2p.Protocol{
Name: ProtocolName,
Version: version,
Length: ProtocolLengths[i],
Run: func(p *p2p.Peer, rw p2p.MsgReadWriter) error {
peer := manager.newPeer(int(version), p, rw)
select {
case manager.newPeerCh <- peer:
manager.wg.Add(1)
defer manager.wg.Done()
return manager.handle(peer)
case <-manager.quitSync:
return p2p.DiscQuitting
}
},
NodeInfo: func() interface{} {
return manager.NodeInfo()
},
PeerInfo: func(id enode.ID) interface{} {
if p := manager.peers.Peer(fmt.Sprintf("%x", id[:8])); p != nil {
return p.Info()
}
return nil
},
})
}
//创建 downloader
// Construct the different synchronisation mechanisms
manager.downloader = downloader.New(mode, chaindb, manager.eventMux, blockchain, nil, manager.removePeer)
//创建 fetcher
validator := func(header *types.Header) error {
return engine.VerifyHeader(blockchain, header, true)
}
heighter := func() uint64 {
return blockchain.CurrentBlock().NumberU64()
}
inserter := func(blocks types.Blocks) (int, error) {
// If fast sync is running, deny importing weird blocks
if atomic.LoadUint32(&manager.fastSync) == 1 {
log.Warn("Discarded bad propagated block", "number", blocks[0].Number(), "hash", blocks[0].Hash())
return 0, nil
}
atomic.StoreUint32(&manager.acceptTxs, 1) // Mark initial sync done on any fetcher import
return manager.blockchain.InsertChain(blocks)
}
manager.fetcher = fetcher.New(blockchain.GetBlockByHash, validator, manager.BroadcastBlock, heighter, inserter, manager.removePeer)
return manager, nil
}
这里主要创建了三个管理组件,p2p.Protocols, downloader , fetcher
p2p.Protocols,再node.start会传入到p2p.server中,三个函数指针特别关注,尤其是RUN,这里面调用了manager.handle(perr) , 这个就是本地节点被动接收外面消息的处理函数入口,后面还要深入分析
downloader, 下载网络上的hash和block
fetcher, 收集网络其他以太坊节点发过来的同步通知,进行验证,并做出相应的处理。初始化传入的几个参数 都是用于处理同步区块链数据的函数指针
这里是创建,在node.start后面接着就有了service.Start(running)来启动services
还是主要看eth的start, 他最后启动了ProtocolManager的Start
eth/backend.go
// Start implements node.Service, starting all internal goroutines needed by the
// Ethereum protocol implementation.
func (s *Ethereum) Start(srvr *p2p.Server) error {
// Start the bloom bits servicing goroutines
s.startBloomHandlers(params.BloomBitsBlocks)
// Start the RPC service
s.netRPCService = ethapi.NewPublicNetAPI(srvr, s.NetVersion())
// Figure out a max peers count based on the server limits
maxPeers := srvr.MaxPeers
// 这里调用了前面创建ProtocolManager的start
// Start the networking layer and the light server if requested
s.protocolManager.Start(maxPeers)
if s.lesServer != nil {
s.lesServer.Start(srvr)
}
return nil
}
eth/handler.go
func (pm *ProtocolManager) Start(maxPeers int) {
pm.maxPeers = maxPeers
// broadcast transactions
pm.txsCh = make(chan core.NewTxsEvent, txChanSize)
pm.txsSub = pm.txpool.SubscribeNewTxsEvent(pm.txsCh)
go pm.txBroadcastLoop()
// broadcast mined blocks
pm.minedBlockSub = pm.eventMux.Subscribe(core.NewMinedBlockEvent{})
go pm.minedBroadcastLoop()
// start sync handlers
go pm.syncer()
go pm.txsyncLoop()
}
这里面最后起了4个goroutine,主动发起,负责不同的工作,配合上面被动接收,有收有发,节点层面的通信就有了接口了
1. txBroadcastLoop, 广播新出现的交易,会在txCh通道的收端持续等待,一旦接收到有关新交易的事件,立刻调用BroadcastTx()函数广播给那些尚无该交易对象的相邻个体
2. minedBroadcastLoop() ,广播新挖掘出的区块,持续等待本个体的新挖掘出区块事件,然后广播给需要的相邻个体。当不再订阅挖掘区块事件时,函数才结束等待并返回。
在收到新挖掘出区块事件后,会连续调用2次BroadcastBlock(), 两次调用仅仅是bool型参数不一样,当为true时,将整个区块一次发给相邻节点的一部分
transferLen := int(math.Sqrt(float64(len(peers))))
transferLen > minBroadcastPeers
transferLen < len(peers)
当为false时,仅仅将新区块的Hash值和Number发送给所有相邻节点
3. Syncer(), 定时与相邻个体进行区块全链的强制同步,首先启动fetcher成员,然后进入循环,每次循环中都像相邻peer列表中最优的那个peer做一次区块全链同步。
同步有两种:如果有新登记加入的相邻个体,则在整个peer列表数目大与5时,发起
如果没有新peer到达,以10s为间隔定时发起
这里所谓的最优,是指peer中所维护区块链的TotalDifficulty(td)最高的,由于td是全链中从创世区块到最新头块的Difficulty值总和,所以td最高就意味着区块链是最新的,跟这样的peer做区块链同步,显然改动最小,即最优
4. txsyncLoop() 将新出现的交易对象均匀的同步给相邻个体,首先是一个数据类型 txsync{p, txs},包含peer和tx列表,通道txsyncCh用来接收txsync{}对象,
每次循环时,
如果从txsyncCh中收到新的数据,把他存到本地map中,key是peer.ID, value是txsync{},并将这组tx对象发送给这个peer;每次向peer发送tx对象的上限数目100*1024,如果 txsync{}对象中有剩余tx, 则该txsync{}对象集训存入map并更新tx数目;
如果本次循环没有新到达的txsync{} ,则从map结构中随机找出一个txsync对象,将其中的tx组发送给相应的peer
重复循环
4个流程总结起来就是 广播交易,广播区块,同步交易,同步区块。
3.2 第二路,p2p.server相关流程
node.start() 里面创建了p2p.server的实例running, 把所有services需要的protocols 添加进去,然后启动running.start(), 从这里开始看下去
【文件位置】p2p/server.go
// Start starts running the server.
// Servers can not be re-used after stopping.
func (srv *Server) Start() (err error) {
// static fields
if srv.PrivateKey == nil {
return errors.New("Server.PrivateKey must be set to a non-nil key")
}
if srv.newTransport == nil {
srv.newTransport = newRLPX
}
if srv.c == nil {
srv.Dialer = TCPDialer{&net.Dialer{Timeout: defaultDialTimeout}}
}
srv.quit = make(chan struct{})
srv.addpeer = make(chan *conn)
srv.delpeer = make(chan peerDrop)
srv.posthandshake = make(chan *conn)
srv.addstatic = make(chan *enode.Node)
srv.removestatic = make(chan *enode.Node)
srv.addtrusted = make(chan *enode.Node)
srv.removetrusted = make(chan *enode.Node)
srv.peerOp = make(chan peerOpFunc)
srv.peerOpDone = make(chan struct{})
if err := srv.setupLocalNode(); err != nil {
return err
}
if srv.ListenAddr != "" {
if err := srv.setupListening(); err != nil {
return err
}
}
if err := srv.setupDiscovery(); err != nil {
return err
}
dynPeers := srv.maxDialedConns()
dialer := newDialState(srv.localnode.ID(), srv.StaticNodes, srv.BootstrapNodes, srv.ntab, dynPeers, srv.NetRestrict)
srv.loopWG.Add(1)
go srv.run(dialer)
return nil
}
server的启动,重要的流程涉及到 setupListening, setupDiscovery, srv. run 这后续就是底层节点发现,节点维护的操作了,又是另外一个大的模块了
总结:
总结上面的过程:
函数入口到结束,完成功能:
1. makeFullNode 配置节点
2. node start 启动自己,
3. 解锁账号,并注册钱包反馈事件
4. 启动RPC(RPC提供一种能通过网络或者其他I/O连接访问的能力)
5. 如果配置支持挖矿,启动挖矿
6. Node wait()阻塞住程序,直到node stop
重点是makefullnode和node.start 这里面逻辑的配合:
1. makefullnode,会注册各种service(主要看ethereum service),它们都实现了node定义的service接口,注册的时候传入自己的创建回调函数
2. Node.start时,调用各种service的构造函数,创建service的实例,
Ethereum service 在new的时候,创建了 protocolManager (NewProtocolManager),当然也创建了基本的区块链(NewBlockChain)
NewProtocolManager的时候,
创建peers集合,创建与之相关的newPeerCh noMorePeers txsyncCh quitSync同步信道,
创建manager.SubProtocols ,创建以太坊P2P server的通讯协议。这个协议里面有很重要的三个函数指针(Run, NodeInfo PeerInfo) 其中run里面又关联了被动消息处理函数handle,待对端peer有消息来时,就会调用处理
3. 启动 p2p.server,并把各个service支持的protocols传递给p2p.server,例如上面的 ethereum.manager.SubProtocols,
开启监听,发现等维护节点的流程
4. 启动各个service,会将p2p.server传进去(只是后去maxpeers个数用)
Ethereum service的start,会启动ProtocolManager, 进而manger启动4个协程,主动的广播区块,广播交易,同步区块,同步交易