sequence
This article focuses on ILeaseService for RocketMQ-Streams
ILeaseService
/** * Use db to implement lease and lock, which can be more lightweight, reduce the dependency of other middleware. Use active/standby scenario, only one instance runs, when the current instance fails, within a certain period of time, Public interface ILeaseService {/** * default lock time */ static final int DEFALUT_LOCK_TIME = 60 * 5; /** * Check whether a user has a lease. This method is pure memory operation, no performance overhead * * @return true, lease valid; False, lease void */ Boolean hasLease(String name); /** * When applying for a lease, a thread will be started to apply for the lease until the application is successful. After the successful application, each lease term /2 renewal. * * @param name Specifies the name of the lease. There are no special requirements. The same name is used to compete for the lease. /** * When applying for a lease, a thread will be started to apply for the lease until the application is successful. After the successful application, each lease term /2 renewal. * * @param name The name of the lease. There is no special requirement. The same name will compete for the lease. * @param callback */ void startLeaseTask(final String name, ILeaseGetCallback callback); /** * When applying for a lease, a thread will be started to apply for the lease until the application is successful. After the successful application, each lease term /2 renewal. * * @param Name Specifies the name of the lease. There are no special requirements. The same name will compete for the lease. * @Param leaseTermSecond Specifies the lease term. In seconds * @param callback When the lease is first fetched, */ void startLeaseTask(final String name, int leaseTermSecond, ILeaseGetCallback callback); /** * Request lock, whether successful or not, return immediately. If I don't release it, The maximum lock duration is 5 minutes * * @param name Service name * @param lockerName Lock name * @return Whether the lock succeeded */ Boolean lock(String name, String lockerName); /** * Request lock, whether successful or not, return immediately. The default lock duration is 5 minutes. * * @param name Service name * @param lockerName Lock name * @param lockTimeSecond Maximum lock duration if not released. Boolean lock(String name, String lockerName, int lockTimeSecond); /** * Apply for lock, if not, wait, wait time can be specified, if -1, wait indefinitely. * * @param lockerName Service name * @param lockerName Lock name * @param waitTime What is the maximum time to wait when the lock is not acquired? */ Boolean tryLocker(String name, String lockerName, long waitTime); /** * Apply for lock, if not, wait, wait time can be specified, if -1, wait indefinitely. If not released, the maximum lock time is lockTimeSecond * * @param name Business name * @param lockerName Lock name * @param waitTime The maximum time to wait when the lock is not acquired. If -1, wait indefinitely * @param lockTimeSecond Maximum time to lock if not released, */ Boolean tryLocker(String name, String lockerName, long waitTime, int lockTimeSecond); Unlock (String name, String lockerName); /** * unlock(String name, String lockerName); /** * If the lock has already been acquired, you can use this method to hold the lock forever. The difference with lease is that after the lock is released, no other instance preempts. * * @param lockerName Business name * @param lockerName lock name * @param lockTimeSecond lease. HoldLock (String name, String lockerName, int lockTimeSecond); / Boolean holdLock(String name, String lockerName, int lockTimeSecond); /** * Whether to hold the lock, will not apply for lock. Return true if it has been applied before and has not expired, Otherwise return false * * @param name Service name * @param lockerName Lock name * @return */ Boolean hasHoldLock(String name, String lockerName); List<LeaseInfo> queryLockedInstanceByNamePrefix(String name, String lockerNamePrefix); }Copy the code
- ILeaseService interface defines hasLease, startLeaseTask, lock, tryLocker, unlock, holdLock, hasHoldLock, queryLockedInstanceByNamePrefix method
BasedLesaseImpl
public abstract class BasedLesaseImpl implements ILeaseService { private static final Log LOG = LogFactory.getLog(BasedLesaseImpl.class); private static final String CONSISTENT_HASH_PREFIX = "consistent_hash_"; private static final AtomicBoolean syncStart = new AtomicBoolean(false); private static final int synTime = 120; Protected ScheduledExecutorService taskExecutor = null; // 5 minutes is too long for consistent hash synchronization. protected int leaseTerm = 300 * 2; // protected TRANSIENT JDBCDriver jdbcDataSource = null; protected ILeaseStorage leaseStorage; protected volatile Map<String, Date> leaseName2Date = new ConcurrentHashMap<>(); / / each lease name corresponding to the lease expired time public BasedLesaseImpl () {taskExecutor = new ScheduledThreadPoolExecutor (10); } /** * lease_name: consistent_hash_ip, lease_user_ip: IP, refresh the lease_INFO table periodically, * * @param name * @return */ @Override public Boolean hasLease(String name) {// If there is no lease information in the memory, there is no lease Date leaseEndTime = leaseName2Date.get(name); If (leaseEndTime == null) {// log.info (" + name + "); return false; } // log. info(" query for lease name:" + name + ", current time: "+ new SimpleDateFormat(" YYYY-MM-DD HH: MM :ss"). Format(new Date()) // +" Lease Expiration Date "+ new SimpleDateFormat(" YYYY-MM-DD ") HH:mm:ss").format(leaseEndTime)); If (new Date().before(leaseEndTime)) {return true; } return false; } private final Map<String, AtomicBoolean> startLeaseMap = new HashMap<>(); @Override public void startLeaseTask(final String name) { startLeaseTask(name, this.leaseTerm, null); } @Override public void startLeaseTask(final String name, ILeaseGetCallback callback) { startLeaseTask(name, this.leaseTerm, callback); } @Override public void startLeaseTask(final String name, int leaseTerm, ILeaseGetCallback callback) { ApplyTask applyTask = new ApplyTask(leaseTerm, name, callback); startLeaseTask(name, applyTask, leaseTerm / 2, true); } /** * Start timer, execute task regularly, Ensure that tasks are reentrant * * @param name * @param runnable Task * @param scheduleTime Scheduling time * @param startNow Whether to start immediately */ protected void startLeaseTask(final String name, Runnable runnable, int scheduleTime, boolean startNow) { AtomicBoolean isStartLease = startLeaseMap.get(name); If (isStartLease == null) {synchronized (this) {isStartLease = startleasemap. get(name); if (isStartLease == null) { isStartLease = new AtomicBoolean(false); startLeaseMap.put(name, isStartLease); } } } if (isStartLease.compareAndSet(false, true)) { if (startNow) { runnable.run(); } taskExecutor.scheduleWithFixedDelay(runnable, 0, scheduleTime, TimeUnit.SECONDS); }} / /... }Copy the code
- The BasedLesaseImpl declaration implements ILeaseService, which relies on ILeaseStorage, the startLeaseTask method that creates an ApplyTask and then schedules its execution at regular intervals
ApplyTask
/** * protected class ApplyTask implements Runnable {protected String name; protected int leaseTerm; protected ILeaseGetCallback callback; public ApplyTask(int leaseTerm, String name) { this(leaseTerm, name, null); } public ApplyTask(int leaseTerm, String name, ILeaseGetCallback callback) { this.name = name; this.leaseTerm = leaseTerm; this.callback = callback; } @override public void run() {try {// log.info ("LeaseServiceImpl name: "+ name +"... ); AtomicBoolean newApplyLease = new AtomicBoolean(false); Date leaseDate = applyLeaseTask(leaseTerm, name, newApplyLease); if (leaseDate ! = null) { leaseName2Date.put(name, leaseDate); LOG.info("LeaseServiceImpl, name: "+ name +" "+ getSelfUser() +" The lease is obtained successfully. The lease expiration time is "+ new SimpleDateFormat(" YYYY-MM-DD HH: MM :ss").format(leaseDate)); } else {// fix.2020.08.13 If the lease corresponding to name is still valid, or the host still has the lease, remove // leasename2date.remove (name); Log.info ("LeaseServiceImpl name: "+ name +" "+ getSelfUser() +" Failed to obtain lease "); } if (newApplyLease.get() && callback ! = null) { callback.callback(leaseDate); }} Catch (Exception e) {log. error(" LeaseServiceImpl name: "+ name +" "+ getSelfUser() +" error ", e); }}} /** * Apply for lease, if the current lease is valid, directly renew a lease period, if the current lease is invalid, first check whether there is a valid lease, if the application fails, */ protected Date applyLeaseTask(int leaseTerm, String name, AtomicBoolean newApplyLease) {// Calculate next lease time = current lease time + Lease duration Date nextLeaseDate = dateutil. addSecond(new Date(), leaseTerm); If (hasLease(name)) {// log.info (" user already has a lease, update the lease information in database and memory "); // Update database LeaseInfo LeaseInfo = queryValidateLease(name); if (leaseInfo == null) { LOG.error("LeaseServiceImpl applyLeaseTask leaseInfo is null"); return null; } // fix.2020.08.13, is the same as the local IP address and meets the consistent hash allocation policy. In other cases, null String leaseUserIp = leaseInfo.getleaseUserIP (); if (! leaseUserIp.equals(getSelfUser())) { return null; } leaseInfo.setLeaseEndDate(nextLeaseDate); updateLeaseInfo(leaseInfo); return nextLeaseDate; } // 2 Boolean success = canGetLease(name); if (! // log.info (" other machines have a valid lease "); // log.info (" other machines have a valid lease "); return null; } // 3 Run Boolean flag = tryGetLease(name, nextLeaseDate);} // 3 Run Boolean flag = tryGetLease(name, nextLeaseDate); If (flag) {// The lease is obtained successfully. NewApplyLease. Set (true); return nextLeaseDate; } return null; }Copy the code
- ApplyTask internally calls applyLeaseTask. If there is a lease, the ApplyTask updates the lease time. If there is no lease, the ApplyTask determines whether the lease can be obtained
tryGetLease
/** * Update database, * * @param time */ protected Boolean tryGetLease(String name, Date time) {// log.info (" try to obtain lease name is: "+ name +" " // + new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(time)); LeaseInfo validateLeaseInfo = queryValidateLease(name); If (validateLeaseInfo == null) {// There are two cases: 1 there is no lease information in the database. 2 There is lease information in the database but it has expired. Integer Count = countLeaseInfo(name); If (count = = null | | count = = 0) {/ / said now the database does not have any lease information, insert the lease is said to be successful, success failure said at this moment the other machine to obtain the lease / / LOG info (" database lease do not have the information, Try atomic insert lease :" + name); // fix.2020.08.13, after a consistent hash calculation, the task with this name should not be executed on the local machine. Only machines assigned hash execution permission can insert and get leases if (! getSelfUser().equals(getConsistentHashHost(name))) { return false; } validateLeaseInfo = new LeaseInfo(); validateLeaseInfo.setLeaseName(name); validateLeaseInfo.setLeaseUserIp(getSelfUser()); validateLeaseInfo.setLeaseEndDate(time); validateLeaseInfo.setStatus(1); validateLeaseInfo.setVersion(1); If (insert(validateLeaseInfo)) {log.info (+ name); return true; } else {log.info (" there is no lease information in database temporarily, atom insertion failed, lease has been obtained by another machine :" + name); return false; // log. info(" there is an invalid lease information in the database, try to update the lease information according to the version number atomically :" + name); LeaseInfo inValidateLeaseInfo = queryInValidateLease(name); If (inValidateLeaseInfo == null) {if (inValidateLeaseInfo == null) {if (inValidateLeaseInfo == null) {if (inValidateLeaseInfo == null) { return false; } // fix.2020.08.13, after the machine is restarted, the task with the name of the machine is not assigned to execute on the machine, directly returned, without updating the database if (! getSelfUser().equals(getConsistentHashHost(name))) { return false; } inValidateLeaseInfo.setLeaseName(name); inValidateLeaseInfo.setLeaseUserIp(getSelfUser()); inValidateLeaseInfo.setLeaseEndDate(time); inValidateLeaseInfo.setStatus(1); boolean success = updateDBLeaseInfo(inValidateLeaseInfo); If (success) {log.info ("LeaseServiceImpl atomic update succeeded :" + name); } else {log.info ("LeaseServiceImpl atomic update lease failed, lease was acquired by another machine :" + name); } return success; }} else {// Check whether you have obtained the lease by yourself. If you have obtained the lease by yourself, update the time (memory and database). 2020.08.13. The IP address of the lease is the same as that of the local IP address and satisfies the consistent hash policy. Will be executed locally String leaseUserIp = validateLeaseInfo. GetLeaseUserIp (); If (leaseUserIp. Equals (getSelfUser ())) {/ / if the current user has lease information, update the database validateLeaseInfo. SetLeaseEndDate (time); boolean hasUpdate = updateLeaseInfo(validateLeaseInfo); If (hasUpdate) {log.info ("LeaseServiceImpl ") {if (hasUpdate) {log.info ("LeaseServiceImpl ") The lease information for the name: "+ validateLeaseInfo getLeaseName () +" IP: "+ validateLeaseInfo. GetLeaseUserIp () +" due time: " + new SimpleDateFormat( "yyyy-MM-dd HH:mm:ss").format(validateLeaseInfo.getLeaseEndDate())); return true; } else {log.info ("LeaseServiceImpl ", "LeaseServiceImpl "); return false; }} / / LOG. The info (" LeaseServiceImpl lease by other machines, the lease information for the name: "+ validateLeaseInfo. GetLeaseName () +" IP: "/ / + validateLeaseInfo. GetLeaseUserIp () +" due time: " // + new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(validateLeaseInfo.getLeaseEndDate())); return false; } } protected LeaseInfo queryValidateLease(String name) { //String sql = "SELECT * FROM lease_info WHERE lease_name ='" + name + "' and status=1 and lease_end_time>now()"; //// LOG.info("LeaseServiceImpl query validate lease sql:" + sql); //return queryLease(name, sql); return leaseStorage.queryValidateLease(name); }Copy the code
- TryGetLease queries the lease information using queryValidateLease. If no lease is available, tryGetLease will be inserted. If the lease expires, tryGetLease will be updated based on the version number
LeaseServiceImpl
public class LeaseServiceImpl extends BasedLesaseImpl { private static final Log LOG = LogFactory.getLog(LeaseServiceImpl.class); private transient ConcurrentHashMap<String, HoldLockTask> holdLockTasks = new ConcurrentHashMap(); protected ConcurrentHashMap<String, HoldLockFunture> seizeLockingFuntures = new ConcurrentHashMap<>(); Public LeaseServiceImpl() {super(); } /** * try to acquire the lock, you can wait for waitTime, if the point does not return, return directly. If it is -1, wait * * @param name service name * @param lockerName lock name * @param waitTime waitTime, * @return */ @override public Boolean tryLocker(String name, String lockerName, long waitTime) { return tryLocker(name, lockerName, waitTime, ILeaseService.DEFALUT_LOCK_TIME); } @Override public boolean tryLocker(String name, String lockerName, long waitTime, int lockTimeSecond) { long now = System.currentTimeMillis(); boolean success = lock(name, lockerName, lockTimeSecond); while (! success) { if (waitTime > -1 && (System.currentTimeMillis() - now > waitTime)) { break; } success = lock(name, lockerName, lockTimeSecond); if (success) { return success; } try { Thread.sleep(100); } catch (InterruptedException e) { LOG.error("LeaseServiceImpl try locker error", e); } } return success; } @Override public boolean lock(String name, String lockerName) { return lock(name, lockerName, ILeaseService.DEFALUT_LOCK_TIME); } @Override public boolean lock(String name, String lockerName, int leaseSecond) { lockerName = createLockName(name, lockerName); Future future = seizeLockingFuntures.get(lockerName); if (future ! = null && ((HoldLockFunture)future).isDone == false) { return false; } Date nextLeaseDate = DateUtil.addSecond(new Date(), leaseSecond); // By default, it will be locked for 5 minutes and released immediately after use. If time is not synchronized, the lock may fail. Return tryGetLease(lockerName, nextLeaseDate); } @Override public boolean unlock(String name, String lockerName) { // LOG.info("LeaseServiceImpl unlock,name:" + name); lockerName = createLockName(name, lockerName); LeaseInfo validateLeaseInfo = queryValidateLease(lockerName); if (validateLeaseInfo == null) { LOG.warn("LeaseServiceImpl unlock,validateLeaseInfo is null,lockerName:" + lockerName); } if (validateLeaseInfo ! = null && validateLeaseInfo.getLeaseUserIp().equals(getSelfUser())) { validateLeaseInfo.setStatus(0); updateDBLeaseInfo(validateLeaseInfo); } HoldLockTask holdLockTask = holdLockTasks.remove(lockerName); if (holdLockTask ! = null) { holdLockTask.close(); } leaseName2Date.remove(lockerName); return false; } /** * if there is a lock, the lock is always held, if cannot obtain, the end. Unlike the lease, the lease will try again if the other party hangs up. * * @param name * @param secondeName * @param lockTimeSecond lock time * @return */ @override public Boolean holdLock(String name, String secondeName, int lockTimeSecond) { if (hasHoldLock(name, secondeName)) { return true; } synchronized (this) { if (hasHoldLock(name, secondeName)) { return true; } String lockerName = createLockName(name, secondeName); Date nextLeaseDate = DateUtil.addSecond(new Date(), lockTimeSecond); boolean success = tryGetLease(lockerName, nextLeaseDate); // Apply lock, lock time is leaseTerm if (! success) { return false; } leaseName2Date.put(lockerName, nextLeaseDate); if (! holdLockTasks.containsKey(lockerName)) { HoldLockTask holdLockTask = new HoldLockTask(lockTimeSecond, lockerName, this); holdLockTask.start(); holdLockTasks.putIfAbsent(lockerName, holdLockTask); } } return true; } /** * whether to hold the lock, do not access the database, * * @param name * @param secondeName * @return */ @override public Boolean hasHoldLock(String name, String name) String secondeName) { String lockerName = createLockName(name, secondeName); return hasLease(lockerName); } @Override public List<LeaseInfo> queryLockedInstanceByNamePrefix(String name, String lockerNamePrefix) { String leaseNamePrefix = MapKeyUtil.createKey(name, lockerNamePrefix); return queryValidateLeaseByNamePrefix(leaseNamePrefix); } / /... }Copy the code
- LeaseServiceImpl extends BasedLesaseImpl. TryLocker executes a lock loop based on wait time, lock executes tryGetLease, unlock updates lease information and removes memory records. HoldLock Determines whether the lock is held by hasHoldLock. If so, the lock is returned. If not, tryGetLease is executed
ILeaseStorage
Public interface ILeaseStorage {/** * update lease info; @return */ Boolean updateLeaseInfo(leaseInfo leaseInfo); ** @param leaseName Specifies the name of the LeaseInfo object. The same name will compete for the lease * @return */ Integer countLeaseInfo(String leaseName); @return */ LeaseInfo queryInValidateLease(String leaseName); @return */ LeaseInfo queryValidateLease(String leaseName); / * * * * * based on the information of prefix queries valid lease @ param namePrefix * @ return * / List < LeaseInfo > queryValidateLeaseByNamePrefix (String namePrefix); Void addLeaseInfo(leaseInfo leaseInfo); void addLeaseInfo(leaseInfo leaseInfo); }Copy the code
- ILeaseStorage interface defines updateLeaseInfo, countLeaseInfo, queryInValidateLease, queryValidateLease, queryValidateLeaseByNamePrefix, AddLeaseInfo method
DBLeaseStorage
public class DBLeaseStorage implements ILeaseStorage {
private static final Log LOG = LogFactory.getLog(DBLeaseStorage.class);
protected JDBCDriver jdbcDataSource;
private String url;
protected String userName;
protected String password;
protected String jdbc;
public DBLeaseStorage(String jdbc, String url, String userName, String password) {
this.jdbc = jdbc;
this.url = url;
this.userName = userName;
this.password = password;
jdbcDataSource = DriverBuilder.createDriver(jdbc, url, userName, password);
}
@Override
public boolean updateLeaseInfo(LeaseInfo leaseInfo) {
String sql = "UPDATE lease_info SET version=version+1,status=#{status},gmt_modified=now()";
String whereSQL = " WHERE id=#{id} and version=#{version}";
if (StringUtil.isNotEmpty(leaseInfo.getLeaseName())) {
sql += ",lease_name=#{leaseName}";
}
if (StringUtil.isNotEmpty(leaseInfo.getLeaseUserIp())) {
sql += ",lease_user_ip=#{leaseUserIp}";
}
if (leaseInfo.getLeaseEndDate() != null) {
sql += ",lease_end_time=#{leaseEndDate}";
}
sql += whereSQL;
sql = SQLUtil.parseIbatisSQL(leaseInfo, sql);
try {
int count = getOrCreateJDBCDataSource().update(sql);
boolean success = count > 0;
if (success) {
synchronized (this) {
leaseInfo.setVersion(leaseInfo.getVersion() + 1);
}
} else {
System.out.println(count);
}
return success;
} catch (Exception e) {
LOG.error("LeaseServiceImpl updateLeaseInfo excuteUpdate error", e);
throw new RuntimeException("execute sql error " + sql, e);
}
}
@Override
public Integer countLeaseInfo(String leaseName) {
String sql = "SELECT count(*) as c FROM lease_info WHERE lease_name = '" + leaseName + "' and status = 1";
try {
List<Map<String, Object>> rows = getOrCreateJDBCDataSource().queryForList(sql);
if (rows == null || rows.size() == 0) {
return null;
}
Long value = (Long) rows.get(0).get("c");
return value.intValue();
} catch (Exception e) {
throw new RuntimeException("execute sql error " + sql, e);
}
}
@Override
public LeaseInfo queryInValidateLease(String leaseName) {
String sql = "SELECT * FROM lease_info WHERE lease_name ='" + leaseName + "' and status=1 and lease_end_time<'" + DateUtil.getCurrentTimeString() + "'";
LOG.info("LeaseServiceImpl queryInValidateLease builder:" + sql);
return queryLease(leaseName, sql);
}
@Override
public LeaseInfo queryValidateLease(String leaseName) {
String sql = "SELECT * FROM lease_info WHERE lease_name ='" + leaseName + "' and status=1 and lease_end_time>now()";
return queryLease(leaseName, sql);
}
@Override
public List<LeaseInfo> queryValidateLeaseByNamePrefix(String namePrefix) {
String sql = "SELECT * FROM lease_info WHERE lease_name like '" + namePrefix + "%' and status=1 and lease_end_time>now()";
try {
List<LeaseInfo> leaseInfos = new ArrayList<>();
List<Map<String, Object>> rows = getOrCreateJDBCDataSource().queryForList(sql);
if (rows == null || rows.size() == 0) {
return null;
}
for (Map<String, Object> row : rows) {
LeaseInfo leaseInfo = convert(row);
leaseInfos.add(leaseInfo);
}
return leaseInfos;
} catch (Exception e) {
throw new RuntimeException("execute sql error " + sql, e);
}
}
@Override
public void addLeaseInfo(LeaseInfo leaseInfo) {
String sql =
" REPLACE INTO lease_info(lease_name,lease_user_ip,lease_end_time,status,version,gmt_create,gmt_modified)"
+ " VALUES (#{leaseName},#{leaseUserIp},#{leaseEndDate},#{status},#{version},now(),now())";
sql = SQLUtil.parseIbatisSQL(leaseInfo, sql);
try {
getOrCreateJDBCDataSource().execute(sql);
} catch (Exception e) {
LOG.error("LeaseServiceImpl execute sql error,sql:" + sql, e);
throw new RuntimeException("execute sql error " + sql, e);
}
}
protected JDBCDriver getOrCreateJDBCDataSource() {
if (this.jdbcDataSource == null || !this.jdbcDataSource.isValidate()) {
synchronized (this) {
if (this.jdbcDataSource == null || !this.jdbcDataSource.isValidate()) {
this.jdbcDataSource =
DriverBuilder.createDriver(this.jdbc, this.url, this.userName, this.password);
}
}
}
return jdbcDataSource;
}
protected LeaseInfo queryLease(String name, String sql) {
try {
List<Map<String, Object>> rows = getOrCreateJDBCDataSource().queryForList(sql);
if (rows == null || rows.size() == 0) {
return null;
}
return convert(rows.get(0));
} catch (Exception e) {
throw new RuntimeException("execute sql error " + sql, e);
}
}
protected LeaseInfo convert(Map<String, Object> map) {
LeaseInfo leaseInfo = new LeaseInfo();
leaseInfo.setId(getMapLongValue("id", map));
leaseInfo.setCreateTime(getMapDateValue("gmt_create", map));
leaseInfo.setLeaseEndDate(getMapDateValue("lease_end_time", map));
leaseInfo.setLeaseName(getMapValue("lease_name", map, String.class));
leaseInfo.setLeaseUserIp(getMapValue("lease_user_ip", map, String.class));
Integer status = getMapValue("status", map, Integer.class);
if (status != null) {
leaseInfo.setStatus(status);
}
leaseInfo.setUpdateTime(getMapDateValue("gmt_modified", map));
Long version = getMapLongValue("version", map);
if (version != null) {
leaseInfo.setVersion(version);
}
return leaseInfo;
}
@SuppressWarnings("unchecked")
private <T> T getMapValue(String fieldName, Map<String, Object> map, Class<T> integerClass) {
Object value = map.get(fieldName);
if (value == null) {
return null;
}
return (T) value;
}
private Long getMapLongValue(String fieldName, Map<String, Object> map) {
Object value = map.get(fieldName);
if (value == null) {
return null;
}
if (value instanceof Long) {
return (Long) value;
}
if (value instanceof BigInteger) {
return ((BigInteger) value).longValue();
}
return null;
}
private Date getMapDateValue(String fieldName, Map<String, Object> map) {
Object value = map.get(fieldName);
if (value == null) {
return null;
}
if (value instanceof Date) {
return (Date) value;
}
if (value instanceof String) {
return DateUtil.parseTime(((String) value));
}
return null;
}
}
Copy the code
- DBLeaseStorage implements the ILeaseStorage interface and implements its methods using JDBC
summary
Rocketmq-streams’ LeaseService implements lease and lock based on DB and can be used to switch between active and standby scenarios.