Google SecOps SDK for Python
A Python SDK for interacting with Google Security Operations products, currently supporting Chronicle/SecOps SIEM.
This wraps the API for common use cases, including UDM searches, entity lookups, IoCs, alert management, case management, and detection rule management.
Installation
pip install secops
Command Line Interface
The SDK also provides a comprehensive command-line interface (CLI) that makes it easy to interact with Google Security Operations products from your terminal:
secops config set --customer-id "your-instance-id" --project-id "your-project-id" --region "us"
secops search --query "metadata.event_type = \"NETWORK_CONNECTION\""
For detailed CLI documentation and examples, see the CLI Documentation.
Authentication
The SDK supports two main authentication methods:
1. Application Default Credentials (ADC)
This is the simplest and recommended way to authenticate the SDK. Application Default Credentials provide a consistent authentication method that works across different Google Cloud environments and local development.
There are several ways to use ADC:
a. Using the gcloud CLI (Recommended for Local Development)
gcloud auth application-default login
Then in your code:
from secops import SecOpsClient
client = SecOpsClient()
b. Using an Environment Variable
Set the environment variable pointing to your service account key:
export GOOGLE_APPLICATION_CREDENTIALS="/path/to/service-account.json"
Then in your code:
from secops import SecOpsClient
client = SecOpsClient()
c. Google Cloud Environment (Automatic)
When running on Google Cloud services (Compute Engine, Cloud Functions, Cloud Run, etc.), ADC works automatically without any configuration:
from secops import SecOpsClient
client = SecOpsClient()
ADC will automatically try these authentication methods in order:
- The GOOGLE_APPLICATION_CREDENTIALS environment variable
- Google Cloud SDK credentials (set by gcloud auth application-default login)
- Google Cloud-provided service account credentials
- Local service account impersonation credentials
2. Service Account Authentication
For more explicit control, you can authenticate using a service account. This can be done in two ways:
a. Using a Service Account JSON File
from secops import SecOpsClient
client = SecOpsClient(service_account_path="/path/to/service-account.json")
b. Using a Service Account Info Dictionary
from secops import SecOpsClient
service_account_info = {
"type": "service_account",
"project_id": "your-project-id",
"private_key_id": "key-id",
"private_key": "-----BEGIN PRIVATE KEY-----\n...",
"client_email": "service-account@project.iam.gserviceaccount.com",
"client_id": "client-id",
"auth_uri": "https://accounts.google.com/o/oauth2/auth",
"token_uri": "https://oauth2.googleapis.com/token",
"auth_provider_x509_cert_url": "https://www.googleapis.com/oauth2/v1/certs",
"client_x509_cert_url": "https://www.googleapis.com/robot/v1/metadata/x509/..."
}
client = SecOpsClient(service_account_info=service_account_info)
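If the key lives in a file but you want to pass it as a dictionary (for example, after retrieving it from a secret manager), here is a minimal sketch, assuming the file path is a placeholder:

import json
from secops import SecOpsClient

# Load the service account key file into a dictionary (path is a placeholder)
with open("/path/to/service-account.json") as f:
    service_account_info = json.load(f)

client = SecOpsClient(service_account_info=service_account_info)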
Using the Chronicle API
Initializing the Chronicle Client
After creating a SecOpsClient, you need to initialize the Chronicle-specific client:
chronicle = client.chronicle(
customer_id="your-chronicle-instance-id",
project_id="your-project-id",
region="us"
)
See available regions
Log Ingestion
Ingest raw logs directly into Chronicle:
from datetime import datetime, timezone
import json
current_time = datetime.now(timezone.utc).isoformat().replace('+00:00', 'Z')
okta_log = {
"actor": {
"displayName": "Joe Doe",
"alternateId": "jdoe@example.com"
},
"client": {
"ipAddress": "192.168.1.100",
"userAgent": {
"os": "Mac OS X",
"browser": "SAFARI"
}
},
"displayMessage": "User login to Okta",
"eventType": "user.session.start",
"outcome": {
"result": "SUCCESS"
},
"published": current_time
}
result = chronicle.ingest_log(
log_type="OKTA",
log_message=json.dumps(okta_log)
)
print(f"Operation: {result.get('operation')}")
You can also ingest multiple logs of the same type in a single batch request:
batch_logs = [
json.dumps({"actor": {"displayName": "User 1"}, "eventType": "user.session.start"}),
json.dumps({"actor": {"displayName": "User 2"}, "eventType": "user.session.start"}),
json.dumps({"actor": {"displayName": "User 3"}, "eventType": "user.session.start"})
]
batch_result = chronicle.ingest_log(
log_type="OKTA",
log_message=batch_logs
)
print(f"Batch operation: {batch_result.get('operation')}")
You can also attach custom labels to ingested logs:
labeled_result = chronicle.ingest_log(
log_type="OKTA",
log_message=json.dumps(okta_log),
labels={"environment": "production", "app": "web-portal", "team": "security"}
)
The SDK also supports non-JSON log formats. Here's an example with XML for Windows Event logs:
xml_content = """<Event xmlns='http://schemas.microsoft.com/win/2004/08/events/event'>
<System>
<Provider Name='Microsoft-Windows-Security-Auditing' Guid='{54849625-5478-4994-A5BA-3E3B0328C30D}'/>
<EventID>4624</EventID>
<Version>1</Version>
<Level>0</Level>
<Task>12544</Task>
<Opcode>0</Opcode>
<Keywords>0x8020000000000000</Keywords>
<TimeCreated SystemTime='2024-05-10T14:30:00Z'/>
<EventRecordID>202117513</EventRecordID>
<Correlation/>
<Execution ProcessID='656' ThreadID='700'/>
<Channel>Security</Channel>
<Computer>WIN-SERVER.xyz.net</Computer>
<Security/>
</System>
<EventData>
<Data Name='SubjectUserSid'>S-1-0-0</Data>
<Data Name='SubjectUserName'>-</Data>
<Data Name='TargetUserName'>svcUser</Data>
<Data Name='WorkstationName'>CLIENT-PC</Data>
<Data Name='LogonType'>3</Data>
</EventData>
</Event>"""
result = chronicle.ingest_log(
log_type="WINEVTLOG_XML",
log_message=xml_content
)
print(f"Operation: {result.get('operation')}")
The SDK supports all log types available in Chronicle. You can:
- View available log types:
log_types = chronicle.get_all_log_types()
for lt in log_types[:5]:
print(f"{lt.id}: {lt.description}")
- Search for specific log types:
firewall_types = chronicle.search_log_types("firewall")
for lt in firewall_types:
print(f"{lt.id}: {lt.description}")
- Validate a log type:
if chronicle.is_valid_log_type("OKTA"):
print("Valid log type")
else:
print("Invalid log type")
- Ingest using a custom forwarder:
forwarder = chronicle.get_or_create_forwarder(display_name="MyCustomForwarder")
forwarder_id = forwarder["name"].split("/")[-1]
result = chronicle.ingest_log(
log_type="WINDOWS",
    log_message=json.dumps(windows_log),  # windows_log: a Windows event payload you have prepared
forwarder_id=forwarder_id
)
- Set custom timestamps for the log entry and collection times:
from datetime import datetime, timedelta, timezone
log_entry_time = datetime.now(timezone.utc) - timedelta(hours=1)
collection_time = datetime.now(timezone.utc)
result = chronicle.ingest_log(
log_type="OKTA",
log_message=json.dumps(okta_log),
log_entry_time=log_entry_time,
collection_time=collection_time
)
Ingest UDM events directly into Chronicle:
import uuid
from datetime import datetime, timezone
event_id = str(uuid.uuid4())
current_time = datetime.now(timezone.utc).isoformat().replace("+00:00", "Z")
network_event = {
"metadata": {
"id": event_id,
"event_timestamp": current_time,
"event_type": "NETWORK_CONNECTION",
"product_name": "My Security Product",
"vendor_name": "My Company"
},
"principal": {
"hostname": "workstation-1",
"ip": "192.168.1.100",
"port": 12345
},
"target": {
"ip": "203.0.113.10",
"port": 443
},
"network": {
"application_protocol": "HTTPS",
"direction": "OUTBOUND"
}
}
result = chronicle.ingest_udm(udm_events=network_event)
print(f"Ingested event with ID: {event_id}")
process_event = {
"metadata": {
"event_timestamp": current_time,
"event_type": "PROCESS_LAUNCH",
"product_name": "My Security Product",
"vendor_name": "My Company"
},
"principal": {
"hostname": "workstation-1",
"process": {
"command_line": "ping 8.8.8.8",
"pid": 1234
},
"user": {
"userid": "user123"
}
}
}
You can also ingest multiple UDM events in a single call:
result = chronicle.ingest_udm(udm_events=[network_event, process_event])
print("Multiple events ingested successfully")
Data Export
Note: The Data Export API features are currently under test and review. We welcome your feedback and encourage you to submit any issues or unexpected behavior to the issue tracker so we can improve this functionality.
You can export Chronicle logs to Google Cloud Storage using the Data Export API:
from datetime import datetime, timedelta, timezone
end_time = datetime.now(timezone.utc)
start_time = end_time - timedelta(days=1)
available_log_types = chronicle.fetch_available_log_types(
start_time=start_time,
end_time=end_time
)
for log_type in available_log_types["available_log_types"]:
print(f"{log_type.display_name} ({log_type.log_type.split('/')[-1]})")
print(f" Available from {log_type.start_time} to {log_type.end_time}")
export = chronicle.create_data_export(
gcs_bucket="projects/my-project/buckets/my-export-bucket",
start_time=start_time,
end_time=end_time,
log_type="GCP_DNS"
)
export_id = export["name"].split("/")[-1]
print(f"Created export with ID: {export_id}")
print(f"Status: {export['data_export_status']['stage']}")
status = chronicle.get_data_export(export_id)
print(f"Export status: {status['data_export_status']['stage']}")
print(f"Progress: {status['data_export_status'].get('progress_percentage', 0)}%")
if status['data_export_status']['stage'] in ['IN_QUEUE', 'PROCESSING']:
cancelled = chronicle.cancel_data_export(export_id)
print(f"Export has been cancelled. New status: {cancelled['data_export_status']['stage']}")
export_all = chronicle.create_data_export(
gcs_bucket="projects/my-project/buckets/my-export-bucket",
start_time=start_time,
end_time=end_time,
export_all_logs=True
)
print(f"Created export for all logs. Status: {export_all['data_export_status']['stage']}")
The Data Export API supports:
- Exporting one or all log types to Google Cloud Storage
- Checking export status and progress
- Cancelling exports in progress
- Fetching available log types for a specific time range
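Exports can take a while to complete, so a common pattern is to poll until the export leaves the in-progress stages. A minimal sketch, assuming the only in-progress stages are the 'IN_QUEUE' and 'PROCESSING' values shown above:

import time

while True:
    status = chronicle.get_data_export(export_id)
    stage = status["data_export_status"]["stage"]
    if stage not in ("IN_QUEUE", "PROCESSING"):
        print(f"Export finished with stage: {stage}")
        break
    print(f"Export still running ({stage}); checking again in 30 seconds")
    time.sleep(30)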
If you encounter any issues with the Data Export functionality, please submit them to our issue tracker with detailed information about the problem and steps to reproduce.
Basic UDM Search
Search for network connection events:
from datetime import datetime, timedelta, timezone
end_time = datetime.now(timezone.utc)
start_time = end_time - timedelta(hours=24)
results = chronicle.search_udm(
query="""
metadata.event_type = "NETWORK_CONNECTION"
ip != ""
""",
start_time=start_time,
end_time=end_time,
max_events=5
)
Example response:
{
"events": [
{
"name": "projects/my-project/locations/us/instances/my-instance/events/encoded-event-id",
"udm": {
"metadata": {
"eventTimestamp": "2024-02-09T10:30:00Z",
"eventType": "NETWORK_CONNECTION"
},
"target": {
"ip": ["192.168.1.100"],
"port": 443
},
"principal": {
"hostname": "workstation-1"
}
}
}
],
"total_events": 1,
"more_data_available": false
}
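To process results programmatically, walk the events list from the response. A minimal sketch that uses only the fields shown in the example above:

for event in results.get("events", []):
    udm = event.get("udm", {})
    hostname = udm.get("principal", {}).get("hostname", "unknown")
    target_ips = udm.get("target", {}).get("ip", [])
    print(f"{hostname} -> {', '.join(target_ips)}")

if results.get("more_data_available"):
    print("More events available; narrow the time range or raise max_events")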
Statistics Queries
Get statistics about network connections grouped by hostname:
stats = chronicle.get_stats(
query="""metadata.event_type = "NETWORK_CONNECTION"
match:
target.hostname
outcome:
$count = count(metadata.id)
order:
$count desc""",
start_time=start_time,
end_time=end_time,
max_events=1000,
max_values=10
)
Example response:
{
"columns": ["hostname", "count"],
"rows": [
{"hostname": "server-1", "count": 1500},
{"hostname": "server-2", "count": 1200}
],
"total_rows": 2
}
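Each row comes back as a dictionary keyed by the output columns, which makes post-processing straightforward. A minimal sketch based on the example response above:

for row in stats["rows"]:
    print(f"{row['hostname']}: {row['count']} connections")
print(f"Total rows: {stats['total_rows']}")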
CSV Export
Export specific fields to CSV format:
csv_data = chronicle.fetch_udm_search_csv(
query='metadata.event_type = "NETWORK_CONNECTION"',
start_time=start_time,
end_time=end_time,
fields=["timestamp", "user", "hostname", "process name"]
)
"""
metadata.eventTimestamp,principal.hostname,target.ip,target.port
2024-02-09T10:30:00Z,workstation-1,192.168.1.100,443
2024-02-09T10:31:00Z,workstation-2,192.168.1.101,80
"""
Query Validation
Validate a UDM query before execution:
query = 'target.ip != "" and principal.hostname = "test-host"'
validation = chronicle.validate_query(query)
Example response:
{
"isValid": true,
"queryType": "QUERY_TYPE_UDM_QUERY",
"suggestedFields": [
"target.ip",
"principal.hostname"
]
}
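A common pattern is to validate a query before running it. A minimal sketch using the isValid field from the response above:

query = 'metadata.event_type = "NETWORK_CONNECTION"'
validation = chronicle.validate_query(query)
if validation.get("isValid"):
    results = chronicle.search_udm(
        query=query,
        start_time=start_time,
        end_time=end_time
    )
else:
    print(f"Query rejected: {validation}")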
Natural Language Search
Search for events using natural language instead of UDM query syntax:
from datetime import datetime, timedelta, timezone
end_time = datetime.now(timezone.utc)
start_time = end_time - timedelta(hours=24)
udm_query = chronicle.translate_nl_to_udm("show me network connections")
print(f"Translated query: {udm_query}")
results = chronicle.search_udm(
query=udm_query,
start_time=start_time,
end_time=end_time
)
Or translate and search in one step:
results = chronicle.nl_search(
text="show me failed login attempts",
start_time=start_time,
end_time=end_time,
max_events=100
)
Example response:
{
"events": [
{
"event": {
"metadata": {
"eventTimestamp": "2024-02-09T10:30:00Z",
"eventType": "USER_LOGIN"
},
"principal": {
"user": {
"userid": "jdoe"
}
},
"securityResult": {
"action": "BLOCK",
"summary": "Failed login attempt"
}
}
}
],
"total_events": 1
}
The natural language search feature supports various query patterns:
- "Show me network connections"
- "Find suspicious processes"
- "Show login failures in the last hour"
- "Display connections to IP address 192.168.1.100"
If the natural language query cannot be translated to a valid UDM query, an APIError will be raised with a message indicating that no valid query could be generated.
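A minimal sketch of handling that failure with the APIError exception described in the Error Handling section below (the matched message text is an assumption based on the note above):

from secops.exceptions import APIError

try:
    results = chronicle.nl_search(
        text="show me failed login attempts",
        start_time=start_time,
        end_time=end_time
    )
except APIError as e:
    # Exact wording of the error message is an assumption
    if "no valid query" in str(e).lower():
        print("Try rephrasing the request in more concrete security terms")
    else:
        raise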
Entity Summary
Get detailed information about specific entities like IP addresses, domains, or file hashes. The function automatically detects the entity type based on the provided value and fetches a comprehensive summary including related entities, alerts, timeline, prevalence, and more.
ip_summary = chronicle.summarize_entity(
value="8.8.8.8",
start_time=start_time,
end_time=end_time
)
domain_summary = chronicle.summarize_entity(
value="google.com",
start_time=start_time,
end_time=end_time
)
file_hash = "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855"
file_summary = chronicle.summarize_entity(
value=file_hash,
start_time=start_time,
end_time=end_time
)
user_summary = chronicle.summarize_entity(
value="jdoe",
start_time=start_time,
end_time=end_time,
preferred_entity_type="USER"
)
List IoCs (Indicators of Compromise)
Retrieve IoC matches against ingested events:
iocs = chronicle.list_iocs(
start_time=start_time,
end_time=end_time,
max_matches=1000,
add_mandiant_attributes=True,
prioritized_only=False
)
for ioc in iocs['matches']:
ioc_type = next(iter(ioc['artifactIndicator'].keys()))
ioc_value = next(iter(ioc['artifactIndicator'].values()))
print(f"IoC Type: {ioc_type}, Value: {ioc_value}")
print(f"Sources: {', '.join(ioc['sources'])}")
The IoC response includes:
- The indicator itself (domain, IP, hash, etc.)
- Sources and categories
- Affected assets in your environment
- First and last seen timestamps
- Confidence scores and severity ratings
- Associated threat actors and malware families (with Mandiant attributes)
Alerts and Case Management
Retrieve alerts and their associated cases:
alerts = chronicle.get_alerts(
start_time=start_time,
end_time=end_time,
snapshot_query='feedback_summary.status != "CLOSED"',
max_alerts=100
)
alert_list = alerts.get('alerts', {}).get('alerts', [])
case_ids = {alert.get('caseName') for alert in alert_list if alert.get('caseName')}
if case_ids:
cases = chronicle.get_cases(list(case_ids))
for case in cases.cases:
print(f"Case: {case.display_name}")
print(f"Priority: {case.priority}")
print(f"Status: {case.status}")
The alerts response includes:
- Progress status and completion status
- Alert counts (baseline and filtered)
- Alert details (rule information, detection details, etc.)
- Case associations
You can filter alerts using the snapshot query parameter with fields like the following (see the example after this list):
- detection.rule_name
- detection.alert_state
- feedback_summary.verdict
- feedback_summary.priority
- feedback_summary.status
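For example, a minimal sketch that combines two of those fields (combining filters with AND follows the UDM filter style and is an assumption here, as is the PRIORITY_HIGH value):

high_priority_alerts = chronicle.get_alerts(
    start_time=start_time,
    end_time=end_time,
    snapshot_query='feedback_summary.status != "CLOSED" AND feedback_summary.priority = "PRIORITY_HIGH"',
    max_alerts=50
)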
Case Management Helpers
The CaseList class provides helper methods for working with cases:
cases = chronicle.get_cases(["case-id-1", "case-id-2"])
high_priority = cases.filter_by_priority("PRIORITY_HIGH")
open_cases = cases.filter_by_status("STATUS_OPEN")
case = cases.get_case("case-id-1")
Rule Management
The SDK provides comprehensive support for managing Chronicle detection rules:
Creating Rules
Create new detection rules using YARA-L 2.0 syntax:
rule_text = """
rule simple_network_rule {
meta:
description = "Example rule to detect network connections"
author = "SecOps SDK Example"
severity = "Medium"
priority = "Medium"
yara_version = "YL2.0"
rule_version = "1.0"
events:
$e.metadata.event_type = "NETWORK_CONNECTION"
$e.principal.hostname != ""
condition:
$e
}
"""
rule = chronicle.create_rule(rule_text)
rule_id = rule.get("name", "").split("/")[-1]
print(f"Rule ID: {rule_id}")
Managing Rules
Retrieve, list, update, enable/disable, and delete rules:
rules = chronicle.list_rules()
for rule in rules.get("rules", []):
rule_id = rule.get("name", "").split("/")[-1]
enabled = rule.get("deployment", {}).get("enabled", False)
print(f"Rule ID: {rule_id}, Enabled: {enabled}")
rule = chronicle.get_rule(rule_id)
print(f"Rule content: {rule.get('text')}")
# updated_rule_text holds your modified YARA-L content
updated_rule = chronicle.update_rule(rule_id, updated_rule_text)
deployment = chronicle.enable_rule(rule_id, enabled=True)   # enable the rule
deployment = chronicle.enable_rule(rule_id, enabled=False)  # disable the rule
chronicle.delete_rule(rule_id)
Searching Rules
Search for rules using regular expressions:
results = chronicle.search_rules("suspicious process")
for rule in results.get("rules", []):
rule_id = rule.get("name", "").split("/")[-1]
print(f"Rule ID: {rule_id}, contains: 'suspicious process'")
mitre_rules = chronicle.search_rules("T1055")
print(f"Found {len(mitre_rules.get('rules', []))} rules mentioning T1055 technique")
Retrohunts
Run rules against historical data to find past matches:
from datetime import datetime, timedelta, timezone
end_time = datetime.now(timezone.utc)
start_time = end_time - timedelta(days=7)
retrohunt = chronicle.create_retrohunt(rule_id, start_time, end_time)
operation_id = retrohunt.get("name", "").split("/")[-1]
retrohunt_status = chronicle.get_retrohunt(rule_id, operation_id)
is_complete = retrohunt_status.get("metadata", {}).get("done", False)
Detections and Errors
Monitor rule detections and execution errors:
detections = chronicle.list_detections(rule_id)
for detection in detections.get("detections", []):
detection_id = detection.get("id", "")
event_time = detection.get("eventTime", "")
alerting = detection.get("alertState", "") == "ALERTING"
print(f"Detection: {detection_id}, Time: {event_time}, Alerting: {alerting}")
errors = chronicle.list_errors(rule_id)
for error in errors.get("ruleExecutionErrors", []):
error_message = error.get("error_message", "")
create_time = error.get("create_time", "")
print(f"Error: {error_message}, Time: {create_time}")
Rule Alerts
Search for alerts generated by rules:
end_time = datetime.now(timezone.utc)
start_time = end_time - timedelta(days=7)
alerts_response = chronicle.search_rule_alerts(
start_time=start_time,
end_time=end_time,
page_size=10
)
all_alerts = []
too_many_alerts = alerts_response.get('tooManyAlerts', False)
for rule_alert in alerts_response.get('ruleAlerts', []):
rule_metadata = rule_alert.get('ruleMetadata', {})
rule_id = rule_metadata.get('properties', {}).get('ruleId', 'Unknown')
rule_name = rule_metadata.get('properties', {}).get('name', 'Unknown')
rule_alerts = rule_alert.get('alerts', [])
for alert in rule_alerts:
alert_id = alert.get("id", "")
detection_time = alert.get("detectionTimestamp", "")
commit_time = alert.get("commitTimestamp", "")
alerting_type = alert.get("alertingType", "")
print(f"Alert ID: {alert_id}")
print(f"Rule ID: {rule_id}")
print(f"Rule Name: {rule_name}")
print(f"Detection Time: {detection_time}")
if 'resultEvents' in alert:
for var_name, event_data in alert.get('resultEvents', {}).items():
if 'eventSamples' in event_data:
for sample in event_data.get('eventSamples', []):
if 'event' in sample:
event = sample['event']
event_type = event.get('metadata', {}).get('eventType', 'Unknown')
print(f"Event Type: {event_type}")
If tooManyAlerts is True in the response, consider narrowing your search criteria with a smaller time window or more specific filters.
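One way to narrow automatically is to halve the time window until the response fits. A minimal sketch:

from datetime import datetime, timedelta, timezone

window_end = datetime.now(timezone.utc)
window = timedelta(days=7)
while True:
    response = chronicle.search_rule_alerts(
        start_time=window_end - window,
        end_time=window_end,
        page_size=10
    )
    # Stop once the response fits or the window is already very small
    if not response.get("tooManyAlerts", False) or window <= timedelta(hours=1):
        break
    window = window / 2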
Rule Sets
Manage curated rule sets:
deployments = [
{
"category_id": "category-uuid",
"rule_set_id": "ruleset-uuid",
"precision": "broad",
"enabled": True,
"alerting": False
}
]
chronicle.batch_update_curated_rule_set_deployments(deployments)
Rule Validation
Validate a YARA-L 2.0 rule before creating or updating it:
rule_text = """
rule test_rule {
meta:
description = "Test rule for validation"
author = "Test Author"
severity = "Low"
yara_version = "YL2.0"
rule_version = "1.0"
events:
$e.metadata.event_type = "NETWORK_CONNECTION"
condition:
$e
}
"""
result = chronicle.validate_rule(rule_text)
if result.success:
print("Rule is valid")
else:
print(f"Rule is invalid: {result.message}")
if result.position:
print(f"Error at line {result.position['startLine']}, column {result.position['startColumn']}")
Gemini AI
You can use Chronicle's Gemini AI to get security insights, generate detection rules, explain security concepts, and more:
Note: Only enterprise tier users have access to Advanced Gemini features. Users must opt-in to use Gemini in Chronicle before accessing this functionality.
The SDK will automatically attempt to opt you in when you first use the Gemini functionality. If the automatic opt-in fails due to permission issues, you'll see an error message indicating that users must opt-in before using Gemini.
response = chronicle.gemini("What is Windows event ID 4625?")
text_explanation = response.get_text_content()
print("Explanation:", text_explanation)
for block in response.blocks:
print(f"Block type: {block.block_type}")
if block.block_type == "TEXT":
print("Text content:", block.content)
elif block.block_type == "CODE":
print(f"Code ({block.title}):", block.content)
elif block.block_type == "HTML":
print("HTML content (with tags):", block.content)
code_blocks = response.get_code_blocks()
for code_block in code_blocks:
print(f"Code block ({code_block.title}):", code_block.content)
html_blocks = response.get_html_blocks()
for html_block in html_blocks:
print(f"HTML block (with tags):", html_block.content)
if response.references:
print(f"Found {len(response.references)} references")
for action in response.suggested_actions:
print(f"Suggested action: {action.display_text} ({action.action_type})")
if action.navigation:
print(f"Action URI: {action.navigation.target_uri}")
Response Content Methods
The GeminiResponse class provides several methods to work with response content:
- get_text_content(): Returns a combined string of all TEXT blocks plus the text content of HTML blocks with HTML tags removed
- get_code_blocks(): Returns a list of blocks with block_type == "CODE"
- get_html_blocks(): Returns a list of blocks with block_type == "HTML" (HTML tags preserved)
- get_raw_response(): Returns the complete, unprocessed API response as a dictionary
These methods help you work with different types of content in a structured way.
Accessing Raw API Response
For advanced use cases or debugging, you can access the raw API response:
import json

response = chronicle.gemini("What is Windows event ID 4625?")
raw_response = response.get_raw_response()
print(json.dumps(raw_response, indent=2))
if "responses" in raw_response:
for resp in raw_response["responses"]:
if "blocks" in resp:
print(f"Found {len(resp['blocks'])} blocks in raw response")
This gives you direct access to the original API response format, which can be useful for accessing advanced features or troubleshooting.
Manual Opt-In
If your account has sufficient permissions, you can manually opt-in to Gemini before using it:
opt_success = chronicle.opt_in_to_gemini()
if opt_success:
print("Successfully opted in to Gemini")
else:
print("Unable to opt-in due to permission issues")
response = chronicle.gemini("What is Windows event ID 4625?")
This can be useful in environments where you want to explicitly control when the opt-in happens.
Generate Detection Rules
Chronicle Gemini can generate YARA-L rules for detection:
rule_response = chronicle.gemini("Write a rule to detect powershell downloading a file called gdp.zip")
code_blocks = rule_response.get_code_blocks()
if code_blocks:
rule = code_blocks[0].content
print("Generated rule:", rule)
for action in rule_response.suggested_actions:
if action.display_text == "Open in Rule Editor" and action.action_type == "NAVIGATION":
rule_editor_url = action.navigation.target_uri
print("Rule can be opened in editor:", rule_editor_url)
Get Intel Information
Get detailed information about malware, threat actors, files, vulnerabilities:
cve_response = chronicle.gemini("tell me about CVE-2021-44228")
cve_explanation = cve_response.get_text_content()
print("CVE explanation:", cve_explanation)
Maintain Conversation Context
You can maintain conversation context by reusing the same conversation ID:
initial_response = chronicle.gemini("What is a DDoS attack?")
conversation_id = initial_response.name.split('/')[-3]  # the conversation ID is embedded in the response resource name
followup_response = chronicle.gemini(
"What are the most common mitigation techniques?",
conversation_id=conversation_id
)
Error Handling
The SDK defines several custom exceptions:
from secops.exceptions import SecOpsError, AuthenticationError, APIError
try:
results = chronicle.search_udm(...)
except AuthenticationError as e:
print(f"Authentication failed: {e}")
except APIError as e:
print(f"API request failed: {e}")
except SecOpsError as e:
print(f"General error: {e}")
Value Type Detection
The SDK automatically detects the most common entity types when using the summarize_entity function:
- IP addresses (IPv4 and IPv6)
- MD5/SHA1/SHA256 hashes
- Domain names
- Email addresses
- MAC addresses
- Hostnames
This detection happens internally within summarize_entity, simplifying its usage: you only need to provide the value argument.
ip_summary = chronicle.summarize_entity(value="192.168.1.100", ...)
domain_summary = chronicle.summarize_entity(value="example.com", ...)
hash_summary = chronicle.summarize_entity(value="e17dd4eef8b4978673791ef4672f4f6a", ...)
You can optionally provide a preferred_entity_type hint to summarize_entity when automatic detection might be ambiguous (e.g., a string that could be a username or a hostname).
License
This project is licensed under the Apache License 2.0 - see the LICENSE file for details.