Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ java {
}

allprojects {
version = '1.0.0-beta.8'
version = '1.0.0-beta.9'
group = 'com.yelp.nrtsearch'
}

Expand Down
15 changes: 12 additions & 3 deletions src/main/java/com/yelp/nrtsearch/server/index/IndexState.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,11 @@
import com.yelp.nrtsearch.server.field.ObjectFieldDef;
import com.yelp.nrtsearch.server.field.TextBaseFieldDef;
import com.yelp.nrtsearch.server.field.properties.GlobalOrdinalable;
import com.yelp.nrtsearch.server.grpc.*;
import com.yelp.nrtsearch.server.grpc.Field;
import com.yelp.nrtsearch.server.grpc.FieldType;
import com.yelp.nrtsearch.server.grpc.IndexStateInfo;
import com.yelp.nrtsearch.server.grpc.Mode;
import com.yelp.nrtsearch.server.grpc.ReplicationServerClient;
import com.yelp.nrtsearch.server.nrt.NrtDataManager;
import com.yelp.nrtsearch.server.remote.RemoteBackend;
import com.yelp.nrtsearch.server.state.GlobalState;
Expand All @@ -39,7 +43,11 @@
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.*;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.regex.Pattern;
import org.apache.lucene.analysis.Analyzer;
Expand Down Expand Up @@ -258,7 +266,8 @@ public void initWarmer(RemoteBackend remoteBackend, String indexName) {
remoteBackend,
configuration.getServiceName(),
indexName,
warmerConfig.getMaxWarmingQueries());
warmerConfig.getMaxWarmingQueries(),
warmerConfig.getWarmBasicQueryOnlyPerc());
}
}

Expand Down
36 changes: 32 additions & 4 deletions src/main/java/com/yelp/nrtsearch/server/warming/Warmer.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Random;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
Expand All @@ -52,14 +53,27 @@ public class Warmer {
private final ReservoirSampler reservoirSampler;
private final String index;
private final int maxWarmingQueries;
private final int warmBasicQueryOnlyPerc;
protected final ThreadLocal<Random> randomThreadLocal;

public Warmer(RemoteBackend remoteBackend, String service, String index, int maxWarmingQueries) {
this(remoteBackend, service, index, maxWarmingQueries, 0);
}

public Warmer(
RemoteBackend remoteBackend,
String service,
String index,
int maxWarmingQueries,
int warmBasicQueryOnlyPerc) {
this.remoteBackend = remoteBackend;
this.service = service;
this.index = index;
this.warmingRequests = Collections.synchronizedList(new ArrayList<>(maxWarmingQueries));
this.reservoirSampler = new ReservoirSampler(maxWarmingQueries);
this.maxWarmingQueries = maxWarmingQueries;
this.warmBasicQueryOnlyPerc = warmBasicQueryOnlyPerc;
this.randomThreadLocal = ThreadLocal.withInitial(Random::new);
}

public int getNumWarmingRequests() {
Expand Down Expand Up @@ -122,6 +136,7 @@ void warmFromS3(IndexState indexState, int parallelism, SearchHandler searchHand
return;
}
ThreadPoolExecutor threadPoolExecutor = null;
long startMS = System.currentTimeMillis();
if (parallelism > 1) {
int numThreads = parallelism - 1;
threadPoolExecutor =
Expand All @@ -140,12 +155,21 @@ void warmFromS3(IndexState indexState, int parallelism, SearchHandler searchHand
remoteBackend.downloadWarmingQueries(service, index),
StateUtils.getValidatingUTF8Decoder()))) {
String line;
int count = 0;
int count = 0, basicCount = 0;
while ((line = reader.readLine()) != null) {
processLine(indexState, searchHandler, threadPoolExecutor, line);
boolean isStripped = randomThreadLocal.get().nextInt(100) < warmBasicQueryOnlyPerc;
processLine(indexState, searchHandler, threadPoolExecutor, line, isStripped);
count++;
if (isStripped) {
basicCount++;
}
}
logger.info("Warmed index: {} with {} warming queries", index, count);
logger.info(
"Warmed index: {} with {} full and {} basic warming queries in {} seconds.",
index,
count - basicCount,
basicCount,
(System.currentTimeMillis() - startMS) / 1000.0);
} finally {
if (threadPoolExecutor != null) {
threadPoolExecutor.shutdown();
Expand All @@ -158,10 +182,14 @@ private void processLine(
IndexState indexState,
SearchHandler searchHandler,
ThreadPoolExecutor threadPoolExecutor,
String line)
String line,
boolean warmBasicQuery)
throws InvalidProtocolBufferException, SearchHandler.SearchHandlerException {
SearchRequest.Builder builder = SearchRequest.newBuilder();
JsonFormat.parser().merge(line, builder);
if (warmBasicQuery) {
WarmingUtils.simplifySearchRequestForWarming(builder);
}
SearchRequest searchRequest = builder.build();
if (threadPoolExecutor == null) {
searchHandler.handle(indexState, searchRequest);
Expand Down
21 changes: 19 additions & 2 deletions src/main/java/com/yelp/nrtsearch/server/warming/WarmerConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,22 +21,31 @@ public class WarmerConfig {
private static final String CONFIG_PREFIX = "warmer.";
private static final int DEFAULT_MAX_WARMING_QUERIES = 0;
private static final int DEFAULT_WARMING_PARALLELISM = 1;
private static final int DEFAULT_WARM_BASIC_QUERY_ONLY_PERC = 0;
private static final boolean DEFAULT_WARM_ON_STARTUP = false;

private final int maxWarmingQueries;
private final int warmingParallelism;
private final int warmBasicQueryOnlyPerc;
private final boolean warmOnStartup;

/**
* Configuration for warmer.
*
* @param maxWarmingQueries maximum queries to store for warming
* @param warmingParallelism number of parallel queries while warming on startup
* @param warmBasicQueryOnlyPerc percentage of warming queries that should be basic queries for
* the fast boostrap
* @param warmOnStartup if true will try to download queries from S3 and use them to warm
*/
public WarmerConfig(int maxWarmingQueries, int warmingParallelism, boolean warmOnStartup) {
public WarmerConfig(
int maxWarmingQueries,
int warmingParallelism,
int warmBasicQueryOnlyPerc,
boolean warmOnStartup) {
this.maxWarmingQueries = maxWarmingQueries;
this.warmingParallelism = warmingParallelism;
this.warmBasicQueryOnlyPerc = warmBasicQueryOnlyPerc;
this.warmOnStartup = warmOnStartup;
}

Expand All @@ -45,10 +54,14 @@ public static WarmerConfig fromConfig(YamlConfigReader configReader) {
configReader.getInteger(CONFIG_PREFIX + "maxWarmingQueries", DEFAULT_MAX_WARMING_QUERIES);
int warmingParallelism =
configReader.getInteger(CONFIG_PREFIX + "warmingParallelism", DEFAULT_WARMING_PARALLELISM);
int warmBasicQueryOnlyPerc =
configReader.getInteger(
CONFIG_PREFIX + "warmBasicQueryOnlyPerc", DEFAULT_WARM_BASIC_QUERY_ONLY_PERC);
boolean warmOnStartup =
configReader.getBoolean(CONFIG_PREFIX + "warmOnStartup", DEFAULT_WARM_ON_STARTUP);

return new WarmerConfig(maxWarmingQueries, warmingParallelism, warmOnStartup);
return new WarmerConfig(
maxWarmingQueries, warmingParallelism, warmBasicQueryOnlyPerc, warmOnStartup);
}

public int getMaxWarmingQueries() {
Expand All @@ -59,6 +72,10 @@ public int getWarmingParallelism() {
return warmingParallelism;
}

public int getWarmBasicQueryOnlyPerc() {
return warmBasicQueryOnlyPerc;
}

public boolean isWarmOnStartup() {
return warmOnStartup;
}
Expand Down
112 changes: 112 additions & 0 deletions src/main/java/com/yelp/nrtsearch/server/warming/WarmingUtils.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
/*
* Copyright 2020 Yelp Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.yelp.nrtsearch.server.warming;

import com.yelp.nrtsearch.server.grpc.BooleanClause;
import com.yelp.nrtsearch.server.grpc.BooleanQuery;
import com.yelp.nrtsearch.server.grpc.ConstantScoreQuery;
import com.yelp.nrtsearch.server.grpc.DisjunctionMaxQuery;
import com.yelp.nrtsearch.server.grpc.MultiFunctionScoreQuery;
import com.yelp.nrtsearch.server.grpc.NestedQuery;
import com.yelp.nrtsearch.server.grpc.Query;
import com.yelp.nrtsearch.server.grpc.SearchRequest;

public class WarmingUtils {

public static SearchRequest simplifySearchRequestForWarming(
SearchRequest.Builder searchRequestBuilder) {
Query basicSearchRequest = stripScriptQuery(searchRequestBuilder.getQuery());
SearchRequest searchRequest =
searchRequestBuilder
.setQuery(basicSearchRequest)
.clearQuerySort()
.clearRescorers()
.clearRetrieveFields()
.clearFetchTasks()
.clearFacets()
.clearCollectors()
.clearHighlight()
.setProfile(false)
.build();
return searchRequest;
}

private static Query stripScriptQuery(Query query) {
if (query.hasFunctionScoreQuery()) {
return stripScriptQuery(query.getFunctionScoreQuery().getQuery());
}
if (query.hasFunctionFilterQuery()) {
return Query.newBuilder().build();
}
if (query.hasMultiFunctionScoreQuery()) {
MultiFunctionScoreQuery multiFunctionScoreQuery = query.getMultiFunctionScoreQuery();
for (MultiFunctionScoreQuery.FilterFunction function :
multiFunctionScoreQuery.getFunctionsList()) {
if (function.hasScript()) {
return stripScriptQuery(function.getFilter());
}
}
}

Query.Builder queryBuilder = query.toBuilder();
switch (query.getQueryNodeCase()) {
case BOOLEANQUERY -> queryBuilder.setBooleanQuery(stripBooleanQuery(query.getBooleanQuery()));
case DISJUNCTIONMAXQUERY ->
queryBuilder.setDisjunctionMaxQuery(
stripDisjunctionMaxQuery(query.getDisjunctionMaxQuery()));
case NESTEDQUERY -> queryBuilder.setNestedQuery(stripNestedQuery(query.getNestedQuery()));
case CONSTANTSCOREQUERY ->
queryBuilder.setConstantScoreQuery(
stripConstantScoreQuery(query.getConstantScoreQuery()));
default -> {}
}
// Add other cases as needed
return queryBuilder.build();
}

private static BooleanQuery stripBooleanQuery(BooleanQuery booleanQuery) {
BooleanQuery.Builder booleanQueryBuilder = booleanQuery.toBuilder();
for (int i = 0; i < booleanQuery.getClausesCount(); i++) {
BooleanClause clause = booleanQuery.getClauses(i);
BooleanClause.Builder clauseBuilder = clause.toBuilder();
clauseBuilder.setQuery(stripScriptQuery(clause.getQuery()));
booleanQueryBuilder.setClauses(i, clauseBuilder.build());
}
return booleanQueryBuilder.build();
}

private static DisjunctionMaxQuery stripDisjunctionMaxQuery(
DisjunctionMaxQuery disjunctionMaxQuery) {
DisjunctionMaxQuery.Builder disjunctionMaxQueryBuilder = disjunctionMaxQuery.toBuilder();
for (int i = 0; i < disjunctionMaxQuery.getDisjunctsCount(); i++) {
disjunctionMaxQueryBuilder.setDisjuncts(
i, stripScriptQuery(disjunctionMaxQuery.getDisjuncts(i)));
}
return disjunctionMaxQueryBuilder.build();
}

private static NestedQuery stripNestedQuery(NestedQuery nestedQuery) {
NestedQuery.Builder nestedQueryBuilder = nestedQuery.toBuilder();
nestedQueryBuilder.setQuery(stripScriptQuery(nestedQuery.getQuery()));
return nestedQueryBuilder.build();
}

private static ConstantScoreQuery stripConstantScoreQuery(ConstantScoreQuery constantScoreQuery) {
ConstantScoreQuery.Builder constantScoreQueryBuilder = constantScoreQuery.toBuilder();
constantScoreQueryBuilder.setFilter(stripScriptQuery(constantScoreQuery.getFilter()));
return constantScoreQueryBuilder.build();
}
}
Loading
Loading