Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor cutover for MySQL 8.x rename feature && support OceanBase #1434

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions go/base/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ type MigrationContext struct {
GoogleCloudPlatform bool
AzureMySQL bool
AttemptInstantDDL bool
OceanBase bool

// SkipPortValidation allows skipping the port validation in `ValidateConnection`
// This is useful when connecting to a MySQL instance where the external port
Expand Down
31 changes: 30 additions & 1 deletion go/base/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
package base

import (
"errors"
"fmt"
"os"
"regexp"
Expand Down Expand Up @@ -62,6 +63,10 @@ func StringContainsAll(s string, substrings ...string) bool {
}

func ValidateConnection(db *gosql.DB, connectionConfig *mysql.ConnectionConfig, migrationContext *MigrationContext, name string) (string, error) {
if err := validateOceanBaseConnection(db, migrationContext); err != nil {
return "", err
}

versionQuery := `select @@global.version`

var version string
Expand All @@ -84,7 +89,7 @@ func ValidateConnection(db *gosql.DB, connectionConfig *mysql.ConnectionConfig,
// GCP set users port to "NULL", replace it by gh-ost param
// Azure MySQL set users port to a different value by design, replace it by gh-ost para
var port int
if migrationContext.AliyunRDS || migrationContext.GoogleCloudPlatform || migrationContext.AzureMySQL {
if migrationContext.AliyunRDS || migrationContext.GoogleCloudPlatform || migrationContext.AzureMySQL || migrationContext.OceanBase {
port = connectionConfig.Key.Port
} else {
portQuery := `select @@global.port`
Expand All @@ -102,3 +107,27 @@ func ValidateConnection(db *gosql.DB, connectionConfig *mysql.ConnectionConfig,
return "", fmt.Errorf("Unexpected database port reported: %+v / extra_port: %+v", port, extraPort)
}
}

func validateOceanBaseConnection(db *gosql.DB, migrationContext *MigrationContext) error {
versionCommentQuery := `select @@global.version_comment`
var versionComment string
if err := db.QueryRow(versionCommentQuery).Scan(&versionComment); err != nil {
return nil
}
if !strings.Contains(versionComment, "OceanBase") {
return nil
}

migrationContext.Log.Infof("OceanBase connection identified, version_comment: %v", versionComment)
migrationContext.OceanBase = true

enableLockPriorityQuery := `select value from oceanbase.GV$OB_PARAMETERS where name='enable_lock_priority'`
var enableLockPriority bool
if err := db.QueryRow(enableLockPriorityQuery).Scan(&enableLockPriority); err != nil {
return err
}
if !enableLockPriority {
return errors.New("system parameter 'enable_lock_priority' should be true to support cut-over")
}
return nil
}
74 changes: 49 additions & 25 deletions go/logic/applier.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ func (this *Applier) InitDBConnections() (err error) {
if err := this.validateAndReadGlobalVariables(); err != nil {
return err
}
if !this.migrationContext.AliyunRDS && !this.migrationContext.GoogleCloudPlatform && !this.migrationContext.AzureMySQL {
if !this.migrationContext.AliyunRDS && !this.migrationContext.GoogleCloudPlatform && !this.migrationContext.AzureMySQL && !this.migrationContext.OceanBase {
if impliedKey, err := mysql.GetInstanceKey(this.db); err != nil {
return err
} else {
Expand Down Expand Up @@ -714,24 +714,28 @@ func (this *Applier) ApplyIterationInsertQuery() (chunkSize int64, rowsAffected
return chunkSize, rowsAffected, duration, nil
}

// LockOriginalTable places a write lock on the original table
func (this *Applier) LockOriginalTable() error {
query := fmt.Sprintf(`lock /* gh-ost */ tables %s.%s write`,
sql.EscapeName(this.migrationContext.DatabaseName),
sql.EscapeName(this.migrationContext.OriginalTableName),
)
this.migrationContext.Log.Infof("Locking %s.%s",
sql.EscapeName(this.migrationContext.DatabaseName),
sql.EscapeName(this.migrationContext.OriginalTableName),
)
// lockTable places a write lock on the specific table
func (this *Applier) lockTable(databaseName, tableName string) error {
query := fmt.Sprintf(`lock /* gh-ost */ tables %s.%s write`, databaseName, tableName)
this.migrationContext.Log.Infof("Locking %s.%s", databaseName, tableName)
this.migrationContext.LockTablesStartTime = time.Now()
if _, err := sqlutils.ExecNoPrepare(this.singletonDB, query); err != nil {
return err
}
this.migrationContext.Log.Infof("Table locked")
this.migrationContext.Log.Infof("Table %s.%s locked", databaseName, tableName)
return nil
}

// LockOriginalTable places a write lock on the original table
func (this *Applier) LockOriginalTable() error {
return this.lockTable(this.migrationContext.DatabaseName, this.migrationContext.OriginalTableName)
}

// LockGhostTable places a write lock on the ghost table
func (this *Applier) LockGhostTable() error {
return this.lockTable(this.migrationContext.DatabaseName, this.migrationContext.GetGhostTableName())
}

// UnlockTables makes tea. No wait, it unlocks tables.
func (this *Applier) UnlockTables() error {
query := `unlock /* gh-ost */ tables`
Expand Down Expand Up @@ -1033,7 +1037,7 @@ func (this *Applier) AtomicCutOverMagicLock(sessionIdChan chan int64, tableLocke

tableLockTimeoutSeconds := this.migrationContext.CutOverLockTimeoutSeconds * 2
this.migrationContext.Log.Infof("Setting LOCK timeout as %d seconds", tableLockTimeoutSeconds)
query = fmt.Sprintf(`set /* gh-ost */ session lock_wait_timeout:=%d`, tableLockTimeoutSeconds)
query = fmt.Sprintf(`set /* gh-ost */ session lock_wait_timeout=%d`, tableLockTimeoutSeconds)
if _, err := tx.Exec(query); err != nil {
tableLocked <- err
return err
Expand Down Expand Up @@ -1108,25 +1112,31 @@ func (this *Applier) AtomicCutOverMagicLock(sessionIdChan chan int64, tableLocke
return nil
}

// AtomicCutoverRename
func (this *Applier) AtomicCutoverRename(sessionIdChan chan int64, tablesRenamed chan<- error) error {
tx, err := this.db.Begin()
func (this *Applier) atomicCutoverRename(db *gosql.DB, sessionIdChan chan int64, tablesRenamed chan<- error) error {
tx, err := db.Begin()
if err != nil {
return err
}
defer func() {
tx.Rollback()
sessionIdChan <- -1
tablesRenamed <- fmt.Errorf("Unexpected error in AtomicCutoverRename(), injected to release blocking channel reads")
if sessionIdChan != nil {
sessionIdChan <- -1
}
if tablesRenamed != nil {
tablesRenamed <- fmt.Errorf("Unexpected error in AtomicCutoverRename(), injected to release blocking channel reads")
}
}()
var sessionId int64
if err := tx.QueryRow(`select /* gh-ost */ connection_id()`).Scan(&sessionId); err != nil {
return err

if sessionIdChan != nil {
var sessionId int64
if err := tx.QueryRow(`select /* gh-ost */ connection_id()`).Scan(&sessionId); err != nil {
return err
}
sessionIdChan <- sessionId
}
sessionIdChan <- sessionId

this.migrationContext.Log.Infof("Setting RENAME timeout as %d seconds", this.migrationContext.CutOverLockTimeoutSeconds)
query := fmt.Sprintf(`set /* gh-ost */ session lock_wait_timeout:=%d`, this.migrationContext.CutOverLockTimeoutSeconds)
query := fmt.Sprintf(`set /* gh-ost */ session lock_wait_timeout=%d`, this.migrationContext.CutOverLockTimeoutSeconds)
if _, err := tx.Exec(query); err != nil {
return err
}
Expand All @@ -1143,14 +1153,28 @@ func (this *Applier) AtomicCutoverRename(sessionIdChan chan int64, tablesRenamed
)
this.migrationContext.Log.Infof("Issuing and expecting this to block: %s", query)
if _, err := tx.Exec(query); err != nil {
tablesRenamed <- err
if tablesRenamed != nil {
tablesRenamed <- err
}
return this.migrationContext.Log.Errore(err)
}
tablesRenamed <- nil
if tablesRenamed != nil {
tablesRenamed <- nil
}
this.migrationContext.Log.Infof("Tables renamed")
return nil
}

// AtomicCutoverRename renames tables for atomic cut over in non lock session
func (this *Applier) AtomicCutoverRename(sessionIdChan chan int64, tablesRenamed chan<- error) error {
return this.atomicCutoverRename(this.db, sessionIdChan, tablesRenamed)
}

// AtomicCutoverRenameWithLock renames tables for atomic cut over in the lock session
func (this *Applier) AtomicCutoverRenameWithLock() error {
return this.atomicCutoverRename(this.singletonDB, nil, nil)
}

func (this *Applier) ShowStatusVariable(variableName string) (result int64, err error) {
query := fmt.Sprintf(`show /* gh-ost */ global status like '%s'`, variableName)
if err := this.db.QueryRow(query).Scan(&variableName, &result); err != nil {
Expand Down
2 changes: 1 addition & 1 deletion go/logic/inspect.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ func (this *Inspector) InitDBConnections() (err error) {
if err := this.validateConnection(); err != nil {
return err
}
if !this.migrationContext.AliyunRDS && !this.migrationContext.GoogleCloudPlatform && !this.migrationContext.AzureMySQL {
if !this.migrationContext.AliyunRDS && !this.migrationContext.GoogleCloudPlatform && !this.migrationContext.AzureMySQL && !this.migrationContext.OceanBase {
if impliedKey, err := mysql.GetInstanceKey(this.db); err != nil {
return err
} else {
Expand Down
50 changes: 47 additions & 3 deletions go/logic/migrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,11 @@ func (this *Migrator) canStopStreaming() bool {

// onChangelogEvent is called when a binlog event operation on the changelog table is intercepted.
func (this *Migrator) onChangelogEvent(dmlEvent *binlog.BinlogDMLEvent) (err error) {
if dmlEvent.NewColumnValues == nil {
// in some compatible systems, such as OceanBase Binlog Service, an UPSERT event is
// converted to a DELETE event and an INSERT event, we need to skip the DELETE event.
return nil
}
// Hey, I created the changelog table, I know the type of columns it has!
switch hint := dmlEvent.NewColumnValues.StringColumn(2); hint {
case "state":
Expand Down Expand Up @@ -562,9 +567,15 @@ func (this *Migrator) cutOver() (err error) {

switch this.migrationContext.CutOverType {
case base.CutOverAtomic:
// Atomic solution: we use low timeout and multiple attempts. But for
// each failed attempt, we throttle until replication lag is back to normal
err = this.atomicCutOver()
if this.migrationContext.OceanBase || !mysql.IsSmallerMinorVersion(this.migrationContext.ApplierMySQLVersion, "8.0.13") {
// Atomic solution for latest MySQL: cut over the tables in the same session where the origin
// table and ghost table are both locked, it can only work on MySQL 8.0.13 or later versions
err = this.atomicCutOverMySQL8()
} else {
// Atomic solution: we use low timeout and multiple attempts. But for
// each failed attempt, we throttle until replication lag is back to normal
err = this.atomicCutOver()
}
case base.CutOverTwoStep:
err = this.cutOverTwoStep()
default:
Expand Down Expand Up @@ -643,6 +654,39 @@ func (this *Migrator) cutOverTwoStep() (err error) {
return nil
}

// atomicCutOverMySQL8 will lock down the original table and the ghost table, execute
// what's left of last DML entries, and atomically swap original->old, then new->original.
// It requires to execute RENAME TABLE when the table is LOCKED under WRITE LOCK, which is
// supported from MySQL 8.0.13, see https://dev.mysql.com/doc/relnotes/mysql/8.0/en/news-8-0-13.html.
func (this *Migrator) atomicCutOverMySQL8() (err error) {
atomic.StoreInt64(&this.migrationContext.InCutOverCriticalSectionFlag, 1)
defer atomic.StoreInt64(&this.migrationContext.InCutOverCriticalSectionFlag, 0)
atomic.StoreInt64(&this.migrationContext.AllEventsUpToLockProcessedInjectedFlag, 0)

if err := this.retryOperation(this.applier.LockOriginalTable); err != nil {
return err
}

if err := this.retryOperation(this.waitForEventsUpToLock); err != nil {
return err
}
if err := this.retryOperation(this.applier.LockGhostTable); err != nil {
return err
}

if err := this.applier.AtomicCutoverRenameWithLock(); err != nil {
return err
}
if err := this.retryOperation(this.applier.UnlockTables); err != nil {
return err
}

lockAndRenameDuration := this.migrationContext.RenameTablesEndTime.Sub(this.migrationContext.LockTablesStartTime)
renameDuration := this.migrationContext.RenameTablesEndTime.Sub(this.migrationContext.RenameTablesStartTime)
this.migrationContext.Log.Debugf("Lock & rename duration: %s (rename only: %s). During this time, queries on %s were locked or failing", lockAndRenameDuration, renameDuration, sql.EscapeName(this.migrationContext.OriginalTableName))
return nil
}

// atomicCutOver
func (this *Migrator) atomicCutOver() (err error) {
atomic.StoreInt64(&this.migrationContext.InCutOverCriticalSectionFlag, 1)
Expand Down
45 changes: 45 additions & 0 deletions go/mysql/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ package mysql
import (
gosql "database/sql"
"fmt"
"strconv"
"strings"
"sync"
"time"
Expand Down Expand Up @@ -211,3 +212,47 @@ func Kill(db *gosql.DB, connectionID string) error {
_, err := db.Exec(`KILL QUERY %s`, connectionID)
return err
}

func versionTokens(version string, digits int) []int {
v := strings.Split(version, "-")[0]
tokens := strings.Split(v, ".")
intTokens := make([]int, digits)
for i := range tokens {
if i >= digits {
break
}
intTokens[i], _ = strconv.Atoi(tokens[i])
}
return intTokens
}

func isSmallerVersion(version string, otherVersion string, digits int) bool {
v := versionTokens(version, digits)
o := versionTokens(otherVersion, digits)
for i := 0; i < len(v); i++ {
if v[i] < o[i] {
return true
}
if v[i] > o[i] {
return false
}
if i == digits {
break
}
}
return false
}

// IsSmallerMajorVersion tests two versions against another and returns true if
// the former is a smaller "major" version than the latter.
// e.g. 5.5.36 is NOT a smaller major version as compared to 5.5.40, but IS as compared to 5.6.9
func IsSmallerMajorVersion(version string, otherVersion string) bool {
return isSmallerVersion(version, otherVersion, 2)
}

// IsSmallerMinorVersion tests two versions against another and returns true if
// the former is a smaller "minor" version than the latter.
// e.g. 5.5.36 is a smaller major version as compared to 5.5.40, as well as compared to 5.6.7
func IsSmallerMinorVersion(version string, otherVersion string) bool {
return isSmallerVersion(version, otherVersion, 3)
}