sequence
In this paper, we study the Artemis transactionTimeoutScanPeriod
transactionTimeoutScanPeriod
Activemq – Artemis – 2.11.0 / Artemis server/SRC/main/Java/org/apache/activemq/Artemis/core/config/impl/ConfigurationImpl Java
public class ConfigurationImpl implements Configuration, Serializable { //...... private long transactionTimeoutScanPeriod = ActiveMQDefaultConfiguration.getDefaultTransactionTimeoutScanPeriod(); / /... @Override public longgetTransactionTimeoutScanPeriod() {
return transactionTimeoutScanPeriod;
}
@Override
public ConfigurationImpl setTransactionTimeoutScanPeriod(final long period) {
transactionTimeoutScanPeriod = period;
returnthis; } / /... }Copy the code
- ConfigurationImpl defines transactionTimeoutScanPeriod attribute, the default is 1000
ActiveMQServerImpl
Activemq – Artemis – 2.11.0 / Artemis server/SRC/main/Java/org/apache/activemq/Artemis/core/server/impl/ActiveMQServerImpl jar a
public class ActiveMQServerImpl implements ActiveMQServer { //...... synchronized boolean initialisePart1(boolean scalingDown) throws Exception { //...... resourceManager = new ResourceManagerImpl((int) (configuration.getTransactionTimeout() / 1000), configuration.getTransactionTimeoutScanPeriod(), scheduledPool); / /... } / /... }Copy the code
- ActiveMQServerImpl initialisePart1 using configuration. GetTransactionTimeout (), the configuration, getTransactionTimeoutScanPeriod ( ), scheduledPool creates ResourceManagerImpl
ResourceManagerImpl
Activemq – Artemis – 2.11.0 / Artemis server/SRC/main/Java/org/apache activemq/Artemis/core/transaction/impl/ResourceManagerIm pl.java
public class ResourceManagerImpl implements ResourceManager {
private final ConcurrentMap<Xid, Transaction> transactions = new ConcurrentHashMap<>();
private final List<HeuristicCompletionHolder> heuristicCompletions = new ArrayList<>();
private final int defaultTimeoutSeconds;
private boolean started = false;
private TxTimeoutHandler task;
private final long txTimeoutScanPeriod;
private final ScheduledExecutorService scheduledThreadPool;
public ResourceManagerImpl(final int defaultTimeoutSeconds,
final long txTimeoutScanPeriod,
final ScheduledExecutorService scheduledThreadPool) {
this.defaultTimeoutSeconds = defaultTimeoutSeconds;
this.txTimeoutScanPeriod = txTimeoutScanPeriod;
this.scheduledThreadPool = scheduledThreadPool;
}
// ActiveMQComponent implementation
@Override
public int size() {
return transactions.size();
}
@Override
public void start() throws Exception {
if (started) {
return; } task = new TxTimeoutHandler(); Future<? > future = scheduledThreadPool.scheduleAtFixedRate(task, txTimeoutScanPeriod, txTimeoutScanPeriod, TimeUnit.MILLISECONDS); task.setFuture(future); started =true;
}
@Override
public void stop() throws Exception {
if(! started) {return;
}
if(task ! = null) { task.close(); } started =false; } / /... }Copy the code
- ResourceManagerImpl implements ResourceManager interface, whose start method creates TxTimeoutHandler, and schedules execution using fixedRate of txTimeoutScanPeriod
TxTimeoutHandler
Activemq – Artemis – 2.11.0 / Artemis server/SRC/main/Java/org/apache activemq/Artemis/core/transaction/impl/ResourceManagerIm pl.java
private class TxTimeoutHandler implements Runnable {
private boolean closed = false; private Future<? > future; @Override public voidrun() {
if (closed) {
return;
}
Set<Transaction> timedoutTransactions = new HashSet<>();
long now = System.currentTimeMillis();
for (Transaction tx : transactions.values()) {
if (tx.hasTimedOut(now, defaultTimeoutSeconds)) {
Transaction removedTX = removeTransaction(tx.getXid());
if(removedTX ! = null) { ActiveMQServerLogger.LOGGER.timedOutXID(removedTX.getXid()); timedoutTransactions.add(removedTX); }}}for (Transaction failedTransaction : timedoutTransactions) {
try {
failedTransaction.rollback();
} catch (Exception e) {
ActiveMQServerLogger.LOGGER.errorTimingOutTX(e, failedTransaction.getXid());
}
}
}
synchronized void setFuture(final Future<? > future) { this.future = future; } voidclose() {
if(future ! = null) { future.cancel(false);
}
closed = true; }}Copy the code
- TxTimeoutHandler implements the Runnable interface. The run method iterates through transactions, executing tx.hastimedOut (now, defaultTimeoutSeconds) one by one. For timeout, perform removeTransaction(tx.getxID ()) and rollback one by one
summary
ActiveMQServerImpl initialisePart1 using configuration. GetTransactionTimeout (), the configuration, getTransactionTimeoutScanPeriod ( ), scheduledPool creates ResourceManagerImpl; ResourceManagerImpl implements ResourceManager interface, whose start method creates TxTimeoutHandler, and uses fixedRate of txTimeoutScanPeriod to schedule execution. TxTimeoutHandler implements the Runnable interface. The run method iterates through transactions, executing tx.hastimedOut (now, defaultTimeoutSeconds) one by one. For timeout, perform removeTransaction(tx.getxID ()) and rollback one by one
doc
- ResourceManagerImpl