Preface
This article examines the SimpleCanalConnector in canal-go.
SimpleCanalConnector
canal-go-v1.0.7/client/simple_canal_connector.go
// SimpleCanalConnector holds the connection settings and the client-side
// state for one canal-server session.
type SimpleCanalConnector struct {
	// Address and Port form the "host:port" endpoint that gets dialed.
	Address string
	Port    int

	// UserName and PassWord are the credentials sent in the client
	// authentication packet.
	UserName string
	PassWord string

	// SoTime is filled from the constructor's soTimeOut argument.
	SoTime int32
	// IdleTimeOut is sent to the server as both the net read timeout and
	// the net write timeout during authentication.
	IdleTimeOut int32

	// ClientIdentity supplies the destination and client id that are placed
	// in every request.
	ClientIdentity pb.ClientIdentity

	// Connected is set by Connect and cleared by DisConnection.
	Connected bool
	// Running is consulted by the request methods before they talk to the
	// server.
	Running bool

	// Filter is the subscription filter; Connect subscribes with it when it
	// is non-empty, and Subscribe stores the filter it was given on success.
	Filter string

	// RollbackOnConnect makes Connect issue RollBack(0) once the connection
	// is up.
	RollbackOnConnect bool
	// NOTE(review): LazyParseEntry is not read by any method visible here;
	// presumably it defers entry parsing — confirm against receiveMessages.
	LazyParseEntry bool
}
Copy the code
- SimpleCanalConnector defines the Address, Port, UserName, PassWord, SoTime, IdleTimeOut, ClientIdentity, Connected, Running, Filter, RollbackOnConnect, and LazyParseEntry fields.
NewSimpleCanalConnector
canal-go-v1.0.7/client/simple_canal_connector.go
/ / NewSimpleCanalConnector create instances func SimpleCanalConnector NewSimpleCanalConnector (address string, port int, username string, password string, destination string, soTimeOut int32, idleTimeOut int32) *SimpleCanalConnector { s := &SimpleCanalConnector{ Address: address, Port: port, UserName: username, PassWord: password, ClientIdentity: pb.ClientIdentity{Destination: destination, ClientId: 1001}, SoTime: soTimeOut, IdleTimeOut: idleTimeOut, RollbackOnConnect:true,}return s
}
Copy the code
- The NewSimpleCanalConnector method creates an instance of SimpleCanalConnector
Connect
canal-go-v1.0.7/client/simple_canal_connector.go
//Connect Connect to canal-server func (c *SimpleCanalConnector) Connect() error {if c.Connected {
return nil
}
if c.Running {
return nil
}
err := c.doConnect()
iferr ! = nil {return err
}
ifc.Filter ! ="" {
c.Subscribe(c.Filter)
}
if c.RollbackOnConnect {
c.waitClientRunning()
c.RollBack(0)
}
c.Connected = true
return nil
}
Copy the code
- The Connect method mainly executes c.doConnect() and, when a filter is set, c.Subscribe(c.Filter). If RollbackOnConnect is true, c.waitClientRunning() and c.RollBack(0) are executed as well.
DisConnection
canal-go-v1.0.7/client/simple_canal_connector.go
//DisConnection close the connection func (c *SimpleCanalConnector)DisConnection() {
if c.RollbackOnConnect && c.Connected == true {
c.RollBack(0)
}
c.Connected = falseQuitelyClose ()} //quitelyClose gracefully closes funcquitelyClose() {
ifconn ! = nil { conn.Close() } }Copy the code
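- The DisConnection method rolls back batch 0 when RollbackOnConnect is set and the connector is connected, marks the connector as disconnected, and then closes the connection via conn.Close().
doConnect
canal-go-v1.0.7/client/simple_canal_connector.go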
//doCanal-server func (c SimpleCanalConnector)doConnect() error {
address := c.Address + ":" + fmt.Sprintf("%d", c.Port)
con, err := net.Dial("tcp", address)
iferr ! = nil {return err
}
conn = con
p := new(pb.Packet)
data, err := readNextPacket()
iferr ! = nil {return err
}
err = proto.Unmarshal(data, p)
iferr ! = nil {return err
}
ifp ! = nil {ifp.GetVersion() ! = 1 { panic("unsupported version at this client.")}ifp.GetType() ! = pb.PacketType_HANDSHAKE { panic("expect handshake but found other type.")
}
handshake := &pb.Handshake{}
err = proto.Unmarshal(p.GetBody(), handshake)
iferr ! = nil {return err
}
pas := []byte(c.PassWord)
ca := &pb.ClientAuth{
Username: c.UserName,
Password: pas,
NetReadTimeoutPresent: &pb.ClientAuth_NetReadTimeout{NetReadTimeout: c.IdleTimeOut},
NetWriteTimeoutPresent: &pb.ClientAuth_NetWriteTimeout{NetWriteTimeout: c.IdleTimeOut},
}
caByteArray, _ := proto.Marshal(ca)
packet := &pb.Packet{
Type: pb.PacketType_CLIENTAUTHENTICATION,
Body: caByteArray,
}
packArray, _ := proto.Marshal(packet)
WriteWithHeader(packArray)
pp, err := readNextPacket()
iferr ! = nil {return err
}
pk := &pb.Packet{}
err = proto.Unmarshal(pp, pk)
iferr ! = nil {return err
}
ifpk.Type ! = pb.PacketType_ACK { panic("unexpected packet type when ack is expected")
}
ackBody := &pb.Ack{}
err = proto.Unmarshal(pk.GetBody(), ackBody)
iferr ! = nil {return err
}
if ackBody.GetErrorCode() > 0 {
panic(errors.New(fmt.Sprintf("something goes wrong when doing authentication:%s", ackBody.GetErrorMessage())))
}
c.Connected = true
}
return nil
}
Copy the code
- The doConnect method establishes the connection through net.Dial("tcp", address), reads the first packet through readNextPacket, and parses it through proto.Unmarshal(data, p). It then sends a PacketType_CLIENTAUTHENTICATION packet for authentication. If the ack succeeds, it sets c.Connected to true.
GetWithOutAck
canal-go-v1.0.7/client/simple_canal_connector.go
Func (c *SimpleCanalConnector) GetWithOutAck(batchSize int32, timeOut *int64, units *int32) (*pb.Message, error) { c.waitClientRunning()if! c.Running {return nil, nil
}
var size int32
if batchSize < 0 {
size = 1000
} else {
size = batchSize
}
var time *int64
var t int64
t = -1
if timeOut == nil {
time = &t
} else {
time = timeOut
}
var i int32
i = -1
if units == nil {
units = &i
}
get := new(pb.Get)
get.AutoAckPresent = &pb.Get_AutoAck{AutoAck: false}
get.Destination = c.ClientIdentity.Destination
get.ClientId = strconv.Itoa(c.ClientIdentity.ClientId)
get.FetchSize = size
get.TimeoutPresent = &pb.Get_Timeout{Timeout: *time}
get.UnitPresent = &pb.Get_Unit{Unit: *units}
getBody, err := proto.Marshal(get)
iferr ! = nil {return nil, err
}
packet := new(pb.Packet)
packet.Type = pb.PacketType_GET
packet.Body = getBody
pa, err := proto.Marshal(packet)
iferr ! = nil {return nil, err
}
WriteWithHeader(pa)
message, err := c.receiveMessages()
iferr ! = nil {return nil, err
}
return message, nil
}
Copy the code
- The GetWithOutAck method mainly executes WriteWithHeader(pa) and c.receiveMessages().
Get
canal-go-v1.0.7/client/simple_canal_connector.go
Func (c *SimpleCanalConnector) Get(batchSize int32, timeOut *int64, units *int32) (*pb.Message, error) { message, err := c.GetWithOutAck(batchSize, timeOut, units)iferr ! = nil {return nil, err
}
err = c.Ack(message.Id)
iferr ! = nil {return nil, err
}
return message, nil
}
Copy the code
- The Get method calls c.GetWithOutAck(batchSize, timeOut, units) and then c.Ack(message.Id).
Ack
canal-go-v1.0.7/client/simple_canal_connector.go
Func (c *SimpleCanalConnector) Ack(batchId int64) error { c.waitClientRunning()if! c.Running {return nil
}
ca := new(pb.ClientAck)
ca.Destination = c.ClientIdentity.Destination
ca.ClientId = strconv.Itoa(c.ClientIdentity.ClientId)
ca.BatchId = batchId
clientAck, err := proto.Marshal(ca)
iferr ! = nil {return err
}
pa := new(pb.Packet)
pa.Type = pb.PacketType_CLIENTACK
pa.Body = clientAck
pack, err := proto.Marshal(pa)
iferr ! = nil {return err
}
WriteWithHeader(pack)
return nil
}
Copy the code
- The Ack method mainly sends a pb.PacketType_CLIENTACK packet.
Subscribe
canal-go-v1.0.7/client/simple_canal_connector.go
//Subscribe func (c *SimpleCanalConnector) Subscribe(filter string) error {c. waitclientrunning ()if! c.Running {return nil
}
body, _ := proto.Marshal(&pb.Sub{Destination: c.ClientIdentity.Destination, ClientId: strconv.Itoa(c.ClientIdentity.ClientId), Filter: filter})
pack := new(pb.Packet)
pack.Type = pb.PacketType_SUBSCRIPTION
pack.Body = body
packet, _ := proto.Marshal(pack)
WriteWithHeader(packet)
p := new(pb.Packet)
paBytes, err := readNextPacket()
iferr ! = nil {return err
}
err = proto.Unmarshal(paBytes, p)
iferr ! = nil {return err
}
ack := new(pb.Ack)
err = proto.Unmarshal(p.Body, ack)
iferr ! = nil {return err
}
if ack.GetErrorCode() > 0 {
return fmt.Errorf("failed to subscribe with reason::%s", ack.GetErrorMessage())
}
c.Filter = filter
return nil
}
Copy the code
- The Subscribe method basically sends pb.PacketType_SUBSCRIPTION
UnSubscribe
canal-go-v1.0.7/client/simple_canal_connector.go
//UnSubscribe func (c *SimpleCanalConnector) UnSubscribe() error {c. waitclientrunning ()if c.Running {
return nil
}
us := new(pb.Unsub)
us.Destination = c.ClientIdentity.Destination
us.ClientId = strconv.Itoa(c.ClientIdentity.ClientId)
unSub, err := proto.Marshal(us)
iferr ! = nil {return err
}
pa := new(pb.Packet)
pa.Type = pb.PacketType_UNSUBSCRIPTION
pa.Body = unSub
pack, err := proto.Marshal(pa)
WriteWithHeader(pack)
p, err := readNextPacket()
iferr ! = nil {return err
}
pa = nil
err = proto.Unmarshal(p, pa)
iferr ! = nil {return err
}
ack := new(pb.Ack)
err = proto.Unmarshal(pa.Body, ack)
iferr ! = nil {return err
}
if ack.GetErrorCode() > 0 {
panic(errors.New(fmt.Sprintf("failed to unSubscribe with reason:%s", ack.GetErrorMessage())))
}
return nil
}
Copy the code
- The UnSubscribe method basically sends pb.PacketType_UNSUBSCRIPTION
RollBack
canal-go-v1.0.7/client/simple_canal_connector.go
Func (c *SimpleCanalConnector) RollBack(batchId int64) error {c. waitclientrunning () cb := new(pb.ClientRollback) cb.Destination = c.ClientIdentity.Destination cb.ClientId = strconv.Itoa(c.ClientIdentity.ClientId) cb.BatchId = batchId clientBollBack, err := proto.Marshal(cb)iferr ! = nil {return err
}
pa := new(pb.Packet)
pa.Type = pb.PacketType_CLIENTROLLBACK
pa.Body = clientBollBack
pack, err := proto.Marshal(pa)
iferr ! = nil {return err
}
WriteWithHeader(pack)
return nil
}
Copy the code
- The RollBack method basically sends a pb.PacketType_CLIENTROLLBACK packet.
Summary
SimpleCanalConnector defines the Address, Port, UserName, PassWord, SoTime, IdleTimeOut, ClientIdentity, Connected, Running, Filter, RollbackOnConnect, and LazyParseEntry fields; it provides the Connect, DisConnection, GetWithOutAck, Get, Ack, Subscribe, UnSubscribe, and RollBack methods.
doc
- simple_canal_connector