Skip to content

Commit 6dd964c

Browse files
authored
dhruva_RP-13176_support_multiple_indices_in_AddDocumentRequest (#862)
Added support for multiple indexNames in the same AddDocumentRequest. Created a new test file and updated NrtsearchClientBuilder and AddDocumentsCommand to support the new change.
1 parent 2163f6d commit 6dd964c

File tree

6 files changed

+283
-33
lines changed

6 files changed

+283
-33
lines changed

clientlib/src/main/proto/yelp/nrtsearch/luceneserver.proto

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -832,7 +832,8 @@ message StartIndexResponse {
832832

833833
// Message representing a document to index
834834
message AddDocumentRequest {
835-
// Index name
835+
// Index name; use this parameter when you want to add documents to one specific index.
836+
// You must exclusively use this parameter or the indexNames parameter, not both.
836837
string indexName = 1;
837838
// Wrapper object to represent each field as a multivalued field.
838839
message MultiValuedField {
@@ -845,6 +846,9 @@ message AddDocumentRequest {
845846
map<string, MultiValuedField> fields = 3;
846847
// request type
847848
IndexingRequestType requestType = 4;
849+
// List of indexNames; use this parameter when you want to add documents to multiple indices at once.
850+
// You must exclusively use this parameter or the indexName parameter, not both.
851+
repeated string indexNames = 5;
848852
}
849853

850854
// Path for hierarchical facets

src/main/java/com/yelp/nrtsearch/server/grpc/NrtsearchClientBuilder.java

Lines changed: 15 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -119,11 +119,15 @@ public StartIndexRequest buildRequest(Path filePath) throws IOException {
119119
class AddDocumentsClientBuilder implements NrtsearchClientBuilder<Stream<AddDocumentRequest>> {
120120
private static final Logger logger =
121121
LoggerFactory.getLogger(AddDocumentsClientBuilder.class.getName());
122-
private final String indexName;
122+
private final List<String> indexNames;
123123
private final CSVParser csvParser;
124124

125125
public AddDocumentsClientBuilder(String indexName, CSVParser csvParser) {
126-
this.indexName = indexName;
126+
this(List.of(indexName), csvParser);
127+
}
128+
129+
public AddDocumentsClientBuilder(List<String> indexNames, CSVParser csvParser) {
130+
this.indexNames = indexNames;
127131
this.csvParser = csvParser;
128132
}
129133

@@ -132,9 +136,9 @@ public Stream<AddDocumentRequest> buildRequest(Path filePath) {
132136
Stream.Builder<AddDocumentRequest> builder = Stream.builder();
133137
AddDocumentRequest.Builder addDocumentRequestBuilder = AddDocumentRequest.newBuilder();
134138
int cnt = 0;
139+
addDocumentRequestBuilder.addAllIndexNames(indexNames);
135140
for (CSVRecord csvRecord : csvParser) {
136141
{ // data rows
137-
addDocumentRequestBuilder.setIndexName(indexName);
138142
for (String fieldName : csvParser.getHeaderNames()) {
139143
String fieldValues = csvRecord.get(fieldName);
140144
List<String> fieldVals = Arrays.asList(fieldValues.split(";"));
@@ -156,7 +160,7 @@ class AddJsonDocumentsClientBuilder
156160
implements NrtsearchClientBuilder<Stream<AddDocumentRequest>> {
157161
private static final Logger logger =
158162
LoggerFactory.getLogger(AddJsonDocumentsClientBuilder.class.getName());
159-
private final String indexName;
163+
private final List<String> indexNames;
160164
private final Gson gson;
161165
private final Path filePath;
162166
private final BufferedReader reader;
@@ -165,7 +169,12 @@ class AddJsonDocumentsClientBuilder
165169

166170
public AddJsonDocumentsClientBuilder(
167171
String indexName, Gson gson, Path filePath, int maxBufferLen) throws IOException {
168-
this.indexName = indexName;
172+
this(List.of(indexName), gson, filePath, maxBufferLen);
173+
}
174+
175+
public AddJsonDocumentsClientBuilder(
176+
List<String> indexNames, Gson gson, Path filePath, int maxBufferLen) throws IOException {
177+
this.indexNames = indexNames;
169178
this.gson = gson;
170179
this.filePath = filePath;
171180
this.reader = Files.newBufferedReader(filePath);
@@ -195,7 +204,7 @@ private void updateMultiValuedFielduilder(
195204
AddDocumentRequest buildAddDocumentRequest(String lineStr) {
196205
Map<String, Object> line = gson.fromJson(lineStr, Map.class);
197206
AddDocumentRequest.Builder addDocumentRequestBuilder = AddDocumentRequest.newBuilder();
198-
addDocumentRequestBuilder.setIndexName(indexName);
207+
addDocumentRequestBuilder.addAllIndexNames(indexNames);
199208
for (String key : line.keySet()) {
200209
AddDocumentRequest.MultiValuedField.Builder multiValuedFieldsBuilder =
201210
AddDocumentRequest.MultiValuedField.newBuilder();

src/main/java/com/yelp/nrtsearch/server/handler/AddDocumentHandler.java

Lines changed: 40 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -97,21 +97,7 @@ private ArrayBlockingQueue<AddDocumentRequest> getAddDocumentRequestQueue(String
9797
}
9898
}
9999

100-
private long getCount(String indexName) {
101-
return countMap.getOrDefault(indexName, 0L);
102-
}
103-
104-
private void incrementCount(String indexName) {
105-
if (countMap.containsKey(indexName)) {
106-
countMap.put(indexName, countMap.get(indexName) + 1);
107-
} else {
108-
countMap.put(indexName, 1L);
109-
}
110-
}
111-
112-
@Override
113-
public void onNext(AddDocumentRequest addDocumentRequest) {
114-
String indexName = addDocumentRequest.getIndexName();
100+
private void processIndexDocument(AddDocumentRequest request, String indexName) {
115101
ArrayBlockingQueue<AddDocumentRequest> addDocumentRequestQueue;
116102
try {
117103
addDocumentRequestQueue = getAddDocumentRequestQueue(indexName);
@@ -124,15 +110,14 @@ public void onNext(AddDocumentRequest addDocumentRequest) {
124110
"onNext, index: %s, addDocumentRequestQueue size: %s",
125111
indexName, addDocumentRequestQueue.size()));
126112
incrementCount(indexName);
127-
addDocumentRequestQueue.add(addDocumentRequest);
113+
addDocumentRequestQueue.add(request);
128114
if (addDocumentRequestQueue.remainingCapacity() == 0) {
129115
logger.debug(
130116
String.format(
131117
"indexing addDocumentRequestQueue size: %s, total: %s",
132118
addDocumentRequestQueue.size(), getCount(indexName)));
133119
try {
134120
DeadlineUtils.checkDeadline("addDocuments: onNext", "INDEXING");
135-
136121
List<AddDocumentRequest> addDocRequestList = new ArrayList<>(addDocumentRequestQueue);
137122
Future<Long> future =
138123
getGlobalState()
@@ -150,6 +135,44 @@ public void onNext(AddDocumentRequest addDocumentRequest) {
150135
}
151136
}
152137

138+
private long getCount(String indexName) {
139+
return countMap.getOrDefault(indexName, 0L);
140+
}
141+
142+
private void incrementCount(String indexName) {
143+
if (countMap.containsKey(indexName)) {
144+
countMap.put(indexName, countMap.get(indexName) + 1);
145+
} else {
146+
countMap.put(indexName, 1L);
147+
}
148+
}
149+
150+
@Override
151+
public void onNext(AddDocumentRequest addDocumentRequest) {
152+
String indexName = addDocumentRequest.getIndexName();
153+
int indexNamesCount = addDocumentRequest.getIndexNamesCount();
154+
if (indexName.isEmpty() && indexNamesCount == 0) {
155+
onError(
156+
Status.INVALID_ARGUMENT
157+
.withDescription(
158+
"Must provide exactly one of indexName or indexNames but neither is set")
159+
.asRuntimeException());
160+
} else if (!indexName.isEmpty() && indexNamesCount > 0) {
161+
onError(
162+
Status.INVALID_ARGUMENT
163+
.withDescription(
164+
"Must provide exactly one of indexName or indexNames but both are set")
165+
.asRuntimeException());
166+
} else if (indexNamesCount > 0) {
167+
List<String> indexNames = addDocumentRequest.getIndexNamesList();
168+
for (String currentIndexName : indexNames) {
169+
processIndexDocument(addDocumentRequest, currentIndexName);
170+
}
171+
} else {
172+
processIndexDocument(addDocumentRequest, indexName);
173+
}
174+
}
175+
153176
@Override
154177
public void onError(Throwable t) {
155178
logger.warn("addDocuments Cancelled", t);

src/main/java/com/yelp/nrtsearch/tools/cli/AddDocumentsCommand.java

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import java.nio.file.Files;
2626
import java.nio.file.Path;
2727
import java.nio.file.Paths;
28+
import java.util.List;
2829
import java.util.concurrent.Callable;
2930
import java.util.stream.Stream;
3031
import org.apache.commons.csv.CSVFormat;
@@ -60,12 +61,13 @@ public String getFileType() {
6061

6162
@CommandLine.Option(
6263
names = {"-i", "--indexName"},
63-
description = "Name of the index to add documents to",
64+
description =
65+
"List of index names to add documents to. Delimited by a \",\" : <indexName1>,<indexName2>,<indexName3>...",
6466
required = true)
65-
private String indexName;
67+
private String indexNamesStr;
6668

67-
public String getIndexName() {
68-
return indexName;
69+
public String getIndexNamesStr() {
70+
return indexNamesStr;
6971
}
7072

7173
@CommandLine.Option(
@@ -83,8 +85,8 @@ public int getMaxBufferLen() {
8385
public Integer call() throws Exception {
8486
NrtsearchClient client = baseCmd.getClient();
8587
try {
86-
String indexName = getIndexName();
8788
String fileType = getFileType();
89+
List<String> indexNames = List.of(getIndexNamesStr().split(","));
8890
Stream<AddDocumentRequest> addDocumentRequestStream;
8991
Path filePath = Paths.get(getFileName());
9092
if (fileType.equalsIgnoreCase("csv")) {
@@ -93,13 +95,13 @@ public Integer call() throws Exception {
9395
new CSVParser(
9496
reader, CSVFormat.DEFAULT.builder().setHeader().setSkipHeaderRecord(true).build());
9597
addDocumentRequestStream =
96-
new NrtsearchClientBuilder.AddDocumentsClientBuilder(indexName, csvParser)
98+
new NrtsearchClientBuilder.AddDocumentsClientBuilder(indexNames, csvParser)
9799
.buildRequest(filePath);
98100
client.addDocuments(addDocumentRequestStream);
99101
} else if (fileType.equalsIgnoreCase("json")) {
100102
NrtsearchClientBuilder.AddJsonDocumentsClientBuilder addJsonDocumentsClientBuilder =
101103
new NrtsearchClientBuilder.AddJsonDocumentsClientBuilder(
102-
indexName, new Gson(), filePath, getMaxBufferLen());
104+
indexNames, new Gson(), filePath, getMaxBufferLen());
103105
while (!addJsonDocumentsClientBuilder.isFinished()) {
104106
addDocumentRequestStream = addJsonDocumentsClientBuilder.buildRequest(filePath);
105107
client.addDocuments(addDocumentRequestStream);

0 commit comments

Comments
 (0)