Skip to content

Commit

Permalink
adjust embed test
Browse files Browse the repository at this point in the history
  • Loading branch information
jensenojs committed Sep 30, 2024
1 parent 5473310 commit acf01be
Showing 1 changed file with 6 additions and 184 deletions.
190 changes: 6 additions & 184 deletions pkg/tests/issues/issue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -377,194 +377,16 @@ func TestCNFlushS3Deletes(t *testing.T) {

/*
#16493
from : Last Allocated : CN1 insert into t1(null, 1) CN1 [1, 10000] at TS 80 (CN1)
to : This txn can see : CN1 CN1 start insert transaction again,
this transaction to use 8000~10000 cache at TS 90 (CN1)
create table t (id int auto_increment primary key, id2 int);
CN1 CN2
insert into t(id2) values (1), (2)
insert into t(id) values (3)
insert into t(id2) values (1), (2)
Between preinsert generating 9000 at TS 91 (CN1)
CN2 begin insert into t1 (9000, 1) at TS 95 (CN2)
CN2 commit at TS 96 (CN2)
and CN1 lock9000, the 9000 was committed in CN2 at TS 100 (CN1)
CN1 commit(failed)
need to ensure that dup check is after lock op
There is no lock competition, but there is data modification
*/
func TestDedupForAutoPk(t *testing.T) {
embed.RunBaseClusterTests(
func(c embed.Cluster) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*30)
defer cancel()

cn1, err := c.GetCNService(0)
require.NoError(t, err)

cn2, err := c.GetCNService(1)
require.NoError(t, err)

db := testutils.GetDatabaseName(t)
table := "t"

testutils.CreateTableAndWaitCNApplied(
t,
db,
table,
"create table "+table+" (id int auto_increment primary key, id2 int)",
cn1,
cn2,
)

_ = testutils.ExecSQL(
t,
db,
cn1,
"insert into "+table+" values (null, 1)",
)

committedAt := testutils.ExecSQL(
t,
db,
cn1,
"insert into "+table+" values (8999, 1)",
)

var wg sync.WaitGroup
wg.Add(2)

type oneChannel struct {
c chan struct{}
once sync.Once
}
txn2StartedC := &oneChannel{c: make(chan struct{})}

txn2CommittedC := make(chan struct{})

// txn1 workflow, the mean idea here is to make sure that
// between preinsert generating 9000 and lock, the 9000 was committed in another cn
go func() {
defer func() {
defer wg.Done()
}()

exec1 := testutils.GetSQLExecutor(cn1)
err := exec1.ExecTxn(
ctx,
func(txn executor.TxnExecutor) error {
defer func() {
runtime.MustGetTestingContext(cn1.ServiceID()).SetAdjustLockResultFunc(nil)
runtime.MustGetTestingContext(cn1.ServiceID()).SetBeforeLockFunc(nil)
}()

tx1 := txn.Txn().Txn().ID

runtime.MustGetTestingContext(cn1.ServiceID()).SetBeforeLockFunc(
func(txnID []byte, tableID uint64) {
if !bytes.Equal(txnID, tx1) {
return
}
// start txn2 update
txn2StartedC.once.Do(func() {
close(txn2StartedC.c)

// wait txn2 update committed
<-txn2CommittedC
})
},
)

runtime.MustGetTestingContext(cn1.ServiceID()).SetAdjustLockResultFunc(
func(
txnID []byte,
tableID uint64,
result *lock.Result,
) {
if !bytes.Equal(txnID, tx1) {
return
}
return
},
)

res, err := txn.Exec(
"insert into "+table+" values (null, 1)",
executor.StatementOption{},
)
require.NoError(t, err)
res.Close()

return nil
},
executor.Options{}.
WithDatabase(db).
WithMinCommittedTS(committedAt),
)
require.NoError(t, err)
}()

go func() {
defer func() {
close(txn2CommittedC)
wg.Done()
}()

<-txn2StartedC.c
exec2 := testutils.GetSQLExecutor(cn2)

res, err := exec2.Exec(
ctx,
"insert into "+table+" values (9000, 1)",
executor.Options{}.
WithDatabase(db).
WithMinCommittedTS(committedAt),
)
require.NoError(t, err)
res.Close()
}()

wg.Wait()

// check the result
exec := testutils.GetSQLExecutor(cn1)
res, err := exec.Exec(
ctx,
"select id from t;",
executor.Options{}.
WithDatabase(db).
WithWaitCommittedLogApplied(),
)
require.NoError(t, err)

res.ReadRows(
func(rows int, cols []*vector.Vector) bool {
for i := 0; i < rows; i++ {
n := executor.GetFixedRows[int32](cols[0])[i]
t.Logf("the value of rows %d is %d", i, n)
}
return true
},
)
require.True(t, 4 == res.Batches[0].RowCount())
res.Close()

exec = testutils.GetSQLExecutor(cn1)
res, err = exec.Exec(
ctx,
"select id, count(*) from t group by id having count(*) > 1;",
executor.Options{}.
WithDatabase(db).
WithWaitCommittedLogApplied(),
)
require.NoError(t, err)
require.Equal(t, 0, len(res.Batches))
res.Close()
},
)
}

func TestDedupForAutoPk2(t *testing.T) {
embed.RunBaseClusterTests(
func(c embed.Cluster) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*30)
Expand Down

0 comments on commit acf01be

Please sign in to comment.