Preface
This article looks at the state abstractions in Storm Trident.
StateType
storm-2.0.0/storm-client/src/jvm/org/apache/storm/trident/state/StateType.java
public enum StateType {
    NON_TRANSACTIONAL,
    TRANSACTIONAL,
    OPAQUE
}
- StateType has three values: NON_TRANSACTIONAL (non-transactional), TRANSACTIONAL (transactional), and OPAQUE (opaque transactional)
- Spouts come in the same three types: non-transactional, transactional, and opaque transactional
State
storm-2.0.0/storm-client/src/jvm/org/apache/storm/trident/state/State.java
/**
 * There's 3 different kinds of state:
 *
 * 1. non-transactional: ignores commits, updates are permanent. no rollback. a cassandra incrementing state would be like this
 * 2. repeat-transactional: idempotent as long as all batches for a txid are identical
 * 3. opaque-transactional: the most general kind of state. updates are always done based on the previous version of the value if the
 *    current commit = latest stored commit. Idempotent even if the batch for a txid can change.
 *
 * repeat transactional is idempotent for transactional spouts
 * opaque transactional is idempotent for opaque or transactional spouts
 *
 * Trident should log warnings when state is idempotent but updates will not be idempotent because of spout
 */
// retrieving is encapsulated in Retrieval interface
public interface State {
    void beginCommit(Long txid); // can be null for things like partitionPersist occuring off a DRPC stream

    void commit(Long txid);
}
- Non-transactional: ignores commits; updates are permanent and cannot be rolled back. An incrementing state in Cassandra is an example. The semantics are at-most-once or at-least-once
- Repeat-transactional (usually just called "transactional"): requires that, whether or not a batch is replayed, a given txid always refers to the same batch with the same tuples; each tuple belongs to exactly one batch and batches do not overlap. For state updates, a replay that carries a txid already stored with the value can be skipped. It needs less state in the database but is less fault tolerant; it provides exactly-once semantics
- Opaque transactional ("opaque" for short): widely used, and more fault tolerant than transactional. It does not require a tuple to stay in the same batch/txid across replays, but it does guarantee that each tuple is successfully processed in exactly one batch. OpaqueTridentKafkaSpout is an implementation of this type and can tolerate the loss of Kafka nodes. For state updates, a replay that carries the txid already stored with the value recomputes the update from prevValue and overwrites the current value. It needs more space in the database to store state, but it is fault tolerant and provides exactly-once semantics
MapState
storm-2.0.0/storm-client/src/jvm/org/apache/storm/trident/state/map/MapState.java
public interface MapState<T> extends ReadOnlyMapState<T> {
    List<T> multiUpdate(List<List<Object>> keys, List<ValueUpdater> updaters);

    void multiPut(List<List<Object>> keys, List<T> vals);
}
- MapState extends the ReadOnlyMapState interface, which in turn extends the State interface
- The rest of this article analyzes several implementations of MapState
NonTransactionalMap
storm-2.0.0/storm-client/src/jvm/org/apache/storm/trident/state/map/NonTransactionalMap.java
public class NonTransactionalMap<T> implements MapState<T> {
    IBackingMap<T> _backing;

    protected NonTransactionalMap(IBackingMap<T> backing) {
        _backing = backing;
    }

    public static <T> MapState<T> build(IBackingMap<T> backing) {
        return new NonTransactionalMap<T>(backing);
    }

    @Override
    public List<T> multiGet(List<List<Object>> keys) {
        return _backing.multiGet(keys);
    }

    @Override
    public List<T> multiUpdate(List<List<Object>> keys, List<ValueUpdater> updaters) {
        List<T> curr = _backing.multiGet(keys);
        List<T> ret = new ArrayList<T>(curr.size());
        for (int i = 0; i < curr.size(); i++) {
            T currVal = curr.get(i);
            ValueUpdater<T> updater = updaters.get(i);
            ret.add(updater.update(currVal));
        }
        _backing.multiPut(keys, ret);
        return ret;
    }

    @Override
    public void multiPut(List<List<Object>> keys, List<T> vals) {
        _backing.multiPut(keys, vals);
    }

    @Override
    public void beginCommit(Long txid) {
    }

    @Override
    public void commit(Long txid) {
    }
}
- NonTransactionalMap wraps an IBackingMap; its beginCommit and commit methods do nothing
- The multiUpdate method reads the current values with multiGet, applies each ValueUpdater to build the list ret, and then writes ret back through the IBackingMap's multiPut
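Because no txid is consulted, replaying a batch applies its update a second time. The following is a minimal sketch of that effect, not Storm code: NonTransactionalSketch and applyBatch are hypothetical names, a plain HashMap stands in for the IBackingMap, and a single counter key replaces the key lists.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.UnaryOperator;

/** Hypothetical stand-in for a non-transactional map state; not a Storm class. */
final class NonTransactionalSketch {
    // Plays the role of the backing store (IBackingMap in the article).
    private final Map<String, Long> backing = new HashMap<>();

    /** Reads the current value, applies the updater, writes the result back. No txid is involved. */
    long update(String key, UnaryOperator<Long> updater) {
        Long next = updater.apply(backing.get(key)); // the current value may be null
        backing.put(key, next);
        return next;
    }

    /** Applies one batch that adds delta to the counter stored under key. */
    long applyBatch(String key, long delta) {
        return update(key, curr -> (curr == null ? 0L : curr) + delta);
    }

    public static void main(String[] args) {
        NonTransactionalSketch state = new NonTransactionalSketch();
        System.out.println(state.applyBatch("apple", 2)); // first delivery of the batch
        System.out.println(state.applyBatch("apple", 2)); // the replay is counted again
    }
}
```

In this sketch the second call cannot tell that it is a replay, which is why this kind of state gives at-least-once rather than exactly-once results.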
TransactionalMap
storm-2.0.0/storm-client/src/jvm/org/apache/storm/trident/state/map/TransactionalMap.java
public class TransactionalMap<T> implements MapState<T> {
    CachedBatchReadsMap<TransactionalValue> _backing;
    Long _currTx;

    protected TransactionalMap(IBackingMap<TransactionalValue> backing) {
        _backing = new CachedBatchReadsMap(backing);
    }

    public static <T> MapState<T> build(IBackingMap<TransactionalValue> backing) {
        return new TransactionalMap<T>(backing);
    }

    @Override
    public List<T> multiGet(List<List<Object>> keys) {
        List<CachedBatchReadsMap.RetVal<TransactionalValue>> vals = _backing.multiGet(keys);
        List<T> ret = new ArrayList<T>(vals.size());
        for (CachedBatchReadsMap.RetVal<TransactionalValue> retval : vals) {
            TransactionalValue v = retval.val;
            if (v != null) {
                ret.add((T) v.getVal());
            } else {
                ret.add(null);
            }
        }
        return ret;
    }

    @Override
    public List<T> multiUpdate(List<List<Object>> keys, List<ValueUpdater> updaters) {
        List<CachedBatchReadsMap.RetVal<TransactionalValue>> curr = _backing.multiGet(keys);
        List<TransactionalValue> newVals = new ArrayList<TransactionalValue>(curr.size());
        List<List<Object>> newKeys = new ArrayList();
        List<T> ret = new ArrayList<T>();
        for (int i = 0; i < curr.size(); i++) {
            CachedBatchReadsMap.RetVal<TransactionalValue> retval = curr.get(i);
            TransactionalValue<T> val = retval.val;
            ValueUpdater<T> updater = updaters.get(i);
            TransactionalValue<T> newVal;
            boolean changed = false;
            if (val == null) {
                newVal = new TransactionalValue<T>(_currTx, updater.update(null));
                changed = true;
            } else {
                if (_currTx != null && _currTx.equals(val.getTxid()) && !retval.cached) {
                    newVal = val;
                } else {
                    newVal = new TransactionalValue<T>(_currTx, updater.update(val.getVal()));
                    changed = true;
                }
            }
            ret.add(newVal.getVal());
            if (changed) {
                newVals.add(newVal);
                newKeys.add(keys.get(i));
            }
        }
        if (!newKeys.isEmpty()) {
            _backing.multiPut(newKeys, newVals);
        }
        return ret;
    }

    @Override
    public void multiPut(List<List<Object>> keys, List<T> vals) {
        List<TransactionalValue> newVals = new ArrayList<TransactionalValue>(vals.size());
        for (T val : vals) {
            newVals.add(new TransactionalValue<T>(_currTx, val));
        }
        _backing.multiPut(keys, newVals);
    }

    @Override
    public void beginCommit(Long txid) {
        _currTx = txid;
        _backing.reset();
    }

    @Override
    public void commit(Long txid) {
        _currTx = null;
        _backing.reset();
    }
}
- beginCommit sets the current txid and resets _backing; commit clears the txid (sets _currTx to null) and then resets _backing
- The multiUpdate method checks whether _currTx is non-null, equals the txid stored with the value, and !retval.cached holds (that is, the value was not written by a multiPut in this transaction). If so, the value is not updated (the update is skipped) and newVal = val
- The multiPut method wraps each value in a TransactionalValue and then calls CachedBatchReadsMap.multiPut(List<List<Object>> keys, List<T> vals), which also puts the updated values into the cache
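The skip-on-same-txid rule can be illustrated in isolation. The sketch below is a simplification under stated assumptions and is not Storm code: TransactionalSketch, Stored, and applyBatch are hypothetical names, a HashMap replaces the backing map, and the per-batch read cache (the retval.cached check) is left out.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical stand-in for a transactional counter state; not a Storm class. */
final class TransactionalSketch {
    /** The value is stored together with the txid of the batch that last wrote it. */
    static final class Stored {
        final long txid;
        final long count;

        Stored(long txid, long count) {
            this.txid = txid;
            this.count = count;
        }
    }

    private final Map<String, Stored> backing = new HashMap<>();

    /** Adds delta for the batch with the given txid; skips the update if that txid was already applied. */
    long applyBatch(long txid, String key, long delta) {
        Stored stored = backing.get(key);
        if (stored != null && stored.txid == txid) {
            return stored.count; // replay of an identical batch: keep the stored value
        }
        long base = (stored == null) ? 0L : stored.count;
        Stored next = new Stored(txid, base + delta);
        backing.put(key, next);
        return next.count;
    }

    public static void main(String[] args) {
        TransactionalSketch state = new TransactionalSketch();
        System.out.println(state.applyBatch(1, "apple", 2)); // batch 1 applied
        System.out.println(state.applyBatch(1, "apple", 2)); // replay of batch 1 is skipped
        System.out.println(state.applyBatch(2, "apple", 3)); // batch 2 builds on batch 1
    }
}
```

Skipping is only safe because a transactional spout replays exactly the same tuples for a given txid; if the replayed batch could differ, the stored value would no longer match it.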
OpaqueMap
storm-2.0.0/storm-client/src/jvm/org/apache/storm/trident/state/map/OpaqueMap.java
public class OpaqueMap<T> implements MapState<T> {
    CachedBatchReadsMap<OpaqueValue> _backing;
    Long _currTx;

    protected OpaqueMap(IBackingMap<OpaqueValue> backing) {
        _backing = new CachedBatchReadsMap(backing);
    }

    public static <T> MapState<T> build(IBackingMap<OpaqueValue> backing) {
        return new OpaqueMap<T>(backing);
    }

    @Override
    public List<T> multiGet(List<List<Object>> keys) {
        List<CachedBatchReadsMap.RetVal<OpaqueValue>> curr = _backing.multiGet(keys);
        List<T> ret = new ArrayList<T>(curr.size());
        for (CachedBatchReadsMap.RetVal<OpaqueValue> retval : curr) {
            OpaqueValue val = retval.val;
            if (val != null) {
                if (retval.cached) {
                    ret.add((T) val.getCurr());
                } else {
                    ret.add((T) val.get(_currTx));
                }
            } else {
                ret.add(null);
            }
        }
        return ret;
    }

    @Override
    public List<T> multiUpdate(List<List<Object>> keys, List<ValueUpdater> updaters) {
        List<CachedBatchReadsMap.RetVal<OpaqueValue>> curr = _backing.multiGet(keys);
        List<OpaqueValue> newVals = new ArrayList<OpaqueValue>(curr.size());
        List<T> ret = new ArrayList<T>();
        for (int i = 0; i < curr.size(); i++) {
            CachedBatchReadsMap.RetVal<OpaqueValue> retval = curr.get(i);
            OpaqueValue<T> val = retval.val;
            ValueUpdater<T> updater = updaters.get(i);
            T prev;
            if (val == null) {
                prev = null;
            } else {
                if (retval.cached) {
                    prev = val.getCurr();
                } else {
                    prev = val.get(_currTx);
                }
            }
            T newVal = updater.update(prev);
            ret.add(newVal);
            OpaqueValue<T> newOpaqueVal;
            if (val == null) {
                newOpaqueVal = new OpaqueValue<T>(_currTx, newVal);
            } else {
                newOpaqueVal = val.update(_currTx, newVal);
            }
            newVals.add(newOpaqueVal);
        }
        _backing.multiPut(keys, newVals);
        return ret;
    }

    @Override
    public void multiPut(List<List<Object>> keys, List<T> vals) {
        List<ValueUpdater> updaters = new ArrayList<ValueUpdater>(vals.size());
        for (T val : vals) {
            updaters.add(new ReplaceUpdater<T>(val));
        }
        multiUpdate(keys, updaters);
    }

    @Override
    public void beginCommit(Long txid) {
        _currTx = txid;
        _backing.reset();
    }

    @Override
    public void commit(Long txid) {
        _currTx = null;
        _backing.reset();
    }

    static class ReplaceUpdater<T> implements ValueUpdater<T> {
        T _t;

        public ReplaceUpdater(T t) {
            _t = t;
        }

        @Override
        public T update(Object stored) {
            return _t;
        }
    }
}
- beginCommit sets the current txid and resets _backing; commit clears the txid (sets _currTx to null) and then resets _backing
- Unlike TransactionalMap, multiPut wraps each value in a ReplaceUpdater and calls multiUpdate, which forces an overwrite
- The multiUpdate method also differs from TransactionalMap: it never skips, and always computes newVal from the prev value selected for the current txid
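To see why keeping a previous value makes replays safe even when the batch contents change, consider the following sketch. It is an illustration under stated assumptions, not Storm code: OpaqueSketch, Stored, and applyBatch are hypothetical names, a HashMap replaces the backing map, the read cache is omitted, and rejecting a txid older than the stored one is a choice made for this sketch.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical stand-in for an opaque transactional counter state; not a Storm class. */
final class OpaqueSketch {
    /** Stores the txid of the last writer, the value before that write, and the value after it. */
    static final class Stored {
        final long txid;
        final long prev;
        final long curr;

        Stored(long txid, long prev, long curr) {
            this.txid = txid;
            this.prev = prev;
            this.curr = curr;
        }
    }

    private final Map<String, Stored> backing = new HashMap<>();

    /** Adds delta for the batch with the given txid, redoing the update from prev on a replay. */
    long applyBatch(long txid, String key, long delta) {
        Stored stored = backing.get(key);
        long base;
        if (stored == null) {
            base = 0L;
        } else if (stored.txid == txid) {
            base = stored.prev; // same txid again: start from the value before that batch
        } else if (stored.txid < txid) {
            base = stored.curr; // later batch: build on the latest value
        } else {
            // Assumption of this sketch: a batch older than the stored one is rejected.
            throw new IllegalStateException("batch " + txid + " is behind stored txid " + stored.txid);
        }
        Stored next = new Stored(txid, base, base + delta);
        backing.put(key, next);
        return next.curr;
    }

    public static void main(String[] args) {
        OpaqueSketch state = new OpaqueSketch();
        System.out.println(state.applyBatch(1, "apple", 2)); // batch 1 applied
        System.out.println(state.applyBatch(1, "apple", 5)); // replay of txid 1 with different tuples
        System.out.println(state.applyBatch(2, "apple", 1)); // batch 2 builds on the replayed result
    }
}
```

The replay with a different delta replaces the earlier result instead of adding to it, so each tuple still contributes exactly once; the cost is the extra prev field stored per key.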
Summary
- Trident applies state updates in strict batch order: for example, the update for the batch with txid 3 is applied only after the update for the batch with txid 2 has succeeded
- There are three types of state: non-transactional, transactional, and opaque transactional. Spouts come in the same three types
- Non-transactional state does not guarantee exactly-once; the semantics are at-least-once or at-most-once. For its state logic see NonTransactionalMap, whose beginCommit and commit methods do nothing
- Transactional state can guarantee exactly-once, but its requirements are strict: on replay, a batch must keep the same txid and the same tuples. It is therefore less fault tolerant, but its state logic is relatively simple. See TransactionalMap, which skips the update when the stored value carries the same txid
- Opaque transactional state can also guarantee exactly-once. It allows a tuple from a failed batch to be processed in a different batch later, so it is more fault tolerant. The price is that an extra prev value is stored with the state. When the same txid is seen again, the new value is recomputed from prev and overwrites the current value
- Trident wraps up the exactly-once state logic; to use it, pass the appropriate StateFactory to persistentAggregate. A factory that supports more than one StateType can take the StateType as a parameter and build the matching kind of state. You can also write your own factory by implementing StateFactory, write custom state queries by extending BaseQueryFunction for use with stateQuery, or write custom updates by extending BaseStateUpdater and passing the updater to partitionPersist
Docs
- Trident Tutorial
- Trident State