Skip to content

Commit 9577334

Browse files
authored
Add option to use virtual threads in place of thread pools (#865)
1 parent a43fd70 commit 9577334

File tree

8 files changed

+570
-28
lines changed

8 files changed

+570
-28
lines changed

docs/server_configuration.rst

Lines changed: 46 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ Example server configuration
2222
search:
2323
maxThreads: 4
2424
index:
25-
maxThreads: 18
25+
useVirtualThreads: true
2626
botoCfgPath: "/user/app/boto.cfg"
2727
bucketName: "nrtsearch-bucket"
2828
serviceName: "nrtsearch-service-test"
@@ -141,6 +141,11 @@ Example server configuration
141141
- Name prefix for threads created by searcher threadpool executor
142142
- LuceneSearchExecutor
143143

144+
* - search.useVirtualThreads
145+
- bool
146+
- Whether to use virtual threads instead of a traditional thread pool for search operations
147+
- false
148+
144149
* - index.maxThreads
145150
- int
146151
- Size of indexing threadpool executor
@@ -156,6 +161,11 @@ Example server configuration
156161
- Name prefix for threads created by indexing threadpool executor
157162
- LuceneIndexingExecutor
158163

164+
* - index.useVirtualThreads
165+
- bool
166+
- Whether to use virtual threads instead of a traditional thread pool for indexing operations
167+
- false
168+
159169
* - server.maxThreads
160170
- int
161171
- Size of NrtsearchServer threadpool executor
@@ -171,6 +181,11 @@ Example server configuration
171181
- Name prefix for threads created by NrtsearchServer threadpool executor
172182
- GrpcServerExecutor
173183

184+
* - server.useVirtualThreads
185+
- bool
186+
- Whether to use virtual threads instead of a traditional thread pool for server operations
187+
- false
188+
174189
* - replicationserver.maxThreads
175190
- int
176191
- Size of ReplicationServer threadpool executor
@@ -186,6 +201,11 @@ Example server configuration
186201
- Name prefix for threads created by ReplicationServer threadpool executor
187202
- GrpcReplicationServerExecutor
188203

204+
* - replicationserver.useVirtualThreads
205+
- bool
206+
- Whether to use virtual threads instead of a traditional thread pool for replication server operations
207+
- false
208+
189209
* - fetch.maxThreads
190210
- int
191211
- Size of fetch threadpool executor
@@ -201,6 +221,11 @@ Example server configuration
201221
- Name prefix for threads created by fetch threadpool executor
202222
- LuceneFetchExecutor
203223

224+
* - fetch.useVirtualThreads
225+
- bool
226+
- Whether to use virtual threads instead of a traditional thread pool for fetch operations
227+
- false
228+
204229
* - grpc.maxThreads
205230
- int
206231
- Size of gRPC threadpool executor
@@ -216,6 +241,11 @@ Example server configuration
216241
- Name prefix for threads created by gRPC threadpool executor
217242
- GrpcExecutor
218243

244+
* - grpc.useVirtualThreads
245+
- bool
246+
- Whether to use virtual threads instead of a traditional thread pool for gRPC operations
247+
- false
248+
219249
* - metrics.maxThreads
220250
- int
221251
- Size of metrics threadpool executor
@@ -231,6 +261,11 @@ Example server configuration
231261
- Name prefix for threads created by metrics threadpool executor
232262
- MetricsExecutor
233263

264+
* - metrics.useVirtualThreads
265+
- bool
266+
- Whether to use virtual threads instead of a traditional thread pool for metrics operations
267+
- false
268+
234269
* - vectormerge.maxThreads
235270
- int
236271
- Size of vector merge threadpool executor
@@ -246,6 +281,11 @@ Example server configuration
246281
- Name prefix for threads created by vector merge threadpool executor
247282
- VectorMergeExecutor
248283

284+
* - vectormerge.useVirtualThreads
285+
- bool
286+
- Whether to use virtual threads instead of a traditional thread pool for vector merge operations
287+
- false
288+
249289
* - commit.maxThreads
250290
- int
251291
- Size of commit threadpool executor
@@ -261,6 +301,11 @@ Example server configuration
261301
- Name prefix for threads created by commit threadpool executor
262302
- CommitExecutor
263303

304+
* - commit.useVirtualThreads
305+
- bool
306+
- Whether to use virtual threads instead of a traditional thread pool for commit operations
307+
- false
308+
264309
.. list-table:: `Alternative Max Threads Config <https://github.com/Yelp/nrtsearch/blob/master/src/main/java/com/yelp/nrtsearch/server/config/ThreadPoolConfiguration.java>`_ (``threadPoolConfiguration.*.maxThreads.*``)
265310
:widths: 25 10 50 25
266311
:header-rows: 1

src/main/java/com/yelp/nrtsearch/server/concurrent/ExecutorFactory.java

Lines changed: 26 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import java.util.concurrent.BlockingQueue;
2222
import java.util.concurrent.ConcurrentHashMap;
2323
import java.util.concurrent.ExecutorService;
24+
import java.util.concurrent.Executors;
2425
import java.util.concurrent.LinkedBlockingQueue;
2526
import java.util.concurrent.ThreadPoolExecutor;
2627
import java.util.concurrent.TimeUnit;
@@ -97,21 +98,30 @@ public ExecutorService getExecutor(ExecutorType executorType) {
9798
private ExecutorService createExecutor(ExecutorType executorType) {
9899
ThreadPoolConfiguration.ThreadPoolSettings threadPoolSettings =
99100
threadPoolConfiguration.getThreadPoolSettings(executorType);
100-
logger.info(
101-
"Creating {} of size {}",
102-
threadPoolSettings.threadNamePrefix(),
103-
threadPoolSettings.maxThreads());
104-
BlockingQueue<Runnable> queue =
105-
new LinkedBlockingQueue<>(threadPoolSettings.maxBufferedItems());
106-
ThreadPoolExecutor threadPoolExecutor =
107-
new ThreadPoolExecutor(
108-
threadPoolSettings.maxThreads(),
109-
threadPoolSettings.maxThreads(),
110-
0L,
111-
TimeUnit.SECONDS,
112-
queue,
113-
new NamedThreadFactory(threadPoolSettings.threadNamePrefix()));
114-
ThreadPoolCollector.addPool(executorType.name(), threadPoolExecutor);
115-
return threadPoolExecutor;
101+
if (threadPoolSettings.useVirtualThreads()) {
102+
logger.info("Creating virtual thread executor for {}", threadPoolSettings.threadNamePrefix());
103+
ExecutorService virtualThreadExecutor = Executors.newVirtualThreadPerTaskExecutor();
104+
ExecutorServiceStatsWrapper executorServiceStatsWrapper =
105+
new ExecutorServiceStatsWrapper(virtualThreadExecutor);
106+
ThreadPoolCollector.addVirtualPool(executorType.name(), executorServiceStatsWrapper);
107+
return executorServiceStatsWrapper;
108+
} else {
109+
logger.info(
110+
"Creating {} of size {}",
111+
threadPoolSettings.threadNamePrefix(),
112+
threadPoolSettings.maxThreads());
113+
BlockingQueue<Runnable> queue =
114+
new LinkedBlockingQueue<>(threadPoolSettings.maxBufferedItems());
115+
ThreadPoolExecutor threadPoolExecutor =
116+
new ThreadPoolExecutor(
117+
threadPoolSettings.maxThreads(),
118+
threadPoolSettings.maxThreads(),
119+
0L,
120+
TimeUnit.SECONDS,
121+
queue,
122+
new NamedThreadFactory(threadPoolSettings.threadNamePrefix()));
123+
ThreadPoolCollector.addPool(executorType.name(), threadPoolExecutor);
124+
return threadPoolExecutor;
125+
}
116126
}
117127
}
Lines changed: 126 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,126 @@
1+
/*
2+
* Copyright 2025 Yelp Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package com.yelp.nrtsearch.server.concurrent;
17+
18+
import java.util.Collection;
19+
import java.util.List;
20+
import java.util.concurrent.Callable;
21+
import java.util.concurrent.ExecutionException;
22+
import java.util.concurrent.ExecutorService;
23+
import java.util.concurrent.Future;
24+
import java.util.concurrent.TimeUnit;
25+
import java.util.concurrent.TimeoutException;
26+
import java.util.concurrent.atomic.AtomicLong;
27+
28+
/**
29+
* {@link ExecutorService} implementation that wraps another {@link ExecutorService} and tracks
30+
* stats. Currently, only tracks the total number of tasks submitted to the executor.
31+
*/
32+
public class ExecutorServiceStatsWrapper implements ExecutorService {
33+
private final ExecutorService delegate;
34+
private final AtomicLong totalTasks = new AtomicLong(0);
35+
36+
public ExecutorServiceStatsWrapper(ExecutorService delegate) {
37+
this.delegate = delegate;
38+
}
39+
40+
/**
41+
* Returns the total number of tasks submitted to this executor.
42+
*
43+
* @return total number of tasks
44+
*/
45+
public long getTotalTasks() {
46+
return totalTasks.get();
47+
}
48+
49+
@Override
50+
public void shutdown() {
51+
delegate.shutdown();
52+
}
53+
54+
@Override
55+
public List<Runnable> shutdownNow() {
56+
return delegate.shutdownNow();
57+
}
58+
59+
@Override
60+
public boolean isShutdown() {
61+
return delegate.isShutdown();
62+
}
63+
64+
@Override
65+
public boolean isTerminated() {
66+
return delegate.isTerminated();
67+
}
68+
69+
@Override
70+
public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
71+
return delegate.awaitTermination(timeout, unit);
72+
}
73+
74+
@Override
75+
public <T> Future<T> submit(Callable<T> task) {
76+
totalTasks.incrementAndGet();
77+
return delegate.submit(task);
78+
}
79+
80+
@Override
81+
public <T> Future<T> submit(Runnable task, T result) {
82+
totalTasks.incrementAndGet();
83+
return delegate.submit(task, result);
84+
}
85+
86+
@Override
87+
public Future<?> submit(Runnable task) {
88+
totalTasks.incrementAndGet();
89+
return delegate.submit(task);
90+
}
91+
92+
@Override
93+
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
94+
throws InterruptedException {
95+
totalTasks.addAndGet(tasks.size());
96+
return delegate.invokeAll(tasks);
97+
}
98+
99+
@Override
100+
public <T> List<Future<T>> invokeAll(
101+
Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
102+
throws InterruptedException {
103+
totalTasks.addAndGet(tasks.size());
104+
return delegate.invokeAll(tasks, timeout, unit);
105+
}
106+
107+
@Override
108+
public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
109+
throws InterruptedException, ExecutionException {
110+
totalTasks.addAndGet(tasks.size());
111+
return delegate.invokeAny(tasks);
112+
}
113+
114+
@Override
115+
public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
116+
throws InterruptedException, ExecutionException, TimeoutException {
117+
totalTasks.addAndGet(tasks.size());
118+
return delegate.invokeAny(tasks, timeout, unit);
119+
}
120+
121+
@Override
122+
public void execute(Runnable command) {
123+
totalTasks.incrementAndGet();
124+
delegate.execute(command);
125+
}
126+
}

src/main/java/com/yelp/nrtsearch/server/config/ThreadPoolConfiguration.java

Lines changed: 31 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -63,47 +63,62 @@ public class ThreadPoolConfiguration {
6363
* @param maxThreads max number of threads
6464
* @param maxBufferedItems max number of buffered items
6565
* @param threadNamePrefix prefix for thread names
66+
* @param useVirtualThreads whether to use virtual threads, instead of a thread pool
6667
*/
67-
public record ThreadPoolSettings(int maxThreads, int maxBufferedItems, String threadNamePrefix) {}
68+
public record ThreadPoolSettings(
69+
int maxThreads, int maxBufferedItems, String threadNamePrefix, boolean useVirtualThreads) {}
6870

6971
private static final Map<ExecutorFactory.ExecutorType, ThreadPoolSettings>
7072
defaultThreadPoolSettings =
7173
Map.of(
7274
ExecutorFactory.ExecutorType.SEARCH,
7375
new ThreadPoolSettings(
74-
DEFAULT_SEARCHING_THREADS, DEFAULT_SEARCH_BUFFERED_ITEMS, "LuceneSearchExecutor"),
76+
DEFAULT_SEARCHING_THREADS,
77+
DEFAULT_SEARCH_BUFFERED_ITEMS,
78+
"LuceneSearchExecutor",
79+
false),
7580
ExecutorFactory.ExecutorType.INDEX,
7681
new ThreadPoolSettings(
7782
DEFAULT_INDEXING_THREADS,
7883
DEFAULT_INDEXING_BUFFERED_ITEMS,
79-
"LuceneIndexingExecutor"),
84+
"LuceneIndexingExecutor",
85+
false),
8086
ExecutorFactory.ExecutorType.SERVER,
8187
new ThreadPoolSettings(
8288
DEFAULT_GRPC_SERVER_THREADS,
8389
DEFAULT_GRPC_SERVER_BUFFERED_ITEMS,
84-
"GrpcServerExecutor"),
90+
"GrpcServerExecutor",
91+
false),
8592
ExecutorFactory.ExecutorType.REPLICATIONSERVER,
8693
new ThreadPoolSettings(
8794
DEFAULT_GRPC_REPLICATIONSERVER_THREADS,
8895
DEFAULT_GRPC_REPLICATIONSERVER_BUFFERED_ITEMS,
89-
"GrpcReplicationServerExecutor"),
96+
"GrpcReplicationServerExecutor",
97+
false),
9098
ExecutorFactory.ExecutorType.FETCH,
9199
new ThreadPoolSettings(
92-
DEFAULT_FETCH_THREADS, DEFAULT_FETCH_BUFFERED_ITEMS, "LuceneFetchExecutor"),
100+
DEFAULT_FETCH_THREADS,
101+
DEFAULT_FETCH_BUFFERED_ITEMS,
102+
"LuceneFetchExecutor",
103+
false),
93104
ExecutorFactory.ExecutorType.GRPC,
94105
new ThreadPoolSettings(
95-
DEFAULT_GRPC_THREADS, DEFAULT_GRPC_BUFFERED_ITEMS, "GrpcExecutor"),
106+
DEFAULT_GRPC_THREADS, DEFAULT_GRPC_BUFFERED_ITEMS, "GrpcExecutor", false),
96107
ExecutorFactory.ExecutorType.METRICS,
97108
new ThreadPoolSettings(
98-
DEFAULT_METRICS_THREADS, DEFAULT_METRICS_BUFFERED_ITEMS, "MetricsExecutor"),
109+
DEFAULT_METRICS_THREADS,
110+
DEFAULT_METRICS_BUFFERED_ITEMS,
111+
"MetricsExecutor",
112+
false),
99113
ExecutorFactory.ExecutorType.VECTORMERGE,
100114
new ThreadPoolSettings(
101115
DEFAULT_VECTOR_MERGE_THREADS,
102116
DEFAULT_VECTOR_MERGE_BUFFERED_ITEMS,
103-
"VectorMergeExecutor"),
117+
"VectorMergeExecutor",
118+
false),
104119
ExecutorFactory.ExecutorType.COMMIT,
105120
new ThreadPoolSettings(
106-
DEFAULT_COMMIT_THREADS, DEFAULT_COMMIT_BUFFERED_ITEMS, "CommitExecutor"));
121+
DEFAULT_COMMIT_THREADS, DEFAULT_COMMIT_BUFFERED_ITEMS, "CommitExecutor", false));
107122

108123
private final Map<ExecutorFactory.ExecutorType, ThreadPoolSettings> threadPoolSettings;
109124

@@ -121,8 +136,13 @@ public ThreadPoolConfiguration(YamlConfigReader configReader) {
121136
String threadNamePrefix =
122137
configReader.getString(
123138
poolConfigPrefix + "threadNamePrefix", defaultSettings.threadNamePrefix());
139+
boolean useVirtualThreads =
140+
configReader.getBoolean(
141+
poolConfigPrefix + "useVirtualThreads", defaultSettings.useVirtualThreads());
124142
threadPoolSettings.put(
125-
executorType, new ThreadPoolSettings(maxThreads, maxBufferedItems, threadNamePrefix));
143+
executorType,
144+
new ThreadPoolSettings(
145+
maxThreads, maxBufferedItems, threadNamePrefix, useVirtualThreads));
126146
}
127147
}
128148

0 commit comments

Comments
 (0)