Deep dive into the Ethereum source code: TxPool analysis
Read alongside the following code: github.com/blockchainG…
Writing articles is not easy. Corrections are welcome, and I am happy to connect with readers.
Concepts and principles of the transaction pool
Overview of the transaction pool:
- Transactions in the pool mainly come from two sources:
  - Local submission: a third-party application submits a transaction by calling the RPC service of the local Ethereum node;
  - Remote synchronization: transaction data is synchronized to the local node from other Ethereum nodes via broadcast.
- Transactions in the pool are fetched and verified by the Miner module for mining; once mined successfully, they are written into a block and broadcast.
- When the Miner fetches transactions it copies them, so the number of transactions in the pool does not decrease. Transactions are removed from the pool only after they are written into the canonical chain.
- If a transaction is written into a fork block, it is not removed from the pool and waits to be repackaged.
Key data structures
TxPoolConfig
type TxPoolConfig struct {
    Locals       []common.Address // Addresses treated as local by default
    NoLocals     bool             // Whether local transaction handling is disabled
    Journal      string           // Path of the journal file for local transactions
    Rejournal    time.Duration    // Interval at which the local transaction journal is regenerated
    PriceLimit   uint64           // Minimum gas price required for acceptance into the pool
    PriceBump    uint64           // Minimum price increase (in percent) required to replace an existing transaction with the same nonce
    AccountSlots uint64           // Executable transaction slots per account
    GlobalSlots  uint64           // Maximum executable transaction slots for all accounts
    AccountQueue uint64           // Non-executable transaction slots per account
    GlobalQueue  uint64           // Maximum non-executable transaction slots for all accounts
    Lifetime     time.Duration    // Maximum time a non-executable transaction may stay in the queue
}
Default configuration:
Journal: "transactions.rlp",
Rejournal: time.Hour,
PriceLimit: 1,
PriceBump: 10,
AccountSlots: 16,
GlobalSlots: 4096,
AccountQueue: 64,
GlobalQueue: 1024,
Lifetime: 3 * time.Hour
TxPool
type TxPool struct {
config TxPoolConfig // Transaction pool configuration
chainconfig *params.ChainConfig // Blockchain configuration
chain blockChain // Blockchain interface
gasPrice *big.Int
txFeed event.Feed // Event feed
scope event.SubscriptionScope // Subscription scope
signer types.Signer // Transaction signer
mu sync.RWMutex
istanbul bool // Fork indicator whether we are in the istanbul stage.
currentState *state.StateDB // The state of the current header block
pendingNonces *txNoncer // Pending state tracking virtual nonces
currentMaxGas uint64 // Current gas limit for transaction caps
locals *accountSet // Set of local transaction to exempt from eviction rules
journal *txJournal // Journal of local transaction to back up to disk
pending map[common.Address]*txList // All currently processable transactions
queue map[common.Address]*txList // Queued but non-processable transactions
beats map[common.Address]time.Time // Last heartbeat from each known account
all *txLookup // All transactions to allow lookups
priced *txPricedList // All transactions sorted by price
chainHeadCh chan ChainHeadEvent
chainHeadSub event.Subscription
reqResetCh chan *txpoolResetRequest
reqPromoteCh chan *accountSet
queueTxEventCh chan *types.Transaction
reorgDoneCh chan chan struct{}
reorgShutdownCh chan struct{} // requests shutdown of scheduleReorgLoop
wg sync.WaitGroup // tracks loop, scheduleReorgLoop
}
TxPool initialization
TxPool initialization does the following:
① : Check the configuration. If any value is invalid, replace it with the default value
config = (&config).sanitize()
The values being checked are the TxPoolConfig fields described above.
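As a rough illustration of the idea behind sanitize, the sketch below falls back to defaults for invalid values. The poolConfig type, the defaults variable, and the rule "a value below 1 is invalid" are assumptions made for this example; the real method covers more fields and may apply different rules.

```go
package main

import (
	"fmt"
	"time"
)

// poolConfig is a hypothetical, reduced stand-in for TxPoolConfig.
type poolConfig struct {
	Rejournal  time.Duration
	PriceLimit uint64
	PriceBump  uint64
	Lifetime   time.Duration
}

// defaults mirrors part of the default configuration listed earlier.
var defaults = poolConfig{
	Rejournal:  time.Hour,
	PriceLimit: 1,
	PriceBump:  10,
	Lifetime:   3 * time.Hour,
}

// sanitize returns a copy of c where every field below 1 (assumed to be
// invalid in this sketch) is replaced by its default value.
func (c poolConfig) sanitize() poolConfig {
	conf := c
	if conf.Rejournal < 1 {
		conf.Rejournal = defaults.Rejournal
	}
	if conf.PriceLimit < 1 {
		conf.PriceLimit = defaults.PriceLimit
	}
	if conf.PriceBump < 1 {
		conf.PriceBump = defaults.PriceBump
	}
	if conf.Lifetime < 1 {
		conf.Lifetime = defaults.Lifetime
	}
	return conf
}

func main() {
	fixed := poolConfig{PriceBump: 25}.sanitize()
	fmt.Println(fixed.PriceLimit, fixed.PriceBump, fixed.Lifetime)
}
```

Returning a copy rather than mutating the receiver keeps the caller's original configuration intact.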
② : Initialize the local account set
pool.locals = newAccountSet(pool.signer)
③ : Add the configured local account addresses to the transaction pool
pool.locals.add(addr)
When running the Ethereum client we can specify a data directory, which stores the keystore files of all accounts imported or created through the local client. Loading local accounts means reading the account data from that directory.
④ : Update the transaction pool (reset it to the current chain head)
pool.reset(nil, chain.CurrentBlock().Header())
⑤ : Create the price-sorted list over all stored transactions; prices are kept in a min-heap
pool.priced = newTxPricedList(pool.all)
Thanks to this ordering, transactions with a higher gas price are processed first.
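To make the min-heap idea concrete, here is a minimal sketch built on Go's container/heap. The priceHeap type, newPriceHeap, and cheapest are hypothetical names for this example, and prices are simplified to uint64. A min-heap keeps the lowest price at the top, which is what the pool needs when it has to find the cheapest transactions to evict.

```go
package main

import (
	"container/heap"
	"fmt"
)

// priceHeap is a min-heap of gas prices (simplified to uint64 for this sketch).
type priceHeap []uint64

func (h priceHeap) Len() int            { return len(h) }
func (h priceHeap) Less(i, j int) bool  { return h[i] < h[j] }
func (h priceHeap) Swap(i, j int)       { h[i], h[j] = h[j], h[i] }
func (h *priceHeap) Push(x interface{}) { *h = append(*h, x.(uint64)) }
func (h *priceHeap) Pop() interface{} {
	old := *h
	n := len(old)
	x := old[n-1]
	*h = old[:n-1]
	return x
}

// newPriceHeap builds a heap from the given prices.
func newPriceHeap(prices ...uint64) *priceHeap {
	h := &priceHeap{}
	for _, p := range prices {
		heap.Push(h, p)
	}
	return h
}

// cheapest pops up to n of the lowest prices from the heap, lowest first.
func cheapest(h *priceHeap, n int) []uint64 {
	out := make([]uint64, 0, n)
	for i := 0; i < n && h.Len() > 0; i++ {
		out = append(out, heap.Pop(h).(uint64))
	}
	return out
}

func main() {
	h := newPriceHeap(30, 5, 12, 8)
	fmt.Println(cheapest(h, 2)) // prints [5 8]
}
```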
⑥ : If local transactions are enabled, load them from the journal on local disk
if !config.NoLocals && config.Journal != "" {
    pool.journal = newTxJournal(config.Journal)
    if err := pool.journal.load(pool.AddLocals); err != nil {
        log.Warn("Failed to load transaction journal", "err", err)
    }
    if err := pool.journal.rotate(pool.local()); err != nil {
        log.Warn("Failed to rotate transaction journal", "err", err)
    }
}
⑦ : Subscribe to chain head events
pool.chainHeadSub = pool.chain.SubscribeChainHeadEvent(pool.chainHeadCh)
⑧ : Start the main loop
go pool.loop()
Note: Local transactions have higher priority than remote ones. First, they are not easily replaced. Second, they are persisted: local transactions that have not yet been packaged are saved in a local journal file, so when the node starts, local transactions are loaded from disk first.
Local addresses are whitelisted: every transaction sent from such an address is considered local, whether it was submitted locally or received from a remote peer.
This concludes the transaction pool loading process.
Adding a transaction to TxPool
As mentioned earlier, transactions in the pool are either broadcast by other nodes or submitted locally. In the source code these correspond to AddRemote and AddLocal, and both end up calling addTxs. Our discussion of adding transactions therefore starts with this function, which does the following:
- Filters out transactions that already exist in the pool
if pool.all.Get(tx.Hash()) != nil {
    errs[i] = fmt.Errorf("known transaction: %x", tx.Hash())
    knownTxMeter.Mark(1)
    continue
}
- Adds the transactions to the queue
newErrs, dirtyAddrs := pool.addTxsLocked(news, local)
addTxsLocked in turn calls pool.add(tx, local) for each transaction.
The pool.add function adds a transaction to queue, where it waits to be promoted to pending later. If a transaction with the same nonce already exists in queue or pending and the new one has a higher gas price, the previous transaction is overwritten. Let's break down the add function.
① : Check whether the transaction is already known; if so, discard it
if pool.all.Get(hash) != nil {
    log.Trace("Discarding already known transaction", "hash", hash)
    knownTxMeter.Mark(1)
    return false, fmt.Errorf("known transaction: %x", hash)
}
② : If the transaction fails validation, discard it
validateTx mainly checks the following:
- The transaction size cannot exceed 32KB
- The transaction value cannot be negative
- The transaction gas cannot exceed the gas limit currently set for the transaction pool
- The transaction signature must be valid
- For a remote transaction, the gas price must not be lower than the pool's minimum gas price; local transactions are exempt from this check and are packaged preferentially
- The account balance must cover the transaction cost, where cost == V + GP * GL
- The transaction gas must not be less than its estimated (intrinsic) gas
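The checks above can be sketched as a single function. Everything in this block is a simplified assumption for illustration: the tx struct, the error values, and the validate signature are hypothetical, and the signature and intrinsic gas checks are left out because they need the real transaction type.

```go
package main

import (
	"errors"
	"fmt"
	"math/big"
)

// tx is a hypothetical, flattened transaction used only in this sketch.
type tx struct {
	Size     int    // encoded size in bytes
	Value    int64  // transferred amount (signed so the sign check is visible)
	Gas      uint64 // gas limit of the transaction
	GasPrice uint64 // price offered per unit of gas
}

var (
	errOversized         = errors.New("oversized data")
	errNegativeValue     = errors.New("negative value")
	errGasLimit          = errors.New("exceeds block gas limit")
	errUnderpriced       = errors.New("transaction underpriced")
	errInsufficientFunds = errors.New("insufficient funds")
)

// cost computes V + GP * GL with big integers to avoid overflow.
func cost(t tx) *big.Int {
	c := new(big.Int).Mul(new(big.Int).SetUint64(t.GasPrice), new(big.Int).SetUint64(t.Gas))
	return c.Add(c, big.NewInt(t.Value))
}

// validate applies the checks in the order listed in the text.
func validate(t tx, local bool, maxGas, minPrice, balance uint64) error {
	switch {
	case t.Size > 32*1024:
		return errOversized
	case t.Value < 0:
		return errNegativeValue
	case t.Gas > maxGas:
		return errGasLimit
	case !local && t.GasPrice < minPrice: // local transactions skip the price floor
		return errUnderpriced
	case new(big.Int).SetUint64(balance).Cmp(cost(t)) < 0:
		return errInsufficientFunds
	}
	return nil
}

func main() {
	t := tx{Size: 120, Value: 10, Gas: 21000, GasPrice: 2}
	fmt.Println(validate(t, false, 8000000, 1, 50000)) // <nil>
	fmt.Println(validate(t, false, 8000000, 5, 50000)) // transaction underpriced
}
```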
③ : If the transaction pool is full, discard underpriced transactions
if uint64(pool.all.Count()) >= pool.config.GlobalSlots+pool.config.GlobalQueue {
    if !local && pool.priced.Underpriced(tx, pool.locals) {
        ...
    }
    drop := pool.priced.Discard(pool.all.Count()-int(pool.config.GlobalSlots+pool.config.GlobalQueue-1), pool.locals)
    for _, tx := range drop {
        ...
        pool.removeTx(tx.Hash(), false)
    }
}
Note GlobalSlots and GlobalQueue: these are the maximum capacities of pending and queue. If the number of transactions in the pool reaches the sum of the two, the lowest-priced transactions are discarded to make room.
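The number of transactions to discard follows directly from the expression passed to Discard. A small worked sketch, where toDiscard is a hypothetical helper and the capacity is assumed to be at least 1:

```go
package main

import "fmt"

// toDiscard returns how many transactions must be evicted so that one new
// transaction fits, mirroring count - (GlobalSlots + GlobalQueue - 1).
// It assumes globalSlots+globalQueue is at least 1.
func toDiscard(count, globalSlots, globalQueue uint64) uint64 {
	capacity := globalSlots + globalQueue
	if count < capacity {
		return 0 // the pool still has room
	}
	return count - (capacity - 1)
}

func main() {
	// With the default limits the pool holds 4096 + 1024 = 5120 transactions.
	fmt.Println(toDiscard(5119, 4096, 1024)) // 0
	fmt.Println(toDiscard(5120, 4096, 1024)) // 1
}
```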
④ : Check whether pending already holds a transaction with the same nonce. If it does, check whether the new transaction's gas price exceeds the old one by at least the PriceBump percentage. If so, the new transaction replaces the existing one; otherwise an error is returned because the replacement's gas price is too low. If no pending transaction has the same nonce, the transaction is put into queue (enqueueTx).
if list := pool.pending[from]; list != nil && list.Overlaps(tx) {
    // Nonce already pending, check if required price bump is met
    inserted, old := list.Add(tx, pool.config.PriceBump)
    if !inserted {
        pendingDiscardMeter.Mark(1)
        return false, ErrReplaceUnderpriced
    }
    // New transaction is better, replace old one
    if old != nil {
        pool.all.Remove(old.Hash())
        pool.priced.Removed(1)
        pendingReplaceMeter.Mark(1)
    }
    pool.all.Add(tx)
    pool.priced.Put(tx)
    pool.journalTx(from, tx)
    pool.queueTxEvent(tx)
    log.Trace("Pooled new executable transaction", "hash", hash, "from", from, "to", tx.To())
    return old != nil, nil
}
// New transaction isn't replacing a pending one, push into queue
replaced, err = pool.enqueueTx(hash, tx)
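The price bump rule can be sketched as follows. canReplace is a hypothetical helper with prices simplified to uint64; it assumes the replacement must be strictly more expensive than the old transaction and must reach old * (100 + PriceBump) / 100, which is how the check is described above.

```go
package main

import (
	"fmt"
	"math/big"
)

// canReplace reports whether newPrice is high enough to replace a
// transaction priced at oldPrice, given a bump percentage.
func canReplace(oldPrice, newPrice, priceBump uint64) bool {
	if newPrice <= oldPrice {
		return false // a replacement must always pay more
	}
	// threshold = oldPrice * (100 + priceBump) / 100, computed without overflow
	threshold := new(big.Int).SetUint64(oldPrice)
	threshold.Mul(threshold, new(big.Int).SetUint64(100+priceBump))
	threshold.Div(threshold, big.NewInt(100))
	return new(big.Int).SetUint64(newPrice).Cmp(threshold) >= 0
}

func main() {
	fmt.Println(canReplace(100, 109, 10)) // false: below the 110 threshold
	fmt.Println(canReplace(100, 110, 10)) // true: meets the 10% bump
}
```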
That covers the process of adding transactions. Next is how transactions added to queue are promoted to pending.
- Promotes transactions
Promotion moves transactions from queue to pending; the next section covers it in detail.
done := pool.requestPromoteExecutables(dirtyAddrs)
Transaction promotion
promoteExecutables moves transactions from the future queue to pending and removes invalid ones along the way (nonce too low, balance too low, and so on). The main steps are:
① : Remove from all every queued transaction whose nonce is lower than the account's current nonce
forwards := list.Forward(pool.currentState.GetNonce(addr))
for _, tx := range forwards {
    hash := tx.Hash()
    pool.all.Remove(hash)
    log.Trace("Removed old queued transaction", "hash", hash)
}
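What Forward does can be sketched on a plain map. The nonceMap type and its string labels are stand-ins invented for this example; the real list stores full transactions.

```go
package main

import (
	"fmt"
	"sort"
)

// nonceMap maps an account nonce to a transaction label (hypothetical).
type nonceMap map[uint64]string

// forward deletes every entry whose nonce is below threshold and returns
// the removed labels ordered by nonce.
func (m nonceMap) forward(threshold uint64) []string {
	var stale []uint64
	for nonce := range m {
		if nonce < threshold {
			stale = append(stale, nonce)
		}
	}
	sort.Slice(stale, func(i, j int) bool { return stale[i] < stale[j] })
	removed := make([]string, 0, len(stale))
	for _, nonce := range stale {
		removed = append(removed, m[nonce])
		delete(m, nonce)
	}
	return removed
}

func main() {
	queued := nonceMap{3: "tx3", 4: "tx4", 7: "tx7"}
	fmt.Println(queued.forward(5)) // [tx3 tx4]
	fmt.Println(len(queued))       // 1
}
```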
② : Remove all queued transactions whose cost exceeds the account balance or whose gas exceeds the limit
drops, _ := list.Filter(pool.currentState.GetBalance(addr), pool.currentMaxGas)
for _, tx := range drops {
    hash := tx.Hash()
    pool.all.Remove(hash)
    log.Trace("Removed unpayable queued transaction", "hash", hash)
}
③ : Move all executable transactions from queue to pending (promoteTx)
Note: executable transactions are the run of queued transactions with consecutive nonces starting at the account's current pending nonce; these are considered ready.
readies := list.Ready(pool.pendingNonces.get(addr))
for _, tx := range readies {
    hash := tx.Hash()
    if pool.promoteTx(addr, hash, tx) {
        log.Trace("Promoting queued transaction", "hash", hash)
        promoted = append(promoted, tx)
    }
}
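The notion of "ready" transactions can be sketched as collecting a gap-free run of nonces. The nonceMap type and the ready method are hypothetical, and the sketch assumes that entries below the starting nonce were already removed in step ①.

```go
package main

import "fmt"

// nonceMap maps an account nonce to a transaction label (hypothetical).
type nonceMap map[uint64]string

// ready removes and returns the run of consecutive nonces that begins at
// start. It stops at the first missing nonce.
func (m nonceMap) ready(start uint64) []string {
	var out []string
	for next := start; ; next++ {
		label, ok := m[next]
		if !ok {
			break
		}
		out = append(out, label)
		delete(m, next)
	}
	return out
}

func main() {
	queued := nonceMap{5: "tx5", 6: "tx6", 8: "tx8"}
	fmt.Println(queued.ready(5)) // [tx5 tx6]
	fmt.Println(queued)          // map[8:tx8]
}
```

Nonce 8 stays queued because nonce 7 is missing, so it cannot be executed yet.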
promoteTx differs from add in that add inserts a new transaction into pending, while promoteTx moves transactions from queue into pending.
inserted, old := list.Add(tx, pool.config.PriceBump)
if !inserted {
    // An older transaction was better, discard this one
    pool.all.Remove(hash)
    pool.priced.Removed(1)
    pendingDiscardMeter.Mark(1)
    return false
}
// Otherwise the new transaction is better, discard the previous one
if old != nil {
    pool.all.Remove(old.Hash())
    pool.priced.Removed(1)
    pendingReplaceMeter.Mark(1)
} else {
    ...
}
promoteTx mainly does the following:
- Inserts the transaction into pending. If a transaction with the same nonce already exists in the pending list, the new transaction replaces it only when its gas price is at least 110% of the original's (the percentage is set by PriceBump)
- If the new transaction replaces an old one, deletes the old transaction from the all list
- Finally, updates the all list
After promoteTx, every transaction moved to pending is collected in promoted []*types.Transaction. Back in promoteExecutables, the remaining steps are:
④ : If a non-local account has more queued transactions than the AccountQueue limit, remove transactions from the end, highest nonce first
if !pool.locals.contains(addr) {
    caps = list.Cap(int(pool.config.AccountQueue))
    for _, tx := range caps {
        hash := tx.Hash()
        pool.all.Remove(hash)
        ...
    }
}
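The capping step can be sketched on a slice of nonces. capList is a hypothetical helper: it keeps the lowest nonces up to the limit and returns the rest, which correspond to the transactions removed above.

```go
package main

import (
	"fmt"
	"sort"
)

// capList keeps at most limit nonces, preferring the lowest ones, and
// returns the kept and the dropped nonces in ascending order.
func capList(nonces []uint64, limit int) (kept, dropped []uint64) {
	if limit < 0 {
		limit = 0
	}
	sorted := append([]uint64(nil), nonces...) // copy so the input is untouched
	sort.Slice(sorted, func(i, j int) bool { return sorted[i] < sorted[j] })
	if len(sorted) <= limit {
		return sorted, nil
	}
	return sorted[:limit], sorted[limit:]
}

func main() {
	kept, dropped := capList([]uint64{9, 4, 7, 5}, 2)
	fmt.Println(kept, dropped) // [4 5] [7 9]
}
```

Dropping the highest nonces first makes sense because they are the furthest from being executable.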
⑤ : If the account has no transactions left in the queue, delete its entry
if list.Empty() {
delete(pool.queue, addr)
}
That covers transaction promotion.
Transaction demotion
Transactions are demoted in the following scenarios:
- A new block arrives: transactions affected by the block are moved out of pending and back to queue
- Another transaction with a higher gas price arrives, and the existing transaction is moved from pending to queue
Key function: demoteUnexecutables, which does the following:
① : Iterate over the transaction lists of all addresses in pending
for addr, list := range pool.pending {
    ...
}
② : Delete all transactions considered too old (nonce too low)
olds := list.Forward(nonce)
for _, tx := range olds {
    hash := tx.Hash()
    pool.all.Remove(hash)
    log.Trace("Removed old pending transaction", "hash", hash)
}
③ : Delete all transactions that are too costly (balance too low or gas limit exceeded), and move the transactions invalidated by them back to queue for later processing
drops, invalids := list.Filter(pool.currentState.GetBalance(addr), pool.currentMaxGas)
for _, tx := range drops {
    hash := tx.Hash()
    log.Trace("Removed unpayable pending transaction", "hash", hash)
    pool.all.Remove(hash)
}
pool.priced.Removed(len(olds) + len(drops))
pendingNofundsMeter.Mark(int64(len(drops)))
for _, tx := range invalids {
    hash := tx.Hash()
    log.Trace("Demoting pending transaction", "hash", hash)
    pool.enqueueTx(hash, tx)
}
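The split into drops and invalids can be sketched as below. The ptx struct and the filter function are hypothetical, and the input is assumed to be sorted by nonce. A transaction is dropped when it is unaffordable or exceeds the gas limit; every affordable transaction with a higher nonce than a dropped one is invalidated, because the nonce sequence is broken.

```go
package main

import "fmt"

// ptx is a hypothetical pending transaction with a precomputed cost.
type ptx struct {
	Nonce, Cost, Gas uint64
}

// filter walks nonce-sorted transactions and separates them into kept,
// dropped (too costly or too much gas), and invalidated (affordable, but
// queued behind a dropped nonce).
func filter(txs []ptx, balance, gasLimit uint64) (kept, drops, invalids []ptx) {
	dropped := false
	for _, t := range txs {
		switch {
		case t.Cost > balance || t.Gas > gasLimit:
			drops = append(drops, t)
			dropped = true
		case dropped:
			invalids = append(invalids, t)
		default:
			kept = append(kept, t)
		}
	}
	return kept, drops, invalids
}

func main() {
	txs := []ptx{{1, 10, 21000}, {2, 500, 21000}, {3, 10, 21000}}
	kept, drops, invalids := filter(txs, 100, 8000000)
	fmt.Println(len(kept), len(drops), len(invalids)) // 1 1 1
}
```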
④ : If there is a gap in front of the pending transactions, move them back to queue
if list.Len() > 0 && list.txs.Get(nonce) == nil {
    gapped := list.Cap(0)
    for _, tx := range gapped {
        hash := tx.Hash()
        log.Error("Demoting invalidated transaction", "hash", hash)
        pool.enqueueTx(hash, tx)
    }
    pendingGauge.Dec(int64(len(gapped)))
}
Note: gaps are usually caused by balance problems. Suppose that on the original canonical chain A, transaction m costs 10, and after a fork the same account sends a transaction m on chain B that costs 20. The account balance could cover a later transaction on chain A but may not be enough on chain B. If the transaction with insufficient balance on chain B has nonce n+3, the chain A transactions n+2 and n+4 are left with a gap between them, so all transactions from n+3 onwards are demoted.
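The gap check itself is small enough to sketch. demoteGapped is a hypothetical helper working on plain maps from nonce to label: if pending is non-empty but holds nothing at the next expected nonce, everything in it moves to queue.

```go
package main

import "fmt"

// demoteGapped moves every pending entry to queue when pending is not
// empty and has no transaction at the next expected nonce. It returns the
// number of demoted transactions.
func demoteGapped(pending, queue map[uint64]string, next uint64) int {
	if len(pending) == 0 {
		return 0
	}
	if _, ok := pending[next]; ok {
		return 0 // no gap: the next nonce is present
	}
	demoted := len(pending)
	for nonce, label := range pending {
		queue[nonce] = label
		delete(pending, nonce) // deleting while ranging is safe in Go
	}
	return demoted
}

func main() {
	pending := map[uint64]string{6: "tx6", 7: "tx7"}
	queue := map[uint64]string{}
	fmt.Println(demoteGapped(pending, queue, 5)) // 2
	fmt.Println(len(pending), len(queue))        // 0 2
}
```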
This concludes transaction demotion.
Resetting the transaction pool
Resetting the transaction pool retrieves the current state of the blockchain (mainly because the chain state has been updated) and ensures that the pool's contents are valid with respect to that state.
reset is called in the following cases:
- During TxPool initialization: NewTxPool;
- When the TxPool event-listening goroutine receives a canonical chain update event
The flow chart is as follows:
As the flow chart shows, reset mainly reorganizes the transaction pool after a canonical chain update:
① : If the old head block is not nil and is not the parent of the new head block, the old and new blocks are not on the same chain
if oldHead != nil && oldHead.Hash() != newHead.ParentHash {
}
② : If the height difference between the new and old head blocks is greater than 64, the reorg is too deep and no transactions are put back into the pool
if depth := uint64(math.Abs(float64(oldNum) - float64(newNum))); depth > 64 {
    log.Debug("Skipping deep transaction reorg", "depth", depth)
}
③ : If the old chain's head is higher than the new chain's head, walk back along the old chain and collect all transactions from the rolled-back blocks
for rem.NumberU64() > add.NumberU64() {
    discarded = append(discarded, rem.Transactions()...)
    if rem = pool.chain.GetBlock(rem.ParentHash(), rem.NumberU64()-1); rem == nil {
        log.Error("Unrooted old chain seen by tx pool", "block", oldHead.Number, "hash", oldHead.Hash())
        return
    }
}
④ : If the new chain's head is higher than the old chain's head, walk back along the new chain and collect its transactions
for add.NumberU64() > rem.NumberU64() {
    included = append(included, add.Transactions()...)
    if add = pool.chain.GetBlock(add.ParentHash(), add.NumberU64()-1); add == nil {
        log.Error("Unrooted new chain seen by tx pool", "block", newHead.Number, "hash", newHead.Hash())
        return
    }
}
⑤ : Once both chains are at the same height, walk back along both at the same time until the common ancestor is found
for rem.Hash() != add.Hash() {
    discarded = append(discarded, rem.Transactions()...)
    if rem = pool.chain.GetBlock(rem.ParentHash(), rem.NumberU64()-1); rem == nil {
        log.Error("Unrooted old chain seen by tx pool", "block", oldHead.Number, "hash", oldHead.Hash())
        return
    }
    included = append(included, add.Transactions()...)
    if add = pool.chain.GetBlock(add.ParentHash(), add.NumberU64()-1); add == nil {
        log.Error("Unrooted new chain seen by tx pool", "block", newHead.Number, "hash", newHead.Hash())
        return
    }
}
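Steps ③ to ⑤ amount to walking two branches back to their common ancestor. The sketch below reproduces that walk on a toy chain; the block and chain types and the reorgTxs function are hypothetical, with hashes and transactions reduced to strings.

```go
package main

import "fmt"

// block is a hypothetical, minimal block header plus its transactions.
type block struct {
	Hash, Parent string
	Number       uint64
	Txs          []string
}

// chain looks up blocks by hash.
type chain map[string]block

// reorgTxs walks oldHead and newHead back to their common ancestor and
// returns the transactions found on each branch. ok is false when a
// parent block is missing.
func reorgTxs(c chain, oldHead, newHead block) (discarded, included []string, ok bool) {
	rem, add := oldHead, newHead
	// step records the block's transactions and moves to its parent.
	step := func(b block, acc *[]string) (block, bool) {
		*acc = append(*acc, b.Txs...)
		parent, found := c[b.Parent]
		return parent, found
	}
	for rem.Number > add.Number { // old chain is longer
		if rem, ok = step(rem, &discarded); !ok {
			return nil, nil, false
		}
	}
	for add.Number > rem.Number { // new chain is longer
		if add, ok = step(add, &included); !ok {
			return nil, nil, false
		}
	}
	for rem.Hash != add.Hash { // same height: step back on both sides
		if rem, ok = step(rem, &discarded); !ok {
			return nil, nil, false
		}
		if add, ok = step(add, &included); !ok {
			return nil, nil, false
		}
	}
	return discarded, included, true
}

func main() {
	c := chain{
		"g":  {Hash: "g", Number: 0},
		"a1": {Hash: "a1", Parent: "g", Number: 1, Txs: []string{"t1"}},
		"a2": {Hash: "a2", Parent: "a1", Number: 2, Txs: []string{"t2"}},
		"b1": {Hash: "b1", Parent: "g", Number: 1, Txs: []string{"t1", "t3"}},
	}
	discarded, included, ok := reorgTxs(c, c["a2"], c["b1"])
	fmt.Println(discarded, included, ok) // [t2 t1] [t1 t3] true
}
```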
⑥ : Set the latest world state for the transaction pool
statedb, err := pool.chain.StateAt(newHead.Root)
if err != nil {
    log.Error("Failed to reset txpool state", "err", err)
    return
}
pool.currentState = statedb
pool.pendingNonces = newTxNoncer(statedb)
pool.currentMaxGas = newHead.GasLimit
⑦ : Put the transactions dropped from the old chain back into the transaction pool
senderCacher.recover(pool.signer, reinject)
pool.addTxsLocked(reinject, false)
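The reinject list consists of the transactions that were on the old chain but are not on the new one; as far as I know, go-ethereum computes it with types.TxDifference(discarded, included). The sketch below shows the same set difference on plain strings, with difference as a hypothetical helper name.

```go
package main

import "fmt"

// difference returns the entries of a that do not appear in b,
// preserving the order of a.
func difference(a, b []string) []string {
	seen := make(map[string]struct{}, len(b))
	for _, h := range b {
		seen[h] = struct{}{}
	}
	var keep []string
	for _, h := range a {
		if _, ok := seen[h]; !ok {
			keep = append(keep, h)
		}
	}
	return keep
}

func main() {
	discarded := []string{"t2", "t1"}
	included := []string{"t1", "t3"}
	// t1 is already part of the new chain, so only t2 returns to the pool.
	fmt.Println(difference(discarded, included)) // [t2]
}
```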
At this point, the entire reset process is over.
References:
mindcarver.cn/
github.com/mindcarver/…
learnblockchain.cn/2019/06/03/…
blog.csdn.net/lj900911/ar…