sequence
This article focuses on SimpleCanalConnector’s getWithoutAck
getWithoutAck
Canal – 1.1.4 / client/SRC/main/Java/com/alibaba/otter/canal/client/impl/SimpleCanalConnector. Java
public class SimpleCanalConnector implements CanalConnector { private static final Logger logger = LoggerFactory.getLogger(SimpleCanalConnector.class); private SocketAddress address; private String username; private String password; private int soTimeout = 60000; // milliseconds private int idleTimeout = 60 * 60 * 1000; // Timeout duration for idle connections between the client and server. The default value is 1 hour. // Records the last filter submission value to facilitate the submission of private final ByteBuffer in automatic retriesreadHeader = ByteBuffer.allocate(4).order(ByteOrder.BIG_ENDIAN); private final ByteBuffer writeHeader = ByteBuffer.allocate(4).order(ByteOrder.BIG_ENDIAN); private SocketChannel channel; private ReadableByteChannel readableChannel; private WritableByteChannel writableChannel; private List<Compression> supportedCompressions = new ArrayList<Compression>(); private ClientIdentity clientIdentity; private ClientRunningMonitor runningMonitor; // Run control private ZkClientx ZkClientx; private BooleanMutex mutex = new BooleanMutex(false);
private volatile boolean connected = false; Private Boolean rollbackOnConnect = // Indicates whether connected is running properly. Private Boolean rollbackOnConnect =true; Private Boolean rollbackOnDisConnect = Specifies whether to rollback automatically after the connect connection succeedsfalse; Private Boolean lazyParseEntry = Specifies whether to rollback automatically after the connect is successfully connectedfalse; // Read and write data are controlled by different locks to reduce the lock granularity. Read and write data also need exclusive locks. The concurrency is easy to cause packet chaos and deserialization failurereadDataLock = new Object();
private Object writeDataLock = new Object();
private volatile boolean running = false; / /... public Message getWithoutAck(int batchSize) throws CanalClientException {return getWithoutAck(batchSize, null, null);
}
public Message getWithoutAck(int batchSize, Long timeout, TimeUnit unit) throws CanalClientException {
waitClientRunning();
if(! running) {returnnull; } try { int size = (batchSize <= 0) ? 1000 : batchSize; long time = (timeout == null || timeout < 0) ? -1 : timeout; // -1 indicates no timeout controlif (unit == null) {
unit = TimeUnit.MILLISECONDS;
}
writeWithHeader(Packet.newBuilder()
.setType(PacketType.GET)
.setBody(Get.newBuilder()
.setAutoAck(false)
.setDestination(clientIdentity.getDestination())
.setClientId(String.valueOf(clientIdentity.getClientId()))
.setFetchSize(size)
.setTimeout(time)
.setUnit(unit.ordinal())
.build()
.toByteString())
.build()
.toByteArray());
returnreceiveMessages(); } catch (IOException e) { throw new CanalClientException(e); }} / /... }Copy the code
- The getWithoutAck method executes writeWithHeader first, then receiveMessages. WriteWithHeader Packet is of the GET type. AutoAck of the body is set to false. Destination, clientId, fetchSize, timeout, and unit are also set
writeWithHeader
Canal – 1.1.4 / client/SRC/main/Java/com/alibaba/otter/canal/client/impl/SimpleCanalConnector. Java
public class SimpleCanalConnector implements CanalConnector { //...... private void writeWithHeader(byte[] body) throws IOException { writeWithHeader(writableChannel, body); } private void writeWithHeader(WritableByteChannel channel, byte[] body) throws IOException { synchronized (writeDataLock) { writeHeader.clear(); writeHeader.putInt(body.length); writeHeader.flip(); channel.write(writeHeader); channel.write(ByteBuffer.wrap(body)); }} / /... }Copy the code
- WriteWithHeader writes the length of the body to the header, then to the header, then to the body
receiveMessages
Canal – 1.1.4 / client/SRC/main/Java/com/alibaba/otter/canal/client/impl/SimpleCanalConnector. Java
public class SimpleCanalConnector implements CanalConnector {
//......
private Message receiveMessages() throws IOException {
byte[] data = readNextPacket();
return CanalMessageDeserializer.deserializer(data, lazyParseEntry);
}
private byte[] readNextPacket() throws IOException {
return readNextPacket(readableChannel);
}
private byte[] readNextPacket(ReadableByteChannel channel) throws IOException {
synchronized (readDataLock) {
readHeader.clear();
read(channel, readHeader);
int bodyLen = readHeader.getInt(0);
ByteBuffer bodyBuf = ByteBuffer.allocate(bodyLen).order(ByteOrder.BIG_ENDIAN);
read(channel, bodyBuf);
returnbodyBuf.array(); }} / /... }Copy the code
- The receiveMessages method executes the readNextPacket method, which reads the header from the read method to get the body length, then reads the body from the read method, and finally returns the body
summary
The getWithoutAck method executes writeWithHeader first, then receiveMessages. WriteWithHeader Packet is of GET type. AutoAck of body is set to false, destination, clientId, fetchSize, timeout and unit are also set. The receiveMessages method executes the readNextPacket method, which reads the header from the read method to get the body length, then reads the body from the read method, and finally returns the body
doc
- SimpleCanalConnector