
A Python SDK for interacting with Google Security Operations products, currently supporting Chronicle/SecOps SIEM. This wraps the API for common use cases, including UDM searches, entity lookups, IoCs, alert management, case management, and detection rule management.
pip install secops
The SDK supports two main authentication methods:
Application Default Credentials (ADC) are the simplest and recommended way to authenticate the SDK. They provide a consistent authentication method that works across different Google Cloud environments and local development.
There are several ways to use ADC:
gcloud CLI (Recommended for Local Development)
# Login and set up application-default credentials
gcloud auth application-default login
Then in your code:
from secops import SecOpsClient
# Initialize with default credentials - no explicit configuration needed
client = SecOpsClient()
Set the environment variable pointing to your service account key:
export GOOGLE_APPLICATION_CREDENTIALS="/path/to/service-account.json"
Then in your code:
from secops import SecOpsClient
# Initialize with default credentials - will automatically use the credentials file
client = SecOpsClient()
When running on Google Cloud services (Compute Engine, Cloud Functions, Cloud Run, etc.), ADC works automatically without any configuration:
from secops import SecOpsClient
# Initialize with default credentials - will automatically use the service account
# assigned to your Google Cloud resource
client = SecOpsClient()
ADC will automatically try these authentication methods in order:
1. The GOOGLE_APPLICATION_CREDENTIALS environment variable
2. User credentials from gcloud auth application-default login
3. The attached service account (when running on Google Cloud)
For more explicit control, you can authenticate using a service account. This can be done in two ways:
from secops import SecOpsClient
# Initialize with service account JSON file
client = SecOpsClient(service_account_path="/path/to/service-account.json")
from secops import SecOpsClient
# Service account details as a dictionary
service_account_info = {
"type": "service_account",
"project_id": "your-project-id",
"private_key_id": "key-id",
"private_key": "-----BEGIN PRIVATE KEY-----\n...",
"client_email": "service-account@project.iam.gserviceaccount.com",
"client_id": "client-id",
"auth_uri": "https://accounts.google.com/o/oauth2/auth",
"token_uri": "https://oauth2.googleapis.com/token",
"auth_provider_x509_cert_url": "https://www.googleapis.com/oauth2/v1/certs",
"client_x509_cert_url": "https://www.googleapis.com/robot/v1/metadata/x509/..."
}
# Initialize with service account info
client = SecOpsClient(service_account_info=service_account_info)
After creating a SecOpsClient, you need to initialize the Chronicle-specific client:
# Initialize Chronicle client
chronicle = client.chronicle(
customer_id="your-chronicle-instance-id", # Your Chronicle instance ID
project_id="your-project-id", # Your GCP project ID
region="us" # Chronicle API region
)
Ingest raw logs directly into Chronicle:
from datetime import datetime, timezone
import json
# Create a sample log (this is an OKTA log)
current_time = datetime.now(timezone.utc).isoformat().replace('+00:00', 'Z')
okta_log = {
"actor": {
"displayName": "Joe Doe",
"alternateId": "jdoe@example.com"
},
"client": {
"ipAddress": "192.168.1.100",
"userAgent": {
"os": "Mac OS X",
"browser": "SAFARI"
}
},
"displayMessage": "User login to Okta",
"eventType": "user.session.start",
"outcome": {
"result": "SUCCESS"
},
"published": current_time # Current time in ISO format
}
# Ingest a single log using the default forwarder
result = chronicle.ingest_log(
log_type="OKTA", # Chronicle log type
log_message=json.dumps(okta_log) # JSON string of the log
)
print(f"Operation: {result.get('operation')}")
# Batch ingestion: Ingest multiple logs in a single request
batch_logs = [
json.dumps({"actor": {"displayName": "User 1"}, "eventType": "user.session.start"}),
json.dumps({"actor": {"displayName": "User 2"}, "eventType": "user.session.start"}),
json.dumps({"actor": {"displayName": "User 3"}, "eventType": "user.session.start"})
]
# Ingest multiple logs in a single API call
batch_result = chronicle.ingest_log(
log_type="OKTA",
log_message=batch_logs # List of log message strings
)
print(f"Batch operation: {batch_result.get('operation')}")
The SDK also supports non-JSON log formats. Here's an example with XML for Windows Event logs:
# Create a Windows Event XML log
xml_content = """<Event xmlns='http://schemas.microsoft.com/win/2004/08/events/event'>
<System>
<Provider Name='Microsoft-Windows-Security-Auditing' Guid='{54849625-5478-4994-A5BA-3E3B0328C30D}'/>
<EventID>4624</EventID>
<Version>1</Version>
<Level>0</Level>
<Task>12544</Task>
<Opcode>0</Opcode>
<Keywords>0x8020000000000000</Keywords>
<TimeCreated SystemTime='2024-05-10T14:30:00Z'/>
<EventRecordID>202117513</EventRecordID>
<Correlation/>
<Execution ProcessID='656' ThreadID='700'/>
<Channel>Security</Channel>
<Computer>WIN-SERVER.xyz.net</Computer>
<Security/>
</System>
<EventData>
<Data Name='SubjectUserSid'>S-1-0-0</Data>
<Data Name='SubjectUserName'>-</Data>
<Data Name='TargetUserName'>svcUser</Data>
<Data Name='WorkstationName'>CLIENT-PC</Data>
<Data Name='LogonType'>3</Data>
</EventData>
</Event>"""
# Ingest the XML log - no json.dumps() needed for XML
result = chronicle.ingest_log(
log_type="WINEVTLOG_XML", # Windows Event Log XML format
log_message=xml_content # Raw XML content
)
print(f"Operation: {result.get('operation')}")
The SDK supports all log types available in Chronicle. You can list them, search them, and check whether a log type is valid:
# Get all available log types
log_types = chronicle.get_all_log_types()
for lt in log_types[:5]: # Show first 5
    print(f"{lt.id}: {lt.description}")
# Search for log types related to firewalls
firewall_types = chronicle.search_log_types("firewall")
for lt in firewall_types:
    print(f"{lt.id}: {lt.description}")
# Check if a log type is valid
if chronicle.is_valid_log_type("OKTA"):
    print("Valid log type")
else:
    print("Invalid log type")
# Create or get a custom forwarder
forwarder = chronicle.get_or_create_forwarder(display_name="MyCustomForwarder")
forwarder_id = forwarder["name"].split("/")[-1]
# Use the custom forwarder for log ingestion
# (windows_log is a previously built dict containing the raw Windows log payload)
result = chronicle.ingest_log(
log_type="WINDOWS",
log_message=json.dumps(windows_log),
forwarder_id=forwarder_id
)
from datetime import datetime, timedelta, timezone
# Define custom timestamps
log_entry_time = datetime.now(timezone.utc) - timedelta(hours=1)
collection_time = datetime.now(timezone.utc)
result = chronicle.ingest_log(
log_type="OKTA",
log_message=json.dumps(okta_log),
log_entry_time=log_entry_time, # When the log was generated
collection_time=collection_time # When the log was collected
)
Ingest UDM events directly into Chronicle:
import uuid
from datetime import datetime, timezone
# Generate a unique ID
event_id = str(uuid.uuid4())
# Get current time in ISO 8601 format
current_time = datetime.now(timezone.utc).isoformat().replace("+00:00", "Z")
# Create a UDM event for a network connection
network_event = {
"metadata": {
"id": event_id,
"event_timestamp": current_time,
"event_type": "NETWORK_CONNECTION",
"product_name": "My Security Product",
"vendor_name": "My Company"
},
"principal": {
"hostname": "workstation-1",
"ip": "192.168.1.100",
"port": 12345
},
"target": {
"ip": "203.0.113.10",
"port": 443
},
"network": {
"application_protocol": "HTTPS",
"direction": "OUTBOUND"
}
}
# Ingest a single UDM event
result = chronicle.ingest_udm(udm_events=network_event)
print(f"Ingested event with ID: {event_id}")
# Create a second event
process_event = {
"metadata": {
# No ID - one will be auto-generated
"event_timestamp": current_time,
"event_type": "PROCESS_LAUNCH",
"product_name": "My Security Product",
"vendor_name": "My Company"
},
"principal": {
"hostname": "workstation-1",
"process": {
"command_line": "ping 8.8.8.8",
"pid": 1234
},
"user": {
"userid": "user123"
}
}
}
# Ingest multiple UDM events in a single call
result = chronicle.ingest_udm(udm_events=[network_event, process_event])
print("Multiple events ingested successfully")
Note: The Data Export API features are currently under test and review. We welcome your feedback and encourage you to submit any issues or unexpected behavior to the issue tracker so we can improve this functionality.
You can export Chronicle logs to Google Cloud Storage using the Data Export API:
from datetime import datetime, timedelta, timezone
# Set time range for export
end_time = datetime.now(timezone.utc)
start_time = end_time - timedelta(days=1) # Last 24 hours
# Get available log types for export
available_log_types = chronicle.fetch_available_log_types(
start_time=start_time,
end_time=end_time
)
# Print available log types
for log_type in available_log_types["available_log_types"]:
    print(f"{log_type.display_name} ({log_type.log_type.split('/')[-1]})")
    print(f" Available from {log_type.start_time} to {log_type.end_time}")
# Create a data export for a specific log type
export = chronicle.create_data_export(
gcs_bucket="projects/my-project/buckets/my-export-bucket",
start_time=start_time,
end_time=end_time,
log_type="GCP_DNS" # Specify log type to export
)
# Get the export ID
export_id = export["name"].split("/")[-1]
print(f"Created export with ID: {export_id}")
print(f"Status: {export['data_export_status']['stage']}")
# Check export status
status = chronicle.get_data_export(export_id)
print(f"Export status: {status['data_export_status']['stage']}")
print(f"Progress: {status['data_export_status'].get('progress_percentage', 0)}%")
# Cancel an export if needed
if status['data_export_status']['stage'] in ['IN_QUEUE', 'PROCESSING']:
    cancelled = chronicle.cancel_data_export(export_id)
    print(f"Export has been cancelled. New status: {cancelled['data_export_status']['stage']}")
# Export all log types at once
export_all = chronicle.create_data_export(
gcs_bucket="projects/my-project/buckets/my-export-bucket",
start_time=start_time,
end_time=end_time,
export_all_logs=True
)
print(f"Created export for all logs. Status: {export_all['data_export_status']['stage']}")
The Data Export API supports exporting a single log type, exporting all log types at once (export_all_logs=True), checking export status and progress, and cancelling in-progress exports.
If you encounter any issues with the Data Export functionality, please submit them to our issue tracker with detailed information about the problem and steps to reproduce.
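If you need to wait for an export to finish, you can poll its status. The following is a minimal sketch that relies only on the stage values shown above; the 30-second interval and the assumption that any other stage is terminal are illustrative, not part of the API contract.
import time
# Poll the export until it leaves the queued/processing stages
export_id = export["name"].split("/")[-1]
while True:
    status = chronicle.get_data_export(export_id)
    stage = status["data_export_status"]["stage"]
    progress = status["data_export_status"].get("progress_percentage", 0)
    print(f"Stage: {stage}, progress: {progress}%")
    if stage not in ("IN_QUEUE", "PROCESSING"):
        break
    time.sleep(30)  # arbitrary polling interval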
Search for network connection events:
from datetime import datetime, timedelta, timezone
# Set time range for queries
end_time = datetime.now(timezone.utc)
start_time = end_time - timedelta(hours=24) # Last 24 hours
# Perform UDM search
results = chronicle.search_udm(
query="""
metadata.event_type = "NETWORK_CONNECTION"
ip != ""
""",
start_time=start_time,
end_time=end_time,
max_events=5
)
# Example response:
{
"events": [
{
"name": "projects/my-project/locations/us/instances/my-instance/events/encoded-event-id",
"udm": {
"metadata": {
"eventTimestamp": "2024-02-09T10:30:00Z",
"eventType": "NETWORK_CONNECTION"
},
"target": {
"ip": ["192.168.1.100"],
"port": 443
},
"principal": {
"hostname": "workstation-1"
}
}
}
],
"total_events": 1,
"more_data_available": false
}
Get statistics about network connections grouped by hostname:
stats = chronicle.get_stats(
query="""metadata.event_type = "NETWORK_CONNECTION"
match:
target.hostname
outcome:
$count = count(metadata.id)
order:
$count desc""",
start_time=start_time,
end_time=end_time,
max_events=1000,
max_values=10
)
# Example response:
{
"columns": ["hostname", "count"],
"rows": [
{"hostname": "server-1", "count": 1500},
{"hostname": "server-2", "count": 1200}
],
"total_rows": 2
}
Export specific fields to CSV format:
csv_data = chronicle.fetch_udm_search_csv(
query='metadata.event_type = "NETWORK_CONNECTION"',
start_time=start_time,
end_time=end_time,
fields=["timestamp", "user", "hostname", "process name"]
)
# Example response:
"""
metadata.eventTimestamp,principal.hostname,target.ip,target.port
2024-02-09T10:30:00Z,workstation-1,192.168.1.100,443
2024-02-09T10:31:00Z,workstation-2,192.168.1.101,80
"""
Validate a UDM query before execution:
query = 'target.ip != "" and principal.hostname = "test-host"'
validation = chronicle.validate_query(query)
# Example response:
{
"isValid": true,
"queryType": "QUERY_TYPE_UDM_QUERY",
"suggestedFields": [
"target.ip",
"principal.hostname"
]
}
Search for events using natural language instead of UDM query syntax:
from datetime import datetime, timedelta, timezone
# Set time range for queries
end_time = datetime.now(timezone.utc)
start_time = end_time - timedelta(hours=24) # Last 24 hours
# Option 1: Translate natural language to UDM query
udm_query = chronicle.translate_nl_to_udm("show me network connections")
print(f"Translated query: {udm_query}")
# Example output: 'metadata.event_type="NETWORK_CONNECTION"'
# Then run the query manually if needed
results = chronicle.search_udm(
query=udm_query,
start_time=start_time,
end_time=end_time
)
# Option 2: Perform complete search with natural language
results = chronicle.nl_search(
text="show me failed login attempts",
start_time=start_time,
end_time=end_time,
max_events=100
)
# Example response (same format as search_udm):
{
"events": [
{
"event": {
"metadata": {
"eventTimestamp": "2024-02-09T10:30:00Z",
"eventType": "USER_LOGIN"
},
"principal": {
"user": {
"userid": "jdoe"
}
},
"securityResult": {
"action": "BLOCK",
"summary": "Failed login attempt"
}
}
}
],
"total_events": 1
}
The natural language search feature supports a range of query patterns, from simple event-type requests ("show me network connections") to more specific filtered queries ("show me failed login attempts"). If the natural language cannot be translated to a valid UDM query, an APIError will be raised with a message indicating that no valid query could be generated.
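Because translation of free-form text can fail, it is worth wrapping natural language searches in a try/except. A minimal sketch using the APIError exception described in the error handling section below:
from secops.exceptions import APIError
try:
    results = chronicle.nl_search(
        text="show me failed login attempts",
        start_time=start_time,
        end_time=end_time,
        max_events=100
    )
except APIError as e:
    # Raised when no valid UDM query could be generated from the text
    print(f"Natural language search failed: {e}")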
Get detailed information about specific entities like IP addresses, domains, or file hashes. The function automatically detects the entity type based on the provided value and fetches a comprehensive summary including related entities, alerts, timeline, prevalence, and more.
# IP address summary
ip_summary = chronicle.summarize_entity(
value="8.8.8.8",
start_time=start_time,
end_time=end_time
)
# Domain summary
domain_summary = chronicle.summarize_entity(
value="google.com",
start_time=start_time,
end_time=end_time
)
# File hash summary (SHA256)
file_hash = "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855"
file_summary = chronicle.summarize_entity(
value=file_hash,
start_time=start_time,
end_time=end_time
)
# Optionally hint the preferred type if auto-detection might be ambiguous
user_summary = chronicle.summarize_entity(
value="jdoe",
start_time=start_time,
end_time=end_time,
preferred_entity_type="USER"
)
# Example response structure (EntitySummary object):
# Access attributes like: ip_summary.primary_entity, ip_summary.related_entities,
# ip_summary.alert_counts, ip_summary.timeline, ip_summary.prevalence, etc.
# Example fields within the EntitySummary object:
# primary_entity: {
# "name": "entities/...",
# "metadata": {
# "entityType": "ASSET", # Or FILE, DOMAIN_NAME, USER, etc.
# "interval": { "startTime": "...", "endTime": "..." }
# },
# "metric": { "firstSeen": "...", "lastSeen": "..." },
# "entity": { # Contains specific details like 'asset', 'file', 'domain'
# "asset": { "ip": ["8.8.8.8"] }
# }
# }
# related_entities: [ { ... similar to primary_entity ... } ]
# alert_counts: [ { "rule": "Rule Name", "count": 5 } ]
# timeline: { "buckets": [ { "alertCount": 1, "eventCount": 10 } ], "bucketSize": "3600s" }
# prevalence: [ { "prevalenceTime": "...", "count": 100 } ]
# file_metadata_and_properties: { # Only for FILE entities
# "metadata": [ { "key": "...", "value": "..." } ],
# "properties": [ { "title": "...", "properties": [ { "key": "...", "value": "..." } ] } ]
# }
Retrieve IoC matches against ingested events:
iocs = chronicle.list_iocs(
start_time=start_time,
end_time=end_time,
max_matches=1000,
add_mandiant_attributes=True,
prioritized_only=False
)
# Process the results
for ioc in iocs['matches']:
    ioc_type = next(iter(ioc['artifactIndicator'].keys()))
    ioc_value = next(iter(ioc['artifactIndicator'].values()))
    print(f"IoC Type: {ioc_type}, Value: {ioc_value}")
    print(f"Sources: {', '.join(ioc['sources'])}")
The IoC response includes the matched artifact indicator (type and value), the sources that reported it, and, when add_mandiant_attributes=True is set, additional Mandiant threat intelligence attributes.
Retrieve alerts and their associated cases:
# Get non-closed alerts
alerts = chronicle.get_alerts(
start_time=start_time,
end_time=end_time,
snapshot_query='feedback_summary.status != "CLOSED"',
max_alerts=100
)
# Get alerts from the response
alert_list = alerts.get('alerts', {}).get('alerts', [])
# Extract case IDs from alerts
case_ids = {alert.get('caseName') for alert in alert_list if alert.get('caseName')}
# Get case details
if case_ids:
    cases = chronicle.get_cases(list(case_ids))
    # Process cases
    for case in cases.cases:
        print(f"Case: {case.display_name}")
        print(f"Priority: {case.priority}")
        print(f"Status: {case.status}")
The alerts response includes the matching alerts together with their detection details, feedback summaries, and any associated case names, which can be used to look up case details as shown above.
You can filter alerts using the snapshot query parameter with fields like the ones below; a combined filter example follows the list:
detection.rule_name
detection.alert_state
feedback_summary.verdict
feedback_summary.priority
feedback_summary.status
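For example, several of these fields can be combined into a single filter. This is an illustrative sketch; it assumes the snapshot query syntax accepts AND between comparisons and that priority uses the same PRIORITY_* values shown for cases below:
# Hypothetical combined filter: non-closed, high-priority alerts
alerts = chronicle.get_alerts(
    start_time=start_time,
    end_time=end_time,
    snapshot_query=(
        'feedback_summary.status != "CLOSED" '
        'AND feedback_summary.priority = "PRIORITY_HIGH"'
    ),
    max_alerts=100
)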
The CaseList class provides helper methods for working with cases:
# Get details for specific cases
cases = chronicle.get_cases(["case-id-1", "case-id-2"])
# Filter cases by priority
high_priority = cases.filter_by_priority("PRIORITY_HIGH")
# Filter cases by status
open_cases = cases.filter_by_status("STATUS_OPEN")
# Look up a specific case
case = cases.get_case("case-id-1")
The SDK provides comprehensive support for managing Chronicle detection rules:
Create new detection rules using YARA-L 2.0 syntax:
rule_text = """
rule simple_network_rule {
meta:
description = "Example rule to detect network connections"
author = "SecOps SDK Example"
severity = "Medium"
priority = "Medium"
yara_version = "YL2.0"
rule_version = "1.0"
events:
$e.metadata.event_type = "NETWORK_CONNECTION"
$e.principal.hostname != ""
condition:
$e
}
"""
# Create the rule
rule = chronicle.create_rule(rule_text)
rule_id = rule.get("name", "").split("/")[-1]
print(f"Rule ID: {rule_id}")
Retrieve, list, update, enable/disable, and delete rules:
# List all rules
rules = chronicle.list_rules()
for rule in rules.get("rules", []):
    rule_id = rule.get("name", "").split("/")[-1]
    enabled = rule.get("deployment", {}).get("enabled", False)
    print(f"Rule ID: {rule_id}, Enabled: {enabled}")
# Get specific rule
rule = chronicle.get_rule(rule_id)
print(f"Rule content: {rule.get('text')}")
# Update rule
updated_rule = chronicle.update_rule(rule_id, updated_rule_text)
# Enable/disable rule
deployment = chronicle.enable_rule(rule_id, enabled=True) # Enable
deployment = chronicle.enable_rule(rule_id, enabled=False) # Disable
# Delete rule
chronicle.delete_rule(rule_id)
Run rules against historical data to find past matches:
from datetime import datetime, timedelta, timezone
# Set time range for retrohunt
end_time = datetime.now(timezone.utc)
start_time = end_time - timedelta(days=7) # Search past 7 days
# Create retrohunt
retrohunt = chronicle.create_retrohunt(rule_id, start_time, end_time)
operation_id = retrohunt.get("name", "").split("/")[-1]
# Check retrohunt status
retrohunt_status = chronicle.get_retrohunt(rule_id, operation_id)
is_complete = retrohunt_status.get("metadata", {}).get("done", False)
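Retrohunts over long time ranges can take a while, so you may want to poll until the operation reports done. A minimal sketch using the calls shown above (the 30-second interval is arbitrary):
import time
# Wait for the retrohunt operation to complete
while True:
    retrohunt_status = chronicle.get_retrohunt(rule_id, operation_id)
    if retrohunt_status.get("metadata", {}).get("done", False):
        print("Retrohunt complete")
        break
    time.sleep(30)  # arbitrary polling interval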
Monitor rule detections and execution errors:
# List detections for a rule
detections = chronicle.list_detections(rule_id)
for detection in detections.get("detections", []):
    detection_id = detection.get("id", "")
    event_time = detection.get("eventTime", "")
    alerting = detection.get("alertState", "") == "ALERTING"
    print(f"Detection: {detection_id}, Time: {event_time}, Alerting: {alerting}")
# List execution errors for a rule
errors = chronicle.list_errors(rule_id)
for error in errors.get("ruleExecutionErrors", []):
    error_message = error.get("error_message", "")
    create_time = error.get("create_time", "")
    print(f"Error: {error_message}, Time: {create_time}")
Search for alerts generated by rules:
# Set time range for alert search
end_time = datetime.now(timezone.utc)
start_time = end_time - timedelta(days=7) # Search past 7 days
# Search for rule alerts
alerts_response = chronicle.search_rule_alerts(
start_time=start_time,
end_time=end_time,
page_size=10
)
# The API returns a nested structure where alerts are grouped by rule
# Extract and process all alerts from this structure
all_alerts = []
too_many_alerts = alerts_response.get('tooManyAlerts', False)
# Process the nested response structure - alerts are grouped by rule
for rule_alert in alerts_response.get('ruleAlerts', []):
    # Extract rule metadata
    rule_metadata = rule_alert.get('ruleMetadata', {})
    rule_id = rule_metadata.get('properties', {}).get('ruleId', 'Unknown')
    rule_name = rule_metadata.get('properties', {}).get('name', 'Unknown')
    # Get alerts for this rule
    rule_alerts = rule_alert.get('alerts', [])
    # Process each alert
    for alert in rule_alerts:
        # Extract important fields
        alert_id = alert.get("id", "")
        detection_time = alert.get("detectionTimestamp", "")
        commit_time = alert.get("commitTimestamp", "")
        alerting_type = alert.get("alertingType", "")
        print(f"Alert ID: {alert_id}")
        print(f"Rule ID: {rule_id}")
        print(f"Rule Name: {rule_name}")
        print(f"Detection Time: {detection_time}")
        # Extract events from the alert
        if 'resultEvents' in alert:
            for var_name, event_data in alert.get('resultEvents', {}).items():
                if 'eventSamples' in event_data:
                    for sample in event_data.get('eventSamples', []):
                        if 'event' in sample:
                            event = sample['event']
                            # Process event data
                            event_type = event.get('metadata', {}).get('eventType', 'Unknown')
                            print(f"Event Type: {event_type}")
If tooManyAlerts is True in the response, consider narrowing your search criteria by using a smaller time window or more specific filters.
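One straightforward way to narrow the search is to retry with a shorter time window whenever the flag is set. A sketch of that pattern, reusing search_rule_alerts from above:
from datetime import timedelta
# If the result set was truncated, retry over the most recent 24 hours only
if too_many_alerts:
    alerts_response = chronicle.search_rule_alerts(
        start_time=end_time - timedelta(hours=24),
        end_time=end_time,
        page_size=10
    )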
Manage curated rule sets:
# Define deployments for rule sets
deployments = [
{
"category_id": "category-uuid",
"rule_set_id": "ruleset-uuid",
"precision": "broad",
"enabled": True,
"alerting": False
}
]
# Update rule set deployments
chronicle.batch_update_curated_rule_set_deployments(deployments)
Validate a YARA-L2 rule before creating or updating it:
# Example rule
rule_text = """
rule test_rule {
meta:
description = "Test rule for validation"
author = "Test Author"
severity = "Low"
yara_version = "YL2.0"
rule_version = "1.0"
events:
$e.metadata.event_type = "NETWORK_CONNECTION"
condition:
$e
}
"""
# Validate the rule
result = chronicle.validate_rule(rule_text)
if result.success:
    print("Rule is valid")
else:
    print(f"Rule is invalid: {result.message}")
    if result.position:
        print(f"Error at line {result.position['startLine']}, column {result.position['startColumn']}")
The SDK defines several custom exceptions:
from secops.exceptions import SecOpsError, AuthenticationError, APIError
try:
    results = chronicle.search_udm(...)
except AuthenticationError as e:
    print(f"Authentication failed: {e}")
except APIError as e:
    print(f"API request failed: {e}")
except SecOpsError as e:
    print(f"General error: {e}")
The SDK automatically detects the most common entity types (such as IP addresses, domains, and file hashes) when using the summarize_entity function. This detection happens internally within summarize_entity, simplifying its usage: you only need to provide the value argument.
# The SDK automatically determines how to query for these values
ip_summary = chronicle.summarize_entity(value="192.168.1.100", ...)
domain_summary = chronicle.summarize_entity(value="example.com", ...)
hash_summary = chronicle.summarize_entity(value="e17dd4eef8b4978673791ef4672f4f6a", ...)
You can optionally provide a preferred_entity_type hint to summarize_entity if the automatic detection might be ambiguous (e.g., a string could be a username or a hostname).
This project is licensed under the Apache License 2.0 - see the LICENSE file for details.