Python: Fix: Implement __deepcopy__ on KernelFunction to handle non-serializable OTEL metrics #12803
Conversation
… KernelFunction Pydantic model
@microsoft-github-policy-service agree company="Digital Workforce Services Oyj" |
Hi @MMoshtaghi, thanks for the contribution. Please make sure you install the pre-commit hooks locally. You can find the directions here. Especially the part around:
There are pre-commit check failures right now that need to be addressed. Thanks. |
Could we also add tests? |
Hi @moonbox3 , thanks for the review. I fixed the pre-commit check failures. |
Per Tao's comment, which I agree with:
|
1- Ok, I'll create a new test file in 2- I actually followed the DEV_SETUP.md (running |
I think unit tests should be enough to cover the overridden deepcopy method you added. We don't need integration tests. |
OK, I'll create a unit test to cover deepcopy for KernelFunction. Under which file should I write it? How about test_kernel_function_from_prompt.py? Would this be OK?
def test_kernel_function_from_prompt_deepcopy():
    """Test deepcopying a KernelFunctionFromPrompt."""
    function = KernelFunctionFromPrompt(
        function_name="test_function",
        plugin_name="test_plugin",
        prompt="Hello, world!",
        description="A test function.",
    )
    copied_function = deepcopy(function)
    assert copied_function is not function
    assert copied_function.name == function.name
    assert copied_function.plugin_name == function.plugin_name
    assert copied_function.description == function.description
    assert copied_function.prompt_template.prompt_template_config.template == (
        function.prompt_template.prompt_template_config.template
    )
    assert copied_function.prompt_template is not function.prompt_template |
Hi @MMoshtaghi, I am curious if you ran into the same issue running this sample: https://github.com/microsoft/semantic-kernel/blob/main/python/samples/getting_started_with_agents/multi_agent_orchestration/step4_handoff.py? |
Hi @TaoChenOSU , |
@MMoshtaghi not sure why mypy is failing. There is a unit test failure, though, that I don't see in main. Could be related to the override |
@moonbox3 I know, but as you can see the unit test error occurred after you merged branch 'main' into main. The unit test error is bc of a timeout for getting orchestration result in =================================== FAILURES ===================================
___________________ test_invoke_with_agent_raising_exception ___________________
...
    async def test_invoke_with_agent_raising_exception():
        """Test the invoke method of the MagenticOrchestration with a list of messages which raises an error."""
        with (
            patch.object(
                MockChatCompletionService, "get_chat_message_content", new_callable=AsyncMock
            ) as mock_get_chat_message_content,
            patch.object(
                StandardMagenticManager, "create_progress_ledger", new_callable=AsyncMock, side_effect=ManagerProgressList
            ),
        ):
            mock_get_chat_message_content.return_value = ChatMessageContent(role="assistant", content="mock_response")
            chat_completion_service = MockChatCompletionService(ai_model_id="test")
            prompt_execution_settings = MockPromptExecutionSettings()
            manager = StandardMagenticManager(
                chat_completion_service=chat_completion_service,
                prompt_execution_settings=prompt_execution_settings,
            )
            agent_a = MockAgentWithException(name="agent_a", description="test agent")
            agent_b = MockAgent(name="agent_b", description="test agent")
            runtime = InProcessRuntime()
            runtime.start()
            orchestration = MagenticOrchestration(members=[agent_a, agent_b], manager=manager)
            try:
                orchestration_result = await orchestration.invoke(task="test_message", runtime=runtime)
                with pytest.raises(RuntimeError, match="Mock agent exception"):
>                   await orchestration_result.get(1.0)
tests/unit/agents/orchestration/test_magentic.py:277:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
semantic_kernel/agents/orchestration/orchestration_base.py:57: in get
    await asyncio.wait_for(self.event.wait(), timeout=timeout)
/opt/homebrew/Cellar/python@3.12/3.12.11/Frameworks/Python.framework/Versions/3.12/lib/python3.12/asyncio/tasks.py:519: in wait_for
    async with timeouts.timeout(timeout):
               ^^^^^^^^^^^^^^^^^^^^^^^^^
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
...
=========================== short test summary info ============================
FAILED tests/unit/agents/orchestration/test_magentic.py::test_invoke_with_agent_raising_exception - TimeoutError
1 failed, 3675 passed, 5 xfailed, 471 warnings in 88.87s (0:01:28) |
I just fixed the mypy errors, and here I explain the root cause and the solution: Problem: Solution Overview:
Tests:
(image not captured) |
This is weird because I cannot reproduce locally. |
Hmm! How about with https://github.com/microsoft/semantic-kernel/blob/main/python/samples/getting_started_with_agents/multi_agent_orchestration/step4b_handoff_streaming_agent_response_callback.py ? |
Still no. I approved to unblock you. |
Thanks again for your contribution, @MMoshtaghi. We appreciate the support. |
Thanks @moonbox3 and @TaoChenOSU for the review. |
Basically you don't necessarily need HandOff Orchestration, just clone a kernel with telemetry through the console. I wrote this as an integration test for telemetry and kernel, but Evan said a unit test would be enough. |
You can comment out the deepcopy method and test this:
import logging

import pytest
from opentelemetry._logs import set_logger_provider
from opentelemetry.metrics import set_meter_provider
from opentelemetry.sdk._logs import LoggerProvider, LoggingHandler
from opentelemetry.sdk._logs.export import BatchLogRecordProcessor, ConsoleLogExporter
from opentelemetry.sdk.metrics import MeterProvider
from opentelemetry.sdk.metrics.export import ConsoleMetricExporter, PeriodicExportingMetricReader
from opentelemetry.sdk.metrics.view import DropAggregation, View
from opentelemetry.sdk.resources import Resource
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor, ConsoleSpanExporter
from opentelemetry.trace import set_tracer_provider

from semantic_kernel.kernel import Kernel

# Create a resource to represent the service/sample
resource = Resource.create({"service.name": "telemetry-console-customer-service-agent"})


def set_up_logging():
    exporter = ConsoleLogExporter()
    # Create and set a global logger provider for the application.
    logger_provider = LoggerProvider(resource=resource)
    # Log processors are initialized with an exporter which is responsible
    # for sending the telemetry data to a particular backend.
    logger_provider.add_log_record_processor(BatchLogRecordProcessor(exporter))
    # Sets the global default logger provider
    set_logger_provider(logger_provider)
    # Create a logging handler to write logging records, in OTLP format, to the exporter.
    handler = LoggingHandler()
    # Add filters to the handler to only process records from semantic_kernel.
    handler.addFilter(logging.Filter("semantic_kernel"))
    # Attach the handler to the root logger. `getLogger()` with no arguments returns the root logger.
    # Events from all child loggers will be processed by this handler.
    logger = logging.getLogger()
    logger.addHandler(handler)
    logger.setLevel(logging.INFO)


def set_up_tracing():
    exporter = ConsoleSpanExporter()
    # Initialize a trace provider for the application. This is a factory for creating tracers.
    tracer_provider = TracerProvider(resource=resource)
    # Span processors are initialized with an exporter which is responsible
    # for sending the telemetry data to a particular backend.
    tracer_provider.add_span_processor(BatchSpanProcessor(exporter))
    # Sets the global default tracer provider
    set_tracer_provider(tracer_provider)


def set_up_metrics():
    exporter = ConsoleMetricExporter()
    # Initialize a metric provider for the application. This is a factory for creating meters.
    meter_provider = MeterProvider(
        metric_readers=[PeriodicExportingMetricReader(exporter, export_interval_millis=5000)],
        resource=resource,
        views=[
            # Dropping all instrument names except for those starting with "semantic_kernel"
            View(instrument_name="*", aggregation=DropAggregation()),
            View(instrument_name="semantic_kernel*"),
        ],
    )
    # Sets the global default meter provider
    set_meter_provider(meter_provider)


# This must be done before any other telemetry calls, and creating kernels.
set_up_logging()
set_up_tracing()
set_up_metrics()


@pytest.mark.asyncio
async def test_kernel_with_function_clone_with_otel_enabled():
    kernel = Kernel()
    kernel.add_function(
        plugin_name="test_plugin",
        function_name="test_function",
        prompt="Hello, world!",
    )
    kernel_clone = kernel.clone()
    assert kernel_clone is not None
    assert kernel_clone.plugins is not None and len(kernel_clone.plugins) > 0 |
Description
This PR fixes TypeError: cannot pickle '_thread.RLock' object which occurs when kernel.clone() is called while OpenTelemetry metrics are enabled.
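The failure mode can be reproduced without Semantic Kernel or OpenTelemetry. The following is a minimal stdlib-only sketch, not code from this PR: `FakeHistogram` and `FunctionLike` are hypothetical stand-ins for an OTEL histogram and for a model that holds one. It assumes only that the histogram owns a `threading.RLock`, as the PR text states.

```python
import copy
import threading
from typing import Optional


class FakeHistogram:
    """Hypothetical stand-in for an OTEL histogram: it simply owns a lock."""

    def __init__(self) -> None:
        self._lock = threading.RLock()


class FunctionLike:
    """Hypothetical stand-in for a model that holds a histogram field."""

    def __init__(self, name: str) -> None:
        self.name = name
        self.histogram = FakeHistogram()


def try_deepcopy(obj: object) -> Optional[TypeError]:
    """Return the TypeError raised by copy.deepcopy, or None on success."""
    try:
        copy.deepcopy(obj)
    except TypeError as exc:
        return exc
    return None


# deepcopy walks into the histogram, reaches the RLock, and falls back to the
# pickle protocol for it, which the lock does not support.
error = try_deepcopy(FunctionLike("test_function"))
print(type(error).__name__, error)
```

The default deepcopy recurses into every attribute, so a single lock anywhere in the object graph is enough to abort the whole copy.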
TypeError: cannot pickle '_thread.RLock' object when cloning Kernel with OpenTelemetry enabled #12802
Motivation and Context
When using features like HandoffOrchestration, the agent's Kernel is cloned. This process uses copy.deepcopy, which fails on the KernelFunction's OpenTelemetry Histogram fields (invocation_duration_histogram and streaming_duration_histogram). These histogram objects contain thread locks, making them non-serializable.
Previous solutions, like the one in PR #9292, used Field(exclude=True, default_factory=...). While that was a necessary step, it was insufficient because deepcopy does not respect the exclude flag in the same way that Pydantic's serialization does. This bug blocks the use of certain agentic features in any application that has metrics-based observability enabled.
Description of the change
This solution introduces a custom __deepcopy__ method to the KernelFunction Pydantic model. This method correctly handles the deep-copying process for KernelFunction instances:
) are correctly duplicated. The histogram fields are explicitly skipped in this step, as they have already been recreated by model_copy.
This approach ensures that the non-serializable fields are re-initialized instead of copied, resolving the TypeError while maintaining the integrity of the cloned object.
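The general idea of re-initializing rather than copying can be illustrated with plain Python classes. This is a hedged sketch of the technique only, not the PR's implementation: the PR works on a Pydantic model and relies on model_copy, whereas this example uses a hand-written `__deepcopy__` on hypothetical `FunctionLike` and `FakeHistogram` classes so that it runs with the standard library alone.

```python
import copy
import threading
from typing import Any


class FakeHistogram:
    """Hypothetical stand-in for an OTEL histogram that owns a thread lock."""

    def __init__(self) -> None:
        self._lock = threading.RLock()


class FunctionLike:
    """Hypothetical stand-in for a model with one non-copyable field."""

    # Attributes that are rebuilt on the clone instead of being deep-copied.
    _RECREATED_FIELDS = ("histogram",)

    def __init__(self, name: str, metadata: dict) -> None:
        self.name = name
        self.metadata = metadata
        self.histogram = FakeHistogram()

    def __deepcopy__(self, memo: dict) -> "FunctionLike":
        cls = type(self)
        new = cls.__new__(cls)
        # Register the clone first so cyclic references resolve to it.
        memo[id(self)] = new
        for key, value in self.__dict__.items():
            if key in self._RECREATED_FIELDS:
                continue  # skip the lock-holding field
            setattr(new, key, copy.deepcopy(value, memo))
        # Re-initialize the skipped field with a fresh instance.
        new.histogram = FakeHistogram()
        return new


original = FunctionLike("test_function", {"tags": ["a", "b"]})
clone: Any = copy.deepcopy(original)
print(clone.name, clone.metadata, clone.histogram is not original.histogram)
```

Ordinary fields are still deep-copied, so the clone shares no mutable state with the original, while the histogram is a new object with its own lock.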
How has this been tested?
The fix was validated using the reproduction steps outlined in the associated issue. Running an agent orchestration that triggers kernel.clone() with OpenTelemetry metrics enabled now completes successfully without any errors.
Contribution Checklist