Skip to content

Commit 3b45978

Browse files
authored
store/tikv: unset pessimistic txn primary key when statement failed (pingcap#10867) (pingcap#10894)
1 parent e4fd045 commit 3b45978

File tree

7 files changed

+108
-20
lines changed

7 files changed

+108
-20
lines changed

session/pessimistic_test.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -232,3 +232,15 @@ func (s *testPessimisticSuite) TestSingleStatementRollback(c *C) {
232232
tk.MustExec("commit")
233233
syncCh <- true
234234
}
235+
236+
func (s *testPessimisticSuite) TestFirstStatementFail(c *C) {
237+
tk := testkit.NewTestKitWithInit(c, s.store)
238+
tk.MustExec("drop table if exists first")
239+
tk.MustExec("create table first (k int unique)")
240+
tk.MustExec("insert first values (1)")
241+
tk.MustExec("begin pessimistic")
242+
_, err := tk.Exec("insert first values (1)")
243+
c.Assert(err, NotNil)
244+
tk.MustExec("insert first values (2)")
245+
tk.MustExec("commit")
246+
}

store/mockstore/mocktikv/mock_tikv_test.go

Lines changed: 38 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,12 @@ func (s *testMockTiKVSuite) mustGetRC(c *C, key string, ts uint64, expect string
9494
}
9595

9696
func (s *testMockTiKVSuite) mustPutOK(c *C, key, value string, startTS, commitTS uint64) {
97-
errs := s.store.Prewrite(putMutations(key, value), []byte(key), startTS, 0)
97+
req := &kvrpcpb.PrewriteRequest{
98+
Mutations: putMutations(key, value),
99+
PrimaryLock: []byte(key),
100+
StartVersion: startTS,
101+
}
102+
errs := s.store.Prewrite(req)
98103
for _, err := range errs {
99104
c.Assert(err, IsNil)
100105
}
@@ -109,7 +114,12 @@ func (s *testMockTiKVSuite) mustDeleteOK(c *C, key string, startTS, commitTS uin
109114
Key: []byte(key),
110115
},
111116
}
112-
errs := s.store.Prewrite(mutations, []byte(key), startTS, 0)
117+
req := &kvrpcpb.PrewriteRequest{
118+
Mutations: mutations,
119+
PrimaryLock: []byte(key),
120+
StartVersion: startTS,
121+
}
122+
errs := s.store.Prewrite(req)
113123
for _, err := range errs {
114124
c.Assert(err, IsNil)
115125
}
@@ -146,7 +156,12 @@ func (s *testMockTiKVSuite) mustRangeReverseScanOK(c *C, start, end string, limi
146156
}
147157

148158
func (s *testMockTiKVSuite) mustPrewriteOK(c *C, mutations []*kvrpcpb.Mutation, primary string, startTS uint64) {
149-
errs := s.store.Prewrite(mutations, []byte(primary), startTS, 0)
159+
req := &kvrpcpb.PrewriteRequest{
160+
Mutations: mutations,
161+
PrimaryLock: []byte(primary),
162+
StartVersion: startTS,
163+
}
164+
errs := s.store.Prewrite(req)
150165
for _, err := range errs {
151166
c.Assert(err, IsNil)
152167
}
@@ -412,7 +427,12 @@ func (s *testMockTiKVSuite) TestCommitConflict(c *C) {
412427
// A prewrite.
413428
s.mustPrewriteOK(c, putMutations("x", "A"), "x", 5)
414429
// B prewrite and find A's lock.
415-
errs := s.store.Prewrite(putMutations("x", "B"), []byte("x"), 10, 0)
430+
req := &kvrpcpb.PrewriteRequest{
431+
Mutations: putMutations("x", "B"),
432+
PrimaryLock: []byte("x"),
433+
StartVersion: 10,
434+
}
435+
errs := s.store.Prewrite(req)
416436
c.Assert(errs[0], NotNil)
417437
// B find rollback A because A exist too long.
418438
s.mustRollbackOK(c, [][]byte{[]byte("x")}, 5)
@@ -470,17 +490,27 @@ func (s *testMockTiKVSuite) TestBatchResolveLock(c *C) {
470490

471491
func (s *testMockTiKVSuite) TestRollbackAndWriteConflict(c *C) {
472492
s.mustPutOK(c, "test", "test", 1, 3)
473-
474-
errs := s.store.Prewrite(putMutations("lock", "lock", "test", "test1"), []byte("test"), 2, 2)
493+
req := &kvrpcpb.PrewriteRequest{
494+
Mutations: putMutations("lock", "lock", "test", "test1"),
495+
PrimaryLock: []byte("test"),
496+
StartVersion: 2,
497+
LockTtl: 2,
498+
}
499+
errs := s.store.Prewrite(req)
475500
s.mustWriteWriteConflict(c, errs, 1)
476501

477502
s.mustPutOK(c, "test", "test2", 5, 8)
478503

479504
// simulate `getTxnStatus` for txn 2.
480505
err := s.store.Cleanup([]byte("test"), 2)
481506
c.Assert(err, IsNil)
482-
483-
errs = s.store.Prewrite(putMutations("test", "test3"), []byte("test"), 6, 1)
507+
req = &kvrpcpb.PrewriteRequest{
508+
Mutations: putMutations("test", "test3"),
509+
PrimaryLock: []byte("test"),
510+
StartVersion: 6,
511+
LockTtl: 1,
512+
}
513+
errs = s.store.Prewrite(req)
484514
s.mustWriteWriteConflict(c, errs, 0)
485515
}
486516

store/mockstore/mocktikv/mvcc.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -434,7 +434,7 @@ type MVCCStore interface {
434434
BatchGet(ks [][]byte, startTS uint64, isoLevel kvrpcpb.IsolationLevel) []Pair
435435
PessimisticLock(mutations []*kvrpcpb.Mutation, primary []byte, startTS, forUpdateTS uint64, ttl uint64) []error
436436
PessimisticRollback(keys [][]byte, startTS, forUpdateTS uint64) []error
437-
Prewrite(mutations []*kvrpcpb.Mutation, primary []byte, startTS, ttl uint64) []error
437+
Prewrite(req *kvrpcpb.PrewriteRequest) []error
438438
Commit(keys [][]byte, startTS, commitTS uint64) error
439439
Rollback(keys [][]byte, startTS uint64) error
440440
Cleanup(key []byte, startTS uint64) error

store/mockstore/mocktikv/mvcc_leveldb.go

Lines changed: 26 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -554,7 +554,7 @@ func (mvcc *MVCCLevelDB) pessimisticLockMutation(batch *leveldb.Batch, mutation
554554
}
555555
return nil
556556
}
557-
if err = checkConflictValue(iter, mutation.Key, forUpdateTS); err != nil {
557+
if err = checkConflictValue(iter, mutation, forUpdateTS); err != nil {
558558
return err
559559
}
560560

@@ -623,15 +623,19 @@ func pessimisticRollbackKey(db *leveldb.DB, batch *leveldb.Batch, key []byte, st
623623
}
624624

625625
// Prewrite implements the MVCCStore interface.
626-
func (mvcc *MVCCLevelDB) Prewrite(mutations []*kvrpcpb.Mutation, primary []byte, startTS uint64, ttl uint64) []error {
626+
func (mvcc *MVCCLevelDB) Prewrite(req *kvrpcpb.PrewriteRequest) []error {
627+
mutations := req.Mutations
628+
primary := req.PrimaryLock
629+
startTS := req.StartVersion
630+
ttl := req.LockTtl
627631
mvcc.mu.Lock()
628632
defer mvcc.mu.Unlock()
629633

630634
anyError := false
631635
batch := &leveldb.Batch{}
632636
errs := make([]error, 0, len(mutations))
633637
txnSize := len(mutations)
634-
for _, m := range mutations {
638+
for i, m := range mutations {
635639
// If the operation is Insert, check if key is exists at first.
636640
var err error
637641
if m.GetOp() == kvrpcpb.Op_Insert {
@@ -650,7 +654,8 @@ func (mvcc *MVCCLevelDB) Prewrite(mutations []*kvrpcpb.Mutation, primary []byte,
650654
continue
651655
}
652656
}
653-
err = prewriteMutation(mvcc.db, batch, m, startTS, primary, ttl, uint64(txnSize))
657+
isPessimisticLock := len(req.IsPessimisticLock) > 0 && req.IsPessimisticLock[i]
658+
err = prewriteMutation(mvcc.db, batch, m, startTS, primary, ttl, uint64(txnSize), isPessimisticLock)
654659
errs = append(errs, err)
655660
if err != nil {
656661
anyError = true
@@ -666,26 +671,34 @@ func (mvcc *MVCCLevelDB) Prewrite(mutations []*kvrpcpb.Mutation, primary []byte,
666671
return errs
667672
}
668673

669-
func checkConflictValue(iter *Iterator, key []byte, startTS uint64) error {
674+
func checkConflictValue(iter *Iterator, m *kvrpcpb.Mutation, startTS uint64) error {
670675
dec := valueDecoder{
671-
expectKey: key,
676+
expectKey: m.Key,
672677
}
673678
ok, err := dec.Decode(iter)
674679
if err != nil {
675680
return errors.Trace(err)
676681
}
682+
if !ok {
683+
return nil
684+
}
677685
// Note that it's a write conflict here, even if the value is a rollback one.
678-
if ok && dec.value.commitTS >= startTS {
686+
if dec.value.commitTS >= startTS {
679687
return &ErrConflict{
680688
StartTS: startTS,
681689
ConflictTS: dec.value.commitTS,
682-
Key: key,
690+
Key: m.Key,
691+
}
692+
}
693+
if m.Op == kvrpcpb.Op_PessimisticLock && m.Assertion == kvrpcpb.Assertion_NotExist {
694+
return &ErrKeyAlreadyExist{
695+
Key: m.Key,
683696
}
684697
}
685698
return nil
686699
}
687700

688-
func prewriteMutation(db *leveldb.DB, batch *leveldb.Batch, mutation *kvrpcpb.Mutation, startTS uint64, primary []byte, ttl uint64, txnSize uint64) error {
701+
func prewriteMutation(db *leveldb.DB, batch *leveldb.Batch, mutation *kvrpcpb.Mutation, startTS uint64, primary []byte, ttl uint64, txnSize uint64, isPessimisticLock bool) error {
689702
startKey := mvccEncode(mutation.Key, lockVer)
690703
iter := newIterator(db, &util.Range{
691704
Start: startKey,
@@ -708,7 +721,10 @@ func prewriteMutation(db *leveldb.DB, batch *leveldb.Batch, mutation *kvrpcpb.Mu
708721
}
709722
// Overwrite the pessimistic lock.
710723
} else {
711-
err = checkConflictValue(iter, mutation.Key, startTS)
724+
if isPessimisticLock {
725+
return ErrAbort("pessimistic lock not found")
726+
}
727+
err = checkConflictValue(iter, mutation, startTS)
712728
if err != nil {
713729
return err
714730
}

store/mockstore/mocktikv/rpc.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -292,7 +292,7 @@ func (h *rpcHandler) handleKvPrewrite(req *kvrpcpb.PrewriteRequest) *kvrpcpb.Pre
292292
panic("KvPrewrite: key not in region")
293293
}
294294
}
295-
errs := h.mvccStore.Prewrite(req.Mutations, req.PrimaryLock, req.GetStartVersion(), req.GetLockTtl())
295+
errs := h.mvccStore.Prewrite(req)
296296
return &kvrpcpb.PrewriteResponse{
297297
Errors: convertToKeyErrors(errs),
298298
}

store/tikv/2pc_test.go

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -461,6 +461,28 @@ func (s *testCommitterSuite) TestPessimisticPrewriteRequest(c *C) {
461461
c.Assert(req.Prewrite.ForUpdateTs, Equals, uint64(100))
462462
}
463463

464+
func (s *testCommitterSuite) TestUnsetPrimaryKey(c *C) {
465+
// This test checks that the isPessimisticLock field is set in the request even when no keys are pessimistic lock.
466+
key := kv.Key("key")
467+
txn := s.begin(c)
468+
c.Assert(txn.Set(key, key), IsNil)
469+
c.Assert(txn.Commit(context.Background()), IsNil)
470+
471+
txn = s.begin(c)
472+
txn.SetOption(kv.Pessimistic, true)
473+
txn.SetOption(kv.PresumeKeyNotExists, nil)
474+
_, _ = txn.us.Get(key)
475+
c.Assert(txn.Set(key, key), IsNil)
476+
txn.DelOption(kv.PresumeKeyNotExists)
477+
err := txn.LockKeys(context.Background(), txn.startTS, key)
478+
c.Assert(err, NotNil)
479+
c.Assert(txn.Delete(key), IsNil)
480+
key2 := kv.Key("key2")
481+
c.Assert(txn.Set(key2, key2), IsNil)
482+
err = txn.Commit(context.Background())
483+
c.Assert(err, IsNil)
484+
}
485+
464486
func (s *testCommitterSuite) TestPessimisticLockedKeysDedup(c *C) {
465487
txn := s.begin(c)
466488
txn.SetOption(kv.Pessimistic, true)

store/tikv/txn.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -373,7 +373,11 @@ func (txn *tikvTxn) LockKeys(ctx context.Context, forUpdateTS uint64, keysInput
373373
if err != nil {
374374
return err
375375
}
376+
}
377+
var assignedPrimaryKey bool
378+
if txn.committer.primaryKey == nil {
376379
txn.committer.primaryKey = keys[0]
380+
assignedPrimaryKey = true
377381
}
378382

379383
bo := NewBackoffer(ctx, pessimisticLockMaxBackoff).WithVars(txn.vars)
@@ -391,6 +395,10 @@ func (txn *tikvTxn) LockKeys(ctx context.Context, forUpdateTS uint64, keysInput
391395
// Sleep a little, wait for the other transaction that blocked by this transaction to acquire the lock.
392396
time.Sleep(time.Millisecond * 5)
393397
}
398+
if assignedPrimaryKey {
399+
// unset the primary key if we assigned primary key when failed to lock it.
400+
txn.committer.primaryKey = nil
401+
}
394402
return err
395403
}
396404
}

0 commit comments

Comments
 (0)