preface
This article takes a look at the Lock implementation in storagetapper.
Lock
storagetapper/lock/lock.go
```go
/*Lock is general distributed lock interface*/
type Lock interface {
	// Try to acquire a lock. Returns false if failed.
	TryLock(s string) bool
	// Try to acquire a lock. Returns false if failed.
	// Allows n simultaneous locks to be held
	TryLockShared(s string, n int) bool

	// Try to acquire a lock and wait for specified period of time for the lock
	// to become available. Returns false if failed.
	Lock(s string, waitDuration time.Duration) bool

	// Check if we still have the lock. Try to reacquire if necessary.
	// Returns false in the case of failure
	Refresh() bool

	// Unlock the lock. Returns false if there was failure
	Unlock() bool

	//Close releases resources associated with the lock
	Close() bool
}
```
The Lock interface defines six methods: TryLock, TryLockShared, Lock, Refresh, Unlock, and Close. Every one of them reports its result as a bool rather than an error.
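To show how a caller might drive this interface, here is a minimal sketch. The `memLock` type, its `registry`, and the `runExclusive` helper are hypothetical names invented for this example. They are in-memory stand-ins, not part of storagetapper, and `TryLockShared` is simplified to the exclusive case.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// Lock mirrors the interface quoted above.
type Lock interface {
	TryLock(s string) bool
	TryLockShared(s string, n int) bool
	Lock(s string, waitDuration time.Duration) bool
	Refresh() bool
	Unlock() bool
	Close() bool
}

// registry is a hypothetical process-local table of held lock names.
type registry struct {
	mu   sync.Mutex
	held map[string]*memLock
}

// memLock is a hypothetical in-memory Lock used only for illustration.
type memLock struct {
	reg  *registry
	name string
}

var _ Lock = (*memLock)(nil) // compile-time check that memLock satisfies Lock

func newMemLock(r *registry) *memLock { return &memLock{reg: r} }

// TryLock succeeds if the name is free or already held by this object.
func (l *memLock) TryLock(s string) bool {
	l.reg.mu.Lock()
	defer l.reg.mu.Unlock()
	if owner, ok := l.reg.held[s]; ok && owner != l {
		return false // held by someone else
	}
	l.reg.held[s] = l
	l.name = s
	return true
}

// TryLockShared is simplified here: n is ignored and the lock is exclusive.
func (l *memLock) TryLockShared(s string, n int) bool { return l.TryLock(s) }

// Lock polls TryLock until it succeeds or waitDuration elapses.
func (l *memLock) Lock(s string, waitDuration time.Duration) bool {
	deadline := time.Now().Add(waitDuration)
	for {
		if l.TryLock(s) {
			return true
		}
		if !time.Now().Before(deadline) {
			return false
		}
		time.Sleep(time.Millisecond)
	}
}

// Refresh returns true if nothing is held, otherwise re-checks or reacquires.
func (l *memLock) Refresh() bool {
	if l.name == "" {
		return true
	}
	return l.TryLock(l.name)
}

// Unlock releases the lock; it returns false if this object does not hold it.
func (l *memLock) Unlock() bool {
	l.reg.mu.Lock()
	defer l.reg.mu.Unlock()
	if l.name == "" || l.reg.held[l.name] != l {
		return false
	}
	delete(l.reg.held, l.name)
	l.name = ""
	return true
}

func (l *memLock) Close() bool { return true }

// runExclusive runs work only if the named lock can be taken without waiting.
func runExclusive(l Lock, name string, work func()) bool {
	if !l.TryLock(name) {
		return false
	}
	defer l.Unlock()
	work()
	return l.Refresh() // false would mean the lock was lost along the way
}

func main() {
	reg := &registry{held: map[string]*memLock{}}
	a, b := newMemLock(reg), newMemLock(reg)

	fmt.Println("a acquires:", a.TryLock("table1"))
	fmt.Println("b runs while a holds:", runExclusive(b, "table1", func() {}))
	fmt.Println("a unlocks:", a.Unlock())
	fmt.Println("b runs after release:", runExclusive(b, "table1", func() {}))
}
```

The point of the sketch is the calling pattern: take the lock, defer the Unlock, and treat a false result from Refresh as a sign that exclusivity may have been lost.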
myLock
storagetapper/lock/lock.go
```go
type myLock struct {
	conn     *sql.DB
	connID   int64
	name     string
	ci       db.Addr
	n        int
	mu       sync.Mutex
	isLocked bool
}

// Lock waits for the duration specified for the lock to be available
// Also TryLock will reuse the connection if it already exists otherwise
// it will create a new one.
func (m *myLock) Lock(s string, timeout time.Duration) bool {
	return m.lock(s, timeout, 1)
}

// TryLock tries to take a lock and returns an error if it cannot. If the lock
// is already held then TryLock is noop.
func (m *myLock) TryLock(s string) bool {
	return m.Lock(s, 0)
}

// TryLockShared tries to take a lock and returns an error if it cannot. If the lock
// is already held then TryLock is noop.
func (m *myLock) TryLockShared(s string, n int) bool {
	return m.lock(s, 0, n)
}

// Refresh tries to keep the lock fresh.
func (m *myLock) Refresh() bool {
	if !m.isLocked {
		return true
	}
	if m.IsLockedByMe() {
		return true
	}
	return m.TryLock(m.name)
}

// Unlock releases locks associated with the connection
func (m *myLock) Unlock() bool {
	m.mu.Lock()
	defer m.mu.Unlock()
	if !m.isLocked {
		return false
	}
	m.isLocked = false
	if m.conn == nil {
		return false
	}
	var res sql.NullBool
	err := m.conn.QueryRow("SELECT RELEASE_LOCK(?)", m.lockName()).Scan(&res)
	if log.EL(m.log(), err) {
		return false
	}
	if !res.Valid {
		m.log().Errorf("Lock did not exists on release")
		return false
	}
	if !res.Bool {
		m.log().Errorf("Lock was not hold by me")
		return false
	}
	return true
}

func (m *myLock) Close() bool {
	return m.closeConn()
}
```
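myLock defines the fields conn, connID, name, ci (a db.Addr), n, mu, and isLocked, and implements the Lock interface on top of a database connection. Lock calls the internal lock method with one ticket, TryLock calls Lock with a zero timeout, and TryLockShared calls lock with a zero timeout and n tickets. Refresh returns true if no lock is currently held or if IsLockedByMe confirms ownership; otherwise it tries to reacquire the lock through TryLock. Unlock sets isLocked to false and releases the lock with RELEASE_LOCK. It returns false if no lock was held, if the connection is nil, if the query fails, or if the server reports that the lock did not exist or was held by another connection. Close closes the connection through closeConn.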
lock
storagetapper/lock/lock.go
```go
// Lock waits for the duration specified for the lock to be available
// Also TryLock will reuse the connection if it already exists otherwise
// it will create a new one.
func (m *myLock) lock(s string, timeout time.Duration, ntickets int) bool {
	m.mu.Lock()
	defer m.mu.Unlock()

	var err error
	var res sql.NullBool
	m.name = s
	for m.n = 0; m.n < ntickets; m.n++ {
		if !m.createConn() {
			return false
		}
		err = m.conn.QueryRow("SELECT GET_LOCK(?,?)", m.lockName(), timeout/time.Second).Scan(&res)
		//true - success, false - timeout, NULL - error
		if err == nil && res.Valid && res.Bool {
			m.isLocked = true
			return true
		}
		if log.EL(m.log(), err) {
			m.closeConn()
		}
	}
	return false
}
```
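The lock method loops up to ntickets times, using m.n as the loop counter. On each pass it makes sure a connection exists (createConn) and runs GET_LOCK with the timeout expressed in whole seconds. If GET_LOCK returns true, isLocked is set to true and the method returns true. If the query returns an error, the connection is closed so that the next pass can create a new one. If no pass succeeds, the method returns false. Because timeout/time.Second is integer division, a sub-second timeout is truncated to 0, and a timeout of 0 makes GET_LOCK return immediately instead of waiting.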
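One plausible reading of the ticket loop is that each value of m.n selects a different named slot, so that up to ntickets holders can coexist. This rests on an assumption: lockName() is not shown in the excerpt, and the sketch below assumes it folds m.n into the name. The `slots` type, `slotName`, and `tryAcquire` are hypothetical in-memory stand-ins for the server-side named locks, not storagetapper code.

```go
package main

import (
	"fmt"
	"sync"
)

// slots is a hypothetical in-memory stand-in for server-side named locks.
type slots struct {
	mu   sync.Mutex
	held map[string]bool
}

// slotName is an assumed naming scheme: base name plus ticket index.
func slotName(name string, n int) string {
	return fmt.Sprintf("%s.%d", name, n)
}

// tryAcquire walks tickets 0..ntickets-1, as the loop in lock() does, and
// takes the first free slot. It returns the slot index and whether it succeeded.
func (s *slots) tryAcquire(name string, ntickets int) (int, bool) {
	s.mu.Lock()
	defer s.mu.Unlock()
	for n := 0; n < ntickets; n++ {
		key := slotName(name, n)
		if !s.held[key] {
			s.held[key] = true
			return n, true
		}
	}
	return 0, false
}

// release frees a previously acquired slot.
func (s *slots) release(name string, n int) {
	s.mu.Lock()
	defer s.mu.Unlock()
	delete(s.held, slotName(name, n))
}

func main() {
	s := &slots{held: map[string]bool{}}
	// Three holders compete for a lock that allows two simultaneous owners.
	for i := 0; i < 3; i++ {
		n, ok := s.tryAcquire("table1", 2)
		fmt.Printf("holder %d: slot=%d ok=%v\n", i, n, ok)
	}
}
```

Under that assumption, an exclusive lock is simply the ntickets = 1 case, which matches how Lock and TryLock call lock with a single ticket.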
IsLockedByMe
storagetapper/lock/lock.go
```go
// IsLockedByMe checks whether the lock is held by the same connection as
// the current lock object.
func (m *myLock) IsLockedByMe() bool {
	m.mu.Lock()
	defer m.mu.Unlock()

	var lockedBy int64
	if m.conn == nil {
		return false
	}
	err := m.conn.QueryRow("SELECT IFNULL(IS_USED_LOCK(?), 0)", m.lockName()).Scan(&lockedBy)
	log.Debugf("lock: lockedBy: %v, myConnID: %v", lockedBy, m.connID)

	if err != nil || m.connID == 0 || lockedBy != m.connID {
		if err != nil {
			m.log().Errorf("IsLockedByMe: error: " + err.Error())
			m.closeConn()
		} else {
			m.log().Debugf("IsLockedByMe: lockedBy: %v, != myConnID: %v", lockedBy, m.connID)
		}
		return false
	}
	return true
}
```
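IsLockedByMe runs IS_USED_LOCK, which returns the connection ID of the lock's current holder (IFNULL maps an unheld lock to 0), and returns true only if that ID equals connID. It returns false if the connection is nil, if the query fails (in which case the connection is also closed), if connID is 0, or if the two IDs differ.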
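The decision at the heart of IsLockedByMe can be isolated as a pure function, which makes the edge cases easier to see. This is a minimal sketch: `heldByConn` and `errQuery` are hypothetical names introduced here, and the database query is replaced by plain arguments.

```go
package main

import (
	"errors"
	"fmt"
)

// errQuery is a hypothetical error standing in for a failed query.
var errQuery = errors.New("query failed")

// heldByConn applies the same three checks as IsLockedByMe:
// the query must succeed, our connection ID must be known (non-zero),
// and the reported holder must be our connection.
// lockedBy is 0 when nobody holds the lock (the IFNULL(..., 0) case).
func heldByConn(lockedBy, connID int64, err error) bool {
	if err != nil {
		return false
	}
	if connID == 0 {
		return false
	}
	return lockedBy == connID
}

func main() {
	fmt.Println("held by us:", heldByConn(42, 42, nil))
	fmt.Println("held by another connection:", heldByConn(7, 42, nil))
	fmt.Println("not held at all:", heldByConn(0, 42, nil))
	fmt.Println("unknown connID:", heldByConn(0, 0, nil))
	fmt.Println("query error:", heldByConn(42, 42, errQuery))
}
```

The connID == 0 guard matters because an unheld lock is also reported as 0; without it, a lock object with no known connection ID would appear to own a lock that nobody holds.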
summary
The Lock interface of storagetapper defines the methods TryLock, TryLockShared, Lock, Refresh, Unlock, and Close. myLock defines the fields conn, connID, name, ci (a db.Addr), n, mu, and isLocked, and implements the Lock interface on top of a database connection, using MySQL's GET_LOCK, RELEASE_LOCK, and IS_USED_LOCK functions.
doc
- storagetapper
- locking-functions