Skip to content

Commit c284ca1

Browse files
committed
reduce mut contention with a pessimistic lock
1 parent 5b1835d commit c284ca1

File tree

6 files changed

+152
-64
lines changed

6 files changed

+152
-64
lines changed

Makefile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ apogy: api .PHONY
1010

1111

1212
test:
13-
go test github.com/aep/apogy/server/...
13+
go test -count=1 -v github.com/aep/apogy/server/...
1414

1515

1616
.PHONY:

kv/kv.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ type KeyAndValue struct {
1313
// KV is the key/value store abstraction used by the rest of the project.
// It hands out write and read handles and exposes lifecycle/health calls.
type KV interface {
1414
// Close shuts the store down.
Close()
1515
// Write returns a new write handle without taking any locks up front.
Write() Write
16+
// ExclusiveWrite returns a write handle for the given keys, or an error.
// In the Tikv implementation it only returns once pessimistic locks on all
// keys are held; other implementations are presumably expected to give an
// equivalent guarantee (NOTE(review): confirm for non-Tikv backends).
ExclusiveWrite(ctx context.Context, keys ...[]byte) (Write, error)
1617
// Read returns a read handle.
Read() Read
1718
// Ping reports whether the store is reachable.
Ping() error
1819
}

kv/kv_test.go

Lines changed: 53 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,11 @@ package kv
22

33
import (
44
"context"
5-
"github.com/stretchr/testify/require"
65
"testing"
6+
"time"
7+
8+
"github.com/stretchr/testify/require"
9+
"go.uber.org/atomic"
710
)
811

912
func TestKVDoesPreventOutdatedWrites(t *testing.T) {
@@ -47,3 +50,52 @@ func TestKVDoesNotPreventDup(t *testing.T) {
4750
err = w1.Commit(context.Background())
4851
require.NoError(t, err, "kv must allow setting the same key twice within a tx")
4952
}
53+
54+
func TestKVLockReordering(t *testing.T) {
55+
k, err := NewTikv()
56+
require.NoError(t, err)
57+
defer k.Close()
58+
59+
var ctx = t.Context()
60+
var key = []byte("reordertest")
61+
62+
w1, err := k.ExclusiveWrite(ctx, key)
63+
require.NoError(t, err)
64+
defer w1.Rollback()
65+
66+
var w1commited = atomic.NewBool(false)
67+
go func() {
68+
time.Sleep(time.Millisecond * 101)
69+
70+
err = w1.Put(key, []byte("1"))
71+
require.NoError(t, err)
72+
73+
err = w1.Commit(ctx)
74+
w1commited.Store(true)
75+
require.NoError(t, err, "first commit must work")
76+
}()
77+
78+
w2, err := k.ExclusiveWrite(ctx, key)
79+
require.NoError(t, err)
80+
defer w2.Rollback()
81+
82+
if !w1commited.Load() {
83+
panic("not reordered")
84+
}
85+
86+
current, err := w2.Get(ctx, key)
87+
require.NoError(t, err)
88+
require.Equal(t, "1", string(current))
89+
90+
require.NoError(t, err)
91+
err = w2.Put(key, []byte("2"))
92+
require.NoError(t, err)
93+
94+
err = w2.Commit(ctx)
95+
require.NoError(t, err, "second commit must work")
96+
97+
actual, err := k.Read().Get(ctx, key)
98+
require.NoError(t, err)
99+
require.Equal(t, "2", string(actual))
100+
101+
}

kv/tikv.go

Lines changed: 70 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,14 +2,18 @@ package kv
22

33
import (
44
"context"
5+
"errors"
56
"fmt"
67
"iter"
78
"log/slog"
89
"os"
10+
"time"
911

1012
pingcaplog "github.com/pingcap/log"
1113

1214
"github.com/lmittmann/tint"
15+
tikverr "github.com/tikv/client-go/v2/error"
16+
"github.com/tikv/client-go/v2/kv"
1317
"github.com/tikv/client-go/v2/txnkv"
1418
"github.com/tikv/client-go/v2/txnkv/txnsnapshot"
1519

@@ -45,6 +49,12 @@ type TikvWrite struct {
4549
txn *txnkv.KVTxn
4650
err error
4751
commited bool
52+
53+
statLockRetries int
54+
}
55+
56+
// Stat returns the lock-retry counter recorded on this write handle.
// ExclusiveWrite fills it in when it creates the handle; handles created by
// Write leave it at zero.
func (w *TikvWrite) Stat() (statLockRetries int) {
57+
return w.statLockRetries
4858
}
4959

5060
func (w *TikvWrite) Commit(ctx context.Context) error {
@@ -249,8 +259,66 @@ func (t *Tikv) Close() {
249259

250260
func (t *Tikv) Write() Write {
251261
txn, err := t.k.Begin()
252-
txn.SetEnable1PC(true)
253-
return &TikvWrite{txn, err, false}
262+
//txn.SetEnable1PC(true)
263+
return &TikvWrite{txn: txn, err: err}
264+
}
265+
266+
func (t *Tikv) ExclusiveWrite(ctx context.Context, keys ...[]byte) (Write, error) {
267+
268+
var waitMs = int64(100)
269+
270+
// DO NOT use aggressive locking.
271+
// it's a deadlock trap
272+
// r.txn.StartAggressiveLocking()
273+
// instead do the retry loop in the client (us)
274+
275+
txn, err := t.k.Begin()
276+
if err != nil {
277+
return nil, err
278+
}
279+
txn.SetPessimistic(true)
280+
281+
retries := 0
282+
for {
283+
284+
waitMs += 1
285+
retries += 1
286+
287+
lkctx := kv.NewLockCtx(txn.StartTS(), waitMs, time.Now())
288+
289+
err = txn.LockKeys(ctx, lkctx, keys...)
290+
if err == nil {
291+
break
292+
}
293+
294+
// we got the lock but someone changed the key we're locking
295+
// get a new txn with the current start time
296+
if tikverr.IsErrWriteConflict(err) {
297+
txn.Rollback()
298+
txn, err = t.k.Begin()
299+
if err != nil {
300+
return nil, err
301+
}
302+
txn.SetPessimistic(true)
303+
continue
304+
}
305+
306+
if !errors.Is(err, tikverr.ErrLockWaitTimeout) {
307+
return nil, err
308+
}
309+
310+
select {
311+
case <-ctx.Done():
312+
if ctx.Err() == nil {
313+
return nil, err
314+
}
315+
return nil, ctx.Err()
316+
default:
317+
continue
318+
}
319+
}
320+
321+
return &TikvWrite{txn: txn, statLockRetries: retries}, nil
254322
}
255323

256324
func (t *Tikv) Read() Read {

server/crud.go

Lines changed: 24 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -3,12 +3,12 @@ package server
33
import (
44
"context"
55
"fmt"
6-
"log/slog"
76
"net/http"
87
"reflect"
98
"time"
109

1110
openapi "github.com/aep/apogy/api/go"
11+
"github.com/aep/apogy/kv"
1212
"github.com/labstack/echo/v4"
1313
tikerr "github.com/tikv/client-go/v2/error"
1414
"go.opentelemetry.io/otel/attribute"
@@ -39,13 +39,13 @@ func (s *server) PutDocument(c echo.Context) error {
3939

4040
switch doc.Model {
4141
case "Model":
42-
err := s.validateSchemaSchema(ctx, doc)
42+
err = s.validateSchemaSchema(ctx, doc)
4343
if err != nil {
4444
span.RecordError(err)
4545
return echo.NewHTTPError(http.StatusBadRequest, fmt.Sprintf("validation error: %s", err))
4646
}
4747
case "Reactor":
48-
err := s.validateReactorSchema(ctx, doc)
48+
err = s.validateReactorSchema(ctx, doc)
4949
if err != nil {
5050
span.RecordError(err)
5151
return echo.NewHTTPError(http.StatusBadRequest, fmt.Sprintf("validation error: %s", err))
@@ -60,65 +60,32 @@ func (s *server) PutDocument(c echo.Context) error {
6060
return echo.NewHTTPError(http.StatusBadRequest, err.Error())
6161
}
6262

63-
// Try to save document with retry logic
64-
attempts := 0
65-
for i := 0; ; i++ {
66-
attempts++
67-
retrySpan := trace.SpanFromContext(ctx)
68-
retrySpan.SetAttributes(attribute.Int("attempt", i+1))
69-
70-
err := s.putDocument1(c, ctx, doc, model)
71-
if err == nil {
72-
retrySpan.SetAttributes(attribute.Bool("success", true))
73-
break
74-
}
75-
76-
retrySpan.RecordError(err)
77-
78-
if echoErr, ok := err.(*echo.HTTPError); ok {
79-
// Record failed commit
80-
if echoErr.Code == http.StatusInternalServerError {
81-
kvCommitFailures.WithLabelValues("put_document", "internal_error").Inc()
82-
} else if echoErr.Code == http.StatusConflict {
83-
kvCommitFailures.WithLabelValues("put_document", "write_conflict").Inc()
84-
} else {
85-
kvCommitFailures.WithLabelValues("put_document", "other").Inc()
86-
}
63+
path, err := safeDBPath(doc.Model, doc.Id)
64+
if err != nil {
65+
return echo.NewHTTPError(http.StatusBadRequest, err.Error())
66+
}
8767

88-
// Record retries for failed operation
89-
kvCommitRetries.WithLabelValues("put_document", "failed").Observe(float64(attempts - 1))
90-
return echoErr
91-
}
68+
// for contested keys, use pessimistic locking
69+
var hotKeys = [][]byte{}
70+
if doc.Mut != nil {
71+
hotKeys = append(hotKeys, path)
72+
}
9273

93-
if i > 10 {
94-
time.Sleep(100 * time.Millisecond)
95-
} else {
96-
time.Sleep(10 * time.Millisecond)
74+
var w2 kv.Write
75+
if len(hotKeys) == 0 {
76+
w2 = s.kv.Write()
77+
} else {
78+
w2, err = s.kv.ExclusiveWrite(ctx, hotKeys...)
79+
if err != nil {
80+
return err
9781
}
9882

99-
slog.Warn("putDocument1", "err", err)
83+
statLockRetries := w2.(*kv.TikvWrite).Stat()
84+
span.SetAttributes(attribute.Int("lockRetries", statLockRetries))
85+
kvLockRetries.WithLabelValues("hot", "true").Observe(float64(statLockRetries))
10086
}
101-
102-
// Record successful commit metrics
103-
span.SetAttributes(attribute.Int("attempts", attempts))
104-
kvCommitRetries.WithLabelValues("put_document", "success").Observe(float64(attempts - 1))
105-
106-
return c.JSON(http.StatusOK, doc)
107-
}
108-
109-
func (s *server) putDocument1(c echo.Context, ctx context.Context, doc_ *openapi.Document, model *Model) error {
110-
111-
doccpy := *doc_
112-
doc := &doccpy
113-
114-
w2 := s.kv.Write()
11587
defer w2.Close()
11688

117-
path, err := safeDBPath(doc.Model, doc.Id)
118-
if err != nil {
119-
return echo.NewHTTPError(http.StatusBadRequest, err.Error())
120-
}
121-
12289
now := time.Now()
12390
doc.History = &openapi.History{
12491
Created: &now,
@@ -149,7 +116,7 @@ func (s *server) putDocument1(c echo.Context, ctx context.Context, doc_ *openapi
149116
}
150117

151118
if reflect.DeepEqual(old.Val, doc.Val) {
152-
return nil
119+
return c.JSON(http.StatusOK, doc)
153120
}
154121

155122
if old.Version != nil && doc.Version != nil {
@@ -255,7 +222,7 @@ func (s *server) putDocument1(c echo.Context, ctx context.Context, doc_ *openapi
255222
return echo.NewHTTPError(http.StatusUnprocessableEntity, err.Error())
256223
}
257224

258-
return nil
225+
return c.JSON(http.StatusOK, doc)
259226
}
260227

261228
func (s *server) GetDocument(c echo.Context, model string, id string) error {

server/health.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -42,9 +42,9 @@ var (
4242
[]string{"operation"},
4343
)
4444

45-
kvCommitRetries = prometheus.NewHistogramVec(
45+
kvLockRetries = prometheus.NewHistogramVec(
4646
prometheus.HistogramOpts{
47-
Name: "kv_commit_retries",
47+
Name: "kv_lock_retries",
4848
Help: "Number of commit retries",
4949
Buckets: []float64{0, 1, 2, 3, 4, 5, 10, 20, 40, 80, 160, 320},
5050
},
@@ -75,7 +75,7 @@ func init() {
7575
promRegistry.MustRegister(httpRequestsTotal)
7676
promRegistry.MustRegister(httpRequestDuration)
7777
promRegistry.MustRegister(kvCommitDuration)
78-
promRegistry.MustRegister(kvCommitRetries)
78+
promRegistry.MustRegister(kvLockRetries)
7979
promRegistry.MustRegister(kvCommitFailures)
8080
}
8181

0 commit comments

Comments
 (0)