1. Overview
The previous article covered how codis-proxy starts up and processes requests. This article focuses on how codis-proxy and codis-server join the cluster (go online) before handling requests, and also gives a brief analysis of the Sentinel integration.
2. Key points
Codis-proxy startup
- Loading the routing table
- Service registration (how clients discover and call the proxy)
Codis-server startup
- How codis-proxy discovers Redis nodes
3. Source code analysis
Codis-proxy joins the cluster
As mentioned in the previous article, while codis-proxy is in the wait state it cannot process requests; its routing table must first be filled via the fillSlot method before it can go online. So let's look at when that process is triggered.
Before we get to that, we need to know about Codis's Topom module: Topom is essentially the kernel behind the dashboard API. All API requests from the console are handled by Topom (you can think of it as a web service in the Java sense).
In fact, this design has a few weaknesses, such as Topom being a single point. Why a single point? The author answered this as well: because Topom interacts with ZK directly, even a single Topom puts considerable pressure on ZK, so it can only be implemented as a single point plus an in-process cache.
topom_proxy#CreateProxy
This method is exposed as an HTTP endpoint in topom_api; it is called when a new proxy is created from the console.
func (s *Topom) CreateProxy(addr string) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	ctx, err := s.newContext()
	if err != nil {
		return err
	}

	p, err := proxy.NewApiClient(addr).Model()
	if err != nil {
		return errors.Errorf("proxy@%s fetch model failed, %s", addr, err)
	}
	c := s.newProxyClient(p)

	if err := c.XPing(); err != nil {
		return errors.Errorf("proxy@%s check xauth failed, %s", addr, err)
	}

	if ctx.proxy[p.Token] != nil {
		return errors.Errorf("proxy-[%s] already exists", p.Token)
	} else {
		p.Id = ctx.maxProxyId() + 1
	}
	defer s.dirtyProxyCache(p.Token)

	if err := s.storeCreateProxy(p); err != nil {
		return err
	} else {
		return s.reinitProxy(ctx, p, c)
	}
}
1. Call s.newContext, which builds a ctx holding the current cluster metadata (routing information, sentinel information, etc.) for the subsequent logic; most operations in Topom call this method first. To recap, it reads the data from memory, and only if the cache is empty does it fetch from ZK, which is exactly the "single point + process cache" approach mentioned above (sketched after this list).
2. Fetch the model from the proxy over HTTP. The model is the node information registered when the proxy started.
3. Ping the proxy (XPing) to ensure it is alive and that the xauth matches.
4. If the proxy has already been added to the current cluster, return an error. The proxy token is used as the unique identifier.
5. Store the proxy metadata into ZK, then call reinitProxy.
Note: every method that updates cluster metadata uses a deferred hook (here s.dirtyProxyCache) so that if anything goes wrong the cached data is cleaned up, avoiding dirty data.
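A minimal sketch of the "process cache + fall back to ZK" pattern described in step 1, together with the dirty-cache hook from the note above. The names here (metaCache, loadFromZk, dirty) are illustrative, not Codis's actual identifiers.

package main

import "fmt"

// ProxyModel is a stand-in for models.Proxy; fields are illustrative.
type ProxyModel struct {
	Token string
	Addr  string
}

// metaCache mimics the idea behind Topom's context: an in-process cache
// that falls back to ZK when empty, plus a "dirty" hook to invalidate it.
type metaCache struct {
	proxies map[string]*ProxyModel // nil means "not loaded yet"
}

// loadFromZk stands in for the real ZK read.
func loadFromZk() map[string]*ProxyModel {
	fmt.Println("cache miss: loading metadata from ZK")
	return map[string]*ProxyModel{}
}

// context returns cached metadata, reading from ZK only on a cache miss.
func (c *metaCache) context() map[string]*ProxyModel {
	if c.proxies == nil {
		c.proxies = loadFromZk()
	}
	return c.proxies
}

// dirty invalidates the cache so the next context() call re-reads ZK.
// Topom defers a call like this so failed updates never leave stale data.
func (c *metaCache) dirty() {
	c.proxies = nil
}

func main() {
	c := &metaCache{}
	c.context()     // first call hits ZK
	c.context()     // served from the in-process cache
	defer c.dirty() // deferred invalidation, as in CreateProxy
}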
topom_proxy#reinitProxy
func (s *Topom) reinitProxy(ctx *context, p *models.Proxy, c *proxy.ApiClient) error {
	log.Warnf("proxy-[%s] reinit:\n%s", p.Token, p.Encode())
	if err := c.FillSlots(ctx.toSlotSlice(ctx.slots, p)...); err != nil {
		log.ErrorErrorf(err, "proxy-[%s] fillslots failed", p.Token)
		return errors.Errorf("proxy-[%s] fillslots failed", p.Token)
	}
	if err := c.Start(); err != nil {
		log.ErrorErrorf(err, "proxy-[%s] start failed", p.Token)
		return errors.Errorf("proxy-[%s] start failed", p.Token)
	}
	if err := c.SetSentinels(ctx.sentinel); err != nil {
		log.ErrorErrorf(err, "proxy-[%s] set sentinels failed", p.Token)
		return errors.Errorf("proxy-[%s] set sentinels failed", p.Token)
	}
	return nil
}
This method mainly calls the proxy's HTTP APIs: unsurprisingly, c.FillSlots comes first, then c.Start, and finally c.SetSentinels to wire up Sentinel and make the cluster highly available.
FillSlots
This eventually calls the fillSlot method in router.go, which initializes the slot entries of the routing table. The code is fairly simple, so let's focus on the following few lines.
if addr := m.BackendAddr; len(addr) != 0 {
	slot.backend.bc = s.pool.primary.Retain(addr)
	slot.backend.id = m.BackendAddrGroupId
}
if from := m.MigrateFrom; len(from) != 0 {
	slot.migrate.bc = s.pool.primary.Retain(from)
	slot.migrate.id = m.MigrateFromGroupId
}
This initializes the slot's backend and migrate connections: backend holds the connection used to serve requests against the slot's Redis node, while migrate holds the connection to the node the slot is still being migrated from. A simplified dispatch sketch follows.
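Below is a heavily simplified sketch, not Codis's actual forwarding code, of how a router can use these two fields: if a slot is still migrating, the key is first drained from the old node, then the request is forwarded to the current backend. All type and function names here are illustrative.

package main

import "fmt"

// Illustrative stand-ins for the slot/backend structures shown above.
type backendConn struct{ addr string }

type slotConn struct {
	backend *backendConn // serves normal requests for this slot
	migrate *backendConn // non-nil while the slot is still migrating from another group
}

// migrateKeyFrom stands in for asking the old node to hand the key over
// (Codis uses slotsmgrt-style commands on codis-server for this step).
func migrateKeyFrom(addr, key string) error {
	fmt.Printf("migrate key %q away from %s\n", key, addr)
	return nil
}

// send stands in for writing the request to the backend connection.
func send(addr string, req []byte) error {
	fmt.Printf("forward %q to %s\n", req, addr)
	return nil
}

// forward sketches the dispatch order: drain the key from the old node
// first (if the slot is migrating), then forward to the current backend.
func (s *slotConn) forward(key string, req []byte) error {
	if s.migrate != nil {
		if err := migrateKeyFrom(s.migrate.addr, key); err != nil {
			return err
		}
	}
	return send(s.backend.addr, req)
}

func main() {
	slot := &slotConn{
		backend: &backendConn{addr: "new-group:6379"},
		migrate: &backendConn{addr: "old-group:6379"},
	}
	_ = slot.forward("user:42", []byte("GET user:42"))
}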
The newSharedBackendConn method
s.pool.primary.Retain mainly calls newSharedBackendConn to initialize the connections:
func newSharedBackendConn(addr string, pool *sharedBackendConnPool) *sharedBackendConn {
	host, port, err := net.SplitHostPort(addr)
	if err != nil {
		log.ErrorErrorf(err, "split host-port failed, address = %s", addr)
	}
	s := &sharedBackendConn{
		addr: addr,
		host: []byte(host), port: []byte(port),
	}
	s.owner = pool
	s.conns = make([][]*BackendConn, pool.config.BackendNumberDatabases)
	for database := range s.conns {
		parallel := make([]*BackendConn, pool.parallel)
		for i := range parallel {
			parallel[i] = NewBackendConn(addr, database, pool.config)
		}
		s.conns[database] = parallel
	}
	if pool.parallel == 1 {
		s.single = make([]*BackendConn, len(s.conns))
		for database := range s.conns {
			s.single[database] = s.conns[database][0]
		}
	}
	s.refcnt = 1
	return s
}
1. Create the sharedBackendConn connection pool.
2. Initialize the two-dimensional connection array: a single Redis instance has 16 databases by default, and multiple connections can be created per database for higher concurrency.
3. NewBackendConn starts goroutines running the loopWriter and loopReader methods, so any request dropped into BackendConn's input channel is handled automatically (see the sketch below).
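A minimal sketch of the pattern step 3 describes: a writer goroutine draining an input channel. It is not the real BackendConn; the request type and method names are illustrative.

package main

import (
	"fmt"
	"sync"
)

// request is a stand-in for the proxy's Request objects.
type request struct{ cmd string }

// backendConn sketches the "drop into a channel, goroutines do the rest"
// idea behind NewBackendConn/loopWriter; not Codis's actual implementation.
type backendConn struct {
	input chan *request
	wg    sync.WaitGroup
}

func newBackendConn() *backendConn {
	bc := &backendConn{input: make(chan *request, 1024)}
	bc.wg.Add(1)
	go bc.loopWriter() // in Codis a loopReader collects the replies as well
	return bc
}

// loopWriter drains the input channel and "sends" each request to the backend.
func (bc *backendConn) loopWriter() {
	defer bc.wg.Done()
	for r := range bc.input {
		fmt.Println("send to redis:", r.cmd)
	}
}

// PushBack is all the caller needs: enqueue and return immediately.
func (bc *backendConn) PushBack(r *request) {
	bc.input <- r
}

func main() {
	bc := newBackendConn()
	bc.PushBack(&request{cmd: "GET foo"})
	close(bc.input)
	bc.wg.Wait()
}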
The Start method
func (s *Proxy) Start() error {
	s.mu.Lock()
	defer s.mu.Unlock()
	if s.closed {
		return ErrClosedProxy
	}
	if s.online {
		return nil
	}
	s.online = true
	s.router.Start()
	if s.jodis != nil {
		s.jodis.Start()
	}
	return nil
}
1. Flip the online flags on the proxy and its router.
2. s.jodis.Start() registers the proxy's node data in ZK, which is how clients (Jodis) discover the proxy and route calls to it (a sketch of this registration follows).
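A rough sketch of registering a proxy as an ephemeral ZK node, using the github.com/samuel/go-zookeeper/zk client rather than Codis's own code. The /jodis/<product>/proxy-<token> path layout and the payload fields are assumptions for illustration, not taken verbatim from the Codis source.

package main

import (
	"encoding/json"
	"fmt"
	"time"

	"github.com/samuel/go-zookeeper/zk"
)

func main() {
	conn, _, err := zk.Connect([]string{"127.0.0.1:2181"}, time.Second*5)
	if err != nil {
		panic(err)
	}
	defer conn.Close()

	// Assumed path layout: /jodis/<product-name>/proxy-<token>.
	path := "/jodis/demo-product/proxy-0123456789abcdef"
	data, _ := json.Marshal(map[string]string{
		"addr":  "10.0.0.1:19000", // proxy address clients should connect to
		"state": "online",
	})

	// An ephemeral node disappears automatically when the proxy's ZK
	// session dies, so clients watching the parent see it go offline.
	created, err := conn.Create(path, data, zk.FlagEphemeral, zk.WorldACL(zk.PermAll))
	if err != nil {
		panic(err)
	}
	fmt.Println("registered at", created)
}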
SetSentinels method
This method updates the Sentinel information held by the current proxy, which is later used to react to master switchover events reported by Sentinel; more on that below.
Codis-server startup
GroupAddServer
c, err := redis.NewClient(addr, s.topom.Config().ProductAuth, time.Second)
if err != nil {
	log.WarnErrorf(err, "create redis client to %s failed", addr)
	return rpc.ApiResponseError(err)
}
defer c.Close()
if _, err := c.SlotsInfo(); err != nil {
	log.WarnErrorf(err, "redis %s check slots-info failed", addr)
	return rpc.ApiResponseError(err)
}
if err := s.topom.GroupAddServer(gid, dc, addr); err != nil {
	return rpc.ApiResponseError(err)
} else {
	return rpc.ApiResponseJson("OK")
}
Adding a server node to a group invokes this method.
1. Create a Redis client.
2. Run the SLOTSINFO command to determine whether the server can join the cluster. This command is not part of stock Redis; it is added by codis-server (see the sketch after this list).
3. Call Topom's GroupAddServer method.
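A small sketch of performing the same "does this node speak SLOTSINFO?" check by hand, here using the github.com/gomodule/redigo/redis client rather than Codis's internal redis wrapper; the address is a placeholder.

package main

import (
	"fmt"
	"time"

	"github.com/gomodule/redigo/redis"
)

func main() {
	// Placeholder address; point this at a codis-server instance.
	c, err := redis.Dial("tcp", "127.0.0.1:6379",
		redis.DialConnectTimeout(time.Second),
		redis.DialReadTimeout(time.Second))
	if err != nil {
		panic(err)
	}
	defer c.Close()

	// Stock Redis answers with "unknown command"; codis-server returns
	// per-slot statistics, which is how the check tells them apart.
	if _, err := c.Do("SLOTSINFO"); err != nil {
		fmt.Println("not a codis-server:", err)
		return
	}
	fmt.Println("codis-server detected, safe to add to the group")
}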
Topom#GroupAddServer
1. Check whether the server has already been added to the cluster:
for _, g := range ctx.group {
	for _, x := range g.Servers {
		if x.Addr == addr {
			return errors.Errorf("server-[%s] already exists", addr)
		}
	}
}
2. Update sentinel information
3. Add the node to the corresponding group
Group structure
type Group struct {
	Id      int            `json:"id"`
	Servers []*GroupServer `json:"servers"`

	Promoting struct {
		Index int    `json:"index,omitempty"`
		State string `json:"state,omitempty"`
	} `json:"promoting"`

	OutOfSync bool `json:"out_of_sync"`
}
GroupServer stores every codis-server in the current group; index 0 of the Servers array is the master node.
If multiple nodes are added to the same group, SyncCreateAction must be called so that the slaves replicate from the master; the actual sync runs asynchronously in the background (see the sketch below).
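To make the "index 0 is the master" convention concrete, here is a small sketch that walks a Group (using the struct shown above) and prints the SLAVEOF relationship each follower should end up with; GroupServer is reduced to just an Addr field for illustration.

package main

import "fmt"

// GroupServer reduced to the one field this sketch needs.
type GroupServer struct {
	Addr string
}

// Group mirrors the struct shown above, trimmed to Id and Servers.
type Group struct {
	Id      int
	Servers []*GroupServer
}

func main() {
	g := &Group{
		Id: 1,
		Servers: []*GroupServer{
			{Addr: "10.0.0.1:6379"}, // index 0: the master
			{Addr: "10.0.0.2:6379"},
			{Addr: "10.0.0.3:6379"},
		},
	}

	master := g.Servers[0] // by convention the first entry is the master
	for _, s := range g.Servers[1:] {
		// Conceptually what a sync action boils down to for each slave.
		fmt.Printf("group-%d: %s SLAVEOF %s\n", g.Id, s.Addr, master.Addr)
	}
}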
Sentinel module
The rewatchSentinels method is called when Topom starts. The proxy has a method of the same name with similar logic; it is triggered when the proxy starts, goes online, or when a new proxy is created (recall SetSentinels above). Here we focus only on Topom's rewatchSentinels.
s.ha.monitor = redis.NewSentinel(s.config.ProductName, s.config.ProductAuth)
s.ha.monitor.LogFunc = log.Warnf
s.ha.monitor.ErrFunc = log.WarnErrorf
go func(p *redis.Sentinel) {
	var trigger = make(chan struct{}, 1)
	delayUntil := func(deadline time.Time) {
		for !p.IsCanceled() {
			var d = deadline.Sub(time.Now())
			if d <= 0 {
				return
			}
			time.Sleep(math2.MinDuration(d, time.Second))
		}
	}
	go func() {
		defer close(trigger)
		callback := func() {
			select {
			case trigger <- struct{}{}:
			default:
			}
		}
		for !p.IsCanceled() {
			timeout := time.Minute * 15
			retryAt := time.Now().Add(time.Second * 10)
			if !p.Subscribe(servers, timeout, callback) {
				delayUntil(retryAt)
			} else {
				callback()
			}
		}
	}()
	go func() {
		for range trigger {
			var success int
			for i := 0; i != 10 && !p.IsCanceled() && success != 2; i++ {
				timeout := time.Second * 5
				masters, err := p.Masters(servers, timeout)
				if err != nil {
					log.WarnErrorf(err, "fetch group masters failed")
				} else {
					if !p.IsCanceled() {
						s.SwitchMasters(masters)
					}
					success += 1
				}
				delayUntil(time.Now().Add(time.Second * 5))
			}
		}
	}()
}(s.ha.monitor)
1. Create the trigger channel (buffered, capacity 1).
2. Start a goroutine that calls p.Subscribe to subscribe to Sentinel events. Anyone familiar with Sentinel knows it publishes events when a master/slave switchover happens; here the +switch-master event is listened for, and when it fires the callback runs, i.e. a value is written into trigger (a sketch of this non-blocking notification pattern follows the list).
3. Start another goroutine to handle the +switch-master logic: whenever trigger is readable, call p.Masters, which issues the SENTINEL masters command to each Sentinel and derives the current master of every group from the results.
4. Call s.SwitchMasters to apply the master switch. In Topom this updates the master information stored in ZK; the proxy's version of this method updates the in-memory routing table instead.
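A tiny standalone sketch of the notification pattern used above: the callback does a non-blocking send into a one-slot channel, so a burst of Sentinel events collapses into a single pending refresh. Only the channel mechanics are shown; the Sentinel subscription itself is stubbed out.

package main

import (
	"fmt"
	"time"
)

func main() {
	// One-slot buffered channel: at most one refresh can be pending.
	trigger := make(chan struct{}, 1)

	// The callback Sentinel events would invoke: non-blocking send, so a
	// burst of +switch-master events coalesces into one notification.
	callback := func() {
		select {
		case trigger <- struct{}{}:
		default:
		}
	}

	// Simulate a burst of Sentinel events.
	for i := 0; i < 5; i++ {
		callback()
	}

	// The consumer side: each wakeup re-reads the masters once.
	go func() {
		for range trigger {
			fmt.Println("refresh masters via SENTINEL masters")
		}
	}()

	time.Sleep(100 * time.Millisecond) // give the consumer a moment
}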
4. Summary
This article briefly described how codis-proxy and codis-server join the cluster. Slot-related logic will be covered in a later article.