Motivation for using locks:

  • While guaranteeing thread safety, let every thread eventually execute successfully
  • While guaranteeing thread safety, allow only one thread to execute successfully

The former fits the flash-sale ("seckill") scenario: as a merchant, you want every order to take effect until the item is sold out, while staying thread safe. In this case the distributed lock is acquired by retrying repeatedly or blocking and waiting, i.e. a recursive call or a while-true loop that keeps trying to obtain the lock.
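A minimal sketch of that retry/blocking acquisition pattern, using an in-memory `AtomicBoolean` as a stand-in for a Redis SETNX-style lock (the class and method names here are made up for illustration, not part of the project's code):

```java
import java.util.concurrent.atomic.AtomicBoolean;

// In-memory stand-in for a Redis SETNX-style lock, used only to
// illustrate the "retry until acquired" pattern described above.
public class RetryLockDemo {
    private static final AtomicBoolean LOCK = new AtomicBoolean(false);

    // Single non-blocking attempt, analogous to Redis SET key value NX
    static boolean tryLock() {
        return LOCK.compareAndSet(false, true);
    }

    static void unlock() {
        LOCK.set(false);
    }

    // Blocking acquisition: loop until the lock is obtained
    static void lockBlocking() throws InterruptedException {
        while (!tryLock()) {
            Thread.sleep(10); // back off briefly before retrying
        }
    }

    public static void main(String[] args) throws InterruptedException {
        lockBlocking();             // first caller acquires immediately
        boolean second = tryLock(); // a concurrent attempt fails
        unlock();
        System.out.println(second); // false
    }
}
```

In a real seckill, the body between acquire and release would decrement stock and create the order; the loop guarantees every thread eventually gets its turn.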

The latter fits scheduled tasks in a distributed system or a multi-node deployment. For example, the same code is deployed on two servers, A and B, against the same database. Without any restriction, both servers will pull the task list and execute the tasks twice.

A distributed lock solves this: at each cron-triggered time, only one node is allowed to fetch tasks from the database.

Why are distributed locks difficult to design?

Distributed processing is generally complex.

Redis distributed locks need to consider the lock expiration time.

  • Why set a lock expiration time at all
  • How long should the expiration time be

In extreme cases (the project is restarted or crashes unexpectedly while a task is in progress), the task dies without releasing the Redis distributed lock. The lock marker in Redis is never removed, so every subsequent run is locked out of the method forever: a deadlock.

We therefore need an automatic escape hatch: give the lock an expiration time, so that once the time is up the lock becomes invalid on its own. But that raises a new problem: how long should the expiration time be?
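A rough in-memory sketch of what a lock with an expiration time means, mimicking the semantics of Redis `SET key value NX PX ttl` plus an owner check on release (all names here are hypothetical, and the current time is passed in explicitly to keep the example deterministic):

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Sketch of an expiring lock: the entry auto-expires, so a crashed
// holder cannot cause a permanent deadlock.
public class ExpiringLock {
    private static class Entry {
        final String owner;
        final long expiresAt;
        Entry(String owner, long expiresAt) { this.owner = owner; this.expiresAt = expiresAt; }
    }

    private final Map<String, Entry> store = new ConcurrentHashMap<>();

    // Returns true if acquired; an expired entry is treated as absent
    public synchronized boolean tryLock(String key, String owner, long ttlMillis, long nowMillis) {
        Entry e = store.get(key);
        if (e != null && e.expiresAt > nowMillis) {
            return false; // held by someone else and not yet expired
        }
        store.put(key, new Entry(owner, nowMillis + ttlMillis));
        return true;
    }

    // Only the owner may release, mirroring the MACHINE_ID check below
    public synchronized boolean unlock(String key, String owner) {
        Entry e = store.get(key);
        if (e != null && e.owner.equals(owner)) {
            store.remove(key);
            return true;
        }
        return false;
    }
}
```

The owner check matters: without it, node A's delayed unlock could remove a lock that has already expired and been re-acquired by node B.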

It is difficult to determine, as the timing of scheduled tasks may change as the project progresses.

If the expiration time is set much longer than the task's actual execution time, and the lock is not released during execution, then the longer the expiration time, the wider the impact: other operations stay blocked for the whole window.

If it is set too short, the lock expires before the task finishes, the next run starts while the previous one is still working, and the task may be executed repeatedly.

A message queue helps keep the locked section as short as possible: pull the tasks, push them onto the queue, and return without processing them in place. (Note that if the lock expires before the pull completes, the same tasks can be pulled again, causing repeated execution.)
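A small sketch of this handoff, with an idempotency guard on the consumer side so that a duplicate pull does not execute the same task twice (all names hypothetical; an `ArrayDeque` and a `HashSet` stand in for the Redis queue and a deduplication record):

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Queue;
import java.util.Set;

// The locked section only pulls ids and enqueues them; the slow
// processing happens outside the lock, and a processed-id set makes
// the consumer idempotent against duplicate pulls.
public class QueueHandoff {
    private final Queue<Long> queue = new ArrayDeque<>();
    private final Set<Long> processed = new HashSet<>();

    // Runs inside the distributed lock: cheap and fast
    public void pullAndEnqueue(List<Long> taskIdsFromDb) {
        queue.addAll(taskIdsFromDb);
    }

    // Runs outside the lock: process each id at most once
    public List<Long> drainAndProcess() {
        List<Long> executed = new ArrayList<>();
        Long id;
        while ((id = queue.poll()) != null) {
            if (processed.add(id)) { // add() returns false if already processed
                executed.add(id);    // stand-in for the real work
            }
        }
        return executed;
    }
}
```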

Things to think about:

  • How to handle the lock expiration time
  • How to prevent deadlocks after a restart

```java
@Slf4j
@Component
@EnableScheduling
public class ResumeCollectionTask implements ApplicationListener<ContextRefreshedEvent> {

    @Resource
    private RedisService redisService;
    @Resource
    private AsyncResumeParser asyncResumeParser;

    /**
     * Random id identifying this node, used as the lock value
     */
    public static final String MACHINE_ID = String.valueOf(ThreadLocalRandom.current().nextLong(10000000));

    /**
     * After the project starts, try to delete the previous lock (if any) to prevent deadlocks
     */
    @Override
    public void onApplicationEvent(ContextRefreshedEvent contextRefreshedEvent) {
        redisService.releaseLock(RedisKeyConst.RESUME_PULL_TASK_LOCK);
    }

    /**
     * Called once every minute
     */
    @Scheduled(cron = "0 */1 * * * ?")
    public void resumeSchedule() {
        // Try the lock; returns true or false. The lock expires after 10 minutes.
        boolean lock = redisService.tryLock(RedisKeyConst.RESUME_PULL_TASK_LOCK, MACHINE_ID, 10, TimeUnit.MINUTES);
        if (lock) {
            log.info("node: {} lock acquired, task started", MACHINE_ID);
            try {
                collectResume();
            } catch (Exception e) {
                log.info("node: {}, task execution exception", MACHINE_ID, e);
            } finally {
                redisService.unLock(RedisKeyConst.RESUME_PULL_TASK_LOCK, MACHINE_ID);
                log.info("node: {}, task completed, lock released", MACHINE_ID);
            }
        } else {
            log.info("node: {}, failed to acquire lock", MACHINE_ID);
        }
    }

    /**
     * Task body:
     * 1. Fetch the qualifying HR mailboxes from the database
     * 2. Pull attached resumes from the HR email
     * 3. Call the remote service to parse resumes asynchronously
     * 4. Push the id of each pending task onto the Redis message queue
     * 5. Let the consumer process them asynchronously
     */
    private void collectResume() throws InterruptedException {
        log.info("node: {} retrieving resume tasks from the database", MACHINE_ID);
        List<ResumeCollectionDO> resumeCollectionDOList = new ArrayList<>();
        resumeCollectionDOList.add(new ResumeCollectionDO(1L, "Zhang San's resume.pdf"));
        resumeCollectionDOList.add(new ResumeCollectionDO(2L, "Li Si's resume.html"));
        resumeCollectionDOList.add(new ResumeCollectionDO(3L, "Wang Wu's resume.doc"));
        TimeUnit.SECONDS.sleep(2);
        log.info("Submitting tasks to the message queue: {}", resumeCollectionDOList.stream().map(ResumeCollectionDO::getName).collect(Collectors.joining(",")));
        resumeCollectionDOList.forEach(resumeCollectionDO -> {
            // Parse the resume through the third party and obtain the async parse id
            Long asyncParseId = asyncResumeParser.asyncParse(resumeCollectionDO);
            ResumeCollectionDTO resumeCollectionDTO = new ResumeCollectionDTO();
            BeanUtils.copyProperties(resumeCollectionDO, resumeCollectionDTO);
            resumeCollectionDTO.setAsyncPredictId(asyncParseId);
            redisService.pushQueue(RedisKeyConst.RESUME_PARSE_TASK_QUEUE, resumeCollectionDTO);
        });
    }
}
```

```java
@Component
@Slf4j
public class RedisMessageQueueConsumer implements ApplicationListener<ContextRefreshedEvent> {

    @Resource
    private RedisService redisService;
    @Resource
    private AsyncResumeParser asyncResumeParser;
    @Resource
    private ObjectMapper objectMapper;

    @Override
    public void onApplicationEvent(ContextRefreshedEvent contextRefreshedEvent) {
        log.info("Start listening to RedisMessageQueue...");
        // Start an asynchronous task that listens on the queue
        CompletableFuture.runAsync(() -> {
            // Outer loop: keep blocking on the message queue
            while (true) {
                // Blocking poll: wait up to 5 seconds per attempt; null if the queue is empty
                ResumeCollectionDTO resumeCollectionDTO = (ResumeCollectionDTO) redisService
                        .popQueue(RedisKeyConst.RESUME_PARSE_TASK_QUEUE, 5, TimeUnit.SECONDS);
                if (resumeCollectionDTO != null) {
                    int rePullCount = 0;
                    int reTryCount = 0;
                    log.info("Removed resume from queue: {}", resumeCollectionDTO.getName());
                    log.info("---------- begin pulling resume: {} ----------", resumeCollectionDTO.getName());
                    // Id of the asynchronous parse result for this resume
                    Long asyncPredictId = resumeCollectionDTO.getAsyncPredictId();
                    while (true) {
                        try {
                            PredictResult result = asyncResumeParser.getResult(asyncPredictId);
                            rePullCount++;
                            // Status 2 means the resume has finished parsing
                            if (2 == result.getStatus()) {
                                try {
                                    // Save to database
                                    log.info("Parsed resume successfully: {}", resumeCollectionDTO.getName());
                                    log.info("Parsed Json: {}", result.getResultJson());
                                    ResumeCollectionDO resumeCollectionDO = objectMapper.readValue(result.getResultJson(), ResumeCollectionDO.class);
                                    log.info("Resume: {} saved to database", resumeCollectionDO);
                                    rePullCount = 0;
                                    reTryCount = 0;
                                    break;
                                } catch (Exception e) {
                                    discardTask(resumeCollectionDTO);
                                    log.info("Resume pull/parse failed... discarding the task", e);
                                    rePullCount = 0;
                                    reTryCount = 0;
                                    break;
                                }
                            } else {
                                // Not finished yet: back off, then retry
                                int timeout;
                                if (rePullCount <= 3) {
                                    timeout = 1; // wait 1s for the first three retries
                                } else if (rePullCount <= 6) {
                                    timeout = 2;
                                } else if (rePullCount <= 9) {
                                    timeout = 2;
                                } else {
                                    discardTask(resumeCollectionDTO);
                                    log.info("Pulled several times without a result, discarding resume: {}", resumeCollectionDTO.getName());
                                    rePullCount = 0;
                                    reTryCount = 0;
                                    break;
                                }
                                TimeUnit.SECONDS.sleep(timeout);
                                log.info("Resume: {} not finished, retry {}, pausing {} seconds...", resumeCollectionDTO.getName(), rePullCount, timeout);
                            }
                        } catch (Exception e) {
                            if (reTryCount > 3) {
                                discardTask(resumeCollectionDTO);
                                log.info("<<< resume: {} giving up after {} retries, rePullCount: {}, reTryCount: {}", resumeCollectionDTO.getName(), reTryCount, rePullCount, reTryCount);
                                rePullCount = 0;
                                reTryCount = 0;
                                break;
                            }
                            reTryCount++;
                            log.info("Resume: {} remote invocation exception, preparing retry #{}", resumeCollectionDTO.getName(), reTryCount, e);
                        }
                    }
                }
            }
        });
    }

    private void discardTask(ResumeCollectionDTO resumeCollectionDTO) {
        log.info("------ discard task: {} ------", resumeCollectionDTO.getName());
    }
}
```

With the lock in place, only one node's scheduled task goes to the database to fetch tasks, so a multi-node deployment behaves correctly (Redis itself is usually deployed independently of the application nodes).

Summary: there are still problems with the code above. The result of an asynchronous invocation should be handled in several steps instead of waiting for it in a loop:

1. Call the asynchronous interface and obtain the unique id of the async result.
2. Save the result id to a task table as a pending task.
3. Let a scheduled task pull the final result by id. (If there is no result yet, skip the current task and wait for the next scheduled run.)
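A minimal sketch of this three-step flow, with in-memory maps standing in for the task table and the remote parser (all names hypothetical):

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Submit the async call and record its result id in a "task table";
// a scheduled job later polls for finished results and skips tasks
// that are not ready, instead of blocking in a retry loop.
public class AsyncResultPoller {
    // Task table: result id -> saved? (stand-in for a database table)
    private final Map<Long, Boolean> taskTable = new LinkedHashMap<>();
    // Remote side: result id -> parsed result, absent while still parsing
    private final Map<Long, String> remoteResults = new HashMap<>();

    // Steps 1 + 2: call the async interface, store the returned id as a task
    public void submit(long asyncResultId) {
        taskTable.put(asyncResultId, false);
    }

    // Simulates the remote parser finishing a task
    public void remoteFinish(long asyncResultId, String json) {
        remoteResults.put(asyncResultId, json);
    }

    // Step 3: one scheduled run; returns the ids persisted this round,
    // leaving unfinished tasks for the next run
    public List<Long> scheduledPull() {
        List<Long> saved = new ArrayList<>();
        for (Map.Entry<Long, Boolean> e : taskTable.entrySet()) {
            if (!e.getValue() && remoteResults.containsKey(e.getKey())) {
                e.setValue(true); // "save to database"
                saved.add(e.getKey());
            }
        }
        return saved;
    }
}
```

Each scheduled run does a bounded amount of work, so no lock needs to be held across the whole parse, and a node crash loses nothing: the pending ids are still in the task table.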

For distributed scheduled tasks in production projects, use a framework such as xxl-job or elastic-job; for distributed locks, Redisson is recommended.