Skip to content

Commit 386c8fa

Browse files
committed
allow concurrent delete
1 parent c284ca1 commit 386c8fa

File tree

4 files changed

+137
-4
lines changed

4 files changed

+137
-4
lines changed

kv/kv_test.go

Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,89 @@ func TestKVDoesPreventOutdatedWrites(t *testing.T) {
3838
require.NoError(t, err, "w4 must succeed because it is fresh")
3939
}
4040

41+
// this behaviour actually sucks because it means Delete in apogy must be locked
42+
// this test is just here in case they change their mind about it
43+
func TestKVDoesPreventOutdatedDoubleDelete(t *testing.T) {
44+
45+
k, err := NewTikv()
46+
require.NoError(t, err)
47+
defer k.Close()
48+
49+
var ctx = t.Context()
50+
var key = []byte("TestKVDoesNotPreventOutdatedDoubleDelete")
51+
52+
prep := k.Write()
53+
prep.Put(key, []byte("1"))
54+
err = prep.Commit(ctx)
55+
require.NoError(t, err)
56+
57+
w1 := k.Write()
58+
w2 := k.Write()
59+
60+
w1.Del(key)
61+
w2.Del(key)
62+
63+
err = w1.Commit(context.Background())
64+
require.NoError(t, err)
65+
66+
err = w2.Commit(context.Background())
67+
require.Error(t, err)
68+
}
69+
70+
func TestKVDeleteAndPutMustConflict1(t *testing.T) {
71+
72+
k, err := NewTikv()
73+
require.NoError(t, err)
74+
defer k.Close()
75+
76+
var ctx = t.Context()
77+
var key = []byte("TestKVDoesNotPreventOutdatedDoubleDelete")
78+
79+
prep := k.Write()
80+
prep.Put(key, []byte("1"))
81+
err = prep.Commit(ctx)
82+
require.NoError(t, err)
83+
84+
w1 := k.Write()
85+
w2 := k.Write()
86+
87+
w1.Put(key, []byte("2"))
88+
w2.Del(key)
89+
90+
err = w1.Commit(context.Background())
91+
require.NoError(t, err)
92+
93+
err = w2.Commit(context.Background())
94+
require.Error(t, err)
95+
}
96+
97+
func TestKVDeleteAndPutMustConflict2(t *testing.T) {
98+
99+
k, err := NewTikv()
100+
require.NoError(t, err)
101+
defer k.Close()
102+
103+
var ctx = t.Context()
104+
var key = []byte("TestKVDoesNotPreventOutdatedDoubleDelete")
105+
106+
prep := k.Write()
107+
prep.Put(key, []byte("1"))
108+
err = prep.Commit(ctx)
109+
require.NoError(t, err)
110+
111+
w1 := k.Write()
112+
w2 := k.Write()
113+
114+
w1.Del(key)
115+
w2.Put(key, []byte("2"))
116+
117+
err = w1.Commit(context.Background())
118+
require.NoError(t, err)
119+
120+
err = w2.Commit(context.Background())
121+
require.Error(t, err)
122+
}
123+
41124
func TestKVDoesNotPreventDup(t *testing.T) {
42125
k, err := NewTikv()
43126
require.NoError(t, err)

kv/tikv.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -123,7 +123,11 @@ func (w *TikvWrite) Del(key []byte) error {
123123
if w.err != nil {
124124
return w.err
125125
}
126-
return w.txn.Delete(key)
126+
err := w.txn.Delete(key)
127+
if err != nil {
128+
w.err = err
129+
}
130+
return err
127131
}
128132

129133
func (r *TikvWrite) Iter(ctx context.Context, start []byte, end []byte) iter.Seq2[KeyAndValue, error] {

server/crud.go

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,8 @@ func (s *server) PutDocument(c echo.Context) error {
7171
hotKeys = append(hotKeys, path)
7272
}
7373

74+
// TODO add unique
75+
7476
var w2 kv.Write
7577
if len(hotKeys) == 0 {
7678
w2 = s.kv.Write()
@@ -306,7 +308,7 @@ func (s *server) DeleteDocument(c echo.Context, model string, id string) error {
306308
return echo.NewHTTPError(http.StatusBadRequest, err.Error())
307309
}
308310

309-
w := s.kv.Write()
311+
w, err := s.kv.ExclusiveWrite(ctx, path)
310312
defer w.Close()
311313

312314
// First get the document to remove its indexes
@@ -315,12 +317,12 @@ func (s *server) DeleteDocument(c echo.Context, model string, id string) error {
315317
if err != nil {
316318
span.RecordError(err)
317319
if tikerr.IsErrNotFound(err) {
318-
return echo.NewHTTPError(http.StatusNotFound, "document not found")
320+
return c.NoContent(http.StatusOK)
319321
}
320322
return echo.NewHTTPError(http.StatusInternalServerError, fmt.Sprintf("database error: %v", err))
321323
}
322324
if bytes == nil {
323-
notFoundErr := echo.NewHTTPError(http.StatusNotFound, "document not found")
325+
notFoundErr := echo.NewHTTPError(http.StatusNotFound, "document is empty")
324326
span.RecordError(notFoundErr)
325327
return notFoundErr
326328
}

server/crud_test.go

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -483,3 +483,47 @@ func TestConcurrentMutations_NeverFail(t *testing.T) {
483483
assert.NoError(t, err)
484484
assert.Equal(t, int64(concurrentCount), counterInt, "All mutations should have been applied")
485485
}
486+
487+
func TestConcurrentDelete(t *testing.T) {
488+
e, s := setupTestServer(t)
489+
490+
// Create a document with a counter
491+
docId := "concurrent-delete-test"
492+
initialVal := map[string]interface{}{
493+
"counter": json.Number("0"),
494+
}
495+
initialDoc := openapi.Document{
496+
Model: "Test.com.example",
497+
Id: docId,
498+
Val: initialVal,
499+
}
500+
501+
// First PUT to create the document
502+
docBytes, _ := json.Marshal(initialDoc)
503+
req := httptest.NewRequest(http.MethodPost, "/v1/", bytes.NewReader(docBytes))
504+
req.Header.Set(echo.HeaderContentType, "application/json")
505+
rec := httptest.NewRecorder()
506+
c := e.NewContext(req, rec)
507+
508+
assert.NoError(t, s.PutDocument(c))
509+
510+
// Number of concurrent mutations to perform
511+
concurrentCount := 50
512+
513+
// Launch concurrent mutations, all deleting the doc
514+
var wg sync.WaitGroup
515+
for i := 0; i < concurrentCount; i++ {
516+
wg.Add(1)
517+
go func() {
518+
defer wg.Done()
519+
520+
reqMut := httptest.NewRequest(http.MethodDelete, "/v1", nil)
521+
reqMut.Header.Set(echo.HeaderContentType, "application/json")
522+
523+
err := s.DeleteDocument(c, initialDoc.Model, initialDoc.Id)
524+
assert.NoError(t, err, "Concurrent delete should not fail")
525+
}()
526+
}
527+
528+
wg.Wait()
529+
}

0 commit comments

Comments
 (0)