Author: Lin Guanhong / The Ghost at my Fingertips. If you repost this article, please credit the source.
Juejin: juejin.cn/user/178526…
Blog: www.cnblogs.com/linguanh/
GitHub: github.com/af913337456…
Published books:
- 1.0: "Blockchain Ethereum DApp Development in Action"
- 2.0: "Blockchain DApp Development: Based on Ethereum and Bitcoin Public Chain"
Contents
- Preface
- Summary of Ethereum transaction pool knowledge
- Source code analysis
  - Local transactions
    - Initialization of the local wallet addresses
    - Loading local transactions
    - pool.journal.load
    - pool.AddLocals
    - Update of the local transaction file
  - Remote transactions
    - Initialization of the P2P communication module
    - Receiving P2P messages
    - Adding remote transactions to the transaction pool
- "Easter egg"
This is my first article of 2021, and my sixth year of open-source writing.
Bitcoin and Ethereum prices have taken off recently, and one BTC can now buy a fully loaded Tesla Model 3. But I digress.
Last year I published the article "From the perspective of a blockchain technology developer", in which I talked about my experience in the blockchain industry and my understanding of it. Looking back now, its last paragraph has turned out to be a prophecy.
Back to the point.
In everyday work we often build data pools of one kind or another, for example an order pool or a request pool. Traditional server-side thinking leads us straight to message-oriented middleware, that is, messaging components such as RocketMQ, Redis, and Kafka.
In public blockchain applications, however, every public chain I know of has a transaction pool module, and none of their implementations brings in message-oriented middleware.
When I first read the source code of the Ethereum public chain, the way its transaction pool is implemented struck me as novel. Today I summarize and share how a public blockchain application implements a transaction pool without relying on message-oriented middleware, and what characterizes that approach.
Summary of the Ethereum transaction pool (BTW: worth remembering for interviews):
- Classification of transactions:
  - By whether they are stored in a local file:
    - Local transactions: if the sender address of a transaction is one of the addresses specified by a configuration variable, the transaction is considered local. When starting a node, you can also specify in the configuration file that local transactions are not enabled.
    - Remote transactions: transactions that do not meet the condition above.
  - By how they are stored in memory:
    - Queue: transactions waiting to enter Pending; the structure is map[addr]TxList.
    - Pending: transactions ready to be processed; the structure is the same as Queue.
- Transaction input (generation):
  - At program startup:
    - Local transactions are loaded from the local file into memory; if there is no local file, naturally nothing is loaded.
    - Remote transactions are received by the P2P communication module and stored in memory.
  - While the program is running:
    - Transactions the node receives itself through RPC requests: SendTransaction or SendRawTransaction.
    - Information received from other nodes through the P2P communication module, including the following actions:
      - removal of old transactions;
      - addition of new transactions.
- Persistence strategy for transactions:
  - Local transactions:
    - Periodically, the local transactions are selected from Pending and Queue and stored to a local file.
    - Storage works by file replacement: first create a new file, then rename it over the old one.
    - Note that file replacement in the previous point means the operation both updates and deletes.
    - The encoding is RLP, not JSON.
  - Remote transactions:
    - They are not persisted; they always rely on P2P synchronization with other nodes.
- Interrupt recovery:
  - Local transactions: the same operation as at program startup above.
  - Remote transactions: no recovery. Transactions in memory that are lost are simply lost, with no impact. Even if the current node goes down, the other nodes keep working.
Consider the fourth point above, interrupt recovery. Traditional back-end services rely on the messaging middleware to guarantee that messages are not lost, whereas a public blockchain relies entirely on being distributed: data lost by a single node can be synced back from other nodes. The implementation of their transaction pools is therefore comparatively more flexible, and the coding difficulty lies in the message synchronization part.
Now comes the boring source code analysis; readers with time to spare can read on. Pay attention to the comments.
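The classification described above can be sketched in a few lines of Go. This is a simplified illustration rather than the go-ethereum code: the names toyPool, toyTx, isLocal and enqueue are hypothetical, and a transaction is reduced to a sender address and a nonce.

```go
package main

import "fmt"

// toyTx is a hypothetical, stripped-down transaction: only sender and nonce.
type toyTx struct {
	From  string
	Nonce uint64
}

// toyPool mirrors the shape described above: a set of local addresses plus
// two maps keyed by sender address. It is an illustration, not the real TxPool.
type toyPool struct {
	locals  map[string]struct{}
	pending map[string][]toyTx // ready to be processed
	queue   map[string][]toyTx // waiting to enter pending
}

func newToyPool(locals []string) *toyPool {
	p := &toyPool{
		locals:  make(map[string]struct{}),
		pending: make(map[string][]toyTx),
		queue:   make(map[string][]toyTx),
	}
	for _, addr := range locals {
		p.locals[addr] = struct{}{}
	}
	return p
}

// isLocal reports whether the sender was configured as a local address.
func (p *toyPool) isLocal(tx toyTx) bool {
	_, ok := p.locals[tx.From]
	return ok
}

// enqueue stores the transaction in the queue under its sender address.
func (p *toyPool) enqueue(tx toyTx) {
	p.queue[tx.From] = append(p.queue[tx.From], tx)
}

func main() {
	pool := newToyPool([]string{"0xaaa"})
	for _, tx := range []toyTx{{"0xaaa", 0}, {"0xbbb", 7}} {
		pool.enqueue(tx)
		fmt.Printf("%s nonce=%d local=%v\n", tx.From, tx.Nonce, pool.isLocal(tx))
	}
}
```

Both kinds of transaction live in the same in-memory maps; being local only decides whether a transaction is also written to the file.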
Local transactions
1. Initialize the local wallet address
In tx_pool.go, config.Locals is specified by the configuration file and is an array of Ethereum wallet addresses.
func NewTxPool(config TxPoolConfig, chainconfig *params.ChainConfig, chain blockChain) *TxPool {
	...
	for _, addr := range config.Locals { // Add the local addresses from the configuration file
		log.Info("Setting new local account", "address", addr)
		// Add to the locals set, which is used later to check whether an address is local
		pool.locals.add(addr)
	}
	...
}
2. Load transaction data from a local file, that is, load a local transaction
func NewTxPool(config TxPoolConfig, chainconfig *params.ChainConfig, chain blockChain) *TxPool {
	...
	pool.locals = newAccountSet(pool.signer)
	for _, addr := range config.Locals {
		log.Info("Setting new local account", "address", addr)
		pool.locals.add(addr)
	}
	...
	// The loading happens here
	// If local transactions and journaling is enabled, load from disk
	if !config.NoLocals && config.Journal != "" { // Local transactions are enabled and a journal path is configured
		pool.journal = newTxJournal(config.Journal)
		// load is the loading function; pool.AddLocals is the function that actually adds the transactions
		if err := pool.journal.load(pool.AddLocals); err != nil {
			log.Warn("Failed to load transaction journal", "err", err)
		}
		if err := pool.journal.rotate(pool.local()); err != nil {
			log.Warn("Failed to rotate transaction journal", "err", err)
		}
	}
	...
	go pool.loop() // Start the event loop
	...
}
3. pool.journal.load
Source file: tx_journal.go
func (journal *txJournal) load(add func([]*types.Transaction) []error) error {
	// Skip the parsing if the journal file doesn't exist at all
	if _, err := os.Stat(journal.path); os.IsNotExist(err) {
		return nil
	}
	// Open the journal for loading any past transactions
	input, err := os.Open(journal.path) // Open the file and read it as a stream
	if err != nil {
		return err
	}
	...
	stream := rlp.NewStream(input, 0) // Decode the data with the RLP encoding
	...
	loadBatch := func(txs types.Transactions) {
		for _, err := range add(txs) { // Call the add function to add the transactions
			if err != nil {
				log.Debug("Failed to add journaled transaction", "err", err)
				dropped++
			}
		}
	}
	// loadBatch is called below
	...
}
4. pool.AddLocals
pool.AddLocals is the function that actually adds the transactions. After a series of internal calls, it ends up in the add function in tx_pool.go. The pool's queues are all maps, so duplicates can be removed based on identical keys.
func (pool *TxPool) add(tx *types.Transaction, local bool) (replaced bool, err error) {
	...
	// If the transaction overlaps one already in pool.pending, it was added to the pool earlier
	if list := pool.pending[from]; list != nil && list.Overlaps(tx) {
		...
		pool.journalTx(from, tx) // Internally calls journal.insert
		return old != nil, nil
	}
	replaced, err = pool.enqueueTx(hash, tx) // Otherwise it is added to pool.queue
	if err != nil {
		return false, err
	}
	pool.journalTx(from, tx) // Internally calls journal.insert
	...
}

func (pool *TxPool) journalTx(from common.Address, tx *types.Transaction) {
	// Skip if no journal is enabled or the sender is not a local wallet address
	if pool.journal == nil || !pool.locals.contains(from) {
		return
	}
	// insert can write duplicate entries, but they are de-duplicated by address when load reads them back
	if err := pool.journal.insert(tx); err != nil {
		log.Warn("Failed to journal local transaction", "err", err)
	}
}
Local transactions have been added to the pool queue as of this point.
When a node starts up, in addition to loading local transactions from the file into the queue, it constantly listens for events on the chain, such as incoming transactions, and adds them to the queue.
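The point about map-based queues removing duplicates by key can be illustrated with a small sketch. It assumes, for illustration only, that entries are keyed by sender address and then by nonce; entry and dedup are hypothetical names, not go-ethereum identifiers.

```go
package main

import "fmt"

// entry is a hypothetical journal entry.
type entry struct {
	From  string
	Nonce uint64
	Data  string
}

// dedup replays journal entries into a map keyed by sender and then by nonce.
// A later entry with the same sender and nonce overwrites the earlier one,
// so duplicate writes in the journal collapse into a single transaction.
func dedup(entries []entry) map[string]map[uint64]entry {
	out := make(map[string]map[uint64]entry)
	for _, e := range entries {
		if out[e.From] == nil {
			out[e.From] = make(map[uint64]entry)
		}
		out[e.From][e.Nonce] = e
	}
	return out
}

func main() {
	journal := []entry{
		{"0xaaa", 0, "first write"},
		{"0xaaa", 0, "duplicate write"},
		{"0xaaa", 1, "next nonce"},
	}
	byAddr := dedup(journal)
	fmt.Println(len(byAddr["0xaaa"])) // number of distinct nonces kept for 0xaaa
}
```

This is why appending the same transaction to the file more than once is harmless: the duplicates disappear as soon as the entries are replayed into the maps.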
5. Update (insert/delete) of local transaction file
loop is the entry point that triggers the update. Besides the explicit journal.insert calls, which insert local transactions into the file, the update operation below also covers insertion: it deletes old transactions from the file and stores new transactions to it by replacing the file.
func (pool *TxPool) loop() {
	...
	for {
		select {
		...
		// Handle local transaction journal rotation
		// The journal timer periodically triggers an update of the local transaction file
		case <-journal.C:
			if pool.journal != nil {
				pool.mu.Lock()
				if err := pool.journal.rotate(pool.local()); err != nil {
					log.Warn("Failed to rotate local tx journal", "err", err)
				}
				pool.mu.Unlock()
			}
		}
	}
}
journal.rotate takes the transactions in the pool's pending and queue that belong to the locals wallet addresses and stores them to the file by file replacement. Note that only transactions from local wallet addresses are saved; nothing else is.
// Builds the input for rotate
func (pool *TxPool) local() map[common.Address]types.Transactions {
	...
	for addr := range pool.locals.accounts {
		if pending := pool.pending[addr]; pending != nil {
			// Add the pending transactions
			txs[addr] = append(txs[addr], pending.Flatten()...)
		}
		if queued := pool.queue[addr]; queued != nil {
			// Add the queued transactions
			txs[addr] = append(txs[addr], queued.Flatten()...)
		}
	}
	return txs
}

// all comes from local()
func (journal *txJournal) rotate(all map[common.Address]types.Transactions) error {
	...
	// Write to a new file: journal.path plus the ".new" suffix
	replacement, err := os.OpenFile(journal.path+".new", os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0755)
	if err != nil {
		return err
	}
	journaled := 0
	for _, txs := range all {
		for _, tx := range txs {
			if err = rlp.Encode(replacement, tx); err != nil {
				replacement.Close()
				return err
			}
		}
		journaled += len(txs)
	}
	replacement.Close()
	// Rename the new file to the original path, which updates (replaces) the journal
	if err = os.Rename(journal.path+".new", journal.path); err != nil {
		return err
	}
	sink, err := os.OpenFile(journal.path, os.O_WRONLY|os.O_APPEND, 0755)
	if err != nil {
		return err
	}
	...
	return nil
}
Remote transactions
Initialization of P2P communication module
Source file: eth/backend.go
func New(ctx *node.ServiceContext, config *Config) (*Ethereum, error) {
	...
	if config.TxPool.Journal != "" {
		config.TxPool.Journal = ctx.ResolvePath(config.TxPool.Journal)
	}
	// Initialize the transaction pool
	eth.txPool = core.NewTxPool(config.TxPool, chainConfig, eth.blockchain)
	...
	// Initialize the protocolManager, passing the transaction pool pointer (eth.txPool) as an argument
	if eth.protocolManager, err = NewProtocolManager(
		chainConfig, checkpoint, config.SyncMode, config.NetworkId,
		eth.eventMux, eth.txPool, eth.engine, eth.blockchain, chainDb, cacheLimit, config.Whitelist); err != nil {
		return nil, err
	}
	...
	return eth, nil
}

func NewProtocolManager(config *params.ChainConfig, checkpoint *params.TrustedCheckpoint, mode downloader.SyncMode, networkID uint64, mux *event.TypeMux, txpool txPool, engine consensus.Engine, blockchain *core.BlockChain, chaindb ethdb.Database, cacheLimit int, whitelist map[uint64]common.Hash) (*ProtocolManager, error) {
	...
	// Initialize the TxFetcher, passing txpool.AddRemotes as its addTxs callback
	manager.txFetcher = fetcher.NewTxFetcher(txpool.Has, txpool.AddRemotes, fetchTx)
	...
}
Receiving P2P Messages
Source file: eth/handler.go
func (pm *ProtocolManager) handleMsg(p *peer) error {
	...
	switch {
	...
	// Received transaction data from other nodes
	case msg.Code == TransactionMsg || (msg.Code == PooledTransactionsMsg && p.version >= eth65):
		...
		// Enqueue adds the transactions to the transaction pool
		pm.txFetcher.Enqueue(p.id, txs, msg.Code == PooledTransactionsMsg)
	}
	...
}

// tx_fetcher.go
func (f *TxFetcher) Enqueue(peer string, txs []*types.Transaction, direct bool) error {
	...
	errs := f.addTxs(txs) // Add the transactions; this function is actually AddRemotes from tx_pool.go
	...
}
Adding remote transactions to the transaction pool
// tx_pool.go
// addTxs adds the transactions to Pending and Queue
func (pool *TxPool) AddRemotes(txs []*types.Transaction) []error {
	return pool.addTxs(txs, false, false)
}
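The wiring between the P2P side and the pool comes down to handing a method value to another component as a callback. The sketch below shows that idea with toy types; pool, fetcher and tx here are hypothetical simplifications, not the go-ethereum types of the same spirit.

```go
package main

import "fmt"

// tx is a hypothetical minimal transaction.
type tx struct{ Hash string }

// pool is a toy transaction pool that keeps remote transactions in memory only.
type pool struct{ known map[string]tx }

// AddRemotes stores the transactions and returns one error slot per input.
func (p *pool) AddRemotes(txs []tx) []error {
	errs := make([]error, len(txs))
	for i, t := range txs {
		if _, ok := p.known[t.Hash]; ok {
			errs[i] = fmt.Errorf("already known: %s", t.Hash)
			continue
		}
		p.known[t.Hash] = t
	}
	return errs
}

// fetcher knows nothing about the pool type; it only holds the callback.
type fetcher struct {
	addTxs func([]tx) []error
}

// Enqueue forwards transactions received from a peer to the callback.
func (f *fetcher) Enqueue(peer string, txs []tx) []error {
	return f.addTxs(txs)
}

func main() {
	p := &pool{known: make(map[string]tx)}
	f := &fetcher{addTxs: p.AddRemotes} // the method value carries its receiver

	errs := f.Enqueue("peer-1", []tx{{"0x01"}, {"0x02"}, {"0x01"}})
	fmt.Println(len(p.known), errs[2] != nil)
}
```

Because the fetcher depends only on a function signature, the network code and the pool stay decoupled, and nothing on this path writes to disk.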
That's a wrap.
For more on Ethereum development, see my book:
2.0: "Blockchain DApp Development: Based on Ethereum and Bitcoin Public Chain"