Skip to content

Commit 4b9d220

Browse files
feat: unexpected eof retry mechanism (#97)
1 parent 7e66db0 commit 4b9d220

File tree

5 files changed

+59
-32
lines changed

5 files changed

+59
-32
lines changed

README.md

Lines changed: 19 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -95,25 +95,26 @@ Check out on [go-dcp](https://github.com/Trendyol/go-dcp#configuration)
9595

9696
### Elasticsearch Specific Configuration
9797

98-
| Variable | Type | Required | Default | Description |
99-
|---------------------------------------------|-------------------|----------|--------------|--------------------------------------------------------------------------------------------------------------------------------------------------------------|
100-
| `elasticsearch.collectionIndexMapping` | map[string]string | yes | | Defines which Couchbase collection events will be written to which index |
101-
| `elasticsearch.urls` | []string | yes | | Elasticsearch connection urls |
102-
| `elasticsearch.username` | string | no | | The username of Elasticsearch |
103-
| `elasticsearch.password` | string | no | | The password of Elasticsearch |
104-
| `elasticsearch.typeName` | string | no | | Defines Elasticsearch index type name |
105-
| `elasticsearch.batchSizeLimit` | int | no | 1000 | Maximum message count for batch, if exceed flush will be triggered. |
106-
| `elasticsearch.batchTickerDuration` | time.Duration | no | 10s | Batch is being flushed automatically at specific time intervals for long waiting messages in batch. |
98+
| Variable | Type | Required | Default | Description |
99+
|---------------------------------------------|-------------------|----------|--------------|-------------------------------------------------------------------------------------------------------------------------------------------------------------|
100+
| `elasticsearch.collectionIndexMapping` | map[string]string | yes | | Defines which Couchbase collection events will be written to which index |
101+
| `elasticsearch.urls` | []string | yes | | Elasticsearch connection urls |
102+
| `elasticsearch.username` | string | no | | The username of Elasticsearch |
103+
| `elasticsearch.password` | string | no | | The password of Elasticsearch |
104+
| `elasticsearch.typeName` | string | no | | Defines Elasticsearch index type name |
105+
| `elasticsearch.batchSizeLimit` | int | no | 1000 | Maximum message count for batch, if exceed flush will be triggered. |
106+
| `elasticsearch.batchTickerDuration` | time.Duration | no | 10s | Batch is being flushed automatically at specific time intervals for long waiting messages in batch. |
107107
| `elasticsearch.batchCommitTickerDuration` | time.Duration | no | 0s | Configures checkpoint offset save time, By default, after batch flushing, the offsets are updated immediately, this period can be increased for performance. |
108-
| `elasticsearch.batchByteSizeLimit` | int, string | no | 10mb | Maximum size(byte) for batch, if exceed flush will be triggered. `10mb` is default. |
109-
| `elasticsearch.maxConnsPerHost` | int | no | 512 | Maximum number of connections per each host which may be established |
110-
| `elasticsearch.maxIdleConnDuration` | time.Duration | no | 10s | Idle keep-alive connections are closed after this duration. |
111-
| `elasticsearch.compressionEnabled` | boolean | no | false | Compression can be used if message size is large, CPU usage may be affected. |
112-
| `elasticsearch.concurrentRequest` | int | no | 1 | Concurrent bulk request count |
113-
| `elasticsearch.disableDiscoverNodesOnStart` | boolean | no | false | Disable discover nodes when initializing the client. |
114-
| `elasticsearch.discoverNodesInterval` | time.Duration | no | 5m | Discover nodes periodically |
115-
| `elasticsearch.rejectionLog.index` | string | no | cbes-rejects | Rejection log index name. `cbes-rejects` is default. |
116-
| `elasticsearch.rejectionLog.includeSource` | boolean | no | false | Includes rejection log source info. `false` is default. |
108+
| `elasticsearch.batchByteSizeLimit` | int, string | no | 10mb | Maximum size(byte) for batch, if exceed flush will be triggered. `10mb` is default. |
109+
| `elasticsearch.maxConnsPerHost` | int | no | 512 | Maximum number of connections per each host which may be established |
110+
| `elasticsearch.maxIdleConnDuration` | time.Duration | no | 10s | Idle keep-alive connections are closed after this duration. |
111+
| `elasticsearch.compressionEnabled` | boolean | no | false | Compression can be used if message size is large, CPU usage may be affected. |
112+
| `elasticsearch.concurrentRequest` | int | no | 1 | Concurrent bulk request count |
113+
| `elasticsearch.disableDiscoverNodesOnStart` | boolean | no | false | Disable discover nodes when initializing the client. |
114+
| `elasticsearch.discoverNodesInterval` | time.Duration | no | 5m | Discover nodes periodically |
115+
| `elasticsearch.rejectionLog.index` | string | no | cbes-rejects | Rejection log index name. `cbes-rejects` is default. |
116+
| `elasticsearch.rejectionLog.includeSource` | boolean | no | false | Includes rejection log source info. `false` is default. |
117+
| `elasticsearch.maxRetries` | int | no | math.MaxInt | Maximum retry count for the Elasticsearch client. |
117118

118119
## Exposed metrics
119120

config/config.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package config
22

33
import (
4+
"math"
45
"time"
56

67
"github.com/Trendyol/go-dcp/helpers"
@@ -25,6 +26,7 @@ type Elasticsearch struct {
2526
ConcurrentRequest int `yaml:"concurrentRequest"`
2627
CompressionEnabled bool `yaml:"compressionEnabled"`
2728
DisableDiscoverNodesOnStart bool `yaml:"disableDiscoverNodesOnStart"`
29+
MaxRetries int `yaml:"maxRetries"`
2830
}
2931

3032
type RejectionLog struct {
@@ -58,4 +60,8 @@ func (c *Config) ApplyDefaults() {
5860
duration := 5 * time.Minute
5961
c.Elasticsearch.DiscoverNodesInterval = &duration
6062
}
63+
64+
if c.Elasticsearch.MaxRetries == 0 {
65+
c.Elasticsearch.MaxRetries = math.MaxInt
66+
}
6167
}

elasticsearch/bulk/bulk.go

Lines changed: 27 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,9 @@ package bulk
33
import (
44
"bytes"
55
"context"
6+
"errors"
67
"fmt"
8+
"io"
79
"strings"
810
"sync"
911
"time"
@@ -312,20 +314,34 @@ func (b *Bulk) CheckAndCommit() {
312314
func (b *Bulk) requestFunc(concurrentRequestIndex int, batchItems []BatchItem) func() error {
313315
return func() error {
314316
reader := b.readers[concurrentRequestIndex]
315-
reader.Reset(getBytes(batchItems))
316317
actionsOfBatchItems := getActions(batchItems)
317-
r, err := b.esClient.Bulk(reader)
318-
if err != nil {
319-
b.finalizeProcess(actionsOfBatchItems, fillErrorDataWithBulkRequestError(actionsOfBatchItems, err))
320-
return err
321-
}
322-
errorData, err := hasResponseError(r)
323-
b.finalizeProcess(actionsOfBatchItems, errorData)
318+
batchItemBytes := getBytes(batchItems)
319+
reader.Reset(batchItemBytes)
320+
321+
for attempt := 1; attempt <= b.config.Elasticsearch.MaxRetries; attempt++ {
322+
r, err := b.esClient.Bulk(reader)
323+
if err != nil {
324+
if errors.Is(err, io.ErrUnexpectedEOF) {
325+
logger.Log.Warn(fmt.Sprintf("unexpected eof error in attempt: %d", attempt))
326+
if attempt != b.config.Elasticsearch.MaxRetries {
327+
reader.ResetPositions()
328+
continue
329+
}
330+
}
331+
332+
b.finalizeProcess(actionsOfBatchItems, fillErrorDataWithBulkRequestError(actionsOfBatchItems, err))
333+
return err
334+
}
324335

325-
if err != nil {
326-
return err
336+
errorData, err := hasResponseError(r)
337+
b.finalizeProcess(actionsOfBatchItems, errorData)
338+
if err != nil {
339+
return err
340+
}
341+
return nil
327342
}
328-
return nil
343+
344+
return fmt.Errorf("max retry cannot be 0")
329345
}
330346
}
331347

elasticsearch/client/client.go

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,6 @@
11
package client
22

33
import (
4-
"math"
5-
64
"github.com/Trendyol/go-dcp/logger"
75

86
"github.com/Trendyol/go-dcp-elasticsearch/config"
@@ -14,7 +12,7 @@ func NewElasticClient(config *config.Config) (*elasticsearch.Client, error) {
1412
es, err := elasticsearch.NewClient(elasticsearch.Config{
1513
Username: config.Elasticsearch.Username,
1614
Password: config.Elasticsearch.Password,
17-
MaxRetries: math.MaxInt,
15+
MaxRetries: config.Elasticsearch.MaxRetries,
1816
Addresses: config.Elasticsearch.Urls,
1917
Transport: newTransport(config.Elasticsearch),
2018
CompressRequestBody: config.Elasticsearch.CompressionEnabled,

helper/multidim_byte_reader.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,12 @@ func (r *MultiDimByteReader) Reset(b [][]byte) {
4646
*r = MultiDimByteReader{b, 0, 0, getLen(b, 0), len(b)}
4747
}
4848

49+
func (r *MultiDimByteReader) ResetPositions() {
50+
r.currentSliceLen = getLen(r.s, 0)
51+
r.currentSliceIndex = 0
52+
r.currentIndexInCurrentSlice = 0
53+
}
54+
4955
func NewMultiDimByteReader(b [][]byte) *MultiDimByteReader {
5056
return &MultiDimByteReader{b, 0, 0, getLen(b, 0), len(b)}
5157
}

0 commit comments

Comments
 (0)