@@ -241,6 +241,8 @@ private void registerMetrics(GlobalState globalState) {
new ProcStatCollector().register(collectorRegistry);
new MergeSchedulerCollector(globalState).register(collectorRegistry);
new SearchResponseCollector(globalState).register(collectorRegistry);

CustomIndexingMetrics.register(collectorRegistry);
}

/** Main launches the server from the command line. */
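
The CustomIndexingMetrics class registered above (and used throughout the indexing code below) is not included in this diff view. As a rough sketch only, assuming the Prometheus simpleclient already used by the other collectors, it could look like the following; the metric names and help strings are assumptions inferred from the call sites (labels(indexName).inc() on the request counters, labels(indexName).set(...) on the latency gauges):

import io.prometheus.client.CollectorRegistry;
import io.prometheus.client.Counter;
import io.prometheus.client.Gauge;

public class CustomIndexingMetrics {
  // Counters incremented once per document/update request, labeled by index name.
  public static final Counter addDocumentRequestsReceived =
      Counter.build()
          .name("nrt_add_document_requests_received")
          .help("Number of add document requests received")
          .labelNames("index")
          .create();

  public static final Counter updateDocValuesRequestsReceived =
      Counter.build()
          .name("nrt_update_doc_values_requests_received")
          .help("Number of partial (docValues) update requests received")
          .labelNames("index")
          .create();

  // Gauges holding the latest observed latency, in nanoseconds, as set by the call sites below.
  public static final Gauge addDocumentLatency =
      Gauge.build()
          .name("nrt_add_document_latency_ns")
          .help("Latency of the most recent add document call in nanoseconds")
          .labelNames("index")
          .create();

  public static final Gauge updateDocValuesLatency =
      Gauge.build()
          .name("nrt_update_doc_values_latency_ns")
          .help("Latency of the most recent updateDocValues call in nanoseconds")
          .labelNames("index")
          .create();

  public static void register(CollectorRegistry registry) {
    registry.register(addDocumentRequestsReceived);
    registry.register(updateDocValuesRequestsReceived);
    registry.register(addDocumentLatency);
    registry.register(updateDocValuesLatency);
  }
}
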
@@ -17,23 +17,33 @@

import com.google.protobuf.ProtocolStringList;
import com.yelp.nrtsearch.server.grpc.AddDocumentRequest;
import com.yelp.nrtsearch.server.grpc.AddDocumentRequest.MultiValuedField;
import com.yelp.nrtsearch.server.grpc.DeadlineUtils;
import com.yelp.nrtsearch.server.grpc.FacetHierarchyPath;
import com.yelp.nrtsearch.server.luceneserver.Handler.HandlerException;
import com.yelp.nrtsearch.server.luceneserver.field.FieldDef;
import com.yelp.nrtsearch.server.luceneserver.field.IdFieldDef;
import com.yelp.nrtsearch.server.luceneserver.field.IndexableFieldDef;
import com.yelp.nrtsearch.server.monitoring.CustomIndexingMetrics;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field;
import org.apache.lucene.index.IndexableField;
import org.apache.lucene.index.Term;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -45,6 +55,13 @@ public class AddDocumentHandler {
* context for the AddDocumentRequest including root document and optional child documents if
* schema contains nested objects
*/
/*
 * Constants matching Elasticpipe; only needed for the POC and to be deleted afterwards.
 */
private static final String PARTIAL_UPDATE_KEY = "_is_partial_update";

private static final String PARTIAL_UPDATE_FIELDS = "_partial_update_fields";

public static class DocumentsContext {
private final Document rootDocument;
private final Map<String, List<Document>> childDocuments;
@@ -77,8 +94,12 @@ public static DocumentsContext getDocumentsContext(
AddDocumentRequest addDocumentRequest, IndexState indexState)
throws AddDocumentHandlerException {
DocumentsContext documentsContext = new DocumentsContext();
Map<String, AddDocumentRequest.MultiValuedField> fields = addDocumentRequest.getFieldsMap();
for (Map.Entry<String, AddDocumentRequest.MultiValuedField> entry : fields.entrySet()) {
Map<String, MultiValuedField> fields = addDocumentRequest.getFieldsMap();
for (Entry<String, MultiValuedField> entry : fields.entrySet()) {
if (entry.getKey().equals(PARTIAL_UPDATE_KEY)
|| entry.getKey().equals(PARTIAL_UPDATE_FIELDS)) {
continue;
}
parseOneField(entry.getKey(), entry.getValue(), documentsContext, indexState);
}

@@ -116,7 +137,7 @@ private static void extractFieldNamesForDocument(Document document) {
/** Parses a field's value, which is a MultiValuedField in all cases */
private static void parseOneField(
String fieldName,
AddDocumentRequest.MultiValuedField value,
MultiValuedField value,
DocumentsContext documentsContext,
IndexState indexState)
throws AddDocumentHandlerException {
@@ -125,9 +146,7 @@ private static void parseOneField(

/** Parse MultiValuedField for a single field, which is always a List<String>. */
private static void parseMultiValueField(
FieldDef field,
AddDocumentRequest.MultiValuedField value,
DocumentsContext documentsContext)
FieldDef field, MultiValuedField value, DocumentsContext documentsContext)
throws AddDocumentHandlerException {
ProtocolStringList fieldValues = value.getValueList();
List<FacetHierarchyPath> facetHierarchyPaths = value.getFaceHierarchyPathsList();
@@ -153,7 +172,7 @@ private static void parseMultiValueField(
}
}

public static class AddDocumentHandlerException extends Handler.HandlerException {
public static class AddDocumentHandlerException extends HandlerException {
public AddDocumentHandlerException(String errorMessage) {
super(errorMessage);
}
@@ -181,6 +200,40 @@ public DocumentIndexer(
this.indexName = indexName;
}

private static boolean isPartialUpdate(AddDocumentRequest addDocumentRequest) {
return addDocumentRequest.getFieldsMap().containsKey(PARTIAL_UPDATE_KEY)
&& Boolean.parseBoolean(
addDocumentRequest.getFieldsMap().get(PARTIAL_UPDATE_KEY).getValue(0));
}

private static Set<String> getPartialUpdateFields(AddDocumentRequest addDocumentRequest) {
Set<String> partialUpdateFields = new HashSet<>();
MultiValuedField field = addDocumentRequest.getFieldsMap().get(PARTIAL_UPDATE_FIELDS);
if (field != null) {
// The set of field names passed from Elasticpipe arrives as a literal string such as
// "[inactive]": the enclosing brackets are part of the value instead of delimiting a list.
// Strip the first and last characters, split on commas, and trim each element to recover
// the individual field names.
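// Example (the second field name is hypothetical): a value of "[inactive, is_enabled]"
// yields the field names "inactive" and "is_enabled".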
List<String> cleansedValues =
field.getValueList().stream()
.map(value -> value.substring(1, value.length() - 1)) // Remove enclosing brackets
.flatMap(
value -> {
if (value.contains(",")) {
return Arrays.stream(value.split(","));
} else {
return Stream.of(value);
}
})
.map(String::trim) // Trim each element
.collect(Collectors.toList());
partialUpdateFields.addAll(cleansedValues);
}
return partialUpdateFields;
}
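
For illustration, a minimal sketch of a request that the two helpers above would recognize as a partial update, using the generated protobuf builders; apart from ad_bid_id and the two sentinel keys, the field name and values below are hypothetical:

AddDocumentRequest partialUpdate =
    AddDocumentRequest.newBuilder()
        .putFields(
            "_is_partial_update", MultiValuedField.newBuilder().addValue("true").build())
        .putFields(
            // note the literal brackets around the field list, as described above
            "_partial_update_fields",
            MultiValuedField.newBuilder().addValue("[inactive]").build())
        .putFields("ad_bid_id", MultiValuedField.newBuilder().addValue("12345").build())
        .putFields("inactive", MultiValuedField.newBuilder().addValue("1").build())
        .build();
// isPartialUpdate(partialUpdate) returns true;
// getPartialUpdateFields(partialUpdate) returns a set containing "inactive".
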

public long runIndexingJob() throws Exception {
DeadlineUtils.checkDeadline("DocumentIndexer: runIndexingJob", "INDEXING");

@@ -192,16 +245,44 @@ public long runIndexingJob() throws Exception {
IndexState indexState;
ShardState shardState;
IdFieldDef idFieldDef;

String ad_bid_id = "";
try {
indexState = globalState.getIndex(this.indexName);
shardState = indexState.getShard(0);
idFieldDef = indexState.getIdFieldDef().orElse(null);
for (AddDocumentRequest addDocumentRequest : addDocumentRequestList) {
boolean partialUpdate = isPartialUpdate(addDocumentRequest);
final Set<String> partialUpdateFields;
if (partialUpdate) {
// For the POC, strip the request down to the RTB fields being updated; in the actual
// implementation the request will only contain the fields that need to be updated.
partialUpdateFields = getPartialUpdateFields(addDocumentRequest);
Map<String, MultiValuedField> docValueFields =
getDocValueFieldsForUpdateCall(addDocumentRequest, partialUpdateFields);
ad_bid_id = addDocumentRequest.getFieldsMap().get("ad_bid_id").getValue(0);
addDocumentRequest =
AddDocumentRequest.newBuilder().putAllFields(docValueFields).build();
} else {
partialUpdateFields = new HashSet<>();
}

DocumentsContext documentsContext =
AddDocumentHandler.LuceneDocumentBuilder.getDocumentsContext(
addDocumentRequest, indexState);
LuceneDocumentBuilder.getDocumentsContext(addDocumentRequest, indexState);

/*
 * If this is a partial update request, we only need the partial-update docValue fields
 * from the DocumentsContext.
 */
List<IndexableField> partialUpdateDocValueFields = new ArrayList<>();
if (partialUpdate) {
partialUpdateDocValueFields =
documentsContext.getRootDocument().getFields().stream()
.filter(f -> partialUpdateFields.contains(f.name()))
.toList();
}

if (documentsContext.hasNested()) {
logger.info("Indexing nested documents for ad_bid_id: {}", ad_bid_id);
try {
if (idFieldDef != null) {
// update documents in the queue to keep order
@@ -222,7 +303,24 @@ public long runIndexingJob() throws Exception {
throw new IOException(e);
}
} else {
documents.add(documentsContext.getRootDocument());
if (partialUpdate) {
CustomIndexingMetrics.updateDocValuesRequestsReceived.labels(indexName).inc();
Term term = new Term(idFieldDef.getName(), ad_bid_id);
// executing the partial update
logger.debug(
"running a partial update for the ad_bid_id: {} and fields {} in the thread {}",
ad_bid_id,
partialUpdateDocValueFields,
Thread.currentThread().getName() + Thread.currentThread().threadId());
long nanoTime = System.nanoTime();
shardState.writer.updateDocValues(
term, partialUpdateDocValueFields.toArray(new Field[0]));
CustomIndexingMetrics.updateDocValuesLatency
.labels(indexName)
.set((System.nanoTime() - nanoTime));
} else {
documents.add(documentsContext.getRootDocument());
}
}
}
} catch (Exception e) {
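
For background on the partial-update branch above, a standalone sketch of the Lucene call it relies on; the directory, field name, and id value are placeholders. IndexWriter.updateDocValues rewrites only the doc values of the documents matching the term, without re-indexing them, which is what keeps the partial update path cheap.

import java.io.IOException;
import org.apache.lucene.document.NumericDocValuesField;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.index.Term;
import org.apache.lucene.store.Directory;

class PartialUpdateSketch {
  // Rewrites only the "inactive" doc values of documents matching the id term;
  // stored and indexed fields of those documents are left untouched.
  static void updateInactiveFlag(Directory dir, IndexWriterConfig config) throws IOException {
    try (IndexWriter writer = new IndexWriter(dir, config)) {
      writer.updateDocValues(
          new Term("ad_bid_id", "12345"), new NumericDocValuesField("inactive", 1L));
    }
  }
}
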
@@ -252,6 +350,15 @@ public long runIndexingJob() throws Exception {
return shardState.writer.getMaxCompletedSequenceNumber();
}

private static Map<String, MultiValuedField> getDocValueFieldsForUpdateCall(
AddDocumentRequest addDocumentRequest, Set<String> partialUpdateFields) {
Map<String, MultiValuedField> docValueFields =
addDocumentRequest.getFieldsMap().entrySet().stream()
.filter(e -> partialUpdateFields.contains(e.getKey()))
.collect(Collectors.toMap(Entry::getKey, Entry::getValue));
return docValueFields;
}

/**
* update documents with nested objects
*
@@ -267,7 +374,7 @@ private void updateNestedDocuments(
ShardState shardState)
throws IOException {
List<Document> documents = new ArrayList<>();
for (Map.Entry<String, List<Document>> e : documentsContext.getChildDocuments().entrySet()) {
for (Entry<String, List<Document>> e : documentsContext.getChildDocuments().entrySet()) {
documents.addAll(
e.getValue().stream()
.map(v -> handleFacets(indexState, shardState, v))
@@ -282,7 +389,12 @@ private void updateNestedDocuments(
}

documents.add(rootDoc);
CustomIndexingMetrics.addDocumentRequestsReceived.labels(indexName).inc();
long nanoTime = System.nanoTime();
shardState.writer.updateDocuments(idFieldDef.getTerm(rootDoc), documents);
CustomIndexingMetrics.addDocumentLatency
.labels(indexName)
.set((System.nanoTime() - nanoTime));
}

/**
@@ -296,15 +408,20 @@ private void addNestedDocuments(
DocumentsContext documentsContext, IndexState indexState, ShardState shardState)
throws IOException {
List<Document> documents = new ArrayList<>();
for (Map.Entry<String, List<Document>> e : documentsContext.getChildDocuments().entrySet()) {
for (Entry<String, List<Document>> e : documentsContext.getChildDocuments().entrySet()) {
documents.addAll(
e.getValue().stream()
.map(v -> handleFacets(indexState, shardState, v))
.collect(Collectors.toList()));
}
Document rootDoc = handleFacets(indexState, shardState, documentsContext.getRootDocument());
documents.add(rootDoc);
CustomIndexingMetrics.addDocumentRequestsReceived.labels(indexName).inc();
long nanoTime = System.nanoTime();
shardState.writer.addDocuments(documents);
CustomIndexingMetrics.addDocumentLatency
.labels(indexName)
.set((System.nanoTime() - nanoTime));
}

private void updateDocuments(
@@ -314,8 +431,13 @@ private void updateDocuments(
ShardState shardState)
throws IOException {
for (Document nextDoc : documents) {
CustomIndexingMetrics.addDocumentRequestsReceived.labels(indexName).inc();
long nanoTime = System.nanoTime();
nextDoc = handleFacets(indexState, shardState, nextDoc);
shardState.writer.updateDocument(idFieldDef.getTerm(nextDoc), nextDoc);
CustomIndexingMetrics.addDocumentLatency
.labels(indexName)
.set((System.nanoTime() - nanoTime));
}
}

@@ -326,6 +448,8 @@ private void addDocuments(
throw new IllegalStateException(
"Adding documents to an index on a replica node is not supported");
}
CustomIndexingMetrics.addDocumentRequestsReceived.labels(indexName).inc(documents.size());
long nanoTime = System.nanoTime();
shardState.writer.addDocuments(
(Iterable<Document>)
() ->
@@ -349,6 +473,9 @@ public Document next() {
return nextDoc;
}
});
CustomIndexingMetrics.addDocumentLatency
.labels(indexName)
.set((System.nanoTime() - nanoTime) / documents.size());
}

private Document handleFacets(IndexState indexState, ShardState shardState, Document nextDoc) {
@@ -257,7 +257,8 @@ public void initWarmer(Archiver archiver, String indexName) {
archiver,
configuration.getServiceName(),
indexName,
warmerConfig.getMaxWarmingQueries());
warmerConfig.getMaxWarmingQueries(),
warmerConfig.getMaxWarmingLuceneQueryOnlyCount());
Contributor comment: You can still use the % settings in your final PR if you prefer.
}
}

@@ -323,6 +323,47 @@ public SearchResponse handle(IndexState indexState, SearchRequest searchRequest)
return searchResponse;
}

public void handleStrippedWarmingQuery(IndexState indexState, SearchRequest searchRequest) {
SearchContext searchContext;
SearcherTaxonomyManager.SearcherAndTaxonomy s = null;
ShardState shardState = indexState.getShard(0);

try {
s = getSearcherAndTaxonomy(
searchRequest,
indexState,
shardState,
SearchResponse.Diagnostics.newBuilder(),
indexState.getSearchThreadPoolExecutor());

searchContext =
SearchRequestProcessor.buildContextForLuceneQueryOnly(
searchRequest, indexState, shardState, s);

s.searcher.search(searchContext.getQuery(), searchContext.getCollector().getWrappedManager());
} catch (InterruptedException | IOException e) {
logger.warn(e.getMessage(), e);
throw new RuntimeException(e);
} finally {
// NOTE: this is a little iffy, because we may not
// have obtained this searcher from the NRTManager
// (i.e. sometimes we pulled from
// SearcherLifetimeManager, other times (if
// snapshot was specified) we opened ourselves,
// but under-the-hood all these methods just call
// s.getIndexReader().decRef(), which is what release
// does:
Contributor comment: We may not want to warm the query with specific generation, i.e. snapshot. But this is likely a fix in saving criteria.
try {
if (s != null) {
shardState.release(s);
}
} catch (IOException e) {
logger.warn("Failed to release searcher reference previously acquired by acquire()", e);
throw new RuntimeException(e);
}
}
}
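
A hypothetical caller, for illustration only (the warmer integration is not part of this excerpt): warming code could replay its recorded requests through this stripped path, which runs the Lucene query and collector but skips fetch and response building.

// Names below (storedWarmingRequests, searchHandler) are assumed, not from this PR.
for (SearchRequest warmingRequest : storedWarmingRequests) {
  searchHandler.handleStrippedWarmingQuery(indexState, warmingRequest);
}
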

/**
* Fetch/compute field values for the top hits. This operation may be done in parallel, based on
* the setting for the fetch thread pool. In addition to filling hit fields, any query {@link