Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions packages/vertice-core/src/vertice_core/adk/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
"""Vertice ADK - Agent Development Kit (2026 Google-Native)."""

from vertice_core.adk.base import VerticeAgent
from vertice_core.adk.tools import ToolRegistry, vertice_tool
from vertice_core.adk.registry import ApiRegistry

__all__ = [
"VerticeAgent",
"ToolRegistry",
"vertice_tool",
"ApiRegistry",
]
4 changes: 4 additions & 0 deletions packages/vertice-core/src/vertice_core/adk/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
from vertice_core.providers.vertex_ai import VertexAIProvider
from vertice_core.memory.cortex.cortex import MemoryCortex
from vertice_core.messaging.events import SystemEvent, get_event_bus
from vertice_core.adk.registry import ApiRegistry


class VerticeAgent(abc.ABC):
Expand All @@ -33,6 +34,9 @@ def __init__(
self._provider = VertexAIProvider(project=project, location=location, model_name=model)
self._cortex = MemoryCortex()

# Dynamic Discovery Registry
self.apis = ApiRegistry(project=project, location=location)

def emit_event(self, event_type: str, payload: Mapping[str, Any]) -> None:
"""
Emit an internal telemetry event (fire-and-forget).
Expand Down
108 changes: 108 additions & 0 deletions packages/vertice-core/src/vertice_core/adk/registry.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
"""Vertice ADK - Dynamic API Registry for 2026 Google-Native ecosystem."""

from __future__ import annotations

import os
import logging
from typing import Any, Dict, List, Optional

logger = logging.getLogger(__name__)

# --- Google Gen AI SDK (Vertex AI) ---
HAS_GENAI_SDK = False
try:
from google import genai
HAS_GENAI_SDK = True
except ImportError:
pass


class ApiRegistry:
"""
Dynamic registry for discovering and managing available APIs and models.
Ensures runtime compatibility across different environments (GCP vs local).
"""

def __init__(
self,
project: Optional[str] = None,
location: str = "global"
) -> None:
self.project = project or os.getenv("GOOGLE_CLOUD_PROJECT")
self.location = location
self._genai_client = None

# Check environment
self._is_gcp = self._check_gcp_environment()
logger.debug(f"ApiRegistry initialized. GCP detected: {self._is_gcp}")

def _check_gcp_environment(self) -> bool:
"""Heuristic to detect if running in a GCP environment."""
# Check for standard GCP environment variables
return any([
os.getenv("GOOGLE_CLOUD_PROJECT"),
os.getenv("K_SERVICE"), # Cloud Run
os.getenv("GAE_SERVICE"), # App Engine
os.path.exists("/var/run/secrets/google") # GKE
])

def _get_genai_client(self) -> Optional[Any]:
"""Lazy initialization of the GenAI client."""
if self._genai_client is not None:
return self._genai_client

if not HAS_GENAI_SDK:
logger.debug("google-genai SDK not installed, discovery disabled.")
return None

try:
# We don't want this to raise during instantiation of the registry
self._genai_client = genai.Client(
vertexai=True,
project=self.project,
location=self.location,
)
return self._genai_client
except Exception as e:
logger.warning(f"Failed to initialize GenAI client for discovery: {e}")
return None

async def discover_models(self) -> List[Dict[str, Any]]:
"""
Dynamically discover available Gemini models on Vertex AI.

Returns a list of model metadata. Returns an empty list if discovery is
unavailable or fails.
"""
client = self._get_genai_client()
if client is None:
return []

try:
# Note: client.aio.models.list is the pattern for async discovery in 2026 SDK
response = await client.aio.models.list(config={"page_size": 50})

# Handle both Pager and direct list response
models = getattr(response, "models", response)

return [
{
"name": getattr(m, "name", str(m)),
"display_name": getattr(m, "display_name", ""),
"description": getattr(m, "description", ""),
"capabilities": getattr(m, "supported_generation_methods", []),
}
for m in models
]
except Exception as e:
logger.warning(f"Dynamic model discovery failed: {e}")
return []

def get_status(self) -> Dict[str, Any]:
"""Return the current status of the registry and environment."""
return {
"is_gcp": self._is_gcp,
"has_sdk": HAS_GENAI_SDK,
"project": self.project,
"location": self.location
}
75 changes: 75 additions & 0 deletions tests/unit/test_vertice_adk_discovery.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
import pytest
import os
from unittest.mock import MagicMock, patch
from vertice_core.adk.base import VerticeAgent
from vertice_core.adk.registry import ApiRegistry
from typing import Dict, Any
from collections.abc import Mapping


class MockAgent(VerticeAgent):
"""Minimal agent for testing discovery."""
@property
def system_prompt(self):
return "Test prompt"

async def query(self, *, input: str | Mapping[str, Any], **kwargs: Any) -> Dict[str, Any]:
return {"output": "test"}


def test_api_registry_instantiation_no_gcp():
"""Valida se o ApiRegistry pode ser instanciado fora do GCP sem erro."""
# Garante que as env vars do GCP não estão presentes
with patch.dict(os.environ, {}, clear=True):
registry = ApiRegistry(project=None)
status = registry.get_status()

assert status["is_gcp"] is False
assert registry.project is None


def test_vertice_agent_instantiates_api_registry():
"""Valida se o VerticeAgent instancia o ApiRegistry automaticamente."""
agent = MockAgent(project="test-project")

assert hasattr(agent, "apis")
assert isinstance(agent.apis, ApiRegistry)
assert agent.apis.project == "test-project"


@pytest.mark.asyncio
async def test_api_registry_discovery_safe_fail():
"""Valida se a descoberta falha graciosamente sem o SDK ou credenciais."""
with patch("vertice_core.adk.registry.HAS_GENAI_SDK", False):
registry = ApiRegistry()
models = await registry.discover_models()
assert models == []


@pytest.mark.asyncio
async def test_api_registry_discovery_mocked():
"""Valida o fluxo de descoberta com o SDK mockado."""
mock_client = MagicMock()

# Mock para client.aio.models.list
mock_model = MagicMock()
mock_model.name = "models/gemini-3-pro"
mock_model.display_name = "Gemini 3 Pro"

mock_response = MagicMock()
mock_response.models = [mock_model]

# Mock async
async def mock_list(*args, **kwargs):
return mock_response

mock_client.aio.models.list = mock_list

with patch("vertice_core.adk.registry.HAS_GENAI_SDK", True):
with patch("google.genai.Client", return_value=mock_client):
registry = ApiRegistry(project="test", location="global")
models = await registry.discover_models()

assert len(models) == 1
assert models[0]["name"] == "models/gemini-3-pro"
assert models[0]["display_name"] == "Gemini 3 Pro"
Loading