def get_modality_from_mime_type(mime_type: "Optional[str]") -> str:
    """
    Infer the content modality from a MIME type string.

    Args:
        mime_type: A MIME type string (e.g., "image/jpeg", "audio/mp3").
            ``None``, empty strings and non-string values are tolerated.

    Returns:
        One of: "image", "audio", "video", or "document".
        Defaults to "image" for unknown, empty or non-string MIME types.

    Examples:
        "image/jpeg" -> "image"
        "audio/mp3" -> "audio"
        "video/mp4" -> "video"
        "application/pdf" -> "document"
        "text/plain" -> "document"
    """
    # Values taken from user-supplied dicts may be None or not a string at
    # all; treat them like an unknown type instead of failing on ``.lower()``.
    if not mime_type or not isinstance(mime_type, str):
        return "image"  # Default fallback

    mime_lower = mime_type.lower()
    if mime_lower.startswith("image/"):
        return "image"
    if mime_lower.startswith("audio/"):
        return "audio"
    if mime_lower.startswith("video/"):
        return "video"
    if mime_lower.startswith(("application/", "text/")):
        return "document"
    return "image"  # Default fallback for unknown types


def transform_openai_content_part(
    content_part: "Dict[str, Any]",
) -> "Optional[Dict[str, Any]]":
    """
    Transform an OpenAI/LiteLLM content part to Sentry's standardized format.

    This handles the OpenAI image_url format used by OpenAI and LiteLLM SDKs.

    Input format:
    - {"type": "image_url", "image_url": {"url": "..."}}
    - {"type": "image_url", "image_url": "..."} (string shorthand)

    Output format (one of):
    - {"type": "blob", "modality": "image", "mime_type": "...", "content": "..."}
    - {"type": "uri", "modality": "image", "mime_type": "", "uri": "..."}

    Args:
        content_part: A dictionary representing a content part from OpenAI/LiteLLM

    Returns:
        A transformed dictionary in standardized format, or None if the format
        is not OpenAI image_url format or the URL is missing, empty or not a
        string. A data URI that cannot be parsed is reported as a plain URI.
    """
    if not isinstance(content_part, dict):
        return None

    if content_part.get("type") != "image_url":
        return None

    image_url_data = content_part.get("image_url")
    # The URL is nested under "url" or, in the shorthand form, is the value itself.
    if isinstance(image_url_data, dict):
        url = image_url_data.get("url")
    else:
        url = image_url_data

    # Reject missing, empty and non-string URLs before calling string methods
    # on them, so malformed input yields None rather than an AttributeError.
    if not isinstance(url, str) or not url:
        return None

    # Check if it's a data URI (base64 encoded)
    if url.startswith("data:"):
        try:
            mime_type, content = parse_data_uri(url)
        except ValueError:
            # If parsing fails, fall through and return it as a URI
            pass
        else:
            return {
                "type": "blob",
                "modality": get_modality_from_mime_type(mime_type),
                "mime_type": mime_type,
                "content": content,
            }

    # Regular URL (or unparseable data URI)
    return {
        "type": "uri",
        "modality": "image",
        "mime_type": "",
        "uri": url,
    }


def transform_anthropic_content_part(
    content_part: "Dict[str, Any]",
) -> "Optional[Dict[str, Any]]":
    """
    Transform an Anthropic content part to Sentry's standardized format.

    This handles the Anthropic image and document formats with source dictionaries.

    Input format:
    - {"type": "image", "source": {"type": "base64", "media_type": "...", "data": "..."}}
    - {"type": "image", "source": {"type": "url", "media_type": "...", "url": "..."}}
    - {"type": "image", "source": {"type": "file", "media_type": "...", "file_id": "..."}}
    - {"type": "document", "source": {...}} (same source formats)

    Output format (one of):
    - {"type": "blob", "modality": "...", "mime_type": "...", "content": "..."}
    - {"type": "uri", "modality": "...", "mime_type": "...", "uri": "..."}
    - {"type": "file", "modality": "...", "mime_type": "...", "file_id": "..."}

    Args:
        content_part: A dictionary representing a content part from Anthropic

    Returns:
        A transformed dictionary in standardized format, or None if the format
        is not Anthropic format (wrong type, missing or non-dict source,
        unknown source type).
    """
    if not isinstance(content_part, dict):
        return None

    block_type = content_part.get("type")
    if block_type not in ("image", "document"):
        return None

    # A missing "source" key yields None here and is rejected like any other
    # non-dict source.
    source = content_part.get("source")
    if not isinstance(source, dict):
        return None

    source_type = source.get("type")
    # Normalize a missing or explicit-None media type to "" so "mime_type"
    # always has the same shape as in the other transformers.
    media_type = source.get("media_type") or ""
    # Documents keep their modality regardless of the declared media type.
    if block_type == "document":
        modality = "document"
    else:
        modality = get_modality_from_mime_type(media_type)

    if source_type == "base64":
        return {
            "type": "blob",
            "modality": modality,
            "mime_type": media_type,
            "content": source.get("data", ""),
        }
    if source_type == "url":
        return {
            "type": "uri",
            "modality": modality,
            "mime_type": media_type,
            "uri": source.get("url", ""),
        }
    if source_type == "file":
        return {
            "type": "file",
            "modality": modality,
            "mime_type": media_type,
            "file_id": source.get("file_id", ""),
        }

    return None


def transform_google_content_part(
    content_part: "Dict[str, Any]",
) -> "Optional[Dict[str, Any]]":
    """
    Transform a Google GenAI content part to Sentry's standardized format.

    This handles the Google GenAI inline_data and file_data formats.

    Input format:
    - {"inline_data": {"mime_type": "...", "data": "..."}}
    - {"file_data": {"mime_type": "...", "file_uri": "..."}}

    Output format (one of):
    - {"type": "blob", "modality": "...", "mime_type": "...", "content": "..."}
    - {"type": "uri", "modality": "...", "mime_type": "...", "uri": "..."}

    Args:
        content_part: A dictionary representing a content part from Google GenAI

    Returns:
        A transformed dictionary in standardized format, or None if the format
        is not Google format. When "inline_data" is present it takes
        precedence over "file_data", even if its value is not a dict.
    """
    if not isinstance(content_part, dict):
        return None

    # Handle Google inline_data format
    if "inline_data" in content_part:
        inline_data = content_part.get("inline_data")
        if not isinstance(inline_data, dict):
            return None
        mime_type = inline_data.get("mime_type") or ""
        return {
            "type": "blob",
            "modality": get_modality_from_mime_type(mime_type),
            "mime_type": mime_type,
            "content": inline_data.get("data", ""),
        }

    # Handle Google file_data format
    if "file_data" in content_part:
        file_data = content_part.get("file_data")
        if not isinstance(file_data, dict):
            return None
        mime_type = file_data.get("mime_type") or ""
        return {
            "type": "uri",
            "modality": get_modality_from_mime_type(mime_type),
            "mime_type": mime_type,
            "uri": file_data.get("file_uri", ""),
        }

    return None


def transform_generic_content_part(
    content_part: "Dict[str, Any]",
) -> "Optional[Dict[str, Any]]":
    """
    Transform a generic/LangChain-style content part to Sentry's standardized format.

    This handles generic formats where the type indicates the modality and
    the data is provided via direct base64, url, or file_id fields.

    Input format:
    - {"type": "image", "base64": "...", "mime_type": "..."}
    - {"type": "audio", "url": "...", "mime_type": "..."}
    - {"type": "video", "base64": "...", "mime_type": "..."}
    - {"type": "file", "file_id": "...", "mime_type": "..."}

    Output format (one of):
    - {"type": "blob", "modality": "...", "mime_type": "...", "content": "..."}
    - {"type": "uri", "modality": "...", "mime_type": "...", "uri": "..."}
    - {"type": "file", "modality": "...", "mime_type": "...", "file_id": "..."}

    Args:
        content_part: A dictionary representing a content part in generic format

    Returns:
        A transformed dictionary in standardized format, or None if the format
        is not generic format (wrong type, a "source" key is present, or none
        of base64/url/file_id is given).
    """
    if not isinstance(content_part, dict):
        return None

    block_type = content_part.get("type")
    if block_type not in ("image", "audio", "video", "file"):
        return None

    # Ensure it's not Anthropic format (which also uses type: "image")
    if "source" in content_part:
        return None

    mime_type = content_part.get("mime_type") or ""
    # The type doubles as the modality, except that "file" maps to "document".
    modality = "document" if block_type == "file" else block_type

    # Precedence when several data keys are present: base64, then url, then file_id.
    if "base64" in content_part:
        return {
            "type": "blob",
            "modality": modality,
            "mime_type": mime_type,
            "content": content_part["base64"],
        }
    if "url" in content_part:
        return {
            "type": "uri",
            "modality": modality,
            "mime_type": mime_type,
            "uri": content_part["url"],
        }
    if "file_id" in content_part:
        return {
            "type": "file",
            "modality": modality,
            "mime_type": mime_type,
            "file_id": content_part["file_id"],
        }

    return None


def transform_content_part(
    content_part: "Dict[str, Any]",
) -> "Optional[Dict[str, Any]]":
    """
    Transform a content part from various AI SDK formats to Sentry's standardized format.

    This is a heuristic dispatcher that detects the format and delegates to the
    appropriate SDK-specific transformer. For direct SDK integration, prefer using
    the specific transformers directly:
    - transform_openai_content_part() for OpenAI/LiteLLM
    - transform_anthropic_content_part() for Anthropic
    - transform_google_content_part() for Google GenAI
    - transform_generic_content_part() for LangChain and other generic formats

    Detection order:
    1. OpenAI: type == "image_url"
    2. Google: "inline_data" or "file_data" keys present
    3. Anthropic: type in ("image", "document") with "source" key
    4. Generic: type in ("image", "audio", "video", "file") with base64/url/file_id

    Output format (one of):
    - {"type": "blob", "modality": "...", "mime_type": "...", "content": "..."}
    - {"type": "uri", "modality": "...", "mime_type": "...", "uri": "..."}
    - {"type": "file", "modality": "...", "mime_type": "...", "file_id": "..."}

    Args:
        content_part: A dictionary representing a content part from an AI SDK

    Returns:
        A transformed dictionary in standardized format, or None if the format
        is unrecognized or transformation fails.
    """
    if not isinstance(content_part, dict):
        return None

    # Order matters: it is the documented detection order, and the first
    # transformer that recognizes the part wins.
    transformers = (
        transform_openai_content_part,
        transform_google_content_part,
        transform_anthropic_content_part,
        transform_generic_content_part,
    )
    for transformer in transformers:
        result = transformer(content_part)
        if result is not None:
            return result

    # Unrecognized format
    return None


def transform_message_content(content: "Any") -> "Any":
    """
    Transform message content, handling both string content and list of content blocks.

    For list content, each item is transformed using transform_content_part().
    Items that cannot be transformed (return None) are kept as-is.

    Args:
        content: Message content - can be a string, list of content blocks, or other

    Returns:
        - String content: returned as-is
        - List or tuple content: a new list with each transformable item
          converted to standardized format
        - Other: returned as-is
    """
    if not isinstance(content, (list, tuple)):
        # Strings and any other non-sequence content pass through untouched.
        return content

    transformed = []
    for item in content:
        # transform_content_part() returns None for non-dict and unrecognized
        # items; in that case the original item is kept.
        result = transform_content_part(item)
        transformed.append(item if result is None else result)
    return transformed
OpenAI v1+) to json compatible format if hasattr(data, "model_dump"): diff --git a/sentry_sdk/integrations/pydantic_ai/spans/ai_client.py b/sentry_sdk/integrations/pydantic_ai/spans/ai_client.py index cb34f36e4f..00c8c934e8 100644 --- a/sentry_sdk/integrations/pydantic_ai/spans/ai_client.py +++ b/sentry_sdk/integrations/pydantic_ai/spans/ai_client.py @@ -1,5 +1,11 @@ import sentry_sdk -from sentry_sdk.ai.utils import set_data_normalized +from sentry_sdk._types import BLOB_DATA_SUBSTITUTE +from sentry_sdk.ai.utils import ( + normalize_message_roles, + set_data_normalized, + truncate_and_annotate_messages, + get_modality_from_mime_type, +) from sentry_sdk.consts import OP, SPANDATA from sentry_sdk.utils import safe_serialize @@ -29,6 +35,7 @@ UserPromptPart, TextPart, ThinkingPart, + BinaryContent, ) except ImportError: # Fallback if these classes are not available @@ -38,6 +45,7 @@ UserPromptPart = None TextPart = None ThinkingPart = None + BinaryContent = None def _set_input_messages(span: "sentry_sdk.tracing.Span", messages: "Any") -> None: @@ -107,6 +115,17 @@ def _set_input_messages(span: "sentry_sdk.tracing.Span", messages: "Any") -> Non for item in part.content: if isinstance(item, str): content.append({"type": "text", "text": item}) + elif BinaryContent and isinstance(item, BinaryContent): + content.append( + { + "type": "blob", + "modality": get_modality_from_mime_type( + item.media_type + ), + "mime_type": item.media_type, + "content": BLOB_DATA_SUBSTITUTE, + } + ) else: content.append(safe_serialize(item)) else: @@ -124,8 +143,13 @@ def _set_input_messages(span: "sentry_sdk.tracing.Span", messages: "Any") -> Non formatted_messages.append(message) if formatted_messages: + normalized_messages = normalize_message_roles(formatted_messages) + scope = sentry_sdk.get_current_scope() + messages_data = truncate_and_annotate_messages( + normalized_messages, span, scope + ) set_data_normalized( - span, SPANDATA.GEN_AI_REQUEST_MESSAGES, formatted_messages, 
unpack=False + span, SPANDATA.GEN_AI_REQUEST_MESSAGES, messages_data, unpack=False ) except Exception: # If we fail to format messages, just skip it diff --git a/sentry_sdk/integrations/pydantic_ai/spans/invoke_agent.py b/sentry_sdk/integrations/pydantic_ai/spans/invoke_agent.py index 629b3d1206..b4f8307170 100644 --- a/sentry_sdk/integrations/pydantic_ai/spans/invoke_agent.py +++ b/sentry_sdk/integrations/pydantic_ai/spans/invoke_agent.py @@ -1,5 +1,12 @@ import sentry_sdk -from sentry_sdk.ai.utils import get_start_span_function, set_data_normalized +from sentry_sdk._types import BLOB_DATA_SUBSTITUTE +from sentry_sdk.ai.utils import ( + get_modality_from_mime_type, + get_start_span_function, + normalize_message_roles, + set_data_normalized, + truncate_and_annotate_messages, +) from sentry_sdk.consts import OP, SPANDATA from ..consts import SPAN_ORIGIN @@ -16,6 +23,11 @@ if TYPE_CHECKING: from typing import Any +try: + from pydantic_ai.messages import BinaryContent # type: ignore +except ImportError: + BinaryContent = None + def invoke_agent_span( user_prompt: "Any", @@ -93,6 +105,17 @@ def invoke_agent_span( for item in user_prompt: if isinstance(item, str): content.append({"text": item, "type": "text"}) + elif BinaryContent and isinstance(item, BinaryContent): + content.append( + { + "type": "blob", + "modality": get_modality_from_mime_type( + item.media_type + ), + "mime_type": item.media_type, + "content": BLOB_DATA_SUBSTITUTE, + } + ) if content: messages.append( { @@ -102,8 +125,13 @@ def invoke_agent_span( ) if messages: + normalized_messages = normalize_message_roles(messages) + scope = sentry_sdk.get_current_scope() + messages_data = truncate_and_annotate_messages( + normalized_messages, span, scope + ) set_data_normalized( - span, SPANDATA.GEN_AI_REQUEST_MESSAGES, messages, unpack=False + span, SPANDATA.GEN_AI_REQUEST_MESSAGES, messages_data, unpack=False ) return span diff --git a/sentry_sdk/integrations/pydantic_ai/spans/utils.py 
def _get_messages_from_span(span_data):
    """Return the request messages stored in span data, decoding a JSON string if needed."""
    raw_messages = span_data["gen_ai.request.messages"]
    if isinstance(raw_messages, str):
        return json.loads(raw_messages)
    return raw_messages


def _find_binary_content(messages_data, expected_modality, expected_mime_type):
    """
    Verify the first blob content item found in the messages.

    Asserts that item's modality, MIME type and (redacted) content and returns
    True; returns False when no message contains a blob item.
    """
    for message in messages_data:
        if "content" in message:
            for part in message["content"]:
                if part.get("type") != "blob":
                    continue
                assert part["modality"] == expected_modality
                assert part["mime_type"] == expected_mime_type
                assert part["content"] == BLOB_DATA_SUBSTITUTE
                return True
    return False


@pytest.mark.asyncio
async def test_binary_content_encoding_image(sentry_init, capture_events):
    """An image BinaryContent in a user prompt is recorded as a redacted blob."""
    sentry_init(
        integrations=[PydanticAIIntegration()],
        traces_sample_rate=1.0,
        send_default_pii=True,
    )
    events = capture_events()

    with sentry_sdk.start_transaction(op="test", name="test"):
        span = sentry_sdk.start_span(op="test_span")
        image = BinaryContent(data=b"fake_image_data_12345", media_type="image/png")
        prompt = UserPromptPart(content=["Look at this image:", image])
        # Stand-in for a model request: only .parts and .instructions are set.
        message = MagicMock(parts=[prompt], instructions=None)

        _set_input_messages(span, [message])
        span.finish()

    (event,) = events
    messages = _get_messages_from_span(event["spans"][0]["data"])
    assert _find_binary_content(messages, "image", "image/png")


@pytest.mark.asyncio
async def test_binary_content_encoding_mixed_content(sentry_init, capture_events):
    """Text items and a BinaryContent item in the same prompt are both recorded."""
    sentry_init(
        integrations=[PydanticAIIntegration()],
        traces_sample_rate=1.0,
        send_default_pii=True,
    )
    events = capture_events()

    with sentry_sdk.start_transaction(op="test", name="test"):
        span = sentry_sdk.start_span(op="test_span")
        image = BinaryContent(data=b"fake_image_bytes", media_type="image/jpeg")
        prompt = UserPromptPart(
            content=["Here is an image:", image, "What do you see?"]
        )
        message = MagicMock(parts=[prompt], instructions=None)

        _set_input_messages(span, [message])
        span.finish()

    (event,) = events
    messages = _get_messages_from_span(event["spans"][0]["data"])

    # Verify both text and binary content are present
    text_parts = [
        part
        for entry in messages
        if "content" in entry
        for part in entry["content"]
        if part.get("type") == "text"
    ]
    assert text_parts, "Text content should be found"
    assert _find_binary_content(messages, "image", "image/jpeg")


@pytest.mark.asyncio
async def test_binary_content_in_agent_run(sentry_init, capture_events):
    """A BinaryContent passed to a real agent run shows up in the chat span."""
    agent = Agent("test", name="test_binary_agent")

    sentry_init(
        integrations=[PydanticAIIntegration()],
        traces_sample_rate=1.0,
        send_default_pii=True,
    )
    events = capture_events()

    image = BinaryContent(data=b"fake_image_data_for_testing", media_type="image/png")
    await agent.run(["Analyze this image:", image])

    (transaction,) = events
    chat_spans = [
        span for span in transaction["spans"] if span["op"] == "gen_ai.chat"
    ]
    assert len(chat_spans) >= 1

    chat_data = chat_spans[0]["data"]
    if "gen_ai.request.messages" not in chat_data:
        return

    # NOTE(review): this check is weak - "image" also occurs in the prompt
    # text above, so it can pass without any blob being recorded.
    messages_str = str(chat_data["gen_ai.request.messages"])
    assert any(keyword in messages_str for keyword in ["blob", "image", "base64"])
class TestGetModalityFromMimeType:
    def test_image_mime_types(self):
        """image/* types map to the 'image' modality, regardless of case"""
        for mime_type in (
            "image/jpeg",
            "image/png",
            "image/gif",
            "image/webp",
            "IMAGE/JPEG",
        ):
            assert get_modality_from_mime_type(mime_type) == "image"

    def test_audio_mime_types(self):
        """audio/* types map to the 'audio' modality, regardless of case"""
        for mime_type in ("audio/mp3", "audio/wav", "audio/ogg", "AUDIO/MP3"):
            assert get_modality_from_mime_type(mime_type) == "audio"

    def test_video_mime_types(self):
        """video/* types map to the 'video' modality, regardless of case"""
        for mime_type in ("video/mp4", "video/webm", "video/quicktime", "VIDEO/MP4"):
            assert get_modality_from_mime_type(mime_type) == "video"

    def test_document_mime_types(self):
        """application/* and text/* types map to the 'document' modality"""
        for mime_type in (
            "application/pdf",
            "application/json",
            "text/plain",
            "text/html",
        ):
            assert get_modality_from_mime_type(mime_type) == "document"

    def test_empty_mime_type_returns_image(self):
        """An empty MIME type falls back to 'image'"""
        assert get_modality_from_mime_type("") == "image"

    def test_none_mime_type_returns_image(self):
        """None falls back to 'image'"""
        assert get_modality_from_mime_type(None) == "image"

    def test_unknown_mime_type_returns_image(self):
        """Unrecognized top-level types fall back to 'image'"""
        for mime_type in ("unknown/type", "custom/format"):
            assert get_modality_from_mime_type(mime_type) == "image"


class TestTransformOpenAIContentPart:
    """Tests for the OpenAI-specific transform function."""

    def test_image_url_with_data_uri(self):
        """A base64 data URI in image_url becomes a blob"""
        part = {
            "type": "image_url",
            "image_url": {"url": "data:image/jpeg;base64,/9j/4AAQSkZJRg=="},
        }
        expected = {
            "type": "blob",
            "modality": "image",
            "mime_type": "image/jpeg",
            "content": "/9j/4AAQSkZJRg==",
        }
        assert transform_openai_content_part(part) == expected

    def test_image_url_with_regular_url(self):
        """A regular URL in image_url becomes a uri entry"""
        part = {
            "type": "image_url",
            "image_url": {"url": "https://example.com/image.jpg"},
        }
        expected = {
            "type": "uri",
            "modality": "image",
            "mime_type": "",
            "uri": "https://example.com/image.jpg",
        }
        assert transform_openai_content_part(part) == expected

    def test_image_url_string_format(self):
        """The string shorthand for image_url is accepted"""
        part = {
            "type": "image_url",
            "image_url": "https://example.com/image.jpg",
        }
        expected = {
            "type": "uri",
            "modality": "image",
            "mime_type": "",
            "uri": "https://example.com/image.jpg",
        }
        assert transform_openai_content_part(part) == expected

    def test_image_url_invalid_data_uri(self):
        """An unparseable data URI falls back to a uri entry"""
        part = {
            "type": "image_url",
            "image_url": {"url": "data:image/jpeg;base64"},  # Missing comma
        }
        expected = {
            "type": "uri",
            "modality": "image",
            "mime_type": "",
            "uri": "data:image/jpeg;base64",
        }
        assert transform_openai_content_part(part) == expected

    def test_empty_url_returns_none(self):
        """An empty URL is rejected"""
        part = {"type": "image_url", "image_url": {"url": ""}}
        assert transform_openai_content_part(part) is None

    def test_non_image_url_type_returns_none(self):
        """Parts whose type is not image_url are rejected"""
        part = {"type": "text", "text": "Hello"}
        assert transform_openai_content_part(part) is None

    def test_anthropic_format_returns_none(self):
        """Anthropic-shaped parts are not handled here"""
        part = {
            "type": "image",
            "source": {"type": "base64", "media_type": "image/png", "data": "abc"},
        }
        assert transform_openai_content_part(part) is None

    def test_google_format_returns_none(self):
        """Google-shaped parts are not handled here"""
        part = {"inline_data": {"mime_type": "image/jpeg", "data": "abc"}}
        assert transform_openai_content_part(part) is None

    def test_non_dict_returns_none(self):
        """Non-dict input is rejected"""
        for value in ("string", 123, None):
            assert transform_openai_content_part(value) is None


class TestTransformAnthropicContentPart:
    """Tests for the Anthropic-specific transform function."""

    def test_image_base64(self):
        """An image with a base64 source becomes a blob"""
        part = {
            "type": "image",
            "source": {
                "type": "base64",
                "media_type": "image/png",
                "data": "iVBORw0KGgo=",
            },
        }
        expected = {
            "type": "blob",
            "modality": "image",
            "mime_type": "image/png",
            "content": "iVBORw0KGgo=",
        }
        assert transform_anthropic_content_part(part) == expected

    def test_image_url(self):
        """An image with a URL source becomes a uri entry"""
        part = {
            "type": "image",
            "source": {
                "type": "url",
                "media_type": "image/jpeg",
                "url": "https://example.com/image.jpg",
            },
        }
        expected = {
            "type": "uri",
            "modality": "image",
            "mime_type": "image/jpeg",
            "uri": "https://example.com/image.jpg",
        }
        assert transform_anthropic_content_part(part) == expected

    def test_image_file(self):
        """An image with a file source becomes a file entry"""
        part = {
            "type": "image",
            "source": {
                "type": "file",
                "media_type": "image/jpeg",
                "file_id": "file_123",
            },
        }
        expected = {
            "type": "file",
            "modality": "image",
            "mime_type": "image/jpeg",
            "file_id": "file_123",
        }
        assert transform_anthropic_content_part(part) == expected

    def test_document_base64(self):
        """A document with a base64 source becomes a document blob"""
        part = {
            "type": "document",
            "source": {
                "type": "base64",
                "media_type": "application/pdf",
                "data": "JVBERi0xLjQ=",
            },
        }
        expected = {
            "type": "blob",
            "modality": "document",
            "mime_type": "application/pdf",
            "content": "JVBERi0xLjQ=",
        }
        assert transform_anthropic_content_part(part) == expected

    def test_document_url(self):
        """A document with a URL source becomes a document uri entry"""
        part = {
            "type": "document",
            "source": {
                "type": "url",
                "media_type": "application/pdf",
                "url": "https://example.com/doc.pdf",
            },
        }
        expected = {
            "type": "uri",
            "modality": "document",
            "mime_type": "application/pdf",
            "uri": "https://example.com/doc.pdf",
        }
        assert transform_anthropic_content_part(part) == expected

    def test_invalid_source_returns_none(self):
        """A non-dict source is rejected"""
        part = {"type": "image", "source": "not_a_dict"}
        assert transform_anthropic_content_part(part) is None

    def test_unknown_source_type_returns_none(self):
        """An unknown source type is rejected"""
        part = {
            "type": "image",
            "source": {"type": "unknown", "data": "something"},
        }
        assert transform_anthropic_content_part(part) is None

    def test_missing_source_returns_none(self):
        """A part without a source key is rejected"""
        part = {"type": "image", "data": "something"}
        assert transform_anthropic_content_part(part) is None

    def test_openai_format_returns_none(self):
        """OpenAI-shaped parts are not handled here"""
        part = {
            "type": "image_url",
            "image_url": {"url": "https://example.com"},
        }
        assert transform_anthropic_content_part(part) is None

    def test_google_format_returns_none(self):
        """Google-shaped parts are not handled here"""
        part = {"inline_data": {"mime_type": "image/jpeg", "data": "abc"}}
        assert transform_anthropic_content_part(part) is None

    def test_non_dict_returns_none(self):
        """Non-dict input is rejected"""
        for value in ("string", 123, None):
            assert transform_anthropic_content_part(value) is None


class TestTransformGoogleContentPart:
    """Tests for the Google GenAI-specific transform function."""

    def test_inline_data(self):
        """inline_data becomes a blob"""
        part = {
            "inline_data": {
                "mime_type": "image/jpeg",
                "data": "/9j/4AAQSkZJRg==",
            }
        }
        expected = {
            "type": "blob",
            "modality": "image",
            "mime_type": "image/jpeg",
            "content": "/9j/4AAQSkZJRg==",
        }
        assert transform_google_content_part(part) == expected

    def test_file_data(self):
        """file_data becomes a uri entry"""
        part = {
            "file_data": {
                "mime_type": "video/mp4",
                "file_uri": "gs://bucket/video.mp4",
            }
        }
        expected = {
            "type": "uri",
            "modality": "video",
            "mime_type": "video/mp4",
            "uri": "gs://bucket/video.mp4",
        }
        assert transform_google_content_part(part) == expected

    def test_inline_data_audio(self):
        """inline_data with an audio MIME type gets the audio modality"""
        part = {
            "inline_data": {
                "mime_type": "audio/wav",
                "data": "UklGRiQA",
            }
        }
        expected = {
            "type": "blob",
            "modality": "audio",
            "mime_type": "audio/wav",
            "content": "UklGRiQA",
        }
        assert transform_google_content_part(part) == expected

    def test_inline_data_not_dict_returns_none(self):
        """A non-dict inline_data value is rejected"""
        part = {"inline_data": "not_a_dict"}
        assert transform_google_content_part(part) is None

    def test_file_data_not_dict_returns_none(self):
        """A non-dict file_data value is rejected"""
        part = {"file_data": "not_a_dict"}
        assert transform_google_content_part(part) is None

    def test_openai_format_returns_none(self):
        """OpenAI-shaped parts are not handled here"""
        part = {
            "type": "image_url",
            "image_url": {"url": "https://example.com"},
        }
        assert transform_google_content_part(part) is None

    def test_anthropic_format_returns_none(self):
        """Anthropic-shaped parts are not handled here"""
        part = {
            "type": "image",
            "source": {"type": "base64", "media_type": "image/png", "data": "abc"},
        }
        assert transform_google_content_part(part) is None

    def test_non_dict_returns_none(self):
        """Non-dict input is rejected"""
        for value in ("string", 123, None):
            assert transform_google_content_part(value) is None


class TestTransformGenericContentPart:
    """Tests for the generic/LangChain-style transform function."""

    def test_image_base64(self):
        """A generic image with base64 data becomes a blob"""
        part = {
            "type": "image",
            "base64": "/9j/4AAQSkZJRg==",
            "mime_type": "image/jpeg",
        }
        expected = {
            "type": "blob",
            "modality": "image",
            "mime_type": "image/jpeg",
            "content": "/9j/4AAQSkZJRg==",
        }
        assert transform_generic_content_part(part) == expected

    def test_audio_url(self):
        """A generic audio part with a URL becomes a uri entry"""
        part = {
            "type": "audio",
            "url": "https://example.com/audio.mp3",
            "mime_type": "audio/mp3",
        }
        expected = {
            "type": "uri",
            "modality": "audio",
            "mime_type": "audio/mp3",
            "uri": "https://example.com/audio.mp3",
        }
        assert transform_generic_content_part(part) == expected

    def test_file_with_file_id(self):
        """A generic file part with a file_id becomes a document file entry"""
        part = {
            "type": "file",
            "file_id": "file_456",
            "mime_type": "application/pdf",
        }
        expected = {
            "type": "file",
            "modality": "document",
            "mime_type": "application/pdf",
            "file_id": "file_456",
        }
        assert transform_generic_content_part(part) == expected

    def test_video_base64(self):
        """A generic video with base64 data becomes a video blob"""
        part = {
            "type": "video",
            "base64": "AAAA",
            "mime_type": "video/mp4",
        }
        expected = {
            "type": "blob",
            "modality": "video",
            "mime_type": "video/mp4",
            "content": "AAAA",
        }
        assert transform_generic_content_part(part) == expected

    def test_image_with_source_returns_none(self):
        """An image carrying a source key (Anthropic style) is rejected"""
        # This is Anthropic format, should NOT be handled by generic
        part = {
            "type": "image",
            "source": {"type": "base64", "data": "abc"},
        }
        assert transform_generic_content_part(part) is None

    def test_text_type_returns_none(self):
        """Text parts are rejected"""
        part = {"type": "text", "text": "Hello"}
        assert transform_generic_content_part(part) is None

    def test_openai_format_returns_none(self):
        """OpenAI-shaped parts are not handled here"""
        part = {
            "type": "image_url",
            "image_url": {"url": "https://example.com"},
        }
        assert transform_generic_content_part(part) is None

    def test_google_format_returns_none(self):
        """Google-shaped parts are not handled here"""
        part = {"inline_data": {"mime_type": "image/jpeg", "data": "abc"}}
        assert transform_generic_content_part(part) is None

    def test_non_dict_returns_none(self):
        """Non-dict input is rejected"""
        for value in ("string", 123, None):
            assert transform_generic_content_part(value) is None

    def test_missing_data_key_returns_none(self):
        """A part with none of base64/url/file_id is rejected"""
        part = {"type": "image", "mime_type": "image/jpeg"}
        assert transform_generic_content_part(part) is None


class TestTransformContentPart:
    # OpenAI/LiteLLM format tests
    def test_openai_image_url_with_data_uri(self):
        """Test transforming OpenAI image_url with base64 data URI"""
        content_part = {
            "type": "image_url",
            "image_url": {"url": "data:image/jpeg;base64,/9j/4AAQSkZJRg=="},
        }
        result = transform_content_part(content_part)

        assert result == {
            "type": "blob",
            "modality": "image",
            "mime_type": "image/jpeg",
            "content": "/9j/4AAQSkZJRg==",
        }

    def test_openai_image_url_with_regular_url(self):
        """Test transforming OpenAI image_url with regular URL"""
        content_part = {
            "type": "image_url",
            "image_url": {"url": "https://example.com/image.jpg"},
        }
        result = transform_content_part(content_part)

        assert result == {
            "type": "uri",
            "modality": "image",
            "mime_type": "",
            "uri": "https://example.com/image.jpg",
        }

    def test_openai_image_url_string_format(self):
        """Test transforming OpenAI image_url where image_url is a string"""
        content_part = {
            "type": "image_url",
            "image_url": "https://example.com/image.jpg",
        }
        result = transform_content_part(content_part)

        assert result == {
            "type": "uri",
            "modality": "image",
            "mime_type": "",
            "uri": "https://example.com/image.jpg",
        }

    def test_openai_image_url_invalid_data_uri(self):
        """Test transforming OpenAI image_url with invalid data URI falls back to URI"""
        content_part = {
            "type": "image_url",
            "image_url": {"url": "data:image/jpeg;base64"},  # Missing comma
        }
result = transform_content_part(content_part) + + assert result == { + "type": "uri", + "modality": "image", + "mime_type": "", + "uri": "data:image/jpeg;base64", + } + + # Anthropic format tests + def test_anthropic_image_base64(self): + """Test transforming Anthropic image with base64 source""" + content_part = { + "type": "image", + "source": { + "type": "base64", + "media_type": "image/png", + "data": "iVBORw0KGgo=", + }, + } + result = transform_content_part(content_part) + + assert result == { + "type": "blob", + "modality": "image", + "mime_type": "image/png", + "content": "iVBORw0KGgo=", + } + + def test_anthropic_image_url(self): + """Test transforming Anthropic image with URL source""" + content_part = { + "type": "image", + "source": { + "type": "url", + "media_type": "image/jpeg", + "url": "https://example.com/image.jpg", + }, + } + result = transform_content_part(content_part) + + assert result == { + "type": "uri", + "modality": "image", + "mime_type": "image/jpeg", + "uri": "https://example.com/image.jpg", + } + + def test_anthropic_image_file(self): + """Test transforming Anthropic image with file source""" + content_part = { + "type": "image", + "source": { + "type": "file", + "media_type": "image/jpeg", + "file_id": "file_123", + }, + } + result = transform_content_part(content_part) + + assert result == { + "type": "file", + "modality": "image", + "mime_type": "image/jpeg", + "file_id": "file_123", + } + + def test_anthropic_document_base64(self): + """Test transforming Anthropic document with base64 source""" + content_part = { + "type": "document", + "source": { + "type": "base64", + "media_type": "application/pdf", + "data": "JVBERi0xLjQ=", + }, + } + result = transform_content_part(content_part) + + assert result == { + "type": "blob", + "modality": "document", + "mime_type": "application/pdf", + "content": "JVBERi0xLjQ=", + } + + def test_anthropic_document_url(self): + """Test transforming Anthropic document with URL source""" + 
content_part = { + "type": "document", + "source": { + "type": "url", + "media_type": "application/pdf", + "url": "https://example.com/doc.pdf", + }, + } + result = transform_content_part(content_part) + + assert result == { + "type": "uri", + "modality": "document", + "mime_type": "application/pdf", + "uri": "https://example.com/doc.pdf", + } + + # Google format tests + def test_google_inline_data(self): + """Test transforming Google inline_data format""" + content_part = { + "inline_data": { + "mime_type": "image/jpeg", + "data": "/9j/4AAQSkZJRg==", + } + } + result = transform_content_part(content_part) + + assert result == { + "type": "blob", + "modality": "image", + "mime_type": "image/jpeg", + "content": "/9j/4AAQSkZJRg==", + } + + def test_google_file_data(self): + """Test transforming Google file_data format""" + content_part = { + "file_data": { + "mime_type": "video/mp4", + "file_uri": "gs://bucket/video.mp4", + } + } + result = transform_content_part(content_part) + + assert result == { + "type": "uri", + "modality": "video", + "mime_type": "video/mp4", + "uri": "gs://bucket/video.mp4", + } + + def test_google_inline_data_audio(self): + """Test transforming Google inline_data with audio""" + content_part = { + "inline_data": { + "mime_type": "audio/wav", + "data": "UklGRiQA", + } + } + result = transform_content_part(content_part) + + assert result == { + "type": "blob", + "modality": "audio", + "mime_type": "audio/wav", + "content": "UklGRiQA", + } + + # Generic format tests (LangChain style) + def test_generic_image_base64(self): + """Test transforming generic format with base64""" + content_part = { + "type": "image", + "base64": "/9j/4AAQSkZJRg==", + "mime_type": "image/jpeg", + } + result = transform_content_part(content_part) + + assert result == { + "type": "blob", + "modality": "image", + "mime_type": "image/jpeg", + "content": "/9j/4AAQSkZJRg==", + } + + def test_generic_audio_url(self): + """Test transforming generic format with URL""" + 
content_part = { + "type": "audio", + "url": "https://example.com/audio.mp3", + "mime_type": "audio/mp3", + } + result = transform_content_part(content_part) + + assert result == { + "type": "uri", + "modality": "audio", + "mime_type": "audio/mp3", + "uri": "https://example.com/audio.mp3", + } + + def test_generic_file_with_file_id(self): + """Test transforming generic format with file_id""" + content_part = { + "type": "file", + "file_id": "file_456", + "mime_type": "application/pdf", + } + result = transform_content_part(content_part) + + assert result == { + "type": "file", + "modality": "document", + "mime_type": "application/pdf", + "file_id": "file_456", + } + + def test_generic_video_base64(self): + """Test transforming generic video format""" + content_part = { + "type": "video", + "base64": "AAAA", + "mime_type": "video/mp4", + } + result = transform_content_part(content_part) + + assert result == { + "type": "blob", + "modality": "video", + "mime_type": "video/mp4", + "content": "AAAA", + } + + # Edge cases and error handling + def test_text_block_returns_none(self): + """Test that text blocks return None (not transformed)""" + content_part = {"type": "text", "text": "Hello world"} + result = transform_content_part(content_part) + + assert result is None + + def test_non_dict_returns_none(self): + """Test that non-dict input returns None""" + assert transform_content_part("string") is None + assert transform_content_part(123) is None + assert transform_content_part(None) is None + assert transform_content_part([1, 2, 3]) is None + + def test_empty_dict_returns_none(self): + """Test that empty dict returns None""" + assert transform_content_part({}) is None + + def test_unknown_type_returns_none(self): + """Test that unknown type returns None""" + content_part = {"type": "unknown", "data": "something"} + assert transform_content_part(content_part) is None + + def test_openai_image_url_empty_url_returns_none(self): + """Test that image_url with empty URL 
returns None""" + content_part = {"type": "image_url", "image_url": {"url": ""}} + assert transform_content_part(content_part) is None + + def test_anthropic_invalid_source_returns_none(self): + """Test that Anthropic format with invalid source returns None""" + content_part = {"type": "image", "source": "not_a_dict"} + assert transform_content_part(content_part) is None + + def test_anthropic_unknown_source_type_returns_none(self): + """Test that Anthropic format with unknown source type returns None""" + content_part = { + "type": "image", + "source": {"type": "unknown", "data": "something"}, + } + assert transform_content_part(content_part) is None + + def test_google_inline_data_not_dict_returns_none(self): + """Test that Google inline_data with non-dict value returns None""" + content_part = {"inline_data": "not_a_dict"} + assert transform_content_part(content_part) is None + + def test_google_file_data_not_dict_returns_none(self): + """Test that Google file_data with non-dict value returns None""" + content_part = {"file_data": "not_a_dict"} + assert transform_content_part(content_part) is None + + +class TestTransformMessageContent: + def test_string_content_returned_as_is(self): + """Test that string content is returned unchanged""" + content = "Hello, world!" + result = transform_message_content(content) + + assert result == "Hello, world!" 
+ + def test_list_with_transformable_items(self): + """Test transforming a list with transformable content parts""" + content = [ + {"type": "text", "text": "What's in this image?"}, + { + "type": "image_url", + "image_url": {"url": "data:image/jpeg;base64,/9j/4AAQ"}, + }, + ] + result = transform_message_content(content) + + assert len(result) == 2 + # Text block should be unchanged (transform returns None, so original kept) + assert result[0] == {"type": "text", "text": "What's in this image?"} + # Image should be transformed + assert result[1] == { + "type": "blob", + "modality": "image", + "mime_type": "image/jpeg", + "content": "/9j/4AAQ", + } + + def test_list_with_non_dict_items(self): + """Test that non-dict items in list are kept as-is""" + content = ["text string", 123, {"type": "text", "text": "hi"}] + result = transform_message_content(content) + + assert result == ["text string", 123, {"type": "text", "text": "hi"}] + + def test_tuple_content(self): + """Test that tuple content is also handled""" + content = ( + {"type": "text", "text": "Hello"}, + { + "type": "image_url", + "image_url": {"url": "https://example.com/img.jpg"}, + }, + ) + result = transform_message_content(content) + + assert len(result) == 2 + assert result[0] == {"type": "text", "text": "Hello"} + assert result[1] == { + "type": "uri", + "modality": "image", + "mime_type": "", + "uri": "https://example.com/img.jpg", + } + + def test_other_types_returned_as_is(self): + """Test that other types are returned unchanged""" + assert transform_message_content(123) == 123 + assert transform_message_content(None) is None + assert transform_message_content({"key": "value"}) == {"key": "value"} + + def test_mixed_content_types(self): + """Test transforming mixed content with multiple formats""" + content = [ + {"type": "text", "text": "Look at these:"}, + { + "type": "image_url", + "image_url": {"url": "data:image/png;base64,iVBORw0"}, + }, + { + "type": "image", + "source": { + "type": 
"base64", + "media_type": "image/jpeg", + "data": "/9j/4AAQ", + }, + }, + {"inline_data": {"mime_type": "audio/wav", "data": "UklGRiQA"}}, + ] + result = transform_message_content(content) + + assert len(result) == 4 + assert result[0] == {"type": "text", "text": "Look at these:"} + assert result[1] == { + "type": "blob", + "modality": "image", + "mime_type": "image/png", + "content": "iVBORw0", + } + assert result[2] == { + "type": "blob", + "modality": "image", + "mime_type": "image/jpeg", + "content": "/9j/4AAQ", + } + assert result[3] == { + "type": "blob", + "modality": "audio", + "mime_type": "audio/wav", + "content": "UklGRiQA", + } + + def test_empty_list(self): + """Test that empty list is returned as empty list""" + assert transform_message_content([]) == []