Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
193 changes: 170 additions & 23 deletions src/mistralai/client/conversations.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,17 @@
)
from mistralai.extra.run.utils import run_requirements
from mistralai.extra.observability.otel import GenAISpanEnum, get_or_create_otel_tracer
from mistralai.extra.exceptions import (
DeferralReason,
DeferredToolCallsException,
DeferredToolCallEntry,
DeferredToolCallResponse,
)
from mistralai.extra.run.deferred import (
_is_deferred_response,
_is_server_deferred,
_process_deferred_responses,
)

logger = logging.getLogger(__name__)
tracing_enabled, tracer = get_or_create_otel_tracer()
Expand All @@ -48,7 +59,11 @@ class Conversations(BaseSDK):
async def run_async(
self,
run_ctx: "RunContext",
inputs: Union[models.ConversationInputs, models.ConversationInputsTypedDict],
inputs: Union[
models.ConversationInputs,
models.ConversationInputsTypedDict,
List[DeferredToolCallResponse],
],
instructions: OptionalNullable[str] = UNSET,
tools: OptionalNullable[
Union[
Expand All @@ -68,16 +83,44 @@ async def run_async(
) -> RunResult:
"""Run a conversation with the given inputs and context.

The execution of a run will only stop when no required local execution can be done."""
The execution of a run will only stop when no required local execution can be done.

Inputs can be:
- Regular conversation inputs (messages, function results, etc.)
- DeferredToolResponse objects (from deferred.confirm(), reject())

When passing DeferredToolResponse objects, the SDK will:
- Execute confirmed tools automatically
- Convert rejections to function results with the rejection message
"""
from mistralai.client.beta import Beta # pylint: disable=import-outside-toplevel
from mistralai.extra.run.context import _validate_run # pylint: disable=import-outside-toplevel
from mistralai.extra.run.tools import get_function_calls # pylint: disable=import-outside-toplevel

# Check if inputs contain deferred responses - process them
pending_tool_confirmations: Optional[List[models.ToolCallConfirmation]] = None
if inputs and isinstance(inputs, list):
deferred_inputs = typing.cast(
List[DeferredToolCallResponse],
[i for i in inputs if _is_deferred_response(i)],
)
other_inputs = typing.cast(
List[InputEntries], [i for i in inputs if not _is_deferred_response(i)]
)
if deferred_inputs:
(
processed,
pending_tool_confirmations,
) = await _process_deferred_responses(run_ctx, deferred_inputs)
inputs = other_inputs + processed
if not pending_tool_confirmations:
pending_tool_confirmations = None

with tracer.start_as_current_span(GenAISpanEnum.VALIDATE_RUN.value):
req, run_result, input_entries = await _validate_run(
beta_client=Beta(self.sdk_configuration),
run_ctx=run_ctx,
inputs=inputs,
inputs=typing.cast(List[InputEntries], inputs),
instructions=instructions,
tools=tools,
completion_args=completion_args,
Expand Down Expand Up @@ -105,26 +148,68 @@ async def run_async(
res = await self.append_async(
conversation_id=run_ctx.conversation_id,
inputs=input_entries,
tool_confirmations=pending_tool_confirmations,
retries=retries,
server_url=server_url,
timeout_ms=timeout_ms,
http_headers=http_headers,
)
# Clear after first use
pending_tool_confirmations = None
run_ctx.request_count += 1
run_result.output_entries.extend(res.outputs)
fcalls = get_function_calls(res.outputs)
if not fcalls:
logger.debug("No more function calls to execute")
break
fresults = await run_ctx.execute_function_calls(fcalls)
run_result.output_entries.extend(fresults)
input_entries = typing.cast(list[InputEntries], fresults)

# Partition by permission: include server-side deferred calls
to_defer = [
fc
for fc in fcalls
if run_ctx.requires_confirmation(fc.name) or _is_server_deferred(fc)
]
to_execute = [fc for fc in fcalls if fc not in to_defer]

# Execute approved
fresults = []
if to_execute:
fresults = await run_ctx.execute_function_calls(to_execute)
run_result.output_entries.extend(fresults)
input_entries = typing.cast(list[InputEntries], fresults)

# Defer the rest - include executed_results so user can pass them back
if to_defer:
deferred_objects = [
DeferredToolCallEntry(
fc,
reason=DeferralReason.SERVER_SIDE_CONFIRMATION_REQUIRED
if _is_server_deferred(fc)
else DeferralReason.CONFIRMATION_REQUIRED,
)
for fc in to_defer
]
raise DeferredToolCallsException(
run_ctx.conversation_id,
deferred_objects,
run_result.output_entries,
executed_results=fresults,
)

# If we only executed tools (none deferred), continue the loop
if not to_execute:
break
return run_result

@run_requirements
async def run_stream_async(
self,
run_ctx: "RunContext",
inputs: Union[models.ConversationInputs, models.ConversationInputsTypedDict],
inputs: Union[
models.ConversationInputs,
models.ConversationInputsTypedDict,
List[DeferredToolCallResponse],
],
instructions: OptionalNullable[str] = UNSET,
tools: OptionalNullable[
Union[
Expand All @@ -144,23 +229,48 @@ async def run_stream_async(
) -> AsyncGenerator[Union[RunResultEvents, RunResult], None]:
"""Similar to `run_async` but returns a generator which streams events.

The last streamed object is the RunResult object which summarises what happened in the run."""
The last streamed object is the RunResult object which summarises what happened in the run.

Inputs can be:
- Regular conversation inputs (messages, function results, etc.)
- DeferredToolResponse objects (from deferred.confirm(), reject())
"""
from mistralai.client.beta import Beta # pylint: disable=import-outside-toplevel
from mistralai.extra.run.context import _validate_run # pylint: disable=import-outside-toplevel
from mistralai.extra.run.tools import get_function_calls # pylint: disable=import-outside-toplevel

# Check if inputs contain deferred responses - process them
pending_tool_confirmations: Optional[List[models.ToolCallConfirmation]] = None
if inputs and isinstance(inputs, list):
deferred_inputs = typing.cast(
List[DeferredToolCallResponse],
[i for i in inputs if _is_deferred_response(i)],
)
other_inputs = typing.cast(
List[InputEntries], [i for i in inputs if not _is_deferred_response(i)]
)
if deferred_inputs:
(
processed,
pending_tool_confirmations,
) = await _process_deferred_responses(run_ctx, deferred_inputs)
inputs = other_inputs + processed
if not pending_tool_confirmations:
pending_tool_confirmations = None

req, run_result, input_entries = await _validate_run(
beta_client=Beta(self.sdk_configuration),
run_ctx=run_ctx,
inputs=inputs,
inputs=typing.cast(List[InputEntries], inputs),
instructions=instructions,
tools=tools,
completion_args=completion_args,
)

async def run_generator() -> (
AsyncGenerator[Union[RunResultEvents, RunResult], None]
):
async def run_generator() -> AsyncGenerator[
Union[RunResultEvents, RunResult], None
]:
nonlocal pending_tool_confirmations
current_entries = input_entries
while True:
received_event_tracker: defaultdict[
Expand All @@ -181,10 +291,13 @@ async def run_generator() -> (
res = await self.append_stream_async(
conversation_id=run_ctx.conversation_id,
inputs=current_entries,
tool_confirmations=pending_tool_confirmations,
retries=retries,
server_url=server_url,
timeout_ms=timeout_ms,
)
# Clear after first use
pending_tool_confirmations = None
async for event in res:
if (
isinstance(event.data, ResponseStartedEvent)
Expand All @@ -207,18 +320,52 @@ async def run_generator() -> (
if not fcalls:
logger.debug("No more function calls to execute")
break
fresults = await run_ctx.execute_function_calls(fcalls)
run_result.output_entries.extend(fresults)
for fresult in fresults:
yield RunResultEvents(
event="function.result",
data=FunctionResultEvent(
type="function.result",
result=fresult.result,
tool_call_id=fresult.tool_call_id,
),

# Partition by permission: include server-side deferred calls
to_defer = [
fc
for fc in fcalls
if run_ctx.requires_confirmation(fc.name) or _is_server_deferred(fc)
]
to_execute = [fc for fc in fcalls if fc not in to_defer]

# Execute approved
fresults = []
if to_execute:
fresults = await run_ctx.execute_function_calls(to_execute)
run_result.output_entries.extend(fresults)
for fresult in fresults:
yield RunResultEvents(
event="function.result",
data=FunctionResultEvent(
type="function.result",
result=fresult.result,
tool_call_id=fresult.tool_call_id,
),
)
current_entries = typing.cast(list[InputEntries], fresults)

# Defer the rest - include executed_results so user can pass them back
if to_defer:
deferred_objects = [
DeferredToolCallEntry(
fc,
reason=DeferralReason.SERVER_SIDE_CONFIRMATION_REQUIRED
if _is_server_deferred(fc)
else DeferralReason.CONFIRMATION_REQUIRED,
)
for fc in to_defer
]
raise DeferredToolCallsException(
run_ctx.conversation_id,
deferred_objects,
run_result.output_entries,
executed_results=fresults,
)
current_entries = typing.cast(list[InputEntries], fresults)

# If we only executed tools (none deferred), continue the loop
if not to_execute:
break
yield run_result

return run_generator()
Expand Down
Loading
Loading