Preface

This article has been sitting in my drafts for a long time: I always meant to write it but kept putting it off. It is a fairly long walkthrough that explains, in relative detail, how to write a small "skeleton" framework. This thin glue layer has been running in production for more than six months; what is presented here is an attempt to strip away the business-coupled code and distill it into a relatively clean version.

A previous article mentioned that the object Canal produces after parsing a MySQL binlog event looks as follows (it comes from the Canal source class com.alibaba.otter.canal.protocol.FlatMessage):

If we parsed the raw object directly everywhere, there would be a lot of boilerplate parsing code, and any change would ripple all over the codebase, which is something we want to avoid. So I spent a little time writing a glue layer for Canal that converts each received FlatMessage directly into the corresponding DTO instance based on its table name. This improves development efficiency to a certain extent and reduces template code. The data flow through this glue layer is as follows:

To write such a glue layer, you need:

  • Reflection.
  • Annotations.
  • The strategy pattern.
  • An IoC container (optional).

The modules of the project are as follows:

  • canal-glue-core: Core functions.
  • spring-boot-starter-canal-glue: adapts the glue layer to the Spring IoC container and adds auto-configuration.
  • canal-glue-example: Use examples and benchmarks.

The details of how this glue layer is implemented are described below.

Introducing the dependencies

In order not to pollute the dependencies of external services that reference this module, all dependencies except the JSON conversion dependency are declared with provided or test scope. The dependency versions and BOM are as follows:

<properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <spring.boot.version>2.3.0.RELEASE</spring.boot.version>
        <maven.compiler.plugin.version>3.8.1</maven.compiler.plugin.version>
        <lombok.version>1.18.12</lombok.version>
        <fastjson.version>1.2.73</fastjson.version>
</properties>
<dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-dependencies</artifactId>
            <version>${spring.boot.version}</version>
            <scope>import</scope>
            <type>pom</type>
        </dependency>
    </dependencies>
</dependencyManagement>
<dependencies>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <version>${lombok.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter</artifactId>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>${fastjson.version}</version>
    </dependency>
</dependencies>

The canal-glue-core module essentially depends only on FastJSON and can be used entirely outside the Spring ecosystem.

Basic architecture

Here is a "hindsight" architecture diagram; the first version didn't take this much into account and even coupled business code in order to get online quickly, and the components were only extracted later:

Designing the configuration module (since removed)

When designing the configuration module, two approaches were considered: an external configuration file and pure annotations. A JSON external configuration file was used in the early stage, and pure annotations were added later. This section briefly covers loading the JSON external configuration file; pure annotations are left for the processor module later.

The goal was to develop the glue layer quickly, so the configuration file uses a readable JSON format:

{
  "version": 1,
  "module": "canal-glue",
  "databases": [
    {
      "database": "db_payment_service",
      "processors": [
        {
          "table": "payment_order",
          "processor": "x.y.z.PaymentOrderProcessor",
          "exceptionHandler": "x.y.z.PaymentOrderExceptionHandler"
        }
      ]
    },
    {
      ......
    }
  ]
}

When designing the JSON configuration, avoid using a JSON array as the top-level structure, because that makes the mapped object design awkward (the root would have to be a bare list instead of a proper configuration object).

Because an application using this module may need to process binlog events that Canal parses from multiple upstream databases, the configuration module takes the database as the KEY and mounts multiple tables under it, each with its corresponding binlog event processor and exception handler. The entity classes matching the JSON file format are then straightforward:

@Data
public class CanalGlueProcessorConf {

    private String table;

    private String processor;

    private String exceptionHandler;
}

@Data
public class CanalGlueDatabaseConf {

    private String database;

    private List<CanalGlueProcessorConf> processors;
}

@Data
public class CanalGlueConf {

    private Long version;

    private String module;

    private List<CanalGlueDatabaseConf> databases;
}

After the entities are written, you can write a configuration loader. For simplicity, the configuration file is placed directly on the classpath. The loader looks like this:

public interface CanalGlueConfLoader {

    CanalGlueConf load(String location);
}

// implementation
public class ClassPathCanalGlueConfLoader implements CanalGlueConfLoader {

    @Override
    public CanalGlueConf load(String location) {
        ClassPathResource resource = new ClassPathResource(location);
        Assert.isTrue(resource.exists(), String.format("File %s does not exist under classpath", location));
        try {
            String content = StreamUtils.copyToString(resource.getInputStream(), StandardCharsets.UTF_8);
            return JSON.parseObject(content, CanalGlueConf.class);
        } catch (IOException e) {
            // should not reach
            throw new IllegalStateException(e);
        }
    }
}

This reads the file contents from the classpath at the (absolute) location and uses FastJSON to convert them into a CanalGlueConf object. This is the default implementation; users of the canal-glue module can override it and load the configuration through a custom implementation.
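For example, a custom loader might read the same JSON from the local file system instead of the classpath. The sketch below is only an illustration of the extension point; the class name and error handling are my own assumptions, not code from the project:

import com.alibaba.fastjson.JSON;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;

// hypothetical custom loader: treats location as a file system path instead of a classpath location
public class FileSystemCanalGlueConfLoader implements CanalGlueConfLoader {

    @Override
    public CanalGlueConf load(String location) {
        try {
            String content = new String(Files.readAllBytes(Paths.get(location)), StandardCharsets.UTF_8);
            return JSON.parseObject(content, CanalGlueConf.class);
        } catch (IOException e) {
            throw new IllegalStateException("Unable to read configuration file: " + location, e);
        }
    }
}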

The JSON configuration module was later scrapped when the glue layer was extracted from the business system and re-implemented as a purely annotation-driven design built on inheriting the core abstract components.

Core module development

It mainly includes several modules:

  • Basic model definition.
  • Adapter layer development.
  • Converter and parser layer development.
  • Processor layer development.
  • Global component auto-configuration module development (Spring only; this has been extracted into the spring-boot-starter-canal-glue module).
  • CanalGlue development.

Basic model definition

First define a top-level KEY, that is, the unique identifier of a table within a database:

// Model table object
public interface ModelTable {

    String database();

    String table();

    static ModelTable of(String database, String table) {
        return DefaultModelTable.of(database, table);
    }
}

@RequiredArgsConstructor(access = AccessLevel.PACKAGE, staticName = "of")
public class DefaultModelTable implements ModelTable {

    private final String database;
    private final String table;

    @Override
    public String database() {
        return database;
    }

    @Override
    public String table() {
        return table;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (o == null || getClass() != o.getClass()) {
            return false;
        }
        DefaultModelTable that = (DefaultModelTable) o;
        return Objects.equals(database, that.database) &&
                Objects.equals(table, that.table);
    }

    @Override
    public int hashCode() {
        return Objects.hash(database, table);
    }
}

Here DefaultModelTable overrides the equals() and hashCode() methods so that ModelTable instances can be used directly as keys in a HashMap, which lets us later build the ModelTable -> processor cache.
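As a small illustration of that property (this map is not the project's actual cache code, just a demonstration of ModelTable as a key):

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class ModelTableCacheSketch {

    // processors are kept as Object here only because the processor type is introduced later in the article
    private final Map<ModelTable, List<Object>> processorCache = new ConcurrentHashMap<>();

    public void register(ModelTable modelTable, Object processor) {
        processorCache.computeIfAbsent(modelTable, k -> new ArrayList<>()).add(processor);
    }

    public List<Object> lookup(String database, String table) {
        // a freshly built key hits the same entry because equals()/hashCode() compare database + table
        return processorCache.get(ModelTable.of(database, table));
    }
}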

Since the event content Canal sends to Kafka arrives as a raw string, we define a CanalBinLogEvent class that is essentially identical to FlatMessage:

@Data
public class CanalBinLogEvent {

    /** * Event ID, no actual meaning */
    private Long id;

    /** * Current node data */
    private List<Map<String, String>> data;

    /** * list of primary key column names */
    private List<String> pkNames;

    /** * Data before the change (old values) */
    private List<Map<String, String>> old;

    /** * UPDATE\INSERT\DELETE\QUERY */
    private String type;

    /** * binlog execute time */
    private Long es;

    /** * dml build timestamp */
    private Long ts;

    /** * Executed SQL; may be absent */
    private String sql;

    /** * Database name */
    private String database;

    /** * table name */
    private String table;

    /** * SQL type mapping */
    private Map<String, Integer> sqlType;

    /** * MySQL field type mapping */
    private Map<String, String> mysqlType;

    /** * Whether this is a DDL statement */
    private Boolean isDdl;
}

Then define the parsed result object CanalBinLogResult, together with the related enums:

// constants
@RequiredArgsConstructor
@Getter
public enum BinLogEventType {

    QUERY("QUERY", "Query"),

    INSERT("INSERT", "Insert"),

    UPDATE("UPDATE", "Update"),

    DELETE("DELETE", "Delete"),

    ALTER("ALTER", "Column modification operation"),

    UNKNOWN("UNKNOWN", "Unknown"),
    ;

    private final String type;
    private final String description;

    public static BinLogEventType fromType(String type) {
        for (BinLogEventType binLogType : BinLogEventType.values()) {
            if (binLogType.getType().equals(type)) {
                return binLogType;
            }
        }
        return BinLogEventType.UNKNOWN;
    }
}

// constants
@RequiredArgsConstructor
@Getter
public enum OperationType {

    /** * DML */
    DML("dml", "DML statement"),

    /** * DDL */
    DDL("ddl", "DDL statement"),
    ;

    private final String type;
    private final String description;
}

@Data
public class CanalBinLogResult<T> {

    /** * Primary key (Long) */
    private Long primaryKey;

    /** * binlog event type */
    private BinLogEventType binLogEventType;

    /** * Data before the change */
    private T beforeData;

    /** * Data after the change */
    private T afterData;

    /** * Database name */
    private String databaseName;

    /** * Table name */
    private String tableName;

    /** * SQL statement - generally only useful for DDL */
    private String sql;

    /** * MySQL operation type */
    private OperationType operationType;
}

Developing the adapter layer

Define the top-level adapter SPI interface:

public interface SourceAdapter<SOURCE, SINK> {

    SINK adapt(SOURCE source);
}

Next, develop the adapter implementation classes:

// The raw string is returned directly
@RequiredArgsConstructor(access = AccessLevel.PACKAGE, staticName = "of")
class RawStringSourceAdapter implements SourceAdapter<String, String> {

    @Override
    public String adapt(String source) {
        return source;
    }
}

// FastJSON conversion
@RequiredArgsConstructor(access = AccessLevel.PACKAGE, staticName = "of")
class FastJsonSourceAdapter<T> implements SourceAdapter<String, T> {

    private final Class<T> klass;

    @Override
    public T adapt(String source) {
        if (StringUtils.isEmpty(source)) {
            return null;
        }
        return JSON.parseObject(source, klass);
    }
}

// Facade
public enum SourceAdapterFacade {

    /** * singleton */
    X;

    private static final SourceAdapter<String, String> I_S_A = RawStringSourceAdapter.of();

    @SuppressWarnings("unchecked")
    public <T> T adapt(Class<T> klass, String source) {
        if (klass.isAssignableFrom(String.class)) {
            return (T) I_S_A.adapt(source);
        }
        return FastJsonSourceAdapter.of(klass).adapt(source);
    }
}

In the end, callers simply use the SourceAdapterFacade#adapt() method; since in most cases only raw strings and String -> class-instance conversions are needed, the adapter layer can stay this simple.
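A minimal usage sketch of the facade (rawMessage here is just a stand-in for the raw string consumed from Kafka):

// strings are passed through unchanged, any other target type goes through FastJSON
String passthrough = SourceAdapterFacade.X.adapt(String.class, rawMessage);
CanalBinLogEvent event = SourceAdapterFacade.X.adapt(CanalBinLogEvent.class, rawMessage);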

Develop the converter and parser layers

For a binlog event parsed by Canal, the data and old attributes are K-V structures whose keys and values are both of String type, so the complete target instance can only be built by traversing them and converting each entry.

Note: the attribute types of the converted instance currently only support wrapper classes (such as Integer); primitive types such as int are not supported.

To better map the target entity to the actual database, table, column names and column types, two custom annotations @CanalModel and @CanalField are introduced, defined as follows:

// @CanalModel
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
public @interface CanalModel {

    /**
     * Target database
     */
    String database();

    /**
     * Target table
     */
    String table();

    /**
     * Attribute name -> column name naming strategy; optional values are: DEFAULT (as-is),
     * UPPER_UNDERSCORE (uppercase with underscores) and LOWER_UNDERSCORE (lowercase with underscores)
     */
    FieldNamingPolicy fieldNamingPolicy() default FieldNamingPolicy.DEFAULT;
}

// @CanalField
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.FIELD)
public @interface CanalField {

    /**
     * Column name
     *
     * @return columnName
     */
    String columnName() default "";

    /**
     * SQL field type
     *
     * @return JDBCType
     */
    JDBCType sqlType() default JDBCType.NULL;

    /**
     * Converter type
     *
     * @return klass
     */
    Class<? extends BaseCanalFieldConverter<?>> converterKlass() default NullCanalFieldConverter.class;
}

Define the top-level converter interface BinLogFieldConverter:

public interface BinLogFieldConverter<SOURCE, TARGET> {

    TARGET convert(SOURCE source);
}

For now, a converter can be matched by the Class of the target attribute together with the SQLType specified by the annotation, so define an abstract converter BaseCanalFieldConverter:

public abstract class BaseCanalFieldConverter<T> implements BinLogFieldConverter<String, T> {

    private final SQLType sqlType;
    private final Class<?> klass;

    protected BaseCanalFieldConverter(SQLType sqlType, Class<?> klass) {
        this.sqlType = sqlType;
        this.klass = klass;
    }

    @Override
    public T convert(String source) {
        if (StringUtils.isEmpty(source)) {
            return null;
        }
        return convertInternal(source);
    }

    /**
     * Internal conversion method
     *
     * @param source the source string
     * @return T
     */
    protected abstract T convertInternal(String source);

    /**
     * Returns the SQL type
     *
     * @return SQLType
     */
    public SQLType sqlType() {
        return sqlType;
    }

    /**
     * Returns the target type
     *
     * @return Class<?>
     */
    public Class<?> typeKlass() {
        return klass;
    }
}

BaseCanalFieldConverter targets a single attribute of the target instance. For attributes of type Long, for example, we can implement a BigIntCanalFieldConverter:

public class BigIntCanalFieldConverter extends BaseCanalFieldConverter<Long> {

    /** * singleton */
    public static final BaseCanalFieldConverter<Long> X = new BigIntCanalFieldConverter();

    private BigIntCanalFieldConverter() {
        super(JDBCType.BIGINT, Long.class);
    }

    @Override
    protected Long convertInternal(String source) {
        if (null == source) {
            return null;
        }
        return Long.valueOf(source);
    }
}

The following are the most commonly used built-in converters that have been developed:

JDBCType    Java type                    Converter
NULL        Void                         NullCanalFieldConverter
BIGINT      Long                         BigIntCanalFieldConverter
VARCHAR     String                       VarcharCanalFieldConverter
DECIMAL     BigDecimal                   DecimalCanalFieldConverter
INTEGER     Integer                      IntCanalFieldConverter
TINYINT     Integer                      TinyIntCanalFieldConverter
DATE        java.time.LocalDate          SqlDateCanalFieldConverter0
DATE        java.sql.Date                SqlDateCanalFieldConverter1
TIMESTAMP   java.time.LocalDateTime      TimestampCanalFieldConverter0
TIMESTAMP   java.util.Date               TimestampCanalFieldConverter1
TIMESTAMP   java.time.OffsetDateTime     TimestampCanalFieldConverter2

All converter implementations are designed as stateless singletons so that they can be registered and overridden dynamically. Then define a converter factory, CanalFieldConverterFactory, that provides an API for loading the target converter by the specified parameters:

// input
@SuppressWarnings("rawtypes")
@Builder
@Data
public class CanalFieldConvertInput {

    private Class<?> fieldKlass;
    private Class<? extends BaseCanalFieldConverter> converterKlass;
    private SQLType sqlType;

    @Tolerate
    public CanalFieldConvertInput() {
    }
}

// result
@Builder
@Getter
public class CanalFieldConvertResult {

    private final BaseCanalFieldConverter<?> converter;
}

// interface
public interface CanalFieldConverterFactory {

    default void registerConverter(BaseCanalFieldConverter<?> converter) {
        registerConverter(converter, true);
    }

    void registerConverter(BaseCanalFieldConverter<?> converter, boolean replace);

    CanalFieldConvertResult load(CanalFieldConvertInput input);
}

CanalFieldConverterFactory provides a registerConverter() method so that users can register custom converters and override the built-in default converters.
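For example, here is a hedged sketch (an illustration only, not one of the built-in converters) of a custom converter that maps a VARCHAR "Y"/"N" flag column to Boolean, followed by registering it and loading a converter back through the factory:

// hypothetical converter, written in the same style as the built-in ones
public class YesNoVarcharConverter extends BaseCanalFieldConverter<Boolean> {

    /** * stateless singleton, following the convention of the built-in converters */
    public static final BaseCanalFieldConverter<Boolean> X = new YesNoVarcharConverter();

    private YesNoVarcharConverter() {
        super(JDBCType.VARCHAR, Boolean.class);
    }

    @Override
    protected Boolean convertInternal(String source) {
        return "Y".equalsIgnoreCase(source);
    }
}

// registration and lookup; the factory instance is built the same way the auto-configuration builds it
CanalFieldConverterFactory factory = InMemoryCanalFieldConverterFactory.of();
factory.registerConverter(YesNoVarcharConverter.X, true);

// hedged: the exact resolution rules live inside the factory implementation
CanalFieldConvertResult result = factory.load(CanalFieldConvertInput.builder()
        .fieldKlass(Boolean.class)
        .sqlType(JDBCType.VARCHAR)
        .build());
BaseCanalFieldConverter<?> converter = result.getConverter();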

At this point, we can load the converter for an instance attribute by the specified parameters, obtain the converter instance, and parse the corresponding K-V structure out of the raw event into the target instance. Next we need to write the core parser module, which mainly covers three points:

  • Only BIGINT primary keys are resolved (this is an iron rule of the company specification: each MySQL table may define only one BIGINT UNSIGNED auto-increment primary key).
  • The data before the change, corresponding to the old attribute node of the raw event (it does not necessarily exist; for example, an INSERT statement has no such node).
  • The data after the change, corresponding to the data attribute node of the raw event.

Define the parser interface CanalBinLogEventParser as follows:

public interface CanalBinLogEventParser {

    /**
     * Parse the binlog event
     *
     * @param event the binlog event
     * @param klass the target type
     * @param primaryKeyFunction the primary key mapping function
     * @param commonEntryFunction the mapping function for the other attributes
     * @return a list of CanalBinLogResult
     */
    <T> List<CanalBinLogResult<T>> parse(CanalBinLogEvent event,
                                         Class<T> klass,
                                         BasePrimaryKeyTupleFunction primaryKeyFunction,
                                         BaseCommonEntryFunction<T> commonEntryFunction);
}

The parser’s parsing method depends on:

  • The binlog event instance, which is the output of the upstream adapter component.
  • The target type of the conversion.
  • A BasePrimaryKeyTupleFunction primary key mapping function instance; the built-in BigIntPrimaryKeyTupleFunction is used by default.
  • A BaseCommonEntryFunction non-primary-key column-to-attribute mapping function instance; the built-in ReflectionBinLogEntryFunction is used by default (reflection is the core of the non-primary-key column conversion; see the simplified sketch below).

The parse result is a List because the data attribute of FlatMessage is itself a List<Map<String, String>>, with one entry per affected row.
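The following is a heavily simplified sketch of the reflection idea behind ReflectionBinLogEntryFunction, not the project's actual implementation: for each column of a row map, find the matching field, convert the string value according to the field type and set it on the target instance (the real code applies the naming policy, the @CanalField metadata and the registered converters).

import java.lang.reflect.Field;
import java.util.Map;

public final class ReflectionPopulationSketch {

    static <T> T populate(Class<T> klass, Map<String, String> row) throws ReflectiveOperationException {
        T target = klass.getDeclaredConstructor().newInstance();
        for (Field field : klass.getDeclaredFields()) {
            // assumes the column name equals the field name; the real code maps names via the naming policy
            String columnValue = row.get(field.getName());
            if (columnValue == null) {
                continue;
            }
            field.setAccessible(true);
            if (field.getType() == Long.class) {
                field.set(target, Long.valueOf(columnValue));   // the real code delegates to BaseCanalFieldConverter
            } else if (field.getType() == String.class) {
                field.set(target, columnValue);
            }
            // other types are handled by the registered converters in the real implementation
        }
        return target;
    }
}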

Developing the processor layer

The processor is the entry point where developers handle the final parsed entity; they just need to override the corresponding processing method for each type of event. It looks like this:

public abstract class BaseCanalBinlogEventProcessor<T> extends BaseParameterizedTypeReferenceSupport<T> {

    protected void processInsertInternal(CanalBinLogResult<T> result) {
    }

    protected void processUpdateInternal(CanalBinLogResult<T> result) {
    }

    protected void processDeleteInternal(CanalBinLogResult<T> result) {
    }

    protected void processDDLInternal(CanalBinLogResult<T> result) {
    }
}

To handle Insert events, for example, a subclass inherits BaseCanalBinlogEventProcessor, declares the corresponding entity class (the generic type substitution) with the @CanalModel annotation, and then overrides the processInsertInternal() method. Sub-processors can also override the custom exception handler instance, for example:

@Override
protected ExceptionHandler exceptionHandler() {
    return EXCEPTION_HANDLER;
}

/** * override the default ExceptionHandler.NO_OP */
private static final ExceptionHandler EXCEPTION_HANDLER = (event, throwable)
        -> log.error("Error parsing binlog event: {}", JSON.toJSONString(event), throwable);

In addition, some scenarios need specialized processing of the parsed result before or after the processing callbacks, so a parse result interceptor (chain) is introduced; the corresponding class is BaseParseResultInterceptor:

public abstract class BaseParseResultInterceptor<T> extends BaseParameterizedTypeReferenceSupport<T> {

    public BaseParseResultInterceptor() {
        super();
    }

    public void onParse(ModelTable modelTable) {
    }

    public void onBeforeInsertProcess(ModelTable modelTable, T beforeData, T afterData) {
    }

    public void onAfterInsertProcess(ModelTable modelTable, T beforeData, T afterData) {
    }

    public void onBeforeUpdateProcess(ModelTable modelTable, T beforeData, T afterData) {
    }

    public void onAfterUpdateProcess(ModelTable modelTable, T beforeData, T afterData) {
    }

    public void onBeforeDeleteProcess(ModelTable modelTable, T beforeData, T afterData) {
    }

    public void onAfterDeleteProcess(ModelTable modelTable, T beforeData, T afterData) {
    }

    public void onBeforeDDLProcess(ModelTable modelTable, T beforeData, T afterData, String sql) {
    }

    public void onAfterDDLProcess(ModelTable modelTable, T beforeData, T afterData, String sql) {
    }

    public void onParseFinish(ModelTable modelTable) {
    }

    public void onParseCompletion(ModelTable modelTable) {
    }
}

The timing of the parse result interceptor callbacks can be seen in the architecture diagram above or in the BaseCanalBinlogEventProcessor source code.
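As an illustration (a hedged sketch, not code from the project; OrderModel is the example entity defined later in this article), an interceptor could mask a sensitive field right before the insert callback runs:

@Component
public class OrderMaskingParseResultInterceptor extends BaseParseResultInterceptor<OrderModel> {

    @Override
    public void onBeforeInsertProcess(ModelTable modelTable, OrderModel beforeData, OrderModel afterData) {
        // overwrite a sensitive column value before the processor's insert callback sees it
        if (afterData != null && afterData.getOrderId() != null) {
            String orderId = afterData.getOrderId();
            afterData.setOrderId("***" + orderId.substring(Math.max(0, orderId.length() - 4)));
        }
    }
}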

Develop the global component auto-configuration module

If you are using the Spring container, you need a configuration class to load all the existing components, so add a global configuration class CanalGlueAutoConfiguration (this class lives in the spring-boot-starter-canal-glue module of the project, which contains only this one class):

@Configuration
public class CanalGlueAutoConfiguration implements SmartInitializingSingleton, BeanFactoryAware {

    private ConfigurableListableBeanFactory configurableListableBeanFactory;

    @Bean
    @ConditionalOnMissingBean
    public CanalBinlogEventProcessorFactory canalBinlogEventProcessorFactory() {
        return InMemoryCanalBinlogEventProcessorFactory.of();
    }

    @Bean
    @ConditionalOnMissingBean
    public ModelTableMetadataManager modelTableMetadataManager(CanalFieldConverterFactory canalFieldConverterFactory) {
        return InMemoryModelTableMetadataManager.of(canalFieldConverterFactory);
    }

    @Bean
    @ConditionalOnMissingBean
    public CanalFieldConverterFactory canalFieldConverterFactory() {
        return InMemoryCanalFieldConverterFactory.of();
    }

    @Bean
    @ConditionalOnMissingBean
    public CanalBinLogEventParser canalBinLogEventParser() {
        return DefaultCanalBinLogEventParser.of();
    }

    @Bean
    @ConditionalOnMissingBean
    public ParseResultInterceptorManager parseResultInterceptorManager(ModelTableMetadataManager modelTableMetadataManager) {
        return InMemoryParseResultInterceptorManager.of(modelTableMetadataManager);
    }

    @Bean
    @Primary
    public CanalGlue canalGlue(CanalBinlogEventProcessorFactory canalBinlogEventProcessorFactory) {
        return DefaultCanalGlue.of(canalBinlogEventProcessorFactory);
    }

    @Override
    public void setBeanFactory(BeanFactory beanFactory) throws BeansException {
        this.configurableListableBeanFactory = (ConfigurableListableBeanFactory) beanFactory;
    }

    @SuppressWarnings({"rawtypes", "unchecked"})
    @Override
    public void afterSingletonsInstantiated() {
        ParseResultInterceptorManager parseResultInterceptorManager =
                configurableListableBeanFactory.getBean(ParseResultInterceptorManager.class);
        ModelTableMetadataManager modelTableMetadataManager =
                configurableListableBeanFactory.getBean(ModelTableMetadataManager.class);
        CanalBinlogEventProcessorFactory canalBinlogEventProcessorFactory =
                configurableListableBeanFactory.getBean(CanalBinlogEventProcessorFactory.class);
        CanalBinLogEventParser canalBinLogEventParser =
                configurableListableBeanFactory.getBean(CanalBinLogEventParser.class);
        Map<String, BaseParseResultInterceptor> interceptors =
                configurableListableBeanFactory.getBeansOfType(BaseParseResultInterceptor.class);
        interceptors.forEach((k, interceptor) -> parseResultInterceptorManager.registerParseResultInterceptor(interceptor));
        Map<String, BaseCanalBinlogEventProcessor> processors =
                configurableListableBeanFactory.getBeansOfType(BaseCanalBinlogEventProcessor.class);
        processors.forEach((k, processor) -> processor.init(canalBinLogEventParser, modelTableMetadataManager,
                canalBinlogEventProcessorFactory, parseResultInterceptorManager));
    }
}

To make it easier for other services to import this configuration class, use the spring.factories mechanism: create a new resources/META-INF/spring.factories file with the following content:

org.springframework.boot.autoconfigure.EnableAutoConfiguration=cn.throwx.canal.gule.config.CanalGlueAutoConfiguration

This way, simply introducing spring-boot-starter-canal-glue activates all the components and initializes all the processors that have been added to the Spring container.

CanalGlue development

CanalGlue is essentially the processing entry point that accepts the binlog event string; for now it is defined as an interface:

public interface CanalGlue {

    void process(String content);
}

The implementation of this interface, DefaultCanalGlue, is also simple:

@RequiredArgsConstructor(access = AccessLevel.PUBLIC, staticName = "of")
public class DefaultCanalGlue implements CanalGlue {

    private final CanalBinlogEventProcessorFactory canalBinlogEventProcessorFactory;

    @Override
    public void process(String content) {
        CanalBinLogEvent event = SourceAdapterFacade.X.adapt(CanalBinLogEvent.class, content);
        ModelTable modelTable = ModelTable.of(event.getDatabase(), event.getTable());
        canalBinlogEventProcessorFactory.get(modelTable).forEach(processor -> processor.process(event));
    }
}

It uses the source adapter to convert the string into a CanalBinLogEvent instance, then asks the processor factory for the list of BaseCanalBinlogEventProcessor instances matching the ModelTable and lets each of them process the event.

Using canal-glue

Usage mainly covers the following dimensions, all of which can be found under the test package of canal-glue-example:

  • General processor handling of INSERT events.
  • A custom DDL change alert parent processor implementation for DDL change warnings.
  • A single table corresponding to multiple processors.
  • Using the parse result interceptor for AES encryption and decryption of specific fields.
  • Use in the Spring container and general programmatic use (see the programmatic wiring sketch after this list).
  • Benchmark performance tests using openjdk-jmh.
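A hedged sketch of the programmatic (non-Spring) wiring, mirroring what the auto-configuration class above does; OrderProcessor is the example processor defined below, and the raw message string is a placeholder:

public class ProgrammaticCanalGlueSketch {

    public static void main(String[] args) {
        // assemble the core components exactly as CanalGlueAutoConfiguration does
        CanalFieldConverterFactory converterFactory = InMemoryCanalFieldConverterFactory.of();
        ModelTableMetadataManager metadataManager = InMemoryModelTableMetadataManager.of(converterFactory);
        CanalBinlogEventProcessorFactory processorFactory = InMemoryCanalBinlogEventProcessorFactory.of();
        CanalBinLogEventParser parser = DefaultCanalBinLogEventParser.of();
        ParseResultInterceptorManager interceptorManager = InMemoryParseResultInterceptorManager.of(metadataManager);

        // initialise a processor the same way afterSingletonsInstantiated() does for Spring beans
        OrderProcessor orderProcessor = new OrderProcessor();
        orderProcessor.init(parser, metadataManager, processorFactory, interceptorManager);

        CanalGlue canalGlue = DefaultCanalGlue.of(processorFactory);
        canalGlue.process("{...raw binlog event JSON...}"); // placeholder payload
    }
}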

Here is a brief description of how it is used within the Spring ecosystem. First introduce the spring-boot-starter-canal-glue dependency:

<dependency>
    <groupId>cn.throwx</groupId>
    <artifactId>spring-boot-starter-canal-glue</artifactId>
    <version>The version number</version>
</dependency>

Write an entity or DTO class OrderModel:

@Data
@CanalModel(database = "db_order_service", table = "t_order", fieldNamingPolicy = FieldNamingPolicy.LOWER_UNDERSCORE)
public static class OrderModel {

    private Long id;

    private String orderId;

    private OffsetDateTime createTime;

    private BigDecimal amount;
}

The @CanalModel annotation binds the database db_order_service and the table t_order, and the attribute name to column name mapping strategy is camel case to lowercase underscore. We then define a processor OrderProcessor together with a custom exception handler (the latter is optional; here it simulates throwing a custom exception while processing the event):

@Component
public class OrderProcessor extends BaseCanalBinlogEventProcessor<OrderModel> {

    @Override
    protected void processInsertInternal(CanalBinLogResult<OrderModel> result) {
        OrderModel orderModel = result.getAfterData();
        logger.info("Received order save binlog, primary key :{}, simulated throw exception...", orderModel.getId());
        throw new RuntimeException(String.format("[id:%d]", orderModel.getId()));
    }

    @Override
    protected ExceptionHandler exceptionHandler() {
        return EXCEPTION_HANDLER;
    }

    /**
        * Override the default ExceptionHandler.NO_OP
        */
    private static final ExceptionHandler EXCEPTION_HANDLER = (event, throwable)
            -> log.error("Error parsing binlog event: {}", JSON.toJSONString(event), throwable);
}

Suppose a binlog event writes order data as follows:

{
  "data": [
    {
      "id": "1",
      "order_id": "10086",
      "amount": "999.0",
      "create_time": "2020-03-02 05:12:49"
    }
  ],
  "database": "db_order_service",
  "es": 1583143969000,
  "id": 3,
  "isDdl": false,
  "mysqlType": {
    "id": "BIGINT",
    "order_id": "VARCHAR(64)",
    "amount": "DECIMAL(10,2)",
    "create_time": "DATETIME"
  },
  "old": null,
  "pkNames": [
    "id"
  ],
  "sql": "",
  "sqlType": {
    "id": -5,
    "order_id": 12,
    "amount": 3,
    "create_time": 93
  },
  "table": "t_order",
  "ts": 1583143969460,
  "type": "INSERT"
}

The result is as follows:

It is also easy to connect directly to the Kafka topic that Canal writes to; the following is an example Kafka consumer:

@Slf4j
@Component
@RequiredArgsConstructor
public class CanalEventListeners {

    private final CanalGlue canalGlue;

    @KafkaListener(
            id = "${canal.event.order.listener.id:db-order-service-listener}",
            topics = "db_order_service",
            containerFactory = "kafkaListenerContainerFactory"
    )
    public void onCrmMessage(String content) {
        canalGlue.process(content);
    }
}

Summary

My original intention in developing canal-glue was to build a "big string converter" that greatly improves efficiency: I had just entered the field of "small data" and had to deal with a large number of downstream reports, and it was impossible to spend significant manpower on the repeated template code. Although the overall design is not particularly elegant, canal-glue has at least delivered on improving development efficiency.

Project Warehouse:

  • Gitee:https://gitee.com/throwableDoge/canal-glue

The latest repository code is temporarily placed in the Develop branch.
