diff --git a/src/main/java/com/yelp/nrtsearch/server/config/IngestionPluginConfigs.java b/src/main/java/com/yelp/nrtsearch/server/config/IngestionPluginConfigs.java new file mode 100644 index 000000000..0436dc3f7 --- /dev/null +++ b/src/main/java/com/yelp/nrtsearch/server/config/IngestionPluginConfigs.java @@ -0,0 +1,34 @@ +/* + * Copyright 2025 Yelp Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.yelp.nrtsearch.server.config; + +import com.fasterxml.jackson.annotation.JsonAnySetter; +import java.util.HashMap; +import java.util.Map; + +public class IngestionPluginConfigs { + + private final Map> pluginConfigs = new HashMap<>(); + + @JsonAnySetter + public void addPluginConfig(String pluginName, Map config) { + pluginConfigs.put(pluginName, config); + } + + public Map> getPluginConfigs() { + return pluginConfigs; + } +} diff --git a/src/main/java/com/yelp/nrtsearch/server/config/NrtsearchConfig.java b/src/main/java/com/yelp/nrtsearch/server/config/NrtsearchConfig.java index bd188942b..350a48cb6 100644 --- a/src/main/java/com/yelp/nrtsearch/server/config/NrtsearchConfig.java +++ b/src/main/java/com/yelp/nrtsearch/server/config/NrtsearchConfig.java @@ -446,4 +446,14 @@ private static List getPluginSearchPath(Object o) { } return paths; } + + public Map> getIngestionPluginConfigs() { + try { + return configReader.get( + "pluginConfigs.ingestion", + obj -> JsonUtils.convertValue(obj, IngestionPluginConfigs.class).getPluginConfigs()); + } catch (ConfigKeyNotFoundException e) { + return Collections.emptyMap(); + } + } } diff --git a/src/main/java/com/yelp/nrtsearch/server/grpc/NrtsearchServer.java b/src/main/java/com/yelp/nrtsearch/server/grpc/NrtsearchServer.java index f0d6fbf5b..b27a0b94c 100644 --- a/src/main/java/com/yelp/nrtsearch/server/grpc/NrtsearchServer.java +++ b/src/main/java/com/yelp/nrtsearch/server/grpc/NrtsearchServer.java @@ -79,6 +79,7 @@ import com.yelp.nrtsearch.server.handler.UpdateFieldsHandler; import com.yelp.nrtsearch.server.handler.WriteNRTPointHandler; import com.yelp.nrtsearch.server.highlights.HighlighterService; +import com.yelp.nrtsearch.server.ingestion.IngestionPluginUtils; import com.yelp.nrtsearch.server.logging.HitsLoggerCreator; import com.yelp.nrtsearch.server.modules.NrtsearchModule; import com.yelp.nrtsearch.server.monitoring.BootstrapMetrics; @@ -95,6 +96,7 @@ import com.yelp.nrtsearch.server.monitoring.SearchResponseCollector; import com.yelp.nrtsearch.server.monitoring.ThreadPoolCollector; import com.yelp.nrtsearch.server.monitoring.ThreadPoolCollector.RejectionCounterWrapper; +import com.yelp.nrtsearch.server.plugins.IngestionPlugin; import com.yelp.nrtsearch.server.plugins.Plugin; import com.yelp.nrtsearch.server.plugins.PluginsService; import com.yelp.nrtsearch.server.remote.RemoteBackend; @@ -390,6 +392,7 @@ static class LuceneServerImpl extends LuceneServerGrpc.LuceneServerImplBase { this.globalState = GlobalState.createState(configuration, remoteBackend); // Initialize handlers + initIngestionPlugin(globalState, plugins); addDocumentHandler = new AddDocumentHandler(globalState); backupWarmingQueriesHandler = new BackupWarmingQueriesHandler(globalState); commitHandler = new CommitHandler(globalState); @@ -428,6 +431,15 @@ static class LuceneServerImpl extends LuceneServerGrpc.LuceneServerImplBase { updateFieldsHandler = new UpdateFieldsHandler(globalState); } + private void initIngestionPlugin(GlobalState globalState, List plugins) + throws IOException { + for (Plugin plugin : plugins) { + if (plugin instanceof IngestionPlugin ingestionPlugin) { + IngestionPluginUtils.initializeAndStart(ingestionPlugin, globalState); + } + } + } + @VisibleForTesting static void initQueryCache(NrtsearchConfig configuration) { QueryCacheConfig cacheConfig = configuration.getQueryCacheConfig(); diff --git a/src/main/java/com/yelp/nrtsearch/server/ingestion/AbstractIngestor.java b/src/main/java/com/yelp/nrtsearch/server/ingestion/AbstractIngestor.java new file mode 100644 index 000000000..069ed4d2b --- /dev/null +++ b/src/main/java/com/yelp/nrtsearch/server/ingestion/AbstractIngestor.java @@ -0,0 +1,102 @@ +/* + * Copyright 2025 Yelp Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.yelp.nrtsearch.server.ingestion; + +import com.yelp.nrtsearch.server.config.NrtsearchConfig; +import com.yelp.nrtsearch.server.grpc.AddDocumentRequest; +import com.yelp.nrtsearch.server.handler.AddDocumentHandler; +import com.yelp.nrtsearch.server.index.IndexState; +import com.yelp.nrtsearch.server.state.GlobalState; +import java.io.IOException; +import java.util.List; + +/** + * Abstract base class for ingestion implementations. Provides common ingestion utilities like + * addDocuments and commit. Plugin-specific ingestion logic should extend this class and implement + * start/stop. + */ +public abstract class AbstractIngestor implements Ingestor { + protected final NrtsearchConfig config; + protected GlobalState globalState; + + public AbstractIngestor(NrtsearchConfig config) { + this.config = config; + } + + /** + * Called by the framework to initialize the ingestor with global state. Must be called before + * addDocuments or commit. + */ + @Override + public void initialize(GlobalState globalState) { + this.globalState = globalState; + } + + /** + * Add documents to the index. + * + * @param addDocRequests list of documents to add + * @param indexName target index + * @return sequence number of the indexing operation + * @throws Exception if indexing fails + */ + @Override + public long addDocuments(List addDocRequests, String indexName) + throws Exception { + verifyInitialized(indexName); + return new AddDocumentHandler.DocumentIndexer(globalState, addDocRequests, indexName) + .runIndexingJob(); + } + + /** + * Commit changes to the index. + * + * @param indexName target index + * @throws IOException if commit fails + */ + @Override + public void commit(String indexName) throws IOException { + verifyInitialized(indexName); + IndexState indexState = globalState.getIndexOrThrow(indexName); + indexState.commit(); + } + + private void verifyInitialized(String indexName) throws IOException { + if (globalState == null) { + throw new IllegalStateException("Ingestor not initialized with GlobalState"); + } + IndexState indexState = globalState.getIndexOrThrow(indexName); + if (indexState == null) { + throw new IllegalStateException("Index not found: " + indexName); + } + } + + /** + * Start ingestion logic. Must be implemented by plugin-specific subclass. + * + * @throws IOException if startup fails + */ + @Override + public abstract void start() throws IOException; + + /** + * Stop ingestion logic. Must be implemented by plugin-specific subclass. + * + * @throws IOException if shutdown fails + */ + @Override + public abstract void stop() throws IOException; +} diff --git a/src/main/java/com/yelp/nrtsearch/server/ingestion/IngestionPluginUtils.java b/src/main/java/com/yelp/nrtsearch/server/ingestion/IngestionPluginUtils.java new file mode 100644 index 000000000..94c4539dc --- /dev/null +++ b/src/main/java/com/yelp/nrtsearch/server/ingestion/IngestionPluginUtils.java @@ -0,0 +1,41 @@ +/* + * Copyright 2025 Yelp Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.yelp.nrtsearch.server.ingestion; + +import com.yelp.nrtsearch.server.plugins.IngestionPlugin; +import com.yelp.nrtsearch.server.state.GlobalState; +import java.io.IOException; +import java.util.concurrent.ExecutorService; + +public class IngestionPluginUtils { + public static void initializeAndStart(IngestionPlugin plugin, GlobalState globalState) + throws IOException { + Ingestor ingestor = plugin.getIngestor(); + if (ingestor instanceof AbstractIngestor abstractIngestor) { + abstractIngestor.initialize(globalState); + } + + ExecutorService executor = plugin.getIngestionExecutor(); + executor.submit( + () -> { + try { + ingestor.start(); + } catch (IOException e) { + e.printStackTrace(); + } + }); + } +} diff --git a/src/main/java/com/yelp/nrtsearch/server/ingestion/Ingestor.java b/src/main/java/com/yelp/nrtsearch/server/ingestion/Ingestor.java new file mode 100644 index 000000000..1b5f9a786 --- /dev/null +++ b/src/main/java/com/yelp/nrtsearch/server/ingestion/Ingestor.java @@ -0,0 +1,95 @@ +/* + * Copyright 2024 Yelp Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.yelp.nrtsearch.server.ingestion; + +import com.yelp.nrtsearch.server.grpc.AddDocumentRequest; +import com.yelp.nrtsearch.server.state.GlobalState; +import java.io.IOException; +import java.util.List; + +/** + * Interface for ingestion logic used by plugins. + * + *

This interface defines the lifecycle and operations for ingesting documents into an index. + * Plugin implementations can use this interface to encapsulate source-specific ingestion logic + * (e.g., reading from Kafka, S3, etc.) while leveraging shared indexing utilities. + * + *

Implementations are expected to: + * + *

    + *
  • Initialize with {@link GlobalState} before starting + *
  • Start ingestion in a background thread (if needed) + *
  • Use {@link #addDocuments(List, String)} and {@link #commit(String)} to index data + *
  • Clean up resources in {@link #stop()} + *
+ */ +public interface Ingestor { + + /** + * Initialize the ingestor with the global server state. + * + *

This method is called once by the framework before ingestion starts. Implementations should + * store the global state for later use (e.g., to access index state). + * + * @param globalState the global server state + */ + void initialize(GlobalState globalState); + + /** + * Start the ingestion process. + * + *

This method should contain the logic to begin reading from the ingestion source (e.g., + * Kafka, file system, etc.). It may block or spawn background threads depending on the + * implementation. + * + * @throws IOException if ingestion startup fails + */ + void start() throws IOException; + + /** + * Stop the ingestion process and clean up resources. + * + *

This method is called during plugin shutdown. Implementations should stop any background + * threads, close connections, and release resources. + * + * @throws IOException if ingestion shutdown fails + */ + void stop() throws IOException; + + /** + * Add a batch of documents to the specified index. + * + *

This method is typically called from within the ingestion loop to index new data. It returns + * the Lucene sequence number of the indexing operation. + * + * @param addDocRequests list of document requests to add + * @param indexName name of the target index + * @return sequence number of the indexing operation + * @throws Exception if indexing fails + */ + long addDocuments(List addDocRequests, String indexName) throws Exception; + + /** + * Commit any pending changes to the specified index. + * + *

This method should be called periodically or after a batch of documents is added to ensure + * durability and visibility of the changes. + * + * @param indexName name of the target index + * @throws IOException if commit fails + */ + void commit(String indexName) throws IOException; +} diff --git a/src/main/java/com/yelp/nrtsearch/server/plugins/IngestionPlugin.java b/src/main/java/com/yelp/nrtsearch/server/plugins/IngestionPlugin.java new file mode 100644 index 000000000..56babd6b5 --- /dev/null +++ b/src/main/java/com/yelp/nrtsearch/server/plugins/IngestionPlugin.java @@ -0,0 +1,30 @@ +/* + * Copyright 2025 Yelp Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.yelp.nrtsearch.server.plugins; + +import com.yelp.nrtsearch.server.ingestion.Ingestor; +import java.util.concurrent.ExecutorService; + +public interface IngestionPlugin { + /** Interface for ingestion logic used by plugins */ + Ingestor getIngestor(); + + /** + * Provide an executor service for running ingestion. Plugin is responsible for managing its + * lifecycle. + */ + ExecutorService getIngestionExecutor(); +} diff --git a/src/test/java/com/yelp/nrtsearch/server/config/NrtsearchConfigTest.java b/src/test/java/com/yelp/nrtsearch/server/config/NrtsearchConfigTest.java index b89333fbe..994bd1213 100644 --- a/src/test/java/com/yelp/nrtsearch/server/config/NrtsearchConfigTest.java +++ b/src/test/java/com/yelp/nrtsearch/server/config/NrtsearchConfigTest.java @@ -23,6 +23,7 @@ import com.yelp.nrtsearch.server.grpc.ReplicationServerClient; import com.yelp.nrtsearch.server.index.DirectoryFactory; import java.io.ByteArrayInputStream; +import java.util.Map; import org.apache.lucene.search.suggest.document.CompletionPostingsFormat.FSTLoadMode; import org.junit.Test; @@ -246,4 +247,48 @@ public void testRequireIdField_set() { NrtsearchConfig luceneConfig = getForConfig(config); assertTrue(luceneConfig.getRequireIdField()); } + + @Test + public void testGetIngestionPluginConfigs() { + String config = + String.join( + "\n", + "nodeName: \"server_foo\"", + "plugins:", + " - kafka-plugin", + " - s3-plugin", + "pluginConfigs:", + " ingestion:", + " kafka:", + " topic: \"my-topic\"", + " autoCommitEnabled: false", + " s3:", + " bucket: \"my-bucket\""); + + NrtsearchConfig luceneConfig = getForConfig(config); + Map> ingestionConfigs = luceneConfig.getIngestionPluginConfigs(); + + assertEquals("my-topic", ingestionConfigs.get("kafka").get("topic")); + assertEquals("my-bucket", ingestionConfigs.get("s3").get("bucket")); + } + + @Test + public void testMissingIngestionConfigReturnsEmptyMap() { + String config = String.join("\n", "nodeName: \"server_foo\"", "plugins:", " - kafka-plugin"); + + NrtsearchConfig luceneConfig = getForConfig(config); + Map> ingestionConfigs = luceneConfig.getIngestionPluginConfigs(); + + assertTrue(ingestionConfigs.isEmpty()); + } + + @Test(expected = IllegalArgumentException.class) + public void testInvalidIngestionConfigThrows() { + String config = + String.join( + "\n", "nodeName: \"server_foo\"", "pluginConfigs:", " ingestion:", " - kafka"); + + NrtsearchConfig luceneConfig = getForConfig(config); + luceneConfig.getIngestionPluginConfigs(); // should throw + } } diff --git a/src/test/java/com/yelp/nrtsearch/server/ingestion/AbstractIngestorTest.java b/src/test/java/com/yelp/nrtsearch/server/ingestion/AbstractIngestorTest.java new file mode 100644 index 000000000..d33051883 --- /dev/null +++ b/src/test/java/com/yelp/nrtsearch/server/ingestion/AbstractIngestorTest.java @@ -0,0 +1,116 @@ +/* + * Copyright 2025 Yelp Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.yelp.nrtsearch.server.ingestion; + +import static org.junit.Assert.*; +import static org.mockito.Mockito.*; + +import com.yelp.nrtsearch.server.config.NrtsearchConfig; +import com.yelp.nrtsearch.server.grpc.AddDocumentRequest; +import com.yelp.nrtsearch.server.index.IndexState; +import com.yelp.nrtsearch.server.state.GlobalState; +import java.io.IOException; +import java.util.Collections; +import java.util.List; +import org.junit.Before; +import org.junit.Test; + +public class AbstractIngestorTest { + + private TestIngestor ingestor; + private GlobalState mockGlobalState; + private IndexState mockIndexState; + + @Before + public void setUp() { + NrtsearchConfig config = mock(NrtsearchConfig.class); + ingestor = new TestIngestor(config); + + mockGlobalState = mock(GlobalState.class); + mockIndexState = mock(IndexState.class); + } + + @Test + public void testInitialize() throws IOException { + when(mockGlobalState.getIndexOrThrow("test_index")).thenReturn(mockIndexState); + ingestor.initialize(mockGlobalState); + + // Should not throw + ingestor.commit("test_index"); + verify(mockGlobalState, atLeastOnce()).getIndexOrThrow("test_index"); + verify(mockIndexState).commit(); + } + + @Test(expected = IllegalStateException.class) + public void testCommitWithoutInitialize() throws IOException { + ingestor.commit("test_index"); + } + + @Test(expected = IllegalStateException.class) + public void testAddDocumentsWithoutInitialize() throws Exception { + List docs = Collections.emptyList(); + ingestor.addDocuments(docs, "test_index"); + } + + @Test + public void testStartAndStopBehavior() throws IOException { + FlagIngestor ingestor = new FlagIngestor(mock(NrtsearchConfig.class)); + assertFalse(ingestor.started); + assertFalse(ingestor.stopped); + + ingestor.start(); + assertTrue(ingestor.started); + + ingestor.stop(); + assertTrue(ingestor.stopped); + } + + private static class FlagIngestor extends AbstractIngestor { + boolean started = false; + boolean stopped = false; + + public FlagIngestor(NrtsearchConfig config) { + super(config); + } + + @Override + public void start() throws IOException { + started = true; + } + + @Override + public void stop() throws IOException { + stopped = true; + } + } + + // Minimal concrete subclass for testing + private static class TestIngestor extends AbstractIngestor { + public TestIngestor(NrtsearchConfig config) { + super(config); + } + + @Override + public void start() throws IOException { + // no-op + } + + @Override + public void stop() throws IOException { + // no-op + } + } +} diff --git a/src/test/java/com/yelp/nrtsearch/server/ingestion/IngestionPluginUtilsTest.java b/src/test/java/com/yelp/nrtsearch/server/ingestion/IngestionPluginUtilsTest.java new file mode 100644 index 000000000..7e11b30df --- /dev/null +++ b/src/test/java/com/yelp/nrtsearch/server/ingestion/IngestionPluginUtilsTest.java @@ -0,0 +1,84 @@ +/* + * Copyright 2025 Yelp Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.yelp.nrtsearch.server.ingestion; + +import static org.mockito.Mockito.*; + +import com.yelp.nrtsearch.server.plugins.IngestionPlugin; +import com.yelp.nrtsearch.server.state.GlobalState; +import java.io.IOException; +import java.util.concurrent.ExecutorService; +import org.junit.Before; +import org.junit.Test; + +public class IngestionPluginUtilsTest { + + private IngestionPlugin mockPlugin; + private AbstractIngestor mockIngestor; + private ExecutorService mockExecutor; + private GlobalState mockGlobalState; + + @Before + public void setUp() { + mockPlugin = mock(IngestionPlugin.class); + mockIngestor = mock(AbstractIngestor.class); + mockExecutor = mock(ExecutorService.class); + mockGlobalState = mock(GlobalState.class); + + when(mockPlugin.getIngestor()).thenReturn(mockIngestor); + when(mockPlugin.getIngestionExecutor()).thenReturn(mockExecutor); + } + + @Test + public void testInitializeAndStart_submitsStartTask() throws IOException { + // Capture the submitted Runnable + doAnswer( + invocation -> { + Runnable task = invocation.getArgument(0); + task.run(); // simulate executor running the task + return null; + }) + .when(mockExecutor) + .submit(any(Runnable.class)); + + IngestionPluginUtils.initializeAndStart(mockPlugin, mockGlobalState); + + // Verify that initialize and start were called + verify(mockIngestor).initialize(mockGlobalState); + verify(mockIngestor).start(); + verify(mockExecutor).submit(any(Runnable.class)); + } + + @Test + public void testInitializeAndStart_nonAbstractIngestor() throws IOException { + Ingestor mockBasicIngestor = mock(Ingestor.class); + when(mockPlugin.getIngestor()).thenReturn(mockBasicIngestor); + + doAnswer( + invocation -> { + Runnable task = invocation.getArgument(0); + task.run(); + return null; + }) + .when(mockExecutor) + .submit(any(Runnable.class)); + + IngestionPluginUtils.initializeAndStart(mockPlugin, mockGlobalState); + + verify(mockBasicIngestor, never()).initialize(any()); + verify(mockBasicIngestor).start(); + } +}