Launch Week Day 1: Socket for Jira Is Now Available. Learn More
Socket
Book a Demo · Sign in
Socket

crewplus

Package Overview
Dependencies
Maintainers
2
Versions
95
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

crewplus - pypi Package Compare versions

Comparing version
0.2.95
to
0.2.96
+563
tests/test_gemini3_pro_preview.py
"""
Tests for Gemini 3 Pro Preview model (gemini-3-pro-preview@us-central1).
This test suite validates the Gemini 3 Pro Preview model functionality,
including basic inference, tool binding, and advanced capabilities.
Langfuse tracing is enabled for integration tests to track performance and usage.
"""
import os
import sys
from pathlib import Path
import pytest
from typing import Optional
from pydantic import BaseModel, Field
from google.genai import types
from langchain_core.tools import BaseTool
from langchain_core.messages import HumanMessage, AIMessage, ToolMessage
# Add project root to path
PROJECT_ROOT = Path(__file__).resolve().parent.parent
sys.path.insert(0, str(PROJECT_ROOT))
from crewplus.services import init_load_balancer, get_model_balancer
# =============================================================================
# Test Tools Definition
# =============================================================================
class SearchInput(BaseModel):
    """Arguments accepted by the mock web-search tool."""
    # Free-text query forwarded to the (mock) search backend.
    query: str = Field(description="The search query")
    # Upper bound on returned hits; matches the default used in _run().
    max_results: int = Field(default=5, description="Maximum number of results to return")
class SearchTool(BaseTool):
    """Mock search tool that fabricates a deterministic result for tests."""
    name: str = "web_search"
    description: str = (
        "Searches the web for information. "
        "Returns search results for the given query."
    )
    args_schema: type[BaseModel] = SearchInput

    def _run(self, query: str, max_results: int = 5) -> str:
        """Return a canned result string instead of querying a real backend."""
        return f"Found {max_results} results for '{query}': Result 1: Sample content about {query}..."

    async def _arun(self, query: str, max_results: int = 5) -> str:
        """Async entry point; delegates to the synchronous implementation."""
        return self._run(query, max_results)
class CalculatorInput(BaseModel):
    """Arguments accepted by the calculator tool."""
    # Operation name is validated inside CalculatorTool._run, not here.
    operation: str = Field(
        description="The operation to perform: 'add', 'subtract', 'multiply', or 'divide'"
    )
    a: float = Field(description="The first number")
    b: float = Field(description="The second number")
class CalculatorTool(BaseTool):
    """A simple calculator tool for basic arithmetic operations."""
    name: str = "calculator"
    description: str = (
        "Performs basic arithmetic operations (add, subtract, multiply, divide). "
        "Use this tool when you need to calculate numerical results. "
        "Provide the operation type and two numbers."
    )
    args_schema: type[BaseModel] = CalculatorInput

    def _run(self, operation: str, a: float, b: float) -> str:
        """Apply the requested arithmetic operation to a and b.

        Returns a human-readable result string; errors (divide-by-zero,
        unknown operation, unexpected failures) are reported as strings
        rather than raised, so the model can read them.
        """
        try:
            # Early returns per operation; result text is kept verbatim.
            if operation == "add":
                return f"The result of {a} + {b} is {a + b}"
            if operation == "subtract":
                return f"The result of {a} - {b} is {a - b}"
            if operation == "multiply":
                return f"The result of {a} × {b} is {a * b}"
            if operation == "divide":
                if b == 0:
                    return "Error: Cannot divide by zero"
                return f"The result of {a} ÷ {b} is {a / b}"
            return f"Error: Unknown operation '{operation}'"
        except Exception as e:
            return f"Error performing calculation: {str(e)}"

    async def _arun(self, operation: str, a: float, b: float) -> str:
        """Async entry point; delegates to the synchronous implementation."""
        return self._run(operation, a, b)
class CodeAnalysisInput(BaseModel):
    """Arguments accepted by the mock code-analysis tool."""
    code: str = Field(description="The code to analyze")
    # Informational only: the mock report interpolates the label but never parses the code.
    language: str = Field(default="python", description="Programming language")
class CodeAnalysisTool(BaseTool):
    """Mock code-analysis tool that returns a canned report for tests."""
    name: str = "analyze_code"
    description: str = (
        "Analyzes code for potential issues, complexity, and best practices. "
        "Returns an analysis report."
    )
    args_schema: type[BaseModel] = CodeAnalysisInput

    def _run(self, code: str, language: str = "python") -> str:
        """Produce a fixed report containing only the input's line count."""
        line_count = len(code.split('\n'))
        return f"Code analysis for {language}: {line_count} lines analyzed. No critical issues found."

    async def _arun(self, code: str, language: str = "python") -> str:
        """Async entry point; delegates to the synchronous implementation."""
        return self._run(code, language)
# =============================================================================
# Fixtures
# =============================================================================
@pytest.fixture(scope="module")
def langfuse_config():
    """Expose Langfuse tracing configuration taken from the environment.

    Security fix: the previous version embedded fallback public/secret API
    keys directly in the source. Credentials committed to a repository must
    be treated as leaked and rotated; they are now required to come from the
    environment, and tracing-dependent tests are skipped when absent.

    Yields:
        dict: ``public_key``, ``secret_key`` and ``host`` for Langfuse.
    """
    public_key = os.getenv("LANGFUSE_PUBLIC_KEY")
    secret_key = os.getenv("LANGFUSE_SECRET_KEY")
    host = os.getenv("LANGFUSE_HOST", "https://langfuse-test.crewplus.ai")
    if not public_key or not secret_key:
        pytest.skip("LANGFUSE_PUBLIC_KEY / LANGFUSE_SECRET_KEY not set in environment")
    # Re-export so downstream code that reads os.environ sees the same values.
    os.environ["LANGFUSE_PUBLIC_KEY"] = public_key
    os.environ["LANGFUSE_SECRET_KEY"] = secret_key
    os.environ["LANGFUSE_HOST"] = host
    yield {
        "public_key": public_key,
        "secret_key": secret_key,
        "host": host,
    }
@pytest.fixture(scope="module")
def model_balancer(langfuse_config):
    """Initialise the shared model load balancer from the project config file.

    Skips the whole module's dependent tests when the config file is missing.
    """
    config_file = PROJECT_ROOT / "_config" / "models_config.json"
    if not config_file.exists():
        pytest.skip(f"Config file not found: {config_file}")
    init_load_balancer(str(config_file))
    return get_model_balancer()
@pytest.fixture
def gemini3_model(model_balancer, request):
    """Fetch a Gemini 3 Pro Preview model, enabling tracing only for
    tests carrying the ``integration`` marker."""
    deployment_name = "gemini-3-pro-preview@us-central1"
    try:
        model = model_balancer.get_model(deployment_name=deployment_name)
        if hasattr(model, 'enable_tracing'):
            # Trace integration tests; keep unit tests untraced.
            in_integration_test = bool(
                hasattr(request, 'node')
                and request.node.get_closest_marker('integration')
            )
            model.enable_tracing = in_integration_test
        return model
    except Exception as e:
        pytest.skip(f"Could not get model '{deployment_name}': {e}")
@pytest.fixture
def calculator_tool():
    """Provide a fresh CalculatorTool instance per test."""
    return CalculatorTool()
@pytest.fixture
def search_tool():
    """Provide a fresh SearchTool instance per test."""
    return SearchTool()
@pytest.fixture
def code_analysis_tool():
    """Provide a fresh CodeAnalysisTool instance per test."""
    return CodeAnalysisTool()
# =============================================================================
# Test Basic Model Functionality
# =============================================================================
class TestGemini3BasicFunctionality:
    """Basic functionality checks for the Gemini 3 Pro Preview model."""

    def test_model_initialization(self, gemini3_model):
        """The fixture should yield a model exposing the core LangChain API."""
        assert gemini3_model is not None
        assert hasattr(gemini3_model, 'invoke')
        assert hasattr(gemini3_model, 'bind_tools')

    @pytest.mark.integration
    def test_simple_inference(self, gemini3_model):
        """A plain factual question should yield a non-empty AIMessage mentioning Paris."""
        response = gemini3_model.invoke("What is the capital of France?")
        assert response is not None
        assert isinstance(response, AIMessage)
        assert response.content is not None
        assert len(response.content) > 0
        # The answer must reference Paris in some form.
        assert "paris" in response.content.lower()

    @pytest.mark.integration
    def test_multi_turn_conversation(self, gemini3_model):
        """The model should recall a fact stated earlier in the conversation."""
        history = [HumanMessage(content="My favorite color is blue.")]
        first_reply = gemini3_model.invoke(history)
        assert first_reply is not None
        history += [first_reply, HumanMessage(content="What is my favorite color?")]
        second_reply = gemini3_model.invoke(history)
        assert second_reply is not None
        assert "blue" in second_reply.content.lower()

    @pytest.mark.integration
    def test_reasoning_capability(self, gemini3_model):
        """A multi-step word problem should produce a substantial answer."""
        query = """
        If a train leaves Station A at 10:00 AM traveling at 60 mph,
        and another train leaves Station B (180 miles away) at 10:30 AM
        traveling toward Station A at 90 mph, at what time will they meet?
        """
        response = gemini3_model.invoke(query)
        assert response is not None
        assert response.content is not None
        # Expect a worked solution rather than a one-word reply.
        assert len(response.content) > 50
# =============================================================================
# Test Tool Binding and Conversion
# =============================================================================
class TestGemini3ToolBinding:
    """Tool-binding and declaration-conversion checks for Gemini 3 Pro Preview."""

    def test_bind_single_tool(self, gemini3_model, calculator_tool):
        """Binding one tool should surface it in the bound model's kwargs."""
        bound = gemini3_model.bind_tools([calculator_tool])
        assert bound is not None
        assert hasattr(bound, "kwargs")
        assert "tools" in bound.kwargs
        assert len(bound.kwargs["tools"]) == 1

    def test_bind_multiple_tools(self, gemini3_model, calculator_tool, search_tool, code_analysis_tool):
        """All three tools should be registered, keyed by their declared names."""
        bound = gemini3_model.bind_tools(
            [calculator_tool, search_tool, code_analysis_tool]
        )
        assert len(bound.kwargs["tools"]) == 3
        assert {t.name for t in bound.kwargs["tools"]} == {
            "calculator", "web_search", "analyze_code"
        }

    def test_tool_declaration_structure(self, gemini3_model, calculator_tool):
        """The generated FunctionDeclaration should mirror the tool schema."""
        decl = gemini3_model._convert_langchain_tool_to_gemini_declaration(calculator_tool)
        assert isinstance(decl, types.FunctionDeclaration)
        assert decl.name == "calculator"
        assert decl.description is not None
        assert decl.parameters.type == types.Type.OBJECT
        # Every CalculatorInput field must appear in the declaration.
        for field_name in ("operation", "a", "b"):
            assert field_name in decl.parameters.properties
# =============================================================================
# Test Tool Invocation
# =============================================================================
@pytest.mark.integration
class TestGemini3ToolInvocation:
    """Tests for tool invocation with Gemini 3 Pro Preview.

    These are live integration tests: the model decides whether to call a
    tool, so assertions on tool_calls are conditional rather than strict.
    """
    def test_calculator_tool_invocation(self, gemini3_model, calculator_tool):
        """Test calculator tool invocation."""
        model_with_tools = gemini3_model.bind_tools([calculator_tool])
        query = "Calculate 123 multiplied by 456 using the calculator tool"
        response = model_with_tools.invoke(query)
        # Diagnostic output for manual inspection of live runs.
        print(f"\n{'='*60}")
        print(f"Query: {query}")
        print(f"Response content: {response.content}")
        if hasattr(response, 'tool_calls'):
            print(f"Tool calls: {response.tool_calls}")
        print(f"{'='*60}\n")
        assert response is not None
        # Check if tool was called or answer was provided directly
        if hasattr(response, 'tool_calls') and response.tool_calls:
            tool_call = response.tool_calls[0]
            assert tool_call['name'] == 'calculator'
            assert 'args' in tool_call

    def test_search_tool_invocation(self, gemini3_model, search_tool):
        """Test search tool invocation."""
        model_with_tools = gemini3_model.bind_tools([search_tool])
        query = "Search for information about quantum computing"
        response = model_with_tools.invoke(query)
        print(f"\n{'='*60}")
        print(f"Query: {query}")
        print(f"Response content: {response.content}")
        if hasattr(response, 'tool_calls'):
            print(f"Tool calls: {response.tool_calls}")
        print(f"{'='*60}\n")
        # Only checks the call completed; tool use is at the model's discretion.
        assert response is not None

    def test_code_analysis_tool_invocation(self, gemini3_model, code_analysis_tool):
        """Test code analysis tool invocation."""
        model_with_tools = gemini3_model.bind_tools([code_analysis_tool])
        query = """
        Analyze this Python code:
        def factorial(n):
            if n == 0:
                return 1
            return n * factorial(n - 1)
        """
        response = model_with_tools.invoke(query)
        print(f"\n{'='*60}")
        print(f"Query: {query}")
        print(f"Response content: {response.content}")
        if hasattr(response, 'tool_calls'):
            print(f"Tool calls: {response.tool_calls}")
        print(f"{'='*60}\n")
        assert response is not None

    def test_complete_tool_execution_loop(self, gemini3_model, calculator_tool):
        """Test complete tool execution: request -> call -> execution -> answer."""
        model_with_tools = gemini3_model.bind_tools([calculator_tool])
        # Step 1: Initial query
        query = "What is 789 plus 321?"
        response = model_with_tools.invoke(query)
        print(f"\n{'='*60}")
        print(f"Step 1: Initial Query")
        print(f"Query: {query}")
        print(f"Response content: {response.content}")
        if hasattr(response, 'tool_calls'):
            print(f"Tool calls: {response.tool_calls}")
        # Step 2: Execute tools if called
        # The whole round-trip (execute tool, feed result back, assert final
        # answer) only happens when the model actually requested a tool call.
        if hasattr(response, 'tool_calls') and response.tool_calls:
            messages = [
                HumanMessage(content=query),
                response
            ]
            for tool_call in response.tool_calls:
                tool_name = tool_call["name"]
                tool_args = tool_call["args"]
                tool_id = tool_call["id"]
                print(f"\nStep 2: Executing tool '{tool_name}'")
                print(f"Arguments: {tool_args}")
                if tool_name == "calculator":
                    # Run the tool locally and echo its output back to the model.
                    tool_result = calculator_tool._run(**tool_args)
                    print(f"Result: {tool_result}")
                    messages.append(
                        ToolMessage(
                            content=tool_result,
                            tool_call_id=tool_id
                        )
                    )
            # Step 3: Get final response
            print("\nStep 3: Getting final response...")
            final_response = model_with_tools.invoke(messages)
            print(f"Final Answer: {final_response.content}")
            print(f"{'='*60}\n")
            assert final_response is not None
            assert final_response.content is not None
            # Should mention the result (1110)
            assert "1110" in final_response.content or "1,110" in final_response.content

    def test_multi_tool_selection(self, gemini3_model, calculator_tool, search_tool):
        """Test model's ability to select the appropriate tool."""
        model_with_tools = gemini3_model.bind_tools([calculator_tool, search_tool])
        # Test 1: Should use calculator
        calc_query = "What is 99 times 88?"
        calc_response = model_with_tools.invoke(calc_query)
        print(f"\n{'='*60}")
        print(f"Calculator Test Query: {calc_query}")
        print(f"Response: {calc_response.content}")
        if hasattr(calc_response, 'tool_calls') and calc_response.tool_calls:
            print(f"Tool called: {calc_response.tool_calls[0]['name']}")
        print(f"{'='*60}\n")
        # Test 2: Should use search
        search_query = "Search for the latest AI research papers"
        search_response = model_with_tools.invoke(search_query)
        print(f"\n{'='*60}")
        print(f"Search Test Query: {search_query}")
        print(f"Response: {search_response.content}")
        if hasattr(search_response, 'tool_calls') and search_response.tool_calls:
            print(f"Tool called: {search_response.tool_calls[0]['name']}")
        print(f"{'='*60}\n")
        # NOTE(review): selection is only printed, not asserted — the model may
        # legitimately answer either query without a tool.
        assert calc_response is not None
        assert search_response is not None
# =============================================================================
# Test Streaming
# =============================================================================
@pytest.mark.integration
class TestGemini3Streaming:
    """Tests for streaming with Gemini 3 Pro Preview."""
    def test_basic_streaming(self, gemini3_model):
        """Test basic streaming without tools."""
        query = "Write a short poem about artificial intelligence"
        # Materialise the whole stream; at least one chunk must carry content.
        chunks = list(gemini3_model.stream(query))
        assert len(chunks) > 0
        assert any(chunk.content for chunk in chunks)
        # Combine all chunks
        full_content = "".join(chunk.content for chunk in chunks if chunk.content)
        print(f"\n{'='*60}")
        print(f"Streaming Test - Full Content:")
        print(full_content)
        print(f"{'='*60}\n")
        assert len(full_content) > 0

    def test_streaming_with_tools(self, gemini3_model, calculator_tool):
        """Test streaming with bound tools."""
        model_with_tools = gemini3_model.bind_tools([calculator_tool])
        query = "What is 15 times 24?"
        chunks = list(model_with_tools.stream(query))
        assert len(chunks) > 0
        # Diagnostic output only — tool-call chunks are printed, not asserted.
        print(f"\n{'='*60}")
        print(f"Streaming with Tools Test:")
        print(f"Number of chunks: {len(chunks)}")
        for i, chunk in enumerate(chunks[:5]):  # Print first 5 chunks
            print(f"Chunk {i}: {chunk.content if chunk.content else '(empty)'}")
            if hasattr(chunk, 'tool_calls') and chunk.tool_calls:
                print(f" Tool calls: {chunk.tool_calls}")
        print(f"{'='*60}\n")
# =============================================================================
# Test Model Configuration
# =============================================================================
class TestGemini3Configuration:
    """Configuration checks for the Gemini 3 Pro Preview deployment."""

    def test_model_parameters(self, gemini3_model):
        """Temperature should match the config entry (temperature: 0.0)."""
        assert hasattr(gemini3_model, 'temperature')
        assert gemini3_model.temperature == 0.0

    def test_vertex_ai_configuration(self, gemini3_model):
        """Project and location should match the Vertex AI config entry
        (project_id: "curious-domain-456415-m9", location: "us-central1")."""
        assert hasattr(gemini3_model, 'project_id')
        assert hasattr(gemini3_model, 'location')
        assert gemini3_model.project_id == "curious-domain-456415-m9"
        assert gemini3_model.location == "us-central1"
# =============================================================================
# Test Error Handling
# =============================================================================
@pytest.mark.integration
class TestGemini3ErrorHandling:
    """Tests for error handling with Gemini 3 Pro Preview."""

    def test_invalid_tool_arguments(self, gemini3_model, calculator_tool):
        """Test handling of invalid tool arguments.

        The model may refuse the request or let the tool report the
        divide-by-zero error; either path yields a non-None response.
        """
        model_with_tools = gemini3_model.bind_tools([calculator_tool])
        query = "Use the calculator to divide by zero: 10 / 0"
        response = model_with_tools.invoke(query)
        assert response is not None

    def test_empty_input(self, gemini3_model):
        """Test handling of empty input.

        Either a graceful response or a raised exception is acceptable.
        Fix: the previous version asserted ``e is not None`` inside the
        ``except`` block, which is always true and therefore vacuous.
        """
        try:
            response = gemini3_model.invoke("")
        except Exception:
            # Raising on empty input is acceptable behaviour; nothing more to check.
            pass
        else:
            # If no exception was raised, the model must have returned something.
            assert response is not None
# =============================================================================
# Run Tests
# =============================================================================
# Direct-execution entry point: delegates to pytest so markers still apply.
if __name__ == "__main__":
    # Run with: python test_gemini3_pro_preview.py
    # Or: pytest test_gemini3_pro_preview.py -v
    # Run only integration tests: pytest test_gemini3_pro_preview.py -v -m integration
    # Run only unit tests: pytest test_gemini3_pro_preview.py -v -m "not integration"
    pytest.main([__file__, "-v", "--tb=short"])
+34
-13
import os
import logging
from typing import Any, Dict, Iterator, List, Optional, AsyncIterator, Union
from google.oauth2 import service_account
from langchain_core.language_models import BaseChatModel

@@ -44,5 +45,7 @@ from langchain_core.messages import (

Authentication is handled via Google Cloud credentials:
- **Application Default Credentials (ADC):** Run `gcloud auth application-default login`
- **Service Account:** Provide `service_account_file` or set `GOOGLE_APPLICATION_CREDENTIALS`
- The SDK automatically uses the standard `google-auth-library` flow
- **Service Account (Recommended):** Provide `service_account_file` parameter or set
`GCP_SERVICE_ACCOUNT_FILE` environment variable (also supports `GOOGLE_APPLICATION_CREDENTIALS`)
- **Credentials Object:** Pass a pre-loaded `credentials` object directly
- **Application Default Credentials (ADC):** If no service account file is provided,
falls back to ADC (e.g., `gcloud auth application-default login`)

@@ -98,2 +101,3 @@ **Tracing Integration:**

service_account_file (Optional[str]): Path to GCP service account JSON file.
credentials (Optional[Any]): GCP credentials object (alternative to service_account_file).
capabilities (Dict[str, Dict]): Capability configuration with enabled flag and version.

@@ -195,2 +199,7 @@ logger (Optional[logging.Logger]): An optional logger instance.

)
credentials: Optional[Any] = Field(
default=None,
description="Google Cloud credentials object (alternative to service_account_file)",
exclude=True
)

@@ -265,15 +274,24 @@ # Capability configuration

# Set service account file from environment if available
if not self.service_account_file:
self.service_account_file = os.getenv("GOOGLE_APPLICATION_CREDENTIALS")
# Load credentials explicitly (following Gemini's pattern)
creds = self.credentials
if creds is None:
# Priority: service_account_file param > GCP_SERVICE_ACCOUNT_FILE > GOOGLE_APPLICATION_CREDENTIALS
sa_file = self.service_account_file or os.getenv("GCP_SERVICE_ACCOUNT_FILE") or os.getenv("GOOGLE_APPLICATION_CREDENTIALS")
if sa_file:
try:
creds = service_account.Credentials.from_service_account_file(
sa_file,
scopes=['https://www.googleapis.com/auth/cloud-platform']
)
self.logger.info(f"Loaded GCP credentials from: {sa_file}")
except Exception as e:
error_msg = f"Failed to load credentials from service account file '{sa_file}': {e}"
self.logger.error(error_msg)
raise ValueError(error_msg)
else:
self.logger.info("No service account file specified, using Application Default Credentials (ADC)")
# Set environment variable for google-auth-library if service account file is provided
if self.service_account_file:
os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = self.service_account_file
self.logger.debug(f"Using service account file: {self.service_account_file}")
# If creds is still None, the client will use Application Default Credentials (ADC).
try:
# Initialize the AnthropicVertex clients (sync and async)
# The clients use google-auth-library for authentication
# Build beta headers based on enabled capabilities

@@ -288,5 +306,7 @@ beta_headers = self._build_beta_headers()

# Initialize the AnthropicVertex clients with explicit credentials
self._client = AnthropicVertex(
project_id=self.project_id,
region=self.location,
credentials=creds,
default_headers=beta_headers if beta_headers else None

@@ -299,2 +319,3 @@ )

region=self.location,
credentials=creds,
default_headers=beta_headers if beta_headers else None

@@ -301,0 +322,0 @@ )

@@ -0,10 +1,14 @@

import base64
import logging
import os
import asyncio
import logging
from typing import Any, Dict, Iterator, List, Optional, AsyncIterator, Union, Tuple
from typing import Any, Dict, Iterator, List, Optional, AsyncIterator, Union
import requests
from google import genai
from google.genai import types
from google.oauth2 import service_account
import base64
import requests
from langchain_core.callbacks import (
CallbackManagerForLLMRun,
AsyncCallbackManagerForLLMRun
)
from langchain_core.language_models import BaseChatModel

@@ -20,10 +24,8 @@ from langchain_core.messages import (

from langchain_core.outputs import ChatGeneration, ChatGenerationChunk, ChatResult
from langchain_core.callbacks import (
CallbackManagerForLLMRun,
AsyncCallbackManagerForLLMRun
)
from langchain_core.utils import convert_to_secret_str
from pydantic import Field, SecretStr
from langchain_core.utils import convert_to_secret_str
from .tracing_manager import TracingManager, TracingContext
from .tracing_manager import TracingManager
class GeminiChatModel(BaseChatModel):

@@ -336,3 +338,3 @@ """Custom chat model for Google Gemini, supporting text, image, and video.

sa_file = self.service_account_file or os.getenv("GCP_SERVICE_ACCOUNT_FILE")
self.logger.debug(f"Service account file: {sa_file}")
self.logger.info(f"Service account file: {sa_file}")
if sa_file:

@@ -344,2 +346,3 @@ try:

)
self.logger.info(f"Loaded GCP credentials from: {sa_file}")
except Exception as e:

@@ -349,3 +352,5 @@ error_msg = f"Failed to load credentials from service account file '{sa_file}': {e}"

raise ValueError(error_msg)
else:
self.logger.info("No service account file specified, using Application Default Credentials (ADC)")
# If creds is still None, the client will use Application Default Credentials (ADC).

@@ -352,0 +357,0 @@

Metadata-Version: 2.1
Name: crewplus
Version: 0.2.95
Version: 0.2.96
Summary: Base services for CrewPlus AI applications

@@ -5,0 +5,0 @@ Author-Email: Tim Liu <tim@opsmateai.com>

@@ -9,3 +9,3 @@ [build-system]

name = "crewplus"
version = "0.2.95"
version = "0.2.96"
description = "Base services for CrewPlus AI applications"

@@ -12,0 +12,0 @@ authors = [