Module dependencies and module encapsulation
Preface
The previous article gave a general analysis of the system and briefly introduced what each module does. This article looks in detail at some important functions of each module.
I. Version selection
- Spring Boot is 2.1.4, so Spring Cloud must be the matching G release (Greenwich.RELEASE); with a mismatched pair the project cannot start normally. The full compatibility table is easy to look up.
- Nacos 1.4, which can be downloaded from the official website.
- RocketMQ 4.3.0, installed on Windows for the test environment. The installation can be customized; for a cluster deployment, remember to change the configuration file.
- Redis 4.0.
- MySQL 5.7 or later is recommended.
- MongoDB 4.0 is recommended.
- Nginx 1.18, to support WebSocket.
II. Module dependencies
1. Overall structure
The parent project pom
...
<modules>
    <module>common-ws-starter</module>
    <module>pc-wx-spider</module>
    <module>java-wx-spider</module>
    <module>redis-ws-starter</module>
    <module>db-ws-starter</module>
    <module>sql-wx-spider</module>
    <module>rocketmq-ws-starter</module>
    <module>mobile-wx-spider</module>
</modules>
...
2. Dependencies
Note: "starter" denotes an encapsulated starter, and "spider" denotes a runnable Spring Boot project. Get familiar with the Spring Boot startup process before studying the starters.
- common-ws-starter
Does not depend on other modules.
- db-ws-starter
Does not depend on other modules.
- redis-ws-starter
Does not depend on other modules.
- rocketmq-ws-starter
Does not depend on other modules.
- sql-wx-spider
Depends on common-ws-starter and db-ws-starter.
<dependency>
    <groupId>com.wx.spider</groupId>
    <artifactId>common-ws-starter</artifactId>
    <version>1.0-SNAPSHOT</version>
</dependency>
<dependency>
    <groupId>com.wx.spider</groupId>
    <artifactId>db-ws-starter</artifactId>
    <version>1.0-SNAPSHOT</version>
</dependency>
- pc-wx-spider
Depends on common-ws-starter, redis-ws-starter, and rocketmq-ws-starter.
<dependency>
    <groupId>com.wx.spider</groupId>
    <artifactId>common-ws-starter</artifactId>
    <version>1.0-SNAPSHOT</version>
</dependency>
<dependency>
    <groupId>com.wx.spider</groupId>
    <artifactId>redis-ws-starter</artifactId>
    <version>1.0-SNAPSHOT</version>
</dependency>
<dependency>
    <groupId>com.wx.spider</groupId>
    <artifactId>rocketmq-ws-starter</artifactId>
    <version>1.0-SNAPSHOT</version>
</dependency>
- mobile-wx-spider
Depends on common-ws-starter, redis-ws-starter, and rocketmq-ws-starter.
<dependency>
    <groupId>com.wx.spider</groupId>
    <artifactId>common-ws-starter</artifactId>
    <version>1.0-SNAPSHOT</version>
</dependency>
<dependency>
    <groupId>com.wx.spider</groupId>
    <artifactId>redis-ws-starter</artifactId>
    <version>1.0-SNAPSHOT</version>
</dependency>
<dependency>
    <groupId>com.wx.spider</groupId>
    <artifactId>rocketmq-ws-starter</artifactId>
    <version>1.0-SNAPSHOT</version>
</dependency>
- java-wx-spider
Depends on common-ws-starter and rocketmq-ws-starter.
<dependency>
    <groupId>com.wx.spider</groupId>
    <artifactId>common-ws-starter</artifactId>
    <version>1.0-SNAPSHOT</version>
</dependency>
<dependency>
    <groupId>com.wx.spider</groupId>
    <artifactId>rocketmq-ws-starter</artifactId>
    <version>1.0-SNAPSHOT</version>
</dependency>
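To double-check the dependency list, the relationships above can be written down as a small graph and walked so that every module comes after the modules it depends on. This is only an illustrative sketch: the class and method names (ModuleGraphSketch, buildOrder) are made up for the example, and Maven works out the real reactor order on its own.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class ModuleGraphSketch {

    // Module -> modules it depends on, copied from the list above.
    static Map<String, List<String>> dependencies() {
        Map<String, List<String>> deps = new LinkedHashMap<>();
        deps.put("common-ws-starter", List.of());
        deps.put("db-ws-starter", List.of());
        deps.put("redis-ws-starter", List.of());
        deps.put("rocketmq-ws-starter", List.of());
        deps.put("sql-wx-spider", List.of("common-ws-starter", "db-ws-starter"));
        deps.put("pc-wx-spider", List.of("common-ws-starter", "redis-ws-starter", "rocketmq-ws-starter"));
        deps.put("mobile-wx-spider", List.of("common-ws-starter", "redis-ws-starter", "rocketmq-ws-starter"));
        deps.put("java-wx-spider", List.of("common-ws-starter", "rocketmq-ws-starter"));
        return deps;
    }

    // Depth-first walk: a module is added only after all of its dependencies.
    // Assumes the graph has no cycles, which holds for the list above.
    static List<String> buildOrder(Map<String, List<String>> deps) {
        List<String> order = new ArrayList<>();
        for (String module : deps.keySet()) {
            visit(module, deps, order);
        }
        return order;
    }

    private static void visit(String module, Map<String, List<String>> deps, List<String> order) {
        if (order.contains(module)) {
            return;
        }
        for (String dep : deps.getOrDefault(module, List.of())) {
            visit(dep, deps, order);
        }
        order.add(module);
    }

    public static void main(String[] args) {
        System.out.println(buildOrder(dependencies()));
    }
}
```

The four starters come out first because none of them depends on another module, which is what keeps them reusable across the spider projects.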
III. Detailed explanation of functional modules
This section explains some key functions in the module.
1. common-ws-starter
The common module holds the models, POJOs, VOs, utilities and so on that several modules share. Its util package wraps system-specific helpers, such as parsing the JSON returned in captured packets, and is not hard to follow. Note that the entity classes mapped to MongoDB and MySQL carry field annotations, so the module has to import the corresponding dependency coordinates in pom.xml. Modules that include it must then exclude the Spring Boot auto-configuration those dependencies would trigger, otherwise the application may fail to start after packaging.
2. db-ws-starter
A custom data source module that switches between multiple data sources dynamically through a custom annotation and AOP. It contains no business logic. MyBatis-Plus ships its own dynamic data source annotation, but it is not very flexible, so the data source handling is encapsulated here in preparation for adding distributed transactions and database/table sharding later.
An enumeration restricts the data source names:
package com.wx.spider.db.constant;
/**
* @author:feng
* @create:2021-04-16 15:59
*/
public enum DataSourceKey {
core, sec
}
Configure as many data sources as your services need. Currently two are configured, and their names match the ones in bootstrap.yml.
Custom annotation @DataSource and the aspect configuration
Adding this annotation to a business logic method makes the method operate on the data source named by the name attribute.
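package com.wx.spider.db.annotation;

import java.lang.annotation.*;

/**
 * @author:feng
 * @create:2021-04-16 15:54
 */
@Target({ElementType.METHOD, ElementType.TYPE}) // second target assumed to be TYPE; the listing is cut off here
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface DataSource {
    // Name of the target data source (a DataSourceKey constant); the aspect reads it via ds.name()
    String name();
}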
Aspect implementation:
package com.wx.spider.db.aop;

import com.wx.spider.db.annotation.DataSource;
import com.wx.spider.db.constant.DataSourceKey;
import com.wx.spider.db.uitl.DataSourceHolder;
import lombok.extern.slf4j.Slf4j;
import org.aspectj.lang.JoinPoint;
import org.aspectj.lang.annotation.After;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.Before;
import org.springframework.core.annotation.Order;

/**
 * @author:feng
 * @create:2021-04-16 15:55
 */
@Slf4j
@Aspect
@Order(-1) // ensure this aspect runs before @Transactional
public class DataSourceAOP {

    @Before("@annotation(ds)")
    public void changeDataSource(JoinPoint point, DataSource ds) throws Throwable {
        String dsId = ds.name();
        try {
            DataSourceKey dataSourceKey = DataSourceKey.valueOf(dsId);
            DataSourceHolder.setDataSourceKey(dataSourceKey);
        } catch (Exception e) {
            log.error("Data source [{}] does not exist, using the default data source > {}", ds.name(), point.getSignature());
        }
    }

    @After("@annotation(ds)")
    public void restoreDataSource(JoinPoint point, DataSource ds) {
        log.debug("Revert DataSource : {} > {}", ds.name(), point.getSignature());
        DataSourceHolder.clearDataSourceKey();
    }
}
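The before/after pair in the aspect can be pictured without Spring at all. The sketch below uses a JDK dynamic proxy in place of the AOP framework: it reads an annotation from the invoked method, binds the key to the current thread before the call, and clears it afterwards. Everything in it (UseSource, ReportService, SwitchingProxySketch) is a hypothetical stand-in, not part of the project.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Proxy;

class SwitchingProxySketch {

    enum Key { core, sec }

    // Hypothetical stand-in for the @DataSource annotation.
    @Target(ElementType.METHOD)
    @Retention(RetentionPolicy.RUNTIME)
    @interface UseSource {
        String name();
    }

    // Key bound to the current thread, like DataSourceHolder.
    static final ThreadLocal<Key> CURRENT = new ThreadLocal<>();

    interface ReportService {
        @UseSource(name = "sec")
        String load();

        String loadDefault();
    }

    static class ReportServiceImpl implements ReportService {
        @Override
        public String load() {
            return "ran on " + CURRENT.get();
        }

        @Override
        public String loadDefault() {
            return "ran on " + CURRENT.get();
        }
    }

    static ReportService wrap(ReportService target) {
        return (ReportService) Proxy.newProxyInstance(
                ReportService.class.getClassLoader(),
                new Class<?>[] {ReportService.class},
                (proxy, method, args) -> {
                    UseSource ann = method.getAnnotation(UseSource.class);
                    if (ann == null) {
                        return method.invoke(target, args); // not annotated: nothing to switch
                    }
                    try {
                        CURRENT.set(Key.valueOf(ann.name())); // the "before" step
                    } catch (IllegalArgumentException e) {
                        // unknown name: leave the key unset so the default is used
                    }
                    try {
                        return method.invoke(target, args);
                    } finally {
                        CURRENT.remove(); // the "after" step, runs even if the call throws
                    }
                });
    }

    public static void main(String[] args) {
        ReportService service = wrap(new ReportServiceImpl());
        System.out.println(service.load());        // ran on sec
        System.out.println(service.loadDefault()); // ran on null
    }
}
```

Clearing the key in a finally block matters when threads are pooled: a key left behind would silently route the next request on that thread to the wrong database.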
package com.wx.spider.db.uitl;

import com.wx.spider.db.constant.DataSourceKey;

/**
 * @author:feng
 * @create:2021-04-16 16:00
 */
public class DataSourceHolder {

    // Data source key bound to the current thread
    private static final ThreadLocal<DataSourceKey> dataSourceKey = new ThreadLocal<>();

    // Get the current data source key
    public static DataSourceKey getDataSourceKey() {
        return dataSourceKey.get();
    }

    // Set the current data source key
    public static void setDataSourceKey(DataSourceKey type) {
        dataSourceKey.set(type);
    }

    // Clear the current data source key
    public static void clearDataSourceKey() {
        dataSourceKey.remove();
    }
}
package com.wx.spider.db.uitl;

import com.wx.spider.db.constant.DataSourceKey;
import org.springframework.jdbc.datasource.lookup.AbstractRoutingDataSource;

import javax.sql.DataSource;
import java.util.HashMap;
import java.util.Map;

/**
 * @author:feng
 * @create:2021-04-16 16:00
 */
public class DynamicDataSource extends AbstractRoutingDataSource {

    private Map<Object, Object> datasources;

    public DynamicDataSource() {
        datasources = new HashMap<>();
        super.setTargetDataSources(datasources);
    }

    public <T extends DataSource> void addDataSource(DataSourceKey key, T data) {
        datasources.put(key, data);
    }

    // Called for every connection request; returns the key bound to the current thread
    @Override
    protected Object determineCurrentLookupKey() {
        return DataSourceHolder.getDataSourceKey();
    }
}
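How the holder and the routing data source cooperate can be shown with a plain-Java model: a thread-local key selects an entry from a map, and a missing key falls back to the default. This is a simplified sketch of the lookup idea only. Strings stand in for javax.sql.DataSource, and RoutingSketch and its members are names invented for the example.

```java
import java.util.EnumMap;
import java.util.Map;

class RoutingSketch {

    enum Key { core, sec }

    // Key bound to the current thread, as in DataSourceHolder.
    static final ThreadLocal<Key> CURRENT = new ThreadLocal<>();

    // Stand-in targets: plain strings instead of real data sources.
    private final Map<Key, String> targets = new EnumMap<>(Key.class);
    private final String defaultTarget;

    RoutingSketch(String defaultTarget) {
        this.defaultTarget = defaultTarget;
    }

    void add(Key key, String target) {
        targets.put(key, target);
    }

    // Look up the target for the current thread; no key or no match means the default.
    String resolve() {
        Key key = CURRENT.get();
        String target = (key == null) ? null : targets.get(key);
        return target != null ? target : defaultTarget;
    }

    public static void main(String[] args) throws InterruptedException {
        RoutingSketch router = new RoutingSketch("core-db");
        router.add(Key.core, "core-db");
        router.add(Key.sec, "sec-db");

        CURRENT.set(Key.sec);
        try {
            System.out.println(router.resolve()); // sec-db on this thread
            // Another thread has its own (empty) key and still gets the default.
            Thread other = new Thread(() -> System.out.println(router.resolve())); // core-db
            other.start();
            other.join();
        } finally {
            CURRENT.remove();
        }
        System.out.println(router.resolve()); // core-db again after the key is cleared
    }
}
```

Because the key lives in a ThreadLocal, a switch made for one request never leaks into requests handled concurrently on other threads.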
Configure auto-configuration through META-INF/spring.factories
spring.factories:
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
com.wx.spider.db.DataSourceAutoConfig
DataSourceAutoConfig:
package com.wx.spider.db;

import com.alibaba.druid.spring.boot.autoconfigure.DruidDataSourceAutoConfigure;
import com.alibaba.druid.spring.boot.autoconfigure.DruidDataSourceBuilder;
import com.baomidou.mybatisplus.autoconfigure.MybatisPlusAutoConfiguration;
import com.wx.spider.db.aop.DataSourceAOP;
import com.wx.spider.db.constant.DataSourceKey;
import com.wx.spider.db.uitl.DynamicDataSource;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.autoconfigure.AutoConfigureBefore;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;
import org.springframework.context.annotation.Primary;
import org.springframework.jdbc.datasource.DataSourceTransactionManager;

import javax.sql.DataSource;

/**
 * @author:feng
 * @create:2021-04-16 15:54
 */
@Configuration
@Import(DataSourceAOP.class)
@AutoConfigureBefore(value = {DruidDataSourceAutoConfigure.class, MybatisPlusAutoConfiguration.class})
@ConditionalOnProperty(name = {"spring.datasource.dynamic.enable"}, matchIfMissing = false, havingValue = "true")
public class DataSourceAutoConfig {

    // Create the data sources.
    // Every module that includes db-core needs a core database.
    @Bean
    @ConfigurationProperties(prefix = "spring.datasource.druid.core")
    public DataSource dataSourceCore() {
        return DruidDataSourceBuilder.create().build();
    }

    // The second data source
    @Bean
    @ConfigurationProperties(prefix = "spring.datasource.druid.sec")
    public DataSource dataSourceSec() {
        return DruidDataSourceBuilder.create().build();
    }

    @Primary
    @Bean
    public DataSource dataSource() {
        System.out.println("---------- dynamic data sources ----------");
        DynamicDataSource dataSource = new DynamicDataSource();
        DataSource coreDataSource = dataSourceCore();
        DataSource logDataSource = dataSourceSec();
        dataSource.addDataSource(DataSourceKey.core, coreDataSource);
        dataSource.addDataSource(DataSourceKey.sec, logDataSource);
        dataSource.setDefaultTargetDataSource(coreDataSource);
        return dataSource;
    }

    @Bean // bring the data source under Spring transaction management
    public DataSourceTransactionManager transactionManager(@Qualifier("dataSource") DataSource dataSource) {
        return new DataSourceTransactionManager(dataSource);
    }
}
Notes:
- Import the aspect (@Import(DataSourceAOP.class));
- Load before the stock data source auto-configurations are assembled (@AutoConfigureBefore(value = {DruidDataSourceAutoConfigure.class, MybatisPlusAutoConfiguration.class}));
- Make loading switchable through configuration (@ConditionalOnProperty(name = {"spring.datasource.dynamic.enable"}, matchIfMissing = false, havingValue = "true")).
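The effect of the third note is easy to misread, so here is a rough plain-Java model of how a property switch with havingValue = "true" and matchIfMissing = false is evaluated. It is a simplified approximation of the documented @ConditionalOnProperty behaviour for a non-empty havingValue, not Spring's own code, and PropertySwitchSketch is a name made up for the example.

```java
import java.util.Properties;

class PropertySwitchSketch {

    // Missing property: matchIfMissing decides.
    // Present property: it must equal havingValue (compared ignoring case).
    static boolean matches(Properties props, String name, String havingValue, boolean matchIfMissing) {
        String value = props.getProperty(name);
        if (value == null) {
            return matchIfMissing;
        }
        return havingValue.equalsIgnoreCase(value);
    }

    public static void main(String[] args) {
        String name = "spring.datasource.dynamic.enable";
        Properties props = new Properties();

        // Property absent: the configuration is skipped because matchIfMissing is false.
        System.out.println(matches(props, name, "true", false)); // false

        // Property set to true: the dynamic data source configuration is loaded.
        props.setProperty(name, "true");
        System.out.println(matches(props, name, "true", false)); // true

        // Any other value keeps it switched off.
        props.setProperty(name, "false");
        System.out.println(matches(props, name, "true", false)); // false
    }
}
```

In other words, a module that pulls in db-ws-starter gets the dynamic data source only after it sets spring.datasource.dynamic.enable to true explicitly.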
3. redis-ws-starter
This module is a secondary wrapper around spring-boot-starter-data-redis that provides convenient Redis and Redisson utilities. The StringRedisTemplate and RedisTemplate operation classes from spring-boot-starter-data-redis remain available, and standalone, cluster, and Sentinel configurations are all supported. Core class:
package com.wx.spider.redis;
import com.wx.spider.redis.serizlizer.RedisObjectSerializer;
import com.wx.spider.redis.util.RedisUtil;
import io.lettuce.core.RedisClient;
import io.lettuce.core.cluster.ClusterClientOptions;
import io.lettuce.core.cluster.ClusterTopologyRefreshOptions;
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
import org.redisson.Redisson;
import org.redisson.api.RedissonClient;
import org.redisson.config.Config;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.AutoConfigureBefore;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.autoconfigure.data.redis.RedisProperties;
import org.springframework.boot.autoconfigure.data.redis.RedisProperties.Sentinel;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.cache.CacheManager;
import org.springframework.cache.annotation.EnableCaching;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.core.io.Resource;
import org.springframework.data.redis.cache.RedisCacheConfiguration;
import org.springframework.data.redis.cache.RedisCacheManager;
import org.springframework.data.redis.cache.RedisCacheWriter;
import org.springframework.data.redis.connection.*;
import org.springframework.data.redis.connection.lettuce.LettuceClientConfiguration;
import org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory;
import org.springframework.data.redis.connection.lettuce.LettucePoolingClientConfiguration;
import org.springframework.data.redis.core.HashOperations;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.serializer.RedisSerializationContext;
import org.springframework.data.redis.serializer.RedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;
import org.springframework.util.ReflectionUtils;
import java.io.IOException;
import java.io.InputStream;
import java.lang.reflect.Method;
import java.time.Duration;
import java.util.*;
/**
* @author:feng
* @create:2021-04-14 16:29
*/
@Configuration
@EnableCaching
@SuppressWarnings("all")
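// Order against the Redis auto-configuration class; RedisTemplate itself is not an auto-configuration
@AutoConfigureBefore(org.springframework.boot.autoconfigure.data.redis.RedisAutoConfiguration.class)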
@EnableConfigurationProperties(RedissonProperties.class)
public class RedisAutoConfig {
@Autowired(required = false)
private RedissonProperties redissonProperties;
@Autowired
private RedisProperties redisProperties;
@Autowired
private ApplicationContext ctx;
@Bean(destroyMethod = "destroy")
@ConditionalOnClass(RedisClient.class)
public LettuceConnectionFactory lettuceConnectionFactory(GenericObjectPoolConfig genericObjectPoolConfig) {
Method clusterMethod = ReflectionUtils.findMethod(RedisProperties.class, "getCluster");
Method timeoutMethod = ReflectionUtils.findMethod(RedisProperties.class, "getTimeout");
Object timeoutValue = ReflectionUtils.invokeMethod(timeoutMethod, redisProperties);
RedisConfiguration redisConfiguration = null;
LettuceClientConfiguration clientConfig = null;
if (redisProperties.getSentinel() != null) {
// Sentinel configuration
Method nodesMethod = ReflectionUtils.findMethod(Sentinel.class, "getNodes");
Object nodesValue = ReflectionUtils.invokeMethod(nodesMethod, redisProperties.getSentinel());
String[] nodes = null;
Set<String> sentinelHostAndPorts = new HashSet<>();
if (nodesValue instanceof String) {
nodes = convert(Arrays.asList(((String) nodesValue).split(",")));
sentinelHostAndPorts.addAll(Arrays.asList(((String) nodesValue).split(",")));
} else {
nodes = convert((List<String>) nodesValue);
sentinelHostAndPorts.addAll((List<String>) nodesValue);
}
redisConfiguration = new RedisSentinelConfiguration(redisProperties.getSentinel().getMaster(),
sentinelHostAndPorts);
((RedisSentinelConfiguration) redisConfiguration)
.setPassword(RedisPassword.of(redisProperties.getPassword()));
((RedisSentinelConfiguration) redisConfiguration).setDatabase(redisProperties.getDatabase());
clientConfig = LettucePoolingClientConfiguration.builder().commandTimeout(redisProperties.getTimeout())
.poolConfig(genericObjectPoolConfig).build();
} else if (clusterMethod != null && ReflectionUtils.invokeMethod(clusterMethod, redisProperties) != null) {
// Cluster configuration
List<String> clusterNodes = redisProperties.getCluster().getNodes();
Set<RedisNode> nodes = new HashSet<RedisNode>();
clusterNodes.forEach(address -> nodes
.add(new RedisNode(address.split(":")[0].trim(), Integer.valueOf(address.split(":")[1]))));
redisConfiguration = new RedisClusterConfiguration();
((RedisClusterConfiguration) redisConfiguration).setClusterNodes(nodes);
((RedisClusterConfiguration) redisConfiguration)
.setPassword(RedisPassword.of(redisProperties.getPassword()));
/**
* ClusterTopologyRefreshOptions enables adaptive and periodic topology refresh. Without adaptive
* refresh, connections fail when the Redis cluster topology changes.
*/
ClusterTopologyRefreshOptions topologyRefreshOptions = ClusterTopologyRefreshOptions.builder()
// Enable adaptive refresh
.enableAdaptiveRefreshTrigger(ClusterTopologyRefreshOptions.RefreshTrigger.MOVED_REDIRECT,
ClusterTopologyRefreshOptions.RefreshTrigger.PERSISTENT_RECONNECTS)
// Enable all adaptive refresh triggers: MOVED, ASK and PERSISTENT all trigger a refresh
// .enableAllAdaptiveRefreshTriggers()
// Adaptive refresh timeout (default 30 seconds)
.adaptiveRefreshTriggersTimeout(Duration.ofSeconds(25)) // off by default; 30 seconds once enabled
// Enable periodic refresh
.enablePeriodicRefresh(Duration.ofSeconds(20)) // off by default; 60 seconds once enabled
// ClusterTopologyRefreshOptions.DEFAULT_REFRESH_PERIOD
// 60 .enablePeriodicRefresh(Duration.ofSeconds(2)) =
// .enablePeriodicRefresh().refreshPeriod(Duration.ofSeconds(2))
.build();
clientConfig = LettucePoolingClientConfiguration.builder().commandTimeout(redisProperties.getTimeout())
.poolConfig(genericObjectPoolConfig)
.clientOptions(
ClusterClientOptions.builder().topologyRefreshOptions(topologyRefreshOptions).build())
// Pass the app ID into the connection so it is easy to spot in Redis monitoring
// .clientName(appName + "_lettuce")
.build();
} else {
// Standalone configuration
redisConfiguration = new RedisStandaloneConfiguration();
((RedisStandaloneConfiguration) redisConfiguration).setDatabase(redisProperties.getDatabase());
((RedisStandaloneConfiguration) redisConfiguration).setHostName(redisProperties.getHost());
((RedisStandaloneConfiguration) redisConfiguration).setPort(redisProperties.getPort());
((RedisStandaloneConfiguration) redisConfiguration)
.setPassword(RedisPassword.of(redisProperties.getPassword()));
clientConfig = LettucePoolingClientConfiguration.builder().commandTimeout(redisProperties.getTimeout())
.poolConfig(genericObjectPoolConfig).build();
}
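if (redisProperties.isSsl()) {
// Note: isUseSsl() only reads the flag and enables nothing; SSL has to be switched on with useSsl() on the client configuration builder
clientConfig.isUseSsl();
}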
LettuceConnectionFactory factory = new LettuceConnectionFactory(redisConfiguration, clientConfig);
return factory;
}
/**
* GenericObjectPoolConfig: connection pool configuration
*/
@Bean
public GenericObjectPoolConfig genericObjectPoolConfig() {
GenericObjectPoolConfig poolConfig = new GenericObjectPoolConfig();
poolConfig.setMaxIdle(redisProperties.getLettuce().getPool().getMaxIdle());
poolConfig.setMinIdle(redisProperties.getLettuce().getPool().getMinIdle());
poolConfig.setMaxTotal(redisProperties.getLettuce().getPool().getMaxActive());
// setMaxWaitMillis expects milliseconds, so convert the Duration with toMillis()
poolConfig.setMaxWaitMillis(redisProperties.getLettuce().getPool().getMaxWait().toMillis());
Duration timeOut = redisProperties.getTimeout();
Duration shutdownTimeout = redisProperties.getLettuce().getShutdownTimeout();
return poolConfig;
}
@Bean
public CacheManager cacheManager(LettuceConnectionFactory lettuceConnectionFactory ) {
RedisCacheConfiguration redisCacheConfiguration = RedisCacheConfiguration.defaultCacheConfig();
redisCacheConfiguration = redisCacheConfiguration.entryTtl(Duration.ofMinutes(30L)) // default cache TTL: 30 minutes
.disableCachingNullValues() // do not cache null values
.serializeKeysWith(RedisSerializationContext.SerializationPair.fromSerializer(RedisSerializer.string())) // key serializer
.serializeValuesWith(
RedisSerializationContext.SerializationPair.fromSerializer(RedisSerializer.java())); // value serializer
return RedisCacheManager.builder(RedisCacheWriter.nonLockingRedisCacheWriter(lettuceConnectionFactory))
.cacheDefaults(redisCacheConfiguration).build();
}
/**
* Adapter for Redis cluster nodes
*/
// @Primary
// @Bean("redisTemplate")
// Without this property the bean is not assembled; for a single Redis instance, comment this annotation out
// @ConditionalOnProperty(name = "spring.redis.cluster.nodes", matchIfMissing = false)
public RedisTemplate<String, Object> getRedisTemplate(LettuceConnectionFactory lettuceConnectionFactory) {
RedisTemplate<String, Object> redisTemplate = new RedisTemplate<String, Object>();
redisTemplate.setConnectionFactory(lettuceConnectionFactory);
RedisSerializer stringSerializer = new StringRedisSerializer();
// RedisSerializer redisObjectSerializer = new RedisObjectSerializer();
RedisSerializer redisObjectSerializer = new RedisObjectSerializer();
redisTemplate.setKeySerializer(stringSerializer); // key serializer
redisTemplate.setHashKeySerializer(stringSerializer);
redisTemplate.setValueSerializer(redisObjectSerializer); // value serializer
redisTemplate.setHashValueSerializer(redisObjectSerializer); // hash value serializer
redisTemplate.afterPropertiesSet();
redisTemplate.opsForValue().set("hello", "wolrd");
return redisTemplate;
}
/**
* Adapter for a single Redis node
*/
@Primary
@Bean("redisTemplate")
@ConditionalOnProperty(name = "spring.redis.host", matchIfMissing = true)
public RedisTemplate<String, Object> getSingleRedisTemplate(LettuceConnectionFactory lettuceConnectionFactory) {
RedisTemplate<String, Object> redisTemplate = new RedisTemplate<String, Object>();
RedisSerializer redisObjectSerializer = new RedisObjectSerializer();
redisTemplate.setConnectionFactory(lettuceConnectionFactory);
redisTemplate.setKeySerializer(new StringRedisSerializer()); // key serializer
redisTemplate.setValueSerializer(redisObjectSerializer); // value serializer
redisTemplate.setHashValueSerializer(redisObjectSerializer);
redisTemplate.afterPropertiesSet();
return redisTemplate;
}
@Bean
public HashOperations<String, String, String> hashOperations(StringRedisTemplate stringRedisTemplate) {
return stringRedisTemplate.opsForHash();
}
/**
* Redis utility class
*/
@Bean("redisUtil")
public RedisUtil redisUtil(LettuceConnectionFactory lettuceConnectionFactory,
StringRedisTemplate stringRedisTemplate, HashOperations<String, String, String> hashOperations) {
RedisUtil redisUtil = new RedisUtil(lettuceConnectionFactory, stringRedisTemplate, hashOperations);
return redisUtil;
}
@Bean(destroyMethod = "shutdown")
@ConditionalOnProperty(name = "spring.redis.redisson.enable", matchIfMissing = false, havingValue = "true")
@ConditionalOnMissingBean(RedissonClient.class)
public RedissonClient redissonClient() throws IOException {
Config config = null;
Method clusterMethod = ReflectionUtils.findMethod(RedisProperties.class, "getCluster");
Method timeoutMethod = ReflectionUtils.findMethod(RedisProperties.class, "getTimeout");
Object timeoutValue = ReflectionUtils.invokeMethod(timeoutMethod, redisProperties);
int timeout;
if (null == timeoutValue) {
timeout = 60000;
} else if (!(timeoutValue instanceof Integer)) {
Method millisMethod = ReflectionUtils.findMethod(timeoutValue.getClass(), "toMillis");
timeout = ((Long) ReflectionUtils.invokeMethod(millisMethod, timeoutValue)).intValue();
} else {
timeout = (Integer) timeoutValue;
}
// spring.redis.redisson.config=classpath:redisson.yaml
if (redissonProperties.getConfig() != null) {
try {
InputStream is = getConfigStream();
config = Config.fromJSON(is);
} catch (IOException e) {
// trying next format
try {
InputStream is = getConfigStream();
config = Config.fromYAML(is);
} catch (IOException ioe) {
throw new IllegalArgumentException("Can't parse config", ioe);
}
}
} else if (redisProperties.getSentinel() != null) {
// Sentinel configuration
Method nodesMethod = ReflectionUtils.findMethod(Sentinel.class, "getNodes");
Object nodesValue = ReflectionUtils.invokeMethod(nodesMethod, redisProperties.getSentinel());
String[] nodes;
if (nodesValue instanceof String) {
nodes = convert(Arrays.asList(((String) nodesValue).split(",")));
} else {
nodes = convert((List<String>) nodesValue);
}
config = new Config();
config.useSentinelServers().setMasterName(redisProperties.getSentinel().getMaster())
.addSentinelAddress(nodes).setDatabase(redisProperties.getDatabase()).setConnectTimeout(timeout)
.setPassword(redisProperties.getPassword());
} else if (clusterMethod != null && ReflectionUtils.invokeMethod(clusterMethod, redisProperties) != null) {
// Cluster configuration
Object clusterObject = ReflectionUtils.invokeMethod(clusterMethod, redisProperties);
Method nodesMethod = ReflectionUtils.findMethod(clusterObject.getClass(), "getNodes");
List<String> nodesObject = (List) ReflectionUtils.invokeMethod(nodesMethod, clusterObject);
String[] nodes = convert(nodesObject);
config = new Config();
config.useClusterServers().addNodeAddress(nodes).setConnectTimeout(timeout)
.setPassword(redisProperties.getPassword());
} else {
// Default standalone Redisson configuration
config = new Config();
String prefix = "redis://";
Method method = ReflectionUtils.findMethod(RedisProperties.class, "isSsl");
if (method != null && (Boolean) ReflectionUtils.invokeMethod(method, redisProperties)) {
prefix = "rediss://";
}
config.useSingleServer().setAddress(prefix + redisProperties.getHost() + ":" + redisProperties.getPort())
.setConnectTimeout(timeout).setDatabase(redisProperties.getDatabase())
.setPassword(redisProperties.getPassword());
}
return Redisson.create(config);
}
private String[] convert(List<String> nodesObject) {
List<String> nodes = new ArrayList<String>(nodesObject.size());
for (String node : nodesObject) {
if (!node.startsWith("redis://") && !node.startsWith("rediss://")) {
nodes.add("redis://" + node);
} else {
nodes.add(node);
}
}
return nodes.toArray(new String[nodes.size()]);
}
private InputStream getConfigStream() throws IOException {
Resource resource = ctx.getResource(redissonProperties.getConfig());
InputStream is = resource.getInputStream();
return is;
}
}
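Two small pieces of string handling in the class above are worth isolating: the convert() helper, which prefixes node addresses with a scheme for Redisson, and the host:port split used in the cluster branch. The sketch below reproduces both in plain Java so they can be tried without a Redis server. RedisNodeSketch and its method names are invented for the example, and the addresses are placeholders.

```java
import java.util.ArrayList;
import java.util.List;

class RedisNodeSketch {

    // Same rule as convert(): keep an existing redis:// or rediss:// prefix, otherwise add redis://.
    static String[] toRedissonAddresses(List<String> nodes) {
        List<String> result = new ArrayList<>(nodes.size());
        for (String node : nodes) {
            if (!node.startsWith("redis://") && !node.startsWith("rediss://")) {
                result.add("redis://" + node);
            } else {
                result.add(node);
            }
        }
        return result.toArray(new String[0]);
    }

    // host:port split as in the cluster branch; trimming both parts tolerates stray spaces.
    // Plain IPv6 literals would need different handling because they contain colons.
    static String host(String address) {
        return address.split(":")[0].trim();
    }

    static int port(String address) {
        return Integer.parseInt(address.split(":")[1].trim());
    }

    public static void main(String[] args) {
        String[] addresses = toRedissonAddresses(List.of("10.0.0.1:6379", "rediss://10.0.0.2:6380"));
        for (String address : addresses) {
            System.out.println(address);
        }
        System.out.println(host("10.0.0.1:6379") + " / " + port("10.0.0.1:6379"));
    }
}
```

The prefix step exists because Redisson expects every address to carry a scheme, while the spring.redis.* properties are usually written as bare host:port pairs.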
4. rocketmq-ws-starter
This module is a secondary wrapper around rocketmq-spring-boot-starter. The consumers provided by rocketmq-spring-boot-starter do not expose the number of consumption retries, so the module wraps its own consumer: crawling can fail for many unpredictable reasons, and the retry count makes it possible to record an error log after repeated consumption failures. Core code: the consumption mode, topic, thread counts, and so on are configured through a custom annotation.
package com.wx.spider.rocketmq.annotation;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import java.lang.annotation.*;
/**
* @author:feng
* @create:2021-04-21 15:17
*/
@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface WsRocketMQConsumer {
String consumerGroup() default "";
String topic();
String selectorExpression() default "*";
int consumeThreadMax() default 32;
int consumeThreadMin() default 8;
int consumeMessageBatchMaxSize() default 1;
String nameServer() default "";
MessageModel messageModel() default MessageModel.CLUSTERING;
}
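Attributes of an annotation like this one are read by reflection at runtime, and an empty default can serve as a "not set" marker that falls back to a configured value. The following sketch shows that pattern with a cut-down annotation. ConsumerConfig, the two consumer classes, and the topic and group names are all hypothetical and exist only for the example.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

class ConsumerSettingsSketch {

    // Cut-down, hypothetical stand-in for @WsRocketMQConsumer.
    @Target(ElementType.TYPE)
    @Retention(RetentionPolicy.RUNTIME)
    @interface ConsumerConfig {
        String consumerGroup() default "";
        String topic();
        int consumeThreadMax() default 32;
    }

    @ConsumerConfig(topic = "article-topic")
    static class ArticleConsumer { }

    @ConsumerConfig(topic = "comment-topic", consumerGroup = "comment-group", consumeThreadMax = 16)
    static class CommentConsumer { }

    // An explicit annotation value wins; an empty one falls back to the configured group.
    static String resolveGroup(Class<?> type, String configuredGroup) {
        ConsumerConfig config = type.getAnnotation(ConsumerConfig.class);
        if (config == null) {
            throw new IllegalStateException(type.getName() + " is missing @ConsumerConfig");
        }
        return config.consumerGroup().isEmpty() ? configuredGroup : config.consumerGroup();
    }

    public static void main(String[] args) {
        System.out.println(resolveGroup(ArticleConsumer.class, "default-group")); // default-group
        System.out.println(resolveGroup(CommentConsumer.class, "default-group")); // comment-group
    }
}
```

Failing fast when the annotation is missing gives a clearer error than a NullPointerException during startup.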
Create a consumer listener by extending the abstract class, add the business logic, and read the retry count from MessageExt.
package com.wx.spider.rocketmq.consumer;

import com.wx.spider.rocketmq.annotation.WsRocketMQConsumer;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import org.springframework.beans.factory.annotation.Value;

import javax.annotation.PostConstruct;

/**
 * @author:feng
 * @create:2021-04-21 12:21
 */
@Slf4j
public abstract class BaseRocketMQConsumer {

    @Value("${rocketmq.name-server}")
    private String namesrvAddr;

    @Value("${rocketmq.consumer.group}")
    private String groupName;

    @PostConstruct
    public void init() {
        WsRocketMQConsumer annotation = this.getClass().getAnnotation(WsRocketMQConsumer.class);
        // Values set on the annotation override the configured defaults
        String nameServer = annotation.nameServer();
        if (nameServer != null && !"".equals(nameServer)) {
            namesrvAddr = nameServer;
        }
        String consumerGroup = annotation.consumerGroup();
        if (consumerGroup != null && !"".equals(consumerGroup)) {
            groupName = consumerGroup;
        }
        String topicStr = annotation.topic();
        String selectorExpression = annotation.selectorExpression();
        int consumeThreadMin = annotation.consumeThreadMin();
        int consumeThreadMax = annotation.consumeThreadMax();
        MessageModel messageModel = annotation.messageModel();
        int consumeMessageBatchMaxSize = annotation.consumeMessageBatchMaxSize();

        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(groupName);
        consumer.setNamesrvAddr(namesrvAddr);
        consumer.setConsumeThreadMin(consumeThreadMin);
        consumer.setConsumeThreadMax(consumeThreadMax);
        consumer.setMessageModel(messageModel);
        try {
            consumer.subscribe(topicStr, selectorExpression);
            consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
            consumer.registerMessageListener((MessageListenerConcurrently) (list, context) -> {
                try {
                    // Process every message in the batch; stop at the first one that is not consumed successfully
                    for (MessageExt messageExt : list) {
                        ConsumeConcurrentlyStatus status =
                                consume(messageExt.getTags(), messageExt.getTopic(), messageExt);
                        if (status != ConsumeConcurrentlyStatus.CONSUME_SUCCESS) {
                            return status;
                        }
                    }
                } catch (Exception e) {
                    log.error("Consumption failed, retrying later", e);
                    // try again later
                    return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                }
                // consumed successfully
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            });
            consumer.setConsumeMessageBatchMaxSize(consumeMessageBatchMaxSize);
            consumer.start();
            log.info("Consumer startup done");
        } catch (Exception e) {
            log.error("Consumer startup failed", e);
        }
    }

    protected abstract ConsumeConcurrentlyStatus consume(String tag, String topic, MessageExt messageExt);
}
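What a subclass might do with the retry count is sketched below: retry while the count is under a limit, then log the failure and acknowledge the message so it is not redelivered again. In the real module the count would come from MessageExt.getReconsumeTimes(); here a tiny Message class and Status enum stand in for the RocketMQ types so the example runs on its own, and MAX_RETRIES is an assumed value, not one taken from the project.

```java
class RetryPolicySketch {

    // Stand-in for ConsumeConcurrentlyStatus.
    enum Status { CONSUME_SUCCESS, RECONSUME_LATER }

    // Stand-in for the two MessageExt fields the sketch needs.
    static final class Message {
        final String body;
        final int reconsumeTimes;

        Message(String body, int reconsumeTimes) {
            this.body = body;
            this.reconsumeTimes = reconsumeTimes;
        }
    }

    // Business logic supplied by the concrete consumer.
    interface Handler {
        void handle(Message message) throws Exception;
    }

    static final int MAX_RETRIES = 3; // assumed limit for the example

    static Status consume(Message message, Handler handler) {
        try {
            handler.handle(message);
            return Status.CONSUME_SUCCESS;
        } catch (Exception e) {
            if (message.reconsumeTimes >= MAX_RETRIES) {
                // Give up: record the failure and acknowledge so the broker stops redelivering.
                System.err.println("giving up after " + message.reconsumeTimes
                        + " retries: " + message.body + " (" + e.getMessage() + ")");
                return Status.CONSUME_SUCCESS;
            }
            return Status.RECONSUME_LATER;
        }
    }

    public static void main(String[] args) {
        Handler failing = m -> { throw new Exception("page could not be parsed"); };
        System.out.println(consume(new Message("article-1", 0), failing)); // RECONSUME_LATER
        System.out.println(consume(new Message("article-1", 3), failing)); // CONSUME_SUCCESS
        System.out.println(consume(new Message("article-2", 0), m -> { })); // CONSUME_SUCCESS
    }
}
```

Acknowledging after the last attempt is one possible policy; writing the failed message to a table or a separate topic for later inspection would fit the same place in the code.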
Conclusion
This article explained the relationships between the modules and how the starter modules are encapsulated, so the functional structure of the project should now be fairly clear. The following articles move on to the business logic and show how to fetch article information automatically through packet capture and JS injection.