
5. Define the database configuration class
```java
public class TmallConfig {

    /** HBase schema */
    public static final String HBASE_SCHEMA = "TMALL_REALTIME";

    /** Phoenix connection address */
    public static final String PHOENIX_SERVER = "jdbc:phoenix:hd1,hd2,hd3:2181";

    /** ClickHouse connection address */
    public static final String CLICKHOUSE_URL = "jdbc:clickhouse://hd1:8123/tmall_realtime";
}
```
6. Define the CDC operation type enumeration class
```java
import java.util.HashMap;
import java.util.Map;

public enum CDCTypeEnum {

    /** CDC update operation, op code "u" */
    UPDATE("u"),
    /** CDC delete operation, op code "d" */
    DELETE("d");

    private static final Map<String, CDCTypeEnum> MAP = new HashMap<>();

    static {
        for (CDCTypeEnum cdcTypeEnum : CDCTypeEnum.values()) {
            MAP.put(cdcTypeEnum.op, cdcTypeEnum);
        }
    }

    final String op;

    CDCTypeEnum(String op) {
        this.op = op;
    }

    public static CDCTypeEnum of(String c) {
        return MAP.get(c);
    }
}
```
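For context, here is a minimal sketch (my addition, not from the original) of how the enum might be used to dispatch on the `op` field of a Debezium-style CDC record; the record shape and field values are assumptions:

```java
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;

public class CdcTypeDemo {
    public static void main(String[] args) {
        // hypothetical CDC record; "op" follows the Debezium op-code convention
        JSONObject record = JSON.parseObject("{\"op\":\"d\",\"before\":{\"id\":\"1001\"}}");
        CDCTypeEnum type = CDCTypeEnum.of(record.getString("op"));
        if (type != null) {
            switch (type) {
                case UPDATE:
                    // apply the changed row downstream
                    break;
                case DELETE:
                    // remove the row downstream (e.g. evict its DIM cache entry)
                    break;
            }
        }
    }
}
```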
7. Encapsulate the Kafka utility class
```java
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
import org.apache.kafka.clients.consumer.ConsumerConfig;

import java.util.Properties;

public class KafkaUtil {

    private static final String KAFKA_SERVER = "hd1:9092,hd2:9092,hd3:9092";

    private static Properties getProps() {
        Properties properties = new Properties();
        properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_SERVER);
        return properties;
    }

    /**
     * Kafka Source
     *
     * @param topic   topic to consume
     * @param groupId consumer group id
     * @return Kafka Source
     */
    public static SourceFunction<String> ofSource(String topic, String groupId) {
        Properties props = getProps();
        props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        return new FlinkKafkaConsumer<>(topic, new SimpleStringSchema(), props);
    }

    /**
     * Kafka Sink
     *
     * @param topic topic to write to
     * @return Kafka Sink
     */
    public static SinkFunction<String> ofSink(String topic) {
        return new FlinkKafkaProducer<>(topic, new SimpleStringSchema(), getProps());
    }

    /**
     * Kafka Sink with a custom serialization schema and exactly-once semantics
     *
     * @param serializationSchema custom serialization schema
     * @param <IN>                element type
     * @return Kafka Sink
     */
    public static <IN> SinkFunction<IN> ofSink(KafkaSerializationSchema<IN> serializationSchema) {
        return new FlinkKafkaProducer<>("default_topic", serializationSchema, getProps(),
                FlinkKafkaProducer.Semantic.EXACTLY_ONCE);
    }

    /**
     * Kafka connector options for a Flink SQL DDL WITH clause
     *
     * @param topic   topic
     * @param groupId consumer group id
     * @return connector options
     */
    public static String getKafkaDDL(String topic, String groupId) {
        return " 'connector' = 'kafka', " +
                " 'topic' = '" + topic + "', " +
                " 'properties.bootstrap.servers' = '" + KAFKA_SERVER + "', " +
                " 'properties.group.id' = '" + groupId + "', " +
                " 'format' = 'json', " +
                " 'scan.startup.mode' = 'latest-offset' ";
    }
}
```
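As a usage sketch (my addition; the topic and group names are placeholders), the helpers plug straight into a Flink job:

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class KafkaUtilDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // "ods_base_db" and "dim_app" are hypothetical names
        env.addSource(KafkaUtil.ofSource("ods_base_db", "dim_app"))
           .addSink(KafkaUtil.ofSink("dwd_output"));
        env.execute("kafka-util-demo");
    }
}
```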
8. Encapsulate the Redis utility class to cache DIM data
```java
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;

public class RedisUtil {

    private static volatile JedisPool jedisPool;

    public static Jedis getJedis() {
        if (jedisPool == null) {
            synchronized (RedisUtil.class) {
                if (jedisPool == null) {
                    JedisPoolConfig poolConfig = new JedisPoolConfig();
                    // maximum number of connections
                    poolConfig.setMaxTotal(100);
                    // block when the pool is exhausted
                    poolConfig.setBlockWhenExhausted(true);
                    // maximum time to wait for a connection
                    poolConfig.setMaxWaitMillis(2000);
                    // maximum / minimum number of idle connections
                    poolConfig.setMaxIdle(5);
                    poolConfig.setMinIdle(5);
                    // ping-pong test when borrowing a connection
                    poolConfig.setTestOnBorrow(true);
                    jedisPool = new JedisPool(poolConfig, "hd1", 6379, 1000, "password");
                }
            }
        }
        return jedisPool.getResource();
    }
}
```
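Because Jedis implements Closeable, callers can use try-with-resources and the connection is returned to the pool automatically. A minimal sketch (my addition; the key and value are made up):

```java
import redis.clients.jedis.Jedis;

public class RedisUtilDemo {
    public static void main(String[] args) {
        try (Jedis jedis = RedisUtil.getJedis()) {
            // cache a made-up DIM row for 24 hours
            jedis.setex("dim:dim_user_info:1001", 60 * 60 * 24, "{\"ID\":\"1001\"}");
            System.out.println(jedis.get("dim:dim_user_info:1001"));
        }
    }
}
```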
9. Encapsulate the Phoenix utility class to query HBase
```java
import lombok.SneakyThrows;
import org.apache.commons.beanutils.BeanUtils;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class PhoenixUtil {

    private static Connection connection;

    /**
     * Run a query and map every row to an instance of the given class.
     *
     * @param sql   query SQL
     * @param clazz target bean class
     * @param <T>   bean type
     * @return result list, empty on failure
     */
    public static <T> List<T> queryList(String sql, Class<T> clazz) {
        if (connection == null) {
            initConnection();
        }
        try (PreparedStatement preparedStatement = connection.prepareStatement(sql)) {
            ResultSet resultSet = preparedStatement.executeQuery();
            ResultSetMetaData metaData = resultSet.getMetaData();
            ArrayList<T> resList = new ArrayList<>();
            while (resultSet.next()) {
                T t = clazz.newInstance();
                // copy every column of the current row onto the bean
                for (int i = 1; i <= metaData.getColumnCount(); i++) {
                    BeanUtils.setProperty(t, metaData.getColumnName(i), resultSet.getObject(i));
                }
                resList.add(t);
            }
            resultSet.close();
            return resList;
        } catch (Exception e) {
            e.printStackTrace();
        }
        return Collections.emptyList();
    }

    /**
     * Initialize the Phoenix connection.
     */
    @SneakyThrows
    private static void initConnection() {
        Class.forName("org.apache.phoenix.jdbc.PhoenixDriver");
        connection = DriverManager.getConnection(TmallConfig.PHOENIX_SERVER);
    }
}
```
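A usage sketch (my addition; the table name is hypothetical). Because the column-to-property mapping goes through BeanUtils, a JSONObject also works as the target type, which is how DimUtil uses it below:

```java
import com.alibaba.fastjson.JSONObject;

import java.util.List;

public class PhoenixUtilDemo {
    public static void main(String[] args) {
        // "DIM_USER_INFO" is a placeholder table name
        List<JSONObject> rows = PhoenixUtil.queryList(
                "select * from " + TmallConfig.HBASE_SCHEMA + ".DIM_USER_INFO", JSONObject.class);
        rows.forEach(System.out::println);
    }
}
```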
10. Encapsulate the DIM query utility class for HBase
```java
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.java.tuple.Tuple2;
import redis.clients.jedis.Jedis;

import java.util.List;

public class DimUtil {

    /**
     * Delete the cached row of a dimension table.
     *
     * @param tableName table name
     * @param id        row id
     */
    public static void delDimCache(String tableName, String id) {
        StringBuilder cacheKey = new StringBuilder()
                .append("dim:").append(tableName.toLowerCase()).append(":").append(id);
        try (Jedis jedis = RedisUtil.getJedis()) {
            jedis.del(cacheKey.toString());
        }
    }

    /**
     * Query a dimension row by id.
     *
     * @param tableName table name
     * @param id        row id
     * @return row data
     */
    public static JSONObject getDimInfo(String tableName, String id) {
        return getDimInfo(tableName, Tuple2.of("id", id));
    }

    /**
     * Query a dimension row by one or more (column, value) conditions.
     *
     * @param tableName    table name
     * @param colAndValue  mandatory (column, value) condition
     * @param colAndValues optional extra (column, value) conditions
     * @return row data
     */
    @SafeVarargs
    public static JSONObject getDimInfo(String tableName, Tuple2<String, String> colAndValue,
                                        Tuple2<String, String>... colAndValues) {
        // build the cache key
        StringBuilder cacheKey = new StringBuilder()
                .append("dim:").append(tableName.toLowerCase()).append(":").append(colAndValue.f1);
        for (Tuple2<String, String> cv : colAndValues) {
            cacheKey.append("_").append(cv.f1);
        }
        try (Jedis jedis = RedisUtil.getJedis()) {
            // cache hit: return directly
            String str = jedis.get(cacheKey.toString());
            if (StringUtils.isNotBlank(str)) {
                return JSON.parseObject(str);
            }
            // cache miss: query Phoenix
            StringBuilder sql = new StringBuilder();
            sql.append("select * from ").append(TmallConfig.HBASE_SCHEMA).append(".").append(tableName)
                    .append(" where ").append(colAndValue.f0).append("='").append(colAndValue.f1).append("' ");
            for (Tuple2<String, String> cv : colAndValues) {
                sql.append("and ").append(cv.f0).append("='").append(cv.f1).append("' ");
            }
            List<JSONObject> jsonObjectList = PhoenixUtil.queryList(sql.toString(), JSONObject.class);
            if (!jsonObjectList.isEmpty()) {
                // cache the row for 24 hours
                JSONObject jsonObject = jsonObjectList.get(0);
                jedis.setex(cacheKey.toString(), 60 * 60 * 24, jsonObject.toJSONString());
                return jsonObject;
            }
        }
        return null;
    }
}
```
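A usage sketch (my addition; the table and column names are placeholders) showing a lookup by id, a lookup by another column, and cache invalidation after the dimension row changes:

```java
import com.alibaba.fastjson.JSONObject;
import org.apache.flink.api.java.tuple.Tuple2;

public class DimUtilDemo {
    public static void main(String[] args) {
        // look up by primary key
        JSONObject byId = DimUtil.getDimInfo("DIM_USER_INFO", "1001");
        // look up by an arbitrary column
        JSONObject byName = DimUtil.getDimInfo("DIM_USER_INFO", Tuple2.of("USER_NAME", "alice"));
        // evict the cached row after the dimension table changes
        DimUtil.delDimCache("DIM_USER_INFO", "1001");
    }
}
```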
11. HBase configuration notes
  • To enable the mapping between HBase namespaces and Phoenix schemas, add the following to the hbase-site.xml of both HBase and Phoenix:
```xml
<property>
    <name>phoenix.schema.isNamespaceMappingEnabled</name>
    <value>true</value>
</property>

<property>
    <name>phoenix.schema.mapSystemTablesToNamespace</name>
    <value>true</value>
</property>
```
  • You also need to put hbase-site.xml on the program's classpath (for example under src/main/resources) so the Phoenix client picks up these settings. A schema-creation sketch follows below.
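With namespace mapping enabled, the TMALL_REALTIME schema from TmallConfig maps to an HBase namespace of the same name. A minimal sketch (my addition, not from the original) of creating it through Phoenix:

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;

public class SchemaInit {
    public static void main(String[] args) throws Exception {
        Class.forName("org.apache.phoenix.jdbc.PhoenixDriver");
        try (Connection conn = DriverManager.getConnection(TmallConfig.PHOENIX_SERVER);
             Statement stmt = conn.createStatement()) {
            // creates the schema (and the mapped HBase namespace) if absent
            stmt.execute("CREATE SCHEMA IF NOT EXISTS " + TmallConfig.HBASE_SCHEMA);
        }
    }
}
```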

Next installment: DIM layer & DWD layer core code
