From 330923f03977121079d5fa8f4aa8785947c08c24 Mon Sep 17 00:00:00 2001 From: umesh Date: Wed, 30 Apr 2025 09:47:37 -0700 Subject: [PATCH 1/6] feat(plugin): add ingestion plugin framework and example implementation - Introduced AbstractIngestionPlugin base class to support ingestion plugins - Defined IngestionPlugin interface with start/stop lifecycle methods - Updated NrtsearchServer to initialize ingestion plugin state - Implemented ExamplePlugin as an ingestion + analysis plugin - Added test coverage for ingestion using Awaitility for async validation - Registered example plugin in build and settings - Bumped version to 1.0.0-beta.11 --- example-plugin/build.gradle | 1 + .../plugins/example/ExamplePlugin.java | 103 +++++++++++++++- .../plugins/example/ExamplePluginTest.java | 111 +++++++++++++++--- .../resources/register_fields_ingestion.json | 15 +++ settings.gradle | 1 + .../server/grpc/NrtsearchServer.java | 11 ++ .../plugins/AbstractIngestionPlugin.java | 95 +++++++++++++++ .../server/plugins/IngestionPlugin.java | 38 ++++++ .../yelp/nrtsearch/server/ServerTestCase.java | 6 + 9 files changed, 359 insertions(+), 22 deletions(-) create mode 100644 example-plugin/src/test/resources/register_fields_ingestion.json create mode 100644 src/main/java/com/yelp/nrtsearch/server/plugins/AbstractIngestionPlugin.java create mode 100644 src/main/java/com/yelp/nrtsearch/server/plugins/IngestionPlugin.java diff --git a/example-plugin/build.gradle b/example-plugin/build.gradle index 0062bbe6e..cb4fe7092 100644 --- a/example-plugin/build.gradle +++ b/example-plugin/build.gradle @@ -34,6 +34,7 @@ dependencies { testImplementation libs.grpc.testing testImplementation libs.junit testImplementation libs.protobuf.java + testImplementation("org.awaitility:awaitility:4.3.0") } distributions { diff --git a/example-plugin/src/main/java/com/yelp/nrtsearch/plugins/example/ExamplePlugin.java b/example-plugin/src/main/java/com/yelp/nrtsearch/plugins/example/ExamplePlugin.java index 
57d29ac65..2284587ee 100644 --- a/example-plugin/src/main/java/com/yelp/nrtsearch/plugins/example/ExamplePlugin.java +++ b/example-plugin/src/main/java/com/yelp/nrtsearch/plugins/example/ExamplePlugin.java @@ -15,22 +15,64 @@ */ package com.yelp.nrtsearch.plugins.example; +import com.google.common.util.concurrent.ThreadFactoryBuilder; import com.yelp.nrtsearch.server.analysis.AnalysisProvider; import com.yelp.nrtsearch.server.config.NrtsearchConfig; +import com.yelp.nrtsearch.server.grpc.AddDocumentRequest; +import com.yelp.nrtsearch.server.plugins.AbstractIngestionPlugin; import com.yelp.nrtsearch.server.plugins.AnalysisPlugin; import com.yelp.nrtsearch.server.plugins.CustomRequestPlugin; -import com.yelp.nrtsearch.server.plugins.Plugin; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; import java.util.Map; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import org.apache.lucene.analysis.Analyzer; import org.apache.lucene.analysis.custom.CustomAnalyzer; import org.apache.lucene.util.Version; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; -public class ExamplePlugin extends Plugin implements AnalysisPlugin, CustomRequestPlugin { - +public class ExamplePlugin extends AbstractIngestionPlugin + implements AnalysisPlugin, CustomRequestPlugin { + private static final Logger logger = LoggerFactory.getLogger(ExamplePlugin.class); + public static final String INGESTION_TEST_INDEX = "ingestion_test_index"; private final String availableAnalyzers = String.join(",", getAnalyzers().keySet()); + private ExecutorService executorService; // No longer final so we can recreate + private final AtomicBoolean running = new AtomicBoolean(false); + private final List testDocuments = new ArrayList<>(); + + public ExamplePlugin(NrtsearchConfig configuration) { + super(configuration); - // Constructor that accepts 
LuceneServerConfiguration object is required - public ExamplePlugin(NrtsearchConfig configuration) {} + // Create test documents + testDocuments.add( + AddDocumentRequest.newBuilder() + .setIndexName(INGESTION_TEST_INDEX) + .putFields( + "field1", + AddDocumentRequest.MultiValuedField.newBuilder().addValue("test doc 1").build()) + .build()); + testDocuments.add( + AddDocumentRequest.newBuilder() + .setIndexName(INGESTION_TEST_INDEX) + .putFields( + "field1", + AddDocumentRequest.MultiValuedField.newBuilder().addValue("test doc 2").build()) + .build()); + } + + private synchronized ExecutorService getOrCreateExecutorService() { + if (executorService == null || executorService.isShutdown()) { + executorService = + Executors.newSingleThreadExecutor( + new ThreadFactoryBuilder().setNameFormat("example-ingestion-%d").build()); + } + return executorService; + } @Override public String id() { @@ -61,4 +103,55 @@ public Map> getAnalyzers() { } }); } + + @Override + public void startIngestion() throws IOException { + logger.info("Starting example ingestion"); + if (!running.compareAndSet(false, true)) { + logger.warn("Ingestion already running"); + return; + } + + executorService = + Executors.newSingleThreadExecutor( + new ThreadFactoryBuilder().setNameFormat("example-ingestion-%d").build()); + + executorService.submit( + () -> { + try { + addDocuments(testDocuments, INGESTION_TEST_INDEX); + commit(INGESTION_TEST_INDEX); + } catch (Exception e) { + logger.error("Error during ingestion", e); + } finally { + running.set(false); // Reset running flag when done + } + }); + } + + @Override + public void stopIngestion() throws IOException { + logger.info("Stopping example ingestion"); + running.set(false); // Signal stop (not strictly needed for one-shot task) + + if (executorService != null) { + executorService.shutdown(); + try { + if (!executorService.awaitTermination(5, TimeUnit.SECONDS)) { + logger.warn("Ingestion thread did not complete within timeout"); + 
executorService.shutdownNow(); + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new IOException("Interrupted while stopping ingestion", e); + } finally { + executorService = null; // Allow recreation + } + } + } + + /** Get the test documents that will be ingested. Exposed for test validation. */ + List getTestDocuments() { + return testDocuments; + } } diff --git a/example-plugin/src/test/java/com/yelp/nrtsearch/plugins/example/ExamplePluginTest.java b/example-plugin/src/test/java/com/yelp/nrtsearch/plugins/example/ExamplePluginTest.java index 991dd85ff..27356bbfa 100644 --- a/example-plugin/src/test/java/com/yelp/nrtsearch/plugins/example/ExamplePluginTest.java +++ b/example-plugin/src/test/java/com/yelp/nrtsearch/plugins/example/ExamplePluginTest.java @@ -15,7 +15,9 @@ */ package com.yelp.nrtsearch.plugins.example; +import static java.util.concurrent.TimeUnit.SECONDS; import static org.assertj.core.api.Assertions.assertThat; +import static org.awaitility.Awaitility.await; import com.yelp.nrtsearch.server.ServerTestCase; import com.yelp.nrtsearch.server.config.NrtsearchConfig; @@ -26,6 +28,7 @@ import com.yelp.nrtsearch.server.grpc.FieldDefRequest; import com.yelp.nrtsearch.server.grpc.MatchQuery; import com.yelp.nrtsearch.server.grpc.Query; +import com.yelp.nrtsearch.server.grpc.RefreshRequest; import com.yelp.nrtsearch.server.grpc.SearchRequest; import com.yelp.nrtsearch.server.grpc.SearchResponse; import com.yelp.nrtsearch.server.plugins.Plugin; @@ -37,41 +40,81 @@ import org.apache.lucene.analysis.Analyzer; import org.apache.lucene.analysis.TokenStream; import org.apache.lucene.analysis.tokenattributes.CharTermAttribute; +import org.junit.After; +import org.junit.Before; import org.junit.ClassRule; import org.junit.Test; public class ExamplePluginTest extends ServerTestCase { @ClassRule public static final GrpcCleanupRule grpcCleanup = new GrpcCleanupRule(); + private static final String INGESTION_TEST_INDEX = 
ExamplePlugin.INGESTION_TEST_INDEX; - private static final ExamplePlugin examplePlugin = new ExamplePlugin(getConfig()); + private ExamplePlugin examplePlugin; + private static final String FIELD_1 = "field1"; + + @Override + protected List getIndices() { + return List.of(DEFAULT_TEST_INDEX, INGESTION_TEST_INDEX); + } @Override protected List getPlugins(NrtsearchConfig configuration) { + examplePlugin = new ExamplePlugin(configuration); return List.of(examplePlugin); } @Override protected FieldDefRequest getIndexDef(String name) throws IOException { - return getFieldsFromResourceFile("/register_fields.json"); + if (DEFAULT_TEST_INDEX.equals(name)) { + return getFieldsFromResourceFile("/register_fields.json"); + } else if (INGESTION_TEST_INDEX.equals(name)) { + return getFieldsFromResourceFile("/register_fields_ingestion.json"); + } else { + throw new IllegalArgumentException("Unknown index: " + name); + } } @Override protected void initIndex(String name) throws Exception { - AddDocumentRequest addDocumentRequest = - AddDocumentRequest.newBuilder() - .setIndexName(name) - .putFields( - "field1", - MultiValuedField.newBuilder().addValue("How to use Nrtsearch
").build()) - .build(); - AddDocumentRequest addDocumentRequest2 = - AddDocumentRequest.newBuilder() - .setIndexName(name) - .putFields( - "field1", - MultiValuedField.newBuilder().addValue("How to create plugin").build()) - .build(); - addDocuments(Stream.of(addDocumentRequest, addDocumentRequest2)); + // Only initialize test docs for analysis test index + if (DEFAULT_TEST_INDEX.equals(name)) { + AddDocumentRequest addDocumentRequest = + AddDocumentRequest.newBuilder() + .setIndexName(name) + .putFields( + "field1", + MultiValuedField.newBuilder().addValue("How to use Nrtsearch
").build()) + .build(); + AddDocumentRequest addDocumentRequest2 = + AddDocumentRequest.newBuilder() + .setIndexName(name) + .putFields( + "field1", + MultiValuedField.newBuilder() + .addValue("How to create plugin") + .build()) + .build(); + addDocuments(Stream.of(addDocumentRequest, addDocumentRequest2)); + } + } + + @Before + public void ensurePluginInitialized() throws Exception { + if (examplePlugin == null) { + for (Plugin plugin : getPlugins(getConfig())) { + if (plugin instanceof ExamplePlugin) { + examplePlugin = (ExamplePlugin) plugin; + break; + } + } + } + } + + @After + public void cleanupPlugin() throws Exception { + if (examplePlugin != null) { + examplePlugin.stopIngestion(); + } } @Test @@ -121,6 +164,40 @@ public void testAnalysisForSearch() { assertThat(response.getHitsCount()).isEqualTo(1); } + @Test + public void testPluginIngestion() throws Exception { + examplePlugin.startIngestion(); + + await() + .atMost(5, SECONDS) + .untilAsserted( + () -> { + getGrpcServer() + .getBlockingStub() + .refresh(RefreshRequest.newBuilder().setIndexName(INGESTION_TEST_INDEX).build()); + + SearchResponse response = + getGrpcServer() + .getBlockingStub() + .search( + SearchRequest.newBuilder() + .setIndexName(INGESTION_TEST_INDEX) + .setStartHit(0) + .setTopHits(5) + .setQuery( + Query.newBuilder() + .setMatchQuery( + MatchQuery.newBuilder() + .setField("field1") + .setQuery("test doc") + .build()) + .build()) + .build()); + + assertThat(response.getHitsCount()).isEqualTo(2); + }); + } + private static NrtsearchConfig getConfig() { String config = "nodeName: \"server_foo\""; return new NrtsearchConfig(new ByteArrayInputStream(config.getBytes())); diff --git a/example-plugin/src/test/resources/register_fields_ingestion.json b/example-plugin/src/test/resources/register_fields_ingestion.json new file mode 100644 index 000000000..729e02775 --- /dev/null +++ b/example-plugin/src/test/resources/register_fields_ingestion.json @@ -0,0 +1,15 @@ +{ + "indexName": 
"ingestion_test_index", + "field": [ + { + "name": "field1", + "type": "TEXT", + "search": true + }, + { + "name": "field2", + "type": "TEXT", + "search": true + } + ] +} diff --git a/settings.gradle b/settings.gradle index b4acf42ee..72a9bc687 100644 --- a/settings.gradle +++ b/settings.gradle @@ -8,3 +8,4 @@ pluginManagement { } include('clientlib') rootProject.name = 'nrtsearch' +include('example-plugin') diff --git a/src/main/java/com/yelp/nrtsearch/server/grpc/NrtsearchServer.java b/src/main/java/com/yelp/nrtsearch/server/grpc/NrtsearchServer.java index f0d6fbf5b..7b5a566b3 100644 --- a/src/main/java/com/yelp/nrtsearch/server/grpc/NrtsearchServer.java +++ b/src/main/java/com/yelp/nrtsearch/server/grpc/NrtsearchServer.java @@ -95,6 +95,7 @@ import com.yelp.nrtsearch.server.monitoring.SearchResponseCollector; import com.yelp.nrtsearch.server.monitoring.ThreadPoolCollector; import com.yelp.nrtsearch.server.monitoring.ThreadPoolCollector.RejectionCounterWrapper; +import com.yelp.nrtsearch.server.plugins.AbstractIngestionPlugin; import com.yelp.nrtsearch.server.plugins.Plugin; import com.yelp.nrtsearch.server.plugins.PluginsService; import com.yelp.nrtsearch.server.remote.RemoteBackend; @@ -158,6 +159,7 @@ public void start() throws IOException { GlobalState globalState = serverImpl.getGlobalState(); registerMetrics(globalState); + initializeIngestionPluginState(globalState, plugins); if (configuration.getMaxConcurrentCallsPerConnectionForReplication() != -1) { replicationServer = @@ -225,6 +227,15 @@ serverImpl, new NrtsearchHeaderInterceptor(), monitoringInterceptor)) BootstrapMetrics.nrtsearchBootstrapTimer.set((System.nanoTime() - startNs) / 1_000_000_000.0); } + private void initializeIngestionPluginState(GlobalState globalState, List plugins) + throws IOException { + for (Plugin plugin : plugins) { + if (plugin instanceof AbstractIngestionPlugin abstractIngestionPlugin) { + abstractIngestionPlugin.initializeState(globalState); + } + } + } + @VisibleForTesting 
public void stop() { if (server != null) { diff --git a/src/main/java/com/yelp/nrtsearch/server/plugins/AbstractIngestionPlugin.java b/src/main/java/com/yelp/nrtsearch/server/plugins/AbstractIngestionPlugin.java new file mode 100644 index 000000000..80e536fce --- /dev/null +++ b/src/main/java/com/yelp/nrtsearch/server/plugins/AbstractIngestionPlugin.java @@ -0,0 +1,95 @@ +/* + * Copyright 2024 Yelp Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.yelp.nrtsearch.server.plugins; + +import com.yelp.nrtsearch.server.config.NrtsearchConfig; +import com.yelp.nrtsearch.server.grpc.AddDocumentRequest; +import com.yelp.nrtsearch.server.handler.AddDocumentHandler; +import com.yelp.nrtsearch.server.index.IndexState; +import com.yelp.nrtsearch.server.index.ShardState; +import com.yelp.nrtsearch.server.state.GlobalState; +import java.io.IOException; +import java.util.List; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Base class for ingestion plugins that handles state management and indexing operations. Plugin + * implementations: 1. Must implement startIngestion/stopIngestion to handle source-specific logic + * 2. 
Should use addDocuments/commit methods to index data during ingestion + */ +public abstract class AbstractIngestionPlugin extends Plugin implements IngestionPlugin { + private static final Logger logger = LoggerFactory.getLogger(AbstractIngestionPlugin.class); + + protected final NrtsearchConfig config; + private GlobalState globalState; + + protected AbstractIngestionPlugin(NrtsearchConfig config) { + this.config = config; + } + + /** + * Initialize plugin state with index access. Called by NrtSearchServer when index is ready. + * + * @param globalState The global server state + */ + public final void initializeState(GlobalState globalState) throws IOException { + if (this.globalState != null) { + throw new IllegalStateException("Plugin already initialized"); + } + this.globalState = globalState; + } + + /** + * Add documents. Available for plugin implementations to call during ingestion. + * + * @param addDocRequests List of document requests to add + * @return The sequence number of the indexing operation + * @throws IOException if there are indexing errors + */ + protected final long addDocuments(List addDocRequests, String indexName) + throws Exception { + return new AddDocumentHandler.DocumentIndexer(globalState, addDocRequests, indexName) + .runIndexingJob(); + } + + /** + * Commit ingested documents. Available for plugin implementations to call during ingestion. 
+ * + * @throws IOException if there are commit errors + */ + protected final void commit(String indexName) throws IOException { + verifyInitialized(indexName); + IndexState indexState = globalState.getIndexOrThrow(indexName); + ShardState shard = indexState.getShard(0); + if (shard == null) { + throw new IllegalStateException("No shard found for index"); + } + shard.commit(); + } + + private void verifyInitialized(String indexName) throws IOException { + IndexState indexState = globalState.getIndexOrThrow(indexName); + if (indexState == null) { + throw new IllegalStateException("Plugin not initialized"); + } + } + + @Override + public void close() throws IOException { + stopIngestion(); + } +} diff --git a/src/main/java/com/yelp/nrtsearch/server/plugins/IngestionPlugin.java b/src/main/java/com/yelp/nrtsearch/server/plugins/IngestionPlugin.java new file mode 100644 index 000000000..52baf535a --- /dev/null +++ b/src/main/java/com/yelp/nrtsearch/server/plugins/IngestionPlugin.java @@ -0,0 +1,38 @@ +/* + * Copyright 2024 Yelp Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.yelp.nrtsearch.server.plugins; + +import java.io.IOException; + +/** Interface for ingestion plugins. Defines lifecycle methods that plugins must implement. */ +public interface IngestionPlugin { + + /** + * Start ingesting documents from the source. Plugin implementations can start source connections + * and begin processing. 
+ * + * @throws IOException if there are startup errors + */ + void startIngestion() throws IOException; + + /** + * Stop ingesting documents from the source. Plugin implementations should cleanup source + * connections and stop processing. + * + * @throws IOException if there are shutdown errors + */ + void stopIngestion() throws IOException; +} diff --git a/src/test/java/com/yelp/nrtsearch/server/ServerTestCase.java b/src/test/java/com/yelp/nrtsearch/server/ServerTestCase.java index 249fc39e9..0cbb57986 100644 --- a/src/test/java/com/yelp/nrtsearch/server/ServerTestCase.java +++ b/src/test/java/com/yelp/nrtsearch/server/ServerTestCase.java @@ -33,6 +33,7 @@ import com.yelp.nrtsearch.server.grpc.RefreshRequest; import com.yelp.nrtsearch.server.grpc.SettingsRequest; import com.yelp.nrtsearch.server.grpc.StartIndexRequest; +import com.yelp.nrtsearch.server.plugins.AbstractIngestionPlugin; import com.yelp.nrtsearch.server.plugins.Plugin; import com.yelp.nrtsearch.server.state.GlobalState; import com.yelp.nrtsearch.server.utils.NrtsearchTestConfigurationFactory; @@ -235,6 +236,11 @@ private GrpcServer setUpGrpcServer(PrometheusRegistry prometheusRegistry) throws null, getPlugins(configuration)); globalState = server.getGlobalState(); + for (Plugin plugin : getPlugins(configuration)) { + if (plugin instanceof AbstractIngestionPlugin abstractIngestionPlugin) { + abstractIngestionPlugin.initializeState(globalState); + } + } return server; } From d91bf796466f7bf643168cd77a51c0be18ea67c7 Mon Sep 17 00:00:00 2001 From: umesh Date: Wed, 30 Apr 2025 10:51:34 -0700 Subject: [PATCH 2/6] remove redundant code --- .../nrtsearch/plugins/example/ExamplePlugin.java | 16 +--------------- .../plugins/example/ExamplePluginTest.java | 1 - 2 files changed, 1 insertion(+), 16 deletions(-) diff --git a/example-plugin/src/main/java/com/yelp/nrtsearch/plugins/example/ExamplePlugin.java b/example-plugin/src/main/java/com/yelp/nrtsearch/plugins/example/ExamplePlugin.java index 
2284587ee..65e0a8266 100644 --- a/example-plugin/src/main/java/com/yelp/nrtsearch/plugins/example/ExamplePlugin.java +++ b/example-plugin/src/main/java/com/yelp/nrtsearch/plugins/example/ExamplePlugin.java @@ -41,7 +41,7 @@ public class ExamplePlugin extends AbstractIngestionPlugin private static final Logger logger = LoggerFactory.getLogger(ExamplePlugin.class); public static final String INGESTION_TEST_INDEX = "ingestion_test_index"; private final String availableAnalyzers = String.join(",", getAnalyzers().keySet()); - private ExecutorService executorService; // No longer final so we can recreate + private ExecutorService executorService; private final AtomicBoolean running = new AtomicBoolean(false); private final List testDocuments = new ArrayList<>(); @@ -65,15 +65,6 @@ public ExamplePlugin(NrtsearchConfig configuration) { .build()); } - private synchronized ExecutorService getOrCreateExecutorService() { - if (executorService == null || executorService.isShutdown()) { - executorService = - Executors.newSingleThreadExecutor( - new ThreadFactoryBuilder().setNameFormat("example-ingestion-%d").build()); - } - return executorService; - } - @Override public String id() { return "custom_analyzers"; @@ -149,9 +140,4 @@ public void stopIngestion() throws IOException { } } } - - /** Get the test documents that will be ingested. Exposed for test validation. 
*/ - List getTestDocuments() { - return testDocuments; - } } diff --git a/example-plugin/src/test/java/com/yelp/nrtsearch/plugins/example/ExamplePluginTest.java b/example-plugin/src/test/java/com/yelp/nrtsearch/plugins/example/ExamplePluginTest.java index 27356bbfa..2510af7a0 100644 --- a/example-plugin/src/test/java/com/yelp/nrtsearch/plugins/example/ExamplePluginTest.java +++ b/example-plugin/src/test/java/com/yelp/nrtsearch/plugins/example/ExamplePluginTest.java @@ -50,7 +50,6 @@ public class ExamplePluginTest extends ServerTestCase { private static final String INGESTION_TEST_INDEX = ExamplePlugin.INGESTION_TEST_INDEX; private ExamplePlugin examplePlugin; - private static final String FIELD_1 = "field1"; @Override protected List getIndices() { From 1daeaf23dc50c7578364c44c72ac5a38d071e5c1 Mon Sep 17 00:00:00 2001 From: umesh Date: Fri, 2 May 2025 15:33:13 -0700 Subject: [PATCH 3/6] Refactor ingestion plugin lifecycle to support async execution and plugin-managed threading - Updated AbstractIngestionPlugin to launch startIngestion asynchronously using plugin-provided ExecutorService - Added getIngestionExecutor() abstract method for plugin-controlled threading - Removed IngestionPlugin interface (now redundant) - Updated ExamplePlugin to implement new lifecycle (NOTE: getIngestionExecutor must return a valid executor) - Updated ExamplePluginTest to use initializeAndStartIngestion and Awaitility for async ingestion - Updated NrtsearchServer to initialize ingestion plugins after startup - Added getIngestionPluginConfigs() to NrtsearchConfig for plugin-specific config support - Added unit test for ingestion plugin config parsing --- example-plugin/build.gradle | 2 +- .../plugins/example/ExamplePlugin.java | 5 ++ .../plugins/example/ExamplePluginTest.java | 2 +- gradle/libs.versions.toml | 2 + .../server/config/NrtsearchConfig.java | 21 +++++++ .../server/grpc/NrtsearchServer.java | 6 +- .../plugins/AbstractIngestionPlugin.java | 55 ++++++++++++++++--- 
.../server/plugins/IngestionPlugin.java | 38 ------------- .../yelp/nrtsearch/server/ServerTestCase.java | 6 -- .../server/config/NrtsearchConfigTest.java | 25 +++++++++ 10 files changed, 104 insertions(+), 58 deletions(-) delete mode 100644 src/main/java/com/yelp/nrtsearch/server/plugins/IngestionPlugin.java diff --git a/example-plugin/build.gradle b/example-plugin/build.gradle index cb4fe7092..425b5d1be 100644 --- a/example-plugin/build.gradle +++ b/example-plugin/build.gradle @@ -34,7 +34,7 @@ dependencies { testImplementation libs.grpc.testing testImplementation libs.junit testImplementation libs.protobuf.java - testImplementation("org.awaitility:awaitility:4.3.0") + testImplementation libs.awaitility } distributions { diff --git a/example-plugin/src/main/java/com/yelp/nrtsearch/plugins/example/ExamplePlugin.java b/example-plugin/src/main/java/com/yelp/nrtsearch/plugins/example/ExamplePlugin.java index 65e0a8266..de9bbd67f 100644 --- a/example-plugin/src/main/java/com/yelp/nrtsearch/plugins/example/ExamplePlugin.java +++ b/example-plugin/src/main/java/com/yelp/nrtsearch/plugins/example/ExamplePlugin.java @@ -65,6 +65,11 @@ public ExamplePlugin(NrtsearchConfig configuration) { .build()); } + @Override + protected ExecutorService getIngestionExecutor() { + return null; + } + @Override public String id() { return "custom_analyzers"; diff --git a/example-plugin/src/test/java/com/yelp/nrtsearch/plugins/example/ExamplePluginTest.java b/example-plugin/src/test/java/com/yelp/nrtsearch/plugins/example/ExamplePluginTest.java index 2510af7a0..0e6122de2 100644 --- a/example-plugin/src/test/java/com/yelp/nrtsearch/plugins/example/ExamplePluginTest.java +++ b/example-plugin/src/test/java/com/yelp/nrtsearch/plugins/example/ExamplePluginTest.java @@ -165,7 +165,7 @@ public void testAnalysisForSearch() { @Test public void testPluginIngestion() throws Exception { - examplePlugin.startIngestion(); + examplePlugin.initializeAndStartIngestion(getGlobalState()); await() .atMost(5, 
SECONDS) diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml index f270e4565..c5ba36858 100644 --- a/gradle/libs.versions.toml +++ b/gradle/libs.versions.toml @@ -6,6 +6,7 @@ log4j = "2.23.1" lucene = "10.1.0" prometheus = "1.3.1" protobuf = "3.25.3" +awaitility = "4.3.0" [libraries] aws-java-sdk-core = { module = "com.amazonaws:aws-java-sdk-core", version.ref = "aws" } @@ -62,6 +63,7 @@ lucene-test-framework = { module = "org.apache.lucene:lucene-test-framework", ve mockito-core = { module = "org.mockito:mockito-core", version = "5.14.2" } s3mock = { module = "io.findify:s3mock_2.13", version = "0.2.6" } spatial4j = { module = "org.locationtech.spatial4j:spatial4j", version = "0.8" } +awaitility = {module = "org.awaitility:awaitility", version.ref = "awaitility"} [plugins] protobuf = { id = "com.google.protobuf", version = "0.9.4" } diff --git a/src/main/java/com/yelp/nrtsearch/server/config/NrtsearchConfig.java b/src/main/java/com/yelp/nrtsearch/server/config/NrtsearchConfig.java index bd188942b..b7f4352e0 100644 --- a/src/main/java/com/yelp/nrtsearch/server/config/NrtsearchConfig.java +++ b/src/main/java/com/yelp/nrtsearch/server/config/NrtsearchConfig.java @@ -446,4 +446,25 @@ private static List getPluginSearchPath(Object o) { } return paths; } + + @SuppressWarnings("unchecked") + public Map> getIngestionPluginConfigs() { + try { + Object raw = configReader.get("pluginConfigs.ingestion", obj -> obj); + if (raw instanceof Map outerMap) { + Map> result = new HashMap<>(); + for (Map.Entry entry : outerMap.entrySet()) { + if (entry.getKey() instanceof String pluginName + && entry.getValue() instanceof Map pluginConfig) { + result.put(pluginName, (Map) pluginConfig); + } + } + return result; + } else { + throw new IllegalStateException("'pluginConfigs.ingestion' must be a map"); + } + } catch (ConfigKeyNotFoundException e) { + return Collections.emptyMap(); + } + } } diff --git a/src/main/java/com/yelp/nrtsearch/server/grpc/NrtsearchServer.java 
b/src/main/java/com/yelp/nrtsearch/server/grpc/NrtsearchServer.java index 7b5a566b3..ac0dbdde5 100644 --- a/src/main/java/com/yelp/nrtsearch/server/grpc/NrtsearchServer.java +++ b/src/main/java/com/yelp/nrtsearch/server/grpc/NrtsearchServer.java @@ -159,7 +159,7 @@ public void start() throws IOException { GlobalState globalState = serverImpl.getGlobalState(); registerMetrics(globalState); - initializeIngestionPluginState(globalState, plugins); + initIngestionPlugin(globalState, plugins); if (configuration.getMaxConcurrentCallsPerConnectionForReplication() != -1) { replicationServer = @@ -227,11 +227,11 @@ serverImpl, new NrtsearchHeaderInterceptor(), monitoringInterceptor)) BootstrapMetrics.nrtsearchBootstrapTimer.set((System.nanoTime() - startNs) / 1_000_000_000.0); } - private void initializeIngestionPluginState(GlobalState globalState, List plugins) + private void initIngestionPlugin(GlobalState globalState, List plugins) throws IOException { for (Plugin plugin : plugins) { if (plugin instanceof AbstractIngestionPlugin abstractIngestionPlugin) { - abstractIngestionPlugin.initializeState(globalState); + abstractIngestionPlugin.initializeAndStartIngestion(globalState); } } } diff --git a/src/main/java/com/yelp/nrtsearch/server/plugins/AbstractIngestionPlugin.java b/src/main/java/com/yelp/nrtsearch/server/plugins/AbstractIngestionPlugin.java index 80e536fce..8983ed4c4 100644 --- a/src/main/java/com/yelp/nrtsearch/server/plugins/AbstractIngestionPlugin.java +++ b/src/main/java/com/yelp/nrtsearch/server/plugins/AbstractIngestionPlugin.java @@ -19,10 +19,10 @@ import com.yelp.nrtsearch.server.grpc.AddDocumentRequest; import com.yelp.nrtsearch.server.handler.AddDocumentHandler; import com.yelp.nrtsearch.server.index.IndexState; -import com.yelp.nrtsearch.server.index.ShardState; import com.yelp.nrtsearch.server.state.GlobalState; import java.io.IOException; import java.util.List; +import java.util.concurrent.ExecutorService; import org.slf4j.Logger; import 
org.slf4j.LoggerFactory; @@ -31,7 +31,7 @@ * implementations: 1. Must implement startIngestion/stopIngestion to handle source-specific logic * 2. Should use addDocuments/commit methods to index data during ingestion */ -public abstract class AbstractIngestionPlugin extends Plugin implements IngestionPlugin { +public abstract class AbstractIngestionPlugin extends Plugin { private static final Logger logger = LoggerFactory.getLogger(AbstractIngestionPlugin.class); protected final NrtsearchConfig config; @@ -42,15 +42,40 @@ protected AbstractIngestionPlugin(NrtsearchConfig config) { } /** - * Initialize plugin state with index access. Called by NrtSearchServer when index is ready. + * Initialize plugin state with index access and start Ingestion. Called by NrtSearchServer when + * index is ready. * * @param globalState The global server state */ - public final void initializeState(GlobalState globalState) throws IOException { + public final void initializeAndStartIngestion(GlobalState globalState) throws IOException { if (this.globalState != null) { throw new IllegalStateException("Plugin already initialized"); } this.globalState = globalState; + // Run startIngestion asynchronously + getIngestionExecutor() + .submit( + () -> { + try { + startIngestion(); + } catch (IOException e) { + logger.error("Error during startIngestion", e); + onIngestionStartFailure(e); + } + }); + } + + /** + * Provide an executor service for running ingestion tasks. Plugin implementations must manage the + * lifecycle of this executor. 
+ * + * @return ExecutorService to run ingestion + */ + protected abstract ExecutorService getIngestionExecutor(); + + /** Plugin implementations can handle errors or lifecycle events by extending this method */ + protected void onIngestionStartFailure(Exception e) { + logger.error("Ingestion failed to start", e); } /** @@ -74,11 +99,7 @@ protected final long addDocuments(List addDocRequests, Strin protected final void commit(String indexName) throws IOException { verifyInitialized(indexName); IndexState indexState = globalState.getIndexOrThrow(indexName); - ShardState shard = indexState.getShard(0); - if (shard == null) { - throw new IllegalStateException("No shard found for index"); - } - shard.commit(); + indexState.commit(); } private void verifyInitialized(String indexName) throws IOException { @@ -92,4 +113,20 @@ private void verifyInitialized(String indexName) throws IOException { public void close() throws IOException { stopIngestion(); } + + /** + * Start ingesting documents from the source. Plugin implementations can start source connections + * and begin processing. Use the ExecutorService returned by getIngestionExecutor to achieve parallelism + * + * @throws IOException if there are startup errors + */ + protected abstract void startIngestion() throws IOException; + + /** + * Stop ingesting documents from the source. Plugin implementations should cleanup source + * connections and stop processing. + * + * @throws IOException if there are shutdown errors + */ + protected abstract void stopIngestion() throws IOException; } diff --git a/src/main/java/com/yelp/nrtsearch/server/plugins/IngestionPlugin.java b/src/main/java/com/yelp/nrtsearch/server/plugins/IngestionPlugin.java deleted file mode 100644 index 52baf535a..000000000 --- a/src/main/java/com/yelp/nrtsearch/server/plugins/IngestionPlugin.java +++ /dev/null @@ -1,38 +0,0 @@ -/* - * Copyright 2024 Yelp Inc. 
- * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.yelp.nrtsearch.server.plugins; - -import java.io.IOException; - -/** Interface for ingestion plugins. Defines lifecycle methods that plugins must implement. */ -public interface IngestionPlugin { - - /** - * Start ingesting documents from the source. Plugin implementations can start source connections - * and begin processing. - * - * @throws IOException if there are startup errors - */ - void startIngestion() throws IOException; - - /** - * Stop ingesting documents from the source. Plugin implementations should cleanup source - * connections and stop processing. 
- * - * @throws IOException if there are shutdown errors - */ - void stopIngestion() throws IOException; -} diff --git a/src/test/java/com/yelp/nrtsearch/server/ServerTestCase.java b/src/test/java/com/yelp/nrtsearch/server/ServerTestCase.java index 0cbb57986..249fc39e9 100644 --- a/src/test/java/com/yelp/nrtsearch/server/ServerTestCase.java +++ b/src/test/java/com/yelp/nrtsearch/server/ServerTestCase.java @@ -33,7 +33,6 @@ import com.yelp.nrtsearch.server.grpc.RefreshRequest; import com.yelp.nrtsearch.server.grpc.SettingsRequest; import com.yelp.nrtsearch.server.grpc.StartIndexRequest; -import com.yelp.nrtsearch.server.plugins.AbstractIngestionPlugin; import com.yelp.nrtsearch.server.plugins.Plugin; import com.yelp.nrtsearch.server.state.GlobalState; import com.yelp.nrtsearch.server.utils.NrtsearchTestConfigurationFactory; @@ -236,11 +235,6 @@ private GrpcServer setUpGrpcServer(PrometheusRegistry prometheusRegistry) throws null, getPlugins(configuration)); globalState = server.getGlobalState(); - for (Plugin plugin : getPlugins(configuration)) { - if (plugin instanceof AbstractIngestionPlugin abstractIngestionPlugin) { - abstractIngestionPlugin.initializeState(globalState); - } - } return server; } diff --git a/src/test/java/com/yelp/nrtsearch/server/config/NrtsearchConfigTest.java b/src/test/java/com/yelp/nrtsearch/server/config/NrtsearchConfigTest.java index b89333fbe..e2cc7e856 100644 --- a/src/test/java/com/yelp/nrtsearch/server/config/NrtsearchConfigTest.java +++ b/src/test/java/com/yelp/nrtsearch/server/config/NrtsearchConfigTest.java @@ -23,6 +23,7 @@ import com.yelp.nrtsearch.server.grpc.ReplicationServerClient; import com.yelp.nrtsearch.server.index.DirectoryFactory; import java.io.ByteArrayInputStream; +import java.util.Map; import org.apache.lucene.search.suggest.document.CompletionPostingsFormat.FSTLoadMode; import org.junit.Test; @@ -246,4 +247,28 @@ public void testRequireIdField_set() { NrtsearchConfig luceneConfig = getForConfig(config); 
assertTrue(luceneConfig.getRequireIdField()); } + + @Test + public void testGetIngestionPluginConfigs() { + String config = + String.join( + "\n", + "nodeName: \"server_foo\"", + "plugins:", + " - kafka-plugin", + " - s3-plugin", + "pluginConfigs:", + " ingestion:", + " kafka:", + " topic: \"my-topic\"", + " autoCommitEnabled: false", + " s3:", + " bucket: \"my-bucket\""); + + NrtsearchConfig luceneConfig = getForConfig(config); + Map> ingestionConfigs = luceneConfig.getIngestionPluginConfigs(); + + assertEquals("my-topic", ingestionConfigs.get("kafka").get("topic")); + assertEquals("my-bucket", ingestionConfigs.get("s3").get("bucket")); + } } From dd4b9bf8826e90df1d806745d5b48fd05c3fbd72 Mon Sep 17 00:00:00 2001 From: umesh Date: Fri, 2 May 2025 15:38:30 -0700 Subject: [PATCH 4/6] fix ExamplePlugin to use executorService --- .../plugins/example/ExamplePlugin.java | 66 +++++++++---------- .../plugins/AbstractIngestionPlugin.java | 3 +- 2 files changed, 34 insertions(+), 35 deletions(-) diff --git a/example-plugin/src/main/java/com/yelp/nrtsearch/plugins/example/ExamplePlugin.java b/example-plugin/src/main/java/com/yelp/nrtsearch/plugins/example/ExamplePlugin.java index de9bbd67f..a94f94073 100644 --- a/example-plugin/src/main/java/com/yelp/nrtsearch/plugins/example/ExamplePlugin.java +++ b/example-plugin/src/main/java/com/yelp/nrtsearch/plugins/example/ExamplePlugin.java @@ -1,5 +1,5 @@ /* - * Copyright 2023 Yelp Inc. + * Copyright 2025 Yelp Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. 
@@ -38,16 +38,23 @@ public class ExamplePlugin extends AbstractIngestionPlugin implements AnalysisPlugin, CustomRequestPlugin { + private static final Logger logger = LoggerFactory.getLogger(ExamplePlugin.class); public static final String INGESTION_TEST_INDEX = "ingestion_test_index"; + private final String availableAnalyzers = String.join(",", getAnalyzers().keySet()); - private ExecutorService executorService; + private final ExecutorService executorService; private final AtomicBoolean running = new AtomicBoolean(false); private final List testDocuments = new ArrayList<>(); public ExamplePlugin(NrtsearchConfig configuration) { super(configuration); + // Create executor service (single-threaded for this example) + this.executorService = + Executors.newSingleThreadExecutor( + new ThreadFactoryBuilder().setNameFormat("example-ingestion-%d").build()); + // Create test documents testDocuments.add( AddDocumentRequest.newBuilder() @@ -65,11 +72,6 @@ public ExamplePlugin(NrtsearchConfig configuration) { .build()); } - @Override - protected ExecutorService getIngestionExecutor() { - return null; - } - @Override public String id() { return "custom_analyzers"; @@ -95,34 +97,34 @@ public Map> getAnalyzers() { .addTokenFilter("lowercase") .build(); } catch (Exception e) { + logger.error("Failed to create analyzer", e); return null; } }); } + @Override + protected ExecutorService getIngestionExecutor() { + return executorService; + } + @Override public void startIngestion() throws IOException { logger.info("Starting example ingestion"); + if (!running.compareAndSet(false, true)) { logger.warn("Ingestion already running"); return; } - executorService = - Executors.newSingleThreadExecutor( - new ThreadFactoryBuilder().setNameFormat("example-ingestion-%d").build()); - - executorService.submit( - () -> { - try { - addDocuments(testDocuments, INGESTION_TEST_INDEX); - commit(INGESTION_TEST_INDEX); - } catch (Exception e) { - logger.error("Error during ingestion", e); - } finally { 
- running.set(false); // Reset running flag when done - } - }); + try { + addDocuments(testDocuments, INGESTION_TEST_INDEX); + commit(INGESTION_TEST_INDEX); + } catch (Exception e) { + logger.error("Error during ingestion", e); + } finally { + running.set(false); + } } @Override @@ -130,19 +132,15 @@ public void stopIngestion() throws IOException { logger.info("Stopping example ingestion"); running.set(false); // Signal stop (not strictly needed for one-shot task) - if (executorService != null) { - executorService.shutdown(); - try { - if (!executorService.awaitTermination(5, TimeUnit.SECONDS)) { - logger.warn("Ingestion thread did not complete within timeout"); - executorService.shutdownNow(); - } - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new IOException("Interrupted while stopping ingestion", e); - } finally { - executorService = null; // Allow recreation + executorService.shutdown(); + try { + if (!executorService.awaitTermination(5, TimeUnit.SECONDS)) { + logger.warn("Ingestion thread did not complete within timeout"); + executorService.shutdownNow(); } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new IOException("Interrupted while stopping ingestion", e); } } } diff --git a/src/main/java/com/yelp/nrtsearch/server/plugins/AbstractIngestionPlugin.java b/src/main/java/com/yelp/nrtsearch/server/plugins/AbstractIngestionPlugin.java index 8983ed4c4..0b5727ab4 100644 --- a/src/main/java/com/yelp/nrtsearch/server/plugins/AbstractIngestionPlugin.java +++ b/src/main/java/com/yelp/nrtsearch/server/plugins/AbstractIngestionPlugin.java @@ -116,7 +116,8 @@ public void close() throws IOException { /** * Start ingesting documents from the source. Plugin implementations can start source connections - * and begin processing. Use the ExecutorService returned by getIngestionExecutor to achieve parallelism + * and begin processing. 
Use the ExecutorService returned by getIngestionExecutor to achieve + * parallelism * * @throws IOException if there are startup errors */ From 24ebf7d68af7ab81987046d8bd4823e4a066a354 Mon Sep 17 00:00:00 2001 From: umesh Date: Mon, 5 May 2025 18:56:49 -0700 Subject: [PATCH 5/6] feat(ingestion): add ingestion plugin framework and revert example plugin changes - Introduced ingestion plugin framework: - Added AbstractIngestor, Ingestor interface, and IngestionPluginUtils - Updated NrtsearchServer to initialize ingestion plugins - Added unit tests for ingestion plugin utilities - Reverted changes to example-plugin: - Restored ExamplePlugin.java and ExamplePluginTest.java to original state from v1.0.0-beta.12 - Removed async ingestion logic and executor service usage - Prevents dependency issues in Jenkins due to plugin build order - Updated build.gradle and settings.gradle to support ingestion plugin framework --- example-plugin/build.gradle | 1 - .../plugins/example/ExamplePlugin.java | 92 +----------- .../plugins/example/ExamplePluginTest.java | 110 +++------------ .../resources/register_fields_ingestion.json | 15 -- gradle/libs.versions.toml | 2 - settings.gradle | 1 - .../server/grpc/NrtsearchServer.java | 7 +- .../server/ingestion/AbstractIngestor.java | 102 ++++++++++++++ .../ingestion/IngestionPluginUtils.java | 41 ++++++ .../nrtsearch/server/ingestion/Ingestor.java | 95 +++++++++++++ .../plugins/AbstractIngestionPlugin.java | 133 ------------------ .../server/plugins/IngestionPlugin.java | 30 ++++ .../ingestion/AbstractIngestorTest.java | 116 +++++++++++++++ .../ingestion/IngestionPluginUtilsTest.java | 84 +++++++++++ 14 files changed, 494 insertions(+), 335 deletions(-) delete mode 100644 example-plugin/src/test/resources/register_fields_ingestion.json create mode 100644 src/main/java/com/yelp/nrtsearch/server/ingestion/AbstractIngestor.java create mode 100644 src/main/java/com/yelp/nrtsearch/server/ingestion/IngestionPluginUtils.java create mode 100644 
src/main/java/com/yelp/nrtsearch/server/ingestion/Ingestor.java delete mode 100644 src/main/java/com/yelp/nrtsearch/server/plugins/AbstractIngestionPlugin.java create mode 100644 src/main/java/com/yelp/nrtsearch/server/plugins/IngestionPlugin.java create mode 100644 src/test/java/com/yelp/nrtsearch/server/ingestion/AbstractIngestorTest.java create mode 100644 src/test/java/com/yelp/nrtsearch/server/ingestion/IngestionPluginUtilsTest.java diff --git a/example-plugin/build.gradle b/example-plugin/build.gradle index 425b5d1be..0062bbe6e 100644 --- a/example-plugin/build.gradle +++ b/example-plugin/build.gradle @@ -34,7 +34,6 @@ dependencies { testImplementation libs.grpc.testing testImplementation libs.junit testImplementation libs.protobuf.java - testImplementation libs.awaitility } distributions { diff --git a/example-plugin/src/main/java/com/yelp/nrtsearch/plugins/example/ExamplePlugin.java b/example-plugin/src/main/java/com/yelp/nrtsearch/plugins/example/ExamplePlugin.java index a94f94073..57d29ac65 100644 --- a/example-plugin/src/main/java/com/yelp/nrtsearch/plugins/example/ExamplePlugin.java +++ b/example-plugin/src/main/java/com/yelp/nrtsearch/plugins/example/ExamplePlugin.java @@ -1,5 +1,5 @@ /* - * Copyright 2025 Yelp Inc. + * Copyright 2023 Yelp Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. 
@@ -15,62 +15,22 @@ */ package com.yelp.nrtsearch.plugins.example; -import com.google.common.util.concurrent.ThreadFactoryBuilder; import com.yelp.nrtsearch.server.analysis.AnalysisProvider; import com.yelp.nrtsearch.server.config.NrtsearchConfig; -import com.yelp.nrtsearch.server.grpc.AddDocumentRequest; -import com.yelp.nrtsearch.server.plugins.AbstractIngestionPlugin; import com.yelp.nrtsearch.server.plugins.AnalysisPlugin; import com.yelp.nrtsearch.server.plugins.CustomRequestPlugin; -import java.io.IOException; -import java.util.ArrayList; -import java.util.List; +import com.yelp.nrtsearch.server.plugins.Plugin; import java.util.Map; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; import org.apache.lucene.analysis.Analyzer; import org.apache.lucene.analysis.custom.CustomAnalyzer; import org.apache.lucene.util.Version; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -public class ExamplePlugin extends AbstractIngestionPlugin - implements AnalysisPlugin, CustomRequestPlugin { - - private static final Logger logger = LoggerFactory.getLogger(ExamplePlugin.class); - public static final String INGESTION_TEST_INDEX = "ingestion_test_index"; +public class ExamplePlugin extends Plugin implements AnalysisPlugin, CustomRequestPlugin { private final String availableAnalyzers = String.join(",", getAnalyzers().keySet()); - private final ExecutorService executorService; - private final AtomicBoolean running = new AtomicBoolean(false); - private final List testDocuments = new ArrayList<>(); - - public ExamplePlugin(NrtsearchConfig configuration) { - super(configuration); - // Create executor service (single-threaded for this example) - this.executorService = - Executors.newSingleThreadExecutor( - new ThreadFactoryBuilder().setNameFormat("example-ingestion-%d").build()); - - // Create test documents - testDocuments.add( - 
AddDocumentRequest.newBuilder() - .setIndexName(INGESTION_TEST_INDEX) - .putFields( - "field1", - AddDocumentRequest.MultiValuedField.newBuilder().addValue("test doc 1").build()) - .build()); - testDocuments.add( - AddDocumentRequest.newBuilder() - .setIndexName(INGESTION_TEST_INDEX) - .putFields( - "field1", - AddDocumentRequest.MultiValuedField.newBuilder().addValue("test doc 2").build()) - .build()); - } + // Constructor that accepts LuceneServerConfiguration object is required + public ExamplePlugin(NrtsearchConfig configuration) {} @Override public String id() { @@ -97,50 +57,8 @@ public Map> getAnalyzers() { .addTokenFilter("lowercase") .build(); } catch (Exception e) { - logger.error("Failed to create analyzer", e); return null; } }); } - - @Override - protected ExecutorService getIngestionExecutor() { - return executorService; - } - - @Override - public void startIngestion() throws IOException { - logger.info("Starting example ingestion"); - - if (!running.compareAndSet(false, true)) { - logger.warn("Ingestion already running"); - return; - } - - try { - addDocuments(testDocuments, INGESTION_TEST_INDEX); - commit(INGESTION_TEST_INDEX); - } catch (Exception e) { - logger.error("Error during ingestion", e); - } finally { - running.set(false); - } - } - - @Override - public void stopIngestion() throws IOException { - logger.info("Stopping example ingestion"); - running.set(false); // Signal stop (not strictly needed for one-shot task) - - executorService.shutdown(); - try { - if (!executorService.awaitTermination(5, TimeUnit.SECONDS)) { - logger.warn("Ingestion thread did not complete within timeout"); - executorService.shutdownNow(); - } - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new IOException("Interrupted while stopping ingestion", e); - } - } } diff --git a/example-plugin/src/test/java/com/yelp/nrtsearch/plugins/example/ExamplePluginTest.java 
b/example-plugin/src/test/java/com/yelp/nrtsearch/plugins/example/ExamplePluginTest.java index 0e6122de2..991dd85ff 100644 --- a/example-plugin/src/test/java/com/yelp/nrtsearch/plugins/example/ExamplePluginTest.java +++ b/example-plugin/src/test/java/com/yelp/nrtsearch/plugins/example/ExamplePluginTest.java @@ -15,9 +15,7 @@ */ package com.yelp.nrtsearch.plugins.example; -import static java.util.concurrent.TimeUnit.SECONDS; import static org.assertj.core.api.Assertions.assertThat; -import static org.awaitility.Awaitility.await; import com.yelp.nrtsearch.server.ServerTestCase; import com.yelp.nrtsearch.server.config.NrtsearchConfig; @@ -28,7 +26,6 @@ import com.yelp.nrtsearch.server.grpc.FieldDefRequest; import com.yelp.nrtsearch.server.grpc.MatchQuery; import com.yelp.nrtsearch.server.grpc.Query; -import com.yelp.nrtsearch.server.grpc.RefreshRequest; import com.yelp.nrtsearch.server.grpc.SearchRequest; import com.yelp.nrtsearch.server.grpc.SearchResponse; import com.yelp.nrtsearch.server.plugins.Plugin; @@ -40,80 +37,41 @@ import org.apache.lucene.analysis.Analyzer; import org.apache.lucene.analysis.TokenStream; import org.apache.lucene.analysis.tokenattributes.CharTermAttribute; -import org.junit.After; -import org.junit.Before; import org.junit.ClassRule; import org.junit.Test; public class ExamplePluginTest extends ServerTestCase { @ClassRule public static final GrpcCleanupRule grpcCleanup = new GrpcCleanupRule(); - private static final String INGESTION_TEST_INDEX = ExamplePlugin.INGESTION_TEST_INDEX; - private ExamplePlugin examplePlugin; - - @Override - protected List getIndices() { - return List.of(DEFAULT_TEST_INDEX, INGESTION_TEST_INDEX); - } + private static final ExamplePlugin examplePlugin = new ExamplePlugin(getConfig()); @Override protected List getPlugins(NrtsearchConfig configuration) { - examplePlugin = new ExamplePlugin(configuration); return List.of(examplePlugin); } @Override protected FieldDefRequest getIndexDef(String name) throws IOException { 
- if (DEFAULT_TEST_INDEX.equals(name)) { - return getFieldsFromResourceFile("/register_fields.json"); - } else if (INGESTION_TEST_INDEX.equals(name)) { - return getFieldsFromResourceFile("/register_fields_ingestion.json"); - } else { - throw new IllegalArgumentException("Unknown index: " + name); - } + return getFieldsFromResourceFile("/register_fields.json"); } @Override protected void initIndex(String name) throws Exception { - // Only initialize test docs for analysis test index - if (DEFAULT_TEST_INDEX.equals(name)) { - AddDocumentRequest addDocumentRequest = - AddDocumentRequest.newBuilder() - .setIndexName(name) - .putFields( - "field1", - MultiValuedField.newBuilder().addValue("How to use Nrtsearch
").build()) - .build(); - AddDocumentRequest addDocumentRequest2 = - AddDocumentRequest.newBuilder() - .setIndexName(name) - .putFields( - "field1", - MultiValuedField.newBuilder() - .addValue("How to create plugin") - .build()) - .build(); - addDocuments(Stream.of(addDocumentRequest, addDocumentRequest2)); - } - } - - @Before - public void ensurePluginInitialized() throws Exception { - if (examplePlugin == null) { - for (Plugin plugin : getPlugins(getConfig())) { - if (plugin instanceof ExamplePlugin) { - examplePlugin = (ExamplePlugin) plugin; - break; - } - } - } - } - - @After - public void cleanupPlugin() throws Exception { - if (examplePlugin != null) { - examplePlugin.stopIngestion(); - } + AddDocumentRequest addDocumentRequest = + AddDocumentRequest.newBuilder() + .setIndexName(name) + .putFields( + "field1", + MultiValuedField.newBuilder().addValue("How to use Nrtsearch
").build()) + .build(); + AddDocumentRequest addDocumentRequest2 = + AddDocumentRequest.newBuilder() + .setIndexName(name) + .putFields( + "field1", + MultiValuedField.newBuilder().addValue("How to create plugin").build()) + .build(); + addDocuments(Stream.of(addDocumentRequest, addDocumentRequest2)); } @Test @@ -163,40 +121,6 @@ public void testAnalysisForSearch() { assertThat(response.getHitsCount()).isEqualTo(1); } - @Test - public void testPluginIngestion() throws Exception { - examplePlugin.initializeAndStartIngestion(getGlobalState()); - - await() - .atMost(5, SECONDS) - .untilAsserted( - () -> { - getGrpcServer() - .getBlockingStub() - .refresh(RefreshRequest.newBuilder().setIndexName(INGESTION_TEST_INDEX).build()); - - SearchResponse response = - getGrpcServer() - .getBlockingStub() - .search( - SearchRequest.newBuilder() - .setIndexName(INGESTION_TEST_INDEX) - .setStartHit(0) - .setTopHits(5) - .setQuery( - Query.newBuilder() - .setMatchQuery( - MatchQuery.newBuilder() - .setField("field1") - .setQuery("test doc") - .build()) - .build()) - .build()); - - assertThat(response.getHitsCount()).isEqualTo(2); - }); - } - private static NrtsearchConfig getConfig() { String config = "nodeName: \"server_foo\""; return new NrtsearchConfig(new ByteArrayInputStream(config.getBytes())); diff --git a/example-plugin/src/test/resources/register_fields_ingestion.json b/example-plugin/src/test/resources/register_fields_ingestion.json deleted file mode 100644 index 729e02775..000000000 --- a/example-plugin/src/test/resources/register_fields_ingestion.json +++ /dev/null @@ -1,15 +0,0 @@ -{ - "indexName": "ingestion_test_index", - "field": [ - { - "name": "field1", - "type": "TEXT", - "search": true - }, - { - "name": "field2", - "type": "TEXT", - "search": true - } - ] -} diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml index c5ba36858..f270e4565 100644 --- a/gradle/libs.versions.toml +++ b/gradle/libs.versions.toml @@ -6,7 +6,6 @@ log4j = "2.23.1" lucene = 
"10.1.0" prometheus = "1.3.1" protobuf = "3.25.3" -awaitility = "4.3.0" [libraries] aws-java-sdk-core = { module = "com.amazonaws:aws-java-sdk-core", version.ref = "aws" } @@ -63,7 +62,6 @@ lucene-test-framework = { module = "org.apache.lucene:lucene-test-framework", ve mockito-core = { module = "org.mockito:mockito-core", version = "5.14.2" } s3mock = { module = "io.findify:s3mock_2.13", version = "0.2.6" } spatial4j = { module = "org.locationtech.spatial4j:spatial4j", version = "0.8" } -awaitility = {module = "org.awaitility:awaitility", version.ref = "awaitility"} [plugins] protobuf = { id = "com.google.protobuf", version = "0.9.4" } diff --git a/settings.gradle b/settings.gradle index 72a9bc687..b4acf42ee 100644 --- a/settings.gradle +++ b/settings.gradle @@ -8,4 +8,3 @@ pluginManagement { } include('clientlib') rootProject.name = 'nrtsearch' -include('example-plugin') diff --git a/src/main/java/com/yelp/nrtsearch/server/grpc/NrtsearchServer.java b/src/main/java/com/yelp/nrtsearch/server/grpc/NrtsearchServer.java index ac0dbdde5..9404a3022 100644 --- a/src/main/java/com/yelp/nrtsearch/server/grpc/NrtsearchServer.java +++ b/src/main/java/com/yelp/nrtsearch/server/grpc/NrtsearchServer.java @@ -79,6 +79,7 @@ import com.yelp.nrtsearch.server.handler.UpdateFieldsHandler; import com.yelp.nrtsearch.server.handler.WriteNRTPointHandler; import com.yelp.nrtsearch.server.highlights.HighlighterService; +import com.yelp.nrtsearch.server.ingestion.IngestionPluginUtils; import com.yelp.nrtsearch.server.logging.HitsLoggerCreator; import com.yelp.nrtsearch.server.modules.NrtsearchModule; import com.yelp.nrtsearch.server.monitoring.BootstrapMetrics; @@ -95,7 +96,7 @@ import com.yelp.nrtsearch.server.monitoring.SearchResponseCollector; import com.yelp.nrtsearch.server.monitoring.ThreadPoolCollector; import com.yelp.nrtsearch.server.monitoring.ThreadPoolCollector.RejectionCounterWrapper; -import com.yelp.nrtsearch.server.plugins.AbstractIngestionPlugin; +import 
com.yelp.nrtsearch.server.plugins.IngestionPlugin; import com.yelp.nrtsearch.server.plugins.Plugin; import com.yelp.nrtsearch.server.plugins.PluginsService; import com.yelp.nrtsearch.server.remote.RemoteBackend; @@ -230,8 +231,8 @@ serverImpl, new NrtsearchHeaderInterceptor(), monitoringInterceptor)) private void initIngestionPlugin(GlobalState globalState, List plugins) throws IOException { for (Plugin plugin : plugins) { - if (plugin instanceof AbstractIngestionPlugin abstractIngestionPlugin) { - abstractIngestionPlugin.initializeAndStartIngestion(globalState); + if (plugin instanceof IngestionPlugin ingestionPlugin) { + IngestionPluginUtils.initializeAndStart(ingestionPlugin, globalState); } } } diff --git a/src/main/java/com/yelp/nrtsearch/server/ingestion/AbstractIngestor.java b/src/main/java/com/yelp/nrtsearch/server/ingestion/AbstractIngestor.java new file mode 100644 index 000000000..069ed4d2b --- /dev/null +++ b/src/main/java/com/yelp/nrtsearch/server/ingestion/AbstractIngestor.java @@ -0,0 +1,102 @@ +/* + * Copyright 2025 Yelp Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package com.yelp.nrtsearch.server.ingestion; + +import com.yelp.nrtsearch.server.config.NrtsearchConfig; +import com.yelp.nrtsearch.server.grpc.AddDocumentRequest; +import com.yelp.nrtsearch.server.handler.AddDocumentHandler; +import com.yelp.nrtsearch.server.index.IndexState; +import com.yelp.nrtsearch.server.state.GlobalState; +import java.io.IOException; +import java.util.List; + +/** + * Abstract base class for ingestion implementations. Provides common ingestion utilities like + * addDocuments and commit. Plugin-specific ingestion logic should extend this class and implement + * start/stop. + */ +public abstract class AbstractIngestor implements Ingestor { + protected final NrtsearchConfig config; + protected GlobalState globalState; + + public AbstractIngestor(NrtsearchConfig config) { + this.config = config; + } + + /** + * Called by the framework to initialize the ingestor with global state. Must be called before + * addDocuments or commit. + */ + @Override + public void initialize(GlobalState globalState) { + this.globalState = globalState; + } + + /** + * Add documents to the index. + * + * @param addDocRequests list of documents to add + * @param indexName target index + * @return sequence number of the indexing operation + * @throws Exception if indexing fails + */ + @Override + public long addDocuments(List addDocRequests, String indexName) + throws Exception { + verifyInitialized(indexName); + return new AddDocumentHandler.DocumentIndexer(globalState, addDocRequests, indexName) + .runIndexingJob(); + } + + /** + * Commit changes to the index. 
+ * + * @param indexName target index + * @throws IOException if commit fails + */ + @Override + public void commit(String indexName) throws IOException { + verifyInitialized(indexName); + IndexState indexState = globalState.getIndexOrThrow(indexName); + indexState.commit(); + } + + private void verifyInitialized(String indexName) throws IOException { + if (globalState == null) { + throw new IllegalStateException("Ingestor not initialized with GlobalState"); + } + IndexState indexState = globalState.getIndexOrThrow(indexName); + if (indexState == null) { + throw new IllegalStateException("Index not found: " + indexName); + } + } + + /** + * Start ingestion logic. Must be implemented by plugin-specific subclass. + * + * @throws IOException if startup fails + */ + @Override + public abstract void start() throws IOException; + + /** + * Stop ingestion logic. Must be implemented by plugin-specific subclass. + * + * @throws IOException if shutdown fails + */ + @Override + public abstract void stop() throws IOException; +} diff --git a/src/main/java/com/yelp/nrtsearch/server/ingestion/IngestionPluginUtils.java b/src/main/java/com/yelp/nrtsearch/server/ingestion/IngestionPluginUtils.java new file mode 100644 index 000000000..94c4539dc --- /dev/null +++ b/src/main/java/com/yelp/nrtsearch/server/ingestion/IngestionPluginUtils.java @@ -0,0 +1,41 @@ +/* + * Copyright 2025 Yelp Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package com.yelp.nrtsearch.server.ingestion; + +import com.yelp.nrtsearch.server.plugins.IngestionPlugin; +import com.yelp.nrtsearch.server.state.GlobalState; +import java.io.IOException; +import java.util.concurrent.ExecutorService; + +public class IngestionPluginUtils { + public static void initializeAndStart(IngestionPlugin plugin, GlobalState globalState) + throws IOException { + Ingestor ingestor = plugin.getIngestor(); + if (ingestor instanceof AbstractIngestor abstractIngestor) { + abstractIngestor.initialize(globalState); + } + + ExecutorService executor = plugin.getIngestionExecutor(); + executor.submit( + () -> { + try { + ingestor.start(); + } catch (IOException e) { + e.printStackTrace(); + } + }); + } +} diff --git a/src/main/java/com/yelp/nrtsearch/server/ingestion/Ingestor.java b/src/main/java/com/yelp/nrtsearch/server/ingestion/Ingestor.java new file mode 100644 index 000000000..1b5f9a786 --- /dev/null +++ b/src/main/java/com/yelp/nrtsearch/server/ingestion/Ingestor.java @@ -0,0 +1,95 @@ +/* + * Copyright 2024 Yelp Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.yelp.nrtsearch.server.ingestion; + +import com.yelp.nrtsearch.server.grpc.AddDocumentRequest; +import com.yelp.nrtsearch.server.state.GlobalState; +import java.io.IOException; +import java.util.List; + +/** + * Interface for ingestion logic used by plugins. + * + *

<p>This interface defines the lifecycle and operations for ingesting documents into an index.
+ * Plugin implementations can use this interface to encapsulate source-specific ingestion logic
+ * (e.g., reading from Kafka, S3, etc.) while leveraging shared indexing utilities.
+ *
+ * <p>Implementations are expected to:
+ *
+ * <ul>
+ *   <li>Initialize with {@link GlobalState} before starting
+ *   <li>Start ingestion in a background thread (if needed)
+ *   <li>Use {@link #addDocuments(List, String)} and {@link #commit(String)} to index data
+ *   <li>Clean up resources in {@link #stop()}
+ * </ul>
+ */
+public interface Ingestor {
+
+  /**
+   * Initialize the ingestor with the global server state.
+   *
+   * <p>This method is called once by the framework before ingestion starts. Implementations should
+   * store the global state for later use (e.g., to access index state).
+   *
+   * @param globalState the global server state
+   */
+  void initialize(GlobalState globalState);
+
+  /**
+   * Start the ingestion process.
+   *
+   * <p>This method should contain the logic to begin reading from the ingestion source (e.g.,
+   * Kafka, file system, etc.). It may block or spawn background threads depending on the
+   * implementation.
+   *
+   * @throws IOException if ingestion startup fails
+   */
+  void start() throws IOException;
+
+  /**
+   * Stop the ingestion process and clean up resources.
+   *
+   * <p>This method is called during plugin shutdown. Implementations should stop any background
+   * threads, close connections, and release resources.
+   *
+   * @throws IOException if ingestion shutdown fails
+   */
+  void stop() throws IOException;
+
+  /**
+   * Add a batch of documents to the specified index.
+   *
+   * <p>This method is typically called from within the ingestion loop to index new data. It returns
+   * the Lucene sequence number of the indexing operation.
+   *
+   * @param addDocRequests list of document requests to add
+   * @param indexName name of the target index
+   * @return sequence number of the indexing operation
+   * @throws Exception if indexing fails
+   */
+  long addDocuments(List<AddDocumentRequest> addDocRequests, String indexName) throws Exception;
+
+  /**
+   * Commit any pending changes to the specified index.
+   *
+   * <p>
This method should be called periodically or after a batch of documents is added to ensure + * durability and visibility of the changes. + * + * @param indexName name of the target index + * @throws IOException if commit fails + */ + void commit(String indexName) throws IOException; +} diff --git a/src/main/java/com/yelp/nrtsearch/server/plugins/AbstractIngestionPlugin.java b/src/main/java/com/yelp/nrtsearch/server/plugins/AbstractIngestionPlugin.java deleted file mode 100644 index 0b5727ab4..000000000 --- a/src/main/java/com/yelp/nrtsearch/server/plugins/AbstractIngestionPlugin.java +++ /dev/null @@ -1,133 +0,0 @@ -/* - * Copyright 2024 Yelp Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.yelp.nrtsearch.server.plugins; - -import com.yelp.nrtsearch.server.config.NrtsearchConfig; -import com.yelp.nrtsearch.server.grpc.AddDocumentRequest; -import com.yelp.nrtsearch.server.handler.AddDocumentHandler; -import com.yelp.nrtsearch.server.index.IndexState; -import com.yelp.nrtsearch.server.state.GlobalState; -import java.io.IOException; -import java.util.List; -import java.util.concurrent.ExecutorService; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Base class for ingestion plugins that handles state management and indexing operations. Plugin - * implementations: 1. Must implement startIngestion/stopIngestion to handle source-specific logic - * 2. 
Should use addDocuments/commit methods to index data during ingestion - */ -public abstract class AbstractIngestionPlugin extends Plugin { - private static final Logger logger = LoggerFactory.getLogger(AbstractIngestionPlugin.class); - - protected final NrtsearchConfig config; - private GlobalState globalState; - - protected AbstractIngestionPlugin(NrtsearchConfig config) { - this.config = config; - } - - /** - * Initialize plugin state with index access and start Ingestion. Called by NrtSearchServer when - * index is ready. - * - * @param globalState The global server state - */ - public final void initializeAndStartIngestion(GlobalState globalState) throws IOException { - if (this.globalState != null) { - throw new IllegalStateException("Plugin already initialized"); - } - this.globalState = globalState; - // Run startIngestion asynchronously - getIngestionExecutor() - .submit( - () -> { - try { - startIngestion(); - } catch (IOException e) { - logger.error("Error during startIngestion", e); - onIngestionStartFailure(e); - } - }); - } - - /** - * Provide an executor service for running ingestion tasks. Plugin implementations must manage the - * lifecycle of this executor. - * - * @return ExecutorService to run ingestion - */ - protected abstract ExecutorService getIngestionExecutor(); - - /** Plugin implementations can handle errors or lifecycle events by extending this method */ - protected void onIngestionStartFailure(Exception e) { - logger.error("Ingestion failed to start", e); - } - - /** - * Add documents. Available for plugin implementations to call during ingestion. 
- * - * @param addDocRequests List of document requests to add - * @return The sequence number of the indexing operation - * @throws IOException if there are indexing errors - */ - protected final long addDocuments(List addDocRequests, String indexName) - throws Exception { - return new AddDocumentHandler.DocumentIndexer(globalState, addDocRequests, indexName) - .runIndexingJob(); - } - - /** - * Commit ingested documents. Available for plugin implementations to call during ingestion. - * - * @throws IOException if there are commit errors - */ - protected final void commit(String indexName) throws IOException { - verifyInitialized(indexName); - IndexState indexState = globalState.getIndexOrThrow(indexName); - indexState.commit(); - } - - private void verifyInitialized(String indexName) throws IOException { - IndexState indexState = globalState.getIndexOrThrow(indexName); - if (indexState == null) { - throw new IllegalStateException("Plugin not initialized"); - } - } - - @Override - public void close() throws IOException { - stopIngestion(); - } - - /** - * Start ingesting documents from the source. Plugin implementations can start source connections - * and begin processing. Use the ExecutorService returned by getIngestionExecutor to achieve - * parallelism - * - * @throws IOException if there are startup errors - */ - protected abstract void startIngestion() throws IOException; - - /** - * Stop ingesting documents from the source. Plugin implementations should cleanup source - * connections and stop processing. 
- * - * @throws IOException if there are shutdown errors - */ - protected abstract void stopIngestion() throws IOException; -} diff --git a/src/main/java/com/yelp/nrtsearch/server/plugins/IngestionPlugin.java b/src/main/java/com/yelp/nrtsearch/server/plugins/IngestionPlugin.java new file mode 100644 index 000000000..56babd6b5 --- /dev/null +++ b/src/main/java/com/yelp/nrtsearch/server/plugins/IngestionPlugin.java @@ -0,0 +1,30 @@ +/* + * Copyright 2025 Yelp Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.yelp.nrtsearch.server.plugins; + +import com.yelp.nrtsearch.server.ingestion.Ingestor; +import java.util.concurrent.ExecutorService; + +public interface IngestionPlugin { + /** Interface for ingestion logic used by plugins */ + Ingestor getIngestor(); + + /** + * Provide an executor service for running ingestion. Plugin is responsible for managing its + * lifecycle. + */ + ExecutorService getIngestionExecutor(); +} diff --git a/src/test/java/com/yelp/nrtsearch/server/ingestion/AbstractIngestorTest.java b/src/test/java/com/yelp/nrtsearch/server/ingestion/AbstractIngestorTest.java new file mode 100644 index 000000000..d33051883 --- /dev/null +++ b/src/test/java/com/yelp/nrtsearch/server/ingestion/AbstractIngestorTest.java @@ -0,0 +1,116 @@ +/* + * Copyright 2025 Yelp Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.yelp.nrtsearch.server.ingestion; + +import static org.junit.Assert.*; +import static org.mockito.Mockito.*; + +import com.yelp.nrtsearch.server.config.NrtsearchConfig; +import com.yelp.nrtsearch.server.grpc.AddDocumentRequest; +import com.yelp.nrtsearch.server.index.IndexState; +import com.yelp.nrtsearch.server.state.GlobalState; +import java.io.IOException; +import java.util.Collections; +import java.util.List; +import org.junit.Before; +import org.junit.Test; + +public class AbstractIngestorTest { + + private TestIngestor ingestor; + private GlobalState mockGlobalState; + private IndexState mockIndexState; + + @Before + public void setUp() { + NrtsearchConfig config = mock(NrtsearchConfig.class); + ingestor = new TestIngestor(config); + + mockGlobalState = mock(GlobalState.class); + mockIndexState = mock(IndexState.class); + } + + @Test + public void testInitialize() throws IOException { + when(mockGlobalState.getIndexOrThrow("test_index")).thenReturn(mockIndexState); + ingestor.initialize(mockGlobalState); + + // Should not throw + ingestor.commit("test_index"); + verify(mockGlobalState, atLeastOnce()).getIndexOrThrow("test_index"); + verify(mockIndexState).commit(); + } + + @Test(expected = IllegalStateException.class) + public void testCommitWithoutInitialize() throws IOException { + ingestor.commit("test_index"); + } + + @Test(expected = IllegalStateException.class) + public void testAddDocumentsWithoutInitialize() throws Exception { + List docs = Collections.emptyList(); + ingestor.addDocuments(docs, "test_index"); + } + + 
@Test + public void testStartAndStopBehavior() throws IOException { + FlagIngestor ingestor = new FlagIngestor(mock(NrtsearchConfig.class)); + assertFalse(ingestor.started); + assertFalse(ingestor.stopped); + + ingestor.start(); + assertTrue(ingestor.started); + + ingestor.stop(); + assertTrue(ingestor.stopped); + } + + private static class FlagIngestor extends AbstractIngestor { + boolean started = false; + boolean stopped = false; + + public FlagIngestor(NrtsearchConfig config) { + super(config); + } + + @Override + public void start() throws IOException { + started = true; + } + + @Override + public void stop() throws IOException { + stopped = true; + } + } + + // Minimal concrete subclass for testing + private static class TestIngestor extends AbstractIngestor { + public TestIngestor(NrtsearchConfig config) { + super(config); + } + + @Override + public void start() throws IOException { + // no-op + } + + @Override + public void stop() throws IOException { + // no-op + } + } +} diff --git a/src/test/java/com/yelp/nrtsearch/server/ingestion/IngestionPluginUtilsTest.java b/src/test/java/com/yelp/nrtsearch/server/ingestion/IngestionPluginUtilsTest.java new file mode 100644 index 000000000..7e11b30df --- /dev/null +++ b/src/test/java/com/yelp/nrtsearch/server/ingestion/IngestionPluginUtilsTest.java @@ -0,0 +1,84 @@ +/* + * Copyright 2025 Yelp Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package com.yelp.nrtsearch.server.ingestion; + +import static org.mockito.Mockito.*; + +import com.yelp.nrtsearch.server.plugins.IngestionPlugin; +import com.yelp.nrtsearch.server.state.GlobalState; +import java.io.IOException; +import java.util.concurrent.ExecutorService; +import org.junit.Before; +import org.junit.Test; + +public class IngestionPluginUtilsTest { + + private IngestionPlugin mockPlugin; + private AbstractIngestor mockIngestor; + private ExecutorService mockExecutor; + private GlobalState mockGlobalState; + + @Before + public void setUp() { + mockPlugin = mock(IngestionPlugin.class); + mockIngestor = mock(AbstractIngestor.class); + mockExecutor = mock(ExecutorService.class); + mockGlobalState = mock(GlobalState.class); + + when(mockPlugin.getIngestor()).thenReturn(mockIngestor); + when(mockPlugin.getIngestionExecutor()).thenReturn(mockExecutor); + } + + @Test + public void testInitializeAndStart_submitsStartTask() throws IOException { + // Capture the submitted Runnable + doAnswer( + invocation -> { + Runnable task = invocation.getArgument(0); + task.run(); // simulate executor running the task + return null; + }) + .when(mockExecutor) + .submit(any(Runnable.class)); + + IngestionPluginUtils.initializeAndStart(mockPlugin, mockGlobalState); + + // Verify that initialize and start were called + verify(mockIngestor).initialize(mockGlobalState); + verify(mockIngestor).start(); + verify(mockExecutor).submit(any(Runnable.class)); + } + + @Test + public void testInitializeAndStart_nonAbstractIngestor() throws IOException { + Ingestor mockBasicIngestor = mock(Ingestor.class); + when(mockPlugin.getIngestor()).thenReturn(mockBasicIngestor); + + doAnswer( + invocation -> { + Runnable task = invocation.getArgument(0); + task.run(); + return null; + }) + .when(mockExecutor) + .submit(any(Runnable.class)); + + IngestionPluginUtils.initializeAndStart(mockPlugin, mockGlobalState); + + verify(mockBasicIngestor, never()).initialize(any()); + 
verify(mockBasicIngestor).start(); + } +} From 3c5416a823da17fbfbfea9f62f50a9a85456bd0d Mon Sep 17 00:00:00 2001 From: umesh Date: Tue, 6 May 2025 15:11:58 -0700 Subject: [PATCH 6/6] address PR comments, use JsonUtils to add type safe deser, move ingestion plugin init location --- .../server/config/IngestionPluginConfigs.java | 34 +++++++++++++++++++ .../server/config/NrtsearchConfig.java | 17 ++-------- .../server/grpc/NrtsearchServer.java | 20 +++++------ .../server/config/NrtsearchConfigTest.java | 20 +++++++++++ 4 files changed, 67 insertions(+), 24 deletions(-) create mode 100644 src/main/java/com/yelp/nrtsearch/server/config/IngestionPluginConfigs.java diff --git a/src/main/java/com/yelp/nrtsearch/server/config/IngestionPluginConfigs.java b/src/main/java/com/yelp/nrtsearch/server/config/IngestionPluginConfigs.java new file mode 100644 index 000000000..0436dc3f7 --- /dev/null +++ b/src/main/java/com/yelp/nrtsearch/server/config/IngestionPluginConfigs.java @@ -0,0 +1,34 @@ +/* + * Copyright 2025 Yelp Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package com.yelp.nrtsearch.server.config; + +import com.fasterxml.jackson.annotation.JsonAnySetter; +import java.util.HashMap; +import java.util.Map; + +public class IngestionPluginConfigs { + + private final Map<String, Map<String, Object>> pluginConfigs = new HashMap<>(); + + @JsonAnySetter + public void addPluginConfig(String pluginName, Map<String, Object> config) { + pluginConfigs.put(pluginName, config); + } + + public Map<String, Map<String, Object>> getPluginConfigs() { + return pluginConfigs; + } +} diff --git a/src/main/java/com/yelp/nrtsearch/server/config/NrtsearchConfig.java b/src/main/java/com/yelp/nrtsearch/server/config/NrtsearchConfig.java index b7f4352e0..350a48cb6 100644 --- a/src/main/java/com/yelp/nrtsearch/server/config/NrtsearchConfig.java +++ b/src/main/java/com/yelp/nrtsearch/server/config/NrtsearchConfig.java @@ -447,22 +447,11 @@ private static List getPluginSearchPath(Object o) { return paths; } - @SuppressWarnings("unchecked") public Map<String, Map<String, Object>> getIngestionPluginConfigs() { try { - Object raw = configReader.get("pluginConfigs.ingestion", obj -> obj); - if (raw instanceof Map<?, ?> outerMap) { - Map<String, Map<String, Object>> result = new HashMap<>(); - for (Map.Entry<?, ?> entry : outerMap.entrySet()) { - if (entry.getKey() instanceof String pluginName - && entry.getValue() instanceof Map pluginConfig) { - result.put(pluginName, (Map) pluginConfig); - } - } - return result; - } else { - throw new IllegalStateException("'pluginConfigs.ingestion' must be a map"); - } + return configReader.get( + "pluginConfigs.ingestion", + obj -> JsonUtils.convertValue(obj, IngestionPluginConfigs.class).getPluginConfigs()); } catch (ConfigKeyNotFoundException e) { return Collections.emptyMap(); } diff --git a/src/main/java/com/yelp/nrtsearch/server/grpc/NrtsearchServer.java b/src/main/java/com/yelp/nrtsearch/server/grpc/NrtsearchServer.java index 9404a3022..b27a0b94c 100644 --- a/src/main/java/com/yelp/nrtsearch/server/grpc/NrtsearchServer.java +++ b/src/main/java/com/yelp/nrtsearch/server/grpc/NrtsearchServer.java @@ -160,7 +160,6 @@ public void start() throws
IOException { GlobalState globalState = serverImpl.getGlobalState(); registerMetrics(globalState); - initIngestionPlugin(globalState, plugins); if (configuration.getMaxConcurrentCallsPerConnectionForReplication() != -1) { replicationServer = @@ -228,15 +227,6 @@ serverImpl, new NrtsearchHeaderInterceptor(), monitoringInterceptor)) BootstrapMetrics.nrtsearchBootstrapTimer.set((System.nanoTime() - startNs) / 1_000_000_000.0); } - private void initIngestionPlugin(GlobalState globalState, List<Plugin> plugins) - throws IOException { - for (Plugin plugin : plugins) { - if (plugin instanceof IngestionPlugin ingestionPlugin) { - IngestionPluginUtils.initializeAndStart(ingestionPlugin, globalState); - } - } - } - @VisibleForTesting public void stop() { if (server != null) { @@ -402,6 +392,7 @@ static class LuceneServerImpl extends LuceneServerGrpc.LuceneServerImplBase { this.globalState = GlobalState.createState(configuration, remoteBackend); // Initialize handlers + initIngestionPlugin(globalState, plugins); addDocumentHandler = new AddDocumentHandler(globalState); backupWarmingQueriesHandler = new BackupWarmingQueriesHandler(globalState); commitHandler = new CommitHandler(globalState); @@ -440,6 +431,15 @@ static class LuceneServerImpl extends LuceneServerGrpc.LuceneServerImplBase { updateFieldsHandler = new UpdateFieldsHandler(globalState); } + private void initIngestionPlugin(GlobalState globalState, List<Plugin> plugins) + throws IOException { + for (Plugin plugin : plugins) { + if (plugin instanceof IngestionPlugin ingestionPlugin) { + IngestionPluginUtils.initializeAndStart(ingestionPlugin, globalState); + } + } + } + @VisibleForTesting static void initQueryCache(NrtsearchConfig configuration) { QueryCacheConfig cacheConfig = configuration.getQueryCacheConfig(); diff --git a/src/test/java/com/yelp/nrtsearch/server/config/NrtsearchConfigTest.java b/src/test/java/com/yelp/nrtsearch/server/config/NrtsearchConfigTest.java index e2cc7e856..994bd1213 100644 ---
a/src/test/java/com/yelp/nrtsearch/server/config/NrtsearchConfigTest.java +++ b/src/test/java/com/yelp/nrtsearch/server/config/NrtsearchConfigTest.java @@ -271,4 +271,24 @@ public void testGetIngestionPluginConfigs() { assertEquals("my-topic", ingestionConfigs.get("kafka").get("topic")); assertEquals("my-bucket", ingestionConfigs.get("s3").get("bucket")); } + + @Test + public void testMissingIngestionConfigReturnsEmptyMap() { + String config = String.join("\n", "nodeName: \"server_foo\"", "plugins:", " - kafka-plugin"); + + NrtsearchConfig luceneConfig = getForConfig(config); + Map<String, Map<String, Object>> ingestionConfigs = luceneConfig.getIngestionPluginConfigs(); + + assertTrue(ingestionConfigs.isEmpty()); + } + + @Test(expected = IllegalArgumentException.class) + public void testInvalidIngestionConfigThrows() { + String config = + String.join( + "\n", "nodeName: \"server_foo\"", "pluginConfigs:", " ingestion:", " - kafka"); + + NrtsearchConfig luceneConfig = getForConfig(config); + luceneConfig.getIngestionPluginConfigs(); // should throw + } }