sequence

This paper mainly studies the SimpleCanalConnector of Canal-Go

SimpleCanalConnector

Canal – go – v1.0.7 / client/simple_canal_connector. Go

type SimpleCanalConnector struct {
	Address           string
	Port              int
	UserName          string
	PassWord          string
	SoTime            int32
	IdleTimeOut       int32
	ClientIdentity    pb.ClientIdentity
	Connected         bool
	Running           bool
	Filter            string
	RollbackOnConnect bool
	LazyParseEntry    bool
}
Copy the code
  • SimpleCanalConnector defines Address, Port, UserName, PassWord, SoTime, IdleTimeOut, ClientIdentity, Connected, Running, Filter, and Rollbac KOnConnect, LazyParseEntry attributes

NewSimpleCanalConnector

Canal – go – v1.0.7 / client/simple_canal_connector. Go

/ / NewSimpleCanalConnector create instances func SimpleCanalConnector NewSimpleCanalConnector (address string, port int, username string, password string, destination string, soTimeOut int32, idleTimeOut int32) *SimpleCanalConnector { s := &SimpleCanalConnector{ Address: address, Port: port, UserName: username, PassWord: password, ClientIdentity: pb.ClientIdentity{Destination: destination, ClientId: 1001}, SoTime: soTimeOut, IdleTimeOut: idleTimeOut, RollbackOnConnect:true,}return s

}
Copy the code
  • The NewSimpleCanalConnector method creates an instance of SimpleCanalConnector

Connect

Canal – go – v1.0.7 / client/simple_canal_connector. Go

//Connect Connect to canal-server func (c *SimpleCanalConnector) Connect() error {if c.Connected {
		return nil
	}

	if c.Running {
		return nil
	}

	err := c.doConnect()
	iferr ! = nil {return err
	}
	ifc.Filter ! ="" {
		c.Subscribe(c.Filter)
	}

	if c.RollbackOnConnect {
		c.waitClientRunning()

		c.RollBack(0)
	}

	c.Connected = true
	return nil

}
Copy the code
  • The Connect methods mainly execute c. doconnect () and C. Subscribe(c.filter). If RollbackOnConnect is true, the c. waitclientrunning () and c. rolback (0) methods are executed

DisConnection

Canal – go – v1.0.7 / client/simple_canal_connector. Go

//DisConnection close the connection func (c *SimpleCanalConnector)DisConnection() {
	if c.RollbackOnConnect && c.Connected == true {
		c.RollBack(0)
	}
	c.Connected = falseQuitelyClose ()} //quitelyClose gracefully closes funcquitelyClose() {
	ifconn ! = nil { conn.Close() } }Copy the code
  • The DisConnection method basically executes conn.close ()

doConnect

Canal – go – v1.0.7 / client/simple_canal_connector. Go

//doCanal-server func (c SimpleCanalConnector)doConnect() error {
	address := c.Address + ":" + fmt.Sprintf("%d", c.Port)
	con, err := net.Dial("tcp", address)
	iferr ! = nil {return err
	}
	conn = con

	p := new(pb.Packet)
	data, err := readNextPacket()
	iferr ! = nil {return err
	}
	err = proto.Unmarshal(data, p)
	iferr ! = nil {return err
	}
	ifp ! = nil {ifp.GetVersion() ! = 1 { panic("unsupported version at this client.")}ifp.GetType() ! = pb.PacketType_HANDSHAKE { panic("expect handshake but found other type.")
		}

		handshake := &pb.Handshake{}
		err = proto.Unmarshal(p.GetBody(), handshake)
		iferr ! = nil {return err
		}
		pas := []byte(c.PassWord)
		ca := &pb.ClientAuth{
			Username:               c.UserName,
			Password:               pas,
			NetReadTimeoutPresent:  &pb.ClientAuth_NetReadTimeout{NetReadTimeout: c.IdleTimeOut},
			NetWriteTimeoutPresent: &pb.ClientAuth_NetWriteTimeout{NetWriteTimeout: c.IdleTimeOut},
		}
		caByteArray, _ := proto.Marshal(ca)
		packet := &pb.Packet{
			Type: pb.PacketType_CLIENTAUTHENTICATION,
			Body: caByteArray,
		}

		packArray, _ := proto.Marshal(packet)

		WriteWithHeader(packArray)

		pp, err := readNextPacket()
		iferr ! = nil {return err
		}
		pk := &pb.Packet{}

		err = proto.Unmarshal(pp, pk)
		iferr ! = nil {return err
		}

		ifpk.Type ! = pb.PacketType_ACK { panic("unexpected packet type when ack is expected")
		}

		ackBody := &pb.Ack{}
		err = proto.Unmarshal(pk.GetBody(), ackBody)
		iferr ! = nil {return err
		}
		if ackBody.GetErrorCode() > 0 {

			panic(errors.New(fmt.Sprintf("something goes wrong when doing authentication:%s", ackBody.GetErrorMessage())))
		}

		c.Connected = true

	}
	return nil

}
Copy the code
  • The doConnect method establishes the connection through net.dial (” TCP “, address), reads the data through readNextPacket, and parses it through proto.unmarshal (data, P). Then send PacketType_CLIENTAUTHENTICATION data for authentication. If ack succeeds, set C. connected to true

GetWithOutAck

Canal – go – v1.0.7 / client/simple_canal_connector. Go

Func (c *SimpleCanalConnector) GetWithOutAck(batchSize int32, timeOut *int64, units *int32) (*pb.Message, error) { c.waitClientRunning()if! c.Running {return nil, nil
	}
	var size int32

	if batchSize < 0 {
		size = 1000
	} else {
		size = batchSize
	}
	var time *int64
	var t int64
	t = -1
	if timeOut == nil {
		time = &t
	} else {
		time = timeOut
	}
	var i int32
	i = -1
	if units == nil {
		units = &i
	}
	get := new(pb.Get)
	get.AutoAckPresent = &pb.Get_AutoAck{AutoAck: false}
	get.Destination = c.ClientIdentity.Destination
	get.ClientId = strconv.Itoa(c.ClientIdentity.ClientId)
	get.FetchSize = size
	get.TimeoutPresent = &pb.Get_Timeout{Timeout: *time}
	get.UnitPresent = &pb.Get_Unit{Unit: *units}

	getBody, err := proto.Marshal(get)
	iferr ! = nil {return nil, err
	}
	packet := new(pb.Packet)
	packet.Type = pb.PacketType_GET
	packet.Body = getBody
	pa, err := proto.Marshal(packet)
	iferr ! = nil {return nil, err
	}
	WriteWithHeader(pa)
	message, err := c.receiveMessages()
	iferr ! = nil {return nil, err
	}
	return message, nil
}
Copy the code
  • The GetWithOutAck method mainly executes WriteWithHeader(PA) and c.reeivemessages ().

Get

Canal – go – v1.0.7 / client/simple_canal_connector. Go

Func (c *SimpleCanalConnector) Get(batchSize int32, timeOut *int64, units *int32) (*pb.Message, error) { message, err := c.GetWithOutAck(batchSize, timeOut, units)iferr ! = nil {return nil, err
	}
	err = c.Ack(message.Id)
	iferr ! = nil {return nil, err
	}
	return message, nil
}
Copy the code
  • C. getwithoutack (batchSize, timeOut, units) and then c. ck(message.id)

Ack

Canal – go – v1.0.7 / client/simple_canal_connector. Go

Func (c *SimpleCanalConnector) Ack(batchId int64) error { c.waitClientRunning()if! c.Running {return nil
	}

	ca := new(pb.ClientAck)
	ca.Destination = c.ClientIdentity.Destination
	ca.ClientId = strconv.Itoa(c.ClientIdentity.ClientId)
	ca.BatchId = batchId

	clientAck, err := proto.Marshal(ca)
	iferr ! = nil {return err
	}
	pa := new(pb.Packet)
	pa.Type = pb.PacketType_CLIENTACK
	pa.Body = clientAck
	pack, err := proto.Marshal(pa)
	iferr ! = nil {return err
	}
	WriteWithHeader(pack)
	return nil

}
Copy the code
  • The Ack method mainly sends pb.packettyPE_clientACK

Subscribe

Canal – go – v1.0.7 / client/simple_canal_connector. Go

//Subscribe func (c *SimpleCanalConnector) Subscribe(filter string) error {c. waitclientrunning ()if! c.Running {return nil
	}
	body, _ := proto.Marshal(&pb.Sub{Destination: c.ClientIdentity.Destination, ClientId: strconv.Itoa(c.ClientIdentity.ClientId), Filter: filter})
	pack := new(pb.Packet)
	pack.Type = pb.PacketType_SUBSCRIPTION
	pack.Body = body

	packet, _ := proto.Marshal(pack)
	WriteWithHeader(packet)

	p := new(pb.Packet)

	paBytes, err := readNextPacket()
	iferr ! = nil {return err
	}
	err = proto.Unmarshal(paBytes, p)
	iferr ! = nil {return err
	}
	ack := new(pb.Ack)
	err = proto.Unmarshal(p.Body, ack)
	iferr ! = nil {return err
	}

	if ack.GetErrorCode() > 0 {
		return fmt.Errorf("failed to subscribe with reason::%s", ack.GetErrorMessage())
	}

	c.Filter = filter

	return nil
}
Copy the code
  • The Subscribe method basically sends pb.PacketType_SUBSCRIPTION

UnSubscribe

Canal – go – v1.0.7 / client/simple_canal_connector. Go

//UnSubscribe func (c *SimpleCanalConnector) UnSubscribe() error {c. waitclientrunning ()if c.Running {
		return nil
	}

	us := new(pb.Unsub)
	us.Destination = c.ClientIdentity.Destination
	us.ClientId = strconv.Itoa(c.ClientIdentity.ClientId)

	unSub, err := proto.Marshal(us)
	iferr ! = nil {return err
	}

	pa := new(pb.Packet)
	pa.Type = pb.PacketType_UNSUBSCRIPTION
	pa.Body = unSub

	pack, err := proto.Marshal(pa)
	WriteWithHeader(pack)

	p, err := readNextPacket()
	iferr ! = nil {return err
	}
	pa = nil
	err = proto.Unmarshal(p, pa)
	iferr ! = nil {return err
	}
	ack := new(pb.Ack)
	err = proto.Unmarshal(pa.Body, ack)
	iferr ! = nil {return err
	}
	if ack.GetErrorCode() > 0 {
		panic(errors.New(fmt.Sprintf("failed to unSubscribe with reason:%s", ack.GetErrorMessage())))
	}
	return nil
}
Copy the code
  • The UnSubscribe method basically sends pb.PacketType_UNSUBSCRIPTION

RollBack

Canal – go – v1.0.7 / client/simple_canal_connector. Go

Func (c *SimpleCanalConnector) RollBack(batchId int64) error {c. waitclientrunning () cb := new(pb.ClientRollback) cb.Destination = c.ClientIdentity.Destination cb.ClientId = strconv.Itoa(c.ClientIdentity.ClientId) cb.BatchId = batchId clientBollBack, err := proto.Marshal(cb)iferr ! = nil {return err
	}

	pa := new(pb.Packet)
	pa.Type = pb.PacketType_CLIENTROLLBACK
	pa.Body = clientBollBack
	pack, err := proto.Marshal(pa)
	iferr ! = nil {return err
	}
	WriteWithHeader(pack)
	return nil
}
Copy the code
  • The RollBack method basically sends pb.packettyPE_clientrollback

summary

SimpleCanalConnector defines Address, Port, UserName, PassWord, SoTime, IdleTimeOut, ClientIdentity, Connected, Running, Filter, and Rollbac KOnConnect, LazyParseEntry attributes; It provides Connect, DisConnection, GetWithOutAck, Get, Ack, Subscribe, UnSubscribe, RollBack methods

doc

  • simple_canal_connector