Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ java {
}

allprojects {
version = '1.0.0-beta.8'
version = '1.0.0-beta.9'
group = 'com.yelp.nrtsearch'
}

Expand Down
15 changes: 12 additions & 3 deletions src/main/java/com/yelp/nrtsearch/server/index/IndexState.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,11 @@
import com.yelp.nrtsearch.server.field.ObjectFieldDef;
import com.yelp.nrtsearch.server.field.TextBaseFieldDef;
import com.yelp.nrtsearch.server.field.properties.GlobalOrdinalable;
import com.yelp.nrtsearch.server.grpc.*;
import com.yelp.nrtsearch.server.grpc.Field;
import com.yelp.nrtsearch.server.grpc.FieldType;
import com.yelp.nrtsearch.server.grpc.IndexStateInfo;
import com.yelp.nrtsearch.server.grpc.Mode;
import com.yelp.nrtsearch.server.grpc.ReplicationServerClient;
import com.yelp.nrtsearch.server.nrt.NrtDataManager;
import com.yelp.nrtsearch.server.remote.RemoteBackend;
import com.yelp.nrtsearch.server.state.GlobalState;
Expand All @@ -39,7 +43,11 @@
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.*;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.regex.Pattern;
import org.apache.lucene.analysis.Analyzer;
Expand Down Expand Up @@ -258,7 +266,8 @@ public void initWarmer(RemoteBackend remoteBackend, String indexName) {
remoteBackend,
configuration.getServiceName(),
indexName,
warmerConfig.getMaxWarmingQueries());
warmerConfig.getMaxWarmingQueries(),
warmerConfig.getWarmBasicQueryOnlyPerc());
}
}

Expand Down
36 changes: 32 additions & 4 deletions src/main/java/com/yelp/nrtsearch/server/warming/Warmer.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Random;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
Expand All @@ -52,14 +53,27 @@ public class Warmer {
private final ReservoirSampler reservoirSampler;
private final String index;
private final int maxWarmingQueries;
private final int warmBasicQueryOnlyPerc;
protected final ThreadLocal<Random> randomThreadLocal;

public Warmer(RemoteBackend remoteBackend, String service, String index, int maxWarmingQueries) {
this(remoteBackend, service, index, maxWarmingQueries, 0);
}

public Warmer(
RemoteBackend remoteBackend,
String service,
String index,
int maxWarmingQueries,
int warmBasicQueryOnlyPerc) {
this.remoteBackend = remoteBackend;
this.service = service;
this.index = index;
this.warmingRequests = Collections.synchronizedList(new ArrayList<>(maxWarmingQueries));
this.reservoirSampler = new ReservoirSampler(maxWarmingQueries);
this.maxWarmingQueries = maxWarmingQueries;
this.warmBasicQueryOnlyPerc = warmBasicQueryOnlyPerc;
this.randomThreadLocal = ThreadLocal.withInitial(Random::new);
}

public int getNumWarmingRequests() {
Expand Down Expand Up @@ -122,6 +136,7 @@ void warmFromS3(IndexState indexState, int parallelism, SearchHandler searchHand
return;
}
ThreadPoolExecutor threadPoolExecutor = null;
long startMS = System.currentTimeMillis();
if (parallelism > 1) {
int numThreads = parallelism - 1;
threadPoolExecutor =
Expand All @@ -140,12 +155,21 @@ void warmFromS3(IndexState indexState, int parallelism, SearchHandler searchHand
remoteBackend.downloadWarmingQueries(service, index),
StateUtils.getValidatingUTF8Decoder()))) {
String line;
int count = 0;
int count = 0, basicCount = 0;
while ((line = reader.readLine()) != null) {
processLine(indexState, searchHandler, threadPoolExecutor, line);
boolean isStripped = randomThreadLocal.get().nextInt(100) < warmBasicQueryOnlyPerc;
processLine(indexState, searchHandler, threadPoolExecutor, line, isStripped);
count++;
if (isStripped) {
basicCount++;
}
}
logger.info("Warmed index: {} with {} warming queries", index, count);
logger.info(
"Warmed index: {} with {} full and {} basic warming queries in {} seconds.",
index,
count - basicCount,
basicCount,
(System.currentTimeMillis() - startMS) / 1000.0);
} finally {
if (threadPoolExecutor != null) {
threadPoolExecutor.shutdown();
Expand All @@ -158,10 +182,14 @@ private void processLine(
IndexState indexState,
SearchHandler searchHandler,
ThreadPoolExecutor threadPoolExecutor,
String line)
String line,
boolean warmBasicQuery)
throws InvalidProtocolBufferException, SearchHandler.SearchHandlerException {
SearchRequest.Builder builder = SearchRequest.newBuilder();
JsonFormat.parser().merge(line, builder);
if (warmBasicQuery) {
WarmingUtils.simplifySearchRequestForWarming(builder);
}
SearchRequest searchRequest = builder.build();
if (threadPoolExecutor == null) {
searchHandler.handle(indexState, searchRequest);
Expand Down
21 changes: 19 additions & 2 deletions src/main/java/com/yelp/nrtsearch/server/warming/WarmerConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,22 +21,31 @@ public class WarmerConfig {
private static final String CONFIG_PREFIX = "warmer.";
private static final int DEFAULT_MAX_WARMING_QUERIES = 0;
private static final int DEFAULT_WARMING_PARALLELISM = 1;
private static final int DEFAULT_WARM_BASIC_QUERY_ONLY_PERC = 0;
private static final boolean DEFAULT_WARM_ON_STARTUP = false;

private final int maxWarmingQueries;
private final int warmingParallelism;
private final int warmBasicQueryOnlyPerc;
private final boolean warmOnStartup;

/**
* Configuration for warmer.
*
* @param maxWarmingQueries maximum queries to store for warming
* @param warmingParallelism number of parallel queries while warming on startup
* @param warmBasicQueryOnlyPerc percentage of warming queries that should be basic queries for
* the fast boostrap
* @param warmOnStartup if true will try to download queries from S3 and use them to warm
*/
public WarmerConfig(int maxWarmingQueries, int warmingParallelism, boolean warmOnStartup) {
public WarmerConfig(
int maxWarmingQueries,
int warmingParallelism,
int warmBasicQueryOnlyPerc,
boolean warmOnStartup) {
this.maxWarmingQueries = maxWarmingQueries;
this.warmingParallelism = warmingParallelism;
this.warmBasicQueryOnlyPerc = warmBasicQueryOnlyPerc;
this.warmOnStartup = warmOnStartup;
}

Expand All @@ -45,10 +54,14 @@ public static WarmerConfig fromConfig(YamlConfigReader configReader) {
configReader.getInteger(CONFIG_PREFIX + "maxWarmingQueries", DEFAULT_MAX_WARMING_QUERIES);
int warmingParallelism =
configReader.getInteger(CONFIG_PREFIX + "warmingParallelism", DEFAULT_WARMING_PARALLELISM);
int warmBasicQueryOnlyPerc =
configReader.getInteger(
CONFIG_PREFIX + "warmBasicQueryOnlyPerc", DEFAULT_WARM_BASIC_QUERY_ONLY_PERC);
boolean warmOnStartup =
configReader.getBoolean(CONFIG_PREFIX + "warmOnStartup", DEFAULT_WARM_ON_STARTUP);

return new WarmerConfig(maxWarmingQueries, warmingParallelism, warmOnStartup);
return new WarmerConfig(
maxWarmingQueries, warmingParallelism, warmBasicQueryOnlyPerc, warmOnStartup);
}

public int getMaxWarmingQueries() {
Expand All @@ -59,6 +72,10 @@ public int getWarmingParallelism() {
return warmingParallelism;
}

public int getWarmBasicQueryOnlyPerc() {
return warmBasicQueryOnlyPerc;
}

public boolean isWarmOnStartup() {
return warmOnStartup;
}
Expand Down
115 changes: 115 additions & 0 deletions src/main/java/com/yelp/nrtsearch/server/warming/WarmingUtils.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
/*
* Copyright 2020 Yelp Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.yelp.nrtsearch.server.warming;

import com.yelp.nrtsearch.server.grpc.BooleanClause;
import com.yelp.nrtsearch.server.grpc.BooleanQuery;
import com.yelp.nrtsearch.server.grpc.ConstantScoreQuery;
import com.yelp.nrtsearch.server.grpc.DisjunctionMaxQuery;
import com.yelp.nrtsearch.server.grpc.MultiFunctionScoreQuery;
import com.yelp.nrtsearch.server.grpc.NestedQuery;
import com.yelp.nrtsearch.server.grpc.Query;
import com.yelp.nrtsearch.server.grpc.SearchRequest;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class WarmingUtils {
private static final Logger logger = LoggerFactory.getLogger(WarmingUtils.class);

public static SearchRequest simplifySearchRequestForWarming(
SearchRequest.Builder searchRequestBuilder) {
Query basicSearchRequest = stripScriptQuery(searchRequestBuilder.getQuery());
SearchRequest searchRequest =
searchRequestBuilder
.setQuery(basicSearchRequest)
.clearQuerySort()
.clearRescorers()
.clearRetrieveFields()
.clearFetchTasks()
.clearFacets()
.clearCollectors()
.clearHighlight()
.setProfile(false)
.build();
return searchRequest;
}

private static Query stripScriptQuery(Query query) {
if (query.hasFunctionScoreQuery()) {
return stripScriptQuery(query.getFunctionScoreQuery().getQuery());
}
if (query.hasFunctionFilterQuery()) {
return Query.newBuilder().build();
}
if (query.hasMultiFunctionScoreQuery()) {
MultiFunctionScoreQuery multiFunctionScoreQuery = query.getMultiFunctionScoreQuery();
for (MultiFunctionScoreQuery.FilterFunction function :
multiFunctionScoreQuery.getFunctionsList()) {
if (function.hasScript()) {
return stripScriptQuery(function.getFilter());
}
}
}

Query.Builder queryBuilder = query.toBuilder();
switch (query.getQueryNodeCase()) {
case BOOLEANQUERY -> queryBuilder.setBooleanQuery(stripBooleanQuery(query.getBooleanQuery()));
case DISJUNCTIONMAXQUERY ->
queryBuilder.setDisjunctionMaxQuery(
stripDisjunctionMaxQuery(query.getDisjunctionMaxQuery()));
case NESTEDQUERY -> queryBuilder.setNestedQuery(stripNestedQuery(query.getNestedQuery()));
case CONSTANTSCOREQUERY ->
queryBuilder.setConstantScoreQuery(
stripConstantScoreQuery(query.getConstantScoreQuery()));
default -> {}
}
// Add other cases as needed
return queryBuilder.build();
}

private static BooleanQuery stripBooleanQuery(BooleanQuery booleanQuery) {
BooleanQuery.Builder booleanQueryBuilder = booleanQuery.toBuilder();
for (int i = 0; i < booleanQuery.getClausesCount(); i++) {
BooleanClause clause = booleanQuery.getClauses(i);
BooleanClause.Builder clauseBuilder = clause.toBuilder();
clauseBuilder.setQuery(stripScriptQuery(clause.getQuery()));
booleanQueryBuilder.setClauses(i, clauseBuilder.build());
}
return booleanQueryBuilder.build();
}

private static DisjunctionMaxQuery stripDisjunctionMaxQuery(
DisjunctionMaxQuery disjunctionMaxQuery) {
DisjunctionMaxQuery.Builder disjunctionMaxQueryBuilder = disjunctionMaxQuery.toBuilder();
for (int i = 0; i < disjunctionMaxQuery.getDisjunctsCount(); i++) {
disjunctionMaxQueryBuilder.setDisjuncts(
i, stripScriptQuery(disjunctionMaxQuery.getDisjuncts(i)));
}
return disjunctionMaxQueryBuilder.build();
}

private static NestedQuery stripNestedQuery(NestedQuery nestedQuery) {
NestedQuery.Builder nestedQueryBuilder = nestedQuery.toBuilder();
nestedQueryBuilder.setQuery(stripScriptQuery(nestedQuery.getQuery()));
return nestedQueryBuilder.build();
}

private static ConstantScoreQuery stripConstantScoreQuery(ConstantScoreQuery constantScoreQuery) {
ConstantScoreQuery.Builder constantScoreQueryBuilder = constantScoreQuery.toBuilder();
constantScoreQueryBuilder.setFilter(stripScriptQuery(constantScoreQuery.getFilter()));
return constantScoreQueryBuilder.build();
}
}
62 changes: 58 additions & 4 deletions src/test/java/com/yelp/nrtsearch/server/warming/WarmerTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,9 @@

import com.amazonaws.services.s3.AmazonS3;
import com.yelp.nrtsearch.server.config.NrtsearchConfig;
import com.yelp.nrtsearch.server.grpc.FunctionScoreQuery;
import com.yelp.nrtsearch.server.grpc.Query;
import com.yelp.nrtsearch.server.grpc.Script;
import com.yelp.nrtsearch.server.grpc.SearchRequest;
import com.yelp.nrtsearch.server.grpc.TermQuery;
import com.yelp.nrtsearch.server.handler.SearchHandler;
Expand Down Expand Up @@ -105,6 +107,28 @@ public void testWarmFromS3()
verifyNoMoreInteractions(mockSearchHandler);
}

@Test
public void testWarmFromS3_basic()
throws IOException, SearchHandler.SearchHandlerException, InterruptedException {
Warmer warmerWithBasic = new Warmer(remoteBackend, service, index, 2, 30);

List<String> testSearchRequestsJson = getTestSearchRequestsAsJsonStrings();
byte[] warmingBytes = getWarmingBytes(testSearchRequestsJson);
remoteBackend.uploadWarmingQueries(service, "test_index", warmingBytes);

IndexState mockIndexState = mock(IndexState.class);
SearchHandler mockSearchHandler = mock(SearchHandler.class);

// nextInt(100) for this seed is: 28, 33, 20, 10
warmerWithBasic.randomThreadLocal.get().setSeed(1234);
warmerWithBasic.warmFromS3(mockIndexState, 0, mockSearchHandler);

for (SearchRequest testRequest : getTestBasicSearchRequests()) {
verify(mockSearchHandler).handle(mockIndexState, testRequest);
}
verifyNoMoreInteractions(mockSearchHandler);
}

@Test
public void testWarmFromS3_multiple()
throws IOException, SearchHandler.SearchHandlerException, InterruptedException {
Expand Down Expand Up @@ -167,17 +191,47 @@ private List<SearchRequest> getTestSearchRequests() {
.setIndexName(index)
.setQuery(
Query.newBuilder()
.setTermQuery(TermQuery.newBuilder().setField("field" + i).build())
.build())
.setFunctionScoreQuery(
FunctionScoreQuery.newBuilder()
.setQuery(
Query.newBuilder()
.setTermQuery(TermQuery.newBuilder().setField("field" + i)))
.setScript(Script.newBuilder().setLang("js").setSource("3 * 5"))))
.build();
testRequests.add(searchRequest);
}
return testRequests;
}

private List<SearchRequest> getTestBasicSearchRequests() {
List<SearchRequest> testRequests = new ArrayList<>();
SearchRequest searchRequest =
SearchRequest.newBuilder()
.setIndexName(index)
.setQuery(Query.newBuilder().setTermQuery(TermQuery.newBuilder().setField("field0")))
.build();
testRequests.add(searchRequest);

searchRequest =
SearchRequest.newBuilder()
.setIndexName(index)
.setQuery(
Query.newBuilder()
.setFunctionScoreQuery(
FunctionScoreQuery.newBuilder()
.setQuery(
Query.newBuilder()
.setTermQuery(TermQuery.newBuilder().setField("field1")))
.setScript(Script.newBuilder().setLang("js").setSource("3 * 5"))))
.build();
testRequests.add(searchRequest);

return testRequests;
}

private List<String> getTestSearchRequestsAsJsonStrings() {
return List.of(
"{\"indexName\":\"test_index\",\"query\":{\"termQuery\":{\"field\":\"field0\"}}}",
"{\"indexName\":\"test_index\",\"query\":{\"termQuery\":{\"field\":\"field1\"}}}");
"{\"indexName\":\"test_index\",\"query\":{\"functionScoreQuery\":{\"query\":{\"termQuery\":{\"field\":\"field0\"}},\"script\":{\"lang\":\"js\",\"source\":\"3 * 5\"}}}}",
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Before we were testing the same query twice to test it being simplified or not. With the last commit change, we have 2 extra identical queries and I am just wondering what is being tested with these extra identical queries. Just wondering if we need them or if we can keep only two and simplify the test code by removing the extra for loops in getTestBasicSearchRequests.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Adding 2 more lines to ensure the "random" works exactly as we expected. Only 2 queries might be coincident.

We ain't aiming to test the major stripping logic here

"{\"indexName\":\"test_index\",\"query\":{\"functionScoreQuery\":{\"query\":{\"termQuery\":{\"field\":\"field1\"}},\"script\":{\"lang\":\"js\",\"source\":\"3 * 5\"}}}}");
}
}
Loading
Loading