Skip to content

Fix for mypy 1.14.1 #41674

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -113,9 +113,8 @@ def case_insensitive_match(d: Mapping[str, Any], key: str) -> Optional[Any]:
cloud = _KNOWN_AZURE_ENVIRONMENTS.get(name) or case_insensitive_match(_KNOWN_AZURE_ENVIRONMENTS, name)
if cloud:
return cloud
default_endpoint = (_KNOWN_AZURE_ENVIRONMENTS
.get(_DEFAULT_AZURE_ENV_NAME, {})
.get("resource_manager_endpoint"))
default_cloud = _KNOWN_AZURE_ENVIRONMENTS.get(_DEFAULT_AZURE_ENV_NAME)
default_endpoint = default_cloud.get("resource_manager_endpoint") if default_cloud else None

metadata_url = self.get_default_metadata_url(default_endpoint)
clouds = await self.get_clouds_async(metadata_url=metadata_url, update_cached=update_cached)
Expand Down Expand Up @@ -155,7 +154,8 @@ def get_default_metadata_url(default_endpoint: Optional[str] = None) -> str:
@staticmethod
async def _get_registry_discovery_url_async(cloud_name: str, cloud_suffix: str) -> str:
async with _ASYNC_LOCK:
discovery_url = _KNOWN_AZURE_ENVIRONMENTS.get(cloud_name, {}).get("registry_discovery_endpoint")
cloud_metadata = _KNOWN_AZURE_ENVIRONMENTS.get(cloud_name)
discovery_url = cloud_metadata.get("registry_discovery_endpoint") if cloud_metadata else None
if discovery_url:
return discovery_url

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,19 @@
# Changes may cause incorrect behavior and will be lost if the code is regenerated.
# --------------------------------------------------------------------------

from typing import TYPE_CHECKING, Union
from typing import TYPE_CHECKING, Union, Any

if TYPE_CHECKING:
from . import models as _models

# Note: Some of these model references may not exist in the current models module
# Using Any as fallback for missing types
AgentsApiResponseFormatOption = Union[
str,
str,
"_models.AgentsApiResponseFormatMode",
"_models.AgentsApiResponseFormat",
"_models.ResponseFormatJsonSchemaType",
Any, # "_models.AgentsApiResponseFormatMode",
Any, # "_models.AgentsApiResponseFormat",
Any, # "_models.ResponseFormatJsonSchemaType",
]
MessageAttachmentToolDefinition = Union["_models.CodeInterpreterToolDefinition", "_models.FileSearchToolDefinition"]
AgentsApiToolChoiceOption = Union[str, str, "_models.AgentsApiToolChoiceOptionMode", "_models.AgentsNamedToolChoice"]
MessageAttachmentToolDefinition = Union[Any, Any] # "_models.CodeInterpreterToolDefinition", "_models.FileSearchToolDefinition"
AgentsApiToolChoiceOption = Union[str, str, Any, Any] # "_models.AgentsApiToolChoiceOptionMode", "_models.AgentsNamedToolChoice"
Comment on lines 16 to +24
Copy link
Preview

Copilot AI Jun 19, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[nitpick] The Union[str, str, ...] includes str twice, which is redundant. Removing the duplicate entry will simplify the type definition.

Copilot uses AI. Check for mistakes.

Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,17 @@

from pydantic import BaseModel

from typing import List, Optional, Union
from typing import List, Optional, Union, TYPE_CHECKING, Any

# Models moved in a later version of agents SDK, so try a few different locations
try:
from azure.ai.projects.models import RunStepFunctionToolCall
except ImportError:
pass
try:
from azure.ai.agents.models import RunStepFunctionToolCall
except ImportError:
pass
if TYPE_CHECKING:
try:
from azure.ai.projects.models import RunStepFunctionToolCall
except ImportError:
try:
from azure.ai.agents.models import RunStepFunctionToolCall
except ImportError:
RunStepFunctionToolCall = Any

# Message roles constants.
_SYSTEM = "system"
Expand Down Expand Up @@ -244,14 +244,16 @@ def break_tool_call_into_messages(tool_call: ToolCall, run_id: str) -> List[Mess
# Treat built-in tools separately. Object models may be unique so handle each case separately
# Just converting to dicts here rather than custom serializers for simplicity for now.
# Don't fail if we run into a newly seen tool, just skip
if tool_call.details["type"] == "code_interpreter":
arguments = {"input": tool_call.details.code_interpreter.input}
elif tool_call.details["type"] == "bing_grounding":
arguments = {"requesturl": tool_call.details["bing_grounding"]["requesturl"]}
elif tool_call.details["type"] == "file_search":
options = tool_call.details["file_search"]["ranking_options"]
if hasattr(tool_call.details, "type") and tool_call.details.type == "code_interpreter": # type: ignore
arguments = {"input": getattr(tool_call.details, "code_interpreter", {}).get("input", "")} # type: ignore
elif hasattr(tool_call.details, "type") and tool_call.details.type == "bing_grounding": # type: ignore
bing_grounding = getattr(tool_call.details, "bing_grounding", {}) # type: ignore
arguments = {"requesturl": bing_grounding.get("requesturl", "")}
elif hasattr(tool_call.details, "type") and tool_call.details.type == "file_search": # type: ignore
file_search = getattr(tool_call.details, "file_search", {}) # type: ignore
options = file_search.get("ranking_options", {})
arguments = {
"ranking_options": {"ranker": options["ranker"], "score_threshold": options["score_threshold"]}
"ranking_options": {"ranker": options.get("ranker", ""), "score_threshold": options.get("score_threshold", 0)}
}
elif tool_call.details["type"] == "azure_ai_search":
arguments = {"input": tool_call.details["azure_ai_search"]["input"]}
Expand All @@ -276,46 +278,52 @@ def break_tool_call_into_messages(tool_call: ToolCall, run_id: str) -> List[Mess
# assistant's action of calling the tool.
messages.append(AssistantMessage(run_id=run_id, content=[to_dict(content_tool_call)], createdAt=tool_call.created))

output = None
if hasattr(tool_call.details, _FUNCTION):
output = safe_loads(tool_call.details.function["output"])
output = safe_loads(getattr(tool_call.details, "function", {}).get("output", "")) # type: ignore
else:
try:
# Some built-ins may have output, others may not
# Try to retrieve it, but if we don't find anything, skip adding the message
# Just manually converting to dicts for easy serialization for now rather than custom serializers
if tool_call.details.type == _CODE_INTERPRETER:
output = tool_call.details.code_interpreter.outputs
elif tool_call.details.type == _BING_GROUNDING:
if hasattr(tool_call.details, "type") and tool_call.details.type == _CODE_INTERPRETER: # type: ignore
output = getattr(tool_call.details, "code_interpreter", {}).get("outputs", []) # type: ignore
elif hasattr(tool_call.details, "type") and tool_call.details.type == _BING_GROUNDING: # type: ignore
return messages # not supported yet from bing grounding tool
elif tool_call.details.type == _FILE_SEARCH:
elif hasattr(tool_call.details, "type") and tool_call.details.type == _FILE_SEARCH: # type: ignore
file_search_results = getattr(tool_call.details, "file_search", {}).get("results", []) # type: ignore
output = [
{
"file_id": result.file_id,
"file_name": result.file_name,
"score": result.score,
"content": result.content,
"file_id": getattr(result, "file_id", ""),
"file_name": getattr(result, "file_name", ""),
"score": getattr(result, "score", 0),
"content": getattr(result, "content", ""),
}
for result in tool_call.details.file_search.results
for result in file_search_results
]
elif tool_call.details.type == _AZURE_AI_SEARCH:
output = tool_call.details.azure_ai_search["output"]
elif tool_call.details.type == _FABRIC_DATAAGENT:
output = tool_call.details.fabric_dataagent["output"]
elif hasattr(tool_call.details, "type") and tool_call.details.type == _AZURE_AI_SEARCH: # type: ignore
azure_ai_search = getattr(tool_call.details, "azure_ai_search", {}) # type: ignore
output = azure_ai_search.get("output", "")
elif hasattr(tool_call.details, "type") and tool_call.details.type == _FABRIC_DATAAGENT: # type: ignore
fabric_dataagent = getattr(tool_call.details, "fabric_dataagent", {}) # type: ignore
output = fabric_dataagent.get("output", "")
except:
Copy link
Preview

Copilot AI Jun 19, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[nitpick] Catching a bare except: can mask unexpected errors. It's better to catch specific exceptions (e.g., except AttributeError: or except Exception:).

Suggested change
except:
except (AttributeError, TypeError) as e:
# Log the error for debugging purposes
print(f"Error occurred while processing tool_call details: {e}")

Copilot uses AI. Check for mistakes.

return messages

# Now, onto the tool result, which only includes the result of the function call.
content_tool_call_result = {"type": _TOOL_RESULT, _TOOL_RESULT: output}

# Since this is a tool's action of returning, we put it as a tool message.
messages.append(
ToolMessage(
run_id=run_id,
tool_call_id=tool_call_id,
content=[to_dict(content_tool_call_result)],
createdAt=tool_call.completed,
# Only add tool result if we have output
if output is not None:
# Now, onto the tool result, which only includes the result of the function call.
content_tool_call_result = {"type": _TOOL_RESULT, _TOOL_RESULT: output}

# Since this is a tool's action of returning, we put it as a tool message.
messages.append(
ToolMessage(
run_id=run_id,
tool_call_id=tool_call_id,
content=[to_dict(content_tool_call_result)],
createdAt=tool_call.completed,
)
)
)
return messages


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,11 @@


try:
from promptflow.core._errors import MissingRequiredPackage as _MissingRequiredPackage
from promptflow.core._errors import MissingRequiredPackage
except ImportError:
from azure.ai.evaluation._exceptions import ErrorBlame, ErrorCategory, ErrorTarget, EvaluationException

class _MissingRequiredPackage(EvaluationException):
class MissingRequiredPackage(EvaluationException):
"""Raised when a required package is missing.

:param message: A message describing the error. This is the error message the user will see.
Expand All @@ -24,6 +24,3 @@ def __init__(self, message: str, **kwargs: Any):
kwargs.setdefault("target", ErrorTarget.EVALUATE)
kwargs.setdefault("internal_message", "Missing required package.")
super().__init__(message=message, **kwargs)


MissingRequiredPackage: TypeAlias = _MissingRequiredPackage
Original file line number Diff line number Diff line change
Expand Up @@ -6,23 +6,18 @@


try:
from promptflow.core._flow import AsyncPrompty as _AsyncPrompty
from promptflow._sdk.entities._flows import FlexFlow as _FlexFlow
from promptflow._sdk.entities._flows.dag import Flow as _Flow
from promptflow.core._flow import AsyncPrompty
from promptflow._sdk.entities._flows import FlexFlow
from promptflow._sdk.entities._flows.dag import Flow
except ImportError:
from azure.ai.evaluation._legacy.prompty import AsyncPrompty as _AsyncPrompty
from azure.ai.evaluation._legacy.prompty import AsyncPrompty

class _FlexFlow:
class FlexFlow:
pass

_FlexFlow.__name__ = "FlexFlow"
FlexFlow.__name__ = "FlexFlow"

class _Flow:
class Flow:
name: str

_Flow.__name__ = "Flow"


AsyncPrompty: TypeAlias = _AsyncPrompty
FlexFlow: TypeAlias = _FlexFlow
Flow: TypeAlias = _Flow
Flow.__name__ = "Flow"
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,10 @@


try:
from promptflow.client import PFClient as _PFClient
from promptflow.client import PFClient
except ImportError:

class _PFClient:
class PFClient:
def __init__(self, **kwargs):
self._config = Configuration(override_config=kwargs.pop("config", None))

Expand Down Expand Up @@ -46,6 +46,3 @@ def get_details(self, run: Union[str, Run], max_results: int = 100, all_results:

def get_metrics(self, run: Union[str, Run]) -> Dict[str, Any]:
return {}


PFClient: TypeAlias = _PFClient
Original file line number Diff line number Diff line change
Expand Up @@ -7,25 +7,20 @@


try:
from promptflow._utils.user_agent_utils import ClientUserAgentUtil as _ClientUserAgentUtil
from promptflow._utils.async_utils import async_run_allowing_running_loop as _async_run_allowing_running_loop
from promptflow._cli._utils import get_workspace_triad_from_local as _get_workspace_triad_from_local
from promptflow._utils.user_agent_utils import ClientUserAgentUtil
from promptflow._utils.async_utils import async_run_allowing_running_loop
from promptflow._cli._utils import get_workspace_triad_from_local
except ImportError:
from azure.ai.evaluation._legacy._batch_engine._utils_deprecated import (
async_run_allowing_running_loop as _async_run_allowing_running_loop,
async_run_allowing_running_loop,
)
from azure.ai.evaluation._evaluate._utils import AzureMLWorkspace

class _ClientUserAgentUtil:
class ClientUserAgentUtil:
@staticmethod
def append_user_agent(user_agent: Optional[str]):
# TODO ralphe: implement?
pass

def _get_workspace_triad_from_local() -> AzureMLWorkspace:
def get_workspace_triad_from_local() -> AzureMLWorkspace:
return AzureMLWorkspace("", "", "")


ClientUserAgentUtil: TypeAlias = _ClientUserAgentUtil
async_run_allowing_running_loop: Final = _async_run_allowing_running_loop
get_workspace_triad_from_local: Final = _get_workspace_triad_from_local
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
from functools import partial
from contextlib import contextmanager
from datetime import datetime, timezone
from typing import Any, Callable, Dict, Final, Generator, Mapping, MutableMapping, Optional, Sequence, Set, Tuple, cast
from typing import Any, Callable, Dict, Final, Generator, List, Mapping, MutableMapping, Optional, Sequence, Set, Tuple, cast
from uuid import uuid4

from ._utils import DEFAULTS_KEY, get_int_env_var, get_value_from_path, is_async_callable
Expand Down Expand Up @@ -122,7 +122,7 @@ def _apply_column_mapping(
) -> Sequence[Mapping[str, str]]:
data = data[:max_lines] if max_lines else data

inputs: Sequence[Mapping[str, Any]] = []
inputs: List[Mapping[str, Any]] = []
line: int = 0
defaults = cast(Mapping[str, Any], column_mapping.get(DEFAULTS_KEY, {}))

Expand Down Expand Up @@ -270,6 +270,7 @@ async def create_under_semaphore(index: int, inputs: Mapping[str, Any]):
# TODO ralphe: Fix this code so it doesn't re-order the outputs
# wait for any task to complete
done, pending = await asyncio.wait(pending, return_when=asyncio.FIRST_COMPLETED)
pending = list(pending) # Convert set back to list
completed_line_results = [task.result() for task in done]
# persist node run infos and flow run info in line result to storage
self._persist_run_info([result for _, result in completed_line_results])
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

from concurrent.futures import Executor
from datetime import datetime, timezone
from typing import Any, Callable, Dict, Mapping, Optional, Sequence, TextIO, Union
from typing import Any, Callable, Dict, List, Mapping, Optional, Sequence, TextIO, Union

from ._run import Run, RunStatus
from ._trace import start_trace
Expand Down Expand Up @@ -97,7 +97,7 @@ async def _submit_bulk_run(self, run: Run, local_storage: AbstractRunStorage, **
# has also been removed since it can be problematic in a multi-threaded environment.

if run.previous_run:
previous: Optional[Run] = run.previous_run
previous: Run = run.previous_run
if previous.status != RunStatus.COMPLETED:
raise BatchEngineValidationError(
f"Referenced run {previous.name} is not completed, got status {previous.status.value}."
Expand Down Expand Up @@ -138,7 +138,7 @@ async def _submit_bulk_run(self, run: Run, local_storage: AbstractRunStorage, **
batch_result = await batch_engine.run(data=run.inputs, column_mapping=run.column_mapping, id=run.name)
run._status = RunStatus.from_batch_result_status(batch_result.status)

error_logs: Sequence[str] = []
error_logs: List[str] = []
if run._status != RunStatus.COMPLETED:
error_logs.append(f"Run {run.name} failed with status {batch_result.status}.")
if batch_result.error:
Expand Down
Loading