In one product requirement, a specialist handling a work order can either suspend it (2 hours for the first suspension, 12 hours for the second) or transfer it (the number of transfer days is entered in a form), and then submit the work order.
Once the configured time has elapsed, the work order has to be received again and then reviewed again. Work orders also have a priority: the higher the priority, the earlier the work order is sent. We can solve this scenario with Redis.
I. Business Overview
Assume we have two queues: one maintains formal work orders (those to be sent immediately) and the other maintains suspended work orders. For a suspend operation, we store a key with an expiration time in Redis. When the key expires, the client receives the expiration event, removes the work order from the suspended queue, and adds it to the formal queue.
Business flow chart
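The flow can be sketched in plain Java with no Redis involved. This is only an in-memory illustration under our own assumptions: the class TwoQueueSketch, its methods, and the explicit nowMillis clock are hypothetical and are not part of the project code. The onTick method plays the role of the key expiration listener.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

/** In-memory stand-in for the suspended queue and the formal queue (hypothetical names). */
final class TwoQueueSketch {
    /** A suspended work order waiting for its expiry time. */
    private static final class Suspended {
        final double priority;
        final long expiresAtMillis;

        Suspended(double priority, long expiresAtMillis) {
            this.priority = priority;
            this.expiresAtMillis = expiresAtMillis;
        }
    }

    private final Map<String, Suspended> suspendedQueue = new HashMap<>();
    private final Map<String, Double> formalQueue = new HashMap<>();

    /** Suspends a work order: it leaves the formal queue and waits for delayMillis. */
    void suspend(String workCode, double priority, long nowMillis, long delayMillis) {
        formalQueue.remove(workCode);
        suspendedQueue.put(workCode, new Suspended(priority, nowMillis + delayMillis));
    }

    /** Plays the role of the expiration listener: moves every expired work order to the formal queue. */
    List<String> onTick(long nowMillis) {
        List<String> moved = new ArrayList<>();
        Iterator<Map.Entry<String, Suspended>> it = suspendedQueue.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Suspended> entry = it.next();
            if (entry.getValue().expiresAtMillis <= nowMillis) {
                formalQueue.putIfAbsent(entry.getKey(), entry.getValue().priority);
                it.remove();
                moved.add(entry.getKey());
            }
        }
        return moved;
    }

    boolean inFormalQueue(String workCode) {
        return formalQueue.containsKey(workCode);
    }

    boolean inSuspendedQueue(String workCode) {
        return suspendedQueue.containsKey(workCode);
    }

    public static void main(String[] args) {
        TwoQueueSketch queues = new TwoQueueSketch();
        long twoHours = 2 * 60 * 60 * 1000L;
        queues.suspend("WO-1", 1.0, 0L, twoHours);
        System.out.println(queues.onTick(twoHours - 1)); // nothing has expired yet
        System.out.println(queues.onTick(twoHours));     // WO-1 moves to the formal queue
    }
}
```

In the real design the clock and the expiry check are handled by Redis itself, so the application only reacts to the expiration event.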
II. Code implementation
The implementation can be divided into three main modules.
- Queue module: WorkOrderQueueTransfer exposes the queue operations to internal callers.
- Work order module: OperateStrategyManager implements the operations for each work order scenario.
- Event listening: a subclass of KeyExpirationEventMessageListener listens for key expiration. Special note: key expiration notifications must be enabled in redis.conf on the Redis server, for example with notify-keyspace-events Ex.
2.1. Work order queue implementation
We can store data based on Redis ZSet, which is an ordered set and can be sorted based on score.
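To make the ordering concrete, here is a small in-memory analogue of a sorted set written with the JDK only. It is a sketch, not the Redis implementation: SortedSetSketch and its methods are hypothetical names, and we assume that a lower score means the work order is taken first, which matches taking the member with the lowest score when dequeuing.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeSet;

/** In-memory analogue of a Redis sorted set: unique members ordered by ascending score. */
final class SortedSetSketch {
    private final Map<String, Double> scores = new HashMap<>();

    // Members are ordered by score; equal scores fall back to lexicographic order.
    private final TreeSet<String> ordered = new TreeSet<>((a, b) -> {
        int byScore = Double.compare(scores.get(a), scores.get(b));
        return byScore != 0 ? byScore : a.compareTo(b);
    });

    /** Like ZADD: inserts the member or updates its score; returns true only for a new member. */
    boolean add(String member, double score) {
        boolean isNew = !scores.containsKey(member);
        if (!isNew) {
            ordered.remove(member); // remove while the old score is still in the map
        }
        scores.put(member, score);
        ordered.add(member);
        return isNew;
    }

    /** Like ZPOPMIN: removes and returns the member with the lowest score, or null when empty. */
    String popMin() {
        String first = ordered.pollFirst();
        if (first != null) {
            scores.remove(first);
        }
        return first;
    }

    int size() {
        return ordered.size();
    }

    public static void main(String[] args) {
        SortedSetSketch queue = new SortedSetSketch();
        queue.add("WO-3", 3.0);
        queue.add("WO-1", 1.0);
        queue.add("WO-2", 2.0);
        System.out.println(queue.popMin()); // WO-1
        System.out.println(queue.popMin()); // WO-2
    }
}
```

Adding an existing member only changes its score, which is why a set, rather than a list, fits a queue where each work order number may appear once.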
2.1.1 Define the context class (WorkOrderContext)
/**
* @description: work order context object
* @Date: 2020/7/13 4:28 PM
* @Author: Shi Dongdong - Seig Heil
*/
@Data
@NoArgsConstructor
@AllArgsConstructor
@Builder
public class WorkOrderContext {
/** * Only for testers */
private boolean isTest;
/** Work order (work order number, priority and delay time) */
private WorkOrder worOrder;
/** * Queue type */
private QueueType queueType;
/** Create a context for the formal queue (to be sent immediately)
* @return the context
*/
public static WorkOrderContext buildImmediate() {
return WorkOrderContext.builder().queueType(QueueType.immediate).build();
}
/** Create a context for the suspended queue (executed after n hours)
* @return the context
*/
public static WorkOrderContext buildSuspended() {
return WorkOrderContext.builder().queueType(QueueType.suspended).build();
}
/** Create a context for the stored (transferred) queue (executed after n days)
* @return the context
*/
public static WorkOrderContext buildStored() {
return WorkOrderContext.builder().queueType(QueueType.stored).build();
}
/** Create a context for the formal queue (to be sent immediately)
* @param workCode
* @param priority
* @return the context
*/
public static WorkOrderContext buildImmediate(String workCode, double priority) {
WorkOrder workOrder = WorkOrder.builder().workCode(workCode).priority(priority).delayedTime(0).build();
return WorkOrderContext.builder().worOrder(workOrder).queueType(QueueType.immediate).build();
}
/** Create a context for the suspended queue (executed after n hours)
* @param workCode
* @param priority
* @param delayedTime
* @return the context
*/
public static WorkOrderContext buildSuspended(String workCode, double priority, long delayedTime) {
WorkOrder workOrder = WorkOrder.builder().workCode(workCode).priority(priority).delayedTime(delayedTime).build();
return WorkOrderContext.builder().worOrder(workOrder).queueType(QueueType.suspended).build();
}
/** Create a context for the stored (transferred) queue (executed after n days)
* @param workCode
* @param priority
* @param delayedTime
* @return the context
*/
public static WorkOrderContext buildStored(String workCode, double priority, long delayedTime) {
WorkOrder workOrder = WorkOrder.builder().workCode(workCode).priority(priority).delayedTime(delayedTime).build();
return WorkOrderContext.builder().worOrder(workOrder).queueType(QueueType.stored).build();
}
/** * Queue type */
public enum QueueType {
/** * Official queue (need to be sent immediately) */
immediate,
/** * Suspend queue (suspend n hours execution) */
suspended,
/** * Dump queue (run after n dump days) */
stored
}
@Data
@NoArgsConstructor
@AllArgsConstructor
@Builder
public static class WorkOrder {
/** ** work order number */
private String workCode;
/** * Priority */
private double priority;
/** * Delay time */
private long delayedTime;
}
}
2.1.2. Define the abstract cache class (AbstractCacheManager)
This abstract class defines a single method, retry, which provides a retry mechanism. The injected RedisService bean is our own wrapper around the Redis API.
/**
* @description: abstract cache manager
* @Date: 2020/7/18 9:41 PM
* @Author: Shi Dongdong - Seig Heil
*/
@Slf4j
public abstract class AbstractCacheManager {
final int MAX_RETRIES = 3;
@Autowired
RedisService redisService;
/** Retry an operation
* @param retries maximum number of attempts
* @param context context object (used for logging)
* @param call the action to retry
*/
public <T> void retry(int retries, T context, Function<Integer,Boolean> call){
boolean done = false;
int retry = 1;
do {
try {
done = call.apply(retry);
log.info("[retry] context={},retry={},done={}", JSONObject.toJSON(context),retry,done);
retry ++;
TimeUnit.MILLISECONDS.sleep(100);
} catch (Exception e) {
log.error("[retry] exception ctx={}", JSONObject.toJSON(context),e);
retry ++;
}
}while (retry <= retries && !done);
}
}
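For comparison, a bounded retry loop can also be written with the JDK alone. The sketch below is our own illustration, not the project's helper: RetrySketch, maxAttempts and pauseMillis are hypothetical names. The point it tries to show is that the action passed in should perform the operation itself, so that every attempt really runs again. A lambda that only returns an already computed result repeats the log line, not the operation.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicInteger;

/** A bounded retry helper (hypothetical sketch). */
final class RetrySketch {

    /** Runs the operation up to maxAttempts times and stops at the first attempt that returns true. */
    static boolean retry(int maxAttempts, long pauseMillis, Callable<Boolean> operation) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                if (Boolean.TRUE.equals(operation.call())) {
                    return true;
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // keep the interrupt flag and give up
                return false;
            } catch (Exception e) {
                System.err.println("attempt " + attempt + " failed: " + e);
            }
            if (attempt < maxAttempts) {
                try {
                    Thread.sleep(pauseMillis); // short pause before the next attempt
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return false;
                }
            }
        }
        return false;
    }

    public static void main(String[] args) {
        AtomicInteger calls = new AtomicInteger();
        // The operation succeeds on its second invocation.
        boolean done = retry(3, 10, () -> calls.incrementAndGet() == 2);
        System.out.println("done=" + done + ", calls=" + calls.get()); // done=true, calls=2
    }
}
```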
2.1.3. Define the Redis-based cache manager class (WorkOrderCacheManager)
This class stores values as Redis String objects with an expiration time, so that each key expires automatically.
- The static inner class CacheValue is the value stored in the Redis String object.
- The inner enum CacheType maintains the business prefix of each cache key.
- Note the naming convention for the Redis String keys, for example: CarCarthage:stored_cache_{work order number}.
/**
* @description: work order cache manager
* @Date: 2020/7/14 4:28 PM
* @Author: Shi Dongdong - Seig Heil
*/
@Component
@Slf4j
public class WorkOrderCacheManager extends AbstractCacheManager{
/** * Set the cache and set the cache expiration date *@param cache
*/
public void setCacheInExpire(CacheValue cache){
retry(MAX_RETRIES,cache,idx -> {
String redisKey = redisService.getKey(getRedisKeySuffix(cache.getType(),cache.getWorkCode()));
redisService.set(redisKey, JSONObject.toJSONString(cache),cache.getExpireSeconds());
log.info("[setCacheInExpire],redisKey={},CacheValue={}",redisKey,JSONObject.toJSONString(cache));
return Boolean.TRUE;
});
}
/** Query the cached value of a work order number
* @param cacheType cache type {@link CacheType}
* @param workCode work order number
* @return the cached value
*/
public CacheValue get(CacheType cacheType,String workCode){
String redisKey = redisService.getKey(getRedisKeySuffix(cacheType,workCode));
String value = redisService.get(redisKey,String.class);
return JSONObject.parseObject(value,CacheValue.class);
}
/** Build the Redis key suffix for a cache type and work order number
* @param cacheType cache type {@link CacheType}
* @param workCode work order number
* @return the key suffix, or null for an unknown cache type
*/
String getRedisKeySuffix(CacheType cacheType,String workCode){
switch (cacheType){
case stored_cache:
return CacheType.stored_cache.getKey() + workCode;
case suspended_cache:
return CacheType.suspended_cache.getKey() + workCode;
default:
break;
}
return null;
}
/** * Cache value */
@Data
@NoArgsConstructor
@AllArgsConstructor
@Builder
public static class CacheValue{
/** * Cache type */
private CacheType type;
/** ** work order number */
private String workCode;
/** * Priority */
private double priority;
/** * Delay expiration time (timestamp) */
private long delayedTime;
/** * Cache expiration time (unit: second) */
private long expireSeconds;
/** * create - suspend queue (suspend n hours execution) **@param workCode
* @param priority
* @param delayedTime
* @param expireSeconds
* @return the cache value
*/
public static CacheValue buildSuspended(String workCode, double priority, long delayedTime,long expireSeconds) {
return CacheValue.builder()
.type(CacheType.suspended_cache)
.workCode(workCode)
.priority(priority)
.delayedTime(delayedTime)
.expireSeconds(expireSeconds)
.build();
}
/** * Dump queue (run after n days of dump) **@param workCode
* @param priority
* @param delayedTime
* @param expireSeconds
* @return the cache value
*/
public static CacheValue buildStored(String workCode, double priority, long delayedTime,long expireSeconds) {
return CacheValue.builder()
.type(CacheType.stored_cache)
.workCode(workCode)
.priority(priority)
.delayedTime(delayedTime)
.expireSeconds(expireSeconds)
.build();
}
}
/** Cache key prefixes for suspended and stored (transferred) work orders */
@Getter
public enum CacheType{
stored_cache("stored_cache_"),
suspended_cache("suspended_cache_");
private final String key;
CacheType(String key) {
this.key = key;
}
}
}
2.1.4 Work order queue manager (WorkOrderQueueManager)
This class is built on the Redis ZSet (sorted set) structure and dequeues work orders in priority order.
Related methods:
- String getRedisKey(WorkOrderContext context): returns the key of the sorted set to use, based on the WorkOrderContext.
- Long queueSize(WorkOrderContext context): returns the queue size.
- Boolean leftPush(WorkOrderContext context): enqueues a work order.
- Boolean leftPushIfAbsent(WorkOrderContext context): enqueues a work order; returns true if the element was absent and has been added, otherwise false.
- Long remove(WorkOrderContext context): removes an element from the queue.
- WorkOrderContext.WorkOrder pop(WorkOrderContext context): removes the member with the lowest score from the set and returns it.
- Set<WorkOrderContext.WorkOrder> rank(WorkOrderContext context): lists all members of the queue in ascending order.
- Long removeRange(String key, long start, long end): removes queue elements by index range.
- Long removeValues(String key, List<Object> values): removes the specified elements.
- long getDelayedTime(WorkOrderContext.QueueType queueType, String workCode): returns the delay time of the given work order (applies to suspended and stored work orders).
/**
* @description: work order queue manager
* @Date: 2020/7/14 4:28 PM
* @Author: Shi Dongdong - Seig Heil
*/
@Component
@Slf4j
public class WorkOrderQueueManager extends AbstractCacheManager{
final String LOCK_KEY = "ZSET_ATOMIC_LOCK";
@Autowired
ZSetOperations<String, Object> zSetOperations;
@Autowired
WorkOrderCacheManager workOrderCacheManager;
/** Get the queue's Redis key from the queue type in the context
* @param context
* @return the Redis key, or null for an unknown queue type
*/
String getRedisKey(WorkOrderContext context){
String keySuffix = null;
switch (context.getQueueType()){
case immediate:
keySuffix = CarthageConst.WorkOrderKey.IMMEDIATE_QUEUE_DEFAULT;
break;
case stored:
keySuffix = CarthageConst.WorkOrderKey.STORED_QUEUE_DEFAULT;
break;
case suspended:
keySuffix = CarthageConst.WorkOrderKey.SUSPENDED_QUEUE_DEFAULT;
break;
default:
break;
}
if(null != keySuffix){
if(context.isTest()){
keySuffix += CarthageConst.TEST_SUFFIX;
}
return redisService.getKey(keySuffix);
}
return null;
}
/** Return the queue size
* @param context
* @return the number of members in the queue
*/
public Long queueSize(WorkOrderContext context) {
return zSetOperations.size(getRedisKey(context));
}
/** Enqueue a work order
* @param context
* @return true if the work order is in the queue after the call
*/
public Boolean leftPush(WorkOrderContext context) {
String redisKey = getRedisKey(context);
String workCode = context.getWorOrder().getWorkCode();
double priority = context.getWorOrder().getPriority();
Boolean action = zSetOperations.add(redisKey,workCode,priority);
if(Objects.equals(Boolean.FALSE,action)){
Long value = zSetOperations.rank(redisKey,workCode);
log.info("[Queue.leftPush],hasLeftPushed,action={},value={}, context={}", action,value,JSONObject.toJSON(context));
if(Objects.nonNull(value)){
return Boolean.TRUE;
}
}
log.info("[Queue.leftPush] context={}", JSONObject.toJSON(context));
retry(MAX_RETRIES,context,idx -> action);
return Optional.ofNullable(action).orElse(Boolean.FALSE);
}
/** Enqueue a work order if it is absent: returns true if the element was absent and has been added, otherwise false
* @param context
* @return true if the element was added
*/
public Boolean leftPushIfAbsent(WorkOrderContext context) {
String redisKey = getRedisKey(context);
String workCode = context.getWorOrder().getWorkCode();
double priority = context.getWorOrder().getPriority();
Boolean action = zSetOperations.add(redisKey,workCode,priority);
log.info("[WorkOrderQueue.leftPushIfAbsent],action={},context={}",action, JSONObject.toJSON(context));
return Optional.ofNullable(action).orElse(Boolean.FALSE);
}
/** Remove an element from the queue
* @param context
* @return the number of removed elements
*/
public Long remove(WorkOrderContext context){
String redisKey = getRedisKey(context);
String workCode = context.getWorOrder().getWorkCode();
log.info("[WorkOrderQueue.remove] context={}", JSONObject.toJSON(context));
Long rem = zSetOperations.remove(redisKey,workCode);
Long action = Optional.ofNullable(rem).orElse(0L);
retry(MAX_RETRIES,context,idx -> action.longValue() > 0);
return action;
}
/** Remove and return the member with the lowest score from the set
* @param context
* @return the dequeued work order, or null if the queue is empty or the lock could not be acquired
*/
public WorkOrderContext.WorkOrder pop(WorkOrderContext context) {
WorkOrderContext.WorkOrder workOrder = null;
boolean locked = false;
try {
String redisKey = getRedisKey(context);
// Emulate the ZSet ZPOPMIN command under a distributed lock
locked = redisService.lock(LOCK_KEY,5000);
if(locked){
//1. Read the member with the lowest score
Set<ZSetOperations.TypedTuple<Object>> set = redisService.zSetOperations().rangeWithScores(redisKey,0,0);
if(set.isEmpty()){
return null;
}
//2. Remove the member with the lowest score
Long value = redisService.zSetOperations().removeRange(redisKey,0,0);
retry(MAX_RETRIES,context,idx -> value.longValue() > 0);
//3. Build the dequeued member
workOrder = WorkOrderContext.WorkOrder.builder().build();
for(ZSetOperations.TypedTuple<Object> each : set){
workOrder.setWorkCode(each.getValue().toString());
workOrder.setPriority(each.getScore());
workOrder.setDelayedTime(0);
break;
}
}
}catch (Exception e) {
log.error("[WorkOrderQueue.pop] exception ctx={}", JSONObject.toJSON(context),e);
}finally {
// Release the lock only if this call acquired it
if(locked){
redisService.unlock(LOCK_KEY);
}
}
return workOrder;
}
/** List all members of the queue in ascending order
* @param context
* @return the members, ordered by ascending score
*/
public Set<WorkOrderContext.WorkOrder> rank(WorkOrderContext context) {
Set<ZSetOperations.TypedTuple<Object>> set = redisService.zSetOperations().rangeWithScores(getRedisKey(context),0, -1);
Set<WorkOrderContext.WorkOrder> members = Sets.newLinkedHashSetWithExpectedSize(set.size());
set.forEach(each -> {
WorkOrderContext.WorkOrder every = WorkOrderContext.WorkOrder.builder()
.workCode(each.getValue().toString())
.priority(each.getScore())
.delayedTime(getDelayedTime(context.getQueueType(),each.getValue().toString()))
.build();
members.add(every);
});
return members;
}
/** * removes queue elements by range *@param key
* @param start
* @param end
* @return the number of removed elements
*/
public Long removeRange(String key, long start, long end){
String redisKey = redisService.getKey(key);
Long count = zSetOperations.removeRange(redisKey,start,end);
log.info("[WorkOrderQueue.removeRange] redisKey={},start={},end={},count={}", redisKey,start,end,count);
return count;
}
/** * removes the specified element *@param key
* @param values
* @return the number of removed elements
*/
public Long removeValues(String key, List<Object> values){
String redisKey = redisService.getKey(key);
LongAdder longAdder = new LongAdder();
values.forEach(each -> {
Long count = zSetOperations.remove(redisKey,each);
longAdder.add(count);
});
Long count = longAdder.longValue();
log.info("[WorkOrderQueue.removeValues] redisKey={},values={},count={}", redisKey,JSONObject.toJSONString(values),count);
return count;
}
/** * Get the delay time of the corresponding work order (applicable to suspend and dump) *@param queueType
* @param workCode
* @return the delay time, or 0 if none is cached
*/
long getDelayedTime(WorkOrderContext.QueueType queueType,String workCode){
long delayedTime = 0;
WorkOrderCacheManager.CacheType cacheType = null;
switch (queueType){
case suspended:
cacheType = WorkOrderCacheManager.CacheType.suspended_cache;
break;
case stored:
cacheType = WorkOrderCacheManager.CacheType.stored_cache;
break;
default:
break;
}
if(null != cacheType){
WorkOrderCacheManager.CacheValue cacheValue = workOrderCacheManager.get(cacheType,workCode);
if(null != cacheValue){
delayedTime = cacheValue.getDelayedTime();
}
}
return delayedTime;
}
}
2.2 Work order queue transfer manager
2.2.1 Work order queue transfer manager (WorkOrderQueueTransfer)
This class removes a work order from the delayed queue and adds it to the formal queue as one guarded step, protected by a Redis distributed lock.
/**
* @description: work order queue transfer manager
* @Date: 2020/7/23 6:15 PM
* @Author: Shi Dongdong - Seig Heil
*/
@Component
@Slf4j
public class WorkOrderQueueTransfer extends AbstractCacheManager{
final static String ATOMIC_KEY = "delayed_queue_key_expire_lock_{0}";
final static long ATOMIC_KEY_EXPIRE = 5000;
@Autowired
RedisService redisService;
@Autowired
WorkOrderQueueManager workOrderQueueManager;
@Autowired
WorkOrderCacheManager workOrderCacheManager;
/** Move a work order from the suspended or stored queue to the formal queue
* @param cacheType suspended or stored
* @param delayedContext
* @return true if the work order was added to the formal queue
*/
public Boolean transferImmediateQueue(WorkOrderCacheManager.CacheType cacheType,WorkOrderContext delayedContext){
boolean tryLock = false;
Boolean done = Boolean.FALSE;
String lockKey = null;
try {
WorkOrderContext.WorkOrder workOrder = delayedContext.getWorOrder();
lockKey = redisService.getKey(MessageFormat.format(ATOMIC_KEY,workOrder.getWorkCode()));
tryLock = redisService.lock(lockKey,ATOMIC_KEY_EXPIRE);
if(tryLock){
//1. Add the work order to the formal queue
WorkOrderContext immediateContext = WorkOrderContext.buildImmediate(workOrder.getWorkCode(),workOrder.getPriority());
done = workOrderQueueManager.leftPushIfAbsent(immediateContext);
//2. Remove this element from the current delay queue
Long count = workOrderQueueManager.remove(delayedContext);
log.info("[suspended|stored queue remove], count = {}, delayedContext = {}", count,JSONObject.toJSONString(delayedContext));
}
}catch (Exception e) {
log.error("[transferImmediateQueue] exception, delayedContext = {}, cacheType = {}", JSONObject.toJSONString(delayedContext),cacheType,e);
}finally {
if(Objects.nonNull(lockKey) && tryLock){
redisService.unlock(lockKey);
}
}
return Optional.ofNullable(done).orElse(Boolean.FALSE);
}
}
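The same idea can be tried out without Redis. In the sketch below a ReentrantLock per work order number stands in for the distributed lock key, and two maps stand in for the delayed and formal queues. Everything here (TransferSketch, delay, transfer, formalSize) is hypothetical and only meant to show why a repeated transfer for the same work order does not enqueue it twice.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantLock;

/** In-memory stand-in for the lock-guarded queue transfer (hypothetical names). */
final class TransferSketch {
    private final Map<String, Double> delayedQueue = new ConcurrentHashMap<>();
    private final Map<String, Double> formalQueue = new ConcurrentHashMap<>();
    // One lock per work order number stands in for the per-work-order Redis lock key.
    private final Map<String, ReentrantLock> locks = new ConcurrentHashMap<>();

    void delay(String workCode, double priority) {
        delayedQueue.put(workCode, priority);
    }

    /** Moves one work order to the formal queue; returns true only if this call enqueued it. */
    boolean transfer(String workCode) {
        ReentrantLock lock = locks.computeIfAbsent(workCode, key -> new ReentrantLock());
        if (!lock.tryLock()) {
            return false; // another thread is already transferring this work order
        }
        try {
            Double priority = delayedQueue.get(workCode);
            if (priority == null) {
                return false; // already transferred, or never delayed
            }
            // 1. Add to the formal queue if absent
            boolean added = formalQueue.putIfAbsent(workCode, priority) == null;
            // 2. Remove from the delayed queue
            delayedQueue.remove(workCode);
            return added;
        } finally {
            lock.unlock();
        }
    }

    int formalSize() {
        return formalQueue.size();
    }

    public static void main(String[] args) {
        TransferSketch sketch = new TransferSketch();
        sketch.delay("WO-1", 2.0);
        System.out.println(sketch.transfer("WO-1")); // true
        System.out.println(sketch.transfer("WO-1")); // false, nothing left to move
    }
}
```

A local lock only protects one JVM. With several application instances the lock has to live in shared storage, which is why the class above uses a Redis lock.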
2.3 Redis expiration key listening
2.3.1 Redis expiration callback listener (RedisKeyExpirationListener)
/**
* @description: Redis expiration callback listener
* @Date: 2020/7/18 10:43 AM
* @Author: Shi Dongdong - Seig Heil
*/
@Component
@Slf4j
public class RedisKeyExpirationListener extends KeyExpirationEventMessageListener {
final static String STORED_CACHE_KEY_PREFIX = WorkOrderCacheManager.CacheType.stored_cache.getKey();
final static String SUSPENDED_CACHE_KEY_PREFIX = WorkOrderCacheManager.CacheType.suspended_cache.getKey();
@Autowired
TraceLogService traceLogService;
@Autowired
RedisService redisService;
@Autowired
WorkOrderService workOrderService;
@Autowired
DelayedScheduledOperateBridge delayedScheduledOperateBridge;
public RedisKeyExpirationListener(RedisMessageListenerContainer listenerContainer) {
super(listenerContainer);
}
@Override
public void onMessage(Message message, byte[] pattern) {
Date startTime = TimeTools.createNowTime();
String expiredKey = message.toString();
String bizPrefix = redisService.getKeyPrefix().getName();
if(!expiredKey.startsWith(bizPrefix)){
return;
}
String caseOfStored = redisService.getKey(STORED_CACHE_KEY_PREFIX);
String caseOfSuspended = redisService.getKey(SUSPENDED_CACHE_KEY_PREFIX);
WorkOrderCacheManager.CacheType cacheType;
WorkOrderContext.QueueType queueType;
if(expiredKey.startsWith(caseOfStored)){
queueType = WorkOrderContext.QueueType.stored;
cacheType = WorkOrderCacheManager.CacheType.stored_cache;
}else if(expiredKey.startsWith(caseOfSuspended)){
queueType = WorkOrderContext.QueueType.suspended;
cacheType = WorkOrderCacheManager.CacheType.suspended_cache;
}else{
return;
}
String workCode = getWorkCode(expiredKey);
log.info("Redis key=[{}] expired",expiredKey);
if(Objects.nonNull(workCode)){
log.info("Redis key=[{}] expired, start processing suspended|stored work order, workCode={}",expiredKey,workCode);
WorkOrder workOrder = workOrderService.queryOne(workCode);
if(Objects.isNull(workOrder)){
log.info("Redis key=[{}] expired, work order not found, workCode={}",expiredKey,workCode);
return;
}
WorkOrderContext delayedContext = WorkOrderContext.builder()
.worOrder(WorkOrderContext.WorkOrder.builder().delayedTime(5).priority(workOrder.getCasePriority()).workCode(workOrder.getWorkCode()).build())
.queueType(queueType).build();
Boolean done = delayedScheduledOperateBridge.transferImmediateQueue(cacheType,delayedContext);
saveTraceLog(delayedContext,done,traceLog -> {
JSONObject requestBody = new JSONObject();
requestBody.put("expiredKey",expiredKey);
requestBody.put("workCode",workCode);
traceLog.setRequestBody(requestBody.toJSONString());
traceLog.setRequestTime(startTime);
});
}
}
/** Save the trace log to the database
* @param context
* @param done
* @param consumer
*/
void saveTraceLog(WorkOrderContext context, Boolean done, Consumer<TraceLog> consumer){
try {
String hostAddress = InetAddress.getLocalHost().getHostAddress();
JSONObject responseBody = new JSONObject();
responseBody.put("workOrderContext",context);
responseBody.put("transferImmediateQueue",done);
TraceLog traceLog = TraceLog.builder()
.appCode(context.getWorOrder().getWorkCode())
.url("["+hostAddress+"]redisKeyExpirationListener.onMessage")
.target(this.getClass().getPackage().getName() + "." + this.getClass().getSimpleName())
.responseBody(responseBody.toJSONString())
.responseTime(TimeTools.createNowTime())
.traceType(TraceTypeEnum.REDIS_KEY_EXPIRE.getIndex())
.build();
consumer.accept(traceLog);
traceLogService.insertRecord(traceLog);
} catch (Exception e) {
log.error("saveTraceLog exception,[context={}]",JSONObject.toJSONString(context),e);
}
}
/** Extract the work order number from the expired key
* @param value
* @return the text after the last underscore
*/
String getWorkCode(String value){
return value.substring(value.lastIndexOf("_") + 1);
}
}
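Taking the text after the last underscore works as long as work order numbers never contain an underscore themselves. If they might, stripping the known key prefix is more robust. The following sketch shows that variant. The prefix values and the names ExpiredKeySketch, buildKey and workCodeOf are assumptions made for illustration; in the project the business prefix comes from RedisService.

```java
import java.util.Optional;

/** Builds and parses cache keys by prefix (hypothetical sketch). */
final class ExpiredKeySketch {
    // Assumed values for illustration; the real business prefix is provided by RedisService.
    static final String BIZ_PREFIX = "CarCarthage:";
    static final String STORED_PREFIX = BIZ_PREFIX + "stored_cache_";
    static final String SUSPENDED_PREFIX = BIZ_PREFIX + "suspended_cache_";

    static String buildKey(String typePrefix, String workCode) {
        return typePrefix + workCode;
    }

    /** Returns the work order number when the key belongs to one of the two caches, otherwise empty. */
    static Optional<String> workCodeOf(String expiredKey) {
        for (String prefix : new String[] {STORED_PREFIX, SUSPENDED_PREFIX}) {
            if (expiredKey.startsWith(prefix) && expiredKey.length() > prefix.length()) {
                return Optional.of(expiredKey.substring(prefix.length()));
            }
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        String key = buildKey(STORED_PREFIX, "WO_2020_01");
        // Prefix stripping keeps the whole work order number.
        System.out.println(workCodeOf(key).orElse("none"));        // WO_2020_01
        // Splitting at the last underscore would keep only the final segment.
        System.out.println(key.substring(key.lastIndexOf("_") + 1)); // 01
    }
}
```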
2.4 Delayed work order scheduled-time processing bridge (DelayedScheduledOperateBridge)
The main job of this class is to move queue elements through WorkOrderQueueTransfer and to update the work order's database tables through OperateStrategyManager.
/**
* @description: bridge that handles delayed work orders at their scheduled time
* Scenario: a suspended or stored work order reaches its scheduled time
* @Date: 2020/7/23
* @Author: Seig Heil
*/
@Slf4j
@Component
public class DelayedScheduledOperateBridge {
static final String LOCK_KEY = CarthageConst.KEY_EXPIRE_LISTENER_LOCK;
static final long EXPIRE_SECONDS = 120;
@Autowired
RedisService redisService;
@Autowired
WorkOrderQueueTransfer workOrderQueueTransfer;
@Autowired
OperateStrategyManager operateStrategyManager;
/** Move the work order from the delayed queue to the formal queue and update the work order status
* @param cacheType
* @param delayedContext
* @return true if both the transfer and the status update succeeded
*/
public Boolean transferImmediateQueue(WorkOrderCacheManager.CacheType cacheType, WorkOrderContext delayedContext){
String workCode = delayedContext.getWorOrder().getWorkCode();
boolean tryLock = false;
String redisKey = null;
try {
redisKey = redisService.getKey(MessageFormat.format(LOCK_KEY,workCode));
tryLock = redisService.lock(redisKey,EXPIRE_SECONDS);
if(!tryLock){
log.info("[DelayedScheduledOperateBridge] failed to acquire lock, tryLock={}, redisKey={}; idempotent handling of suspended|stored work order at its scheduled time, workCode={}",tryLock,redisKey,workCode);
}
if(tryLock){
log.info("[DelayedScheduledOperateBridge] lock acquired, tryLock={}, redisKey={}; idempotent handling of suspended|stored work order at its scheduled time, workCode={}",tryLock,redisKey,workCode);
Boolean done = workOrderQueueTransfer.transferImmediateQueue(cacheType,delayedContext);
if(!done.booleanValue()){
return Boolean.FALSE;
}
OperateContext operateContext = OperateContext.builder()
.operateStrategyEnum(OperateContext.OperateStrategyEnum.DELAYED_SCHEDULED_ORDER)
.operateParam( OperateContext.OperateParam.builder().workCode(workCode).build()
).build();
operateStrategyManager.execute(operateContext);
log.info("[DelayedScheduledOperateBridge.transferImmediateQueue],delayedContext={},callResult={}",
JSONObject.toJSONString(delayedContext),JSONObject.toJSONString(operateContext.getExecuteResult()));
return operateContext.getExecuteResult().isSuccess();
}
}catch (Exception e) {
log.error("[DelayedScheduledOperateBridge] exception while handling suspended|stored work order at its scheduled time, workCode = {}, delayedContext = {}",workCode,JSONObject.toJSONString(delayedContext),e);
}finally {
if(tryLock){
redisService.unlock(redisKey);
}
}
return false;
}
}
2.5. Work order Operation Manager
The main job of this class is to expose the work order operation strategies through a single entry point. Callers do not need to know that the strategy classes exist; the strategy instances are managed by this class.
- OPERATE_STRATEGY_MAP maintains the mapping from enum values to strategy beans.
- init() initializes the OPERATE_STRATEGY_MAP container.
- Result<String> execute(OperateContext context) is the external entry point that runs the matching strategy.
/**
* @description: GPS work order operation strategy manager
* @Date: 2020/7/15 5:43 PM
* @Author: Shi Dongdong - Seig Heil
*/
@Component
@Slf4j
public class OperateStrategyManager {
static final Map<OperateContext.OperateStrategyEnum, AbstractOperateStrategy> OPERATE_STRATEGY_MAP = Maps.newHashMapWithExpectedSize(6);
@Autowired
CreateOperateStrategy createOperateStrategy;
@Autowired
AllotOrderOperateStrategy allotOrderOperateStrategy;
@Autowired
SubmitWithFinishOperateStrategy submitWithFinishOperateStrategy;
@Autowired
SubmitWithStoreOperateStrategy submitWithStoreOperateStrategy;
@Autowired
SubmitWithSuspendOperateStrategy submitWithSuspendOperateStrategy;
@Autowired
DelayedScheduledOperateStrategy delayedScheduledOperateStrategy;
@Autowired
AssignOrderOperateStrategy assignOrderOperateStrategy;
@PostConstruct
private void init() {
OPERATE_STRATEGY_MAP.put(OperateContext.OperateStrategyEnum.INIT_WORK_ORDER, createOperateStrategy);
OPERATE_STRATEGY_MAP.put(OperateContext.OperateStrategyEnum.ALLOT_WORK_ORDER, allotOrderOperateStrategy);
OPERATE_STRATEGY_MAP.put(OperateContext.OperateStrategyEnum.STORE_WORK_ORDER, submitWithStoreOperateStrategy);
OPERATE_STRATEGY_MAP.put(OperateContext.OperateStrategyEnum.SUSPEND_WORK_ORDER, submitWithSuspendOperateStrategy);
OPERATE_STRATEGY_MAP.put(OperateContext.OperateStrategyEnum.FINISH_WORK_ORDER, submitWithFinishOperateStrategy);
OPERATE_STRATEGY_MAP.put(OperateContext.OperateStrategyEnum.DELAYED_SCHEDULED_ORDER, delayedScheduledOperateStrategy);
OPERATE_STRATEGY_MAP.put(OperateContext.OperateStrategyEnum.ASSIGN_ORDER, assignOrderOperateStrategy);
}
/** External entry point that runs the matching strategy
* @param context
* @return the execution result
*/
public Result<String> execute(OperateContext context) {
StopWatch stopWatch = new StopWatch();
stopWatch.start("OperateStrategyManager.execute");
AbstractOperateStrategy operateStrategy = OPERATE_STRATEGY_MAP.get(context.getOperateStrategyEnum());
context.buildExecuteResultWithSuccess();
operateStrategy.execute(context);
Result<Boolean> executeResult = context.getExecuteResult();
if(context.getExecuteResult().isSuccess()) {
return Result.suc(executeResult.getMsg());
}
stopWatch.stop();
long spendMillSeconds = stopWatch.getLastTaskTimeMillis();
// getLastTaskTimeMillis() is already the elapsed time, so convert it to seconds directly
long duration = spendMillSeconds / 1000;
String executeResultMsg = executeResult.getMsg();
log.info("[execute] done,duration={},executeResultMsg={}",duration,executeResultMsg);
return Result.fail(RemoteEnum.FAILURE, executeResultMsg);
}
}
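Stripped of Spring and the business types, the manager is an enum-keyed registry of strategies. A minimal sketch of that pattern follows, with hypothetical names (StrategyRegistrySketch, Operation) and a plain Function standing in for the strategy classes. It also rejects an operation that has no registered strategy instead of failing later with a NullPointerException.

```java
import java.util.EnumMap;
import java.util.Map;
import java.util.function.Function;

/** Enum-keyed strategy registry (hypothetical sketch of the pattern). */
final class StrategyRegistrySketch {
    enum Operation { SUSPEND, STORE, FINISH }

    // Each strategy takes a work order number and returns a result message.
    private final Map<Operation, Function<String, String>> strategies = new EnumMap<>(Operation.class);

    StrategyRegistrySketch() {
        strategies.put(Operation.SUSPEND, workCode -> "suspended " + workCode);
        strategies.put(Operation.STORE, workCode -> "stored " + workCode);
        // FINISH is deliberately left unregistered to show the guard in execute
    }

    /** Looks up the strategy for the operation and runs it. */
    String execute(Operation operation, String workCode) {
        Function<String, String> strategy = strategies.get(operation);
        if (strategy == null) {
            throw new IllegalArgumentException("No strategy registered for " + operation);
        }
        return strategy.apply(workCode);
    }

    public static void main(String[] args) {
        StrategyRegistrySketch registry = new StrategyRegistrySketch();
        System.out.println(registry.execute(Operation.SUSPEND, "WO-1")); // suspended WO-1
    }
}
```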
2.6 Implementation of the work order strategy classes
Because a work order goes through creation, suspension, transfer, completion and other operations, we implement each operation as its own strategy class.
2.6.1 Suspend operation
/**
* @description: submit survey results (suspend operation) - strategy class
* @Date: 2020/7/15 5:32 PM
* @Author: Shi Dongdong - Seig Heil
*/
@Slf4j
@Component
public class SubmitWithSuspendOperateStrategy extends AbstractSubmitOperateStrategy{
static final Map<MoveToEnum,AttentionEventEnum> suspend_to_attention_event_map = new HashMap<>();
static final Map<MoveToEnum,WorkOrderStatusEnum.SubStatusEnum> suspend_to_sub_status_map = new HashMap<>();
static final Map<MoveToEnum,Integer> suspend_count_map = new HashMap<>();
static {
suspend_to_attention_event_map.put(MoveToEnum.SUSPENDED_AT_ONCE,AttentionEventEnum.SUSPENDED_AT_ONCE);
suspend_to_attention_event_map.put(MoveToEnum.SUSPENDED_AT_TWICE,AttentionEventEnum.SUSPENDED_AT_TWICE);
suspend_to_sub_status_map.put(MoveToEnum.SUSPENDED_AT_ONCE,WorkOrderStatusEnum.SubStatusEnum.SUSPENDED_AT_ONCE);
suspend_to_sub_status_map.put(MoveToEnum.SUSPENDED_AT_TWICE,WorkOrderStatusEnum.SubStatusEnum.SUSPENDED_AT_TWICE);
suspend_count_map.put(MoveToEnum.SUSPENDED_AT_ONCE,1);
suspend_count_map.put(MoveToEnum.SUSPENDED_AT_TWICE,2);
log.info("init... suspend_to_attention_event_map={}",suspend_to_attention_event_map.toString());
log.info("init... suspend_to_sub_status_map={}",suspend_to_sub_status_map.toString());
log.info("init... suspend_count_map={}",suspend_count_map.toString());
}
@Autowired
DiamondConfigProxy diamondConfigProxy;
@Override
public void prepare(OperateContext context) {
super.prepare(context);
SurveyResult surveyResult = context.getSurveyResult();
MoveToEnum moveToEnum = MoveToEnum.getByIndex(surveyResult.getMoveTo());
AttentionEvent attentionEvent = suspend_to_attention_event_map.getOrDefault(moveToEnum,null);
ATTENTION_EVENT_CONTEXT.set(attentionEvent);
context.setAttentionEvent(attentionEvent);
}
@Override
WorkOrder buildWorkOrder(OperateContext context){
SurveyResult surveyResult = context.getSurveyResult();
MoveToEnum moveToEnum = MoveToEnum.getByIndex(surveyResult.getMoveTo());
WorkOrder workOrder = super.buildWorkOrder(context);
workOrder.setSuspendedCount(suspend_count_map.getOrDefault(moveToEnum,0).intValue());
workOrder.setMainStatus(WorkOrderStatusEnum.WAITING.getIndex());
workOrder.setSubStatus(suspend_to_sub_status_map.get(moveToEnum).getIndex());
workOrder.setIsFinished(Const.NON_INDEX);
workOrder.setIsStore(Const.NON_INDEX);
workOrder.setDelayedTime(context.getOperateParam().getDelayedTime());
return workOrder;
}
@Override
void operationExtend(OperateContext context) {
long delayedTime = context.getOperateParam().getDelayedTime().getTime();
int delayedSeconds = context.getOperateParam().getDelayedSeconds();
WorkOrder workOrder = context.getWorkOrder();
WorkOrderContext cxt = WorkOrderContext.buildSuspended(workOrder.getWorkCode(),workOrder.getCasePriority(),delayedTime);
workOrderQueueManager.leftPush(cxt);
WorkOrderCacheManager.CacheValue cacheValue = WorkOrderCacheManager.CacheValue.buildSuspended(workOrder.getWorkCode(),workOrder.getCasePriority(),delayedTime,delayedSeconds);
workOrderCacheManager.setCacheInExpire(cacheValue);
super.operationExtend(context);
}
@Override
public void setDelayedTime(OperateContext context) {
SurveyResult surveyResult = context.getSurveyResult();
MoveToEnum moveToEnum = MoveToEnum.getByIndex(surveyResult.getMoveTo());
DiamondConfig.SuspendOrderConfig suspendOrderConfig = diamondConfigProxy.suspendOrderConfig();
Date delayedTime = TimeTools.createNowTime();
int timeUnit = Calendar.HOUR_OF_DAY;
int delayedSeconds = 0;
int value = suspendOrderConfig.getConfig().getOrDefault(moveToEnum.name(),0);
switch (suspendOrderConfig.getTimeUnit()){
case "DAY":
timeUnit = Calendar.DAY_OF_YEAR;
delayedSeconds = value * 24 * 3600;
break;
case "HOUR":
timeUnit = Calendar.HOUR_OF_DAY;
delayedSeconds = value * 3600;
break;
case "MINUTE":
timeUnit = Calendar.MINUTE;
delayedSeconds = value * 60;
break;
case "SECOND":
timeUnit = Calendar.SECOND;
delayedSeconds = value;
break;
default:
break;
}
TimeTools.addTimeField(delayedTime, timeUnit,value);
context.getOperateParam().setDelayedTime(delayedTime);
context.getOperateParam().setDelayedSeconds(delayedSeconds);
}
}
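The switch in setDelayedTime converts a configured unit name and value into a seconds count by hand. As a minimal sketch, assuming the same four unit names ("DAY", "HOUR", "MINUTE", "SECOND") and Java 9+ for Map.of, the JDK's java.util.concurrent.TimeUnit can do the arithmetic. The class SuspendDelay and the method toSeconds are hypothetical names used only for illustration; they are not part of the project.

```java
import java.util.Map;
import java.util.concurrent.TimeUnit;

// Hypothetical helper, not part of the original project.
final class SuspendDelay {
    // Maps the configured unit names to JDK TimeUnit constants.
    private static final Map<String, TimeUnit> UNITS = Map.of(
            "DAY", TimeUnit.DAYS,
            "HOUR", TimeUnit.HOURS,
            "MINUTE", TimeUnit.MINUTES,
            "SECOND", TimeUnit.SECONDS);

    private SuspendDelay() {}

    // Converts a configured (unit, value) pair into a delay in seconds.
    static long toSeconds(String unitName, int value) {
        TimeUnit unit = UNITS.get(unitName);
        if (unit == null) {
            // Assumption: fail fast on an unknown unit instead of falling through silently.
            throw new IllegalArgumentException("Unsupported time unit: " + unitName);
        }
        return unit.toSeconds(value);
    }
}
```

For example, SuspendDelay.toSeconds("HOUR", 2) yields 7200, the two-hour suspension from the requirement. Rejecting an unknown unit is a design choice of this sketch: in the listing, the default branch leaves delayedSeconds at 0 while the delayed time is still moved forward by the configured value in hours.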
2.6.2 Transfer operation
/**
 * @description: Submission of survey results (transfer operation) - strategy
 * @Date: 2020/7/15 5:32 PM
 * @Author: Shi Dongdong -Seig Heil
 */
@Slf4j
@Component
public class SubmitWithStoreOperateStrategy extends AbstractSubmitOperateStrategy{
/** Number of seconds in one day */
static final int DAY_TO_SECONDS = 24 * 60 * 60;
@Override
public void prepare(OperateContext context) {
ATTENTION_EVENT_CONTEXT.set(AttentionEventEnum.STORE_ORDER);
context.setAttentionEvent(AttentionEventEnum.STORE_ORDER);
super.prepare(context);
}
@Override
public boolean paramCheck(OperateContext context) {
if(Objects.isNull(context.getSurveyResult().getDelayedDays())){
context.buildExecuteResultWithFailure("[SurveyResult.delayedDays] is empty!");
}else if(context.getSurveyResult().getDelayedDays() <= 0){
// else-if: skip the numeric check when the value is null (unboxing null would throw a NullPointerException)
context.buildExecuteResultWithFailure("DelayedDays must be greater than 0!");
}
return super.paramCheck(context);
}
@Override
WorkOrder buildWorkOrder(OperateContext context){
WorkOrder workOrder = super.buildWorkOrder(context);
workOrder.setMainStatus(WorkOrderStatusEnum.PENDING.getIndex());
workOrder.setSubStatus(WorkOrderStatusEnum.SubStatusEnum.STORED.getIndex());
workOrder.setIsFinished(Const.NON_INDEX);
workOrder.setIsStore(Const.YES_INDEX);
// A transferred work order starts over, so reset the suspended count to 0
workOrder.setSuspendedCount(0);
workOrder.setDelayedTime(context.getOperateParam().getDelayedTime());
return workOrder;
}
@Override
void operationExtend(OperateContext context) {
long delayedTime = context.getOperateParam().getDelayedTime().getTime();
int delayedSeconds = context.getOperateParam().getDelayedSeconds();
WorkOrder workOrder = context.getWorkOrder();
WorkOrderContext cxt = WorkOrderContext.buildStored(workOrder.getWorkCode(),workOrder.getCasePriority(),delayedTime);
workOrderQueueManager.leftPush(cxt);
WorkOrderCacheManager.CacheValue cacheValue = WorkOrderCacheManager.CacheValue.
buildStored(workOrder.getWorkCode(),workOrder.getCasePriority(),delayedTime,delayedSeconds);
workOrderCacheManager.setCacheInExpire(cacheValue);
super.operationExtend(context);
}
@Override
public void setDelayedTime(OperateContext context) {
int delayedDays = context.getSurveyResult().getDelayedDays();
Date delayedTime = TimeTools.createNowTime();
TimeTools.addTimeField(delayedTime, Calendar.DAY_OF_YEAR,delayedDays);
context.getOperateParam().setDelayedTime(delayedTime);
context.getOperateParam().setDelayedSeconds(delayedDays * DAY_TO_SECONDS);
}
}
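One detail worth noting in the transfer strategy: delayedDays * DAY_TO_SECONDS is an int multiplication, so it overflows silently once delayedDays exceeds 24,855. The following is a minimal sketch of a guarded version, assuming the form input is an int number of days. TransferDelay, delayedSeconds, and dueAt are hypothetical names, and java.time stands in for the project's TimeTools helper, whose implementation is not shown here.

```java
import java.time.Duration;
import java.time.Instant;

// Hypothetical helper, not part of the original project.
final class TransferDelay {
    static final int DAY_TO_SECONDS = 24 * 60 * 60;

    private TransferDelay() {}

    // Seconds until the transferred work order is due again.
    // Math.multiplyExact throws ArithmeticException instead of overflowing silently.
    static int delayedSeconds(int delayedDays) {
        if (delayedDays <= 0) {
            throw new IllegalArgumentException("delayedDays must be greater than 0");
        }
        return Math.multiplyExact(delayedDays, DAY_TO_SECONDS);
    }

    // The instant at which the work order should return to the formal queue.
    static Instant dueAt(Instant now, int delayedDays) {
        return now.plus(Duration.ofDays(delayedDays));
    }
}
```

Passing the current time in as a parameter keeps dueAt deterministic, which makes it easy to unit test.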
III. Interactive UI overview
- System Link Logs
We listen for key expiration events and save a log record to the database for each affected work order. This makes later troubleshooting easier, because we no longer need to search through server logs.
- Queue monitor
This page lists the queue elements, so we can see at a glance which work orders are currently suspended or stored, and how much time each key has left to live.
- Parameter configuration
The suspend duration is configurable. Although the business currently configures it by day, the code also supports hour, minute, and second units for extensibility. For details, see the void setDelayedTime(OperateContext context) method of the suspend strategy class, which switches on the configured time unit (the transfer strategy, SubmitWithStoreOperateStrategy, only works in days).
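For the queue monitor, Redis can report a key's remaining lifetime directly through its TTL command. When only the stored delayed timestamp is at hand, a page could also derive the value itself. The sketch below assumes the timestamp is in epoch milliseconds, as in the delayedTime passed to buildSuspended and buildStored. RemainingTime and its methods are hypothetical names, not part of the project.

```java
import java.time.Duration;

// Hypothetical helper for a monitoring page, not part of the original project.
final class RemainingTime {
    private RemainingTime() {}

    // Remaining lifetime, clamped to zero once the delayed time has passed.
    static Duration remaining(long delayedTimeMillis, long nowMillis) {
        return Duration.ofMillis(Math.max(0L, delayedTimeMillis - nowMillis));
    }

    // Renders a duration as days plus hh:mm:ss, for example "1d 02:03:04".
    static String format(Duration d) {
        long s = d.getSeconds();
        return String.format("%dd %02d:%02d:%02d",
                s / 86400, (s % 86400) / 3600, (s % 3600) / 60, s % 60);
    }
}
```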
IV. Summary
Although this was only a small part of the overall requirement, the full product requirements document ran to 33 pages and was rich in detail. When investigating technical solutions for the delay queue, I also considered other approaches: Java's built-in DelayQueue (not suitable for distributed, multi-instance deployments), RabbitMQ (which felt complicated for this use case), and Redis (which provides the features needed here, such as ZSet, String, and key expiration).
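For comparison, here is a minimal sketch of the JDK's java.util.concurrent.DelayQueue. It shows why the in-process option does not fit a multi-instance deployment: the queue lives in one JVM's memory, so other instances cannot see its elements, and they are lost on restart. DelayedWorkOrder and DelayQueueDemo are hypothetical names, and workCode only mirrors the work order number used in the article.

```java
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

// Hypothetical element type, not part of the original project.
final class DelayedWorkOrder implements Delayed {
    final String workCode;
    private final long dueAtNanos;

    DelayedWorkOrder(String workCode, long delay, TimeUnit unit) {
        this.workCode = workCode;
        this.dueAtNanos = System.nanoTime() + unit.toNanos(delay);
    }

    // Remaining delay; zero or negative means the element is ready to be taken.
    @Override
    public long getDelay(TimeUnit unit) {
        return unit.convert(dueAtNanos - System.nanoTime(), TimeUnit.NANOSECONDS);
    }

    // Orders elements so the one that is due soonest sits at the head.
    @Override
    public int compareTo(Delayed other) {
        return Long.compare(getDelay(TimeUnit.NANOSECONDS), other.getDelay(TimeUnit.NANOSECONDS));
    }
}

final class DelayQueueDemo {
    public static void main(String[] args) throws InterruptedException {
        DelayQueue<DelayedWorkOrder> queue = new DelayQueue<>();
        queue.put(new DelayedWorkOrder("WO-2", 200, TimeUnit.MILLISECONDS));
        queue.put(new DelayedWorkOrder("WO-1", 100, TimeUnit.MILLISECONDS));
        // take() blocks until the head element's delay has elapsed.
        System.out.println(queue.take().workCode); // WO-1
        System.out.println(queue.take().workCode); // WO-2
    }
}
```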