I. Introduction
Storm-Redis provides integration between Storm and Redis. To use it, simply add the corresponding dependency:
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-redis</artifactId>
<version>${storm.version}</version>
<type>jar</type>
</dependency>
Copy the code
Storm-Redis uses Jedis as its Redis client and provides the following three basic Bolt implementations:
- RedisLookupBolt: queries data from Redis;
- RedisStoreBolt: stores data in Redis;
- RedisFilterBolt: queries Redis and passes on only the matching (eligible) data;
RedisLookupBolt, RedisStoreBolt, and RedisFilterBolt all extend the AbstractRedisBolt class. We can also extend this abstract class ourselves to implement a custom RedisBolt.
II. Integration Example
2.1 Project Structure
Here is an integration example: it counts word frequencies and stores the final results in Redis. The project structure is as follows:
Source code for this example: storm-redis-integration
2.2 Project Dependency
The project mainly relies on the following:
<properties>
<storm.version>1.2.2</storm.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-core</artifactId>
<version>${storm.version}</version>
</dependency>
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-redis</artifactId>
<version>${storm.version}</version>
</dependency>
</dependencies>
Copy the code
2.3 DataSourceSpout
/** * The source of the word frequency sample */
public class DataSourceSpout extends BaseRichSpout {
private List<String> list = Arrays.asList("Spark"."Hadoop"."HBase"."Storm"."Flink"."Hive");
private SpoutOutputCollector spoutOutputCollector;
@Override
public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
this.spoutOutputCollector = spoutOutputCollector;
}
@Override
public void nextTuple(a) {
// The simulation generates data
String lineData = productData();
spoutOutputCollector.emit(new Values(lineData));
Utils.sleep(1000);
}
@Override
public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
outputFieldsDeclarer.declare(new Fields("line"));
}
/** ** analog data */
private String productData(a) {
Collections.shuffle(list);
Random random = new Random();
int endIndex = random.nextInt(list.size()) % (list.size()) + 1;
return StringUtils.join(list.toArray(), "\t".0, endIndex); }}Copy the code
The simulated data looks like this (words within a line are tab-separated):
Spark HBase
Hive Flink Storm Hadoop HBase Spark
Flink
HBase Storm
HBase Hadoop Hive Flink
HBase Flink Hive Storm
Hive Flink Hadoop
HBase Hive
Hadoop Spark HBase Storm
Copy the code
2.4 SplitBolt
/** * splits each row with the specified delimiter */
public class SplitBolt extends BaseRichBolt {
private OutputCollector collector;
@Override
public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
this.collector = collector;
}
@Override
public void execute(Tuple input) {
String line = input.getStringByField("line");
String[] words = line.split("\t");
for (String word : words) {
collector.emit(new Values(word, String.valueOf(1))); }}@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("word"."count")); }}Copy the code
2.5 CountBolt
/** ** count word frequency */
public class CountBolt extends BaseRichBolt {
private Map<String, Integer> counts = new HashMap<>();
private OutputCollector collector;
@Override
public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
this.collector=collector;
}
@Override
public void execute(Tuple input) {
String word = input.getStringByField("word");
Integer count = counts.get(word);
if (count == null) {
count = 0;
}
count++;
counts.put(word, count);
/ / output
collector.emit(new Values(word, String.valueOf(count)));
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("word"."count")); }}Copy the code
2.6 WordCountStoreMapper
Implement the RedisStoreMapper interface to define how a tuple maps to Redis data: which tuple field is the key, which is the value, and which Redis data structure they are stored in. For a HASH, the additional key given in RedisDataTypeDescription is the name of the hash, and the tuple key becomes the hash field.
/** * define the mapping between tuple and data in Redis */
public class WordCountStoreMapper implements RedisStoreMapper {
private RedisDataTypeDescription description;
private final String hashKey = "wordCount";
public WordCountStoreMapper(a) {
description = new RedisDataTypeDescription(
RedisDataTypeDescription.RedisDataType.HASH, hashKey);
}
@Override
public RedisDataTypeDescription getDataTypeDescription(a) {
return description;
}
@Override
public String getKeyFromTuple(ITuple tuple) {
return tuple.getStringByField("word");
}
@Override
public String getValueFromTuple(ITuple tuple) {
return tuple.getStringByField("count"); }}Copy the code
2.7 WordCountToRedisApp
/** * Perform word frequency statistics and store statistics results in Redis */
public class WordCountToRedisApp {
private static final String DATA_SOURCE_SPOUT = "dataSourceSpout";
private static final String SPLIT_BOLT = "splitBolt";
private static final String COUNT_BOLT = "countBolt";
private static final String STORE_BOLT = "storeBolt";
// In real development these parameters can be passed in externally to make the program more flexible
private static final String REDIS_HOST = "192.168.200.226";
private static final int REDIS_PORT = 6379;
public static void main(String[] args) {
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout(DATA_SOURCE_SPOUT, new DataSourceSpout());
// split
builder.setBolt(SPLIT_BOLT, new SplitBolt()).shuffleGrouping(DATA_SOURCE_SPOUT);
// count
builder.setBolt(COUNT_BOLT, new CountBolt()).shuffleGrouping(SPLIT_BOLT);
// save to redis
JedisPoolConfig poolConfig = new JedisPoolConfig.Builder()
.setHost(REDIS_HOST).setPort(REDIS_PORT).build();
RedisStoreMapper storeMapper = new WordCountStoreMapper();
RedisStoreBolt storeBolt = new RedisStoreBolt(poolConfig, storeMapper);
builder.setBolt(STORE_BOLT, storeBolt).shuffleGrouping(COUNT_BOLT);
// If cluster is transmitted externally, it indicates that the system is started online; otherwise, it indicates that the system is started locally
if (args.length > 0 && args[0].equals("cluster")) {
try {
StormSubmitter.submitTopology("ClusterWordCountToRedisApp".new Config(), builder.createTopology());
} catch(AlreadyAliveException | InvalidTopologyException | AuthorizationException e) { e.printStackTrace(); }}else {
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("LocalWordCountToRedisApp".newConfig(), builder.createTopology()); }}}Copy the code
2.8 Starting tests
You can run it directly in local mode, or package it and submit it to a server cluster. The source code in this repository is packaged with maven-shade-plugin by default; the package command is:
# mvn clean package -D maven.test.skip=true
Copy the code
After starting the topology, check the data in Redis:
III. How Storm-Redis Works
3.1 AbstractRedisBolt
RedisLookupBolt, RedisStoreBolt, and RedisFilterBolt all extend the abstract class AbstractRedisBolt, which in turn indirectly extends BaseRichBolt (through BaseTickTupleAwareRichBolt).
The key method in AbstractRedisBolt is prepare(): from the externally supplied Jedis configuration (jedisPoolConfig or jedisClusterConfig), it creates a JedisCommandsInstanceContainer that manages the Jedis instances.
public abstract class AbstractRedisBolt extends BaseTickTupleAwareRichBolt {
protected OutputCollector collector;
private transient JedisCommandsInstanceContainer container;
private JedisPoolConfig jedisPoolConfig;
privateJedisClusterConfig jedisClusterConfig; .@Override
public void prepare(Map map, TopologyContext topologyContext, OutputCollector collector) {
// FIXME: stores map (stormConf), topologyContext and expose these to derived classes
this.collector = collector;
if(jedisPoolConfig ! =null) {
this.container = JedisCommandsContainerBuilder.build(jedisPoolConfig);
} else if(jedisClusterConfig ! =null) {
this.container = JedisCommandsContainerBuilder.build(jedisClusterConfig);
} else {
throw new IllegalArgumentException("Jedis configuration not found"); }}... }Copy the code
The build() methods of JedisCommandsContainerBuilder are shown below. They simply create a JedisPool or a JedisCluster and wrap it in the corresponding container:
/**
 * Creates a container for a single Redis node.
 *
 * <p>Builds a JedisPool from the host, port, timeout, password and database in
 * {@code config}, and wraps it in a JedisContainer.
 */
public static JedisCommandsInstanceContainer build(JedisPoolConfig config) {
    final JedisPool pool = new JedisPool(
            DEFAULT_POOL_CONFIG,
            config.getHost(),
            config.getPort(),
            config.getTimeout(),
            config.getPassword(),
            config.getDatabase());
    return new JedisContainer(pool);
}

/**
 * Creates a container for a Redis Cluster.
 *
 * <p>Builds a JedisCluster from {@code config} and wraps it in a JedisClusterContainer. The
 * configured timeout is passed twice (presumably as connection and socket timeout; see the
 * JedisCluster constructor).
 */
public static JedisCommandsInstanceContainer build(JedisClusterConfig config) {
    final int timeout = config.getTimeout();
    final JedisCluster cluster = new JedisCluster(
            config.getNodes(),
            timeout,
            timeout,
            config.getMaxRedirections(),
            config.getPassword(),
            DEFAULT_POOL_CONFIG);
    return new JedisClusterContainer(cluster);
}
Copy the code
3.2 RedisStoreBolt and RedisLookupBolt
The key method in RedisStoreBolt is process(): it uses storeMapper to get the key and value from the incoming tuple, then stores them by calling the JedisCommands method that matches the configured data type (dataType).
RedisLookupBolt is implemented in much the same way: it gets the key from lookupMapper and performs a query.
public class RedisStoreBolt extends AbstractRedisBolt {
private final RedisStoreMapper storeMapper;
private final RedisDataTypeDescription.RedisDataType dataType;
private final String additionalKey;
public RedisStoreBolt(JedisPoolConfig config, RedisStoreMapper storeMapper) {
super(config);
this.storeMapper = storeMapper;
RedisDataTypeDescription dataTypeDescription = storeMapper.getDataTypeDescription();
this.dataType = dataTypeDescription.getDataType();
this.additionalKey = dataTypeDescription.getAdditionalKey();
}
public RedisStoreBolt(JedisClusterConfig config, RedisStoreMapper storeMapper) {
super(config);
this.storeMapper = storeMapper;
RedisDataTypeDescription dataTypeDescription = storeMapper.getDataTypeDescription();
this.dataType = dataTypeDescription.getDataType();
this.additionalKey = dataTypeDescription.getAdditionalKey();
}
@Override
public void process(Tuple input) {
String key = storeMapper.getKeyFromTuple(input);
String value = storeMapper.getValueFromTuple(input);
JedisCommands jedisCommand = null;
try {
jedisCommand = getInstance();
switch (dataType) {
case STRING:
jedisCommand.set(key, value);
break;
case LIST:
jedisCommand.rpush(key, value);
break;
case HASH:
jedisCommand.hset(additionalKey, key, value);
break;
case SET:
jedisCommand.sadd(key, value);
break;
case SORTED_SET:
jedisCommand.zadd(additionalKey, Double.valueOf(value), key);
break;
case HYPER_LOG_LOG:
jedisCommand.pfadd(key, value);
break;
case GEO:
String[] array = value.split(":");
if(array.length ! =2) {
throw new IllegalArgumentException("value structure should be longitude:latitude");
}
double longitude = Double.valueOf(array[0]);
double latitude = Double.valueOf(array[1]);
jedisCommand.geoadd(additionalKey, longitude, latitude, key);
break;
default:
throw new IllegalArgumentException("Cannot process such data type: " + dataType);
}
collector.ack(input);
} catch (Exception e) {
this.collector.reportError(e);
this.collector.fail(input);
} finally{ returnInstance(jedisCommand); }}... }Copy the code
3.3 JedisCommands
The JedisCommands interface defines the Redis commands available to a client. It has three implementation classes: Jedis, JedisCluster, and ShardedJedis. Storm mainly uses the first two; which one executes the commands depends on whether a jedisPoolConfig or a jedisClusterConfig is passed in.
3.4 RedisMapper and TupleMapper
RedisMapper and TupleMapper define how data is mapped and converted between tuples and Redis.
1. TupleMapper
TupleMapper defines two main methods:
-
getKeyFromTuple(ITuple tuple): gets the field from the tuple that is used as the Key;
-
getValueFromTuple(ITuple tuple): gets the field from the tuple that is used as the Value;
2. RedisMapper
RedisMapper defines getDataTypeDescription(), which returns the Redis data type to use. The RedisDataType enum inside RedisDataTypeDescription lists all the Redis data types that are available:
/**
 * Describes which Redis data structure a mapper targets (excerpt of the library source).
 */
public class RedisDataTypeDescription implements Serializable {
    /** Redis data types that Storm-Redis mappers can use. */
    public enum RedisDataType { STRING, HASH, LIST, SET, SORTED_SET, HYPER_LOG_LOG, GEO }

    // ... (fields, constructors and getters omitted)
}
Copy the code
3. RedisStoreMapper
RedisStoreMapper extends the TupleMapper and RedisMapper interfaces and is used for storing data; it defines no additional methods.
4. RedisLookupMapper
RedisLookupMapper extends the TupleMapper and RedisMapper interfaces and additionally:
- defines the declareOutputFields method to declare the output fields;
- defines the toTuple method, which assembles the query result into a list of Storm Values to be emitted.
In the following example, the word field of the input tuple is used as the key; after RedisLookupBolt performs the query, the key and the returned value are assembled into Values and sent to the next processing unit.
class WordCountRedisLookupMapper implements RedisLookupMapper {
private RedisDataTypeDescription description;
private final String hashKey = "wordCount";
public WordCountRedisLookupMapper(a) {
description = new RedisDataTypeDescription(
RedisDataTypeDescription.RedisDataType.HASH, hashKey);
}
@Override
public List<Values> toTuple(ITuple input, Object value) {
String member = getKeyFromTuple(input);
List<Values> values = Lists.newArrayList();
values.add(new Values(member, value));
return values;
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("wordName"."count"));
}
@Override
public RedisDataTypeDescription getDataTypeDescription(a) {
return description;
}
@Override
public String getKeyFromTuple(ITuple tuple) {
return tuple.getStringByField("word");
}
@Override
public String getValueFromTuple(ITuple tuple) {
return null; }}Copy the code
5. RedisFilterMapper
RedisFilterMapper extends the TupleMapper and RedisMapper interfaces and is used when filtering data by querying Redis. It defines declareOutputFields to declare the output fields, for example:
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("wordName"."count"));
}
Copy the code
IV. Custom RedisBolt for Word Frequency Counting
4.1 Implementation Principle
Custom RedisBolt: it uses the Redis hash command `HINCRBY key field increment` to count word frequencies. HINCRBY increments the number stored in a hash field by the given amount; if the field does not exist yet, it is initialized to 0 before the increment is applied. This makes word counting straightforward, as the following session shows:
redis> HSET myhash field 5
(integer) 1
redis> HINCRBY myhash field 1
(integer) 6
redis> HINCRBY myhash field -1
(integer) 5
redis> HINCRBY myhash field -10
(integer) -5
redis>
Copy the code
4.2 Project Structure
4.3 Custom code implementation of RedisBolt
/** * Custom RedisBolt uses the hincrby key field command in the Redis hash data structure for word frequency statistics */
public class RedisCountStoreBolt extends AbstractRedisBolt {
private final RedisStoreMapper storeMapper;
private final RedisDataTypeDescription.RedisDataType dataType;
private final String additionalKey;
public RedisCountStoreBolt(JedisPoolConfig config, RedisStoreMapper storeMapper) {
super(config);
this.storeMapper = storeMapper;
RedisDataTypeDescription dataTypeDescription = storeMapper.getDataTypeDescription();
this.dataType = dataTypeDescription.getDataType();
this.additionalKey = dataTypeDescription.getAdditionalKey();
}
@Override
protected void process(Tuple tuple) {
String key = storeMapper.getKeyFromTuple(tuple);
String value = storeMapper.getValueFromTuple(tuple);
JedisCommands jedisCommand = null;
try {
jedisCommand = getInstance();
if (dataType == RedisDataTypeDescription.RedisDataType.HASH) {
jedisCommand.hincrBy(additionalKey, key, Long.valueOf(value));
} else {
throw new IllegalArgumentException("Cannot process such data type for Count: " + dataType);
}
collector.ack(tuple);
} catch (Exception e) {
this.collector.reportError(e);
this.collector.fail(tuple);
} finally{ returnInstance(jedisCommand); }}@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {}}Copy the code
4.4 CustomRedisCountApp
/** * Use custom RedisBolt to implement word frequency statistics */
public class CustomRedisCountApp {
private static final String DATA_SOURCE_SPOUT = "dataSourceSpout";
private static final String SPLIT_BOLT = "splitBolt";
private static final String STORE_BOLT = "storeBolt";
private static final String REDIS_HOST = "192.168.200.226";
private static final int REDIS_PORT = 6379;
public static void main(String[] args) {
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout(DATA_SOURCE_SPOUT, new DataSourceSpout());
// split
builder.setBolt(SPLIT_BOLT, new SplitBolt()).shuffleGrouping(DATA_SOURCE_SPOUT);
// save to redis and count
JedisPoolConfig poolConfig = new JedisPoolConfig.Builder()
.setHost(REDIS_HOST).setPort(REDIS_PORT).build();
RedisStoreMapper storeMapper = new WordCountStoreMapper();
RedisCountStoreBolt countStoreBolt = new RedisCountStoreBolt(poolConfig, storeMapper);
builder.setBolt(STORE_BOLT, countStoreBolt).shuffleGrouping(SPLIT_BOLT);
// If the external parameter cluster is transmitted, it indicates that the online environment is started; otherwise, it indicates that the local environment is started
if (args.length > 0 && args[0].equals("cluster")) {
try {
StormSubmitter.submitTopology("ClusterCustomRedisCountApp".new Config(), builder.createTopology());
} catch(AlreadyAliveException | InvalidTopologyException | AuthorizationException e) { e.printStackTrace(); }}else {
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("LocalCustomRedisCountApp".newConfig(), builder.createTopology()); }}}Copy the code
References
- Storm Redis Integration
For more articles in this big data series, see the GitHub open-source project: Getting Started with Big Data