sequence

In this paper, we study the Artemis transactionTimeoutScanPeriod

transactionTimeoutScanPeriod

Activemq – Artemis – 2.11.0 / Artemis server/SRC/main/Java/org/apache/activemq/Artemis/core/config/impl/ConfigurationImpl Java

public class ConfigurationImpl implements Configuration, Serializable { //...... private long transactionTimeoutScanPeriod = ActiveMQDefaultConfiguration.getDefaultTransactionTimeoutScanPeriod(); / /... @Override public longgetTransactionTimeoutScanPeriod() {
      return transactionTimeoutScanPeriod;
   }

   @Override
   public ConfigurationImpl setTransactionTimeoutScanPeriod(final long period) {
      transactionTimeoutScanPeriod = period;
      returnthis; } / /... }Copy the code
  • ConfigurationImpl defines transactionTimeoutScanPeriod attribute, the default is 1000

ActiveMQServerImpl

Activemq – Artemis – 2.11.0 / Artemis server/SRC/main/Java/org/apache/activemq/Artemis/core/server/impl/ActiveMQServerImpl jar a

public class ActiveMQServerImpl implements ActiveMQServer { //...... synchronized boolean initialisePart1(boolean scalingDown) throws Exception { //...... resourceManager = new ResourceManagerImpl((int) (configuration.getTransactionTimeout() / 1000), configuration.getTransactionTimeoutScanPeriod(), scheduledPool); / /... } / /... }Copy the code
  • ActiveMQServerImpl initialisePart1 using configuration. GetTransactionTimeout (), the configuration, getTransactionTimeoutScanPeriod ( ), scheduledPool creates ResourceManagerImpl

ResourceManagerImpl

Activemq – Artemis – 2.11.0 / Artemis server/SRC/main/Java/org/apache activemq/Artemis/core/transaction/impl/ResourceManagerIm pl.java

public class ResourceManagerImpl implements ResourceManager {

   private final ConcurrentMap<Xid, Transaction> transactions = new ConcurrentHashMap<>();

   private final List<HeuristicCompletionHolder> heuristicCompletions = new ArrayList<>();

   private final int defaultTimeoutSeconds;

   private boolean started = false;

   private TxTimeoutHandler task;

   private final long txTimeoutScanPeriod;

   private final ScheduledExecutorService scheduledThreadPool;

   public ResourceManagerImpl(final int defaultTimeoutSeconds,
                              final long txTimeoutScanPeriod,
                              final ScheduledExecutorService scheduledThreadPool) {
      this.defaultTimeoutSeconds = defaultTimeoutSeconds;
      this.txTimeoutScanPeriod = txTimeoutScanPeriod;
      this.scheduledThreadPool = scheduledThreadPool;
   }

   // ActiveMQComponent implementation

   @Override
   public int size() {
      return transactions.size();
   }

   @Override
   public void start() throws Exception {
      if (started) {
         return; } task = new TxTimeoutHandler(); Future<? > future = scheduledThreadPool.scheduleAtFixedRate(task, txTimeoutScanPeriod, txTimeoutScanPeriod, TimeUnit.MILLISECONDS); task.setFuture(future); started =true;
   }

   @Override
   public void stop() throws Exception {
      if(! started) {return;
      }
      if(task ! = null) { task.close(); } started =false; } / /... }Copy the code
  • ResourceManagerImpl implements ResourceManager interface, whose start method creates TxTimeoutHandler, and schedules execution using fixedRate of txTimeoutScanPeriod

TxTimeoutHandler

Activemq – Artemis – 2.11.0 / Artemis server/SRC/main/Java/org/apache activemq/Artemis/core/transaction/impl/ResourceManagerIm pl.java

   private class TxTimeoutHandler implements Runnable {

      private boolean closed = false; private Future<? > future; @Override public voidrun() {
         if (closed) {
            return;
         }

         Set<Transaction> timedoutTransactions = new HashSet<>();

         long now = System.currentTimeMillis();

         for (Transaction tx : transactions.values()) {

            if (tx.hasTimedOut(now, defaultTimeoutSeconds)) {
               Transaction removedTX = removeTransaction(tx.getXid());
               if(removedTX ! = null) { ActiveMQServerLogger.LOGGER.timedOutXID(removedTX.getXid()); timedoutTransactions.add(removedTX); }}}for (Transaction failedTransaction : timedoutTransactions) {
            try {
               failedTransaction.rollback();
            } catch (Exception e) {
               ActiveMQServerLogger.LOGGER.errorTimingOutTX(e, failedTransaction.getXid());
            }
         }
      }

      synchronized void setFuture(final Future<? > future) { this.future = future; } voidclose() {
         if(future ! = null) { future.cancel(false);
         }

         closed = true; }}Copy the code
  • TxTimeoutHandler implements the Runnable interface. The run method iterates through transactions, executing tx.hastimedOut (now, defaultTimeoutSeconds) one by one. For timeout, perform removeTransaction(tx.getxID ()) and rollback one by one

summary

ActiveMQServerImpl initialisePart1 using configuration. GetTransactionTimeout (), the configuration, getTransactionTimeoutScanPeriod ( ), scheduledPool creates ResourceManagerImpl; ResourceManagerImpl implements ResourceManager interface, whose start method creates TxTimeoutHandler, and uses fixedRate of txTimeoutScanPeriod to schedule execution. TxTimeoutHandler implements the Runnable interface. The run method iterates through transactions, executing tx.hastimedOut (now, defaultTimeoutSeconds) one by one. For timeout, perform removeTransaction(tx.getxID ()) and rollback one by one

doc

  • ResourceManagerImpl