
Commit f21108c

refactor(indexing): add a queue for indexing operations (#237)
1 parent afe5d27 · commit f21108c

20 files changed: +1248 −154 lines

src/kodit/app.py

Lines changed: 11 additions & 2 deletions
@@ -9,6 +9,7 @@
 
 from kodit._version import version
 from kodit.application.services.auto_indexing_service import AutoIndexingService
+from kodit.application.services.indexing_worker_service import IndexingWorkerService
 from kodit.application.services.sync_scheduler import SyncSchedulerService
 from kodit.config import AppContext
 from kodit.infrastructure.api.v1.routers import indexes_router, search_router
@@ -28,9 +29,16 @@ async def app_lifespan(_: FastAPI) -> AsyncIterator[AppLifespanState]:
 
     # App context has already been configured by the CLI.
     app_context = AppContext()
+    db = await app_context.get_db()
+
+    # Start the queue worker service
+    _indexing_worker_service = IndexingWorkerService(
+        app_context=app_context,
+        session_factory=db.session_factory,
+    )
+    await _indexing_worker_service.start()
 
     # Start auto-indexing service
-    db = await app_context.get_db()
     _auto_indexing_service = AutoIndexingService(
         app_context=app_context,
         session_factory=db.session_factory,
@@ -40,7 +48,6 @@ async def app_lifespan(_: FastAPI) -> AsyncIterator[AppLifespanState]:
     # Start sync scheduler service
    if app_context.periodic_sync.enabled:
        _sync_scheduler_service = SyncSchedulerService(
-            app_context=app_context,
            session_factory=db.session_factory,
        )
        _sync_scheduler_service.start_periodic_sync(
@@ -54,6 +61,8 @@ async def app_lifespan(_: FastAPI) -> AsyncIterator[AppLifespanState]:
         await _sync_scheduler_service.stop_periodic_sync()
     if _auto_indexing_service:
         await _auto_indexing_service.stop()
+    if _indexing_worker_service:
+        await _indexing_worker_service.stop()
 
 
 # See https://gofastmcp.com/integrations/fastapi#mounting-an-mcp-server
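
Taken together, the app.py changes reduce to a small lifecycle pattern: build the worker from the shared AppContext and session factory, start it on startup, stop it on shutdown. The sketch below restates that wiring in isolation; it is a condensed illustration only and omits the auto-indexing and sync-scheduler services that the real lifespan also manages.

from collections.abc import AsyncIterator
from contextlib import asynccontextmanager

from kodit.application.services.indexing_worker_service import IndexingWorkerService
from kodit.config import AppContext


@asynccontextmanager
async def worker_lifespan() -> AsyncIterator[IndexingWorkerService]:
    """Condensed sketch of the worker start/stop wiring added above."""
    app_context = AppContext()
    db = await app_context.get_db()
    worker = IndexingWorkerService(
        app_context=app_context,
        session_factory=db.session_factory,
    )
    await worker.start()  # begins polling the task table in the background
    try:
        yield worker
    finally:
        await worker.stop()  # cancels the poll loop and drains the thread pool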

src/kodit/application/services/auto_indexing_service.py

Lines changed: 16 additions & 7 deletions
@@ -1,6 +1,7 @@
 """Service for automatically indexing configured sources."""
 
 import asyncio
+import warnings
 from collections.abc import Callable
 from contextlib import suppress
 
@@ -10,8 +11,10 @@
 from kodit.application.factories.code_indexing_factory import (
     create_code_indexing_application_service,
 )
+from kodit.application.services.queue_service import QueueService
 from kodit.config import AppContext
-from kodit.infrastructure.ui.progress import create_log_progress_callback
+from kodit.domain.entities import Task
+from kodit.domain.value_objects import QueuePriority
 
 
 class AutoIndexingService:
@@ -37,13 +40,20 @@ async def start_background_indexing(self) -> None:
             self.log.info("Auto-indexing is disabled (no sources configured)")
             return
 
+        warnings.warn(
+            "Auto-indexing is deprecated and will be removed in a future version, please use the API to index sources.",  # noqa: E501
+            DeprecationWarning,
+            stacklevel=2,
+        )
+
         auto_sources = [source.uri for source in self.app_context.auto_indexing.sources]
         self.log.info("Starting background indexing", num_sources=len(auto_sources))
         self._indexing_task = asyncio.create_task(self._index_sources(auto_sources))
 
     async def _index_sources(self, sources: list[str]) -> None:
         """Index all configured sources in the background."""
         async with self.session_factory() as session:
+            queue_service = QueueService(session=session)
             service = create_code_indexing_application_service(
                 app_context=self.app_context,
                 session=session,
@@ -56,18 +66,17 @@ async def _index_sources(self, sources: list[str]) -> None:
                         self.log.info("Index already exists, skipping", source=source)
                         continue
 
-                    self.log.info("Auto-indexing source", source=source)
+                    self.log.info("Adding auto-indexing task to queue", source=source)
 
                     # Create index
                     index = await service.create_index_from_uri(source)
 
-                    # Run indexing (without progress callback for background mode)
-                    await service.run_index(
-                        index, progress_callback=create_log_progress_callback()
+                    await queue_service.enqueue_task(
+                        Task.create_index_update_task(
+                            index.id, QueuePriority.BACKGROUND
+                        )
                     )
 
-                    self.log.info("Successfully auto-indexed source", source=source)
-
                 except Exception as exc:
                     self.log.exception(
                         "Failed to auto-index source", source=source, error=str(exc)
src/kodit/application/services/indexing_worker_service.py

Lines changed: 154 additions & 0 deletions
@@ -0,0 +1,154 @@
+"""Service for processing indexing tasks from the database queue."""
+
+import asyncio
+from collections.abc import Callable
+from concurrent.futures import ThreadPoolExecutor
+from contextlib import suppress
+from datetime import UTC, datetime
+
+import structlog
+from sqlalchemy.ext.asyncio import AsyncSession
+
+from kodit.application.factories.code_indexing_factory import (
+    create_code_indexing_application_service,
+)
+from kodit.config import AppContext
+from kodit.domain.entities import Task
+from kodit.domain.value_objects import TaskType
+from kodit.infrastructure.sqlalchemy.task_repository import SqlAlchemyTaskRepository
+
+
+class IndexingWorkerService:
+    """Service for processing indexing tasks from the database queue.
+
+    This worker polls the database for pending tasks and processes the heavy
+    indexing work in separate threads to prevent blocking API responsiveness.
+    """
+
+    def __init__(
+        self,
+        app_context: AppContext,
+        session_factory: Callable[[], AsyncSession],
+    ) -> None:
+        """Initialize the indexing worker service."""
+        self.app_context = app_context
+        self.session_factory = session_factory
+        self._worker_task: asyncio.Task | None = None
+        self._shutdown_event = asyncio.Event()
+        self._executor = ThreadPoolExecutor(
+            max_workers=1, thread_name_prefix="indexing-worker"
+        )
+        self.log = structlog.get_logger(__name__)
+
+    async def start(self) -> None:
+        """Start the worker to process the queue."""
+        self._running = True
+
+        # Start single worker task
+        self._worker_task = asyncio.create_task(self._worker_loop())
+
+        self.log.info(
+            "Indexing worker started",
+        )
+
+    async def stop(self) -> None:
+        """Stop the worker gracefully."""
+        self.log.info("Stopping indexing worker")
+        self._shutdown_event.set()
+
+        if self._worker_task and not self._worker_task.done():
+            self._worker_task.cancel()
+            with suppress(asyncio.CancelledError):
+                await self._worker_task
+
+        # Shutdown the thread pool executor
+        self._executor.shutdown(wait=True)
+
+        self.log.info("Indexing worker stopped")
+
+    async def _worker_loop(self) -> None:
+        self.log.debug("Worker loop started")
+
+        while not self._shutdown_event.is_set():
+            try:
+                async with self.session_factory() as session:
+                    repo = SqlAlchemyTaskRepository(session)
+                    task = await repo.take()
+                    await session.commit()
+
+                # If there's a task, process it in a new thread
+                if task:
+                    await asyncio.get_event_loop().run_in_executor(
+                        self._executor, self._process_task, task
+                    )
+                    continue
+
+                # If no task, sleep for a bit
+                await asyncio.sleep(1)
+                continue
+
+            except Exception as e:
+                self.log.exception(
+                    "Error processing task",
+                    error=str(e),
+                )
+                continue
+
+        self.log.info("Worker loop stopped")
+
+    def _process_task(self, task: Task) -> None:
+        """Process a single task."""
+        self.log.info(
+            "Processing task",
+            task_id=task.id,
+            task_type=task.type.value,
+        )
+
+        start_time = datetime.now(UTC)
+
+        # Create a new event loop for this thread
+        loop = asyncio.new_event_loop()
+        asyncio.set_event_loop(loop)
+
+        try:
+            # Process based on task type (currently only INDEX_UPDATE is supported)
+            if task.type is TaskType.INDEX_UPDATE:
+                loop.run_until_complete(self._process_index_update(task))
+            else:
+                self.log.warning(
+                    "Unknown task type",
+                    task_id=task.id,
+                    task_type=task.type,
+                )
+                return
+        finally:
+            loop.close()
+
+        duration = (datetime.now(UTC) - start_time).total_seconds()
+        self.log.info(
+            "Task completed successfully",
+            task_id=task.id,
+            duration_seconds=duration,
+        )
+
+    async def _process_index_update(self, task: Task) -> None:
+        """Process index update/sync task."""
+        index_id = task.payload.get("index_id")
+        if not index_id:
+            raise ValueError("Missing index_id in task payload")
+
+        # Create a fresh database connection for this thread's event loop
+        db = await self.app_context.new_db(run_migrations=True)
+        try:
+            async with db.session_factory() as session:
+                service = create_code_indexing_application_service(
+                    app_context=self.app_context,
+                    session=session,
+                )
+                index = await service.index_repository.get(index_id)
+                if not index:
+                    raise ValueError(f"Index not found: {index_id}")
+
+                await service.run_index(index)
+        finally:
+            await db.close()
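
The subtle part of the worker is how it keeps the API event loop responsive: each task is handed to a single-thread executor, and inside that thread a fresh event loop (and, in _process_index_update, a fresh database engine) runs the async indexing code to completion. Stripped of the kodit specifics, the pattern is roughly this generic sketch:

import asyncio
from collections.abc import Awaitable, Callable
from concurrent.futures import ThreadPoolExecutor

executor = ThreadPoolExecutor(max_workers=1, thread_name_prefix="indexing-worker")


def run_on_worker_thread(job: Callable[[], Awaitable[None]]) -> None:
    """Run an async job to completion on this thread's own event loop."""
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    try:
        loop.run_until_complete(job())
    finally:
        loop.close()


async def main() -> None:
    async def heavy_job() -> None:
        await asyncio.sleep(0.1)  # stand-in for service.run_index(index)

    # The caller's loop only awaits the executor future; the blocking work
    # happens on the dedicated worker thread.
    await asyncio.get_running_loop().run_in_executor(
        executor, run_on_worker_thread, heavy_job
    )


if __name__ == "__main__":
    asyncio.run(main())

Because the executor has a single worker thread, indexing jobs are serialized: a long-running index never competes with another job for the pool.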
src/kodit/application/services/queue_service.py

Lines changed: 52 additions & 0 deletions
@@ -0,0 +1,52 @@
+"""Queue service for managing tasks."""
+
+import structlog
+from sqlalchemy.ext.asyncio import AsyncSession
+
+from kodit.domain.entities import Task
+from kodit.domain.value_objects import TaskType
+from kodit.infrastructure.sqlalchemy.task_repository import SqlAlchemyTaskRepository
+
+
+class QueueService:
+    """Service for queue operations using database persistence.
+
+    This service provides the main interface for enqueuing and managing tasks.
+    It uses the existing Task entity in the database with a flexible JSON payload.
+    """
+
+    def __init__(
+        self,
+        session: AsyncSession,
+    ) -> None:
+        """Initialize the queue service."""
+        self.session = session
+        self.log = structlog.get_logger(__name__)
+
+    async def enqueue_task(self, task: Task) -> None:
+        """Queue a task in the database."""
+        repo = SqlAlchemyTaskRepository(self.session)
+
+        # See if task already exists
+        db_task = await repo.get(task.id)
+        if db_task:
+            # Task already exists, update priority
+            db_task.priority = task.priority
+            await repo.update(db_task)
+            self.log.info("Task updated", task_id=task.id, task_type=task.type)
+        else:
+            # Otherwise, add task
+            await repo.add(task)
+            self.log.info(
+                "Task queued",
+                task_id=task.id,
+                task_type=task.type,
+                payload=task.payload,
+            )
+
+        await self.session.commit()
+
+    async def list_tasks(self, task_type: TaskType | None = None) -> list[Task]:
+        """List all tasks in the queue."""
+        repo = SqlAlchemyTaskRepository(self.session)
+        return await repo.list(task_type)
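
On the producer side, any caller with an async session factory (the API layer, or the auto-indexing service above) can enqueue work and let the worker pick it up. A usage sketch follows, assuming a session_factory such as db.session_factory from app.py; the enqueue_index_update helper is illustrative, not part of this commit.

from collections.abc import Callable

from sqlalchemy.ext.asyncio import AsyncSession

from kodit.application.services.queue_service import QueueService
from kodit.domain.entities import Task
from kodit.domain.value_objects import QueuePriority, TaskType


async def enqueue_index_update(
    session_factory: Callable[[], AsyncSession], index_id: int
) -> None:
    """Queue an index update and report how many are pending (illustrative)."""
    async with session_factory() as session:
        queue = QueueService(session=session)
        # enqueue_task commits itself; re-enqueueing an existing task id
        # only bumps its priority rather than creating a duplicate.
        await queue.enqueue_task(
            Task.create_index_update_task(index_id, QueuePriority.BACKGROUND)
        )
        pending = await queue.list_tasks(TaskType.INDEX_UPDATE)
        print(f"{len(pending)} index-update task(s) queued")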
