import com.google.protobuf.ProtocolStringList;
import com.yelp.nrtsearch.server.grpc.AddDocumentRequest;
+import com.yelp.nrtsearch.server.grpc.AddDocumentRequest.MultiValuedField;
import com.yelp.nrtsearch.server.grpc.DeadlineUtils;
import com.yelp.nrtsearch.server.grpc.FacetHierarchyPath;
+import com.yelp.nrtsearch.server.grpc.IndexingRequestType;
import com.yelp.nrtsearch.server.luceneserver.field.FieldDef;
import com.yelp.nrtsearch.server.luceneserver.field.IdFieldDef;
import com.yelp.nrtsearch.server.luceneserver.field.IndexableFieldDef;
+import com.yelp.nrtsearch.server.luceneserver.field.properties.DocValueUpdatable;
+import com.yelp.nrtsearch.server.monitoring.IndexingMetrics;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
@@ -33,7 +37,9 @@
import java.util.concurrent.LinkedBlockingDeque;
import java.util.stream.Collectors;
import org.apache.lucene.document.Document;
+import org.apache.lucene.document.Field;
import org.apache.lucene.index.IndexableField;
+import org.apache.lucene.index.Term;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -77,6 +83,8 @@ public static DocumentsContext getDocumentsContext(
      AddDocumentRequest addDocumentRequest, IndexState indexState)
      throws AddDocumentHandlerException {
    DocumentsContext documentsContext = new DocumentsContext();
+    if (addDocumentRequest.getRequestType().equals(IndexingRequestType.UPDATE_DOCUMENT))
+      return documentsContext;
    Map<String, AddDocumentRequest.MultiValuedField> fields = addDocumentRequest.getFieldsMap();
    for (Map.Entry<String, AddDocumentRequest.MultiValuedField> entry : fields.entrySet()) {
      parseOneField(entry.getKey(), entry.getValue(), documentsContext, indexState);
@@ -222,7 +230,12 @@ public long runIndexingJob() throws Exception {
            throw new IOException(e);
          }
        } else {
-         documents.add(documentsContext.getRootDocument());
+         if (addDocumentRequest.getRequestType().equals(IndexingRequestType.UPDATE_DOCUMENT)) {
+           executeDocValueUpdateRequest(
+               documentsContext, indexState, shardState, addDocumentRequest);
+         } else {
+           documents.add(documentsContext.getRootDocument());
+         }
        }
      }
    } catch (Exception e) {
@@ -252,6 +265,54 @@ public long runIndexingJob() throws Exception {
    return shardState.writer.getMaxCompletedSequenceNumber();
  }

+  private void executeDocValueUpdateRequest(
+      DocumentsContext documentsContext,
+      IndexState indexState,
+      ShardState shardState,
+      AddDocumentRequest addDocumentRequest) {
+    try {
+      IndexingMetrics.updateDocValuesRequestsReceived.labels(indexName).inc();
+      List<Field> updatableDocValueFields = new ArrayList<>();
+      Term term = null;
+      for (Map.Entry<String, MultiValuedField> entry :
+          addDocumentRequest.getFieldsMap().entrySet()) {
+        FieldDef field = indexState.getField(entry.getKey());
+        if (field.getName().equals(indexState.getIdFieldDef().get().getName())) {
+
+          String idFieldName = indexState.getIdFieldDef().get().getName();
+          String idFieldValue = addDocumentRequest.getFieldsMap().get(idFieldName).getValue(0);
+
+          if (idFieldValue == null || idFieldValue.isEmpty()) {
+            throw new IllegalArgumentException(
+                "The _ID field must have a value set to execute a docValues update");
+          }
+          term = new Term(idFieldName, idFieldValue);
+          continue;
+        }
+        if (!(field instanceof DocValueUpdatable updatable) || !updatable.isUpdatable()) {
+          throw new IllegalArgumentException(
+              String.format("Field: %s is not updatable", field.getName()));
+        }
+        updatableDocValueFields.add(
+            ((DocValueUpdatable) field).getUpdatableDocValueField(entry.getValue().getValue(0)));
+      }
+      if (term == null) {
+        throw new RuntimeException("_ID field should be present for the update request");
+      }
+      long nanoTime = System.nanoTime();
+      shardState.writer.updateDocValues(term, updatableDocValueFields.toArray(new Field[0]));
+      IndexingMetrics.updateDocValuesLatency
+          .labels(indexName)
+          .set(System.nanoTime() - nanoTime);
+    } catch (Throwable t) {
+      logger.warn(
+          String.format(
+              "ThreadId: %s, IndexWriter.updateDocValues failed",
+              Thread.currentThread().getName() + Thread.currentThread().threadId()));
+      throw new RuntimeException("Error occurred when updating docValues", t);
+    }
+  }
+
  /**
   * update documents with nested objects
   *
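The doc-value update path above never rebuilds a Lucene Document; the client sends only the _ID plus the fields to patch, flagged as UPDATE_DOCUMENT. A minimal client-side sketch of such a request, using the protobuf-generated builder for AddDocumentRequest; the index name (test_index), _ID field name (doc_id), and updatable field name (popularity) are hypothetical, not part of this change:

// Sketch only: index and field names are assumptions.
AddDocumentRequest updateRequest =
    AddDocumentRequest.newBuilder()
        .setIndexName("test_index")
        .setRequestType(IndexingRequestType.UPDATE_DOCUMENT)
        // The _ID value becomes the Term passed to IndexWriter.updateDocValues.
        .putFields("doc_id", MultiValuedField.newBuilder().addValue("123").build())
        // Every other field must resolve to a FieldDef implementing DocValueUpdatable,
        // otherwise executeDocValueUpdateRequest throws IllegalArgumentException.
        .putFields("popularity", MultiValuedField.newBuilder().addValue("42").build())
        .build();
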
@@ -315,7 +376,10 @@ private void updateDocuments(
      throws IOException {
    for (Document nextDoc : documents) {
      nextDoc = handleFacets(indexState, shardState, nextDoc);
+      IndexingMetrics.addDocumentRequestsReceived.labels(indexName).inc();
+      long nanoTime = System.nanoTime();
      shardState.writer.updateDocument(idFieldDef.getTerm(nextDoc), nextDoc);
+      IndexingMetrics.addDocumentLatency.labels(indexName).set(System.nanoTime() - nanoTime);
    }
  }

@@ -326,6 +390,8 @@ private void addDocuments(
      throw new IllegalStateException(
          "Adding documents to an index on a replica node is not supported");
    }
+    IndexingMetrics.addDocumentRequestsReceived.labels(indexName).inc(documents.size());
+    long nanoTime = System.nanoTime();
    shardState.writer.addDocuments(
        (Iterable<Document>)
            () ->
@@ -349,6 +415,11 @@ public Document next() {
                return nextDoc;
              }
            });
+    if (documents.size() >= 1) {
+      IndexingMetrics.addDocumentLatency
+          .labels(indexName)
+          .set((System.nanoTime() - nanoTime) / documents.size());
+    }
  }

  private Document handleFacets(IndexState indexState, ShardState shardState, Document nextDoc) {
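IndexingMetrics is imported and used in the hunks above but defined elsewhere in this change; the .labels(indexName).inc(...) and .set(...) calls suggest per-index Prometheus counters and gauges. A hedged sketch of what such a holder could look like, assuming the Prometheus simpleclient API; the metric names and help strings are invented here and the real class may differ:

import io.prometheus.client.Counter;
import io.prometheus.client.Gauge;

// Hypothetical shape of the metrics referenced in this diff.
public final class IndexingMetrics {
  public static final Counter addDocumentRequestsReceived =
      Counter.build()
          .name("nrt_add_document_requests_received")
          .help("Documents received for indexing, per index.")
          .labelNames("index")
          .create();

  public static final Gauge addDocumentLatency =
      Gauge.build()
          .name("nrt_add_document_latency_ns")
          .help("Latency of the most recent add/update call, in nanoseconds.")
          .labelNames("index")
          .create();

  public static final Counter updateDocValuesRequestsReceived =
      Counter.build()
          .name("nrt_update_doc_values_requests_received")
          .help("Doc-value-only update requests received, per index.")
          .labelNames("index")
          .create();

  public static final Gauge updateDocValuesLatency =
      Gauge.build()
          .name("nrt_update_doc_values_latency_ns")
          .help("Latency of the most recent IndexWriter.updateDocValues call, in nanoseconds.")
          .labelNames("index")
          .create();
}

Because the latency metrics are gauges set to the last observation, they expose only the most recent call's duration; a histogram would capture the full distribution, but a gauge matches the .set(...) calls shown above.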