import com.google.protobuf.ProtocolStringList;
import com.yelp.nrtsearch.server.grpc.AddDocumentRequest;
+ import com.yelp.nrtsearch.server.grpc.AddDocumentRequest.MultiValuedField;
import com.yelp.nrtsearch.server.grpc.DeadlineUtils;
import com.yelp.nrtsearch.server.grpc.FacetHierarchyPath;
+ import com.yelp.nrtsearch.server.luceneserver.Handler.HandlerException;
import com.yelp.nrtsearch.server.luceneserver.field.FieldDef;
import com.yelp.nrtsearch.server.luceneserver.field.IdFieldDef;
import com.yelp.nrtsearch.server.luceneserver.field.IndexableFieldDef;
+ import com.yelp.nrtsearch.server.monitoring.CustomIndexingMetrics;
import java.io.IOException;
import java.util.ArrayList;
+ import java.util.Arrays;
import java.util.HashMap;
+ import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+ import java.util.Map.Entry;
import java.util.Queue;
+ import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.stream.Collectors;
+ import java.util.stream.Stream;
import org.apache.lucene.document.Document;
+ import org.apache.lucene.document.Field;
import org.apache.lucene.index.IndexableField;
+ import org.apache.lucene.index.Term;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -45,6 +55,13 @@ public class AddDocumentHandler {
   * context for the AddDocumentRequest including root document and optional child documents if
   * schema contains nested objects
   */
+  /*
+   constants matching elasticpipe, only needed for POC. to be deleted.
+  */
+  private static final String PARTIAL_UPDATE_KEY = "_is_partial_update";
+
+  private static final String PARTIAL_UPDATE_FIELDS = "_partial_update_fields";
+
  public static class DocumentsContext {
    private final Document rootDocument;
    private final Map<String, List<Document>> childDocuments;
@@ -77,8 +94,12 @@ public static DocumentsContext getDocumentsContext(
        AddDocumentRequest addDocumentRequest, IndexState indexState)
        throws AddDocumentHandlerException {
      DocumentsContext documentsContext = new DocumentsContext();
-     Map<String, AddDocumentRequest.MultiValuedField> fields = addDocumentRequest.getFieldsMap();
-     for (Map.Entry<String, AddDocumentRequest.MultiValuedField> entry : fields.entrySet()) {
+     Map<String, MultiValuedField> fields = addDocumentRequest.getFieldsMap();
+     for (Entry<String, MultiValuedField> entry : fields.entrySet()) {
+       if (entry.getKey().equals(PARTIAL_UPDATE_KEY)
+           || entry.getKey().equals(PARTIAL_UPDATE_FIELDS)) {
+         continue;
+       }
        parseOneField(entry.getKey(), entry.getValue(), documentsContext, indexState);
      }

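For context, a partial-update request carries the two control keys alongside the regular document fields, which is why they are skipped when building Lucene documents. A minimal sketch of how a caller might build such a request, assuming the usual protobuf builder methods on AddDocumentRequest and MultiValuedField; the index name, field names ("ad_bid_id", "inactive") and values are illustrative only and not part of this diff:

// Sketch only: request shape implied by the control keys above.
import com.yelp.nrtsearch.server.grpc.AddDocumentRequest;
import com.yelp.nrtsearch.server.grpc.AddDocumentRequest.MultiValuedField;

public class PartialUpdateRequestExample {
  public static AddDocumentRequest buildExample() {
    return AddDocumentRequest.newBuilder()
        .setIndexName("ads")
        // Regular fields: the id term and the doc-values field being updated.
        .putFields("ad_bid_id", MultiValuedField.newBuilder().addValue("12345").build())
        .putFields("inactive", MultiValuedField.newBuilder().addValue("1").build())
        // Control keys consumed by AddDocumentHandler and skipped during document parsing.
        .putFields("_is_partial_update", MultiValuedField.newBuilder().addValue("true").build())
        .putFields(
            "_partial_update_fields", MultiValuedField.newBuilder().addValue("[inactive]").build())
        .build();
  }
}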
@@ -116,7 +137,7 @@ private static void extractFieldNamesForDocument(Document document) {
    /** Parses a field's value, which is a MultiValuedField in all cases */
    private static void parseOneField(
        String fieldName,
-       AddDocumentRequest.MultiValuedField value,
+       MultiValuedField value,
        DocumentsContext documentsContext,
        IndexState indexState)
        throws AddDocumentHandlerException {
@@ -125,9 +146,7 @@ private static void parseOneField(

    /** Parse MultiValuedField for a single field, which is always a List<String>. */
    private static void parseMultiValueField(
-       FieldDef field,
-       AddDocumentRequest.MultiValuedField value,
-       DocumentsContext documentsContext)
+       FieldDef field, MultiValuedField value, DocumentsContext documentsContext)
        throws AddDocumentHandlerException {
      ProtocolStringList fieldValues = value.getValueList();
      List<FacetHierarchyPath> facetHierarchyPaths = value.getFaceHierarchyPathsList();
@@ -153,7 +172,7 @@ private static void parseMultiValueField(
      }
    }

-  public static class AddDocumentHandlerException extends Handler.HandlerException {
+  public static class AddDocumentHandlerException extends HandlerException {
    public AddDocumentHandlerException(String errorMessage) {
      super(errorMessage);
    }
@@ -181,6 +200,40 @@ public DocumentIndexer(
      this.indexName = indexName;
    }

+   private static boolean isPartialUpdate(AddDocumentRequest addDocumentRequest) {
+     return addDocumentRequest.getFieldsMap().containsKey(PARTIAL_UPDATE_KEY)
+         && Boolean.parseBoolean(
+             addDocumentRequest.getFieldsMap().get(PARTIAL_UPDATE_KEY).getValue(0));
+   }
+
+   private static Set<String> getPartialUpdateFields(AddDocumentRequest addDocumentRequest) {
+     Set<String> partialUpdateFields = new HashSet<>();
+     MultiValuedField field = addDocumentRequest.getFieldsMap().get(PARTIAL_UPDATE_FIELDS);
+     if (field != null) {
+       // The field list passed from Elasticpipe, e.g. [inactive], arrives as the literal string
+       // "[inactive]": the leading '[' and trailing ']' are part of the value rather than list
+       // syntax, so strip the first and last character before splitting on commas.
+       List<String> cleansedValues =
+           field.getValueList().stream()
+               .map(value -> value.substring(1, value.length() - 1)) // Remove enclosing brackets
+               .flatMap(
+                   value -> {
+                     if (value.contains(",")) {
+                       return Arrays.stream(value.split(","));
+                     } else {
+                       return Stream.of(value);
+                     }
+                   })
+               .map(String::trim) // Trim each element
+               .collect(Collectors.toList());
+       partialUpdateFields.addAll(cleansedValues);
+     }
+     return partialUpdateFields;
+   }
+
    public long runIndexingJob() throws Exception {
      DeadlineUtils.checkDeadline("DocumentIndexer: runIndexingJob", "INDEXING");

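To make the bracket-stripping in getPartialUpdateFields concrete, here is a self-contained sketch of the same stream pipeline applied to a sample value; the input string is a hypothetical example of what Elasticpipe would send:

import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class PartialUpdateFieldsParseExample {
  public static void main(String[] args) {
    // The whole list arrives as one literal string, brackets included.
    String raw = "[inactive, mobile_bid]";
    String unwrapped = raw.substring(1, raw.length() - 1); // -> "inactive, mobile_bid"
    List<String> fields =
        (unwrapped.contains(",") ? Arrays.stream(unwrapped.split(",")) : Stream.of(unwrapped))
            .map(String::trim) // trim each element
            .collect(Collectors.toList());
    System.out.println(fields); // prints [inactive, mobile_bid]
  }
}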
@@ -192,16 +245,44 @@ public long runIndexingJob() throws Exception {
      IndexState indexState;
      ShardState shardState;
      IdFieldDef idFieldDef;
-
+     String ad_bid_id = "";
      try {
        indexState = globalState.getIndex(this.indexName);
        shardState = indexState.getShard(0);
        idFieldDef = indexState.getIdFieldDef().orElse(null);
        for (AddDocumentRequest addDocumentRequest : addDocumentRequestList) {
+         boolean partialUpdate = isPartialUpdate(addDocumentRequest);
+         final Set<String> partialUpdateFields;
+         if (partialUpdate) {
+           // For the POC, drop all fields except the RTB fields; in the actual implementation
+           // the request will only carry the fields that need to be updated.
+           partialUpdateFields = getPartialUpdateFields(addDocumentRequest);
+           Map<String, MultiValuedField> docValueFields =
+               getDocValueFieldsForUpdateCall(addDocumentRequest, partialUpdateFields);
+           ad_bid_id = addDocumentRequest.getFieldsMap().get("ad_bid_id").getValue(0);
+           addDocumentRequest =
+               AddDocumentRequest.newBuilder().putAllFields(docValueFields).build();
+         } else {
+           partialUpdateFields = new HashSet<>();
+         }
+
          DocumentsContext documentsContext =
-             AddDocumentHandler.LuceneDocumentBuilder.getDocumentsContext(
-                 addDocumentRequest, indexState);
+             LuceneDocumentBuilder.getDocumentsContext(addDocumentRequest, indexState);
+
+         /*
+          If this is a partial update request, only the partial-update docValue fields are
+          needed from the DocumentsContext.
+         */
+         List<IndexableField> partialUpdateDocValueFields = new ArrayList<>();
+         if (partialUpdate) {
+           partialUpdateDocValueFields =
+               documentsContext.getRootDocument().getFields().stream()
+                   .filter(f -> partialUpdateFields.contains(f.name()))
+                   .toList();
+         }
+
          if (documentsContext.hasNested()) {
+           logger.info("Indexing nested documents for ad_bid_id: {}", ad_bid_id);
            try {
              if (idFieldDef != null) {
                // update documents in the queue to keep order
@@ -222,7 +303,24 @@ public long runIndexingJob() throws Exception {
              throw new IOException(e);
            }
          } else {
-           documents.add(documentsContext.getRootDocument());
+           if (partialUpdate) {
+             CustomIndexingMetrics.updateDocValuesRequestsReceived.labels(indexName).inc();
+             Term term = new Term(idFieldDef.getName(), ad_bid_id);
+             // executing the partial update
+             logger.debug(
+                 "running a partial update for the ad_bid_id: {} and fields {} in the thread {}",
+                 ad_bid_id,
+                 partialUpdateDocValueFields,
+                 Thread.currentThread().getName() + Thread.currentThread().threadId());
+             long nanoTime = System.nanoTime();
+             shardState.writer.updateDocValues(
+                 term, partialUpdateDocValueFields.toArray(new Field[0]));
+             CustomIndexingMetrics.updateDocValuesLatency
+                 .labels(indexName)
+                 .set((System.nanoTime() - nanoTime));
+           } else {
+             documents.add(documentsContext.getRootDocument());
+           }
          }
        }
      } catch (Exception e) {
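The partial-update branch above leans on Lucene's IndexWriter.updateDocValues, which rewrites only the doc-values for documents matching the term and accepts only doc-values field types (the fields must already exist as doc values in the index). A standalone sketch of that call, with the index path and field names purely illustrative:

import java.nio.file.Paths;
import org.apache.lucene.analysis.standard.StandardAnalyzer;
import org.apache.lucene.document.NumericDocValuesField;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.index.Term;
import org.apache.lucene.store.FSDirectory;

public class UpdateDocValuesExample {
  public static void main(String[] args) throws Exception {
    try (IndexWriter writer =
        new IndexWriter(
            FSDirectory.open(Paths.get("/tmp/example-index")),
            new IndexWriterConfig(new StandardAnalyzer()))) {
      // Overwrite the "inactive" doc value for every document whose id term matches,
      // without reindexing the rest of the document.
      writer.updateDocValues(
          new Term("ad_bid_id", "12345"), new NumericDocValuesField("inactive", 1L));
    }
  }
}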
@@ -252,6 +350,15 @@ public long runIndexingJob() throws Exception {
      return shardState.writer.getMaxCompletedSequenceNumber();
    }

+   private static Map<String, MultiValuedField> getDocValueFieldsForUpdateCall(
+       AddDocumentRequest addDocumentRequest, Set<String> partialUpdateFields) {
+     Map<String, MultiValuedField> docValueFields =
+         addDocumentRequest.getFieldsMap().entrySet().stream()
+             .filter(e -> partialUpdateFields.contains(e.getKey()))
+             .collect(Collectors.toMap(Entry::getKey, Entry::getValue));
+     return docValueFields;
+   }
+
    /**
     * update documents with nested objects
     *
@@ -267,7 +374,7 @@ private void updateNestedDocuments(
        ShardState shardState)
        throws IOException {
      List<Document> documents = new ArrayList<>();
-     for (Map.Entry<String, List<Document>> e : documentsContext.getChildDocuments().entrySet()) {
+     for (Entry<String, List<Document>> e : documentsContext.getChildDocuments().entrySet()) {
        documents.addAll(
            e.getValue().stream()
                .map(v -> handleFacets(indexState, shardState, v))
@@ -282,7 +389,12 @@ private void updateNestedDocuments(
      }

      documents.add(rootDoc);
+     CustomIndexingMetrics.addDocumentRequestsReceived.labels(indexName).inc();
+     long nanoTime = System.nanoTime();
      shardState.writer.updateDocuments(idFieldDef.getTerm(rootDoc), documents);
+     CustomIndexingMetrics.addDocumentLatency
+         .labels(indexName)
+         .set((System.nanoTime() - nanoTime));
    }

    /**
@@ -296,15 +408,20 @@ private void addNestedDocuments(
        DocumentsContext documentsContext, IndexState indexState, ShardState shardState)
        throws IOException {
      List<Document> documents = new ArrayList<>();
-     for (Map.Entry<String, List<Document>> e : documentsContext.getChildDocuments().entrySet()) {
+     for (Entry<String, List<Document>> e : documentsContext.getChildDocuments().entrySet()) {
        documents.addAll(
            e.getValue().stream()
                .map(v -> handleFacets(indexState, shardState, v))
                .collect(Collectors.toList()));
      }
      Document rootDoc = handleFacets(indexState, shardState, documentsContext.getRootDocument());
      documents.add(rootDoc);
+     CustomIndexingMetrics.addDocumentRequestsReceived.labels(indexName).inc();
+     long nanoTime = System.nanoTime();
      shardState.writer.addDocuments(documents);
+     CustomIndexingMetrics.addDocumentLatency
+         .labels(indexName)
+         .set((System.nanoTime() - nanoTime));
    }

    private void updateDocuments(
@@ -314,8 +431,13 @@ private void updateDocuments(
        ShardState shardState)
        throws IOException {
      for (Document nextDoc : documents) {
+       CustomIndexingMetrics.addDocumentRequestsReceived.labels(indexName).inc();
+       long nanoTime = System.nanoTime();
        nextDoc = handleFacets(indexState, shardState, nextDoc);
        shardState.writer.updateDocument(idFieldDef.getTerm(nextDoc), nextDoc);
+       CustomIndexingMetrics.addDocumentLatency
+           .labels(indexName)
+           .set((System.nanoTime() - nanoTime));
      }
    }

@@ -326,6 +448,8 @@ private void addDocuments(
        throw new IllegalStateException(
            "Adding documents to an index on a replica node is not supported");
      }
+     CustomIndexingMetrics.addDocumentRequestsReceived.labels(indexName).inc(documents.size());
+     long nanoTime = System.nanoTime();
      shardState.writer.addDocuments(
          (Iterable<Document>)
              () ->
@@ -349,6 +473,9 @@ public Document next() {
                      return nextDoc;
                    }
                  });
+     CustomIndexingMetrics.addDocumentLatency
+         .labels(indexName)
+         .set((System.nanoTime() - nanoTime) / documents.size());
    }

    private Document handleFacets(IndexState indexState, ShardState shardState, Document nextDoc) {
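CustomIndexingMetrics itself is not part of this diff. Assuming it wraps the Prometheus simpleclient Counter and Gauge types, the members used at the call sites above could be declared roughly as follows; the class shape, metric names, and help strings are guesses, not the actual implementation:

import io.prometheus.client.Counter;
import io.prometheus.client.Gauge;

public final class CustomIndexingMetricsSketch {
  // Counter incremented per document received (or per updateDocValues request).
  public static final Counter addDocumentRequestsReceived =
      Counter.build()
          .name("nrt_add_document_requests_received")
          .help("Number of documents received for indexing")
          .labelNames("index")
          .register();

  // Gauge holding the most recent indexing latency, in nanoseconds.
  public static final Gauge addDocumentLatency =
      Gauge.build()
          .name("nrt_add_document_latency_ns")
          .help("Latency of the last addDocuments/updateDocument call, in nanoseconds")
          .labelNames("index")
          .register();
}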