21
21
import com .yelp .nrtsearch .server .field .FieldDef ;
22
22
import com .yelp .nrtsearch .server .field .IdFieldDef ;
23
23
import com .yelp .nrtsearch .server .field .IndexableFieldDef ;
24
+ import com .yelp .nrtsearch .server .field .properties .DocValueUpdatable ;
24
25
import com .yelp .nrtsearch .server .grpc .AddDocumentRequest ;
26
+ import com .yelp .nrtsearch .server .grpc .AddDocumentRequest .MultiValuedField ;
25
27
import com .yelp .nrtsearch .server .grpc .AddDocumentResponse ;
26
28
import com .yelp .nrtsearch .server .grpc .DeadlineUtils ;
27
29
import com .yelp .nrtsearch .server .grpc .FacetHierarchyPath ;
30
+ import com .yelp .nrtsearch .server .grpc .IndexingRequestType ;
28
31
import com .yelp .nrtsearch .server .index .IndexState ;
29
32
import com .yelp .nrtsearch .server .index .ShardState ;
33
+ import com .yelp .nrtsearch .server .monitoring .IndexingMetrics ;
30
34
import com .yelp .nrtsearch .server .state .GlobalState ;
31
35
import io .grpc .Context ;
32
36
import io .grpc .Status ;
46
50
import java .util .concurrent .RejectedExecutionException ;
47
51
import java .util .stream .Collectors ;
48
52
import org .apache .lucene .document .Document ;
53
+ import org .apache .lucene .document .Field ;
49
54
import org .apache .lucene .index .IndexableField ;
55
+ import org .apache .lucene .index .Term ;
50
56
import org .slf4j .Logger ;
51
57
import org .slf4j .LoggerFactory ;
52
58
@@ -376,6 +382,8 @@ public AddDocumentHandlerException(Throwable err) {
376
382
}
377
383
378
384
public static class DocumentIndexer implements Callable <Long > {
385
+
386
+ public static final double ONE_MILLION = 1000000.0 ;
379
387
private final GlobalState globalState ;
380
388
private final List <AddDocumentRequest > addDocumentRequestList ;
381
389
private final String indexName ;
@@ -406,6 +414,10 @@ public long runIndexingJob() throws Exception {
406
414
shardState = indexState .getShard (0 );
407
415
idFieldDef = indexState .getIdFieldDef ().orElse (null );
408
416
for (AddDocumentRequest addDocumentRequest : addDocumentRequestList ) {
417
+ if (addDocumentRequest .getRequestType ().equals (IndexingRequestType .UPDATE_DOC_VALUES )) {
418
+ executeDocValueUpdateRequest (indexState , shardState , addDocumentRequest );
419
+ continue ;
420
+ }
409
421
DocumentsContext documentsContext =
410
422
AddDocumentHandler .LuceneDocumentBuilder .getDocumentsContext (
411
423
addDocumentRequest , indexState );
@@ -460,6 +472,67 @@ public long runIndexingJob() throws Exception {
460
472
return shardState .writer .getMaxCompletedSequenceNumber ();
461
473
}
462
474
475
+ private void executeDocValueUpdateRequest (
476
+ IndexState indexState , ShardState shardState , AddDocumentRequest addDocumentRequest ) {
477
+ try {
478
+ IndexingMetrics .updateDocValuesRequestsReceived .labelValues (indexName ).inc ();
479
+ Term term = buildTermForDocValueUpdate (indexState , addDocumentRequest );
480
+ List <Field > updatableDocValueFields = new ArrayList <>();
481
+ for (Map .Entry <String , MultiValuedField > entry :
482
+ addDocumentRequest .getFieldsMap ().entrySet ()) {
483
+ FieldDef field = indexState .getField (entry .getKey ());
484
+ if (field == null ) {
485
+ throw new IllegalArgumentException (
486
+ String .format ("Field: %s is not registered" , entry .getKey ()));
487
+ }
488
+ if (field .getName ().equals (indexState .getIdFieldDef ().get ().getName ())) continue ;
489
+
490
+ if (!(field instanceof DocValueUpdatable updatable ) || !(updatable .isUpdatable ())) {
491
+ throw new IllegalArgumentException (
492
+ String .format ("Field: %s is not updatable" , field .getName ()));
493
+ }
494
+ if (entry .getValue ().getValueCount () > 0 ) {
495
+ updatableDocValueFields .add (
496
+ ((DocValueUpdatable ) field )
497
+ .getUpdatableDocValueField (entry .getValue ().getValueList ()));
498
+ }
499
+ }
500
+
501
+ if (updatableDocValueFields .size () > 0 ) {
502
+ long ns_start = System .nanoTime ();
503
+ shardState .writer .updateDocValues (term , updatableDocValueFields .toArray (new Field [0 ]));
504
+ IndexingMetrics .updateDocValuesLatency
505
+ .labelValues (indexName )
506
+ .observe ((System .nanoTime () - ns_start ) / ONE_MILLION );
507
+ }
508
+ } catch (Throwable t ) {
509
+ logger .warn (
510
+ String .format (
511
+ "ThreadId: %s, IndexWriter.updateDocValues failed" ,
512
+ Thread .currentThread ().getName () + Thread .currentThread ().threadId ()));
513
+ throw new RuntimeException ("Error occurred when updating docValues " , t );
514
+ }
515
+ }
516
+
517
+ private static Term buildTermForDocValueUpdate (
518
+ IndexState indexState , AddDocumentRequest addDocumentRequest ) {
519
+ if (indexState .getIdFieldDef ().isEmpty ()) {
520
+ throw new RuntimeException (
521
+ " Index needs to have an ID field to execute update DocValue request" );
522
+ }
523
+ String idFieldName = indexState .getIdFieldDef ().get ().getName ();
524
+ if (addDocumentRequest .getFieldsMap ().get (idFieldName ).getValueCount () == 0 ) {
525
+ throw new IllegalArgumentException (
526
+ String .format ("the _ID should have a value set to execute update DocValue" ));
527
+ }
528
+ String idFieldValue = addDocumentRequest .getFieldsMap ().get (idFieldName ).getValue (0 );
529
+
530
+ if (idFieldValue == null || idFieldValue .isEmpty ()) {
531
+ throw new IllegalArgumentException (String .format ("the value of _ID field cannot be emtpy" ));
532
+ }
533
+ return new Term (idFieldName , idFieldValue );
534
+ }
535
+
463
536
/**
464
537
* update documents with nested objects
465
538
*
@@ -488,7 +561,12 @@ private void updateNestedDocuments(
488
561
}
489
562
490
563
documents .add (rootDoc );
564
+ IndexingMetrics .addDocumentRequestsReceived .labelValues (indexName ).inc ();
565
+ long ns_start = System .nanoTime ();
491
566
shardState .writer .updateDocuments (idFieldDef .getTerm (rootDoc ), documents );
567
+ IndexingMetrics .addDocumentLatency
568
+ .labelValues (indexName )
569
+ .observe ((System .nanoTime () - ns_start ) / ONE_MILLION );
492
570
}
493
571
494
572
/**
@@ -508,7 +586,12 @@ private void addNestedDocuments(
508
586
}
509
587
Document rootDoc = handleFacets (indexState , shardState , documentsContext .getRootDocument ());
510
588
documents .add (rootDoc );
589
+ IndexingMetrics .addDocumentRequestsReceived .labelValues (indexName ).inc ();
590
+ long ns_start = System .nanoTime ();
511
591
shardState .writer .addDocuments (documents );
592
+ IndexingMetrics .addDocumentLatency
593
+ .labelValues (indexName )
594
+ .observe ((System .nanoTime () - ns_start ) / ONE_MILLION );
512
595
}
513
596
514
597
private void updateDocuments (
@@ -523,7 +606,12 @@ private void updateDocuments(
523
606
}
524
607
for (Document nextDoc : documents ) {
525
608
nextDoc = handleFacets (indexState , shardState , nextDoc );
609
+ IndexingMetrics .addDocumentRequestsReceived .labelValues (indexName ).inc ();
610
+ long ns_start = System .nanoTime ();
526
611
shardState .writer .updateDocument (idFieldDef .getTerm (nextDoc ), nextDoc );
612
+ IndexingMetrics .addDocumentLatency
613
+ .labelValues (indexName )
614
+ .observe ((System .nanoTime () - ns_start ) / ONE_MILLION );
527
615
}
528
616
}
529
617
@@ -534,6 +622,9 @@ private void addDocuments(
534
622
throw new IllegalStateException (
535
623
"Adding documents to an index on a replica node is not supported" );
536
624
}
625
+ if (documents != null )
626
+ IndexingMetrics .addDocumentRequestsReceived .labelValues (indexName ).inc (documents .size ());
627
+ long ns_start = System .nanoTime ();
537
628
shardState .writer .addDocuments (
538
629
(Iterable <Document >)
539
630
() ->
@@ -557,6 +648,11 @@ public Document next() {
557
648
return nextDoc ;
558
649
}
559
650
});
651
+ if (documents != null && documents .size () >= 1 ) {
652
+ IndexingMetrics .addDocumentLatency
653
+ .labelValues (indexName )
654
+ .observe ((System .nanoTime () - ns_start ) / ONE_MILLION / documents .size ());
655
+ }
560
656
}
561
657
562
658
private Document handleFacets (IndexState indexState , ShardState shardState , Document nextDoc ) {
0 commit comments