This is the fifth day of my participation in the November Gwen Challenge. Check out the details: The last Gwen Challenge 2021.

In the previous section, we discussed canal’s configuration and MySQL’s configuration, and implemented canal’s dump protocol to MySQL Mater by disguising MySQL’s slave and emulating MySQL’s slave’s interaction protocol. MySQL Mater received a dump request from Canal and began pushing the binary log to Canal, which then parsed the binary log and sent it to storage destinations such as MySQL, Kafka, Elastic Search, and so on.

Today we connect to RabbitMQ via canal, write incremental data to RabbitMQ, and then process it to mysql or Elastic.

Canal configuration:

The configuration file../canal/canal.properties

You need to obtain mq connection information in advance and establish the corresponding Exchange.

# tcp, kafka, rocketMQ, The rabbitMQ canal. ServerMode = the rabbitMQ # here into the rabbitMQ pattern # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # The RabbitMQ # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # configure RabbitMQ connection information the RabbitMQ. Host = 192.168.1.1 Rabbitmq. virtual.host = / rabbitmq.exchange = exchange. Password = admin Rabbitmq. deliveryMode = Fanout # ExchangeCopy the code

configuration/.. /canal/instance.properties

Configure the binlog file name and positon, log in to MySQL, and run the show master status command. You can see master node information, binlog File name, positon and binlog_Ignore_DB information.

Adding a Configuration

Canal. The instance. The master. The journal. The name is the File field 【 mysql – bin. 004911] binlog File name

Canal. The instance. The master. The position is the offset position binlog 【 471834950 】

. Fill in information of mysql # # position info canal. The instance. The master. The address = 192.168.1.1:3306 canal.instance.master.journal.name=mysql-bin.004911 canal.instance.master.position=471834950 canal.instance.master.timestamp= canal.instance.master.gtid= ... # # fill in mysql user information, the username/password canal. The instance. The dbUsername = canal canal. The instance. The dbPassword = canal # assigned to monitor a library, and tables canal.instance.filter.regex=.*\.. * # [.*\..*]; [test..*] stands for listening on all tables under the test library.Copy the code

Start the canal. In Windows, startup.bat starts the Canal Server script and watches the RabbitMQ logs.

  • Observe the logtail -f /tmp/canal/logs/example/example.log
  • Observe MQ: Create a queue to connect to exchange and see if the data can be received

Configure exchange and queue for RabbitMQ.

@Bean public RabbitListenerContainerFactory<? > rabbitListenerContainerFactory(ConnectionFactory connectionFactory) { SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory(); factory.setConnectionFactory(connectionFactory); factory.setMessageConverter(new Jackson2JsonMessageConverter()); factory.setAcknowledgeMode(AcknowledgeMode.MANUAL); // Manually confirm return Factory after messageConverter is changed; }Copy the code
@RabbitListener( queues = {"testDirectQueue"}) public void receiver5(Message msg, Channel channel) throws IOException, InterruptedException {// Print data String message = new String(msg.getBody(), standardCharsets.utf_8); Log.info (" queue consume message {}"+message); //channel.basicReject(msg.getMessageProperties().getDeliveryTag(), false); }Copy the code

Observe the log output of the listening queue.

I’m a little bit pressed for time today, so THAT’s it.