Latest Threat Research: SANDWORM_MODE: Shai-Hulud-Style npm Worm Hijacks CI Workflows and Poisons AI Toolchains. Details
Socket
Book a Demo | Install | Sign in
Socket

trcli

Package Overview
Dependencies
Maintainers
1
Versions
45
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

trcli - npm Package Compare versions

Comparing version
1.12.6
to
1.13.0
+257
tests/test_cmd_export_gherkin.py
import pytest
from unittest import mock
from unittest.mock import MagicMock, patch
from click.testing import CliRunner
from pathlib import Path
from trcli.cli import Environment
from trcli.commands import cmd_export_gherkin
class TestCmdExportGherkin:
    """Tests for the export_gherkin CLI command (trcli.commands.cmd_export_gherkin)."""

    def setup_method(self):
        """Set up a CLI runner, sample Gherkin content, and a configured Environment."""
        self.runner = CliRunner()
        self.sample_feature_content = """@smoke
Feature: User Login
As a user
I want to log in
Scenario: Successful login
Given I am on the login page
When I enter valid credentials
Then I should see the dashboard
"""
        # Set up environment with required parameters
        self.environment = Environment(cmd="export_gherkin")
        self.environment.host = "https://test.testrail.com"
        self.environment.username = "test@example.com"
        self.environment.password = "password"
        self.environment.project = "Test Project"
        self.environment.project_id = 1

    def _mock_api(self, mock_api_client_class, mock_api_handler_class, content=None, error=""):
        """Wire the patched APIClient/ApiRequestHandler classes to MagicMocks.

        ``get_bdd`` is preset to return ``(content, error)``; ``content`` defaults
        to the sample feature. Returns the handler mock for further customization.
        """
        mock_api_client_class.return_value = MagicMock()
        mock_api_client_class.build_uploader_metadata.return_value = {}
        handler = MagicMock()
        mock_api_handler_class.return_value = handler
        if content is None:
            content = self.sample_feature_content
        handler.get_bdd.return_value = (content, error)
        return handler

    @pytest.mark.cmd_export_gherkin
    @patch("trcli.commands.cmd_export_gherkin.ApiRequestHandler")
    @patch("trcli.commands.cmd_export_gherkin.APIClient")
    def test_export_gherkin_success_to_file(self, mock_api_client_class, mock_api_handler_class):
        """Successful export writes the feature content to the requested file."""
        self._mock_api(mock_api_client_class, mock_api_handler_class)
        with self.runner.isolated_filesystem():
            result = self.runner.invoke(
                cmd_export_gherkin.cli, ["--case-id", "456", "--output", "exported.feature"], obj=self.environment
            )
            assert result.exit_code == 0
            assert "successfully exported" in result.output.lower()
            assert "exported.feature" in result.output
            # Verify file was created with correct content
            with open("exported.feature", "r") as f:
                content = f.read()
            assert "Feature: User Login" in content
            assert "@smoke" in content

    @pytest.mark.cmd_export_gherkin
    @patch("trcli.commands.cmd_export_gherkin.ApiRequestHandler")
    @patch("trcli.commands.cmd_export_gherkin.APIClient")
    def test_export_gherkin_success_to_stdout(self, mock_api_client_class, mock_api_handler_class):
        """Without --output, the feature content is printed to stdout."""
        self._mock_api(mock_api_client_class, mock_api_handler_class)
        result = self.runner.invoke(cmd_export_gherkin.cli, ["--case-id", "456"], obj=self.environment)
        assert result.exit_code == 0
        # Content should be printed to stdout
        assert "Feature: User Login" in result.output
        assert "@smoke" in result.output

    @pytest.mark.cmd_export_gherkin
    @patch("trcli.commands.cmd_export_gherkin.ApiRequestHandler")
    @patch("trcli.commands.cmd_export_gherkin.APIClient")
    def test_export_gherkin_verbose_logging(self, mock_api_client_class, mock_api_handler_class):
        """Export still succeeds when verbose logging is enabled."""
        self._mock_api(mock_api_client_class, mock_api_handler_class)
        # Enable verbose mode via environment (verbose is a global option)
        self.environment.verbose = True
        result = self.runner.invoke(cmd_export_gherkin.cli, ["--case-id", "456"], obj=self.environment)
        assert result.exit_code == 0

    @pytest.mark.cmd_export_gherkin
    @patch("trcli.commands.cmd_export_gherkin.ApiRequestHandler")
    @patch("trcli.commands.cmd_export_gherkin.APIClient")
    def test_export_gherkin_api_error_case_not_found(self, mock_api_client_class, mock_api_handler_class):
        """An API-level error (case not found) exits with code 1 and an error message."""
        self._mock_api(
            mock_api_client_class,
            mock_api_handler_class,
            content="",
            error="Failed to retrieve BDD test case (HTTP 404)",
        )
        result = self.runner.invoke(cmd_export_gherkin.cli, ["--case-id", "99999"], obj=self.environment)
        assert result.exit_code == 1
        assert "error" in result.output.lower()

    @pytest.mark.cmd_export_gherkin
    @patch("trcli.commands.cmd_export_gherkin.ApiRequestHandler")
    @patch("trcli.commands.cmd_export_gherkin.APIClient")
    def test_export_gherkin_empty_content(self, mock_api_client_class, mock_api_handler_class):
        """Empty BDD content (no error) is reported as a failure."""
        self._mock_api(mock_api_client_class, mock_api_handler_class, content="", error="")
        result = self.runner.invoke(cmd_export_gherkin.cli, ["--case-id", "456"], obj=self.environment)
        assert result.exit_code == 1
        assert "no bdd content found" in result.output.lower()

    @pytest.mark.cmd_export_gherkin
    def test_export_gherkin_required_parameters(self):
        """Missing required options are rejected by Click / parameter validation."""
        # Missing --case-id
        result = self.runner.invoke(cmd_export_gherkin.cli, ["--project-id", "1"])
        assert result.exit_code == 2  # Click error for missing required option
        # Missing --project-id (handled by check_for_required_parameters)
        result = self.runner.invoke(cmd_export_gherkin.cli, ["--case-id", "456"])
        # NOTE(review): original test asserts nothing here — presumably this fails
        # due to missing required params; confirm and add an assertion if so.

    @pytest.mark.cmd_export_gherkin
    def test_export_gherkin_invalid_case_id(self):
        """A negative case ID is rejected by Click's IntRange validation."""
        result = self.runner.invoke(cmd_export_gherkin.cli, ["--case-id", "-1"], obj=self.environment)
        # Click IntRange validation should catch this
        assert result.exit_code == 2

    @pytest.mark.cmd_export_gherkin
    @patch("trcli.commands.cmd_export_gherkin.ApiRequestHandler")
    @patch("trcli.commands.cmd_export_gherkin.APIClient")
    @patch("builtins.open", side_effect=PermissionError("Permission denied"))
    def test_export_gherkin_permission_error(self, mock_open, mock_api_client_class, mock_api_handler_class):
        """A file-write permission failure exits with code 1 and a write error."""
        self._mock_api(mock_api_client_class, mock_api_handler_class)
        result = self.runner.invoke(
            cmd_export_gherkin.cli,
            ["--case-id", "456", "--output", "/root/no_permission.feature"],
            obj=self.environment,
        )
        assert result.exit_code == 1
        # Check for various error messages related to file writing
        assert (
            "permission denied" in result.output.lower()
            or "read-only file system" in result.output.lower()
            or "error writing file" in result.output.lower()
        )

    @pytest.mark.cmd_export_gherkin
    @patch("trcli.commands.cmd_export_gherkin.ApiRequestHandler")
    @patch("trcli.commands.cmd_export_gherkin.APIClient")
    def test_export_gherkin_create_nested_directory(self, mock_api_client_class, mock_api_handler_class):
        """Parent directories of the output path are created when missing."""
        self._mock_api(mock_api_client_class, mock_api_handler_class)
        with self.runner.isolated_filesystem():
            output_path = "nested/dir/exported.feature"
            result = self.runner.invoke(
                cmd_export_gherkin.cli, ["--case-id", "456", "--output", output_path], obj=self.environment
            )
            assert result.exit_code == 0
            # Verify nested directory was created
            assert Path(output_path).exists()
            assert Path(output_path).is_file()

    @pytest.mark.cmd_export_gherkin
    @patch("trcli.commands.cmd_export_gherkin.ApiRequestHandler")
    @patch("trcli.commands.cmd_export_gherkin.APIClient")
    def test_export_gherkin_unicode_content(self, mock_api_client_class, mock_api_handler_class):
        """Unicode characters in the feature content are preserved on export."""
        unicode_content = """@test
Feature: Tëst with ūnīcödé 测试
Scenario: Test scenario
Given test step with émojis 🎉
"""
        self._mock_api(mock_api_client_class, mock_api_handler_class, content=unicode_content)
        with self.runner.isolated_filesystem():
            result = self.runner.invoke(
                cmd_export_gherkin.cli, ["--case-id", "456", "--output", "unicode.feature"], obj=self.environment
            )
            assert result.exit_code == 0
            # Verify unicode content is preserved
            with open("unicode.feature", "r", encoding="utf-8") as f:
                content = f.read()
            assert "ūnīcödé" in content
            assert "测试" in content
            assert "🎉" in content
import pytest
import json
from unittest import mock
from unittest.mock import MagicMock, patch
from click.testing import CliRunner
from pathlib import Path
from trcli.cli import Environment
from trcli.commands import cmd_import_gherkin
class TestCmdImportGherkin:
    """Tests for the import_gherkin CLI command (trcli.commands.cmd_import_gherkin)."""

    def setup_method(self):
        """Set up a CLI runner, a sample feature-file path, and a configured Environment."""
        self.runner = CliRunner()
        self.test_feature_path = str(Path(__file__).parent / "test_data" / "FEATURE" / "sample_bdd.feature")
        # Set up environment with required parameters
        self.environment = Environment(cmd="import_gherkin")
        self.environment.host = "https://test.testrail.com"
        self.environment.username = "test@example.com"
        self.environment.password = "password"
        self.environment.project = "Test Project"
        self.environment.project_id = 1

    def _mock_api(self, mock_api_client_class, mock_api_handler_class):
        """Wire the patched APIClient/ApiRequestHandler classes to MagicMocks.

        Returns the handler mock so callers can preset add_bdd/update_bdd.
        """
        mock_api_client_class.return_value = MagicMock()
        mock_api_client_class.build_uploader_metadata.return_value = {}
        handler = MagicMock()
        mock_api_handler_class.return_value = handler
        return handler

    @pytest.mark.cmd_import_gherkin
    @patch("trcli.commands.cmd_import_gherkin.ApiRequestHandler")
    @patch("trcli.commands.cmd_import_gherkin.APIClient")
    def test_import_gherkin_success(self, mock_api_client_class, mock_api_handler_class):
        """Successful feature-file upload reports the created case ID."""
        handler = self._mock_api(mock_api_client_class, mock_api_handler_class)
        handler.add_bdd.return_value = ([456], "")  # Success: case ID 456, no error
        with self.runner.isolated_filesystem():
            # Create test feature file
            with open("test.feature", "w") as f:
                f.write("Feature: Test\n Scenario: Test scenario\n Given test step\n")
            result = self.runner.invoke(
                cmd_import_gherkin.cli, ["--file", "test.feature", "--section-id", "123"], obj=self.environment
            )
            assert result.exit_code == 0
            assert "successfully uploaded" in result.output.lower()
            assert "456" in result.output

    @pytest.mark.cmd_import_gherkin
    @patch("trcli.commands.cmd_import_gherkin.ApiRequestHandler")
    @patch("trcli.commands.cmd_import_gherkin.APIClient")
    def test_import_gherkin_json_output(self, mock_api_client_class, mock_api_handler_class):
        """--json-output emits a JSON document with case_ids and count."""
        handler = self._mock_api(mock_api_client_class, mock_api_handler_class)
        handler.add_bdd.return_value = ([101, 102], "")  # Success: 2 case IDs
        with self.runner.isolated_filesystem():
            # Create test feature file
            with open("test.feature", "w") as f:
                f.write("Feature: Test\n Scenario: Test 1\n Scenario: Test 2\n")
            result = self.runner.invoke(
                cmd_import_gherkin.cli,
                ["--file", "test.feature", "--section-id", "123", "--json-output"],
                obj=self.environment,
            )
            assert result.exit_code == 0
            # Output contains logging messages + JSON, extract JSON (starts with '{')
            json_start = result.output.find("{")
            assert json_start >= 0, "No JSON found in output"
            json_str = result.output[json_start:]
            output_data = json.loads(json_str)
            assert "case_ids" in output_data
            assert output_data["case_ids"] == [101, 102]
            assert output_data["count"] == 2

    @pytest.mark.cmd_import_gherkin
    @patch("trcli.commands.cmd_import_gherkin.ApiRequestHandler")
    @patch("trcli.commands.cmd_import_gherkin.APIClient")
    def test_import_gherkin_verbose_logging(self, mock_api_client_class, mock_api_handler_class):
        """Upload still succeeds when verbose logging is enabled."""
        handler = self._mock_api(mock_api_client_class, mock_api_handler_class)
        handler.add_bdd.return_value = ([456], "")
        # Enable verbose mode via environment (verbose is a global option)
        self.environment.verbose = True
        with self.runner.isolated_filesystem():
            with open("test.feature", "w") as f:
                f.write("Feature: Test\n Scenario: Test\n")
            result = self.runner.invoke(
                cmd_import_gherkin.cli,
                ["--file", "test.feature", "--section-id", "123"],
                obj=self.environment,
            )
            assert result.exit_code == 0
            # Verbose logs might not appear in captured output but command should succeed

    @pytest.mark.cmd_import_gherkin
    def test_import_gherkin_missing_file(self):
        """A non-existent input file fails with a parameter or runtime error."""
        result = self.runner.invoke(
            cmd_import_gherkin.cli, ["--file", "/nonexistent/file.feature", "--section-id", "123"], obj=self.environment
        )
        # Click returns exit code 2 for invalid parameter (file doesn't exist)
        assert result.exit_code in [1, 2]

    @pytest.mark.cmd_import_gherkin
    @patch("trcli.commands.cmd_import_gherkin.ApiRequestHandler")
    @patch("trcli.commands.cmd_import_gherkin.APIClient")
    def test_import_gherkin_empty_file(self, mock_api_client_class, mock_api_handler_class):
        """An empty feature file is rejected with an 'empty' error message."""
        with self.runner.isolated_filesystem():
            # Create empty file
            with open("empty.feature", "w") as f:
                f.write("")
            result = self.runner.invoke(
                cmd_import_gherkin.cli, ["--file", "empty.feature", "--section-id", "123"], obj=self.environment
            )
            assert result.exit_code == 1
            assert "empty" in result.output.lower()

    @pytest.mark.cmd_import_gherkin
    @patch("trcli.commands.cmd_import_gherkin.ApiRequestHandler")
    @patch("trcli.commands.cmd_import_gherkin.APIClient")
    def test_import_gherkin_api_error(self, mock_api_client_class, mock_api_handler_class):
        """An API error from add_bdd exits with code 1 and surfaces the message."""
        handler = self._mock_api(mock_api_client_class, mock_api_handler_class)
        handler.add_bdd.return_value = ([], "API Error: Section not found")
        with self.runner.isolated_filesystem():
            with open("test.feature", "w") as f:
                f.write("Feature: Test\n Scenario: Test\n")
            result = self.runner.invoke(
                cmd_import_gherkin.cli, ["--file", "test.feature", "--section-id", "999"], obj=self.environment
            )
            assert result.exit_code == 1
            assert "error" in result.output.lower()
            assert "section not found" in result.output.lower()

    @pytest.mark.cmd_import_gherkin
    @patch("trcli.commands.cmd_import_gherkin.ApiRequestHandler")
    @patch("trcli.commands.cmd_import_gherkin.APIClient")
    def test_import_gherkin_no_cases_created(self, mock_api_client_class, mock_api_handler_class):
        """No case IDs returned (but no error) is a warning, not a failure."""
        handler = self._mock_api(mock_api_client_class, mock_api_handler_class)
        handler.add_bdd.return_value = ([], "")  # No error, but no cases created
        with self.runner.isolated_filesystem():
            with open("test.feature", "w") as f:
                f.write("Feature: Test\n Scenario: Test\n")
            result = self.runner.invoke(
                cmd_import_gherkin.cli, ["--file", "test.feature", "--section-id", "123"], obj=self.environment
            )
            assert result.exit_code == 0  # Not an error, just a warning
            assert "warning" in result.output.lower()
            assert "no case" in result.output.lower()

    @pytest.mark.cmd_import_gherkin
    def test_import_gherkin_required_parameters(self):
        """Missing --file or --section-id is rejected by Click with exit code 2."""
        # Missing --file
        result = self.runner.invoke(cmd_import_gherkin.cli, ["--section-id", "123", "--project-id", "1"])
        assert result.exit_code == 2  # Click error for missing required option
        # Missing --section-id
        with self.runner.isolated_filesystem():
            with open("test.feature", "w") as f:
                f.write("Feature: Test\n")
            result = self.runner.invoke(cmd_import_gherkin.cli, ["--file", "test.feature", "--project-id", "1"])
            assert result.exit_code == 2

    @pytest.mark.cmd_import_gherkin
    @patch("trcli.commands.cmd_import_gherkin.ApiRequestHandler")
    @patch("trcli.commands.cmd_import_gherkin.APIClient")
    def test_import_gherkin_invalid_section_id(self, mock_api_client_class, mock_api_handler_class):
        """A negative section ID is rejected by Click's IntRange validation."""
        result = self.runner.invoke(
            cmd_import_gherkin.cli,
            ["--file", self.test_feature_path, "--section-id", "-1"],  # Invalid: negative
            obj=self.environment,
        )
        # Click IntRange validation should catch this
        assert result.exit_code == 2

    @pytest.mark.cmd_import_gherkin
    @patch("trcli.commands.cmd_import_gherkin.ApiRequestHandler")
    @patch("trcli.commands.cmd_import_gherkin.APIClient")
    def test_import_gherkin_unicode_content(self, mock_api_client_class, mock_api_handler_class):
        """Feature files containing unicode upload successfully."""
        handler = self._mock_api(mock_api_client_class, mock_api_handler_class)
        handler.add_bdd.return_value = ([456], "")
        with self.runner.isolated_filesystem():
            # Create feature file with unicode
            with open("unicode.feature", "w", encoding="utf-8") as f:
                f.write("Feature: Tëst with ūnīcödé\n Scenario: Test 测试\n Given test\n")
            result = self.runner.invoke(
                cmd_import_gherkin.cli, ["--file", "unicode.feature", "--section-id", "123"], obj=self.environment
            )
            assert result.exit_code == 0

    @pytest.mark.cmd_import_gherkin
    @patch("trcli.commands.cmd_import_gherkin.ApiRequestHandler")
    @patch("trcli.commands.cmd_import_gherkin.APIClient")
    def test_import_gherkin_update_mode(self, mock_api_client_class, mock_api_handler_class):
        """--update routes through update_bdd (not add_bdd) with the case ID."""
        handler = self._mock_api(mock_api_client_class, mock_api_handler_class)
        handler.update_bdd.return_value = ([456], "")  # Success: case ID 456, no error
        with self.runner.isolated_filesystem():
            # Create test feature file
            with open("test.feature", "w") as f:
                f.write("Feature: Test\n Scenario: Updated scenario\n Given updated step\n")
            result = self.runner.invoke(
                cmd_import_gherkin.cli,
                ["--file", "test.feature", "--case-id", "456", "--update"],
                obj=self.environment,
            )
            assert result.exit_code == 0
            assert "successfully updated" in result.output.lower()
            assert "456" in result.output
            # Verify update_bdd was called with case_id, not add_bdd
            handler.update_bdd.assert_called_once_with(456, mock.ANY)
            handler.add_bdd.assert_not_called()

    @pytest.mark.cmd_import_gherkin
    @patch("trcli.commands.cmd_import_gherkin.ApiRequestHandler")
    @patch("trcli.commands.cmd_import_gherkin.APIClient")
    def test_import_gherkin_update_mode_json_output(self, mock_api_client_class, mock_api_handler_class):
        """--update combined with --json-output emits the updated case IDs as JSON."""
        handler = self._mock_api(mock_api_client_class, mock_api_handler_class)
        handler.update_bdd.return_value = ([789], "")
        with self.runner.isolated_filesystem():
            # Create test feature file
            with open("test.feature", "w") as f:
                f.write("Feature: Test\n Scenario: Test\n")
            result = self.runner.invoke(
                cmd_import_gherkin.cli,
                ["--file", "test.feature", "--case-id", "789", "--update", "--json-output"],
                obj=self.environment,
            )
            assert result.exit_code == 0
            # Extract JSON from output
            json_start = result.output.find("{")
            assert json_start >= 0, "No JSON found in output"
            output_data = json.loads(result.output[json_start:])
            assert "case_ids" in output_data
            assert output_data["case_ids"] == [789]
            # Verify update_bdd was called with case_id
            handler.update_bdd.assert_called_once_with(789, mock.ANY)

    @pytest.mark.cmd_import_gherkin
    @patch("trcli.commands.cmd_import_gherkin.ApiRequestHandler")
    @patch("trcli.commands.cmd_import_gherkin.APIClient")
    def test_import_gherkin_update_mode_api_error(self, mock_api_client_class, mock_api_handler_class):
        """An API error during update exits with code 1 and mentions updating."""
        handler = self._mock_api(mock_api_client_class, mock_api_handler_class)
        handler.update_bdd.return_value = ([], "TestRail API error: Case not found")
        with self.runner.isolated_filesystem():
            # Create test feature file
            with open("test.feature", "w") as f:
                f.write("Feature: Test\n")
            result = self.runner.invoke(
                cmd_import_gherkin.cli,
                ["--file", "test.feature", "--case-id", "999", "--update"],
                obj=self.environment,
            )
            assert result.exit_code == 1
            assert "error" in result.output.lower()
            assert "updating" in result.output.lower()

    @pytest.mark.cmd_import_gherkin
    @patch("trcli.commands.cmd_import_gherkin.ApiRequestHandler")
    @patch("trcli.commands.cmd_import_gherkin.APIClient")
    def test_import_gherkin_update_mode_verbose(self, mock_api_client_class, mock_api_handler_class):
        """Verbose update mode logs the update_bdd endpoint and the case ID."""
        handler = self._mock_api(mock_api_client_class, mock_api_handler_class)
        handler.update_bdd.return_value = ([456], "")
        # Enable verbose mode
        self.environment.verbose = True
        with self.runner.isolated_filesystem():
            # Create test feature file
            with open("test.feature", "w") as f:
                f.write("Feature: Test\n")
            result = self.runner.invoke(
                cmd_import_gherkin.cli,
                ["--file", "test.feature", "--case-id", "456", "--update"],
                obj=self.environment,
            )
            assert result.exit_code == 0
            # Verify verbose output shows update_bdd endpoint
            assert "update_bdd" in result.output
            assert "456" in result.output  # case_id in verbose log
            # Verify update_bdd was called with case_id
            handler.update_bdd.assert_called_once_with(456, mock.ANY)
import pytest
import json
from unittest import mock
from unittest.mock import MagicMock, patch
from click.testing import CliRunner
from pathlib import Path
from trcli.cli import Environment
from trcli.commands import cmd_parse_cucumber
class TestCmdParseCucumber:
"""Test class for parse_cucumber command functionality"""
def setup_method(self):
"""Set up test environment and runner"""
self.runner = CliRunner()
self.test_cucumber_path = str(Path(__file__).parent / "test_data" / "CUCUMBER" / "sample_cucumber.json")
# Set up environment with required parameters
self.environment = Environment(cmd="parse_cucumber")
self.environment.host = "https://test.testrail.com"
self.environment.username = "test@example.com"
self.environment.password = "password"
self.environment.project = "Test Project"
self.environment.project_id = 1
self.environment.auto_creation_response = True # Enable auto-creation for tests
@pytest.mark.cmd_parse_cucumber
@patch("trcli.api.api_request_handler.ApiRequestHandler")
@patch("trcli.api.api_client.APIClient")
@patch("trcli.commands.cmd_parse_cucumber.ResultsUploader")
@patch("trcli.commands.cmd_parse_cucumber.CucumberParser")
def test_parse_cucumber_workflow1_results_only(
self, mock_parser_class, mock_uploader_class, mock_api_client_class, mock_api_handler_class
):
"""Test Workflow 1: Parse and upload results only (no feature upload)"""
# Mock API client
mock_api_client = MagicMock()
mock_api_client_class.return_value = mock_api_client
mock_api_client_class.build_uploader_metadata.return_value = {}
# Mock API handler
mock_api_handler = MagicMock()
mock_api_handler_class.return_value = mock_api_handler
# Mock project data resolution
mock_project_data = MagicMock()
mock_project_data.project_id = 1
mock_api_handler.get_project_data.return_value = mock_project_data
# Mock parser
mock_parser = MagicMock()
mock_parser_class.return_value = mock_parser
mock_suite = MagicMock()
mock_suite.name = "Test Suite"
mock_parser.parse_file.return_value = [mock_suite]
# Mock uploader
mock_uploader = MagicMock()
mock_uploader_class.return_value = mock_uploader
mock_uploader.last_run_id = 123
result = self.runner.invoke(
cmd_parse_cucumber.cli,
["--file", self.test_cucumber_path, "--suite-id", "2", "--title", "Test Run"],
obj=self.environment,
)
assert result.exit_code == 0
mock_parser.parse_file.assert_called_once()
mock_uploader.upload_results.assert_called_once()
@pytest.mark.cmd_parse_cucumber
@patch("trcli.api.api_request_handler.ApiRequestHandler")
@patch("trcli.api.api_client.APIClient")
@patch("trcli.commands.cmd_parse_cucumber.ResultsUploader")
@patch("trcli.commands.cmd_parse_cucumber.CucumberParser")
@patch(
"builtins.open",
new_callable=mock.mock_open,
read_data='[{"name":"Test Feature","elements":[{"type":"scenario","name":"Test Scenario"}]}]',
)
def test_parse_cucumber_auto_create_missing_features(
self, mock_open, mock_parser_class, mock_uploader_class, mock_api_client_class, mock_api_handler_class
):
"""Test auto-creation of missing BDD test cases (default behavior)"""
# Mock API client
mock_api_client = MagicMock()
mock_api_client_class.return_value = mock_api_client
mock_api_client_class.build_uploader_metadata.return_value = {}
# Mock API handler
mock_api_handler = MagicMock()
mock_api_handler_class.return_value = mock_api_handler
# Mock project data resolution
mock_project_data = MagicMock()
mock_project_data.project_id = 1
mock_api_handler.get_project_data.return_value = mock_project_data
# Mock parser
mock_parser = MagicMock()
mock_parser_class.return_value = mock_parser
# First parse returns case_id=-1 (needs creation)
mock_suite = MagicMock()
mock_suite.name = "Test Suite"
mock_section = MagicMock()
mock_section.name = "Test Feature"
mock_case = MagicMock()
mock_case.case_id = -1 # Marker for auto-creation
mock_case.result = MagicMock()
mock_section.testcases = [mock_case]
mock_suite.testsections = [mock_section]
mock_parser.parse_file.return_value = [mock_suite]
# Mock _generate_feature_content to return Gherkin content
mock_parser._generate_feature_content.return_value = "Feature: Test\n Scenario: Test\n Given test step\n"
mock_parser._normalize_title.return_value = "test feature"
# Mock section fetch and creation
mock_api_handler._ApiRequestHandler__get_all_sections.return_value = ([], None)
mock_response = MagicMock()
mock_response.error_message = None
mock_response.response_text = {"id": 456}
mock_api_handler.client.send_post.return_value = mock_response
# Mock BDD template and add_bdd
mock_api_handler.get_bdd_template_id.return_value = (2, None)
mock_api_handler.add_bdd.return_value = ([101], None)
# Mock uploader
mock_uploader = MagicMock()
mock_uploader_class.return_value = mock_uploader
mock_uploader.last_run_id = 123
result = self.runner.invoke(
cmd_parse_cucumber.cli,
[
"--file",
self.test_cucumber_path,
"--suite-id",
"2",
"--title",
"Test Run",
],
obj=self.environment,
)
assert result.exit_code == 0
mock_api_handler.get_bdd_template_id.assert_called_once()
mock_api_handler.add_bdd.assert_called_once()
@pytest.mark.cmd_parse_cucumber
@patch("trcli.api.api_request_handler.ApiRequestHandler")
@patch("trcli.api.api_client.APIClient")
@patch("trcli.commands.cmd_parse_cucumber.ResultsUploader")
@patch("trcli.commands.cmd_parse_cucumber.CucumberParser")
def test_parse_cucumber_with_n_flag(
self, mock_parser_class, mock_uploader_class, mock_api_client_class, mock_api_handler_class
):
"""Test that -n flag only matches existing BDD test cases"""
# Mock API client
mock_api_client = MagicMock()
mock_api_client_class.return_value = mock_api_client
mock_api_client_class.build_uploader_metadata.return_value = {}
# Mock API handler
mock_api_handler = MagicMock()
mock_api_handler_class.return_value = mock_api_handler
# Mock project data resolution
mock_project_data = MagicMock()
mock_project_data.project_id = 1
mock_api_handler.get_project_data.return_value = mock_project_data
# Mock parser
mock_parser = MagicMock()
mock_parser_class.return_value = mock_parser
mock_suite = MagicMock()
mock_suite.name = "Test Suite"
mock_section = MagicMock()
mock_section.name = "Test Feature"
mock_section.testcases = []
mock_suite.testsections = [mock_section]
mock_parser.parse_file.return_value = [mock_suite]
# Mock uploader
mock_uploader = MagicMock()
mock_uploader_class.return_value = mock_uploader
mock_uploader.last_run_id = 123
# Set auto_creation_response to False (simulates -n flag)
self.environment.auto_creation_response = False
result = self.runner.invoke(
cmd_parse_cucumber.cli,
[
"--file",
self.test_cucumber_path,
"--suite-id",
"2",
"--title",
"Test Run",
],
obj=self.environment,
)
assert result.exit_code == 0
# Verify auto_create=False was passed to parser
mock_parser.parse_file.assert_called_with(bdd_matching_mode=True, project_id=1, suite_id=2, auto_create=False)
@pytest.mark.cmd_parse_cucumber
def test_parse_cucumber_missing_file(self):
"""Test with non-existent Cucumber JSON file"""
result = self.runner.invoke(
cmd_parse_cucumber.cli,
["--file", "/nonexistent/results.json", "--suite-id", "2", "--title", "Test Run"],
obj=self.environment,
)
assert result.exit_code == 1
assert "not found" in result.output.lower() or result.exception is not None
@pytest.mark.cmd_parse_cucumber
@patch("trcli.commands.cmd_parse_cucumber.CucumberParser")
def test_parse_cucumber_invalid_json(self, mock_parser_class):
"""Test with invalid JSON format"""
# Mock parser to raise JSONDecodeError
mock_parser_class.side_effect = json.JSONDecodeError("Invalid JSON", "", 0)
with self.runner.isolated_filesystem():
# Create invalid JSON file
with open("invalid.json", "w") as f:
f.write("This is not valid JSON{{{")
result = self.runner.invoke(
cmd_parse_cucumber.cli,
["--file", "invalid.json", "--suite-id", "2", "--title", "Test Run"],
obj=self.environment,
)
assert result.exit_code == 1
@pytest.mark.cmd_parse_cucumber
@patch("trcli.commands.cmd_parse_cucumber.CucumberParser")
def test_parse_cucumber_empty_json(self, mock_parser_class):
"""Test with empty JSON file"""
with self.runner.isolated_filesystem():
# Create empty JSON file
with open("empty.json", "w") as f:
f.write("[]")
# Mock parser to return empty list
mock_parser = MagicMock()
mock_parser_class.return_value = mock_parser
mock_parser.parse_file.return_value = []
result = self.runner.invoke(
cmd_parse_cucumber.cli,
["--file", "empty.json", "--suite-id", "2", "--title", "Test Run"],
obj=self.environment,
)
# Should handle gracefully (may succeed with warning or fail)
# Exit code depends on implementation
    @pytest.mark.cmd_parse_cucumber
    @patch("trcli.api.api_request_handler.ApiRequestHandler")
    @patch("trcli.api.api_client.APIClient")
    @patch("trcli.commands.cmd_parse_cucumber.CucumberParser")
    @patch("builtins.open", new_callable=mock.mock_open, read_data="[]")
    def test_parse_cucumber_invalid_cucumber_json(
        self, mock_open, mock_parser_class, mock_api_client_class, mock_api_handler_class
    ):
        """Test with invalid Cucumber JSON structure (empty array).

        Patch decorators inject mocks bottom-up: the decorator closest to
        the function supplies the first mock argument (mock_open).
        """
        # Mock API client
        mock_api_client = MagicMock()
        mock_api_client_class.return_value = mock_api_client
        mock_api_client_class.build_uploader_metadata.return_value = {}
        # Mock API handler
        mock_api_handler = MagicMock()
        mock_api_handler_class.return_value = mock_api_handler
        # Mock project data resolution
        mock_project_data = MagicMock()
        mock_project_data.project_id = 1
        mock_api_handler.get_project_data.return_value = mock_project_data
        # Mock parser to raise error for empty JSON
        mock_parser = MagicMock()
        mock_parser_class.return_value = mock_parser
        mock_parser.parse_file.side_effect = ValueError("Invalid Cucumber JSON format: empty array")
        result = self.runner.invoke(
            cmd_parse_cucumber.cli,
            [
                "--file",
                self.test_cucumber_path,
                "--suite-id",
                "2",
                "--title",
                "Test Run",
            ],
            obj=self.environment,
        )
        assert result.exit_code == 1
        # Check that it fails with any appropriate error (either JSON format or parsing error)
        assert "invalid" in result.output.lower() or "error parsing" in result.output.lower()
    @pytest.mark.cmd_parse_cucumber
    @patch("trcli.api.api_request_handler.ApiRequestHandler")
    @patch("trcli.api.api_client.APIClient")
    @patch("trcli.commands.cmd_parse_cucumber.CucumberParser")
    @patch(
        "builtins.open",
        new_callable=mock.mock_open,
        read_data='[{"name":"Test Feature","elements":[{"type":"scenario","name":"Test Scenario"}]}]',
    )
    def test_parse_cucumber_api_error_during_auto_creation(
        self, mock_open, mock_parser_class, mock_api_client_class, mock_api_handler_class
    ):
        """Test API error during BDD test case auto-creation.

        A parsed case with case_id == -1 marks it as "needs creation"; the
        add_bdd call is then mocked to fail, and the command must exit 1
        with an error message.
        """
        # Mock API client
        mock_api_client = MagicMock()
        mock_api_client_class.return_value = mock_api_client
        mock_api_client_class.build_uploader_metadata.return_value = {}
        # Mock API handler with error
        mock_api_handler = MagicMock()
        mock_api_handler_class.return_value = mock_api_handler
        # Mock project data resolution
        mock_project_data = MagicMock()
        mock_project_data.project_id = 1
        mock_api_handler.get_project_data.return_value = mock_project_data
        # Mock parser
        mock_parser = MagicMock()
        mock_parser_class.return_value = mock_parser
        mock_suite = MagicMock()
        mock_section = MagicMock()
        mock_section.name = "Test Feature"
        mock_case = MagicMock()
        mock_case.case_id = -1  # Needs creation
        mock_section.testcases = [mock_case]
        mock_suite.testsections = [mock_section]
        mock_parser.parse_file.return_value = [mock_suite]
        mock_parser._generate_feature_content.return_value = "Feature: Test\n  Scenario: Test\n"
        mock_parser._normalize_title.return_value = "test feature"
        # Mock section fetch
        # (name-mangled private method: _ApiRequestHandler__get_all_sections)
        mock_api_handler._ApiRequestHandler__get_all_sections.return_value = ([], None)
        mock_response = MagicMock()
        mock_response.error_message = None
        mock_response.response_text = {"id": 456}
        mock_api_handler.client.send_post.return_value = mock_response
        # Mock BDD template and add_bdd with error
        mock_api_handler.get_bdd_template_id.return_value = (2, None)
        mock_api_handler.add_bdd.return_value = ([], "API Error: Section not found")
        result = self.runner.invoke(
            cmd_parse_cucumber.cli,
            [
                "--file",
                self.test_cucumber_path,
                "--suite-id",
                "2",
                "--title",
                "Test Run",
            ],
            obj=self.environment,
        )
        assert result.exit_code == 1
        assert "error" in result.output.lower()
@pytest.mark.cmd_parse_cucumber
def test_parse_cucumber_required_parameters(self):
"""Test that required parameters are validated"""
# Missing --file
result = self.runner.invoke(
cmd_parse_cucumber.cli, ["--project-id", "1", "--suite-id", "2", "--title", "Test Run"]
)
# Will fail due to missing required params
# Missing --project-id (handled by check_for_required_parameters)
result = self.runner.invoke(
cmd_parse_cucumber.cli, ["--file", self.test_cucumber_path, "--suite-id", "2", "--title", "Test Run"]
)
# Will fail
    @pytest.mark.cmd_parse_cucumber
    @patch("trcli.api.api_request_handler.ApiRequestHandler")
    @patch("trcli.api.api_client.APIClient")
    @patch("trcli.commands.cmd_parse_cucumber.ResultsUploader")
    @patch("trcli.commands.cmd_parse_cucumber.CucumberParser")
    def test_parse_cucumber_validation_exception(
        self, mock_parser_class, mock_uploader_class, mock_api_client_class, mock_api_handler_class
    ):
        """Test handling of ValidationException raised while parsing.

        The command must exit 1 and echo a "validation error" message.
        """
        from trcli.data_classes.validation_exception import ValidationException
        # Mock API client
        mock_api_client = MagicMock()
        mock_api_client_class.return_value = mock_api_client
        mock_api_client_class.build_uploader_metadata.return_value = {}
        # Mock API handler
        mock_api_handler = MagicMock()
        mock_api_handler_class.return_value = mock_api_handler
        # Mock project data resolution
        mock_project_data = MagicMock()
        mock_project_data.project_id = 1
        mock_api_handler.get_project_data.return_value = mock_project_data
        # Mock parser to raise ValidationException
        mock_parser = MagicMock()
        mock_parser_class.return_value = mock_parser
        mock_parser.parse_file.side_effect = ValidationException("CucumberParser", "Validation error occurred")
        result = self.runner.invoke(
            cmd_parse_cucumber.cli,
            ["--file", self.test_cucumber_path, "--suite-id", "2", "--title", "Test Run"],
            obj=self.environment,
        )
        assert result.exit_code == 1
        assert "validation error" in result.output.lower()
    @pytest.mark.cmd_parse_cucumber
    @patch("trcli.api.api_request_handler.ApiRequestHandler")
    @patch("trcli.api.api_client.APIClient")
    @patch("trcli.commands.cmd_parse_cucumber.ResultsUploader")
    @patch("trcli.commands.cmd_parse_cucumber.CucumberParser")
    def test_parse_cucumber_value_error(
        self, mock_parser_class, mock_uploader_class, mock_api_client_class, mock_api_handler_class
    ):
        """Test handling of ValueError during parsing.

        Mirrors test_parse_cucumber_validation_exception but with a plain
        ValueError; the command must exit 1 with an "error parsing" message.
        """
        # Mock API client
        mock_api_client = MagicMock()
        mock_api_client_class.return_value = mock_api_client
        mock_api_client_class.build_uploader_metadata.return_value = {}
        # Mock API handler
        mock_api_handler = MagicMock()
        mock_api_handler_class.return_value = mock_api_handler
        # Mock project data resolution
        mock_project_data = MagicMock()
        mock_project_data.project_id = 1
        mock_api_handler.get_project_data.return_value = mock_project_data
        # Mock parser to raise ValueError
        mock_parser = MagicMock()
        mock_parser_class.return_value = mock_parser
        mock_parser.parse_file.side_effect = ValueError("Invalid Cucumber JSON structure")
        result = self.runner.invoke(
            cmd_parse_cucumber.cli,
            ["--file", self.test_cucumber_path, "--suite-id", "2", "--title", "Test Run"],
            obj=self.environment,
        )
        assert result.exit_code == 1
        assert "error parsing" in result.output.lower()
import pytest
import json
from unittest import mock
from unittest.mock import MagicMock, patch, call
from pathlib import Path
from trcli.cli import Environment
from trcli.readers.cucumber_json import CucumberParser
from trcli.data_classes.dataclass_testrail import TestRailSeparatedStep
class TestCucumberBDDMatching:
    """Test class for BDD matching mode functionality in CucumberParser.

    Covers title normalization, @C-tag case-ID extraction, cached
    title-to-case lookup via the API handler, BDD case validation, and the
    BDD-matching branch of feature parsing.
    """
    def setup_method(self):
        """Set up test environment"""
        # Minimal Environment mimicking CLI input for the parse_cucumber command
        self.environment = Environment(cmd="parse_cucumber")
        self.environment.host = "https://test.testrail.com"
        self.environment.username = "test@example.com"
        self.environment.password = "password"
        self.environment.project = "Test Project"
        self.environment.project_id = 1
        self.environment.suite_id = 2
        # Create a temporary test file for CucumberParser initialization
        import tempfile
        # delete=False keeps the path usable after close(); removed in teardown_method
        self.temp_file = tempfile.NamedTemporaryFile(mode="w", suffix=".json", delete=False)
        self.temp_file.write("[]")
        self.temp_file.close()
        self.environment.file = self.temp_file.name
        # Sample Cucumber JSON feature
        # (step "duration" values are nanoseconds: 1000000000 == 1 second,
        # confirmed by the 7s total asserted in test_parse_feature_as_bdd_case_elapsed_time)
        self.sample_feature = {
            "name": "User Login",
            "tags": [{"name": "@smoke"}],
            "elements": [
                {
                    "type": "scenario",
                    "name": "Successful login",
                    "tags": [],
                    "steps": [
                        {
                            "keyword": "Given",
                            "name": "user is on login page",
                            "result": {"status": "passed", "duration": 1000000000},
                        },
                        {
                            "keyword": "When",
                            "name": "user enters valid credentials",
                            "result": {"status": "passed", "duration": 2000000000},
                        },
                        {
                            "keyword": "Then",
                            "name": "user is logged in",
                            "result": {"status": "passed", "duration": 500000000},
                        },
                    ],
                },
                {
                    "type": "scenario",
                    "name": "Failed login",
                    "tags": [],
                    "steps": [
                        {
                            "keyword": "Given",
                            "name": "user is on login page",
                            "result": {"status": "passed", "duration": 1000000000},
                        },
                        {
                            "keyword": "When",
                            "name": "user enters invalid credentials",
                            "result": {"status": "passed", "duration": 2000000000},
                        },
                        {
                            "keyword": "Then",
                            "name": "error message is shown",
                            "result": {
                                "status": "failed",
                                "duration": 500000000,
                                "error_message": "Expected error not found",
                            },
                        },
                    ],
                },
            ],
        }
    def teardown_method(self):
        """Clean up temporary files"""
        import os
        if hasattr(self, "temp_file") and os.path.exists(self.temp_file.name):
            os.unlink(self.temp_file.name)
    def _create_mock_api_handler(self):
        """Helper to create mock API handler with BDD field resolution"""
        mock_api_handler = MagicMock()
        # Mock BDD field name resolution (returns default field names)
        mock_api_handler.get_bdd_case_field_name.return_value = "custom_testrail_bdd_scenario"
        mock_api_handler.get_bdd_result_field_name.return_value = "custom_testrail_bdd_scenario_results"
        return mock_api_handler
    @pytest.mark.cucumber_bdd_matching
    def test_normalize_title_basic(self):
        """Test title normalization removes special characters and normalizes case"""
        parser = CucumberParser(self.environment)
        assert parser._normalize_title("User Login") == "user login"
        assert parser._normalize_title("User-Login") == "user login"
        assert parser._normalize_title("User_Login!") == "user login"
        assert parser._normalize_title("  User  Login  ") == "user login"
        assert parser._normalize_title("User@#$%Login") == "user login"
    @pytest.mark.cucumber_bdd_matching
    def test_normalize_title_complex(self):
        """Test title normalization with complex cases"""
        parser = CucumberParser(self.environment)
        assert parser._normalize_title("E-commerce: Product Checkout") == "e commerce product checkout"
        assert parser._normalize_title("API (v2) Authentication") == "api v2 authentication"
        assert parser._normalize_title("Test-Case #123") == "test case 123"
    @pytest.mark.cucumber_bdd_matching
    def test_extract_case_id_from_feature_tags(self):
        """Test case ID extraction from feature-level tags"""
        parser = CucumberParser(self.environment)
        feature_tags = ["@smoke", "@C123", "@regression"]
        scenario_tags = ["@C456"]
        # Feature-level tag should take priority
        case_id = parser._extract_case_id_from_tags(feature_tags, scenario_tags)
        assert case_id == 123
    @pytest.mark.cucumber_bdd_matching
    def test_extract_case_id_from_scenario_tags(self):
        """Test case ID extraction from scenario-level tags (fallback)"""
        parser = CucumberParser(self.environment)
        feature_tags = ["@smoke"]
        scenario_tags = ["@C456", "@regression"]
        # Should use scenario-level tag when no feature-level tag
        case_id = parser._extract_case_id_from_tags(feature_tags, scenario_tags)
        assert case_id == 456
    @pytest.mark.cucumber_bdd_matching
    def test_extract_case_id_no_tags(self):
        """Test case ID extraction returns None when no @C tags"""
        parser = CucumberParser(self.environment)
        feature_tags = ["@smoke", "@regression"]
        scenario_tags = ["@fast"]
        case_id = parser._extract_case_id_from_tags(feature_tags, scenario_tags)
        assert case_id is None
    @pytest.mark.cucumber_bdd_matching
    def test_extract_case_id_lowercase(self):
        """Test case ID extraction with lowercase @c tag"""
        parser = CucumberParser(self.environment)
        feature_tags = ["@c789"]
        scenario_tags = []
        case_id = parser._extract_case_id_from_tags(feature_tags, scenario_tags)
        assert case_id == 789
    @pytest.mark.cucumber_bdd_matching
    def test_extract_case_id_invalid_format(self):
        """Test case ID extraction handles invalid formats gracefully"""
        parser = CucumberParser(self.environment)
        feature_tags = ["@C", "@Cabc", "@C123abc"]
        scenario_tags = []
        # Should return None for invalid formats
        case_id = parser._extract_case_id_from_tags(feature_tags, scenario_tags)
        assert case_id is None
    @pytest.mark.cucumber_bdd_matching
    def test_find_case_by_title_found(self):
        """Test finding case by title using cached lookup"""
        parser = CucumberParser(self.environment)
        # Mock API handler
        mock_api_handler = MagicMock()
        parser._api_handler = mock_api_handler
        # Mock find_bdd_case_by_name to return case ID 101
        mock_api_handler.find_bdd_case_by_name.return_value = (101, None, [])
        case_id = parser._find_case_by_title("User Login", project_id=1, suite_id=2)
        assert case_id == 101
        # Verify API handler was called correctly
        mock_api_handler.find_bdd_case_by_name.assert_called_once_with(
            feature_name="User Login", project_id=1, suite_id=2
        )
    @pytest.mark.cucumber_bdd_matching
    def test_find_case_by_title_not_found(self):
        """Test finding case by title returns None when not in cache"""
        parser = CucumberParser(self.environment)
        # Mock API handler
        mock_api_handler = MagicMock()
        parser._api_handler = mock_api_handler
        # Mock find_bdd_case_by_name to return -1 (not found)
        mock_api_handler.find_bdd_case_by_name.return_value = (-1, None, [])
        case_id = parser._find_case_by_title("Nonexistent Feature", project_id=1, suite_id=2)
        assert case_id is None
    @pytest.mark.cucumber_bdd_matching
    def test_find_case_by_title_normalization(self):
        """Test case matching with different formatting (normalization happens in API handler)"""
        parser = CucumberParser(self.environment)
        # Mock API handler
        mock_api_handler = MagicMock()
        parser._api_handler = mock_api_handler
        # Mock find_bdd_case_by_name to always return case ID 101
        # (normalization is tested in API handler tests)
        mock_api_handler.find_bdd_case_by_name.return_value = (101, None, [])
        # Should call API handler with each variation
        assert parser._find_case_by_title("User Login", 1, 2) == 101
        assert parser._find_case_by_title("User-Login", 1, 2) == 101
        assert parser._find_case_by_title("user_login", 1, 2) == 101
        assert parser._find_case_by_title("USER LOGIN", 1, 2) == 101
        # Verify API handler was called 4 times
        assert mock_api_handler.find_bdd_case_by_name.call_count == 4
    @pytest.mark.cucumber_bdd_matching
    def test_api_handler_builds_cache_correctly(self):
        """Test API handler builds BDD cases cache correctly (integration test)"""
        from trcli.api.api_request_handler import ApiRequestHandler
        from trcli.api.api_client import APIClient
        # Create mock environment and client
        mock_env = MagicMock()
        mock_env.vlog = MagicMock()
        mock_client = MagicMock(spec=APIClient)
        mock_client.VERSION = "v2"
        # Create API handler with mock suite data
        from trcli.data_classes.dataclass_testrail import TestRailSuite
        mock_suite = TestRailSuite(name="test", suite_id=2)
        api_handler = ApiRequestHandler(
            environment=mock_env, api_client=mock_client, suites_data=mock_suite, verify=False
        )
        # Mock __get_all_cases to return BDD and non-BDD cases
        # (patched via its name-mangled private name)
        mock_cases = [
            {"id": 101, "title": "User Login", "custom_testrail_bdd_scenario": "Scenario: Login"},
            {"id": 102, "title": "Product Search", "custom_testrail_bdd_scenario": None},  # Not BDD
            {"id": 103, "title": "Checkout Process", "custom_testrail_bdd_scenario": "Scenario: Checkout"},
        ]
        with patch.object(api_handler, "_ApiRequestHandler__get_all_cases", return_value=(mock_cases, None)):
            # Call find_bdd_case_by_name which triggers cache build
            case_id, error, duplicates = api_handler.find_bdd_case_by_name("User Login", 1, 2)
            # Should find case 101
            assert case_id == 101
            assert error is None
            assert duplicates == []
    @pytest.mark.cucumber_bdd_matching
    def test_api_handler_caching_behavior(self):
        """Test API handler cache is only built once per project/suite"""
        from trcli.api.api_request_handler import ApiRequestHandler
        from trcli.api.api_client import APIClient
        from trcli.data_classes.dataclass_testrail import TestRailSuite
        # Create mock environment and client
        mock_env = MagicMock()
        mock_env.vlog = MagicMock()
        mock_client = MagicMock(spec=APIClient)
        mock_client.VERSION = "v2"
        mock_suite = TestRailSuite(name="test", suite_id=2)
        api_handler = ApiRequestHandler(
            environment=mock_env, api_client=mock_client, suites_data=mock_suite, verify=False
        )
        mock_cases = [{"id": 101, "title": "User Login", "custom_testrail_bdd_scenario": "Scenario: Login"}]
        with patch.object(
            api_handler, "_ApiRequestHandler__get_all_cases", return_value=(mock_cases, None)
        ) as mock_get_cases:
            # First call - should build cache
            case_id1, _, _ = api_handler.find_bdd_case_by_name("User Login", 1, 2)
            assert mock_get_cases.call_count == 1
            # Second call with same project/suite - should use cache
            case_id2, _, _ = api_handler.find_bdd_case_by_name("User Login", 1, 2)
            assert mock_get_cases.call_count == 1  # No additional call
            # Both calls should find the same case
            assert case_id1 == case_id2 == 101
    @pytest.mark.cucumber_bdd_matching
    def test_api_handler_handles_api_error(self):
        """Test API handler handles API errors gracefully"""
        from trcli.api.api_request_handler import ApiRequestHandler
        from trcli.api.api_client import APIClient
        from trcli.data_classes.dataclass_testrail import TestRailSuite
        # Create mock environment and client
        mock_env = MagicMock()
        mock_env.vlog = MagicMock()
        mock_client = MagicMock(spec=APIClient)
        mock_client.VERSION = "v2"
        mock_suite = TestRailSuite(name="test", suite_id=2)
        api_handler = ApiRequestHandler(
            environment=mock_env, api_client=mock_client, suites_data=mock_suite, verify=False
        )
        # Mock API error
        with patch.object(api_handler, "_ApiRequestHandler__get_all_cases", return_value=([], "API Error")):
            case_id, error, duplicates = api_handler.find_bdd_case_by_name("User Login", 1, 2)
            # Should return None with error message
            assert case_id is None
            assert "API Error" in error
            assert duplicates == []
    @pytest.mark.cucumber_bdd_matching
    def test_validate_bdd_case_exists_valid(self):
        """Test validation succeeds for valid BDD case"""
        parser = CucumberParser(self.environment)
        mock_api_handler = self._create_mock_api_handler()
        parser._api_handler = mock_api_handler
        # Mock valid BDD case - mock send_get response
        mock_response = MagicMock()
        mock_response.error_message = None
        mock_response.response_text = {
            "id": 101,
            "title": "User Login",
            "custom_testrail_bdd_scenario": "Scenario: Login",
        }
        mock_api_handler.client.send_get.return_value = mock_response
        is_valid, error_message = parser._validate_bdd_case_exists(101)
        assert is_valid is True
        assert error_message is None
    @pytest.mark.cucumber_bdd_matching
    def test_validate_bdd_case_not_found(self):
        """Test validation fails when case not found"""
        parser = CucumberParser(self.environment)
        mock_api_handler = self._create_mock_api_handler()
        parser._api_handler = mock_api_handler
        # Mock case not found - mock send_get response
        mock_response = MagicMock()
        mock_response.error_message = "Case not found"
        mock_response.response_text = None
        mock_api_handler.client.send_get.return_value = mock_response
        is_valid, error_message = parser._validate_bdd_case_exists(999)
        assert is_valid is False
        assert "not found" in error_message.lower()
    @pytest.mark.cucumber_bdd_matching
    def test_validate_bdd_case_not_bdd_template(self):
        """Test validation fails when case is not BDD template"""
        parser = CucumberParser(self.environment)
        mock_api_handler = self._create_mock_api_handler()
        parser._api_handler = mock_api_handler
        # Mock non-BDD case - mock send_get response
        # (a case is "BDD" only when its custom_testrail_bdd_scenario is set)
        mock_response = MagicMock()
        mock_response.error_message = None
        mock_response.response_text = {"id": 102, "title": "Regular Test", "custom_testrail_bdd_scenario": None}
        mock_api_handler.client.send_get.return_value = mock_response
        is_valid, error_message = parser._validate_bdd_case_exists(102)
        assert is_valid is False
        assert "not a bdd template" in error_message.lower()
    @pytest.mark.cucumber_bdd_matching
    def test_parse_feature_as_bdd_case_with_tag(self):
        """Test parsing feature as BDD case using @C tag"""
        parser = CucumberParser(self.environment)
        mock_api_handler = self._create_mock_api_handler()
        parser._api_handler = mock_api_handler
        # Mock validation - mock send_get response
        mock_response = MagicMock()
        mock_response.error_message = None
        mock_response.response_text = {"id": 123, "custom_testrail_bdd_scenario": "Scenario: Test"}
        mock_api_handler.client.send_get.return_value = mock_response
        # Add @C tag to feature
        # (shallow copy is fine: only the top-level "tags" key is replaced)
        feature_with_tag = self.sample_feature.copy()
        feature_with_tag["tags"] = [{"name": "@C123"}]
        test_case = parser._parse_feature_as_bdd_case(feature_with_tag, project_id=1, suite_id=2)
        assert test_case is not None
        assert test_case.case_id == 123
        assert test_case.result.case_id == 123
        # Check BDD scenario results are in result_fields dict
        bdd_field_name = "custom_testrail_bdd_scenario_results"
        assert bdd_field_name in test_case.result.result_fields
        assert len(test_case.result.result_fields[bdd_field_name]) == 2  # Two scenarios
        assert test_case.result.status_id == 5  # Failed (one scenario failed)
    @pytest.mark.cucumber_bdd_matching
    @patch("trcli.readers.cucumber_json.CucumberParser._find_case_by_title")
    def test_parse_feature_as_bdd_case_by_title(self, mock_find):
        """Test parsing feature as BDD case using title matching"""
        parser = CucumberParser(self.environment)
        mock_api_handler = self._create_mock_api_handler()
        parser._api_handler = mock_api_handler
        # Mock title matching
        mock_find.return_value = 456
        # Mock validation - mock send_get response
        mock_response = MagicMock()
        mock_response.error_message = None
        mock_response.response_text = {"id": 456, "custom_testrail_bdd_scenario": "Scenario: Test"}
        mock_api_handler.client.send_get.return_value = mock_response
        test_case = parser._parse_feature_as_bdd_case(self.sample_feature, project_id=1, suite_id=2)
        assert test_case is not None
        assert test_case.case_id == 456
        mock_find.assert_called_once_with("User Login", 1, 2)
    @pytest.mark.cucumber_bdd_matching
    def test_parse_feature_as_bdd_case_scenario_statuses(self):
        """Test BDD scenario results have correct statuses"""
        parser = CucumberParser(self.environment)
        mock_api_handler = self._create_mock_api_handler()
        parser._api_handler = mock_api_handler
        # Mock validation - mock send_get response
        mock_response = MagicMock()
        mock_response.error_message = None
        mock_response.response_text = {"id": 123, "custom_testrail_bdd_scenario": "Scenario: Test"}
        mock_api_handler.client.send_get.return_value = mock_response
        feature_with_tag = self.sample_feature.copy()
        feature_with_tag["tags"] = [{"name": "@C123"}]
        test_case = parser._parse_feature_as_bdd_case(feature_with_tag, project_id=1, suite_id=2)
        # Check BDD scenario results are in result_fields dict
        bdd_field_name = "custom_testrail_bdd_scenario_results"
        assert bdd_field_name in test_case.result.result_fields
        scenarios = test_case.result.result_fields[bdd_field_name]
        # First scenario: passed (results are stored as dicts)
        assert scenarios[0]["content"] == "Successful login"
        assert scenarios[0]["status_id"] == 1
        # Second scenario: failed (results are stored as dicts)
        assert scenarios[1]["content"] == "Failed login"
        assert scenarios[1]["status_id"] == 5
    @pytest.mark.cucumber_bdd_matching
    def test_parse_feature_as_bdd_case_elapsed_time(self):
        """Test elapsed time calculation for BDD case"""
        parser = CucumberParser(self.environment)
        mock_api_handler = self._create_mock_api_handler()
        parser._api_handler = mock_api_handler
        # Mock validation - mock send_get response
        mock_response = MagicMock()
        mock_response.error_message = None
        mock_response.response_text = {"id": 123, "custom_testrail_bdd_scenario": "Scenario: Test"}
        mock_api_handler.client.send_get.return_value = mock_response
        feature_with_tag = self.sample_feature.copy()
        feature_with_tag["tags"] = [{"name": "@C123"}]
        test_case = parser._parse_feature_as_bdd_case(feature_with_tag, project_id=1, suite_id=2)
        # Total duration: (1+2+0.5) + (1+2+0.5) = 7 seconds
        assert test_case.result.elapsed == "7s"
    @pytest.mark.cucumber_bdd_matching
    def test_parse_feature_as_bdd_case_not_found(self):
        """Test parsing returns None when case not found"""
        parser = CucumberParser(self.environment)
        mock_api_handler = self._create_mock_api_handler()
        parser._api_handler = mock_api_handler
        # Mock no case found (no tag, no title match)
        with patch.object(parser, "_find_case_by_title", return_value=None):
            test_case = parser._parse_feature_as_bdd_case(self.sample_feature, project_id=1, suite_id=2)
            assert test_case is None
    @pytest.mark.cucumber_bdd_matching
    def test_parse_feature_as_bdd_case_validation_fails(self):
        """Test parsing returns None when validation fails"""
        parser = CucumberParser(self.environment)
        mock_api_handler = self._create_mock_api_handler()
        parser._api_handler = mock_api_handler
        # Mock validation failure (not BDD template) - mock send_get response
        mock_response = MagicMock()
        mock_response.error_message = None
        mock_response.response_text = {"id": 123, "custom_testrail_bdd_scenario": None}
        mock_api_handler.client.send_get.return_value = mock_response
        feature_with_tag = self.sample_feature.copy()
        feature_with_tag["tags"] = [{"name": "@C123"}]
        test_case = parser._parse_feature_as_bdd_case(feature_with_tag, project_id=1, suite_id=2)
        assert test_case is None
    @pytest.mark.cucumber_bdd_matching
    def test_parse_feature_branching_bdd_mode(self):
        """Test _parse_feature branches correctly to BDD matching mode"""
        parser = CucumberParser(self.environment)
        mock_api_handler = self._create_mock_api_handler()
        parser._api_handler = mock_api_handler
        # Mock validation - mock send_get response
        mock_response = MagicMock()
        mock_response.error_message = None
        mock_response.response_text = {"id": 123, "custom_testrail_bdd_scenario": "Scenario: Test"}
        mock_api_handler.client.send_get.return_value = mock_response
        feature_with_tag = self.sample_feature.copy()
        feature_with_tag["tags"] = [{"name": "@C123"}]
        # Call with BDD matching mode enabled
        sections = parser._parse_feature(feature_with_tag, bdd_matching_mode=True, project_id=1, suite_id=2)
        assert len(sections) == 1
        assert len(sections[0].testcases) == 1  # One BDD case (not 2 separate scenarios)
        assert sections[0].testcases[0].case_id == 123
    @pytest.mark.cucumber_bdd_matching
    def test_parse_feature_branching_standard_mode(self):
        """Test _parse_feature uses standard mode when bdd_matching_mode=False"""
        parser = CucumberParser(self.environment)
        # Call with standard mode
        sections = parser._parse_feature(self.sample_feature, bdd_matching_mode=False, project_id=None, suite_id=None)
        assert len(sections) == 1
        assert len(sections[0].testcases) == 2  # Two separate test cases (one per scenario)
import pytest
from pathlib import Path
from trcli.cli import Environment
from trcli.data_classes.data_parsers import MatchersParser
from trcli.readers.cucumber_json import CucumberParser
class TestCucumberParser:
"""Tests for Cucumber JSON parser"""
@pytest.fixture
def sample_cucumber_path(self):
"""Path to the sample Cucumber JSON file"""
return Path(__file__).parent / "test_data" / "CUCUMBER" / "sample_cucumber.json"
@pytest.fixture
def environment(self, sample_cucumber_path):
"""Create a test environment"""
env = Environment()
env.file = str(sample_cucumber_path)
env.case_matcher = MatchersParser.AUTO
env.suite_name = None
env.verbose = False
return env
    @pytest.mark.parse_cucumber
    def test_cucumber_parser_basic(self, environment, sample_cucumber_path):
        """Test basic Cucumber JSON parsing.

        NOTE(review): the sample_cucumber_path fixture parameter is unused
        here (the environment fixture already embeds the path) — consider
        removing it.
        """
        parser = CucumberParser(environment)
        suites = parser.parse_file()
        assert len(suites) == 1
        suite = suites[0]
        # Check suite structure
        assert suite.name == "Cucumber Test Results"
        assert len(suite.testsections) == 1
        # Check section
        section = suite.testsections[0]
        assert section.name == "User Login"
        assert len(section.testcases) == 2
    @pytest.mark.parse_cucumber
    def test_cucumber_parser_scenarios(self, environment):
        """Test that scenarios are parsed correctly.

        Expected values come from tests/test_data/CUCUMBER/sample_cucumber.json:
        two scenarios with five steps each, one passing and one failing.
        """
        parser = CucumberParser(environment)
        suites = parser.parse_file()
        section = suites[0].testsections[0]
        cases = section.testcases
        # First scenario - passed
        case1 = cases[0]
        assert "Successful login" in case1.title
        assert case1.result.status_id == 1 # Passed
        assert len(case1.result.custom_step_results) == 5
        # Second scenario - failed
        case2 = cases[1]
        assert "Failed login" in case2.title
        assert case2.result.status_id == 5 # Failed
        assert len(case2.result.custom_step_results) == 5
@pytest.mark.parse_cucumber
def test_cucumber_parser_steps(self, environment):
"""Test that steps are parsed with correct status"""
parser = CucumberParser(environment)
suites = parser.parse_file()
section = suites[0].testsections[0]
case1 = section.testcases[0]
# Check steps
steps = case1.result.custom_step_results
assert all(step.status_id == 1 for step in steps) # All passed
# Check step content
assert "Given" in steps[0].content
assert "I am on the login page" in steps[0].content
    @pytest.mark.parse_cucumber
    def test_cucumber_parser_automation_id(self, environment):
        """Test automation ID generation"""
        parser = CucumberParser(environment)
        suites = parser.parse_file()
        section = suites[0].testsections[0]
        case1 = section.testcases[0]
        # Check automation ID includes feature name, tags, and scenario name
        assert case1.custom_automation_id is not None
        assert "User Login" in case1.custom_automation_id
        assert "@positive" in case1.custom_automation_id
    @pytest.mark.parse_cucumber
    def test_cucumber_parser_tags(self, environment):
        """Test that tags are extracted correctly.

        Tags are exposed as a single string under the "tags" key of the
        case's case_fields mapping.
        """
        parser = CucumberParser(environment)
        suites = parser.parse_file()
        section = suites[0].testsections[0]
        case1 = section.testcases[0]
        # Check tags in case_fields
        assert "tags" in case1.case_fields
        tags_str = case1.case_fields["tags"]
        assert "@smoke" in tags_str
        assert "@authentication" in tags_str
        assert "@positive" in tags_str
@pytest.mark.parse_cucumber
def test_cucumber_generate_feature_file(self, environment):
"""Test .feature file generation"""
parser = CucumberParser(environment)
feature_content = parser.generate_feature_file()
assert feature_content
assert "Feature: User Login" in feature_content
assert "Scenario: Successful login" in feature_content
assert "Scenario: Failed login" in feature_content
assert "Given I am on the login page" in feature_content
assert "@smoke" in feature_content
    @pytest.mark.parse_cucumber
    def test_cucumber_parser_elapsed_time(self, environment):
        """Test elapsed time calculation.

        Only asserts the "s" suffix when elapsed is present; very small
        durations may be stripped to None by the result formatting.
        """
        parser = CucumberParser(environment)
        suites = parser.parse_file()
        section = suites[0].testsections[0]
        case1 = section.testcases[0]
        # Check elapsed time is calculated (may be None if very short duration)
        # The proper_format_for_elapsed in TestRailResult may strip very small values
        if case1.result.elapsed is not None:
            assert case1.result.elapsed.endswith("s")
@pytest.fixture
def advanced_cucumber_path(self):
"""Path to the advanced Cucumber JSON file with Background, Examples, and Rules"""
return Path(__file__).parent / "test_data" / "CUCUMBER" / "sample_cucumber_advanced.json"
    @pytest.fixture
    def advanced_environment(self, advanced_cucumber_path):
        """Create a test environment pointed at the advanced Cucumber fixture."""
        env = Environment()
        env.file = str(advanced_cucumber_path)
        env.case_matcher = MatchersParser.AUTO
        env.suite_name = None
        env.verbose = False
        return env
@pytest.mark.parse_cucumber
def test_cucumber_generate_background(self, advanced_environment):
"""Test Background element generation in .feature file"""
parser = CucumberParser(advanced_environment)
feature_content = parser.generate_feature_file()
assert "Background: User is logged in" in feature_content
assert "Given I am logged in as a customer" in feature_content
assert "And my shopping cart is empty" in feature_content
@pytest.mark.parse_cucumber
def test_cucumber_generate_scenario_outline_with_examples(self, advanced_environment):
    """Verify Scenario Outline and its Examples table are generated."""
    generated = CucumberParser(advanced_environment).generate_feature_file()
    # Scenario Outline header must be present.
    assert "Scenario Outline: Add items to cart" in generated
    # Examples section: keyword, header row and every data row.
    assert "Examples:" in generated
    table_rows = (
        "| quantity | product | price |",
        "| 1 | Laptop | $1000 |",
        "| 2 | Mouse | $40 |",
        "| 3 | Keyboard | $150 |",
    )
    for row in table_rows:
        assert row in generated
    # Tag attached to the Examples block.
    assert "@products" in generated
@pytest.mark.parse_cucumber
def test_cucumber_generate_rule_with_nested_elements(self, advanced_environment):
    """Verify a Rule with nested Background and Scenario is generated."""
    generated = CucumberParser(advanced_environment).generate_feature_file()
    # Rule header and its tag.
    assert "Rule: Payment validation" in generated
    assert "@validation" in generated
    # Background nested under the Rule.
    assert "Background: Setup payment environment" in generated
    assert "Given the payment gateway is available" in generated
    # Scenario nested under the Rule.
    assert "Scenario: Valid credit card payment" in generated
    assert "When I pay with a valid credit card" in generated
    assert "Then the payment should be approved" in generated
@pytest.mark.parse_cucumber
def test_cucumber_advanced_feature_structure(self, advanced_environment):
    """Verify the overall feature structure: tags, name, description, ordering."""
    generated = CucumberParser(advanced_environment).generate_feature_file()
    # Feature tags and name.
    assert "@shopping" in generated
    assert "@cart" in generated
    assert "Feature: Shopping Cart" in generated
    # Feature description lines.
    assert "As a customer" in generated
    assert "I want to manage my shopping cart" in generated
    # The Background must be emitted before any Scenario Outline.
    background_pos = generated.find("Background:")
    outline_pos = generated.find("Scenario Outline:")
    assert background_pos < outline_pos, "Background should appear before Scenario Outline"
@pytest.mark.parse_cucumber
def test_cucumber_multiple_features_in_output(self, advanced_environment):
    """Test that multiple features are separated correctly.

    Fix: the previous version computed ``feature_content.split("\\n\\n")`` into
    a local that was never used; the dead statement is removed.
    """
    parser = CucumberParser(advanced_environment)
    feature_content = parser.generate_feature_file()
    # Both source features must be present in the generated output.
    assert "Feature: Shopping Cart" in feature_content
    assert "Feature: Payment Processing" in feature_content
    # Counting the "Feature:" keyword verifies the two features were emitted
    # as distinct sections rather than merged into one.
    feature_count = feature_content.count("Feature:")
    assert feature_count == 2, "Should have exactly 2 features"
@pytest.mark.parse_cucumber
def test_cucumber_indentation_in_generated_feature(self, advanced_environment):
    """Verify indentation conventions in the generated .feature file."""
    generated = CucumberParser(advanced_environment).generate_feature_file()
    all_lines = generated.split("\n")
    # "Background:" lines use a 2-space indent.
    assert any(
        line.startswith("  Background:") for line in all_lines if "Background:" in line
    )
    # Step lines ("Given ...") use a 4-space indent.
    assert any(
        line.startswith("    Given")
        for line in all_lines
        if line.strip().startswith("Given")
    )
    # "Examples:" lines use a 4-space indent.
    assert any(
        line.startswith("    Examples:") for line in all_lines if "Examples:" in line
    )
"""
Unit tests for BDD-specific JUnit parsing functionality
Tests the --special-parser bdd mode that groups multiple scenarios
into a single TestRail BDD test case.
"""
import pytest
from unittest.mock import Mock, MagicMock, patch
from pathlib import Path
from trcli.cli import Environment
from trcli.readers.junit_xml import JunitParser
from trcli.data_classes.validation_exception import ValidationException
class TestBDDJunitParser:
    """Test BDD mode for JUnit parser.

    Covers the ``--special-parser bdd`` flow of ``JunitParser``: mode
    detection, extracting/validating the TestRail case ID from the JUnit XML,
    aggregating per-scenario statuses into one result, formatting failure
    messages, and validating the target case against a mocked TestRail API.
    """

    @pytest.fixture
    def environment(self):
        """Create mock environment for BDD mode"""
        env = Mock(spec=Environment)
        env.case_matcher = "auto"
        env.special_parser = "bdd"
        env.suite_name = None
        env.file = None  # Required by FileParser
        env.params_from_config = {}  # Required by JunitParser for custom statuses
        env.log = Mock()
        env.elog = Mock()
        env.vlog = Mock()
        return env

    @pytest.fixture
    def mock_api_validation_success(self):
        """Mock successful API validation (case exists and is BDD)"""
        with patch("trcli.api.project_based_client.ProjectBasedClient") as mock_client_class:
            mock_client = MagicMock()
            mock_api_handler = MagicMock()
            mock_response = MagicMock()
            # Mock successful get_case response with BDD field
            mock_response.error_message = ""
            mock_response.response_text = {
                "id": 42,
                "title": "User Enrollment",
                "template_id": 4,
                "custom_testrail_bdd_scenario": '[{"content":"Scenario 1"}]',
            }
            mock_api_handler.client.send_get.return_value = mock_response
            # Mock BDD field name resolution (returns default names)
            mock_api_handler.get_bdd_case_field_name.return_value = "custom_testrail_bdd_scenario"
            mock_api_handler.get_bdd_result_field_name.return_value = "custom_testrail_bdd_scenario_results"
            mock_client.api_request_handler = mock_api_handler
            mock_client_class.return_value = mock_client
            yield mock_client

    # ------------------------------------------------------------------
    # Mode detection
    # ------------------------------------------------------------------
    def test_bdd_mode_detection(self, environment):
        """Test that BDD mode is correctly detected"""
        test_file = Path(__file__).parent / "test_data" / "XML" / "bdd_valid_testsuite_property.xml"
        environment.file = str(test_file)
        parser = JunitParser(environment)
        assert parser._is_bdd_mode() is True

    def test_standard_mode_detection(self, environment):
        """Test that standard mode is detected when not BDD"""
        test_file = Path(__file__).parent / "test_data" / "XML" / "bdd_valid_testsuite_property.xml"
        environment.file = str(test_file)
        environment.special_parser = "junit"
        parser = JunitParser(environment)
        assert parser._is_bdd_mode() is False

    # ------------------------------------------------------------------
    # Case ID extraction and validation
    # ------------------------------------------------------------------
    def test_extract_case_id_from_testsuite_property(self, environment):
        """Test extracting case ID from testsuite property"""
        test_file = Path(__file__).parent / "test_data" / "XML" / "bdd_valid_testsuite_property.xml"
        environment.file = str(test_file)
        parser = JunitParser(environment)
        # Parse and check case ID extraction
        from junitparser import JUnitXml
        suite = JUnitXml.fromfile(test_file)
        testsuite = list(suite)[0]
        case_id = parser._extract_feature_case_id_from_property(testsuite)
        assert case_id == 42

    def test_extract_case_id_from_testcase_names(self, environment):
        """Test extracting case ID from testcase names"""
        test_file = Path(__file__).parent / "test_data" / "XML" / "bdd_valid_testcase_names.xml"
        environment.file = str(test_file)
        parser = JunitParser(environment)
        from junitparser import JUnitXml
        suite = JUnitXml.fromfile(test_file)
        testsuite = list(suite)[0]
        testcase_ids = parser._extract_case_id_from_testcases(testsuite)
        # Returns (name, case_id) pairs; all three scenarios map to C42.
        assert len(testcase_ids) == 3
        assert all(case_id == 42 for _, case_id in testcase_ids)

    def test_validate_consistent_case_ids(self, environment):
        """Test validation passes when all scenarios have same case ID"""
        test_file = Path(__file__).parent / "test_data" / "XML" / "bdd_valid_testcase_names.xml"
        environment.file = str(test_file)
        parser = JunitParser(environment)
        from junitparser import JUnitXml
        suite = JUnitXml.fromfile(test_file)
        testsuite = list(suite)[0]
        case_id, errors = parser._extract_and_validate_bdd_case_id(testsuite)
        assert case_id == 42
        assert len(errors) == 0

    def test_validate_inconsistent_case_ids_error(self, environment):
        """Test validation fails when scenarios have different case IDs"""
        test_file = Path(__file__).parent / "test_data" / "XML" / "bdd_inconsistent_case_ids.xml"
        environment.file = str(test_file)
        parser = JunitParser(environment)
        from junitparser import JUnitXml
        suite = JUnitXml.fromfile(test_file)
        testsuite = list(suite)[0]
        case_id, errors = parser._extract_and_validate_bdd_case_id(testsuite)
        assert case_id is None
        assert len(errors) == 1
        assert "Multiple different case IDs" in errors[0]
        # The error message should list every conflicting ID from the fixture.
        assert "123" in errors[0] and "124" in errors[0] and "125" in errors[0]

    def test_validate_no_case_id_error(self, environment):
        """Test validation fails when no case ID found"""
        test_file = Path(__file__).parent / "test_data" / "XML" / "bdd_no_case_id.xml"
        environment.file = str(test_file)
        parser = JunitParser(environment)
        from junitparser import JUnitXml
        suite = JUnitXml.fromfile(test_file)
        testsuite = list(suite)[0]
        case_id, errors = parser._extract_and_validate_bdd_case_id(testsuite)
        assert case_id is None
        assert len(errors) == 1
        assert "No case ID found" in errors[0]

    # ------------------------------------------------------------------
    # Status aggregation (TestRail status IDs: 1=passed, 4=skipped, 5=failed)
    # ------------------------------------------------------------------
    def test_aggregate_all_pass(self, environment):
        """Test status aggregation when all scenarios pass"""
        test_file = Path(__file__).parent / "test_data" / "XML" / "bdd_all_pass.xml"
        environment.file = str(test_file)
        parser = JunitParser(environment)
        statuses = [1, 1, 1]  # All passed
        result = parser._aggregate_scenario_statuses(statuses)
        assert result == 1  # Passed

    def test_aggregate_one_fail(self, environment):
        """Test status aggregation when one scenario fails (fail-fast)"""
        test_file = Path(__file__).parent / "test_data" / "XML" / "bdd_all_pass.xml"
        environment.file = str(test_file)
        parser = JunitParser(environment)
        statuses = [1, 5, 1]  # One failed
        result = parser._aggregate_scenario_statuses(statuses)
        assert result == 5  # Failed

    def test_aggregate_all_skip(self, environment):
        """Test status aggregation when all scenarios skipped"""
        test_file = Path(__file__).parent / "test_data" / "XML" / "bdd_all_pass.xml"
        environment.file = str(test_file)
        parser = JunitParser(environment)
        statuses = [4, 4, 4]  # All skipped
        result = parser._aggregate_scenario_statuses(statuses)
        assert result == 4  # Skipped

    def test_aggregate_pass_and_skip(self, environment):
        """Test status aggregation with pass and skip (no fails)"""
        test_file = Path(__file__).parent / "test_data" / "XML" / "bdd_all_pass.xml"
        environment.file = str(test_file)
        parser = JunitParser(environment)
        statuses = [1, 4, 1]  # Mixed pass/skip
        result = parser._aggregate_scenario_statuses(statuses)
        assert result == 4  # Skipped (since some not executed)

    def test_aggregate_fail_and_skip(self, environment):
        """Test status aggregation with fail and skip"""
        test_file = Path(__file__).parent / "test_data" / "XML" / "bdd_all_pass.xml"
        environment.file = str(test_file)
        parser = JunitParser(environment)
        statuses = [5, 4, 1]  # Mixed fail/skip/pass
        result = parser._aggregate_scenario_statuses(statuses)
        assert result == 5  # Failed (failure takes precedence)

    # ------------------------------------------------------------------
    # Failure message formatting
    # ------------------------------------------------------------------
    def test_format_failure_message(self, environment):
        """Test failure message formatting"""
        test_file = Path(__file__).parent / "test_data" / "XML" / "bdd_all_pass.xml"
        environment.file = str(test_file)
        parser = JunitParser(environment)
        mock_result = Mock()
        mock_result.type = "AssertionError"
        mock_result.message = "Expected X but got Y"
        mock_result.text = "Details about failure"
        message = parser._format_failure_message("Test Scenario", mock_result)
        assert "Scenario: Test Scenario" in message
        assert "Type: AssertionError" in message
        assert "Message: Expected X but got Y" in message
        assert "Details:\n  Details about failure" in message

    def test_format_failure_message_truncation(self, environment):
        """Test failure message truncates long text"""
        test_file = Path(__file__).parent / "test_data" / "XML" / "bdd_all_pass.xml"
        environment.file = str(test_file)
        parser = JunitParser(environment)
        mock_result = Mock()
        mock_result.type = "Error"
        mock_result.message = "Error"
        mock_result.text = "A" * 600  # Long text
        message = parser._format_failure_message("Test", mock_result)
        assert "... (truncated)" in message
        assert len(message) < 700  # Should be truncated

    # ------------------------------------------------------------------
    # API-side case validation (mocked ProjectBasedClient)
    # ------------------------------------------------------------------
    @patch("trcli.api.project_based_client.ProjectBasedClient")
    def test_validate_case_exists_success(self, mock_client_class, environment):
        """Test validation passes when case exists and is BDD"""
        test_file = Path(__file__).parent / "test_data" / "XML" / "bdd_all_pass.xml"
        environment.file = str(test_file)
        mock_client = MagicMock()
        mock_api_handler = MagicMock()
        mock_response = MagicMock()
        mock_response.error_message = ""
        mock_response.response_text = {
            "id": 42,
            "title": "Test Feature",
            "custom_testrail_bdd_scenario": '[{"content":"..."}]',
        }
        mock_api_handler.client.send_get.return_value = mock_response
        mock_api_handler.get_bdd_case_field_name.return_value = "custom_testrail_bdd_scenario"  # Mock field resolution
        mock_client.api_request_handler = mock_api_handler
        mock_client_class.return_value = mock_client
        parser = JunitParser(environment)
        is_valid, error_msg, case_data = parser._validate_bdd_case_exists(42, "Test Feature")
        assert is_valid is True
        assert error_msg == ""
        assert case_data["id"] == 42

    @patch("trcli.api.project_based_client.ProjectBasedClient")
    def test_validate_case_not_exists(self, mock_client_class, environment):
        """Test validation fails when case doesn't exist"""
        test_file = Path(__file__).parent / "test_data" / "XML" / "bdd_all_pass.xml"
        environment.file = str(test_file)
        mock_client = MagicMock()
        mock_api_handler = MagicMock()
        mock_response = MagicMock()
        mock_response.error_message = "Field :case_id is not a valid test case."
        mock_api_handler.client.send_get.return_value = mock_response
        mock_client.api_request_handler = mock_api_handler
        mock_client_class.return_value = mock_client
        parser = JunitParser(environment)
        is_valid, error_msg, case_data = parser._validate_bdd_case_exists(999, "Test Feature")
        assert is_valid is False
        assert "does not exist" in error_msg
        assert "C999" in error_msg

    @patch("trcli.api.project_based_client.ProjectBasedClient")
    def test_validate_case_not_bdd(self, mock_client_class, environment):
        """Test validation fails when case is not BDD template"""
        test_file = Path(__file__).parent / "test_data" / "XML" / "bdd_all_pass.xml"
        environment.file = str(test_file)
        mock_client = MagicMock()
        mock_api_handler = MagicMock()
        mock_response = MagicMock()
        mock_response.error_message = ""
        mock_response.response_text = {
            "id": 42,
            "title": "Regular Test Case",
            "custom_testrail_bdd_scenario": None,  # Not a BDD case
        }
        mock_api_handler.client.send_get.return_value = mock_response
        mock_api_handler.get_bdd_case_field_name.return_value = "custom_testrail_bdd_scenario"  # Mock field resolution
        mock_client.api_request_handler = mock_api_handler
        mock_client_class.return_value = mock_client
        parser = JunitParser(environment)
        is_valid, error_msg, case_data = parser._validate_bdd_case_exists(42, "Test Feature")
        assert is_valid is False
        assert "is NOT a BDD test case" in error_msg
        assert "custom_testrail_bdd_scenario" in error_msg

    # ------------------------------------------------------------------
    # End-to-end parsing of a feature into a single BDD test case
    # ------------------------------------------------------------------
    def test_parse_bdd_feature_all_pass(self, environment, mock_api_validation_success):
        """Test parsing BDD feature with all scenarios passing"""
        test_file = Path(__file__).parent / "test_data" / "XML" / "bdd_all_pass.xml"
        environment.file = str(test_file)
        parser = JunitParser(environment)
        from junitparser import JUnitXml
        suite = JUnitXml.fromfile(test_file)
        testsuite = list(suite)[0]
        # Mock the case ID to match test data
        mock_api_validation_success.api_request_handler.client.send_get.return_value.response_text["id"] = 100
        test_case = parser._parse_bdd_feature_as_single_case(testsuite)
        assert test_case is not None
        assert test_case.case_id == 100
        assert test_case.result.status_id == 1  # Passed
        # Check BDD scenario results are in result_fields dict
        bdd_field_name = "custom_testrail_bdd_scenario_results"
        assert bdd_field_name in test_case.result.result_fields
        assert len(test_case.result.result_fields[bdd_field_name]) == 2
        assert "Total Scenarios: 2" in test_case.result.comment
        assert "Passed: 2" in test_case.result.comment

    def test_parse_bdd_feature_mixed_results(self, environment, mock_api_validation_success):
        """Test parsing BDD feature with mixed results (pass/fail/skip)"""
        test_file = Path(__file__).parent / "test_data" / "XML" / "bdd_mixed_results.xml"
        environment.file = str(test_file)
        parser = JunitParser(environment)
        from junitparser import JUnitXml
        suite = JUnitXml.fromfile(test_file)
        testsuite = list(suite)[0]
        # Mock the case ID
        mock_api_validation_success.api_request_handler.client.send_get.return_value.response_text["id"] = 25293
        test_case = parser._parse_bdd_feature_as_single_case(testsuite)
        assert test_case is not None
        assert test_case.case_id == 25293
        assert test_case.result.status_id == 5  # Failed (fail-fast)
        # Check BDD scenario results are in result_fields dict
        bdd_field_name = "custom_testrail_bdd_scenario_results"
        assert bdd_field_name in test_case.result.result_fields
        assert len(test_case.result.result_fields[bdd_field_name]) == 3
        # Check step statuses (results are stored as dicts in result_fields)
        bdd_results = test_case.result.result_fields[bdd_field_name]
        assert bdd_results[0]["status_id"] == 1  # Passed
        assert bdd_results[1]["status_id"] == 5  # Failed
        assert bdd_results[2]["status_id"] == 4  # Skipped
        # Check comment contains summary and failure details
        assert "Total Scenarios: 3" in test_case.result.comment
        assert "Passed: 1" in test_case.result.comment
        assert "Failed: 1" in test_case.result.comment
        assert "Skipped: 1" in test_case.result.comment
        assert "Failure Details:" in test_case.result.comment
        assert "Invalid password" in test_case.result.comment

    def test_parse_bdd_feature_no_case_id_returns_none(self, environment):
        """Test that parsing returns None when no case ID found"""
        test_file = Path(__file__).parent / "test_data" / "XML" / "bdd_no_case_id.xml"
        environment.file = str(test_file)
        parser = JunitParser(environment)
        from junitparser import JUnitXml
        suite = JUnitXml.fromfile(test_file)
        testsuite = list(suite)[0]
        test_case = parser._parse_bdd_feature_as_single_case(testsuite)
        assert test_case is None
        # The error should have been reported through the environment logger.
        environment.elog.assert_called()

    def test_parse_bdd_feature_inconsistent_ids_returns_none(self, environment):
        """Test that parsing returns None when case IDs are inconsistent"""
        test_file = Path(__file__).parent / "test_data" / "XML" / "bdd_inconsistent_case_ids.xml"
        environment.file = str(test_file)
        parser = JunitParser(environment)
        from junitparser import JUnitXml
        suite = JUnitXml.fromfile(test_file)
        testsuite = list(suite)[0]
        test_case = parser._parse_bdd_feature_as_single_case(testsuite)
        assert test_case is None
        environment.elog.assert_called()

    @patch("trcli.api.project_based_client.ProjectBasedClient")
    def test_parse_bdd_feature_case_not_exists_raises_exception(self, mock_client_class, environment):
        """Test that parsing raises ValidationException when case doesn't exist"""
        test_file = Path(__file__).parent / "test_data" / "XML" / "bdd_valid_testsuite_property.xml"
        environment.file = str(test_file)
        mock_client = MagicMock()
        mock_api_handler = MagicMock()
        mock_response = MagicMock()
        mock_response.error_message = "Case not found"
        mock_api_handler.client.send_get.return_value = mock_response
        mock_client.api_request_handler = mock_api_handler
        mock_client_class.return_value = mock_client
        parser = JunitParser(environment)
        from junitparser import JUnitXml
        suite = JUnitXml.fromfile(test_file)
        testsuite = list(suite)[0]
        with pytest.raises(ValidationException) as exc_info:
            parser._parse_bdd_feature_as_single_case(testsuite)
        assert "case_id" in str(exc_info.value.field_name)
        assert "BDD Feature" in str(exc_info.value.class_name)

    def test_parse_sections_bdd_mode(self, environment, mock_api_validation_success):
        """Test that _parse_sections uses BDD mode when enabled"""
        test_file = Path(__file__).parent / "test_data" / "XML" / "bdd_valid_testsuite_property.xml"
        environment.file = str(test_file)
        parser = JunitParser(environment)
        from junitparser import JUnitXml
        suite = JUnitXml.fromfile(test_file, parse_func=parser._add_root_element_to_tree)
        sections = parser._parse_sections(suite)
        assert len(sections) == 1
        assert len(sections[0].testcases) == 1  # One BDD test case
        assert sections[0].testcases[0].case_id == 42

    def test_parse_sections_standard_mode(self, environment):
        """Test that _parse_sections uses standard mode when BDD not enabled"""
        test_file = Path(__file__).parent / "test_data" / "XML" / "bdd_valid_testcase_names.xml"
        environment.file = str(test_file)
        environment.special_parser = "junit"  # Standard mode
        parser = JunitParser(environment)
        from junitparser import JUnitXml
        suite = JUnitXml.fromfile(test_file, parse_func=parser._add_root_element_to_tree)
        sections = parser._parse_sections(suite)
        assert len(sections) == 1
        # In standard mode, should have 3 separate test cases
        assert len(sections[0].testcases) == 3

    def test_elapsed_time_calculation(self, environment, mock_api_validation_success):
        """Test that elapsed time is summed correctly from all scenarios"""
        test_file = Path(__file__).parent / "test_data" / "XML" / "bdd_mixed_results.xml"
        environment.file = str(test_file)
        parser = JunitParser(environment)
        from junitparser import JUnitXml
        suite = JUnitXml.fromfile(test_file)
        testsuite = list(suite)[0]
        mock_api_validation_success.api_request_handler.client.send_get.return_value.response_text["id"] = 25293
        test_case = parser._parse_bdd_feature_as_single_case(testsuite)
        assert test_case.result.elapsed == "2s"  # 1.0 + 1.5 + 0.0 = 2.5, rounds to 2 (banker's rounding)
# Allow running this test module directly (python <file>) in addition to a
# normal pytest collection run.
if __name__ == "__main__":
    pytest.main([__file__, "-v"])
"""
API Response Cache Module
This module provides a session-scoped caching mechanism for API responses
to reduce redundant API calls and improve performance.
The cache is designed to be:
- Thread-safe
- Session-scoped (per ApiRequestHandler instance)
- Backwards compatible (transparent to existing code)
- Memory-efficient (uses LRU eviction)
"""
from functools import lru_cache
from typing import Any, Tuple, Optional, Callable
from threading import Lock
from beartype.typing import List, Dict
class RequestCache:
    """
    Session-scoped cache for API responses.

    This cache stores API responses during a single command execution session
    to avoid redundant API calls. Each ApiRequestHandler instance should have
    its own cache instance.

    Key features:
    - Automatic cache key generation from endpoint and parameters
    - True LRU eviction policy to prevent unbounded memory growth
      (a cache hit promotes the entry to most-recently-used; previously
      eviction was FIFO despite the "LRU" claim)
    - Thread-safe operations
    - Simple invalidation mechanism

    Note: ``get()`` cannot distinguish a cached ``None`` from a miss, so a
    literal ``None`` response should not be cached.
    """

    def __init__(self, max_size: int = 512):
        """
        Initialize the request cache.

        Args:
            max_size: Maximum number of cached responses (default: 512)
        """
        self.max_size = max_size
        # Insertion order of this dict doubles as the LRU order: the first
        # key is always the least-recently-used entry (Python 3.7+ dicts
        # preserve insertion order).
        self._cache: Dict[str, Any] = {}
        self._lock = Lock()
        self._hit_count = 0
        self._miss_count = 0

    def _make_cache_key(self, endpoint: str, params: Optional[Tuple] = None) -> str:
        """
        Generate a unique cache key from endpoint and parameters.

        Args:
            endpoint: API endpoint (e.g., "get_cases/123")
            params: Optional parameters (dict, list/tuple, or scalar)

        Returns:
            String cache key
        """
        if params is None:
            return endpoint
        # Normalize params so logically-equal inputs yield identical keys;
        # dicts are sorted because their ordering is irrelevant for identity.
        if isinstance(params, dict):
            params_tuple = tuple(sorted(params.items()))
        elif isinstance(params, (list, tuple)):
            params_tuple = tuple(params)
        else:
            params_tuple = (params,)
        return f"{endpoint}::{params_tuple}"

    def get(self, endpoint: str, params: Optional[Tuple] = None) -> Optional[Any]:
        """
        Retrieve a cached response.

        A hit promotes the entry to most-recently-used so frequently read
        entries survive eviction.

        Args:
            endpoint: API endpoint
            params: Optional parameters

        Returns:
            Cached response or None if not found
        """
        cache_key = self._make_cache_key(endpoint, params)
        with self._lock:
            if cache_key in self._cache:
                self._hit_count += 1
                # Pop + re-insert moves the key to the most-recently-used end.
                value = self._cache.pop(cache_key)
                self._cache[cache_key] = value
                return value
            self._miss_count += 1
            return None

    def set(self, endpoint: str, response: Any, params: Optional[Tuple] = None) -> None:
        """
        Store a response in the cache, evicting the LRU entry when full.

        Args:
            endpoint: API endpoint
            response: Response to cache
            params: Optional parameters
        """
        cache_key = self._make_cache_key(endpoint, params)
        with self._lock:
            if cache_key in self._cache:
                # Updating an existing key must not evict an unrelated entry;
                # remove it first so the re-insert refreshes its LRU position.
                del self._cache[cache_key]
            elif len(self._cache) >= self.max_size:
                # Evict the least-recently-used entry (first key in order).
                lru_key = next(iter(self._cache))
                del self._cache[lru_key]
            self._cache[cache_key] = response

    def invalidate(self, endpoint: Optional[str] = None, params: Optional[Tuple] = None) -> None:
        """
        Invalidate cache entries.

        Args:
            endpoint: If provided, invalidate only this endpoint.
                      If None, clear entire cache.
            params: Optional parameters to narrow invalidation
        """
        with self._lock:
            if endpoint is None:
                # Clear entire cache
                self._cache.clear()
            else:
                cache_key = self._make_cache_key(endpoint, params)
                # pop with a default avoids a separate membership check.
                self._cache.pop(cache_key, None)

    def invalidate_pattern(self, pattern: str) -> None:
        """
        Invalidate all cache entries whose key contains *pattern*.

        Args:
            pattern: String pattern to match against cache keys
        """
        with self._lock:
            # Materialize matching keys first: deleting while iterating the
            # dict directly would raise RuntimeError.
            keys_to_delete = [key for key in self._cache if pattern in key]
            for key in keys_to_delete:
                del self._cache[key]

    def get_or_fetch(
        self,
        endpoint: str,
        fetch_func: Callable[[], Tuple[Any, str]],
        params: Optional[Tuple] = None,
        force_refresh: bool = False,
    ) -> Tuple[Any, str]:
        """
        Get cached response or fetch if not cached.

        This is the main method for integrating caching into existing code.
        It transparently handles cache hits/misses and maintains the same
        return signature as the original fetch functions. Only successful
        responses (empty error message) are cached.

        Args:
            endpoint: API endpoint
            fetch_func: Function to call if cache miss (should return (data, error))
            params: Optional parameters for cache key
            force_refresh: If True, bypass cache and fetch fresh data

        Returns:
            Tuple of (data, error_message) matching API call signature
        """
        if not force_refresh:
            cached = self.get(endpoint, params)
            if cached is not None:
                return cached
        # Cache miss or forced refresh - fetch fresh data.
        result = fetch_func()
        data, error = result
        if not error:
            # Never cache failed responses so transient errors can recover.
            self.set(endpoint, result, params)
        return result

    def get_stats(self) -> Dict[str, int]:
        """
        Get cache statistics.

        Returns:
            Dictionary with hit_count, miss_count, size, and hit_rate
            (hit_rate is a percentage; 0.0 when no lookups were made)
        """
        with self._lock:
            total = self._hit_count + self._miss_count
            hit_rate = (self._hit_count / total * 100) if total > 0 else 0.0
            return {
                "hit_count": self._hit_count,
                "miss_count": self._miss_count,
                "size": len(self._cache),
                "hit_rate": hit_rate,
            }

    def clear(self) -> None:
        """Clear all cached data and reset statistics."""
        with self._lock:
            self._cache.clear()
            self._hit_count = 0
            self._miss_count = 0
"""
API Utilities - Shared utilities for API handlers
This module provides common utilities to reduce code duplication across handlers:
- Reference parsing and validation
- Response validation
- Type definitions for better type safety
"""
from beartype.typing import List, Tuple, Optional, Literal
from typing_extensions import TypedDict
# ============================================================================
# Type Definitions for Better Type Safety
# ============================================================================
class TestRailResponse(TypedDict, total=False):
    """Type definition for TestRail API responses.

    ``total=False`` makes every key optional: real responses contain only
    the subset of these fields relevant to the endpoint that produced them.
    """

    id: int
    name: str
    title: str
    suite_id: int
    section_id: int
    case_id: int
    refs: str
    error: str


# Literal type restricting accepted merge-strategy names (see the
# merge_references function for the semantics of each strategy).
ReferenceStrategy = Literal["add", "update", "delete", "append", "replace"]
# ============================================================================
# Reference Utilities
# ============================================================================
def parse_references(refs_string: str) -> List[str]:
    """
    Split a comma-separated reference string into cleaned references.

    Args:
        refs_string: Comma-separated string of references (e.g., "REF-1, REF-2, REF-3")

    Returns:
        List of cleaned, non-empty reference strings

    Example:
        >>> parse_references("REF-1, , REF-2 ,REF-3")
        ['REF-1', 'REF-2', 'REF-3']
    """
    if not refs_string:
        return []
    stripped = (piece.strip() for piece in refs_string.split(","))
    return [ref for ref in stripped if ref]
def deduplicate_references(references: List[str]) -> List[str]:
    """
    Deduplicate a list of references while preserving order.

    Entries are stripped of surrounding whitespace; empty entries are dropped.

    Args:
        references: List of reference strings

    Returns:
        List of unique references in original order

    Example:
        >>> deduplicate_references(['REF-1', 'REF-2', 'REF-1', 'REF-3'])
        ['REF-1', 'REF-2', 'REF-3']
    """
    unique: List[str] = []
    seen = set()
    for raw in references:
        candidate = raw.strip()
        if not candidate or candidate in seen:
            continue
        seen.add(candidate)
        unique.append(candidate)
    return unique
def join_references(references: List[str]) -> str:
    """
    Serialize a list of references back into a comma-separated string.

    Args:
        references: List of reference strings

    Returns:
        Comma-separated string of references

    Example:
        >>> join_references(['REF-1', 'REF-2', 'REF-3'])
        'REF-1,REF-2,REF-3'
    """
    separator = ","
    return separator.join(references)
def validate_references_length(refs_string: str, max_length: int) -> Tuple[bool, Optional[str]]:
    """
    Validate that a reference string doesn't exceed the maximum length.

    Args:
        refs_string: Comma-separated string of references
        max_length: Maximum allowed length

    Returns:
        Tuple of (is_valid, error_message)
        - is_valid: True if length is valid, False otherwise
        - error_message: None if valid, error description if invalid

    Example:
        >>> validate_references_length("REF-1,REF-2", 2000)
        (True, None)
        >>> validate_references_length("X" * 2001, 2000)
        (False, "Combined references length (2001 characters) exceeds 2000 character limit")
    """
    actual_length = len(refs_string)
    if actual_length <= max_length:
        return True, None
    return False, f"Combined references length ({actual_length} characters) exceeds {max_length} character limit"
def merge_references(existing_refs: str, new_refs: str, strategy: ReferenceStrategy = "add") -> str:
    """
    Merge existing and new references based on the specified strategy.

    Args:
        existing_refs: Current comma-separated references
        new_refs: New comma-separated references to merge
        strategy: How to merge references:
            - 'add'/'append': Add new refs to existing, avoiding duplicates
            - 'update'/'replace': Replace all existing refs with new refs
            - 'delete': Remove specified refs from existing (all of them
              when new_refs is empty)

    Returns:
        Merged comma-separated reference string

    Examples:
        >>> merge_references("REF-1,REF-2", "REF-3,REF-4", "add")
        'REF-1,REF-2,REF-3,REF-4'
        >>> merge_references("REF-1,REF-2", "REF-3", "update")
        'REF-3'
        >>> merge_references("REF-1,REF-2,REF-3", "REF-2", "delete")
        'REF-1,REF-3'
    """
    if strategy in ("update", "replace"):
        # Replacement ignores the existing references entirely.
        return new_refs

    if strategy == "delete":
        if not new_refs:
            # No refs specified: delete everything.
            return ""
        to_remove = set(parse_references(new_refs))
        kept = [ref for ref in parse_references(existing_refs) if ref not in to_remove]
        return join_references(kept)

    # Default ('add'/'append'): union, preserving existing order first.
    if not existing_refs:
        return new_refs
    current = parse_references(existing_refs)
    additions = [ref for ref in parse_references(new_refs) if ref not in current]
    return join_references(current + additions)
def calculate_reference_changes(existing_refs: str, new_refs: str) -> Tuple[List[str], List[str]]:
    """
    Calculate which references will be added and which are duplicates.

    Args:
        existing_refs: Current comma-separated references
        new_refs: New comma-separated references to process

    Returns:
        Tuple of (added_refs, skipped_refs)
        - added_refs: References that will be newly added
        - skipped_refs: References that already exist (duplicates)

    Example:
        >>> calculate_reference_changes("REF-1,REF-2", "REF-2,REF-3")
        (['REF-3'], ['REF-2'])
    """
    current = parse_references(existing_refs)
    incoming = deduplicate_references(parse_references(new_refs))
    added_refs: List[str] = []
    skipped_refs: List[str] = []
    for ref in incoming:
        target = skipped_refs if ref in current else added_refs
        target.append(ref)
    return added_refs, skipped_refs
# ============================================================================
# Response Validation Utilities
# ============================================================================
def check_response_error(response, default_error_msg: str = "API request failed") -> Optional[str]:
    """
    Return the error message carried by an API response, if any.

    Args:
        response: API response object that may expose an ``error_message`` attribute
        default_error_msg: Currently unused; kept for interface compatibility
            (callers receive None when ``error_message`` is absent or empty)

    Returns:
        The non-empty error message string, or None when no error is present

    Example:
        >>> response = MockResponse(error_message="Field not found")
        >>> check_response_error(response)
        'Field not found'
    """
    # getattr covers both "attribute missing" and "attribute falsy" in one step
    error = getattr(response, "error_message", None)
    return error if error else None
def validate_response_field(
    response_data: dict, field_name: str, error_prefix: str = "Response"
) -> Tuple[bool, Optional[str]]:
    """
    Check that a required key is present in a response dictionary.

    Args:
        response_data: Dictionary containing response data
        field_name: Name of the required field
        error_prefix: Prefix used when building the error message

    Returns:
        Tuple of (is_valid, error_message)
        - is_valid: True if the field exists, False otherwise
        - error_message: None when valid, otherwise a human-readable description

    Example:
        >>> validate_response_field({"id": 123, "name": "Test"}, "id")
        (True, None)
        >>> validate_response_field({"name": "Test"}, "id")
        (False, "Response missing 'id' field")
    """
    # Guard clause: report the missing field first, success path last
    if field_name not in response_data:
        return False, f"{error_prefix} missing '{field_name}' field"
    return True, None
# ============================================================================
# Common Patterns
# ============================================================================
def safe_get_nested(data: dict, *keys, default=None):
    """
    Safely walk a chain of keys through nested dictionaries.

    Args:
        data: Dictionary to search
        *keys: Sequence of keys to traverse, outermost first
        default: Value returned when the key path cannot be followed

    Returns:
        Value at the key path, or ``default`` if any step is missing
        or any intermediate value is not a dictionary

    Example:
        >>> data = {"user": {"profile": {"name": "John"}}}
        >>> safe_get_nested(data, "user", "profile", "name")
        'John'
        >>> safe_get_nested(data, "user", "invalid", "key", default="N/A")
        'N/A'
    """
    node = data
    for key in keys:
        # Bail out as soon as the path breaks: non-dict node or missing key
        if not isinstance(node, dict):
            return default
        if key not in node:
            return default
        node = node[key]
    return node
"""
BddHandler - Handles all BDD (Behavior-Driven Development) related operations for TestRail
It manages all BDD operations including:
- Uploading .feature files
- Retrieving BDD test cases
- Getting BDD template IDs
- Creating BDD test cases
"""
from beartype.typing import List, Tuple
from trcli.api.api_client import APIClient
from trcli.cli import Environment
class BddHandler:
    """Handles all BDD-related operations for TestRail"""

    def __init__(self, client: APIClient, environment: Environment):
        """
        Initialize the BddHandler
        :param client: APIClient instance for making API calls
        :param environment: Environment configuration
        """
        self.client = client
        self.environment = environment

    @staticmethod
    def _is_case_id_tag(tag: str) -> bool:
        """Return True only for TestRail case-ID tags of the form '@C<digits>' (e.g. '@C123')."""
        return len(tag) > 2 and tag[:2].upper() == "@C" and tag[2:].isdigit()

    def add_bdd(self, section_id: int, feature_content: str) -> Tuple[List[int], str]:
        """
        Upload .feature file to TestRail BDD endpoint
        Creates TestRail test case from Gherkin .feature content.
        The Gherkin content is sent in the request body as plain text.
        Args:
            section_id: TestRail section ID where test case will be created
            feature_content: Raw .feature file content (Gherkin syntax)
        Returns:
            Tuple of (case_ids, error_message)
            - case_ids: List containing the created test case ID
            - error_message: Empty string on success, error details on failure
        """
        # Send Gherkin content as file upload (multipart/form-data)
        # TestRail expects the .feature file as an attachment
        self.environment.vlog(f"Uploading .feature file to add_bdd/{section_id}")
        files = {"attachment": ("feature.feature", feature_content, "text/plain")}
        response = self.client.send_post(f"add_bdd/{section_id}", payload=None, files=files)
        if response.status_code == 200:
            # Response is a test case object with 'id' field
            if isinstance(response.response_text, dict):
                case_id = response.response_text.get("id")
                if case_id:
                    return [case_id], ""
                else:
                    return [], "Response missing 'id' field"
            else:
                return [], "Unexpected response format"
        else:
            error_msg = response.error_message or f"Failed to upload feature file (HTTP {response.status_code})"
            return [], error_msg

    def get_bdd(self, case_id: int) -> Tuple[str, str]:
        """
        Retrieve BDD test case as .feature file content
        Args:
            case_id: TestRail test case ID
        Returns:
            Tuple of (feature_content, error_message)
            - feature_content: .feature file content (Gherkin syntax)
            - error_message: Empty string on success, error details on failure
        """
        self.environment.vlog(f"Retrieving BDD test case from get_bdd/{case_id}")
        response = self.client.send_get(f"get_bdd/{case_id}")
        if response.status_code == 200:
            # TestRail returns raw Gherkin text (not JSON)
            # APIClient treats non-JSON as error and stores str(response.content)
            if isinstance(response.response_text, dict):
                # Some versions might return JSON with 'feature' field
                feature_content = response.response_text.get("feature", "")
            elif isinstance(response.response_text, str) and response.response_text.startswith("b'"):
                # APIClient converted bytes to string representation: "b'text'"
                # Need to extract the actual content
                try:
                    # Remove b' prefix and ' suffix, then decode escape sequences
                    # NOTE(review): unicode_escape assumes latin-1 semantics and may
                    # mangle non-ASCII Gherkin content — confirm against APIClient output
                    feature_content = response.response_text[2:-1].encode().decode("unicode_escape")
                except (ValueError, AttributeError):
                    feature_content = response.response_text
            else:
                # Plain text response
                feature_content = response.response_text
            return feature_content, ""
        else:
            error_msg = response.error_message or f"Failed to retrieve BDD test case (HTTP {response.status_code})"
            return "", error_msg

    def get_bdd_template_id(self, project_id: int) -> Tuple[int, str]:
        """
        Get the BDD template ID for a project
        Args:
            project_id: TestRail project ID
        Returns:
            Tuple of (template_id, error_message)
            - template_id: BDD template ID if found, None otherwise
            - error_message: Empty string on success, error details on failure
        API Endpoint: GET /api/v2/get_templates/{project_id}
        """
        self.environment.vlog(f"Getting templates for project {project_id}")
        response = self.client.send_get(f"get_templates/{project_id}")
        if response.status_code == 200:
            templates = response.response_text
            if isinstance(templates, list):
                self.environment.vlog(f"Retrieved {len(templates)} template(s) from TestRail")
                # Log all available templates for debugging
                if templates:
                    self.environment.vlog("Available templates:")
                    for template in templates:
                        template_id = template.get("id")
                        template_name = template.get("name", "")
                        self.environment.vlog(f"  - ID {template_id}: '{template_name}'")
                # Look for BDD template by name
                for template in templates:
                    template_name = template.get("name", "").strip()
                    template_name_lower = template_name.lower()
                    template_id = template.get("id")
                    self.environment.vlog(f"Checking template '{template_name}' (ID: {template_id})")
                    self.environment.vlog(f"  Lowercase: '{template_name_lower}'")
                    # Check for BDD template (support both US and UK spellings)
                    if (
                        "behavior" in template_name_lower
                        or "behaviour" in template_name_lower
                        or "bdd" in template_name_lower
                    ):
                        self.environment.vlog(f"  ✓ MATCH: This is the BDD template!")
                        self.environment.log(f"Found BDD template: '{template_name}' (ID: {template_id})")
                        return template_id, ""
                    else:
                        self.environment.vlog(f"  ✗ No match: Does not contain 'behavior', 'behaviour', or 'bdd'")
                # Build detailed error message with available templates
                error_parts = ["BDD template not found. Please enable BDD template in TestRail project settings."]
                if templates:
                    template_list = ", ".join([f"'{t.get('name', 'Unknown')}'" for t in templates])
                    error_parts.append(f"Available templates: {template_list}")
                    error_parts.append("The BDD template name should contain 'behavior', 'behaviour', or 'bdd'.")
                else:
                    error_parts.append("No templates are available in this project.")
                return None, "\n".join(error_parts)
            else:
                return None, "Unexpected response format from get_templates"
        else:
            error_msg = response.error_message or f"Failed to get templates (HTTP {response.status_code})"
            return None, error_msg

    def add_case_bdd(
        self, section_id: int, title: str, bdd_content: str, template_id: int, tags: List[str] = None
    ) -> Tuple[int, str]:
        """
        Create a BDD test case with Gherkin content
        Args:
            section_id: TestRail section ID where test case will be created
            title: Test case title (scenario name)
            bdd_content: Gherkin scenario content
            template_id: BDD template ID
            tags: Optional list of tags (for refs field)
        Returns:
            Tuple of (case_id, error_message)
            - case_id: Created test case ID if successful, None otherwise
            - error_message: Empty string on success, error details on failure
        """
        self.environment.vlog(f"Creating BDD test case '{title}' in section {section_id}")
        # Build request body
        # Note: custom_testrail_bdd_scenario expects an array of lines, not a single string
        bdd_lines = bdd_content.split("\n") if bdd_content else []
        body = {
            "title": title,
            "template_id": template_id,
            "custom_testrail_bdd_scenario": bdd_lines,
        }
        # Add tags as references if provided
        if tags:
            # Filter out case-ID tags (@C<digits>, e.g. @C123) and keep the rest.
            # Previously any tag starting with "@C"/"@c" was dropped, which also
            # discarded legitimate tags such as "@critical" or "@cleanup".
            ref_tags = [tag for tag in tags if not self._is_case_id_tag(tag)]
            if ref_tags:
                body["refs"] = ", ".join(ref_tags)
        response = self.client.send_post(f"add_case/{section_id}", body)
        if response.status_code == 200:
            if isinstance(response.response_text, dict):
                case_id = response.response_text.get("id")
                if case_id:
                    self.environment.vlog(f"Created BDD test case ID: {case_id}")
                    return case_id, ""
                else:
                    return None, "Response missing 'id' field"
            else:
                return None, "Unexpected response format"
        else:
            error_msg = response.error_message or f"Failed to create BDD test case (HTTP {response.status_code})"
            return None, error_msg
"""
CaseHandler - Handles all test case-related operations for TestRail
This class was extracted from ApiRequestHandler to follow the Single Responsibility Principle.
It manages all test case operations including:
- Adding test cases
- Updating case references
- Updating case automation IDs
- Deleting test cases
- Case helper operations
"""
from concurrent.futures import ThreadPoolExecutor
from beartype.typing import List, Tuple, Dict
from trcli.api.api_client import APIClient, APIClientResult
from trcli.api.api_utils import (
deduplicate_references,
join_references,
parse_references,
validate_references_length,
)
from trcli.cli import Environment
from trcli.constants import OLD_SYSTEM_NAME_AUTOMATION_ID, UPDATED_SYSTEM_NAME_AUTOMATION_ID
from trcli.data_classes.data_parsers import MatchersParser
from trcli.data_classes.dataclass_testrail import TestRailCase
from trcli.data_providers.api_data_provider import ApiDataProvider
from trcli.settings import MAX_WORKERS_ADD_CASE
class CaseHandler:
    """Handles all test case-related operations for TestRail"""

    MAX_CASE_REFERENCES_LENGTH = 2000  # TestRail character limit for case refs field

    def __init__(
        self,
        client: APIClient,
        environment: Environment,
        data_provider: ApiDataProvider,
        handle_futures_callback,
        retrieve_results_callback,
    ):
        """
        Initialize the CaseHandler
        :param client: APIClient instance for making API calls
        :param environment: Environment configuration
        :param data_provider: Data provider for case data
        :param handle_futures_callback: Callback to handle concurrent futures
        :param retrieve_results_callback: Callback to retrieve results after cancellation
        """
        self.client = client
        self.environment = environment
        self.data_provider = data_provider
        self.handle_futures = handle_futures_callback
        self.retrieve_results_after_cancelling = retrieve_results_callback
        # Store active automation ID field (set by parent)
        self._active_automation_id_field = None

    def add_cases(self) -> Tuple[List[dict], str]:
        """
        Add cases that don't have an ID in DataProvider.
        Runs update_data in data_provider for successfully created resources.
        :returns: Tuple with list of dict created resources and error string.
        """
        add_case_data = self.data_provider.add_cases()
        responses = []
        error_message = ""
        with self.environment.get_progress_bar(
            results_amount=len(add_case_data), prefix="Adding test cases"
        ) as progress_bar:
            # One add_case POST per case, fanned out across the worker pool
            with ThreadPoolExecutor(max_workers=MAX_WORKERS_ADD_CASE) as executor:
                futures = {
                    executor.submit(
                        self._add_case_and_update_data,
                        body,
                    ): body
                    for body in add_case_data
                }
                responses, error_message = self.handle_futures(
                    futures=futures, action_string="add_case", progress_bar=progress_bar
                )
            if error_message:
                # When error_message is present we cannot be sure that responses contains all added items.
                # Iterate through futures to get all responses from done tasks (not cancelled)
                responses = self.retrieve_results_after_cancelling(futures)
        returned_resources = [
            {
                "case_id": response.response_text["id"],
                "section_id": response.response_text["section_id"],
                "title": response.response_text["title"],
            }
            for response in responses
        ]
        return returned_resources, error_message

    def _add_case_and_update_data(self, case: TestRailCase) -> APIClientResult:
        """
        Helper method to add a single case and update its data
        :param case: TestRailCase object to add
        :returns: APIClientResult
        """
        case_body = case.to_dict()
        active_field = self._active_automation_id_field
        # Rename the automation-id key when the TestRail instance uses the newer system name
        if active_field == UPDATED_SYSTEM_NAME_AUTOMATION_ID and OLD_SYSTEM_NAME_AUTOMATION_ID in case_body:
            case_body[UPDATED_SYSTEM_NAME_AUTOMATION_ID] = case_body.pop(OLD_SYSTEM_NAME_AUTOMATION_ID)
        # Non-AUTO matchers don't use automation IDs, so drop the field entirely
        if self.environment.case_matcher != MatchersParser.AUTO and OLD_SYSTEM_NAME_AUTOMATION_ID in case_body:
            case_body.pop(OLD_SYSTEM_NAME_AUTOMATION_ID)
        # section_id is part of the URL, not the payload, hence pop()
        response = self.client.send_post(f"add_case/{case_body.pop('section_id')}", case_body)
        if response.status_code == 200:
            case.case_id = response.response_text["id"]
            case.result.case_id = response.response_text["id"]
            case.section_id = response.response_text["section_id"]
        return response

    def update_existing_case_references(
        self, case_id: int, junit_refs: str, case_fields: dict = None, strategy: str = "append"
    ) -> Tuple[bool, str, List[str], List[str], List[str]]:
        """
        Update existing case references and custom fields with values from JUnit properties.
        :param case_id: ID of the test case
        :param junit_refs: References from JUnit testrail_case_field property
        :param case_fields: Dictionary of custom case fields to update (e.g., {'custom_preconds': 'value'})
        :param strategy: 'append' or 'replace' (applies to refs field only)
        :returns: Tuple with (success, error_message, added_refs, skipped_refs, updated_fields)
        """
        updated_fields = []
        # Handle case where there are no refs but there are case fields to update
        if (not junit_refs or not junit_refs.strip()) and not case_fields:
            return True, None, [], [], []  # Nothing to process
        if not junit_refs or not junit_refs.strip():
            # No refs to process, but we have case fields to update
            new_refs = None
            added_refs = []
            skipped_refs = []
        else:
            # Parse and deduplicate JUnit references using utility function
            junit_ref_list = deduplicate_references(parse_references(junit_refs))
            if not junit_ref_list:
                # If we have case fields, continue; otherwise return error
                if not case_fields:
                    return False, "No valid references found in JUnit property", [], [], []
                new_refs = None
                added_refs = []
                skipped_refs = []
            else:
                # Get current case data
                case_response = self.client.send_get(f"get_case/{case_id}")
                if case_response.error_message:
                    return False, case_response.error_message, [], [], []
                # "or ''" guards against TestRail returning an explicit null for refs
                existing_refs = case_response.response_text.get("refs", "") or ""
                if strategy == "replace":
                    # Replace strategy: use JUnit refs as-is
                    new_refs = join_references(junit_ref_list)
                    added_refs = junit_ref_list
                    skipped_refs = []
                else:
                    # Append strategy: combine with existing refs, avoiding duplicates
                    existing_ref_list = parse_references(existing_refs)
                    # Determine which references are new vs duplicates
                    added_refs = [ref for ref in junit_ref_list if ref not in existing_ref_list]
                    skipped_refs = [ref for ref in junit_ref_list if ref in existing_ref_list]
                    # If no new references to add and no case fields, return current state
                    if not added_refs and not case_fields:
                        return True, None, added_refs, skipped_refs, []
                    # Combine references
                    combined_list = existing_ref_list + added_refs
                    new_refs = join_references(combined_list)
        # Validate 2000 character limit for test case references
        if new_refs:
            is_valid, error_msg = validate_references_length(new_refs, self.MAX_CASE_REFERENCES_LENGTH)
            if not is_valid:
                return False, error_msg, [], [], []
        # Build update data with refs and custom case fields
        update_data = {}
        if new_refs is not None:
            update_data["refs"] = new_refs
        # Add custom case fields to the update
        if case_fields:
            for field_name, field_value in case_fields.items():
                # Skip special internal fields that shouldn't be updated
                if field_name not in ["case_id", "section_id", "result"]:
                    update_data[field_name] = field_value
                    updated_fields.append(field_name)
        # Only update if we have data to send
        if not update_data:
            return True, None, added_refs, skipped_refs, updated_fields
        # Update the case
        update_response = self.client.send_post(f"update_case/{case_id}", update_data)
        if update_response.error_message:
            return False, update_response.error_message, [], [], []
        return True, None, added_refs, skipped_refs, updated_fields

    def delete_cases(self, suite_id: int, added_cases: List[Dict]) -> Tuple[Dict, str]:
        """
        Delete cases given add_cases response
        :param suite_id: suite id
        :param added_cases: List of cases to delete
        :returns: Tuple with dict created resources and error string.
        """
        body = {"case_ids": [case["case_id"] for case in added_cases]}
        response = self.client.send_post(f"delete_cases/{suite_id}", payload=body)
        return response.response_text, response.error_message

    def update_case_automation_id(self, case_id: int, automation_id: str) -> Tuple[bool, str]:
        """
        Update the automation_id field of a test case
        Args:
            case_id: TestRail test case ID
            automation_id: Automation ID value to set
        Returns:
            Tuple of (success, error_message)
            - success: True if update succeeded, False otherwise
            - error_message: Empty string on success, error details on failure
        """
        self.environment.vlog(f"Setting automation_id '{automation_id}' on case {case_id}")
        # NOTE(review): field name is hardcoded to the legacy 'custom_automation_id'
        # and does not consult self._active_automation_id_field as
        # _add_case_and_update_data does — confirm this is intentional.
        update_data = {"custom_automation_id": automation_id}
        update_response = self.client.send_post(f"update_case/{case_id}", update_data)
        if update_response.status_code == 200:
            return True, ""
        else:
            error_msg = (
                update_response.error_message or f"Failed to update automation_id (HTTP {update_response.status_code})"
            )
            return False, error_msg
"""
CaseMatcherFactory - Strategy pattern implementation for TestRail case matching
Matching Strategies:
- AutomationIdMatcher: Matches cases by automation_id field
- NameMatcher: Matches cases by name (requires case_id in test data)
- PropertyMatcher: Matches cases by custom property (requires case_id in test data)
"""
import html
from abc import ABC, abstractmethod
from beartype.typing import Tuple, List, Dict, Set
from trcli.cli import Environment
from trcli.constants import OLD_SYSTEM_NAME_AUTOMATION_ID, UPDATED_SYSTEM_NAME_AUTOMATION_ID
from trcli.data_classes.data_parsers import MatchersParser
from trcli.data_classes.dataclass_testrail import TestRailSuite
from trcli.data_providers.api_data_provider import ApiDataProvider
class CaseMatcher(ABC):
    """Abstract base defining the interface shared by all case-matching strategies"""

    def __init__(self, environment: Environment, data_provider: ApiDataProvider):
        """
        Store the collaborators every matching strategy needs

        :param environment: Environment configuration
        :param data_provider: Data provider for accessing test case data
        """
        self.data_provider = data_provider
        self.environment = environment

    @abstractmethod
    def check_missing_cases(
        self,
        project_id: int,
        suite_id: int,
        suites_data: TestRailSuite,
        get_all_cases_callback,
        validate_case_ids_callback,
    ) -> Tuple[bool, str]:
        """
        Detect report test cases with no TestRail counterpart, using this
        strategy's matching rules.

        :param project_id: TestRail project ID
        :param suite_id: TestRail suite ID
        :param suites_data: Test suite data from provider
        :param get_all_cases_callback: Callback to fetch all cases from TestRail
        :param validate_case_ids_callback: Callback to validate case IDs exist
        :returns: Tuple (has_missing_cases, error_message)
        """
class AutomationIdMatcher(CaseMatcher):
    """Matches test cases by automation_id field"""

    def check_missing_cases(
        self,
        project_id: int,
        suite_id: int,
        suites_data: TestRailSuite,
        get_all_cases_callback,
        validate_case_ids_callback,
    ) -> Tuple[bool, str]:
        """
        Match report cases against TestRail cases via the automation_id field

        :param project_id: TestRail project ID
        :param suite_id: TestRail suite ID
        :param suites_data: Test suite data from provider
        :param get_all_cases_callback: Callback to fetch all cases from TestRail
        :param validate_case_ids_callback: Callback to validate case IDs exist
        :returns: Tuple (has_missing_cases, error_message)
        """
        # Fetch all cases from TestRail
        returned_cases, error_message = get_all_cases_callback(project_id, suite_id)
        if error_message:
            return False, error_message
        # Index TestRail cases by their (HTML-unescaped) automation ID;
        # either the legacy or the updated system field name may carry it
        cases_by_automation_id = {}
        for remote_case in returned_cases:
            raw_id = remote_case.get(OLD_SYSTEM_NAME_AUTOMATION_ID) or remote_case.get(
                UPDATED_SYSTEM_NAME_AUTOMATION_ID
            )
            if raw_id:
                cases_by_automation_id[html.unescape(raw_id)] = remote_case
        # Walk the report and pair each test with its TestRail case, if any
        matched_case_data = []
        unmatched_count = 0
        for section in suites_data.testsections:
            for test_case in section.testcases:
                aut_id = test_case.custom_automation_id
                remote_case = cases_by_automation_id.get(aut_id)
                if remote_case is None:
                    unmatched_count += 1
                    continue
                matched_case_data.append(
                    {
                        "case_id": remote_case["id"],
                        "section_id": remote_case["section_id"],
                        "title": remote_case["title"],
                        OLD_SYSTEM_NAME_AUTOMATION_ID: aut_id,
                    }
                )
        # Update data provider with matched cases
        self.data_provider.update_data(case_data=matched_case_data)
        if unmatched_count:
            self.environment.log(f"Found {unmatched_count} test cases not matching any TestRail case.")
        return unmatched_count > 0, ""
class IdBasedMatcher(CaseMatcher):
    """Base class for matchers that rely on case_id being present in test data (NAME, PROPERTY)"""

    def check_missing_cases(
        self,
        project_id: int,
        suite_id: int,
        suites_data: TestRailSuite,
        get_all_cases_callback,
        validate_case_ids_callback,
    ) -> Tuple[bool, str]:
        """
        Validate that case IDs exist in TestRail
        For NAME/PROPERTY matchers, the test data must already contain case_id.
        This method validates those IDs exist in TestRail.
        :param project_id: TestRail project ID
        :param suite_id: TestRail suite ID
        :param suites_data: Test suite data from provider
        :param get_all_cases_callback: Callback to fetch all cases from TestRail
        :param validate_case_ids_callback: Callback to validate case IDs exist
        :returns: Tuple (has_missing_cases, error_message)
        """
        missing_cases_number = 0
        nonexistent_ids = []
        case_ids_to_validate = set()
        # Collect all unique case IDs that need validation
        for section in suites_data.testsections:
            for test_case in section.testcases:
                if not test_case.case_id:
                    missing_cases_number += 1
                else:
                    # NOTE(review): assumes case_id is numeric or a numeric string;
                    # a non-numeric value would raise ValueError here — confirm upstream parsing
                    case_ids_to_validate.add(int(test_case.case_id))
        total_tests_in_report = missing_cases_number + len(case_ids_to_validate)
        if missing_cases_number:
            self.environment.log(f"Found {missing_cases_number} test cases without case ID in the report file.")
        # Smart validation strategy based on report size
        # Threshold: 1000 cases (same as skip validation threshold for consistency)
        if case_ids_to_validate:
            # Skip validation for large reports with all IDs (most efficient)
            if missing_cases_number == 0 and total_tests_in_report >= 1000:
                # All tests have IDs and report is large: Skip validation (trust IDs)
                self.environment.log(
                    f"Skipping validation of {len(case_ids_to_validate)} case IDs "
                    f"(all tests have IDs, trusting they exist). "
                    f"If you encounter errors, ensure all case IDs in your test report exist in TestRail."
                )
                nonexistent_ids = []
            # Fetch all for large reports with missing IDs
            elif total_tests_in_report >= 1000:
                # Large report (>=1000 cases) with some missing IDs: Fetch all cases and validate locally
                # This is more efficient than individual validation for large batches
                self.environment.log(
                    f"Large report detected ({total_tests_in_report} cases). "
                    f"Fetching all cases from TestRail for efficient validation..."
                )
                returned_cases, error_message = get_all_cases_callback(project_id, suite_id)
                if error_message:
                    return False, error_message
                # Build lookup dictionary from fetched cases
                all_case_ids = {case["id"] for case in returned_cases}
                # Validate locally (O(1) lookup)
                nonexistent_ids = [cid for cid in case_ids_to_validate if cid not in all_case_ids]
                if nonexistent_ids:
                    # Only the first 20 offending IDs are shown to keep the log readable
                    self.environment.elog(
                        f"Nonexistent case IDs found in the report file: {nonexistent_ids[:20]}"
                        f"{' ...' if len(nonexistent_ids) > 20 else ''}"
                    )
                    return False, "Case IDs not in TestRail project or suite were detected in the report file."
            # Individual validation for small reports
            else:
                # Small report (<1000 cases): Use individual validation
                # This is more efficient for small batches
                self.environment.log(f"Validating {len(case_ids_to_validate)} case IDs exist in TestRail...")
                validated_ids = validate_case_ids_callback(suite_id, list(case_ids_to_validate))
                nonexistent_ids = [cid for cid in case_ids_to_validate if cid not in validated_ids]
                if nonexistent_ids:
                    self.environment.elog(f"Nonexistent case IDs found in the report file: {nonexistent_ids}")
                    return False, "Case IDs not in TestRail project or suite were detected in the report file."
        return missing_cases_number > 0, ""
class NameMatcher(IdBasedMatcher):
    """Name-based matching strategy; test data must already carry case IDs,
    so all validation is inherited from IdBasedMatcher unchanged."""
class PropertyMatcher(IdBasedMatcher):
    """Custom-property matching strategy; test data must already carry case IDs,
    so all validation is inherited from IdBasedMatcher unchanged."""
class CaseMatcherFactory:
    """Factory for creating appropriate case matcher based on configuration"""

    @staticmethod
    def create_matcher(
        matcher_type: MatchersParser, environment: Environment, data_provider: ApiDataProvider
    ) -> CaseMatcher:
        """
        Build the concrete matcher for the requested matching strategy

        :param matcher_type: Type of matcher to create (AUTO, NAME, PROPERTY). If None, defaults to AUTO.
        :param environment: Environment configuration
        :param data_provider: Data provider for accessing test case data
        :returns: Concrete CaseMatcher instance
        :raises ValueError: If matcher_type is not recognized
        """
        # Commands such as parse_openapi pass no matcher at all; treat that as AUTO
        if matcher_type is None:
            return AutomationIdMatcher(environment, data_provider)
        if matcher_type == MatchersParser.AUTO:
            return AutomationIdMatcher(environment, data_provider)
        if matcher_type == MatchersParser.NAME:
            return NameMatcher(environment, data_provider)
        if matcher_type == MatchersParser.PROPERTY:
            return PropertyMatcher(environment, data_provider)
        raise ValueError(f"Unknown matcher type: {matcher_type}")
"""
LabelManager - Handles all label-related operations for TestRail
It manages all label operations including:
- Creating, retrieving, updating, and deleting labels
- Adding labels to test cases and tests
- Filtering cases and tests by labels
- Retrieving labels for specific tests
"""
from beartype.typing import List, Union, Tuple, Dict
from trcli.api.api_client import APIClient
from trcli.cli import Environment
class LabelManager:
"""Handles all label-related operations for TestRail"""
MAX_LABELS_PER_ENTITY = 10 # TestRail limit
MAX_LABEL_TITLE_LENGTH = 20 # TestRail limit
def __init__(self, client: APIClient, environment: Environment):
"""
Initialize the LabelManager
:param client: APIClient instance for making API calls
:param environment: Environment configuration
"""
self.client = client
self.environment = environment
def add_label(self, project_id: int, title: str) -> Tuple[dict, str]:
"""
Add a new label to the project
:param project_id: ID of the project
:param title: Title of the label (max 20 characters)
:returns: Tuple with created label data and error string
"""
payload = {"title": title}
response = self.client.send_post(f"add_label/{project_id}", payload=payload)
return response.response_text, response.error_message
def update_label(self, label_id: int, project_id: int, title: str) -> Tuple[dict, str]:
"""
Update an existing label
:param label_id: ID of the label to update
:param project_id: ID of the project
:param title: New title for the label (max 20 characters)
:returns: Tuple with updated label data and error string
"""
payload = {"project_id": project_id, "title": title}
response = self.client.send_post(f"update_label/{label_id}", payload=payload)
return response.response_text, response.error_message
def get_label(self, label_id: int) -> Tuple[dict, str]:
"""
Get a specific label by ID
:param label_id: ID of the label to retrieve
:returns: Tuple with label data and error string
"""
response = self.client.send_get(f"get_label/{label_id}")
return response.response_text, response.error_message
def get_labels(self, project_id: int, offset: int = 0, limit: int = 250) -> Tuple[dict, str]:
"""
Get all labels for a project with pagination
:param project_id: ID of the project
:param offset: Offset for pagination
:param limit: Limit for pagination
:returns: Tuple with labels data (including pagination info) and error string
"""
params = []
if offset > 0:
params.append(f"offset={offset}")
if limit != 250:
params.append(f"limit={limit}")
url = f"get_labels/{project_id}"
if params:
url += "&" + "&".join(params)
response = self.client.send_get(url)
return response.response_text, response.error_message
def delete_label(self, label_id: int) -> Tuple[bool, str]:
"""
Delete a single label
:param label_id: ID of the label to delete
:returns: Tuple with success status and error string
"""
response = self.client.send_post(f"delete_label/{label_id}")
success = response.status_code == 200
return success, response.error_message
def delete_labels(self, label_ids: List[int]) -> Tuple[bool, str]:
"""
Delete multiple labels
:param label_ids: List of label IDs to delete
:returns: Tuple with success status and error string
"""
payload = {"label_ids": label_ids}
response = self.client.send_post("delete_labels", payload=payload)
success = response.status_code == 200
return success, response.error_message
def _resolve_label_id(self, project_id: int, title: str) -> Tuple[int, str]:
    """
    Find a label by title in the project, creating it when missing.

    :param project_id: ID of the project to search/create in
    :param title: Label title (max 20 characters)
    :returns: Tuple of (label id or None, error message)
    """
    existing_labels, error_message = self.get_labels(project_id)
    if error_message:
        return None, error_message
    for label in existing_labels.get("labels", []):
        if label.get("title") == title:
            return label.get("id"), ""
    label_data, error_message = self.add_label(project_id, title)
    if error_message:
        return None, error_message
    # add_label responses may wrap the payload in a "label" key
    label_info = label_data.get("label", label_data)
    return label_info.get("id"), ""

def _update_case_labels(self, case_info: dict, title: str, results: dict) -> None:
    """
    Update a single case's label list and record the outcome in results.

    :param case_info: dict with "case_id" and the full "labels" list to store
    :param title: label title being added (used for the success message)
    :param results: accumulator dict mutated in place
    """
    update_response = self.client.send_post(
        f"update_case/{case_info['case_id']}", payload={"labels": case_info["labels"]}
    )
    if update_response.status_code == 200:
        results["successful_cases"].append(
            {
                "case_id": case_info["case_id"],
                "message": f"Successfully added label '{title}' to case {case_info['case_id']}",
            }
        )
    else:
        results["failed_cases"].append(
            {"case_id": case_info["case_id"], "error": update_response.error_message}
        )

def add_labels_to_cases(
    self,
    case_ids: List[int],
    title: str,
    project_id: int,
    suite_id: int = None,
    get_all_cases_callback=None,
) -> Tuple[dict, str]:
    """
    Add a label to multiple test cases.

    :param case_ids: List of test case IDs
    :param title: Label title (max 20 characters)
    :param project_id: Project ID for validation
    :param suite_id: Suite ID (optional; required for multi-suite projects)
    :param get_all_cases_callback: Callback function to get all cases (injected dependency)
    :returns: Tuple with response data and error string
    """
    results = {"successful_cases": [], "failed_cases": [], "max_labels_reached": [], "case_not_found": []}
    # Detect multi-suite projects by fetching all cases without a suite filter
    all_cases_no_suite, error_message = get_all_cases_callback(project_id, None)
    if error_message:
        return results, error_message
    suite_ids = {case["suite_id"] for case in all_cases_no_suite if case.get("suite_id")}
    if len(suite_ids) > 1 and suite_id is None:
        return results, "This project is multisuite, suite id is required"
    # Validate the requested case IDs against the cases that actually exist
    all_cases, error_message = get_all_cases_callback(project_id, suite_id)
    if error_message:
        return results, error_message
    existing_case_ids = {case["id"] for case in all_cases}
    results["case_not_found"].extend(cid for cid in case_ids if cid not in existing_case_ids)
    valid_case_ids = [cid for cid in case_ids if cid in existing_case_ids]
    if not valid_case_ids:
        return results, ""
    # Find or create the label to apply
    label_id, error_message = self._resolve_label_id(project_id, title)
    if error_message:
        return results, error_message
    # Inspect each case and compute its new label list
    cases_to_update = []
    for case_id in valid_case_ids:
        case_response = self.client.send_get(f"get_case/{case_id}")
        if case_response.status_code != 200:
            results["failed_cases"].append(
                {"case_id": case_id, "error": f"Could not retrieve case {case_id}: {case_response.error_message}"}
            )
            continue
        current_labels = case_response.response_text.get("labels", [])
        if any(label.get("id") == label_id for label in current_labels):
            results["successful_cases"].append(
                {"case_id": case_id, "message": f"Label '{title}' already exists on case {case_id}"}
            )
            continue
        if len(current_labels) >= self.MAX_LABELS_PER_ENTITY:
            results["max_labels_reached"].append(case_id)
            continue
        existing_label_ids = [label.get("id") for label in current_labels if label.get("id")]
        cases_to_update.append({"case_id": case_id, "labels": existing_label_ids + [label_id]})
    if not cases_to_update:
        return results, ""
    # Determine a suite id for the batch endpoint when one was not supplied
    case_suite_id = suite_id
    if not case_suite_id and all_cases:
        case_suite_id = all_cases[0].get("suite_id")
    # Batch update is only safe when every case ends up with the IDENTICAL
    # label list — update_cases/{suite_id} sends one "labels" value for all
    # cases, so batching cases with differing existing labels would silently
    # overwrite their label sets (bug in the previous implementation).
    identical_labels = all(ci["labels"] == cases_to_update[0]["labels"] for ci in cases_to_update)
    if len(cases_to_update) > 1 and identical_labels and case_suite_id:
        batch_payload = {
            "case_ids": [ci["case_id"] for ci in cases_to_update],
            "labels": cases_to_update[0]["labels"],
        }
        batch_response = self.client.send_post(f"update_cases/{case_suite_id}", payload=batch_payload)
        if batch_response.status_code == 200:
            for ci in cases_to_update:
                results["successful_cases"].append(
                    {
                        "case_id": ci["case_id"],
                        "message": f"Successfully added label '{title}' to case {ci['case_id']}",
                    }
                )
            return results, ""
        # Batch failed: fall back to individual updates below
    for ci in cases_to_update:
        self._update_case_labels(ci, title, results)
    return results, ""
def get_cases_by_label(
    self,
    project_id: int,
    suite_id: int = None,
    label_ids: List[int] = None,
    label_title: str = None,
    get_all_cases_callback=None,
) -> Tuple[List[dict], str]:
    """
    Return the test cases carrying any of the requested labels.

    :param project_id: Project ID
    :param suite_id: Suite ID (optional)
    :param label_ids: List of label IDs to filter by
    :param label_title: Label title to filter by (used when no IDs are given)
    :param get_all_cases_callback: Callback function to get all cases (injected dependency)
    :returns: Tuple of (matching cases, error message)
    """
    all_cases, error_message = get_all_cases_callback(project_id, suite_id)
    if error_message:
        return [], error_message
    wanted_ids = label_ids or []
    if label_title and not wanted_ids:
        labels_data, error_message = self.get_labels(project_id)
        if error_message:
            return [], error_message
        wanted_ids = [
            label.get("id")
            for label in labels_data.get("labels", [])
            if label.get("title") == label_title
        ]
        if not wanted_ids:
            # An unknown title is not an error — it simply matches nothing
            return [], ""
    matching = []
    for case in all_cases:
        ids_on_case = {label.get("id") for label in case.get("labels", [])}
        if ids_on_case.intersection(wanted_ids):
            matching.append(case)
    return matching, ""
def add_labels_to_tests(
    self, test_ids: List[int], titles: Union[str, List[str]], project_id: int
) -> Tuple[dict, str]:
    """
    Add labels to multiple tests.

    :param test_ids: List of test IDs
    :param titles: Label title(s) - single string or list of strings (max 20 characters each)
    :param project_id: Project ID for validation
    :returns: Tuple with response data and error string
    """
    results = {"successful_tests": [], "failed_tests": [], "max_labels_reached": [], "test_not_found": []}
    # Normalize titles to a clean list (CLI has already validated them)
    title_list = [titles] if isinstance(titles, str) else titles
    title_list = [title.strip() for title in title_list if title.strip()]
    if not title_list:
        return {}, "No valid labels provided"
    # Validate each test exists and belongs to the target project (via its run)
    valid_test_ids = []
    for test_id in test_ids:
        test_response = self.client.send_get(f"get_test/{test_id}")
        if test_response.status_code != 200:
            results["test_not_found"].append(test_id)
            continue
        run_id = test_response.response_text.get("run_id")
        if not run_id:
            results["test_not_found"].append(test_id)
            continue
        run_response = self.client.send_get(f"get_run/{run_id}")
        if run_response.status_code == 200 and run_response.response_text.get("project_id") == project_id:
            valid_test_ids.append(test_id)
        else:
            results["test_not_found"].append(test_id)
    if not valid_test_ids:
        return results, ""
    # Resolve or create a label ID for every requested title
    existing_labels, error_message = self.get_labels(project_id)
    if error_message:
        return results, error_message
    label_ids = []
    label_id_to_title = {}  # maps label IDs back to their titles for messages
    for title in title_list:
        label_id = next(
            (lbl.get("id") for lbl in existing_labels.get("labels", []) if lbl.get("title") == title),
            None,
        )
        if label_id is None:
            label_data, error_message = self.add_label(project_id, title)
            if error_message:
                return results, error_message
            # add_label responses may wrap the payload in a "label" key
            label_info = label_data.get("label", label_data)
            label_id = label_info.get("id")
        if label_id:
            label_ids.append(label_id)
            label_id_to_title[label_id] = title
    # Work out, per test, which labels are genuinely new
    tests_to_update = []
    for test_id in valid_test_ids:
        # Re-fetch to get the test's current labels at update time
        test_response = self.client.send_get(f"get_test/{test_id}")
        if test_response.status_code != 200:
            results["failed_tests"].append(
                {"test_id": test_id, "error": f"Could not retrieve test {test_id}: {test_response.error_message}"}
            )
            continue
        current_labels = test_response.response_text.get("labels", [])
        current_label_ids = [lbl.get("id") for lbl in current_labels if lbl.get("id")]
        new_label_ids = [lid for lid in label_ids if lid not in current_label_ids]
        if not new_label_ids:
            already_exists_titles = [
                label_id_to_title[lid]
                for lid in label_ids
                if lid in current_label_ids and lid in label_id_to_title
            ]
            results["successful_tests"].append(
                {
                    "test_id": test_id,
                    "message": f"All labels already exist on test {test_id}: {', '.join(already_exists_titles)}",
                }
            )
            continue
        if len(current_label_ids) + len(new_label_ids) > self.MAX_LABELS_PER_ENTITY:
            results["max_labels_reached"].append(test_id)
            continue
        tests_to_update.append(
            {
                "test_id": test_id,
                "labels": current_label_ids + new_label_ids,
                "new_label_titles": [
                    label_id_to_title[lid] for lid in new_label_ids if lid in label_id_to_title
                ],
            }
        )
    # Apply updates one test at a time — each test has its own final label
    # set, so there is no batch endpoint that can be used safely. This single
    # loop replaces two previously duplicated (single vs. multiple) branches.
    for test_info in tests_to_update:
        update_response = self.client.send_post(
            f"update_test/{test_info['test_id']}", payload={"labels": test_info["labels"]}
        )
        if update_response.status_code == 200:
            titles_added = test_info["new_label_titles"]
            if len(titles_added) == 1:
                message = f"Successfully added label '{titles_added[0]}' to test {test_info['test_id']}"
            elif len(titles_added) > 1:
                message = f"Successfully added {len(titles_added)} labels ({', '.join(titles_added)}) to test {test_info['test_id']}"
            else:
                message = f"No new labels added to test {test_info['test_id']}"
            results["successful_tests"].append({"test_id": test_info["test_id"], "message": message})
        else:
            results["failed_tests"].append(
                {"test_id": test_info["test_id"], "error": update_response.error_message}
            )
    return results, ""
def get_tests_by_label(
    self, project_id: int, label_ids: List[int] = None, label_title: str = None, run_ids: List[int] = None
) -> Tuple[List[dict], str]:
    """
    Collect tests from the project's runs that carry any of the requested labels.

    :param project_id: Project ID
    :param label_ids: List of label IDs to filter by
    :param label_title: Label title to filter by (used when no IDs are given)
    :param run_ids: Specific run IDs to scan (optional, defaults to all runs)
    :returns: Tuple of (matching tests, error message)
    """
    wanted_ids = label_ids or []
    if label_title and not wanted_ids:
        labels_data, error_message = self.get_labels(project_id)
        if error_message:
            return [], error_message
        wanted_ids = [
            label.get("id")
            for label in labels_data.get("labels", [])
            if label.get("title") == label_title
        ]
        if not wanted_ids:
            # An unknown title is not an error — it simply matches nothing
            return [], ""
    # Resolve the runs to scan: either the explicit list or every project run
    if run_ids:
        runs = []
        for run_id in run_ids:
            run_response = self.client.send_get(f"get_run/{run_id}")
            if run_response.status_code != 200:
                return [], f"Run ID {run_id} not found or inaccessible"
            runs.append(run_response.response_text)
    else:
        runs_response = self.client.send_get(f"get_runs/{project_id}")
        if runs_response.status_code != 200:
            return [], runs_response.error_message
        payload = runs_response.response_text
        runs = payload.get("runs", []) if isinstance(payload, dict) else payload
    # Scan every run's tests for label matches
    matching = []
    for run in runs:
        run_id = run.get("id")
        if not run_id:
            continue
        tests_response = self.client.send_get(f"get_tests/{run_id}")
        if tests_response.status_code != 200:
            continue  # unreadable runs are skipped, not fatal
        payload = tests_response.response_text
        tests = payload.get("tests", []) if isinstance(payload, dict) else payload
        for test in tests:
            ids_on_test = {label.get("id") for label in test.get("labels", [])}
            if ids_on_test.intersection(wanted_ids):
                matching.append(test)
    return matching, ""
def get_test_labels(self, test_ids: List[int]) -> Tuple[List[dict], str]:
    """
    Collect label information for each requested test.

    :param test_ids: List of test IDs to get labels for
    :returns: Tuple of (per-test info entries, error message — always "")
    """
    entries = []
    for test_id in test_ids:
        response = self.client.send_get(f"get_test/{test_id}")
        if response.status_code == 200:
            data = response.response_text
            entry = {
                "test_id": test_id,
                "title": data.get("title", "Unknown"),
                "status_id": data.get("status_id"),
                "labels": data.get("labels", []),
                "error": None,
            }
        else:
            # Unreachable tests still get an entry so callers see every ID
            entry = {"test_id": test_id, "error": f"Test {test_id} not found or inaccessible", "labels": []}
        entries.append(entry)
    return entries, ""
"""
ReferenceManager - Handles all reference-related operations for TestRail test cases
It manages all reference operations including:
- Adding references to test cases
- Updating references on test cases
- Deleting references from test cases
"""
from beartype.typing import List, Tuple, Optional
from trcli.api.api_client import APIClient
from trcli.api.api_utils import (
deduplicate_references,
join_references,
merge_references,
validate_references_length,
check_response_error,
)
from trcli.cli import Environment
class ReferenceManager:
    """Handles all reference-related operations for TestRail test cases"""

    MAX_REFERENCES_LENGTH = 2000  # TestRail character limit for the case "refs" field

    def __init__(self, client: APIClient, environment: Environment):
        """
        Initialize the ReferenceManager

        :param client: APIClient instance for making API calls
        :param environment: Environment configuration
        """
        self.client = client
        self.environment = environment

    def _get_case_refs(self, case_id: int) -> Tuple[bool, str, str]:
        """
        Fetch the current "refs" string of a test case.

        :param case_id: ID of the test case
        :returns: Tuple of (success, refs string, error message)
        """
        case_response = self.client.send_get(f"get_case/{case_id}")
        if case_response.status_code != 200:
            error = check_response_error(case_response)
            message = (
                f"Failed to retrieve test case {case_id}: {error}"
                if error
                else f"Failed to retrieve test case {case_id}"
            )
            return False, "", message
        # "refs" may be absent or None — normalize to an empty string
        return True, case_response.response_text.get("refs", "") or "", ""

    def _write_case_refs(self, case_id: int, refs_string: str, failure_message: str) -> Tuple[bool, str]:
        """
        Validate length and persist a refs string on a test case.

        :param case_id: ID of the test case
        :param refs_string: fully merged refs value to store
        :param failure_message: fallback error text when the API gives none
        :returns: Tuple of (success, error message)
        """
        is_valid, error_msg = validate_references_length(refs_string, self.MAX_REFERENCES_LENGTH)
        if not is_valid:
            return False, error_msg
        update_response = self.client.send_post(f"update_case/{case_id}", {"refs": refs_string})
        if update_response.status_code == 200:
            return True, ""
        return False, update_response.error_message or failure_message

    def add_case_references(self, case_id: int, references: List[str]) -> Tuple[bool, str]:
        """
        Add references to a test case (appends to existing references)

        :param case_id: ID of the test case
        :param references: List of references to add
        :returns: Tuple with success status and error string
        """
        ok, existing_refs, error = self._get_case_refs(case_id)
        if not ok:
            return False, error
        # Deduplicate the input and merge with what is already stored
        deduplicated_input = deduplicate_references(references)
        new_refs_string = merge_references(existing_refs, join_references(deduplicated_input), strategy="add")
        return self._write_case_refs(case_id, new_refs_string, "Failed to update references")

    def update_case_references(self, case_id: int, references: List[str]) -> Tuple[bool, str]:
        """
        Update references on a test case by replacing existing ones

        :param case_id: ID of the test case
        :param references: List of references to replace existing ones
        :returns: Tuple with success status and error string
        """
        new_refs_string = join_references(deduplicate_references(references))
        return self._write_case_refs(case_id, new_refs_string, "Failed to update references")

    def delete_case_references(self, case_id: int, specific_references: Optional[List[str]] = None) -> Tuple[bool, str]:
        """
        Delete all or specific references from a test case

        :param case_id: ID of the test case
        :param specific_references: List of specific references to delete (None to delete all)
        :returns: Tuple with success status and error string
        """
        if specific_references is None:
            # Delete everything by storing an empty refs string
            new_refs_string = ""
        else:
            ok, existing_refs, error = self._get_case_refs(case_id)
            if not ok:
                return False, error
            if not existing_refs:
                # Nothing stored, so there is nothing to delete
                return True, ""
            new_refs_string = merge_references(existing_refs, join_references(specific_references), strategy="delete")
        return self._write_case_refs(case_id, new_refs_string, "Failed to delete references")
"""
ResultHandler - Handles all test result-related operations for TestRail
It manages all test result operations including:
- Adding test results
- Uploading attachments to results
- Retrieving results after cancellation
"""
import os
from concurrent.futures import ThreadPoolExecutor, as_completed
from beartype.typing import List, Tuple, Dict
from trcli.api.api_client import APIClient
from trcli.cli import Environment
from trcli.constants import FAULT_MAPPING
from trcli.data_providers.api_data_provider import ApiDataProvider
from trcli.settings import MAX_WORKERS_ADD_RESULTS
class ResultHandler:
    """Handles all test result-related operations for TestRail"""

    def __init__(
        self,
        client: APIClient,
        environment: Environment,
        data_provider: ApiDataProvider,
        get_all_tests_in_run_callback,
        handle_futures_callback,
    ):
        """
        Initialize the ResultHandler

        :param client: APIClient instance for making API calls
        :param environment: Environment configuration
        :param data_provider: Data provider for result data
        :param get_all_tests_in_run_callback: Callback to fetch all tests in a run
        :param handle_futures_callback: Callback to handle concurrent futures
        """
        self.client = client
        self.environment = environment
        self.data_provider = data_provider
        self.__get_all_tests_in_run = get_all_tests_in_run_callback
        self.handle_futures = handle_futures_callback

    def _upload_single_attachment(self, file_path: str, case_id: int, result_id: int, failed_uploads: List[str]):
        """
        Upload one attachment file to a result, logging and recording failures.

        :param file_path: path of the attachment on disk
        :param case_id: case the attachment belongs to (for log messages)
        :param result_id: TestRail result ID to attach the file to
        :param failed_uploads: accumulator for human-readable failure entries
        """
        try:
            with open(file_path, "rb") as file:
                response = self.client.send_post(
                    f"add_attachment_to_result/{result_id}", files={"attachment": file}
                )
            if response.status_code != 200:
                file_name = os.path.basename(file_path)
                if response.status_code == 413:
                    # 413 Request Entity Too Large: the server rejected the file size
                    error_msg = FAULT_MAPPING["attachment_too_large"].format(
                        file_name=file_name, case_id=case_id
                    )
                else:
                    error_msg = FAULT_MAPPING["attachment_upload_failed"].format(
                        file_path=file_name,
                        case_id=case_id,
                        error_message=response.error_message or f"HTTP {response.status_code}",
                    )
                self.environment.elog(error_msg)
                failed_uploads.append(f"{file_name} (case {case_id})")
        except FileNotFoundError:
            self.environment.elog(f"Attachment file not found: {file_path} (case {case_id})")
            failed_uploads.append(f"{file_path} (case {case_id})")
        except Exception as ex:
            file_name = os.path.basename(file_path) if os.path.exists(file_path) else file_path
            self.environment.elog(f"Error uploading attachment '{file_name}' for case {case_id}: {ex}")
            failed_uploads.append(f"{file_name} (case {case_id})")

    def upload_attachments(self, report_results: List[Dict], results: List[Dict], run_id: int):
        """
        Getting test result id and upload attachments for it.

        :param report_results: List of test results with attachments from report
        :param results: List of created results from TestRail
        :param run_id: Run ID
        """
        tests_in_run, error = self.__get_all_tests_in_run(run_id)
        if error:
            self.environment.elog(f"Unable to upload attachments due to API request error: {error}")
            return
        failed_uploads = []
        for report_result in report_results:
            case_id = report_result["case_id"]
            # Map report case -> test in run -> created result
            test_id = next((test["id"] for test in tests_in_run if test["case_id"] == case_id), None)
            result_id = next((result["id"] for result in results if result["test_id"] == test_id), None)
            # Tolerate a missing/None "attachments" key (previously raised TypeError)
            attachments = report_result.get("attachments") or []
            if result_id is None:
                # No created result matches this case; a POST would hit an
                # invalid "add_attachment_to_result/None" endpoint — skip instead
                if attachments:
                    self.environment.elog(
                        f"No result found for case {case_id}; skipping {len(attachments)} attachment(s)"
                    )
                    failed_uploads.extend(f"{os.path.basename(fp)} (case {case_id})" for fp in attachments)
                continue
            for file_path in attachments:
                self._upload_single_attachment(file_path, case_id, result_id, failed_uploads)
        # Provide a summary if there were failed uploads
        if failed_uploads:
            self.environment.log(f"\nWarning: {len(failed_uploads)} attachment(s) failed to upload.")

    def add_results(self, run_id: int) -> Tuple[List, str, int]:
        """
        Adds one or more new test results.

        :param run_id: run id
        :returns: Tuple with dict created resources, error string, and results count.
        """
        responses = []
        error_message = ""
        # Get pre-validated user IDs if available (set by earlier CLI validation)
        user_ids = getattr(self.environment, "_validated_user_ids", [])
        add_results_data_chunks = self.data_provider.add_results_for_cases(self.environment.batch_size, user_ids)
        # Get assigned count from data provider
        assigned_count = getattr(self.data_provider, "_assigned_count", 0)
        results_amount = sum(len(chunk["results"]) for chunk in add_results_data_chunks)
        with self.environment.get_progress_bar(results_amount=results_amount, prefix="Adding results") as progress_bar:
            with ThreadPoolExecutor(max_workers=MAX_WORKERS_ADD_RESULTS) as executor:
                futures = {
                    executor.submit(self.client.send_post, f"add_results_for_cases/{run_id}", body): body
                    for body in add_results_data_chunks
                }
                responses, error_message = self.handle_futures(
                    futures=futures,
                    action_string="add_results",
                    progress_bar=progress_bar,
                )
            if error_message:
                # When error_message is present we cannot be sure that responses
                # contains all added items — collect every completed (not
                # cancelled) future's response instead.
                responses = ResultHandler.retrieve_results_after_cancelling(futures)
        responses = [response.response_text for response in responses]
        results = [result for results_list in responses for result in results_list]
        # Collect the report results that actually carry attachments
        report_results_w_attachments = [
            test_result
            for chunk in add_results_data_chunks
            for test_result in chunk["results"]
            if test_result["attachments"]
        ]
        if report_results_w_attachments:
            attachments_count = sum(len(result["attachments"]) for result in report_results_w_attachments)
            self.environment.log(
                f"Uploading {attachments_count} attachments "
                f"for {len(report_results_w_attachments)} test results."
            )
            self.upload_attachments(report_results_w_attachments, results, run_id)
        else:
            self.environment.log("No attachments found to upload.")
        # Log assignment results if assignment was performed
        if user_ids:
            total_failed = getattr(self.data_provider, "_total_failed_count", assigned_count)
            if assigned_count > 0:
                self.environment.log(f"Assigning failed results: {assigned_count}/{total_failed}, Done.")
            else:
                self.environment.log("Assigning failed results: 0/0, Done.")
        return responses, error_message, progress_bar.n

    @staticmethod
    def retrieve_results_after_cancelling(futures) -> list:
        """
        Retrieve results from futures after cancellation has been triggered.

        :param futures: Dictionary of futures
        :returns: List of successful responses
        """
        responses = []
        for future in as_completed(futures):
            if not future.cancelled():
                response = future.result()
                if not response.error_message:
                    responses.append(response)
        return responses
"""
RunHandler - Handles all test run-related operations for TestRail
It manages all test run operations including:
- Creating test runs
- Updating test runs
- Managing run references
- Closing and deleting runs
"""
from beartype.typing import List, Tuple, Dict
from trcli.api.api_client import APIClient
from trcli.api.api_utils import (
deduplicate_references,
join_references,
merge_references,
parse_references,
validate_references_length,
)
from trcli.cli import Environment
from trcli.data_providers.api_data_provider import ApiDataProvider
class RunHandler:
"""Handles all test run-related operations for TestRail"""
MAX_RUN_REFERENCES_LENGTH = 250 # TestRail character limit for run refs field
def __init__(
    self,
    client: APIClient,
    environment: Environment,
    data_provider: ApiDataProvider,
    get_all_tests_in_run_callback,
):
    """
    Initialize the RunHandler

    :param client: APIClient instance for making API calls
    :param environment: Environment configuration
    :param data_provider: Data provider for run data
    :param get_all_tests_in_run_callback: Callback to fetch all tests in a run
    """
    self.client = client
    self.environment = environment
    self.data_provider = data_provider
    # Name-mangled private callback; used by update_run to merge the run's
    # existing case ids with the ones coming from the report
    self.__get_all_tests_in_run = get_all_tests_in_run_callback
def add_run(
    self,
    project_id: int,
    run_name: str,
    milestone_id: int = None,
    start_date: str = None,
    end_date: str = None,
    plan_id: int = None,
    config_ids: List[int] = None,
    assigned_to_id: int = None,
    include_all: bool = False,
    refs: str = None,
    case_ids: List[int] = None,
) -> Tuple[int, str]:
    """
    Creates a new test run.

    :param project_id: project_id
    :param run_name: run name
    :param milestone_id: milestone id
    :param start_date: start date
    :param end_date: end date
    :param plan_id: plan id (if adding to plan)
    :param config_ids: configuration ids
    :param assigned_to_id: user id to assign
    :param include_all: include all cases
    :param refs: references
    :param case_ids: specific case ids
    :returns: Tuple with run id (None on failure) and error string.
    """
    add_run_data = self.data_provider.add_run(
        run_name,
        case_ids=case_ids,
        start_date=start_date,
        end_date=end_date,
        milestone_id=milestone_id,
        assigned_to_id=assigned_to_id,
        include_all=include_all,
        refs=refs,
    )
    if not plan_id:
        response = self.client.send_post(f"add_run/{project_id}", add_run_data)
        # Guard the failed-request path: response_text may be empty/None,
        # which previously raised instead of returning the error to the caller
        if response.error_message:
            return None, response.error_message
        run_id = response.response_text.get("id")
    else:
        if config_ids:
            add_run_data["config_ids"] = config_ids
            # Runs with configurations must be wrapped in a plan-entry payload
            entry_data = {
                "name": add_run_data["name"],
                "suite_id": add_run_data["suite_id"],
                "config_ids": config_ids,
                "runs": [add_run_data],
            }
        else:
            entry_data = add_run_data
        response = self.client.send_post(f"add_plan_entry/{plan_id}", entry_data)
        # Same guard for the plan-entry path before indexing into the payload
        if response.error_message:
            return None, response.error_message
        run_id = response.response_text["runs"][0]["id"]
    return run_id, response.error_message
def update_run(
    self,
    run_id: int,
    run_name: str,
    start_date: str = None,
    end_date: str = None,
    milestone_id: int = None,
    refs: str = None,
    refs_action: str = "add",
) -> Tuple[dict, str]:
    """
    Updates an existing run

    :param run_id: run id
    :param run_name: run name
    :param start_date: start date
    :param end_date: end date
    :param milestone_id: milestone id
    :param refs: references to manage
    :param refs_action: action to perform ('add', 'update', 'delete')
    :returns: Tuple with run and error string.
    """
    # Fetch the current run state first; everything below preserves fields
    # (description, refs, include_all) that the update would otherwise reset
    run_response = self.client.send_get(f"get_run/{run_id}")
    if run_response.error_message:
        return None, run_response.error_message
    existing_description = run_response.response_text.get("description", "")
    existing_refs = run_response.response_text.get("refs", "")
    add_run_data = self.data_provider.add_run(
        run_name, start_date=start_date, end_date=end_date, milestone_id=milestone_id
    )
    add_run_data["description"] = existing_description  # Retain the current description
    # Handle references based on action
    if refs is not None:
        updated_refs = self._manage_references(existing_refs, refs, refs_action)
        add_run_data["refs"] = updated_refs
    else:
        add_run_data["refs"] = existing_refs  # Keep existing refs if none provided
    existing_include_all = run_response.response_text.get("include_all", False)
    add_run_data["include_all"] = existing_include_all
    if not existing_include_all:
        # Only manage explicit case_ids when include_all=False
        run_tests, error_message = self.__get_all_tests_in_run(run_id)
        if error_message:
            return None, f"Failed to get tests in run: {error_message}"
        run_case_ids = [test["case_id"] for test in run_tests]
        # NOTE(review): assumes data_provider.add_run always returns a
        # "case_ids" key — confirm, otherwise this raises KeyError here
        report_case_ids = add_run_data["case_ids"]
        # Union of cases already in the run and cases from the report
        joint_case_ids = list(set(report_case_ids + run_case_ids))
        add_run_data["case_ids"] = joint_case_ids
    else:
        # include_all=True: TestRail includes all suite cases automatically
        # Do NOT send case_ids array (TestRail ignores it anyway)
        add_run_data.pop("case_ids", None)
    plan_id = run_response.response_text["plan_id"]
    config_ids = run_response.response_text["config_ids"]
    # Endpoint selection: standalone runs, plan-entry runs with configs, and
    # plain plan-entry runs are each updated through a different endpoint
    if not plan_id:
        update_response = self.client.send_post(f"update_run/{run_id}", add_run_data)
    elif plan_id and config_ids:
        update_response = self.client.send_post(f"update_run_in_plan_entry/{run_id}", add_run_data)
    else:
        # Run belongs to a plan entry without configs: locate its entry id
        # by scanning the plan's entries for this run id
        response = self.client.send_get(f"get_plan/{plan_id}")
        entry_id = next(
            (
                run["entry_id"]
                for entry in response.response_text["entries"]
                for run in entry["runs"]
                if run["id"] == run_id
            ),
            None,
        )
        update_response = self.client.send_post(f"update_plan_entry/{plan_id}/{entry_id}", add_run_data)
    # Re-fetch so callers receive the post-update state of the run
    run_response = self.client.send_get(f"get_run/{run_id}")
    return run_response.response_text, update_response.error_message
def _manage_references(self, existing_refs: str, new_refs: str, action: str) -> str:
"""
Manage references based on the specified action.
:param existing_refs: current references in the run
:param new_refs: new references to process
:param action: 'add', 'update', or 'delete'
:returns: updated references string
"""
# Use shared utility function for reference management
return merge_references(existing_refs or "", new_refs, strategy=action)
def append_run_references(self, run_id: int, references: List[str]) -> Tuple[Dict, List[str], List[str], str]:
"""
Append references to a test run, avoiding duplicates.
:param run_id: ID of the test run
:param references: List of references to append
:returns: Tuple with (run_data, added_refs, skipped_refs, error_message)
"""
# Get current run data
run_response = self.client.send_get(f"get_run/{run_id}")
if run_response.error_message:
return None, [], [], run_response.error_message
existing_refs = run_response.response_text.get("refs", "") or ""
# Deduplicate input references using utility function
deduplicated_input = deduplicate_references(references)
# Parse existing references and calculate changes
existing_list = parse_references(existing_refs)
added_refs = [ref for ref in deduplicated_input if ref not in existing_list]
skipped_refs = [ref for ref in deduplicated_input if ref in existing_list]
# If no new references to add, return current state
if not added_refs:
return run_response.response_text, added_refs, skipped_refs, None
# Combine references using utility function
combined_refs = merge_references(existing_refs, join_references(deduplicated_input), strategy="add")
# Validate character limit
is_valid, error_msg = validate_references_length(combined_refs, self.MAX_RUN_REFERENCES_LENGTH)
if not is_valid:
return None, [], [], error_msg
update_data = {"refs": combined_refs}
# Determine the correct API endpoint based on plan membership
plan_id = run_response.response_text.get("plan_id")
config_ids = run_response.response_text.get("config_ids")
if not plan_id:
# Standalone run
update_response = self.client.send_post(f"update_run/{run_id}", update_data)
elif plan_id and config_ids:
# Run in plan with configurations
update_response = self.client.send_post(f"update_run_in_plan_entry/{run_id}", update_data)
else:
# Run in plan without configurations - need to use plan entry endpoint
plan_response = self.client.send_get(f"get_plan/{plan_id}")
if plan_response.error_message:
return None, [], [], f"Failed to get plan details: {plan_response.error_message}"
# Find the entry_id for this run
entry_id = None
for entry in plan_response.response_text.get("entries", []):
for run in entry.get("runs", []):
if run["id"] == run_id:
entry_id = entry["id"]
break
if entry_id:
break
if not entry_id:
return None, [], [], f"Could not find plan entry for run {run_id}"
update_response = self.client.send_post(f"update_plan_entry/{plan_id}/{entry_id}", update_data)
if update_response.error_message:
return None, [], [], update_response.error_message
updated_run_response = self.client.send_get(f"get_run/{run_id}")
return updated_run_response.response_text, added_refs, skipped_refs, updated_run_response.error_message
def close_run(self, run_id: int) -> Tuple[dict, str]:
"""
Closes an existing test run and archives its tests & results.
:param run_id: run id
:returns: Tuple with dict created resources and error string.
"""
body = {"run_id": run_id}
response = self.client.send_post(f"close_run/{run_id}", body)
return response.response_text, response.error_message
def delete_run(self, run_id: int) -> Tuple[dict, str]:
"""
Delete run given run id
:param run_id: run id
:returns: Tuple with dict created resources and error string.
"""
response = self.client.send_post(f"delete_run/{run_id}", payload={})
return response.response_text, response.error_message
"""
SectionHandler - Handles all section-related operations for TestRail
It manages all section operations including:
- Checking for missing sections
- Adding new sections
- Deleting sections
"""
from beartype.typing import List, Tuple, Dict
from trcli.api.api_client import APIClient
from trcli.cli import Environment
from trcli.constants import FAULT_MAPPING
from trcli.data_classes.dataclass_testrail import TestRailSuite
from trcli.data_providers.api_data_provider import ApiDataProvider
class SectionHandler:
"""Handles all section-related operations for TestRail"""
def __init__(
self,
client: APIClient,
environment: Environment,
data_provider: ApiDataProvider,
get_all_sections_callback,
):
"""
Initialize the SectionHandler
:param client: APIClient instance for making API calls
:param environment: Environment configuration
:param data_provider: Data provider for updating section data
:param get_all_sections_callback: Callback to fetch all sections from TestRail
"""
self.client = client
self.environment = environment
self.data_provider = data_provider
self.__get_all_sections = get_all_sections_callback
def check_missing_section_ids(self, project_id: int, suite_id: int, suites_data: TestRailSuite) -> Tuple[bool, str]:
"""
Check what section id's are missing in DataProvider.
:param project_id: project_id
:param suite_id: suite_id
:param suites_data: Test suite data from provider
:returns: Tuple with list missing section ID and error string.
"""
returned_sections, error_message = self.__get_all_sections(project_id, suite_id)
if not error_message:
missing_test_sections = False
sections_by_id = {section["id"]: section for section in returned_sections}
sections_by_name = {section["name"]: section for section in returned_sections}
section_data = []
for section in suites_data.testsections:
if self.environment.section_id:
if section.section_id in sections_by_id.keys():
section_json = sections_by_id[section.section_id]
section_data.append(
{
"section_id": section_json["id"],
"suite_id": section_json["suite_id"],
"name": section_json["name"],
}
)
else:
missing_test_sections = True
if section.name in sections_by_name.keys():
section_json = sections_by_name[section.name]
section_data.append(
{
"section_id": section_json["id"],
"suite_id": section_json["suite_id"],
"name": section_json["name"],
}
)
else:
missing_test_sections = True
self.data_provider.update_data(section_data=section_data)
return missing_test_sections, error_message
else:
return False, error_message
def add_sections(self, project_id: int, verify_callback) -> Tuple[List[Dict], str]:
"""
Add sections that doesn't have ID in DataProvider.
Runs update_data in data_provider for successfully created resources.
:param project_id: project_id
:param verify_callback: callback to verify returned data matches request
:returns: Tuple with list of dict created resources and error string.
"""
add_sections_data = self.data_provider.add_sections_data()
responses = []
error_message = ""
for body in add_sections_data:
response = self.client.send_post(f"add_section/{project_id}", body)
if not response.error_message:
responses.append(response)
if not verify_callback(body, response.response_text):
responses.append(response)
error_message = FAULT_MAPPING["data_verification_error"]
break
else:
error_message = response.error_message
break
returned_resources = [
{
"section_id": response.response_text["id"],
"suite_id": response.response_text["suite_id"],
"name": response.response_text["name"],
}
for response in responses
]
(
self.data_provider.update_data(section_data=returned_resources)
if len(returned_resources) > 0
else "Update skipped"
)
return returned_resources, error_message
def delete_sections(self, added_sections: List[Dict]) -> Tuple[List, str]:
"""
Delete section given add_sections response
:param added_sections: List of sections to delete
:returns: Tuple with dict created resources and error string.
"""
responses = []
error_message = ""
for section in added_sections:
response = self.client.send_post(f"delete_section/{section['section_id']}", payload={})
if not response.error_message:
responses.append(response.response_text)
else:
error_message = response.error_message
break
return responses, error_message
"""
SuiteHandler - Handles all suite-related operations for TestRail
It manages all suite operations including:
- Checking if suites exist
- Resolving suite IDs by name
- Getting suite IDs for projects
- Adding new suites
- Deleting suites
"""
from beartype.typing import List, Tuple, Dict
from trcli.api.api_client import APIClient
from trcli.cli import Environment
from trcli.constants import FAULT_MAPPING
from trcli.data_providers.api_data_provider import ApiDataProvider
class SuiteHandler:
"""Handles all suite-related operations for TestRail"""
def __init__(
self,
client: APIClient,
environment: Environment,
data_provider: ApiDataProvider,
get_all_suites_callback,
):
"""
Initialize the SuiteHandler
:param client: APIClient instance for making API calls
:param environment: Environment configuration
:param data_provider: Data provider for updating suite data
:param get_all_suites_callback: Callback to fetch all suites from TestRail
"""
self.client = client
self.environment = environment
self.data_provider = data_provider
self.__get_all_suites = get_all_suites_callback
def check_suite_id(self, project_id: int, suite_id: int) -> Tuple[bool, str]:
"""
Check if suite exists using get_suites endpoint
:param project_id: project id
:param suite_id: suite id to check
:returns: Tuple (exists, error_message)
"""
suites_data, error = self.__get_all_suites(project_id)
if not error:
available_suites = [suite for suite in suites_data if suite["id"] == suite_id]
return (
(True, "")
if len(available_suites) > 0
else (False, FAULT_MAPPING["missing_suite"].format(suite_id=suite_id))
)
else:
return None, error
def resolve_suite_id_using_name(self, project_id: int, suite_name: str) -> Tuple[int, str]:
"""
Get suite ID matching suite name or returns -1 if unable to match any suite.
:param project_id: project id
:param suite_name: suite name to match
:returns: tuple with id of the suite and error message
"""
suite_id = -1
suites_data, error = self.__get_all_suites(project_id)
if not error:
for suite in suites_data:
if suite["name"] == suite_name:
suite_id = suite["id"]
self.data_provider.update_data([{"suite_id": suite["id"], "name": suite["name"]}])
break
return (
(suite_id, "")
if suite_id != -1
else (-1, FAULT_MAPPING["missing_suite_by_name"].format(suite_name=suite_name))
)
else:
return -1, error
def get_suite_ids(self, project_id: int) -> Tuple[List[int], str]:
"""
Get suite IDs for requested project_id.
:param project_id: project id
:returns: tuple with list of suite ids and error string
"""
available_suites = []
returned_resources = []
suites_data, error = self.__get_all_suites(project_id)
if not error:
for suite in suites_data:
available_suites.append(suite["id"])
returned_resources.append(
{
"suite_id": suite["id"],
"name": suite["name"],
}
)
if returned_resources:
self.data_provider.update_data(suite_data=returned_resources)
else:
print("Update skipped")
return (
(available_suites, "")
if len(available_suites) > 0
else ([], FAULT_MAPPING["no_suites_found"].format(project_id=project_id))
)
else:
return [], error
def add_suites(self, project_id: int, verify_callback) -> Tuple[List[Dict], str]:
"""
Adds suites that doesn't have ID's in DataProvider.
Runs update_data in data_provider for successfully created resources.
:param project_id: project_id
:param verify_callback: callback to verify returned data matches request
:returns: Tuple with list of dict created resources and error string.
"""
add_suite_data = self.data_provider.add_suites_data()
responses = []
error_message = ""
for body in add_suite_data:
response = self.client.send_post(f"add_suite/{project_id}", body)
if not response.error_message:
responses.append(response)
if not verify_callback(body, response.response_text):
responses.append(response)
error_message = FAULT_MAPPING["data_verification_error"]
break
else:
error_message = response.error_message
break
returned_resources = [
{
"suite_id": response.response_text["id"],
"name": response.response_text["name"],
}
for response in responses
]
(
self.data_provider.update_data(suite_data=returned_resources)
if len(returned_resources) > 0
else "Update skipped"
)
return returned_resources, error_message
def delete_suite(self, suite_id: int) -> Tuple[dict, str]:
"""
Delete suite given suite id
:param suite_id: suite id
:returns: Tuple with dict created resources and error string.
"""
response = self.client.send_post(f"delete_suite/{suite_id}", payload={})
return response.response_text, response.error_message
import click
from pathlib import Path
from trcli.cli import pass_environment, Environment, CONTEXT_SETTINGS
from trcli.constants import FAULT_MAPPING
from trcli.api.api_client import APIClient
from trcli.api.api_request_handler import ApiRequestHandler
from trcli.data_classes.dataclass_testrail import TestRailSuite
import trcli
@click.command(context_settings=CONTEXT_SETTINGS)
@click.option(
    "--case-id",
    type=click.IntRange(min=1),
    metavar="",
    required=True,
    help="TestRail test case ID to export as .feature file.",
)
@click.option(
    "--output",
    type=click.Path(),
    metavar="",
    help="Output path for the .feature file. If not specified, prints to stdout.",
)
@click.pass_context
@pass_environment
def cli(environment: Environment, context: click.Context, case_id: int, output: str, **kwargs):
    """Export BDD test case from TestRail as .feature file
    This command retrieves a test case from TestRail's BDD endpoint
    and exports it as a Gherkin .feature file.
    The test case must have been created via the BDD import functionality
    for this command to work.
    Mapping Rules (TestRail to .feature):
    - Test Case name → Feature:
    - Preconditions field → Free text after Feature:
    - BDD Scenario field → Background:/Scenario:/Scenario Outline:/Rule:
    - Reference field → @Tags before Feature: (@ added)
    - BDD field tags → @Tags before scenarios
    Examples:
    # Export to file
    trcli export_gherkin --case-id 456 --output login.feature --project-id 1
    # Print to stdout
    trcli export_gherkin --case-id 456 --project-id 1
    """
    environment.cmd = "export_gherkin"
    environment.set_parameters(context)
    environment.check_for_required_parameters()
    try:
        environment.vlog(f"Target case ID: {case_id}")
        environment.vlog(f"API endpoint: GET /api/v2/get_bdd/{case_id}")
        # Initialize API client.
        environment.log("Connecting to TestRail...")
        uploader_metadata = APIClient.build_uploader_metadata(version=trcli.__version__)
        api_client = APIClient(
            host_name=environment.host,
            verify=not environment.insecure,
            verbose_logging_function=environment.vlog,
            logging_function=environment.log,
            uploader_metadata=uploader_metadata,
        )
        # Credentials are assigned after construction (APIClient does not take
        # them in __init__).
        api_client.username = environment.username
        api_client.password = environment.password
        api_client.api_key = environment.key
        # BDD operations don't need suite data, so a minimal placeholder suffices.
        minimal_suite = TestRailSuite(name="BDD Export", testsections=[])
        api_request_handler = ApiRequestHandler(
            environment=environment,
            api_client=api_client,
            suites_data=minimal_suite,
        )
        # Get BDD test case content from TestRail.
        environment.log(f"Retrieving BDD test case {case_id}...")
        feature_content, error_message = api_request_handler.get_bdd(case_id)
        if error_message:
            environment.elog(f"Error retrieving test case: {error_message}")
            exit(1)
        if not feature_content or not feature_content.strip():
            environment.elog(f"Error: No BDD content found for case ID {case_id}")
            environment.elog("This test case may not have been created via BDD import.")
            exit(1)
        # Output results: write to file when --output was given, else stdout.
        if output:
            output_path = Path(output)
            # Consistency fix: use vlog instead of manually gating log() on
            # environment.verbose (matches the vlog usage above).
            environment.vlog(f"Writing feature file to: {output_path}")
            # Create parent directory if it doesn't exist.
            output_path.parent.mkdir(parents=True, exist_ok=True)
            with open(output_path, "w", encoding="utf-8") as f:
                f.write(feature_content)
            environment.log(f"\n✓ Successfully exported test case {case_id}")
            environment.log(f" File: {output_path}")
            environment.log(f" Size: {len(feature_content)} characters")
        else:
            # Print to stdout.
            print(feature_content)
    except PermissionError:
        environment.elog(f"Error: Permission denied writing to file: {output}")
        exit(1)
    except IOError as e:
        environment.elog(f"Error writing file: {str(e)}")
        exit(1)
    except Exception as e:
        environment.elog(f"Unexpected error: {str(e)}")
        if environment.verbose:
            import traceback

            environment.elog(traceback.format_exc())
        exit(1)
import click
from pathlib import Path
from trcli.cli import pass_environment, Environment, CONTEXT_SETTINGS
from trcli.constants import FAULT_MAPPING
from trcli.api.api_client import APIClient
from trcli.api.api_request_handler import ApiRequestHandler
from trcli.data_classes.dataclass_testrail import TestRailSuite
import trcli
@click.command(context_settings=CONTEXT_SETTINGS)
@click.option(
    "-f",
    "--file",
    type=click.Path(exists=True),
    metavar="",
    required=True,
    help="Path to Gherkin .feature file to upload.",
)
@click.option(
    "--section-id",
    type=click.IntRange(min=1),
    metavar="",
    required=False,
    help="TestRail section ID where test cases will be created (required for create mode).",
)
@click.option(
    "--case-id",
    type=click.IntRange(min=1),
    metavar="",
    required=False,
    help="TestRail case ID to update (required with --update flag).",
)
@click.option("--json-output", is_flag=True, help="Output case IDs in JSON format.")
@click.option("--update", is_flag=True, help="Update existing BDD test case instead of creating new one.")
@click.pass_context
@pass_environment
def cli(environment: Environment, context: click.Context, file: str, section_id: int, case_id: int, **kwargs):
    """Upload or update Gherkin .feature file in TestRail
    This command uploads a Gherkin/BDD .feature file directly to TestRail,
    which will create or update test cases based on the scenarios in the file.
    Two modes:
    - Create mode (default): Requires --section-id, creates new test case(s)
    - Update mode (--update): Requires --case-id, updates existing test case
    TestRail will parse the .feature file and automatically create/update test cases
    for each scenario, maintaining the BDD structure in TestRail's native format.
    Mapping Rules (.feature to TestRail):
    - Feature: → Test Case name
    - Free text after Feature: → Preconditions field
    - Background:/Scenario:/Scenario Outline:/Rule: → BDD Scenario field
    - Examples: (under Scenario Outline/Rule) → Same BDD field as parent
    - @Tags before Feature: → Reference field (@ stripped)
    - @Tags before scenarios → BDD field
    Examples:
    # Create new test case (requires --section-id)
    trcli import_gherkin -f login.feature --section-id 123 --project-id 1
    # Update existing test case (requires --case-id)
    trcli import_gherkin -f login.feature --case-id 456 --update --project-id 1
    """
    environment.cmd = "import_gherkin"
    environment.set_parameters(context)
    environment.check_for_required_parameters()
    json_output = kwargs.get("json_output", False)
    update_mode = kwargs.get("update", False)
    # Validate mutually exclusive parameters: update mode needs --case-id only,
    # create mode needs --section-id only.
    if update_mode:
        if not case_id:
            environment.elog("Error: --case-id is required when using --update flag")
            exit(1)
        if section_id:
            environment.elog("Error: --section-id cannot be used with --update flag (use --case-id instead)")
            exit(1)
    else:
        if not section_id:
            environment.elog("Error: --section-id is required for create mode")
            exit(1)
        if case_id:
            environment.elog("Error: --case-id can only be used with --update flag")
            exit(1)
    try:
        # Read the feature file.
        feature_path = Path(file)
        if environment.verbose:
            environment.log(f"Reading feature file: {feature_path}")
        with open(feature_path, "r", encoding="utf-8") as f:
            feature_content = f.read()
        if not feature_content.strip():
            environment.elog("Error: Feature file is empty")
            exit(1)
        # Derive endpoint/target purely for diagnostics below.
        endpoint_name = "update_bdd" if update_mode else "add_bdd"
        target_id = case_id if update_mode else section_id
        id_type = "case ID" if update_mode else "section ID"
        environment.vlog(f"Feature file size: {len(feature_content)} characters")
        environment.vlog(f"Target {id_type}: {target_id}")
        environment.vlog(f"API endpoint: POST /api/v2/{endpoint_name}/{target_id}")
        # Initialize API client.
        environment.log("Connecting to TestRail...")
        uploader_metadata = APIClient.build_uploader_metadata(version=trcli.__version__)
        api_client = APIClient(
            host_name=environment.host,
            verify=not environment.insecure,
            verbose_logging_function=environment.vlog,
            logging_function=environment.log,
            uploader_metadata=uploader_metadata,
        )
        # Credentials are assigned after construction.
        api_client.username = environment.username
        api_client.password = environment.password
        api_client.api_key = environment.key
        # BDD operations don't need suite data, so a minimal placeholder suffices.
        minimal_suite = TestRailSuite(name="BDD Import", testsections=[])
        api_request_handler = ApiRequestHandler(
            environment=environment,
            api_client=api_client,
            suites_data=minimal_suite,
        )
        # Upload or update the feature file based on mode.
        if update_mode:
            if not json_output:
                environment.log(f"Updating existing BDD test case (C{case_id}) in TestRail...")
            case_ids, error_message = api_request_handler.update_bdd(case_id, feature_content)
        else:
            if not json_output:
                environment.log(f"Uploading feature file to TestRail...")
            case_ids, error_message = api_request_handler.add_bdd(section_id, feature_content)
        if error_message:
            action = "updating" if update_mode else "uploading"
            environment.elog(f"Error {action} feature file: {error_message}")
            exit(1)
        if not case_ids:
            action = "updated" if update_mode else "uploaded"
            environment.log("Warning: No case IDs returned from TestRail")
            environment.log(f"Feature file was {action} but no cases were created/updated.")
            exit(0)
        # Display results. Consistency fix: use the json_output variable
        # extracted above instead of re-reading kwargs.
        if json_output:
            import json

            print(json.dumps({"case_ids": case_ids, "count": len(case_ids)}, indent=2))
        else:
            if update_mode:
                environment.log(f"\nSuccessfully updated feature file!")
                environment.log(f" Updated {len(case_ids)} test case(s)")
            else:
                environment.log(f"\nSuccessfully uploaded feature file!")
                environment.log(f" Created {len(case_ids)} test case(s)")
            environment.log(f" Case IDs: {', '.join(map(str, case_ids))}")
    except FileNotFoundError:
        environment.elog(f"Error: Feature file not found: {file}")
        exit(1)
    except PermissionError:
        environment.elog(f"Error: Permission denied reading feature file: {file}")
        exit(1)
    except UnicodeDecodeError:
        environment.elog(f"Error: Feature file must be UTF-8 encoded: {file}")
        exit(1)
    except Exception as e:
        environment.elog(f"Unexpected error: {str(e)}")
        if environment.verbose:
            import traceback

            environment.elog(traceback.format_exc())
        exit(1)
import click
import json
from trcli.api.results_uploader import ResultsUploader
from trcli.cli import pass_environment, Environment, CONTEXT_SETTINGS
from trcli.commands.results_parser_helpers import bdd_parser_options, print_config
from trcli.constants import FAULT_MAPPING, ProjectErrors
from trcli.data_classes.validation_exception import ValidationException
from trcli.readers.cucumber_json import CucumberParser
@click.command(context_settings=CONTEXT_SETTINGS)
@bdd_parser_options
@click.option(
    "-v",
    "--verbose",
    is_flag=True,
    help="Enable verbose logging output.",
)
@click.pass_context
@pass_environment
def cli(environment: Environment, context: click.Context, *args, **kwargs):
    """Parse Cucumber JSON results and upload to TestRail
    This command parses Cucumber JSON test results and uploads them to TestRail
    using BDD matching mode. Features are matched to TestRail BDD test cases by
    feature name only (case-insensitive, whitespace-normalized).
    BDD Matching:
    - Matches Cucumber features to TestRail BDD test cases by feature name
    - Auto-creates missing BDD test cases by default (use -n to disable)
    - Sections are auto-created based on feature names
    - Does not use automation_id or case-matcher (BDD uses feature name matching only)
    """
    environment.cmd = "parse_cucumber"
    environment.set_parameters(context)
    environment.check_for_required_parameters()
    # Set verbose mode if requested via the local -v flag.
    if kwargs.get("verbose"):
        environment.verbose = True
    print_config(environment)
    try:
        # Setup API client and handler (needed for both modes).
        from trcli.api.api_request_handler import ApiRequestHandler
        from trcli.api.api_client import APIClient
        import trcli

        environment.vlog("Initializing API client...")
        uploader_metadata = APIClient.build_uploader_metadata(version=trcli.__version__)
        api_client = APIClient(
            host_name=environment.host,
            verify=not environment.insecure,
            verbose_logging_function=environment.vlog,
            logging_function=environment.log,
            uploader_metadata=uploader_metadata,
        )
        # Credentials are assigned after construction.
        api_client.username = environment.username
        api_client.password = environment.password
        api_client.api_key = environment.key
        # Create minimal suite for ApiRequestHandler.
        from trcli.data_classes.dataclass_testrail import TestRailSuite

        minimal_suite = TestRailSuite(name="Cucumber BDD", testsections=[])
        if environment.suite_id:
            minimal_suite.suite_id = environment.suite_id
        api_handler = ApiRequestHandler(
            environment=environment,
            api_client=api_client,
            suites_data=minimal_suite,
        )
        # Resolve project to get the actual project_id (used in BDD parsing).
        environment.log("Checking project. ", new_line=False)
        project_data = api_handler.get_project_data(environment.project, environment.project_id)
        # Validate project was found.
        if project_data.project_id == ProjectErrors.not_existing_project:
            environment.elog(f"\n{project_data.error_message}")
            exit(1)
        elif project_data.project_id == ProjectErrors.other_error:
            environment.elog(f"\nError checking project: {project_data.error_message}")
            exit(1)
        elif project_data.project_id == ProjectErrors.multiple_project_same_name:
            environment.elog(f"\nError checking project: {project_data.error_message}")
            exit(1)
        environment.log("Done.")
        resolved_project_id = project_data.project_id
        # BDD matching mode: give the parser the API handler for validation/caching.
        parser = CucumberParser(environment)
        parser.set_api_handler(api_handler)
        # Determine auto-creation behavior:
        # - With -n flag (auto_creation_response is False): only match existing features
        # - With -y flag (auto_creation_response is True): auto-create missing features
        # - Without flag (auto_creation_response is None): auto-create by default for BDD
        # Idiom fix: identity comparison with False instead of ==/!= (PEP 8 E712);
        # behavior is identical for the True/False/None values used here.
        auto_create = environment.auto_creation_response is not False
        if environment.auto_creation_response is False:
            environment.vlog("Auto-creation disabled: Will only match existing BDD test cases")
        else:
            environment.vlog("Auto-creation enabled: Will create missing BDD test cases")
        parsed_suites = parser.parse_file(
            bdd_matching_mode=True,
            project_id=resolved_project_id,
            suite_id=environment.suite_id,
            auto_create=auto_create,
        )
        # Handle auto-creation of features in BDD matching mode.
        # "is not False" means: -y flag OR no flag (default to auto-create).
        if environment.auto_creation_response is not False:
            # Collect features the parser marked for creation (case_id == -1).
            features_to_create = []
            for suite in parsed_suites:
                for section in suite.testsections:
                    for test_case in section.testcases:
                        if test_case.case_id == -1:
                            features_to_create.append({"section": section, "test_case": test_case})
            if features_to_create:
                environment.log(f"\n=== Auto-Creating {len(features_to_create)} Missing BDD Test Case(s) ===")
                # Load Cucumber JSON to access the raw feature data.
                with open(environment.file, "r", encoding="utf-8") as f:
                    cucumber_data = json.load(f)
                # Get BDD template ID. NOTE(review): the ID is only logged here —
                # presumably add_bdd applies the template server-side; confirm.
                environment.log("Getting BDD template ID...")
                bdd_template_id, error_message = api_handler.get_bdd_template_id(resolved_project_id)
                if error_message:
                    environment.elog(f"Error getting BDD template: {error_message}")
                    exit(1)
                environment.vlog(f"Using BDD template ID: {bdd_template_id}")
                # Create each missing feature; map normalized name -> created case_id.
                created_case_ids = {}
                for feature in cucumber_data:
                    feature_name = feature.get("name", "Untitled Feature")
                    normalized_name = parser._normalize_title(feature_name)
                    # Only create features the parser flagged as missing.
                    needs_creation = any(
                        parser._normalize_title(item["section"].name) == normalized_name for item in features_to_create
                    )
                    if not needs_creation:
                        continue
                    # Use the feature name as the section name (matches parse behavior).
                    section_name = feature_name
                    section_id = None
                    # Try to find an existing section by name.
                    environment.vlog(f"Looking for section '{section_name}'...")
                    # NOTE(review): reaches through name mangling into a private
                    # helper of ApiRequestHandler — consider exposing a public API.
                    sections, error = api_handler._ApiRequestHandler__get_all_sections(
                        project_id=resolved_project_id, suite_id=environment.suite_id
                    )
                    if error:
                        environment.elog(f"Error fetching sections: {error}")
                        exit(1)
                    for s in sections:
                        if s.get("name") == section_name:
                            section_id = s.get("id")
                            environment.vlog(f" Found existing section ID: {section_id}")
                            break
                    # Create the section if not found.
                    if section_id is None:
                        environment.log(f"Creating section '{section_name}'...")
                        section_body = {"suite_id": environment.suite_id, "name": section_name}
                        response = api_handler.client.send_post(f"add_section/{resolved_project_id}", section_body)
                        if response.error_message:
                            environment.elog(f"Error creating section: {response.error_message}")
                            exit(1)
                        section_id = response.response_text.get("id")
                        environment.vlog(f" Created section ID: {section_id}")
                    # Generate .feature content from the raw Cucumber feature.
                    environment.vlog(f"Generating .feature file for '{feature_name}'")
                    feature_content = parser._generate_feature_content(feature)
                    # Upload the feature via the add_bdd endpoint.
                    environment.log(f"Uploading feature '{feature_name}'...")
                    returned_case_ids, error_message = api_handler.add_bdd(
                        section_id=section_id, feature_content=feature_content
                    )
                    if error_message:
                        environment.elog(f"Error creating BDD test case: {error_message}")
                        exit(1)
                    if not returned_case_ids or len(returned_case_ids) == 0:
                        environment.elog(f"Error: add_bdd did not return a case ID")
                        exit(1)
                    case_id = returned_case_ids[0]
                    created_case_ids[normalized_name] = case_id
                    environment.log(f"Created case ID: C{case_id}")
                environment.log(f"Successfully created {len(created_case_ids)} BDD test case(s)")
                environment.vlog("Clearing BDD cache to include newly created cases...")
                api_handler._bdd_case_cache.clear()
                # Also invalidate the get_cases RequestCache so newly created
                # cases become visible on re-fetch.
                api_handler._cache.invalidate_pattern(f"get_cases/{resolved_project_id}")
                # Re-parse with the newly created case IDs.
                environment.vlog("Re-parsing to match newly created cases...")
                parser_for_results = CucumberParser(environment)
                parser_for_results.set_api_handler(api_handler)
                parsed_suites = parser_for_results.parse_file(
                    bdd_matching_mode=True,
                    project_id=resolved_project_id,
                    suite_id=environment.suite_id,
                    auto_create=False,  # No need to mark for creation again
                )
                environment.vlog(f"Re-parsed successfully with {len(created_case_ids)} newly created case(s)")
        # Ensure all suites have suite_id set from the environment.
        for suite in parsed_suites:
            if environment.suite_id and not suite.suite_id:
                suite.suite_id = environment.suite_id
        run_id = None
        for suite in parsed_suites:
            result_uploader = ResultsUploader(environment=environment, suite=suite)
            # Set project up front to avoid a duplicate "Checking project" call.
            result_uploader.project = project_data
            result_uploader.upload_results()
            if run_id is None and hasattr(result_uploader, "last_run_id"):
                run_id = result_uploader.last_run_id
        # Summary.
        if run_id:
            environment.log(f"Results uploaded successfully to run ID: {run_id}")
        else:
            environment.log("Results processing completed")
    except FileNotFoundError:
        environment.elog(f"Error: Cucumber JSON file not found: {environment.file}")
        exit(1)
    except json.JSONDecodeError as e:
        environment.elog(f"Error: Invalid JSON format in file: {environment.file}")
        environment.elog(f" {str(e)}")
        exit(1)
    except ValidationException as e:
        environment.elog(f"Validation error: {str(e)}")
        exit(1)
    except ValueError as e:
        environment.elog(f"Error parsing Cucumber JSON: {str(e)}")
        exit(1)
    except Exception as e:
        environment.elog(f"Unexpected error: {str(e)}")
        if environment.verbose:
            import traceback

            environment.elog(traceback.format_exc())
        exit(1)
"""
TRCLI Logging Module - Core Edition
Zero-dependency, vendor-neutral logging infrastructure for TRCLI.
Simplified to include only essential features for CLI tools.
Provides:
- Structured logging (NDJSON and text formats)
- File output with automatic rotation
- Flexible configuration (file, env vars, CLI flags)
- Credential sanitization
- Correlation ID support
- Zero external dependencies
Usage:
from trcli.logging import get_logger
logger = get_logger("trcli.module")
logger.info("Operation completed", duration=1.5, items=100)
Configuration:
# Via environment variables
export TRCLI_LOG_LEVEL=DEBUG
export TRCLI_LOG_FORMAT=json
export TRCLI_LOG_FILE=/var/log/trcli/app.log
# Via configuration file
from trcli.logging.config import LoggingConfig
LoggingConfig.setup_logging(config_path="trcli_config.yml")
"""
from trcli.logging.structured_logger import LoggerFactory, StructuredLogger, LogLevel
__all__ = [
"LoggerFactory",
"StructuredLogger",
"LogLevel",
"get_logger",
]
def get_logger(name: str) -> StructuredLogger:
    """
    Return a logger for *name*, created (or reused) by the factory.

    Args:
        name: Logger name, conventionally the dotted module path
            (e.g. "trcli.api.client").

    Returns:
        A StructuredLogger configured with the current factory defaults.

    Example:
        logger = get_logger("trcli.api")
        logger.info("Request completed", status_code=200, duration=1.5)
    """
    factory_logger = LoggerFactory.get_logger(name)
    return factory_logger
"""
Configuration System - Simple logging configuration for TRCLI
Provides centralized configuration loading from multiple sources
with precedence handling and environment variable substitution.
"""
import os
import sys
import re
from pathlib import Path
from typing import Dict, Any, Optional
class LoggingConfig:
    """
    Centralized logging configuration for TRCLI.

    Reads from file, environment variables, or CLI flags with
    proper precedence handling (CLI > Environment > File > Default).

    Example configuration file (trcli_config.yml):
        logging:
          level: INFO
          format: json # json or text
          output: file # stderr, stdout, file
          file_path: /var/log/trcli/app.log
          max_bytes: 10485760 # 10MB
          backup_count: 5
    """

    # Fallback values used when no other source provides a setting.
    DEFAULT_CONFIG = {
        "level": "INFO",
        "format": "json",  # json or text
        "output": "stderr",  # stderr, stdout, file
        "file_path": None,
        "max_bytes": 10485760,  # 10MB
        "backup_count": 5,
    }

    @classmethod
    def load(cls, config_path: Optional[str] = None) -> Dict[str, Any]:
        """
        Load configuration from multiple sources.

        Precedence: CLI > Environment > File > Default.
        (CLI overrides are applied later by setup_logging via **overrides.)

        Args:
            config_path: Path to configuration file

        Returns:
            Configuration dictionary

        Example:
            config = LoggingConfig.load("trcli_config.yml")
        """
        config = cls.DEFAULT_CONFIG.copy()
        # 1. Load from file
        if config_path and Path(config_path).exists():
            file_config = cls._load_from_file(config_path)
            if file_config and "logging" in file_config:
                config.update(file_config["logging"])
        # 2. Override with environment variables
        config = cls._apply_env_overrides(config)
        # 3. Substitute ${VAR} references inside values
        config = cls._substitute_env_vars(config)
        return config

    @classmethod
    def _load_from_file(cls, config_path: str) -> Optional[Dict[str, Any]]:
        """
        Load configuration from a YAML file.

        Falls back to the simple line-based parser when PyYAML is missing.

        Args:
            config_path: Path to configuration file

        Returns:
            Configuration dictionary or None if error
        """
        try:
            # Try to import yaml
            import yaml

            with open(config_path) as f:
                return yaml.safe_load(f)
        except ImportError:
            # YAML not available, try simple parsing
            sys.stderr.write(
                "Warning: PyYAML not installed, using simple config parser. "
                "Install PyYAML for full configuration support.\n"
            )
            return cls._load_simple_config(config_path)
        except Exception as e:
            sys.stderr.write(f"Error loading config file {config_path}: {e}\n")
            return None

    @classmethod
    def _load_simple_config(cls, config_path: str) -> Optional[Dict[str, Any]]:
        """
        Load configuration using a simple line-based parser.

        Fallback for when PyYAML is not available.  Accepts both
        ``key=value`` lines and the ``key: value`` form used in the
        documented YAML layout.  Section headers such as ``logging:``
        (no value) are skipped, since all keys are stored under the
        "logging" section anyway, and trailing ``# comments`` are dropped.

        Args:
            config_path: Path to configuration file

        Returns:
            Configuration dictionary or None if error
        """
        try:
            config = {"logging": {}}
            with open(config_path) as f:
                for line in f:
                    line = line.strip()
                    # Skip comments and empty lines
                    if not line or line.startswith("#"):
                        continue
                    # Parse "key=value" or "key: value"
                    if "=" in line:
                        key, value = line.split("=", 1)
                    elif ":" in line:
                        key, value = line.split(":", 1)
                    else:
                        continue
                    key = key.strip()
                    # Drop inline comments, then surrounding quotes
                    value = value.split("#", 1)[0].strip().strip('"').strip("'")
                    if not value:
                        # Section header (e.g. "logging:") or empty assignment
                        continue
                    # Convert known numeric values
                    if key in ["max_bytes", "backup_count"]:
                        try:
                            value = int(value)
                        except ValueError:
                            pass
                    config["logging"][key] = value
            return config
        except Exception as e:
            sys.stderr.write(f"Error parsing config file {config_path}: {e}\n")
            return None

    @classmethod
    def _apply_env_overrides(cls, config: Dict[str, Any]) -> Dict[str, Any]:
        """
        Apply environment variable overrides.

        Environment variables:
            TRCLI_LOG_LEVEL: Log level (DEBUG, INFO, WARNING, ERROR, CRITICAL)
            TRCLI_LOG_FORMAT: Output format (json, text)
            TRCLI_LOG_OUTPUT: Output destination (stderr, stdout, file)
            TRCLI_LOG_FILE: Log file path
            TRCLI_LOG_MAX_BYTES: Max log file size before rotation
            TRCLI_LOG_BACKUP_COUNT: Number of backup files to keep

        Args:
            config: Configuration dictionary

        Returns:
            Updated configuration dictionary
        """
        # Simple string overrides
        env_mappings = {
            "TRCLI_LOG_LEVEL": "level",
            "TRCLI_LOG_FORMAT": "format",
            "TRCLI_LOG_OUTPUT": "output",
            "TRCLI_LOG_FILE": "file_path",
        }
        for env_var, config_key in env_mappings.items():
            if env_var in os.environ:
                config[config_key] = os.environ[env_var]
        # Numeric overrides (silently ignored when not an integer)
        if "TRCLI_LOG_MAX_BYTES" in os.environ:
            try:
                config["max_bytes"] = int(os.environ["TRCLI_LOG_MAX_BYTES"])
            except ValueError:
                pass
        if "TRCLI_LOG_BACKUP_COUNT" in os.environ:
            try:
                config["backup_count"] = int(os.environ["TRCLI_LOG_BACKUP_COUNT"])
            except ValueError:
                pass
        return config

    @classmethod
    def _substitute_env_vars(cls, config: Any) -> Any:
        """
        Recursively substitute environment variables in configuration.

        Supports ${VAR_NAME} syntax; unknown variables are left as-is.

        Example:
            file_path: /var/log/${ENVIRONMENT}/trcli.log
            With ENVIRONMENT=production, becomes:
            file_path: /var/log/production/trcli.log

        Args:
            config: Configuration value (string, dict, list, etc.)

        Returns:
            Configuration with substituted values
        """
        if isinstance(config, str):

            def replace_env(match):
                var_name = match.group(1)
                return os.environ.get(var_name, match.group(0))

            return re.sub(r"\$\{([^}]+)\}", replace_env, config)
        elif isinstance(config, dict):
            return {k: cls._substitute_env_vars(v) for k, v in config.items()}
        elif isinstance(config, list):
            return [cls._substitute_env_vars(item) for item in config]
        else:
            return config

    @classmethod
    def setup_logging(cls, config_path: Optional[str] = None, **overrides):
        """
        Setup logging based on configuration.

        Args:
            config_path: Path to configuration file
            **overrides: Configuration overrides (e.g. level="DEBUG")

        Example:
            LoggingConfig.setup_logging(
                config_path="trcli_config.yml",
                level="DEBUG",
                format="text"
            )
        """
        from trcli.logging.structured_logger import LoggerFactory
        from trcli.logging.file_handler import RotatingFileHandler

        # Load configuration; CLI overrides win over every other source
        config = cls.load(config_path)
        config.update(overrides)
        # Determine output stream
        output_type = config.get("output", "stderr")
        if output_type == "stdout":
            stream = sys.stdout
        elif output_type == "stderr":
            stream = sys.stderr
        elif output_type == "file":
            file_path = config.get("file_path")
            if not file_path:
                sys.stderr.write("Warning: file output selected but no file_path specified, using stderr\n")
                stream = sys.stderr
            else:
                stream = RotatingFileHandler(
                    file_path, max_bytes=config.get("max_bytes", 10485760), backup_count=config.get("backup_count", 5)
                )
        else:
            stream = sys.stderr
        # Configure logger factory
        LoggerFactory.configure(
            level=config.get("level", "INFO"), format_style=config.get("format", "json"), stream=stream
        )

    @classmethod
    def validate(cls, config: Dict[str, Any]) -> tuple:
        """
        Validate configuration.

        Args:
            config: Configuration dictionary

        Returns:
            Tuple of (is_valid, error_message)

        Example:
            is_valid, error = LoggingConfig.validate(config)
            if not is_valid:
                print(f"Invalid configuration: {error}")
        """
        # Validate log level (str() so a non-string value fails validation
        # with a message instead of raising AttributeError)
        valid_levels = ["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"]
        level = str(config.get("level", "INFO")).upper()
        if level not in valid_levels:
            return False, f"Invalid log level '{level}'. Must be one of: {', '.join(valid_levels)}"
        # Validate format
        valid_formats = ["json", "text"]
        format_style = config.get("format", "json")
        if format_style not in valid_formats:
            return False, f"Invalid format '{format_style}'. Must be one of: {', '.join(valid_formats)}"
        # Validate output
        valid_outputs = ["stderr", "stdout", "file"]
        output = config.get("output", "stderr")
        if output not in valid_outputs:
            return False, f"Invalid output '{output}'. Must be one of: {', '.join(valid_outputs)}"
        # Validate file output config
        if output == "file" and not config.get("file_path"):
            return False, "file_path required when output is 'file'"
        return True, ""
"""
File Handler - Zero-dependency rotating file handler for TRCLI
Provides file output with automatic rotation based on file size,
without requiring any external dependencies.
Features:
- Automatic log rotation when file reaches max size
- Configurable number of backup files
- Thread-safe write operations
- Automatic directory creation
- Zero external dependencies (Python stdlib only)
Usage:
from trcli.logging.file_handler import RotatingFileHandler
handler = RotatingFileHandler(
filepath="/var/log/trcli/app.log",
max_bytes=10485760, # 10MB
backup_count=5
)
handler.write("Log message\n")
handler.close()
"""
import os
from pathlib import Path
from threading import Lock
from typing import Optional
class RotatingFileHandler:
    """
    Size-based rotating log file writer with no external dependencies.

    Once the live file reaches ``max_bytes`` it is renamed to ``<name>.1``
    (shifting older backups up by one, keeping at most ``backup_count``),
    and writing continues into a fresh file.  All operations are guarded
    by a lock, so a single handler may be shared between threads.

    Example:
        handler = RotatingFileHandler("/var/log/trcli/app.log", max_bytes=10485760)
        handler.write('{"timestamp": "2024-01-20", "message": "Test"}\n')
        handler.close()
    """

    def __init__(
        self, filepath: str, max_bytes: int = 10485760, backup_count: int = 5, encoding: str = "utf-8"  # 10MB
    ):
        """
        Initialize rotating file handler.

        Args:
            filepath: Path to log file
            max_bytes: Maximum file size before rotation (default: 10MB)
            backup_count: Number of backup files to keep (default: 5)
            encoding: File encoding (default: utf-8)
        """
        self.filepath = Path(filepath)
        self.max_bytes = max_bytes
        self.backup_count = backup_count
        self.encoding = encoding
        self._file = None
        self._lock = Lock()
        self._ensure_directory()

    def _ensure_directory(self):
        """Create the log file's parent directory if it does not exist."""
        self.filepath.parent.mkdir(parents=True, exist_ok=True)

    def write(self, content: str):
        """
        Append *content* to the log file, rotating first if it is full.

        Args:
            content: Text to write (include a trailing newline if wanted)

        Example:
            handler.write("Log entry\n")
        """
        with self._lock:
            if self._should_rotate():
                self._rotate()
            # Lazily (re)open the file; rotation closes the old handle
            if self._file is None or self._file.closed:
                self._file = open(self.filepath, "a", encoding=self.encoding)
            self._file.write(content)
            self._file.flush()
            # fsync so the on-disk size check (and crash recovery) see the data
            os.fsync(self._file.fileno())

    def _should_rotate(self) -> bool:
        """
        Return True when the on-disk file has reached max_bytes.

        A missing file, or one whose size cannot be read, never triggers
        rotation.
        """
        try:
            return self.filepath.exists() and self.filepath.stat().st_size >= self.max_bytes
        except OSError:
            # Can't stat the file -> play it safe and keep writing
            return False

    def _rotate(self):
        """
        Shift backup files up by one and move the live file to ``.1``.

        Rotation pattern:
            app.log   -> app.log.1
            app.log.1 -> app.log.2
            ...
            app.log.N -> deleted (if N >= backup_count)

        Individual rename/delete failures are swallowed so logging never
        crashes the application.
        """
        if self._file and not self._file.closed:
            self._file.close()
        self._file = None
        # Remove the backup that would fall off the end
        oldest = Path(f"{self.filepath}.{self.backup_count}")
        if oldest.exists():
            try:
                oldest.unlink()
            except OSError:
                pass
        # Walk backups from highest-numbered to lowest, shifting each one up
        for index in reversed(range(1, self.backup_count)):
            older = Path(f"{self.filepath}.{index}")
            if older.exists():
                try:
                    older.replace(Path(f"{self.filepath}.{index + 1}"))
                except OSError:
                    pass
        # Finally move the live file into the .1 slot
        if self.filepath.exists():
            try:
                self.filepath.replace(Path(f"{self.filepath}.1"))
            except OSError:
                pass

    def flush(self):
        """Push buffered data to disk without closing the file."""
        with self._lock:
            if self._file and not self._file.closed:
                self._file.flush()
                os.fsync(self._file.fileno())

    def close(self):
        """Flush and close the file handle; safe to call repeatedly."""
        with self._lock:
            if self._file and not self._file.closed:
                self._file.flush()
                self._file.close()
            self._file = None

    def __enter__(self):
        """Context manager entry"""
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Context manager exit"""
        self.close()

    def __del__(self):
        """Best-effort close on garbage collection."""
        try:
            self.close()
        except Exception:
            pass  # Ignore errors in destructor
class MultiFileHandler:
    """
    Fan-out handler that mirrors every write to several file handlers.

    Useful for writing to multiple locations (e.g. local file + shared
    storage).  A failure in one destination never prevents the others
    from receiving the data.

    Example:
        handler = MultiFileHandler([
            RotatingFileHandler("/var/log/trcli/app.log"),
            RotatingFileHandler("/mnt/shared/logs/app.log")
        ])
        handler.write("Log entry\n")
    """

    def __init__(self, handlers: list):
        """
        Initialize multi-file handler.

        Args:
            handlers: File-handler objects exposing write/flush/close
        """
        self.handlers = handlers

    def _broadcast(self, method_name: str, *args):
        """Invoke *method_name* on every handler, swallowing per-handler errors."""
        for target in self.handlers:
            try:
                getattr(target, method_name)(*args)
            except Exception:
                # Best effort: one failing destination must not block the rest
                pass

    def write(self, content: str):
        """
        Write content to all handlers.

        Args:
            content: Content to write
        """
        self._broadcast("write", content)

    def flush(self):
        """Flush all handlers"""
        self._broadcast("flush")

    def close(self):
        """Close all handlers"""
        self._broadcast("close")

    def __enter__(self):
        """Context manager entry"""
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Context manager exit"""
        self.close()
"""
Structured Logger - Zero-dependency structured logging for TRCLI
Provides structured logging with NDJSON (Newline-Delimited JSON) output format,
compatible with all major log aggregation platforms (ELK, Splunk, CloudWatch, etc.)
Features:
- Standard log levels (DEBUG, INFO, WARNING, ERROR, CRITICAL)
- Structured fields (queryable, filterable, aggregatable)
- Correlation ID support for request tracing
- Context propagation (automatic field inheritance)
- Human-readable and JSON output formats
- Zero external dependencies (Python stdlib only)
Usage:
from trcli.logging.structured_logger import LoggerFactory
logger = LoggerFactory.get_logger("trcli.api")
# Simple logging
logger.info("Operation completed", duration=1.5, count=100)
# With correlation context
ctx_logger = logger.with_context(correlation_id="abc-123")
ctx_logger.info("Processing started")
ctx_logger.info("Processing finished")
"""
import json
import sys
import traceback
from datetime import datetime, timezone
from typing import Dict, Any, Optional, TextIO
from enum import IntEnum
class LogLevel(IntEnum):
    """Standard log levels compatible with Python logging and syslog"""
    DEBUG = 10  # detailed diagnostic output
    INFO = 20  # routine operational events
    WARNING = 30  # unexpected but recoverable conditions
    ERROR = 40  # an operation failed
    CRITICAL = 50  # severe failure; application may not continue
class StructuredLogger:
    """
    Zero-dependency structured logger built on the Python stdlib only.

    Emits NDJSON (one JSON object per line) or a human-readable text
    format, ingestible by the major observability platforms.

    Example:
        logger = StructuredLogger("trcli.api", level=LogLevel.INFO)
        logger.info("Request completed", status_code=200, duration_ms=150)
        # Output (json): {"timestamp": "...", "level": "INFO", "logger": "trcli.api",
        #                 "message": "Request completed", "status_code": 200, "duration_ms": 150}
    """

    def __init__(
        self, name: str, level: LogLevel = LogLevel.INFO, output_stream: TextIO = None, format_style: str = "json"
    ):
        """
        Initialize structured logger.

        Args:
            name: Logger name (usually module path)
            level: Minimum log level to output
            output_stream: Output stream (default: sys.stderr)
            format_style: Output format - "json" or "text"
        """
        self.name = name
        self.level = level
        self.output_stream = output_stream or sys.stderr
        self.format_style = format_style
        self._context: Dict[str, Any] = {}
        # Any field whose key contains one of these substrings gets masked.
        self._sensitive_keys = {
            "password",
            "passwd",
            "pwd",
            "secret",
            "token",
            "api_key",
            "apikey",
            "authorization",
            "auth",
            "credential",
            "key",
        }

    def _should_log(self, level: LogLevel) -> bool:
        """Return True when *level* meets this logger's threshold."""
        return self.level.value <= level.value

    def _sanitize_value(self, key: str, value: Any) -> Any:
        """
        Mask values stored under credential-like keys.

        Args:
            key: Field name
            value: Field value

        Returns:
            The original value, or a masked stand-in for sensitive keys
        """
        lowered = str(key).lower()
        if not any(marker in lowered for marker in self._sensitive_keys):
            return value
        if isinstance(value, str):
            if len(value) <= 4:
                return "***"
            # Keep a 2-char hint at each end, never the middle
            return f"{value[:2]}***{value[-2:]}"
        return "***REDACTED***"

    def _format_log(self, level: LogLevel, message: str, extra: Optional[Dict[str, Any]] = None) -> str:
        """
        Render one log entry in the configured style.

        Args:
            level: Log level
            message: Log message
            extra: Additional structured fields

        Returns:
            Formatted log string
        """
        entry = {
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "level": level.name,
            "logger": self.name,
            "message": message,
        }
        # Context first, then per-call extras; both pass through sanitization
        for source in (self._context, extra):
            if source:
                for key, value in source.items():
                    entry[key] = self._sanitize_value(key, value)
        if self.format_style == "json":
            # NDJSON: one JSON object per line
            return json.dumps(entry, default=str)
        # Text mode: fixed-width header columns, then "key=value" extras
        level_str = f"[{entry['level']}]".ljust(10)
        logger_str = self.name.ljust(20)
        details = [
            f"{key}={value}"
            for key, value in entry.items()
            if key not in ("timestamp", "level", "logger", "message")
        ]
        suffix = " | " + " ".join(details) if details else ""
        return f"{entry['timestamp']} {level_str} {logger_str} | {entry['message']}{suffix}"

    def _write(self, level: LogLevel, message: str, extra: Optional[Dict[str, Any]] = None, exc_info: bool = False):
        """
        Format and emit a log entry, honoring the level threshold.

        Args:
            level: Log level
            message: Log message
            extra: Additional structured fields
            exc_info: Attach the active exception (type/message/traceback)
        """
        if not self._should_log(level):
            return
        if exc_info:
            exc_type, exc_value, exc_tb = sys.exc_info()
            if exc_type is not None:
                extra = dict(extra) if extra else {}
                extra["exception"] = {
                    "type": exc_type.__name__,
                    "message": str(exc_value),
                    "traceback": "".join(traceback.format_tb(exc_tb)),
                }
        line = self._format_log(level, message, extra)
        try:
            self.output_stream.write(line + "\n")
            self.output_stream.flush()
        except Exception:
            # Logging must never crash the app: fall back to stderr
            if self.output_stream != sys.stderr:
                sys.stderr.write("Logging error: failed to write to output stream\n")
            sys.stderr.write(line + "\n")

    def debug(self, message: str, **extra):
        """
        Log debug message.

        Example:
            logger.debug("Processing item", item_id=123, status="pending")
        """
        self._write(LogLevel.DEBUG, message, extra)

    def info(self, message: str, **extra):
        """
        Log info message.

        Example:
            logger.info("Operation completed", duration=1.5, items=100)
        """
        self._write(LogLevel.INFO, message, extra)

    def warning(self, message: str, **extra):
        """
        Log warning message.

        Example:
            logger.warning("Slow operation detected", duration=30.5, threshold=10.0)
        """
        self._write(LogLevel.WARNING, message, extra)

    def error(self, message: str, exc_info: bool = False, **extra):
        """
        Log error message; set exc_info=True to attach the active exception.

        Example:
            logger.error("Upload failed", exc_info=True, run_id=12345)
        """
        self._write(LogLevel.ERROR, message, extra, exc_info=exc_info)

    def critical(self, message: str, exc_info: bool = False, **extra):
        """
        Log critical message; set exc_info=True to attach the active exception.

        Example:
            logger.critical("System failure", exc_info=True, component="api")
        """
        self._write(LogLevel.CRITICAL, message, extra, exc_info=exc_info)

    def with_context(self, **context) -> "StructuredLogger":
        """
        Return a child logger whose entries always carry *context*.

        Args:
            **context: Fields to attach to every entry (e.g. correlation_id)

        Returns:
            New logger instance sharing this logger's configuration

        Example:
            ctx_logger = logger.with_context(correlation_id="abc-123", user="admin")
            ctx_logger.info("Request started")
        """
        child = StructuredLogger(self.name, self.level, self.output_stream, self.format_style)
        child._context = {**self._context, **context}
        child._sensitive_keys = self._sensitive_keys
        return child

    def set_context(self, **context):
        """
        Merge *context* fields into every subsequent entry from this logger.

        Example:
            logger.set_context(request_id="req-123")
        """
        self._context.update(context)

    def clear_context(self):
        """Drop all context fields previously set on this logger."""
        self._context = {}

    def set_level(self, level: LogLevel):
        """
        Change the minimum level this logger will emit.

        Example:
            logger.set_level(LogLevel.DEBUG)
        """
        self.level = level
class LoggerFactory:
    """
    Factory providing named loggers with shared, centrally-managed defaults.

    Loggers are cached by name; calling configure() also updates every
    logger handed out earlier.
    """

    _default_level = LogLevel.INFO
    _default_format = "json"
    _default_stream = sys.stderr
    _loggers: Dict[str, StructuredLogger] = {}

    @classmethod
    def configure(cls, level: str = "INFO", format_style: str = "json", stream: TextIO = None):
        """
        Set the defaults used for new loggers and push them to existing ones.

        Args:
            level: Log level name (DEBUG, INFO, WARNING, ERROR, CRITICAL)
            format_style: Output format - "json" or "text"
            stream: Output stream (default: sys.stderr)

        Raises:
            ValueError: On an unknown level name or format style.

        Example:
            LoggerFactory.configure(level="DEBUG", format_style="text")
        """
        normalized = level.upper()
        if normalized not in LogLevel.__members__:
            raise ValueError(f"Invalid log level: {level}. Must be one of: DEBUG, INFO, WARNING, ERROR, CRITICAL")
        cls._default_level = LogLevel[normalized]
        if format_style not in ("json", "text"):
            raise ValueError(f"Invalid format style: {format_style}. Must be 'json' or 'text'")
        cls._default_format = format_style
        if stream:
            cls._default_stream = stream
        # Re-point loggers handed out before this call at the new defaults
        for existing in cls._loggers.values():
            existing.level = cls._default_level
            existing.format_style = cls._default_format
            existing.output_stream = cls._default_stream

    @classmethod
    def get_logger(cls, name: str) -> StructuredLogger:
        """
        Return the cached logger for *name*, creating it on first request.

        Args:
            name: Logger name (usually module path)

        Returns:
            StructuredLogger instance

        Example:
            logger = LoggerFactory.get_logger("trcli.api")
        """
        cached = cls._loggers.get(name)
        if cached is None:
            cached = StructuredLogger(name, cls._default_level, cls._default_stream, cls._default_format)
            cls._loggers[name] = cached
        return cached

    @classmethod
    def reset(cls):
        """Restore factory defaults and drop all cached loggers (testing aid)."""
        cls._default_level = LogLevel.INFO
        cls._default_format = "json"
        cls._default_stream = sys.stderr
        cls._loggers = {}
import json
from pathlib import Path
from beartype.typing import List, Dict, Any, Optional, Tuple
from trcli.cli import Environment
from trcli.data_classes.data_parsers import MatchersParser, TestRailCaseFieldsOptimizer
from trcli.data_classes.dataclass_testrail import (
TestRailCase,
TestRailSuite,
TestRailSection,
TestRailResult,
TestRailSeparatedStep,
)
from trcli.readers.file_parser import FileParser
class CucumberParser(FileParser):
"""Parser for Cucumber JSON results format"""
    def __init__(self, environment: Environment):
        """Initialize the parser from the CLI environment.

        Args:
            environment: CLI environment; passed to the FileParser base
                class and read here for the case-matcher strategy.
        """
        super().__init__(environment)
        # Strategy for resolving TestRail case IDs (name suffix vs. tag property)
        self.case_matcher = environment.case_matcher
        self._bdd_case_cache = None  # Cache for BDD cases (populated on first use)
        self._api_handler = None  # Will be set when BDD matching mode is needed
def parse_file(
self,
bdd_matching_mode: bool = False,
project_id: Optional[int] = None,
suite_id: Optional[int] = None,
auto_create: bool = False,
) -> List[TestRailSuite]:
"""Parse Cucumber JSON results file and convert to TestRailSuite structure
Args:
bdd_matching_mode: If True, use BDD matching mode (group scenarios under existing BDD cases)
project_id: TestRail project ID (required for BDD matching mode)
suite_id: TestRail suite ID (required for BDD matching mode)
auto_create: If True, mark features for auto-creation when not found (BDD matching mode only)
Returns:
List of TestRailSuite objects with test cases and results
"""
self.env.log(f"Parsing Cucumber JSON file: {self.filename}")
if bdd_matching_mode:
self.env.log("Using BDD matching mode (matching against existing BDD test cases)")
if not project_id or not suite_id:
raise ValueError("project_id and suite_id are required for BDD matching mode")
# Read and parse the JSON file
with open(self.filepath, "r", encoding="utf-8") as f:
cucumber_data = json.load(f)
# Cucumber JSON is typically an array of features
if not isinstance(cucumber_data, list):
raise ValueError("Cucumber JSON must be an array of features")
# Parse features into TestRail structure
sections = []
for feature in cucumber_data:
feature_sections = self._parse_feature(feature, bdd_matching_mode, project_id, suite_id, auto_create)
sections.extend(feature_sections)
# Generate appropriate message based on mode
if bdd_matching_mode:
# In BDD matching mode: count scenarios from original data
scenario_count = sum(
sum(
1
for element in feature.get("elements", [])
if element.get("type", "") in ("scenario", "scenario_outline")
)
for feature in cucumber_data
)
feature_word = "feature file" if len(cucumber_data) == 1 else "feature files"
self.env.log(f"Processed {scenario_count} scenarios in {len(cucumber_data)} {feature_word}.")
else:
# Standard mode: count test cases and sections
cases_count = sum(len(section.testcases) for section in sections)
self.env.log(f"Processed {cases_count} test cases in {len(sections)} sections.")
# Create suite
suite_name = self.env.suite_name if self.env.suite_name else "Cucumber Test Results"
testrail_suite = TestRailSuite(
name=suite_name,
testsections=sections,
source=self.filename,
)
return [testrail_suite]
def _parse_feature(
self,
feature: Dict[str, Any],
bdd_matching_mode: bool = False,
project_id: Optional[int] = None,
suite_id: Optional[int] = None,
auto_create: bool = False,
) -> List[TestRailSection]:
"""Parse a single Cucumber feature into TestRail sections
Args:
feature: Feature object from Cucumber JSON
bdd_matching_mode: If True, parse as single BDD case (group scenarios)
project_id: TestRail project ID (required for BDD matching mode)
suite_id: TestRail suite ID (required for BDD matching mode)
auto_create: If True, mark cases for auto-creation when not found
Returns:
List of TestRailSection objects
"""
feature_name = feature.get("name", "Untitled Feature")
feature_tags = self._extract_tags(feature.get("tags", []))
# Create a section for this feature
section = TestRailSection(name=feature_name, testcases=[])
# Branch: BDD matching mode vs. standard mode
if bdd_matching_mode:
# BDD Matching Mode: Parse feature as single BDD case with grouped scenarios
test_case = self._parse_feature_as_bdd_case(feature, project_id, suite_id, auto_create)
if test_case:
section.testcases.append(test_case)
else:
# Standard Mode: Parse each scenario as separate test case
for element in feature.get("elements", []):
element_type = element.get("type", "")
if element_type in ("scenario", "scenario_outline"):
test_case = self._parse_scenario(element, feature_name, feature_tags)
if test_case:
section.testcases.append(test_case)
return [section] if section.testcases else []
def _parse_scenario(
self, scenario: Dict[str, Any], feature_name: str, feature_tags: List[str]
) -> Optional[TestRailCase]:
"""Parse a Cucumber scenario into TestRailCase
Args:
scenario: Scenario object from Cucumber JSON
feature_name: Name of the parent feature
feature_tags: Tags from the parent feature
Returns:
TestRailCase object or None
"""
scenario_name = scenario.get("name", "Untitled Scenario")
scenario_tags = self._extract_tags(scenario.get("tags", []))
all_tags = feature_tags + scenario_tags
# Build automation ID
automation_id = self._build_automation_id(feature_name, all_tags, scenario_name)
# Extract case ID if using matcher
case_id = None
if self.case_matcher == MatchersParser.NAME:
case_id, scenario_name = MatchersParser.parse_name_with_id(scenario_name)
elif self.case_matcher == MatchersParser.PROPERTY:
# Look for @C<id> tag pattern
for tag in all_tags:
if tag.startswith("@C") or tag.startswith("@c"):
try:
case_id = int(tag[2:])
break
except ValueError:
pass
# Parse steps and determine overall status
steps = scenario.get("steps", [])
step_results, overall_status = self._parse_steps(steps)
# Calculate elapsed time
elapsed_time = self._calculate_elapsed_time(steps)
# Build comment from failures
comment = self._build_comment_from_failures(steps)
# Create result object
result = TestRailResult(
case_id=case_id,
status_id=overall_status,
comment=comment,
elapsed=elapsed_time,
custom_step_results=step_results,
)
# Create test case
test_case = TestRailCase(
title=TestRailCaseFieldsOptimizer.extract_last_words(
scenario_name, TestRailCaseFieldsOptimizer.MAX_TESTCASE_TITLE_LENGTH
),
case_id=case_id,
result=result,
custom_automation_id=automation_id,
case_fields={"tags": ", ".join(all_tags)} if all_tags else {},
)
return test_case
def _parse_steps(self, steps: List[Dict[str, Any]]) -> tuple:
"""Parse Cucumber steps into TestRail step results
Args:
steps: List of step objects from Cucumber JSON
Returns:
Tuple of (list of TestRailSeparatedStep, overall_status_id)
"""
step_results = []
overall_status = 1 # Passed by default
for step in steps:
keyword = step.get("keyword", "").strip()
step_name = step.get("name", "")
step_content = f"{keyword} {step_name}".strip()
# Determine step status
result = step.get("result", {})
result_status = result.get("status", "").lower()
# Map Cucumber status to TestRail status ID
# 1=Passed, 3=Untested, 4=Skipped, 5=Failed
if result_status == "passed":
step_status_id = 1
elif result_status == "failed":
step_status_id = 5
overall_status = 5 # Test failed
elif result_status == "skipped":
step_status_id = 4
if overall_status == 1: # Only update if not already failed
overall_status = 4
elif result_status == "pending":
step_status_id = 3
if overall_status == 1:
overall_status = 3
elif result_status == "undefined":
step_status_id = 3
if overall_status == 1:
overall_status = 3
else:
step_status_id = 3
# Create step result
tr_step = TestRailSeparatedStep(content=step_content)
tr_step.status_id = step_status_id
step_results.append(tr_step)
return step_results, overall_status
def _calculate_elapsed_time(self, steps: List[Dict[str, Any]]) -> Optional[str]:
"""Calculate total elapsed time from steps
Args:
steps: List of step objects
Returns:
Elapsed time string or None
"""
total_duration = 0
for step in steps:
result = step.get("result", {})
duration = result.get("duration", 0)
if duration:
total_duration += duration
if total_duration > 0:
# Convert nanoseconds to seconds
total_seconds = total_duration / 1_000_000_000
# Always return at least 1s if there was any duration
if total_seconds >= 1:
return f"{round(total_seconds)}s"
else:
return "1s"
return None
def _build_comment_from_failures(self, steps: List[Dict[str, Any]]) -> str:
"""Build comment string from failed steps
Args:
steps: List of step objects
Returns:
Comment string describing failures
"""
failures = []
for step in steps:
result = step.get("result", {})
if result.get("status", "").lower() == "failed":
keyword = step.get("keyword", "").strip()
step_name = step.get("name", "")
error_message = result.get("error_message", "")
failure_text = f"Failed: {keyword} {step_name}"
if error_message:
failure_text += f"\n Error: {error_message}"
failures.append(failure_text)
return "\n\n".join(failures) if failures else ""
def _extract_tags(self, tags: List[Dict[str, str]]) -> List[str]:
"""Extract tag names from Cucumber tag objects
Args:
tags: List of tag objects with 'name' field
Returns:
List of tag name strings
"""
return [tag.get("name", "") for tag in tags if tag.get("name")]
def _build_automation_id(self, feature_name: str, tags: List[str], scenario_name: str) -> str:
"""Build automation ID from feature, tags, and scenario name
Args:
feature_name: Feature name
tags: List of tags
scenario_name: Scenario name
Returns:
Automation ID string
"""
parts = [feature_name]
# Add tags if present
if tags:
parts.extend(tags)
# Add scenario name
parts.append(scenario_name)
return ".".join(parts)
def generate_feature_file(self) -> str:
    """Generate .feature file content from the parsed Cucumber JSON.

    Reconstructs Gherkin syntax from the Cucumber JSON results, which is
    useful for creating/updating BDD test cases in TestRail.

    Returns:
        All reconstructed features joined by blank lines, or "" when the
        JSON document is empty or not a list.
    """
    with open(self.filepath, "r", encoding="utf-8") as source:
        report = json.load(source)
    if not isinstance(report, list) or not report:
        return ""
    # One feature file per feature object; empty renderings are dropped
    rendered = [self._generate_feature_content(feature) for feature in report]
    return "\n\n".join(content for content in rendered if content)
def generate_scenario_gherkin(self, feature: Dict[str, Any], scenario: Dict[str, Any]) -> Tuple[str, List[str]]:
    """Generate Gherkin content for a single scenario with feature context.

    Produces a complete .feature document containing just one scenario,
    including the feature tags/header/description and any Background the
    feature declares, so the scenario stays meaningful in isolation.

    Args:
        feature: Feature object from Cucumber JSON
        scenario: Scenario object from Cucumber JSON

    Returns:
        Tuple of (gherkin_content, all_tags)
        - gherkin_content: Complete Gherkin .feature file for the single scenario
        - all_tags: Combined feature-level + scenario-level tag names
    """
    lines = []
    # Collect all tags (feature + scenario) — returned so the caller can
    # reuse them (e.g. for automation IDs) without re-parsing the report
    feature_tags = self._extract_tags(feature.get("tags", []))
    scenario_tags = self._extract_tags(scenario.get("tags", []))
    all_tags = feature_tags + scenario_tags
    # Feature-level tags go on the line above the Feature keyword
    if feature_tags:
        lines.append(" ".join(feature_tags))
    # Feature header (+ indented free-text description; blank lines dropped)
    feature_name = feature.get("name", "Untitled Feature")
    feature_description = feature.get("description", "")
    lines.append(f"Feature: {feature_name}")
    if feature_description:
        for desc_line in feature_description.split("\n"):
            if desc_line.strip():
                lines.append(f" {desc_line.strip()}")
    lines.append("")  # Empty line after feature header
    # Include the feature's Background (first one found) for context
    background = None
    for element in feature.get("elements", []):
        if element.get("type") == "background":
            background = element
            break
    if background:
        background_content = self._generate_background_content(background)
        if background_content:
            lines.append(background_content)
            lines.append("")
    # Scenario-level tags sit directly above the scenario header
    if scenario_tags:
        lines.append(" " + " ".join(scenario_tags))
    # Scenario header — outlines keep their dedicated keyword
    scenario_type = scenario.get("type", "scenario")
    scenario_name = scenario.get("name", "Untitled Scenario")
    if scenario_type == "scenario_outline":
        lines.append(f" Scenario Outline: {scenario_name}")
    else:
        lines.append(f" Scenario: {scenario_name}")
    # Steps (keyword + text, keyword whitespace normalized)
    for step in scenario.get("steps", []):
        keyword = step.get("keyword", "").strip()
        step_name = step.get("name", "")
        lines.append(f" {keyword} {step_name}")
    # Examples table(s) — only meaningful for a Scenario Outline
    if scenario_type == "scenario_outline":
        examples = scenario.get("examples", [])
        if examples:
            for example_group in examples:
                lines.append("")  # Empty line before each Examples block
                # Examples-level tags (if any)
                example_tags = self._extract_tags(example_group.get("tags", []))
                if example_tags:
                    lines.append(" " + " ".join(example_tags))
                # Examples keyword
                lines.append(" Examples:")
                # Examples table, one pipe-delimited line per row
                rows = example_group.get("rows", [])
                if rows:
                    for row in rows:
                        cells = row.get("cells", [])
                        if cells:
                            row_content = " | ".join(cells)
                            lines.append(f" | {row_content} |")
    return "\n".join(lines), all_tags
def _generate_feature_content(self, feature: Dict[str, Any]) -> str:
    """Generate Gherkin feature content from a Cucumber feature object.

    Renders tags, the Feature header/description, then every element
    (Background, Scenario/Scenario Outline, Rule) in report order.

    Args:
        feature: Feature object from Cucumber JSON

    Returns:
        Gherkin feature content as string
    """
    lines = []
    # Feature tags go on the line above the Feature keyword
    feature_tags = self._extract_tags(feature.get("tags", []))
    if feature_tags:
        lines.append(" ".join(feature_tags))
    # Feature header (+ indented description; blank lines dropped)
    feature_name = feature.get("name", "Untitled Feature")
    feature_description = feature.get("description", "")
    lines.append(f"Feature: {feature_name}")
    if feature_description:
        for desc_line in feature_description.split("\n"):
            if desc_line.strip():
                lines.append(f" {desc_line.strip()}")
    lines.append("")  # Empty line after feature header
    # Emit elements in report order; unknown element types are skipped
    for element in feature.get("elements", []):
        element_type = element.get("type", "")
        if element_type == "background":
            background_content = self._generate_background_content(element)
            if background_content:
                lines.append(background_content)
                lines.append("")  # Empty line after background
        elif element_type in ("scenario", "scenario_outline"):
            scenario_content = self._generate_scenario_content(element)
            if scenario_content:
                lines.append(scenario_content)
                lines.append("")  # Empty line between scenarios
        elif element_type == "rule":
            rule_content = self._generate_rule_content(element)
            if rule_content:
                lines.append(rule_content)
                lines.append("")  # Empty line after rule
    return "\n".join(lines)
def _generate_scenario_content(self, scenario: Dict[str, Any]) -> str:
"""Generate Gherkin scenario content
Args:
scenario: Scenario object from Cucumber JSON
Returns:
Gherkin scenario content as string
"""
lines = []
# Scenario tags
scenario_tags = self._extract_tags(scenario.get("tags", []))
if scenario_tags:
lines.append(" " + " ".join(scenario_tags))
# Scenario header
scenario_type = scenario.get("type", "scenario")
scenario_name = scenario.get("name", "Untitled Scenario")
if scenario_type == "scenario_outline":
lines.append(f" Scenario Outline: {scenario_name}")
else:
lines.append(f" Scenario: {scenario_name}")
# Steps
for step in scenario.get("steps", []):
keyword = step.get("keyword", "").strip()
step_name = step.get("name", "")
lines.append(f" {keyword} {step_name}")
# Examples table (for Scenario Outline)
if scenario_type == "scenario_outline":
examples = scenario.get("examples", [])
if examples:
for example_group in examples:
lines.append("") # Empty line before examples
# Examples tags (if any)
example_tags = self._extract_tags(example_group.get("tags", []))
if example_tags:
lines.append(" " + " ".join(example_tags))
# Examples keyword
lines.append(" Examples:")
# Examples table
rows = example_group.get("rows", [])
if rows:
for row in rows:
cells = row.get("cells", [])
if cells:
row_content = " | ".join(cells)
lines.append(f" | {row_content} |")
return "\n".join(lines)
def _generate_background_content(self, background: Dict[str, Any]) -> str:
"""Generate Gherkin background content
Args:
background: Background object from Cucumber JSON
Returns:
Gherkin background content as string
"""
lines = []
# Background header
background_name = background.get("name", "")
if background_name:
lines.append(f" Background: {background_name}")
else:
lines.append(" Background:")
# Steps
for step in background.get("steps", []):
keyword = step.get("keyword", "").strip()
step_name = step.get("name", "")
lines.append(f" {keyword} {step_name}")
return "\n".join(lines)
def _generate_rule_content(self, rule: Dict[str, Any]) -> str:
    """Generate Gherkin rule content from a Cucumber rule object.

    Renders the rule's tags, header, and description, then every child
    (Background, Scenario/Scenario Outline) indented one level deeper.

    Args:
        rule: Rule object from Cucumber JSON

    Returns:
        Gherkin rule content as string
    """
    lines = []
    # Rule tags (if any) sit on the line above the Rule keyword
    rule_tags = self._extract_tags(rule.get("tags", []))
    if rule_tags:
        lines.append(" " + " ".join(rule_tags))
    # Rule header
    rule_name = rule.get("name", "Untitled Rule")
    lines.append(f" Rule: {rule_name}")
    # Rule description (if any); blank lines dropped
    rule_description = rule.get("description", "")
    if rule_description:
        for desc_line in rule_description.split("\n"):
            if desc_line.strip():
                lines.append(f" {desc_line.strip()}")
    # Children in report order, each preceded by a blank line and
    # re-indented one extra level under the rule
    for element in rule.get("children", []):
        element_type = element.get("type", "")
        if element_type == "background":
            lines.append("")
            background_content = self._generate_background_content(element)
            # Indent background under rule; note the ternary groups as
            # (" " + line) if line else "" — blank lines stay blank
            for line in background_content.split("\n"):
                lines.append(" " + line if line else "")
        elif element_type in ("scenario", "scenario_outline"):
            lines.append("")
            scenario_content = self._generate_scenario_content(element)
            # Indent scenario under rule (blank lines stay blank)
            for line in scenario_content.split("\n"):
                lines.append(" " + line if line else "")
    return "\n".join(lines)
def _normalize_title(self, title: str) -> str:
    """Normalize a title for robust matching (delegates to the API handler).

    Delegating to ApiRequestHandler._normalize_feature_name keeps the
    parser-side and API-side matching rules identical: lowercasing,
    whitespace stripping, and special characters (hyphens, underscores)
    converted to spaces for word boundaries.

    Args:
        title: The title to normalize

    Returns:
        Normalized title string
    """
    # Local import — presumably to avoid a circular import at module load
    # time; TODO(review): confirm against the module's import graph
    from trcli.api.api_request_handler import ApiRequestHandler
    return ApiRequestHandler._normalize_feature_name(title)
def set_api_handler(self, api_handler):
    """Attach the API handler used for BDD matching mode.

    Must be called before the lookup helpers that talk to TestRail
    (e.g. _find_case_by_title, _validate_bdd_case_exists) — they fail
    gracefully when no handler is attached.

    Args:
        api_handler: ApiRequestHandler instance for API calls
    """
    self._api_handler = api_handler
def _find_case_by_title(self, feature_name: str, project_id: int, suite_id: int) -> Optional[int]:
"""Find BDD case by feature name using cached index (delegates to API handler)
Args:
feature_name: Feature name from Cucumber JSON
project_id: TestRail project ID
suite_id: TestRail suite ID
Returns:
Case ID if found, None otherwise (also None if error or duplicates)
"""
if self._api_handler is None:
self.env.elog("Error: API handler not set. Cannot find case by title.")
return None
# Use shared API handler method for consistency
case_id, error, duplicates = self._api_handler.find_bdd_case_by_name(
feature_name=feature_name, project_id=project_id, suite_id=suite_id
)
# Handle errors
if error:
self.env.elog(f"Error finding case by title: {error}")
return None
# Handle duplicates
if duplicates:
case_ids_str = ", ".join([f"C{cid}" for cid in duplicates])
self.env.elog(f"Warning: Multiple BDD cases found with title '{feature_name}': {case_ids_str}")
self.env.elog(f" Cannot proceed - please ensure unique feature names in TestRail")
return None
# Handle not found (case_id == -1)
if case_id == -1:
return None
# Success
return case_id
def _extract_case_id_from_tags(self, feature_tags: List[str], scenario_tags: List[str]) -> Optional[int]:
"""Extract case ID from @C<id> tags
Priority: Feature-level tags > Scenario-level tags
This ensures feature-level @C123 tag applies to all scenarios.
Args:
feature_tags: Tags from feature level
scenario_tags: Tags from scenario level
Returns:
Case ID if found, None otherwise
"""
# Priority 1: Feature-level tags (applies to all scenarios)
for tag in feature_tags:
if tag.startswith("@C") or tag.startswith("@c"):
try:
return int(tag[2:])
except ValueError:
pass
# Priority 2: Scenario-level tags (fallback)
for tag in scenario_tags:
if tag.startswith("@C") or tag.startswith("@c"):
try:
return int(tag[2:])
except ValueError:
pass
return None
def _validate_bdd_case_exists(self, case_id: int) -> Tuple[bool, Optional[str]]:
    """Validate that a case exists in TestRail and uses the BDD template.

    Args:
        case_id: TestRail case ID to validate

    Returns:
        Tuple of (is_valid, error_message)
        - is_valid: True if the case exists and is a BDD template case
        - error_message: Error description if validation fails, None otherwise
    """
    if self._api_handler is None:
        return False, "API handler not set"
    try:
        # Fetch case details from TestRail API (use api_handler's client)
        response = self._api_handler.client.send_get(f"get_case/{case_id}")
        # Check if request failed or returned no data
        if response.error_message or not response.response_text:
            error_msg = response.error_message if response.error_message else "Case not found"
            return False, f"Case C{case_id} not found: {error_msg}"
        case_data = response.response_text
        # The BDD scenarios field name is instance-specific, so resolve it
        # dynamically instead of hard-coding a custom field name
        bdd_field_name = self._api_handler.get_bdd_case_field_name()
        # A BDD template case must have the scenarios field populated
        if not case_data.get(bdd_field_name):
            return False, f"Case C{case_id} is not a BDD template case (missing field: {bdd_field_name})"
        return True, None
    except Exception as e:
        # Boundary catch: surface any unexpected API/parsing failure as an
        # error tuple instead of crashing the caller
        return False, f"Error validating case C{case_id}: {str(e)}"
def _parse_feature_as_bdd_case(
    self, feature: Dict[str, Any], project_id: int, suite_id: int, auto_create: bool = False
) -> Optional[TestRailCase]:
    """Parse a Cucumber feature as a single BDD test case with multiple scenario results.

    Used in BDD matching mode (WITHOUT --upload-feature): all scenarios of
    one feature are grouped under one BDD test case.

    Workflow:
    1. Extract case ID from feature-level @C<id> tags
    2. Fall back to feature-name matching via cached lookup
    3. If not found and auto_create=True: mark for auto-creation (case_id=-1)
    4. Validate the case exists and is a BDD template
    5. Parse all scenarios into per-scenario BDD results
    6. Aggregate elapsed time across scenarios
    7. Build a comment from failed scenarios
    8. Create the TestRailResult carrying the BDD scenario results
    9. Create the single TestRailCase

    Status aggregation is fail-fast: any scenario failure fails the feature.

    Args:
        feature: Feature object from Cucumber JSON
        project_id: TestRail project ID
        suite_id: TestRail suite ID
        auto_create: If True, mark for auto-creation when not found

    Returns:
        TestRailCase with BDD scenario results, or None if case not found and auto_create=False
        Returns TestRailCase with case_id=-1 if not found and auto_create=True (marker for creation)
    """
    feature_name = feature.get("name", "Untitled Feature")
    feature_tags = self._extract_tags(feature.get("tags", []))
    # Step 1: Try to extract case ID from feature-level @C<id>/@c<id> tags
    case_id = None
    for tag in feature_tags:
        if tag.startswith("@C") or tag.startswith("@c"):
            try:
                case_id = int(tag[2:])
                self.env.vlog(f"Found case ID from feature tag: C{case_id}")
                break
            except ValueError:
                # Tag merely starts with @c/@C (e.g. @critical) — not an ID
                pass
    # Step 2: Fallback to feature name matching (cached lookup)
    if case_id is None:
        case_id = self._find_case_by_title(feature_name, project_id, suite_id)
        if case_id:
            self.env.vlog(f"Found case ID from feature name '{feature_name}': C{case_id}")
    # Step 3: Handle case not found
    if case_id is None:
        if auto_create:
            self.env.log(f"Feature '{feature_name}' not found in TestRail - will auto-create")
            # Sentinel case_id=-1 tells the caller this case must be created
            case_id = -1  # Marker for auto-creation
        else:
            self.env.elog(f"Error: No BDD case found for feature '{feature_name}'")
            self.env.elog(f" Add @C<id> tag to feature or ensure case exists with title '{feature_name}'")
            return None
    # Step 4: Validate the case exists and is a BDD template (skipped for
    # the auto-creation sentinel, which has no case to validate yet)
    if case_id != -1:
        is_valid, error_message = self._validate_bdd_case_exists(case_id)
        if not is_valid:
            self.env.elog(f"Error validating case for feature '{feature_name}': {error_message}")
            return None
    # Step 5: Parse all scenarios as BDD scenario results
    bdd_scenario_results = []
    overall_status = 1  # Passed by default (fail-fast logic applied below)
    total_elapsed = 0  # Nanoseconds, summed across all scenario steps
    for element in feature.get("elements", []):
        element_type = element.get("type", "")
        if element_type in ("scenario", "scenario_outline"):
            scenario_name = element.get("name", "Untitled Scenario")
            # NOTE(review): scenario_tags is computed but not used below
            scenario_tags = self._extract_tags(element.get("tags", []))
            # Parse steps only to derive the scenario's aggregated status;
            # the per-step results themselves are discarded here
            steps = element.get("steps", [])
            _, scenario_status = self._parse_steps(steps)
            # Accumulate this scenario's step durations
            scenario_elapsed = 0
            for step in steps:
                result = step.get("result", {})
                duration = result.get("duration", 0)
                if duration:
                    scenario_elapsed += duration
            total_elapsed += scenario_elapsed
            # One "scenario result" per scenario, reusing the separated-step
            # structure (content = scenario name, status = aggregated status)
            bdd_scenario = TestRailSeparatedStep(content=scenario_name)
            bdd_scenario.status_id = scenario_status
            bdd_scenario_results.append(bdd_scenario)
            # Fail-fast: failed (5) always wins; skipped (4) beats
            # untested/pending (3); 3 only demotes a still-passing feature
            if scenario_status == 5:  # Failed
                overall_status = 5
            elif scenario_status == 4 and overall_status != 5:  # Skipped
                overall_status = 4
            elif scenario_status == 3 and overall_status == 1:  # Untested/Pending
                overall_status = 3
    # Step 6: Calculate elapsed time (passed as numeric seconds in a string;
    # TestRailResult.__post_init__ formats it)
    elapsed_time = None
    if total_elapsed > 0:
        total_seconds = total_elapsed / 1_000_000_000
        elapsed_time = str(total_seconds)  # Stringified number, formatted downstream
    # Step 7: Build comment from failures (aggregate all scenario failures)
    comment_parts = []
    for element in feature.get("elements", []):
        if element.get("type", "") in ("scenario", "scenario_outline"):
            scenario_name = element.get("name", "Untitled Scenario")
            steps = element.get("steps", [])
            # Only scenarios containing at least one failed step contribute
            scenario_failed = False
            for step in steps:
                result = step.get("result", {})
                if result.get("status", "").lower() == "failed":
                    scenario_failed = True
                    break
            if scenario_failed:
                failure_comment = self._build_comment_from_failures(steps)
                if failure_comment:
                    comment_parts.append(f"Scenario: {scenario_name}\n{failure_comment}")
    comment = "\n\n".join(comment_parts) if comment_parts else ""
    # Step 8: Create the result carrying the per-scenario BDD results.
    # The BDD results field name is instance-specific, so resolve it
    # dynamically via the API handler.
    # NOTE(review): unlike _find_case_by_title, this line does not guard
    # against self._api_handler being None — confirm callers always invoke
    # set_api_handler() before this method.
    bdd_result_field_name = self._api_handler.get_bdd_result_field_name()
    result = TestRailResult(
        case_id=case_id,
        status_id=overall_status,
        comment=comment,
        elapsed=elapsed_time,
    )
    # Serialize scenario results as plain dicts for the API payload
    result.result_fields[bdd_result_field_name] = [
        {"content": step.content, "status_id": step.status_id} for step in bdd_scenario_results
    ]
    # Step 9: Create the test case (title trimmed to TestRail's max length)
    test_case = TestRailCase(
        title=TestRailCaseFieldsOptimizer.extract_last_words(
            feature_name, TestRailCaseFieldsOptimizer.MAX_TESTCASE_TITLE_LENGTH
        ),
        case_id=case_id,
        result=result,
    )
    self.env.vlog(
        f"Parsed feature '{feature_name}' as BDD case C{case_id} "
        f"with {len(bdd_scenario_results)} scenarios (status: {overall_status})"
    )
    return test_case
+1
-1
Metadata-Version: 2.4
Name: trcli
Version: 1.12.6
Version: 1.13.0
License-File: LICENSE.md

@@ -5,0 +5,0 @@ Requires-Dist: click<8.2.2,>=8.1.0

@@ -15,6 +15,7 @@ from setuptools import setup

"trcli.api",
"trcli.logging",
],
include_package_data=True,
install_requires=[
"click>=8.1.0,<8.2.2", # Note: click version 8.2.2 is yanked as of Aug 2, 2025!
"click>=8.1.0,<8.2.2", # Note: click version 8.2.2 is yanked as of Aug 2, 2025!
"pyyaml>=6.0.0,<7.0.0",

@@ -28,3 +29,3 @@ "junitparser>=3.1.0,<4.0.0",

"beartype>=0.17.0,<1.0.0",
"prance" # Does not use semantic versioning
"prance", # Does not use semantic versioning
],

@@ -31,0 +32,0 @@ entry_points="""

@@ -289,4 +289,6 @@ import pytest

with patch.object(self.labels_handler, "_ApiRequestHandler__get_all_cases") as mock_get_cases, patch.object(
self.labels_handler, "get_labels"
) as mock_get_labels, patch.object(self.labels_handler, "add_label") as mock_add_label, patch.object(
self.labels_handler.label_manager, "get_labels"
) as mock_get_labels, patch.object(
self.labels_handler.label_manager, "add_label"
) as mock_add_label, patch.object(
self.labels_handler.client, "send_get"

@@ -311,8 +313,12 @@ ) as mock_send_get, patch.object(

mock_send_get.side_effect = [
MagicMock(status_code=200, response_text={"labels": [], "suite_id": 1, "title": "Case 1"}), # Case 1
MagicMock(status_code=200, response_text={"labels": [], "suite_id": 1, "title": "Case 2"}), # Case 2
MagicMock(
status_code=200, response_text={"labels": [], "suite_id": 1, "title": "Case 1"}, error_message=""
), # Case 1
MagicMock(
status_code=200, response_text={"labels": [], "suite_id": 1, "title": "Case 2"}, error_message=""
), # Case 2
]
# Mock update_cases batch response (for multiple cases)
mock_send_post.return_value = MagicMock(status_code=200)
mock_send_post.return_value = MagicMock(status_code=200, error_message="")

@@ -345,4 +351,6 @@ # Test the method

with patch.object(self.labels_handler, "_ApiRequestHandler__get_all_cases") as mock_get_cases, patch.object(
self.labels_handler, "get_labels"
) as mock_get_labels, patch.object(self.labels_handler, "add_label") as mock_add_label, patch.object(
self.labels_handler.label_manager, "get_labels"
) as mock_get_labels, patch.object(
self.labels_handler.label_manager, "add_label"
) as mock_add_label, patch.object(
self.labels_handler.client, "send_get"

@@ -364,7 +372,7 @@ ) as mock_send_get, patch.object(

mock_send_get.return_value = MagicMock(
status_code=200, response_text={"labels": [], "suite_id": 1, "title": "Case 1"}
status_code=200, response_text={"labels": [], "suite_id": 1, "title": "Case 1"}, error_message=""
)
# Mock update_case response (for single case)
mock_send_post.return_value = MagicMock(status_code=200)
mock_send_post.return_value = MagicMock(status_code=200, error_message="")

@@ -396,4 +404,6 @@ # Test the method with single case

with patch.object(self.labels_handler, "_ApiRequestHandler__get_all_cases") as mock_get_cases, patch.object(
self.labels_handler, "get_labels"
) as mock_get_labels, patch.object(self.labels_handler, "add_label") as mock_add_label, patch.object(
self.labels_handler.label_manager, "get_labels"
) as mock_get_labels, patch.object(
self.labels_handler.label_manager, "add_label"
) as mock_add_label, patch.object(
self.labels_handler.client, "send_get"

@@ -412,7 +422,7 @@ ) as mock_send_get, patch.object(

mock_send_get.return_value = MagicMock(
status_code=200, response_text={"labels": [], "section_id": 1, "title": "Case 1"}
status_code=200, response_text={"labels": [], "section_id": 1, "title": "Case 1"}, error_message=""
)
# Mock add_label_to_case response
mock_send_post.return_value = MagicMock(status_code=200)
mock_send_post.return_value = MagicMock(status_code=200, error_message="")

@@ -437,3 +447,3 @@ # Test the method

with patch.object(self.labels_handler, "_ApiRequestHandler__get_all_cases") as mock_get_cases, patch.object(
self.labels_handler, "get_labels"
self.labels_handler.label_manager, "get_labels"
) as mock_get_labels, patch.object(self.labels_handler.client, "send_get") as mock_send_get:

@@ -449,3 +459,5 @@

existing_labels = [{"id": i, "title": f"label-{i}"} for i in range(1, 11)]
mock_send_get.return_value = MagicMock(status_code=200, response_text={"labels": existing_labels})
mock_send_get.return_value = MagicMock(
status_code=200, response_text={"labels": existing_labels}, error_message=""
)

@@ -470,3 +482,3 @@ # Test the method

with patch.object(self.labels_handler, "_ApiRequestHandler__get_all_cases") as mock_get_cases, patch.object(
self.labels_handler, "get_labels"
self.labels_handler.label_manager, "get_labels"
) as mock_get_labels, patch.object(self.labels_handler.client, "send_get") as mock_send_get:

@@ -482,3 +494,3 @@

mock_send_get.return_value = MagicMock(
status_code=200, response_text={"labels": [{"id": 5, "title": "test-label"}]}
status_code=200, response_text={"labels": [{"id": 5, "title": "test-label"}]}, error_message=""
)

@@ -553,3 +565,3 @@

with patch.object(self.labels_handler, "_ApiRequestHandler__get_all_cases") as mock_get_cases, patch.object(
self.labels_handler, "get_labels"
self.labels_handler.label_manager, "get_labels"
) as mock_get_labels:

@@ -582,3 +594,3 @@

with patch.object(self.labels_handler, "_ApiRequestHandler__get_all_cases") as mock_get_cases, patch.object(
self.labels_handler, "get_labels"
self.labels_handler.label_manager, "get_labels"
) as mock_get_labels:

@@ -585,0 +597,0 @@

@@ -33,2 +33,4 @@ import json

api_request = ApiRequestHandler(environment, api_client, test_input, verify)
# Clear cache for each test to ensure isolation
api_request._cache.clear()
return api_request

@@ -41,3 +43,6 @@

def api_request_handler(handler_maker):
yield handler_maker()
handler = handler_maker()
yield handler
# Clean up cache after test
handler._cache.clear()

@@ -47,3 +52,6 @@

def api_request_handler_verify(handler_maker):
yield handler_maker(verify=True)
handler = handler_maker(verify=True)
yield handler
# Clean up cache after test
handler._cache.clear()

@@ -54,3 +62,6 @@

json_path = Path(__file__).parent / "test_data/json/update_case_result_single_with_id.json"
yield handler_maker(custom_json=json_path, verify=False)
handler = handler_maker(custom_json=json_path, verify=False)
yield handler
# Clean up cache after test
handler._cache.clear()

@@ -835,2 +846,41 @@

@pytest.mark.api_handler
def test_add_bdd_success(self, api_request_handler: ApiRequestHandler, requests_mock):
"""Test successful .feature file upload via add_bdd endpoint"""
section_id = 123
feature_content = "@smoke\nFeature: User Login\n Scenario: Successful login"
# API returns standard TestRail test case JSON with 'id' field
# File upload uses multipart/form-data
mocked_response = {"id": 101, "title": "Successful login", "section_id": 123, "template_id": 1}
requests_mock.post(
create_url(f"add_bdd/{section_id}"),
json=mocked_response,
)
case_ids, error = api_request_handler.add_bdd(section_id, feature_content)
assert case_ids == [101], "Should return list with single case ID"
assert error == "", "There should be no error"
@pytest.mark.api_handler
def test_get_bdd_success(self, api_request_handler: ApiRequestHandler, requests_mock):
"""Test successful .feature file retrieval via get_bdd endpoint"""
case_id = 456
expected_feature = "@smoke\nFeature: User Login"
# API returns raw Gherkin text
mocked_response = expected_feature
requests_mock.get(
create_url(f"get_bdd/{case_id}"),
text=mocked_response,
)
feature_content, error = api_request_handler.get_bdd(case_id)
assert feature_content == expected_feature, "Should return feature content"
assert error == "", "There should be no error"
@pytest.mark.api_handler
def test_update_run_with_include_all_false_standalone(self, api_request_handler: ApiRequestHandler, requests_mock):

@@ -1160,1 +1210,66 @@ """Test update_run for standalone run with include_all=false"""

api_request_handler.upload_attachments(report_results, results, run_id)
@pytest.mark.api_handler
def test_caching_reduces_api_calls(self, api_request_handler: ApiRequestHandler, requests_mock):
"""Test that caching reduces the number of API calls for repeated requests"""
mocked_response = {
"offset": 0,
"limit": 250,
"size": 2,
"_links": {"next": None, "prev": None},
"projects": [
{"id": 1, "name": "DataHub", "suite_mode": 1},
{"id": 2, "name": "Test Project", "suite_mode": 1},
],
}
# Set up mock
mock_get = requests_mock.get(create_url("get_projects"), json=mocked_response)
# First call should hit the API
result1 = api_request_handler.get_project_data("Test Project")
assert result1.project_id == 2
assert mock_get.call_count == 1, "First call should hit the API"
# Second call should use cache
result2 = api_request_handler.get_project_data("Test Project")
assert result2.project_id == 2
assert mock_get.call_count == 1, "Second call should use cache, not hit API again"
# Third call with different name should still use cache (same endpoint)
result3 = api_request_handler.get_project_data("DataHub")
assert result3.project_id == 1
assert mock_get.call_count == 1, "Third call should still use cached data"
@pytest.mark.api_handler
def test_cache_stats(self, api_request_handler: ApiRequestHandler, requests_mock):
"""Test that cache statistics are tracked correctly"""
mocked_response = {
"offset": 0,
"limit": 250,
"size": 1,
"_links": {"next": None, "prev": None},
"projects": [{"id": 1, "name": "Test Project", "suite_mode": 1}],
}
requests_mock.get(create_url("get_projects"), json=mocked_response)
# Check initial stats
stats = api_request_handler._cache.get_stats()
assert stats["hit_count"] == 0
assert stats["miss_count"] == 0
assert stats["size"] == 0
# Make first call (cache miss)
api_request_handler.get_project_data("Test Project")
stats = api_request_handler._cache.get_stats()
assert stats["miss_count"] == 1
assert stats["hit_count"] == 0
assert stats["size"] == 1
# Make second call (cache hit)
api_request_handler.get_project_data("Test Project")
stats = api_request_handler._cache.get_stats()
assert stats["miss_count"] == 1
assert stats["hit_count"] == 1
assert stats["hit_rate"] == 50.0 # 1 hit out of 2 total requests

@@ -44,14 +44,8 @@ import pytest

junit_file_parser = mocker.patch.object(JunitParser, "parse_file")
api_request_handler = mocker.patch(
"trcli.api.project_based_client.ApiRequestHandler"
)
results_uploader = ResultsUploader(
environment=environment, suite=junit_file_parser
)
api_request_handler = mocker.patch("trcli.api.project_based_client.ApiRequestHandler")
results_uploader = ResultsUploader(environment=environment, suite=junit_file_parser)
yield environment, api_request_handler, results_uploader
@pytest.mark.results_uploader
def test_project_name_missing_in_test_rail(
self, result_uploader_data_provider, mocker
):
def test_project_name_missing_in_test_rail(self, result_uploader_data_provider, mocker):
"""The purpose of this test is to check that proper message will be printed and trcli will terminate

@@ -71,5 +65,3 @@ with proper code when project with name provided does not exist in TestRail."""

)
expected_elog_calls = [
mocker.call(f"\n{environment.project} project doesn't exist.")
]
expected_elog_calls = [mocker.call(f"\n{environment.project} project doesn't exist.")]

@@ -80,6 +72,4 @@ with pytest.raises(SystemExit) as exception:

environment.elog.assert_has_calls(expected_elog_calls)
assert exception.type == SystemExit, f"Expected SystemExit exception, but got {exception.type} instead."
assert (
exception.type == SystemExit
), f"Expected SystemExit exception, but got {exception.type} instead."
assert (
exception.value.code == exit_code

@@ -100,5 +90,3 @@ ), f"Expected exit code {exit_code}, but got {exception.value.code} instead."

)
def test_error_during_checking_of_project(
self, error_type, error_message, result_uploader_data_provider, mocker
):
def test_error_during_checking_of_project(self, error_type, error_message, result_uploader_data_provider, mocker):
"""The purpose of this test is to check that proper message would be printed and trcli tool will

@@ -119,8 +107,3 @@ terminate with proper code when errors occurs during project name check."""

expected_log_calls = [
mocker.call(
"\n"
+ FAULT_MAPPING["error_checking_project"].format(
error_message=error_message
)
)
mocker.call("\n" + FAULT_MAPPING["error_checking_project"].format(error_message=error_message))
]

@@ -131,6 +114,4 @@ with pytest.raises(SystemExit) as exception:

environment.elog.assert_has_calls(expected_log_calls)
assert exception.type == SystemExit, f"Expected SystemExit exception, but got {exception.type} instead."
assert (
exception.type == SystemExit
), f"Expected SystemExit exception, but got {exception.type} instead."
assert (
exception.value.code == exit_code

@@ -145,5 +126,3 @@ ), f"Expected exit code {exit_code}, but got {exception.value.code} instead."

)
def test_upload_results_flow(
self, failing_function, result_uploader_data_provider, mocker
):
def test_upload_results_flow(self, failing_function, result_uploader_data_provider, mocker):
"""The purpose of those tests is to check that proper message would be printed and trcli tool

@@ -180,16 +159,10 @@ will terminate with proper code when one of the functions in the flow fails."""

assert exception.type == SystemExit, f"Expected SystemExit exception, but got {exception.type} instead."
assert (
exception.type == SystemExit
), f"Expected SystemExit exception, but got {exception.type} instead."
assert (
exception.value.code == exit_code
), f"Expected exit code {exit_code}, but got {exception.value.code} instead."
@pytest.mark.parametrize(
"run_id", [None, 101], ids=["No run ID provided", "Run ID provided"]
)
@pytest.mark.parametrize("run_id", [None, 101], ids=["No run ID provided", "Run ID provided"])
@pytest.mark.results_uploader
def test_upload_results_successful(
self, run_id, result_uploader_data_provider, mocker
):
def test_upload_results_successful(self, run_id, result_uploader_data_provider, mocker):
"""The purpose of this test is to check if during successful run of upload_results proper messages

@@ -210,5 +183,3 @@ would be printed."""

)
upload_results_inner_functions_mocker(
results_uploader=results_uploader, mocker=mocker, failing_functions=[]
)
upload_results_inner_functions_mocker(results_uploader=results_uploader, mocker=mocker, failing_functions=[])
results_uploader.api_request_handler.check_automation_id_field.return_value = None

@@ -218,15 +189,13 @@ results_uploader.api_request_handler.check_missing_test_cases_ids.return_value = ([], "")

expected_log_calls = []
# Note: Empty section removal messages are no longer expected because
# the new logic skips section/case creation when all cases have IDs
if not run_id:
calls = {
2: mocker.call("Removing unnecessary empty sections that may have been created earlier. ", new_line=False),
3: mocker.call("Removed 1 unused/empty section(s)."),
4: mocker.call("Creating test run. ", new_line=False),
5: mocker.call("Closing run. ", new_line=False),
2: mocker.call("Creating test run. ", new_line=False),
3: mocker.call("Closing run. ", new_line=False),
}
else:
calls = {
2: mocker.call("Removing unnecessary empty sections that may have been created earlier. ", new_line=False),
3: mocker.call("Removed 1 unused/empty section(s)."),
4: mocker.call("Updating test run. ", new_line=False),
5: mocker.call("Closing run. ", new_line=False),
2: mocker.call("Updating test run. ", new_line=False),
3: mocker.call("Closing run. ", new_line=False),
}

@@ -239,5 +208,3 @@

@pytest.mark.results_uploader
def test_add_missing_sections_no_missing_sections(
self, result_uploader_data_provider
):
def test_add_missing_sections_no_missing_sections(self, result_uploader_data_provider):
"""The purpose of this test is to check that add_missing_sections will return empty list

@@ -294,8 +261,4 @@ and proper return code when there are no missing sections."""

)
results_uploader.environment.get_prompt_response_for_auto_creation.return_value = (
user_response
)
results_uploader.api_request_handler.data_provider.check_section_names_duplicates.return_value = (
False
)
results_uploader.environment.get_prompt_response_for_auto_creation.return_value = user_response
results_uploader.api_request_handler.data_provider.check_section_names_duplicates.return_value = False
results_uploader.api_request_handler.add_sections.return_value = (

@@ -328,11 +291,7 @@ expected_added_sections,

environment.get_prompt_response_for_auto_creation.assert_called_with(
PROMPT_MESSAGES["create_missing_sections"].format(
project_name=environment.project
)
PROMPT_MESSAGES["create_missing_sections"].format(project_name=environment.project)
)
@pytest.mark.results_uploader
def test_add_missing_sections_error_checking(
self, result_uploader_data_provider, mocker
):
def test_add_missing_sections_error_checking(self, result_uploader_data_provider, mocker):
"""The purpose of this test is to check that add_missing_sections will return empty list

@@ -397,5 +356,3 @@ and -1 as a result code when check_missing_section_ids will fail. Proper message will be printed."""

)
results_uploader.environment.get_prompt_response_for_auto_creation.return_value = (
user_response
)
results_uploader.environment.get_prompt_response_for_auto_creation.return_value = user_response
results_uploader.api_request_handler.add_cases.return_value = (

@@ -429,11 +386,7 @@ expected_added_test_cases,

environment.get_prompt_response_for_auto_creation.assert_called_with(
PROMPT_MESSAGES["create_missing_test_cases"].format(
project_name=environment.project
)
PROMPT_MESSAGES["create_missing_test_cases"].format(project_name=environment.project)
)
@pytest.mark.results_uploader
def test_add_missing_test_cases_duplicated_case_names(
self, result_uploader_data_provider, mocker
):
def test_add_missing_test_cases_duplicated_case_names(self, result_uploader_data_provider, mocker):
"""The purpose of this test is to check that proper warning will be printed when duplicated case

@@ -451,7 +404,3 @@ names will be detected in result file."""

results_uploader.project = ProjectData(
project_id=1,
suite_mode=SuiteModes.single_suite,
error_message=""
)
results_uploader.project = ProjectData(project_id=1, suite_mode=SuiteModes.single_suite, error_message="")

@@ -479,7 +428,3 @@ assert (

results_uploader.project = ProjectData(
project_id=1,
suite_mode=SuiteModes.multiple_suites,
error_message=""
)
results_uploader.project = ProjectData(project_id=1, suite_mode=SuiteModes.multiple_suites, error_message="")

@@ -513,12 +458,6 @@ api_request_handler_delete_mocker(

results_uploader.project = ProjectData(
project_id=1,
suite_mode=SuiteModes.multiple_suites,
error_message=""
)
results_uploader.project = ProjectData(project_id=1, suite_mode=SuiteModes.multiple_suites, error_message="")
suite_id = 1234
results_uploader.api_request_handler.suites_data_from_provider.suite_id = (
suite_id
)
results_uploader.api_request_handler.suites_data_from_provider.suite_id = suite_id
results_uploader.api_request_handler.check_suite_id.return_value = (True, "")

@@ -525,0 +464,0 @@

Metadata-Version: 2.4
Name: trcli
Version: 1.12.6
Version: 1.13.0
License-File: LICENSE.md

@@ -5,0 +5,0 @@ Requires-Dist: click<8.2.2,>=8.1.0

@@ -14,5 +14,11 @@ LICENSE.md

tests/test_cmd_add_run.py
tests/test_cmd_export_gherkin.py
tests/test_cmd_import_gherkin.py
tests/test_cmd_labels.py
tests/test_cmd_parse_cucumber.py
tests/test_cmd_references.py
tests/test_cucumber_bdd_matching.py
tests/test_cucumber_parser.py
tests/test_dataclass_creation.py
tests/test_junit_bdd_parser.py
tests/test_junit_parse_reference.py

@@ -38,10 +44,24 @@ tests/test_junit_parser.py

trcli/api/__init__.py
trcli/api/api_cache.py
trcli/api/api_client.py
trcli/api/api_request_handler.py
trcli/api/api_response_verify.py
trcli/api/api_utils.py
trcli/api/bdd_handler.py
trcli/api/case_handler.py
trcli/api/case_matcher.py
trcli/api/label_manager.py
trcli/api/project_based_client.py
trcli/api/reference_manager.py
trcli/api/result_handler.py
trcli/api/results_uploader.py
trcli/api/run_handler.py
trcli/api/section_handler.py
trcli/api/suite_handler.py
trcli/commands/__init__.py
trcli/commands/cmd_add_run.py
trcli/commands/cmd_export_gherkin.py
trcli/commands/cmd_import_gherkin.py
trcli/commands/cmd_labels.py
trcli/commands/cmd_parse_cucumber.py
trcli/commands/cmd_parse_junit.py

@@ -57,3 +77,8 @@ trcli/commands/cmd_parse_openapi.py

trcli/data_providers/api_data_provider.py
trcli/logging/__init__.py
trcli/logging/config.py
trcli/logging/file_handler.py
trcli/logging/structured_logger.py
trcli/readers/__init__.py
trcli/readers/cucumber_json.py
trcli/readers/file_parser.py

@@ -60,0 +85,0 @@ trcli/readers/junit_xml.py

@@ -1,1 +0,1 @@

__version__ = "1.12.6"
__version__ = "1.13.0"

@@ -227,3 +227,3 @@ import json

headers["Proxy-Authorization"] = f"Basic {user_pass_encoded}"
print(f"Proxy authentication header added: {headers['Proxy-Authorization']}")
self.verbose_logging_function("Proxy authentication configured")

@@ -260,3 +260,3 @@ return headers

if host in self.noproxy:
print(f"Bypassing proxy for host: {host}")
self.verbose_logging_function(f"Bypassing proxy for host: {host}")
return None

@@ -263,0 +263,0 @@

@@ -52,3 +52,3 @@ from beartype.typing import Callable, Optional, Tuple

"noproxy": noproxy,
"uploader_metadata": uploader_metadata
"uploader_metadata": uploader_metadata,
}

@@ -73,6 +73,9 @@

"""
# Skip if project is already resolved (e.g., by parse_cucumber command)
if self.project is not None:
self.environment.vlog("Project already resolved, skipping project check")
return
self.environment.log("Checking project. ", new_line=False)
self.project = self.api_request_handler.get_project_data(
self.environment.project, self.environment.project_id
)
self.project = self.api_request_handler.get_project_data(self.environment.project, self.environment.project_id)
self._validate_project_id()

@@ -93,6 +96,3 @@ if self.environment.auto_creation_response:

self.environment.elog(
"\n"
+ FAULT_MAPPING["error_checking_project"].format(
error_message=self.project.error_message
)
"\n" + FAULT_MAPPING["error_checking_project"].format(error_message=self.project.error_message)
)

@@ -102,6 +102,3 @@ exit(1)

self.environment.elog(
"\n"
+ FAULT_MAPPING["error_checking_project"].format(
error_message=self.project.error_message
)
"\n" + FAULT_MAPPING["error_checking_project"].format(error_message=self.project.error_message)
)

@@ -139,5 +136,3 @@ exit(1)

)
adding_message = (
f"Adding missing suites to project {self.environment.project}."
)
adding_message = f"Adding missing suites to project {self.environment.project}."
fault_message = FAULT_MAPPING["no_user_agreement"].format(type="suite")

@@ -155,5 +150,3 @@ added_suites, result_code = self.prompt_user_and_add_items(

elif suite_mode == SuiteModes.single_suite_baselines:
suite_ids, error_message = self.api_request_handler.get_suite_ids(
project_id=project_id
)
suite_ids, error_message = self.api_request_handler.get_suite_ids(project_id=project_id)
if error_message:

@@ -164,5 +157,5 @@ self.environment.elog(error_message)

self.environment.elog(
FAULT_MAPPING[
"not_unique_suite_id_single_suite_baselines"
].format(project_name=self.environment.project)
FAULT_MAPPING["not_unique_suite_id_single_suite_baselines"].format(
project_name=self.environment.project
)
)

@@ -172,5 +165,3 @@ else:

elif suite_mode == SuiteModes.single_suite:
suite_ids, error_message = self.api_request_handler.get_suite_ids(
project_id=project_id
)
suite_ids, error_message = self.api_request_handler.get_suite_ids(project_id=project_id)
if error_message:

@@ -182,5 +173,3 @@ self.environment.elog(error_message)

else:
self.environment.elog(
FAULT_MAPPING["unknown_suite_mode"].format(suite_mode=suite_mode)
)
self.environment.elog(FAULT_MAPPING["unknown_suite_mode"].format(suite_mode=suite_mode))
else:

@@ -200,5 +189,3 @@ suite_id = self.api_request_handler.suites_data_from_provider.suite_id

project_id = self._get_project_id()
suite_exists, error_message = self.api_request_handler.check_suite_id(
project_id
)
suite_exists, error_message = self.api_request_handler.check_suite_id(project_id)
if suite_exists:

@@ -240,4 +227,4 @@ result_code = 1

run, error_message = self.api_request_handler.update_run(
run_id,
self.run_name,
run_id,
self.run_name,
start_date=self.environment.run_start_date,

@@ -247,3 +234,3 @@ end_date=self.environment.run_end_date,

refs=self.environment.run_refs,
refs_action=getattr(self.environment, 'run_refs_action', 'add')
refs_action=getattr(self.environment, "run_refs_action", "add"),
)

@@ -250,0 +237,0 @@ if self.environment.auto_close_run:

@@ -46,15 +46,31 @@ import time

# Check if all test cases already have case_id set (BDD mode or pre-existing cases)
# Note: In BDD mode, case_id can be -1 (marker for auto-creation) or a real ID
suite_data = self.api_request_handler.suites_data_from_provider
all_cases_have_ids = all(
test_case.case_id is not None and test_case.case_id != 0
for section in suite_data.testsections
for test_case in section.testcases
)
if all_cases_have_ids:
self.environment.vlog("All test cases have IDs - skipping section/case creation checks")
# Resolve missing test cases and sections
missing_test_cases, error_message = self.api_request_handler.check_missing_test_cases_ids(
self.project.project_id
)
if error_message:
self.environment.elog(
FAULT_MAPPING["error_checking_missing_item"].format(
missing_item="missing test cases", error_message=error_message
# Skip this check if all cases already have IDs (BDD mode)
missing_test_cases = False
if not all_cases_have_ids:
missing_test_cases, error_message = self.api_request_handler.check_missing_test_cases_ids(
self.project.project_id
)
if error_message:
self.environment.elog(
FAULT_MAPPING["error_checking_missing_item"].format(
missing_item="missing test cases", error_message=error_message
)
)
)
added_sections = None
added_test_cases = None
if self.environment.auto_creation_response:
if self.environment.auto_creation_response and not all_cases_have_ids:
added_sections, result_code = self.add_missing_sections(self.project.project_id)

@@ -111,23 +127,13 @@ if result_code == -1:

# Update existing cases with JUnit references and custom fields if enabled
# Update existing cases with JUnit references if enabled
case_update_results = None
case_update_failed = []
if hasattr(self.environment, "update_existing_cases") and self.environment.update_existing_cases == "yes":
self.environment.log("Updating existing cases with references and custom fields...")
self.environment.log("Updating existing cases with JUnit references...")
case_update_results, case_update_failed = self.update_existing_cases_with_junit_refs(added_test_cases)
if case_update_results.get("updated_cases"):
updated_count = len(case_update_results["updated_cases"])
# Count how many had refs vs fields updated
refs_updated = sum(1 for case in case_update_results["updated_cases"] if case.get("added_refs"))
fields_updated = sum(1 for case in case_update_results["updated_cases"] if case.get("updated_fields"))
msg_parts = []
if refs_updated:
msg_parts.append(f"{refs_updated} with references")
if fields_updated:
msg_parts.append(f"{fields_updated} with custom fields")
detail = f" ({', '.join(msg_parts)})" if msg_parts else ""
self.environment.log(f"Updated {updated_count} existing case(s){detail}.")
self.environment.log(
f"Updated {len(case_update_results['updated_cases'])} existing case(s) with references."
)
if case_update_results.get("failed_cases"):

@@ -243,3 +249,3 @@ self.environment.elog(f"Failed to update {len(case_update_results['failed_cases'])} case(s).")

"""
Update existing test cases with references and custom fields from JUnit properties.
Update existing test cases with references from JUnit properties.
Excludes newly created cases to avoid unnecessary API calls.

@@ -267,17 +273,14 @@

for test_case in section.testcases:
# Get refs and case fields for this test case
junit_refs = getattr(test_case, "_junit_case_refs", None)
case_fields = getattr(test_case, "case_fields", {})
# Only process cases that have a case_id (existing cases) and either JUnit refs or case fields
# Only process cases that have a case_id (existing cases) and JUnit refs
# AND exclude newly created cases
if (
test_case.case_id
and (junit_refs or case_fields)
and hasattr(test_case, "_junit_case_refs")
and test_case._junit_case_refs
and int(test_case.case_id) not in newly_created_case_ids
):
try:
success, error_msg, added_refs, skipped_refs, updated_fields = (
success, error_msg, added_refs, skipped_refs = (
self.api_request_handler.update_existing_case_references(
test_case.case_id, junit_refs or "", case_fields, strategy
test_case.case_id, test_case._junit_case_refs, strategy
)

@@ -287,4 +290,4 @@ )

if success:
if added_refs or updated_fields:
# Count as "updated" if references were added or fields were updated
if added_refs:
# Only count as "updated" if references were actually added
update_results["updated_cases"].append(

@@ -296,8 +299,11 @@ {

"skipped_refs": skipped_refs,
"updated_fields": updated_fields,
}
)
else:
# If nothing was updated (all refs were duplicates and no fields), count as skipped
reason = "All references already present" if skipped_refs else "No changes to apply"
# If no refs were added (all were duplicates or no valid refs), count as skipped
reason = (
"All references already present"
if skipped_refs
else "No valid references to process"
)
update_results["skipped_cases"].append(

@@ -329,6 +335,7 @@ {

test_case.case_id
and (junit_refs or case_fields)
and hasattr(test_case, "_junit_case_refs")
and test_case._junit_case_refs
and int(test_case.case_id) in newly_created_case_ids
):
# Skip newly created cases - they already have their fields set during creation
# Skip newly created cases - they already have their references set
update_results["skipped_cases"].append(

@@ -338,3 +345,3 @@ {

"case_title": test_case.title,
"reason": "Newly created case - fields already set during creation",
"reason": "Newly created case - references already set during creation",
}

@@ -341,0 +348,0 @@ )

@@ -23,2 +23,6 @@ import os

# Import structured logging infrastructure
from trcli.logging import get_logger
from trcli.logging.config import LoggingConfig
CONTEXT_SETTINGS = dict(auto_envvar_prefix="TR_CLI")

@@ -83,3 +87,17 @@

# Structured logger - lazy initialization
self._logger = None
@property
def logger(self):
"""Get structured logger for this environment.
Lazy initialization - logger is created on first access.
Returns a StructuredLogger instance for the current command.
"""
if self._logger is None:
self._logger = get_logger(f"trcli.{self.cmd}")
return self._logger
@property
def case_fields(self):

@@ -109,3 +127,6 @@ return self._case_fields

def log(self, msg: str, new_line=True, *args):
"""Logs a message to stdout only if silent mode is disabled."""
"""Logs a message to stdout only if silent mode is disabled.
Also logs to structured logger for observability.
"""
if not self.silent:

@@ -116,10 +137,32 @@ if args:

# Also log to structured logger (backward compatible)
try:
self.logger.info(msg)
except Exception:
# Silently fail if structured logging has issues
pass
def vlog(self, msg: str, *args):
"""Logs a message to stdout only if the verbose option is enabled."""
"""Logs a message to stdout only if the verbose option is enabled.
Also logs to structured logger at DEBUG level.
"""
if self.verbose:
self.log(msg, *args)
if args:
msg %= args
click.echo(msg, file=sys.stdout)
# Also log to structured logger
try:
self.logger.debug(msg)
except Exception:
# Silently fail if structured logging has issues
pass
@staticmethod
def elog(msg: str, new_line=True, *args):
"""Logs a message to stderr."""
"""Logs a message to stderr.
Also logs to structured logger at ERROR level.
"""
if args:

@@ -129,2 +172,10 @@ msg %= args

# Also log to structured logger
try:
error_logger = get_logger("trcli.error")
error_logger.error(msg)
except Exception:
# Silently fail if structured logging has issues
pass
def get_progress_bar(self, results_amount: int, prefix: str):

@@ -152,2 +203,17 @@ disabled = True if self.silent else False

param_sources_types = [ParameterSource.DEFAULT, ParameterSource.ENVIRONMENT]
# First, get parameters from parent context (global options like --verbose)
if context.parent:
for param, value in context.parent.params.items():
if param == "config":
continue
param_config_value = self.params_from_config.get(param, None)
param_source = context.parent.get_parameter_source(param)
if param_source in param_sources_types and param_config_value is not None:
setattr(self, param, param_config_value)
else:
setattr(self, param, value)
# Then, process current context parameters (subcommand-specific options)
for param, value in context.params.items():

@@ -355,1 +421,11 @@ # Don't set config again

environment.set_parameters(context)
# Initialize structured logging system
# This reads configuration from:
# 1. Environment variables (TRCLI_LOG_LEVEL, TRCLI_LOG_FORMAT, etc.)
# 2. Config file (if 'logging' section exists)
try:
LoggingConfig.setup_logging(environment.config)
except Exception as e:
# Fallback to stderr if logging setup fails - don't block execution
click.echo(f"Warning: Failed to initialize logging: {e}", file=sys.stderr)

@@ -21,4 +21,4 @@ from xml.etree.ElementTree import ParseError

default="junit",
type=click.Choice(["junit", "saucectl"], case_sensitive=False),
help="Optional special parser option for specialized JUnit reports.",
type=click.Choice(["junit", "saucectl", "bdd"], case_sensitive=False),
help="Optional special parser option for specialized JUnit reports. Use 'bdd' for BDD framework JUnit output.",
)

@@ -25,0 +25,0 @@ @click.option(

@@ -10,13 +10,19 @@ import functools

def print_config(env: Environment):
assign_info = f"Yes ({env.assign_failed_to})" if hasattr(env, 'assign_failed_to') and env.assign_failed_to and env.assign_failed_to.strip() else "No"
env.log(f"Parser Results Execution Parameters"
f"\n> Report file: {env.file}"
f"\n> Config file: {env.config}"
f"\n> TestRail instance: {env.host} (user: {env.username})"
f"\n> Project: {env.project if env.project else env.project_id}"
f"\n> Run title: {env.title}"
f"\n> Update run: {env.run_id if env.run_id else 'No'}"
f"\n> Add to milestone: {env.milestone_id if env.milestone_id else 'No'}"
f"\n> Auto-assign failures: {assign_info}"
f"\n> Auto-create entities: {env.auto_creation_response}")
assign_info = (
f"Yes ({env.assign_failed_to})"
if hasattr(env, "assign_failed_to") and env.assign_failed_to and env.assign_failed_to.strip()
else "No"
)
env.log(
f"Parser Results Execution Parameters"
f"\n> Report file: {env.file}"
f"\n> Config file: {env.config}"
f"\n> TestRail instance: {env.host} (user: {env.username})"
f"\n> Project: {env.project if env.project else env.project_id}"
f"\n> Run title: {env.title}"
f"\n> Update run: {env.run_id if env.run_id else 'No'}"
f"\n> Add to milestone: {env.milestone_id if env.milestone_id else 'No'}"
f"\n> Auto-assign failures: {assign_info}"
f"\n> Auto-create entities: {env.auto_creation_response}"
)

@@ -27,5 +33,5 @@

try:
return [int(part.strip()) for part in value.split(',')]
return [int(part.strip()) for part in value.split(",")]
except:
raise BadParameter('Invalid format, use a comma-separated list (i.e.: 43,19)')
raise BadParameter("Invalid format, use a comma-separated list (i.e.: 43,19)")

@@ -42,3 +48,3 @@

type=click.Choice(["auto", "name", "property"], case_sensitive=False),
help="Mechanism to match cases between the report and TestRail."
help="Mechanism to match cases between the report and TestRail.",
)

@@ -93,3 +99,3 @@ @click.option(

help="List of case fields and values for new test cases creation. "
"Usage: --case-fields type_id:1 --case-fields priority_id:3",
"Usage: --case-fields type_id:1 --case-fields priority_id:3",
)

@@ -102,3 +108,3 @@ @click.option(

help="List of result fields and values for test results creation. "
"Usage: --result-fields custom_field_a:value1 --result-fields custom_field_b:3",
"Usage: --result-fields custom_field_a:value1 --result-fields custom_field_b:3",
)

@@ -111,1 +117,54 @@ @click.option("--allow-ms", is_flag=True, help="Allows using milliseconds for elapsed times.")

return wrapper_common_options
def bdd_parser_options(f):
"""Options decorator for BDD/Cucumber parsers that don't need case-matcher or section-id"""
@click.option("-f", "--file", type=click.Path(), metavar="", help="Filename and path.")
@click.option("--close-run", is_flag=True, help="Close the newly created run")
@click.option("--title", metavar="", help="Title of Test Run to be created or updated in TestRail.")
@click.option(
"--suite-id",
type=click.IntRange(min=1),
metavar="",
help="Suite ID to submit results to.",
)
@click.option(
"--run-id",
type=click.IntRange(min=1),
metavar="",
help="Run ID for the results they are reporting (otherwise the tool will attempt to create a new run).",
)
@click.option(
"--plan-id",
type=click.IntRange(min=1),
metavar="",
help="Plan ID with which the Test Run will be associated.",
)
@click.option(
"--config-ids",
metavar="",
callback=resolve_comma_separated_list,
help="Comma-separated configuration IDs to use along with Test Plans (i.e.: 34,52).",
)
@click.option(
"--milestone-id",
type=click.IntRange(min=1),
metavar="",
help="Milestone ID to which the Test Run should be associated to.",
)
@click.option("--run-description", metavar="", default="", help="Summary text to be added to the test run.")
@click.option(
"--result-fields",
multiple=True,
metavar="",
default=[],
help="List of result fields and values for test results creation. "
"Usage: --result-fields custom_field_a:value1 --result-fields custom_field_b:3",
)
@click.option("--allow-ms", is_flag=True, help="Allows using milliseconds for elapsed times.")
@functools.wraps(f)
def wrapper_bdd_options(*args, **kwargs):
return f(*args, **kwargs)
return wrapper_bdd_options

@@ -79,2 +79,5 @@ import trcli

parse_junit=dict(**FAULT_MAPPING, **PARSE_COMMON_FAULT_MAPPING, **PARSE_JUNIT_OR_ROBOT_FAULT_MAPPING),
import_gherkin=dict(**FAULT_MAPPING, **PARSE_COMMON_FAULT_MAPPING),
export_gherkin=dict(**FAULT_MAPPING),
parse_cucumber=dict(**FAULT_MAPPING, **PARSE_COMMON_FAULT_MAPPING, **PARSE_JUNIT_OR_ROBOT_FAULT_MAPPING),
parse_openapi=dict(**FAULT_MAPPING, **PARSE_COMMON_FAULT_MAPPING),

@@ -102,2 +105,5 @@ parse_robot=dict(**FAULT_MAPPING, **PARSE_COMMON_FAULT_MAPPING, **PARSE_JUNIT_OR_ROBOT_FAULT_MAPPING),

- parse_junit: JUnit XML Files (& Similar)
- parse_cucumber: Cucumber JSON results (BDD)
- import_gherkin: Upload .feature files to TestRail BDD
- export_gherkin: Export BDD test cases as .feature files
- parse_robot: Robot Framework XML Files

@@ -107,3 +113,3 @@ - parse_openapi: OpenAPI YML Files

- labels: Manage labels (projects, cases, and tests)
- references: Manage references"""
- references: Manage references (cases and runs)"""

@@ -110,0 +116,0 @@ MISSING_COMMAND_SLOGAN = """Usage: trcli [OPTIONS] COMMAND [ARGS]...\nTry 'trcli --help' for help.

@@ -41,11 +41,10 @@ from dataclasses import dataclass

custom_step_results: List[TestRailSeparatedStep] = field(default_factory=list, skip_if_default=True)
custom_testrail_bdd_scenario_results: List[TestRailSeparatedStep] = field(
default_factory=list, skip_if_default=True
)
def __post_init__(self):
if self.junit_result_unparsed is not None:
self.status_id = self.calculate_status_id_from_junit_element(
self.junit_result_unparsed
)
self.comment = self.get_comment_from_junit_element(
self.junit_result_unparsed
)
self.status_id = self.calculate_status_id_from_junit_element(self.junit_result_unparsed)
self.comment = self.get_comment_from_junit_element(self.junit_result_unparsed)
if self.elapsed is not None:

@@ -75,5 +74,3 @@ self.elapsed = self.proper_format_for_elapsed(self.elapsed)

return ""
elif not any(
[junit_result[0].type, junit_result[0].message, junit_result[0].text]
):
elif not any([junit_result[0].type, junit_result[0].message, junit_result[0].text]):
return ""

@@ -208,8 +205,4 @@ else:

section_id: int = field(default=None, metadata={"serde_skip": True})
testcases: List[TestRailCase] = field(
default_factory=list, metadata={"serde_skip": True}
)
properties: List[TestRailProperty] = field(
default_factory=list, metadata={"serde_skip": True}
)
testcases: List[TestRailCase] = field(default_factory=list, metadata={"serde_skip": True})
properties: List[TestRailProperty] = field(default_factory=list, metadata={"serde_skip": True})

@@ -237,5 +230,3 @@ def __getitem__(self, item):

description: str = field(default=None, skip_if_default=True)
testsections: List[TestRailSection] = field(
default_factory=list, metadata={"serde_skip": True}
)
testsections: List[TestRailSection] = field(default_factory=list, metadata={"serde_skip": True})
source: str = field(default=None, metadata={"serde_skip": True})

@@ -242,0 +233,0 @@

@@ -7,4 +7,3 @@ import glob

from junitparser import (
JUnitXml, JUnitXmlError, Element, Attr, TestSuite as JUnitTestSuite, TestCase as JUnitTestCase)
from junitparser import JUnitXml, JUnitXmlError, Element, Attr, TestSuite as JUnitTestSuite, TestCase as JUnitTestCase

@@ -19,12 +18,8 @@ from trcli.cli import Environment

TestRailProperty,
TestRailResult, TestRailSeparatedStep,
TestRailResult,
TestRailSeparatedStep,
)
from trcli.readers.file_parser import FileParser
STEP_STATUSES = {
"passed": 1,
"untested": 3,
"skipped": 4,
"failed": 5
}
STEP_STATUSES = {"passed": 1, "untested": 3, "skipped": 4, "failed": 5}

@@ -52,3 +47,3 @@ TestCase.id = Attr("id")

self._special = environment.special_parser
self._case_result_statuses = {"passed": 1, "skipped": 4,"error": 5, "failure": 5}
self._case_result_statuses = {"passed": 1, "skipped": 4, "error": 5, "failure": 5}
self._update_with_custom_statuses()

@@ -140,3 +135,3 @@

f"Message: {result.message}" if result.message else "",
f"Text: {result.text}" if result.text else ""
f"Text: {result.text}" if result.text else "",
]

@@ -162,3 +157,3 @@ return "\n".join(part for part in parts if part).strip()

elif name.startswith("testrail_result_step"):
status, step = value.split(':', maxsplit=1)
status, step = value.split(":", maxsplit=1)
step_obj = TestRailSeparatedStep(step.strip())

@@ -177,3 +172,3 @@ step_obj.status_id = STEP_STATUSES[status.lower().strip()]

case_fields.append(field_value)
# Extract refs for case updates

@@ -210,4 +205,5 @@ if field_value and field_value.startswith("refs:"):

case_id, case_name = self._extract_case_id_and_name(case)
result_steps, attachments, result_fields, comments, case_fields, case_refs, sauce_session = self._parse_case_properties(
case)
result_steps, attachments, result_fields, comments, case_fields, case_refs, sauce_session = (
self._parse_case_properties(case)
)
result_fields_dict, case_fields_dict = self._resolve_case_fields(result_fields, case_fields)

@@ -231,10 +227,11 @@ status_id = self._get_status_id_for_case_result(case)

automation_id = (
case_fields_dict.pop(OLD_SYSTEM_NAME_AUTOMATION_ID, None)
or case._elem.get(OLD_SYSTEM_NAME_AUTOMATION_ID, automation_id))
automation_id = case_fields_dict.pop(OLD_SYSTEM_NAME_AUTOMATION_ID, None) or case._elem.get(
OLD_SYSTEM_NAME_AUTOMATION_ID, automation_id
)
# Create TestRailCase kwargs
case_kwargs = {
"title": TestRailCaseFieldsOptimizer.extract_last_words(case_name,
TestRailCaseFieldsOptimizer.MAX_TESTCASE_TITLE_LENGTH),
"title": TestRailCaseFieldsOptimizer.extract_last_words(
case_name, TestRailCaseFieldsOptimizer.MAX_TESTCASE_TITLE_LENGTH
),
"case_id": case_id,

@@ -245,13 +242,13 @@ "result": result,

}
# Only set refs field if case_refs has actual content
if case_refs and case_refs.strip():
case_kwargs["refs"] = case_refs
test_case = TestRailCase(**case_kwargs)
# Store JUnit references as a temporary attribute for case updates (not serialized)
if case_refs and case_refs.strip():
test_case._junit_case_refs = case_refs
test_cases.append(test_case)

@@ -268,3 +265,2 @@

def _parse_sections(self, suite) -> List[TestRailSection]:

@@ -285,12 +281,435 @@ sections = []

properties = self._extract_section_properties(section, processed_props)
test_cases = self._parse_test_cases(section)
# BDD MODE: Group all scenarios under one test case
if self._is_bdd_mode():
test_case = self._parse_bdd_feature_as_single_case(section)
test_cases = [test_case] if test_case else []
# STANDARD MODE: One test case per JUnit testcase
else:
test_cases = self._parse_test_cases(section)
self.env.log(f"Processed {len(test_cases)} test cases in section {section.name}.")
sections.append(TestRailSection(
section.name,
testcases=test_cases,
properties=properties,
))
sections.append(
TestRailSection(
section.name,
testcases=test_cases,
properties=properties,
)
)
return sections
def _is_bdd_mode(self) -> bool:
"""Check if BDD grouping mode is enabled
Returns:
True if special parser is 'bdd', False otherwise
"""
return self._special == "bdd"
def _extract_feature_case_id_from_property(self, testsuite) -> Union[int, None]:
    """Look up a feature-level TestRail case ID on the testsuite element.

    Scans the testsuite's own properties for one of the recognized
    names (testrail_case_id, test_id, bdd_case_id) holding a value like
    "C42" or "42".

    Args:
        testsuite: JUnit testsuite element
    Returns:
        The case ID as an int, or None when no usable property exists.
    """
    recognized = ("testrail_case_id", "test_id", "bdd_case_id")
    for prop in testsuite.properties():
        if prop.name not in recognized:
            continue
        candidate = prop.value.lower().replace("c", "")
        if candidate.isnumeric():
            self.env.vlog(f"BDD: Found case ID C{candidate} in testsuite property '{prop.name}'")
            return int(candidate)
    return None
def _extract_case_id_from_testcases(self, testsuite) -> List[tuple]:
    """Collect per-scenario case IDs from testcase properties or names.

    For each testcase, the first "test_id" property of each properties
    node is inspected; if no numeric property ID is found, the testcase
    name is parsed for an embedded ID (e.g. "Scenario C42").

    Args:
        testsuite: JUnit testsuite element
    Returns:
        List of (testcase_name, case_id) tuples for every testcase
        where an ID could be resolved.
    """
    collected = []
    for testcase in testsuite:
        resolved = None
        for props_node in testcase.iterchildren(Properties):
            # Only the first "test_id" property per node is considered,
            # mirroring the early break on a name match.
            marker = next(
                (p for p in props_node.iterchildren(Property) if p.name == "test_id"),
                None,
            )
            if marker is None:
                continue
            digits = marker.value.lower().replace("c", "")
            if digits.isnumeric():
                resolved = int(digits)
        if not resolved:
            resolved, _ = MatchersParser.parse_name_with_id(testcase.name)
        if resolved:
            collected.append((testcase.name, resolved))
    return collected
def _extract_and_validate_bdd_case_id(self, testsuite) -> tuple:
    """Extract case ID from various sources and validate consistency

    In BDD mode, all scenarios in a feature MUST share the same case ID.
    Priority order:
    1. Testsuite-level property (testrail_case_id, test_id, bdd_case_id)
    2. Testcase properties (all must be same)
    3. Testcase names (all must be same)
    4. Testsuite name pattern [C123]

    Args:
        testsuite: JUnit testsuite element
    Returns:
        Tuple of (case_id: int or None, validation_errors: List[str])
    """
    validation_errors = []
    # Priority 1: Testsuite-level property
    case_id = self._extract_feature_case_id_from_property(testsuite)
    if case_id:
        return case_id, []
    # Priority 2 & 3: Check testcase properties and names
    testcase_case_ids = self._extract_case_id_from_testcases(testsuite)
    if not testcase_case_ids:
        # Priority 4: Fall back to the testsuite name pattern [C123].
        # FIX: this fallback previously sat after the early returns below
        # and was unreachable dead code; it must run before the
        # "no case ID found" error is raised.
        if self._case_matcher == MatchersParser.NAME:
            case_id, _ = MatchersParser.parse_name_with_id(testsuite.name)
            if case_id:
                self.env.vlog(f"BDD: Found case ID C{case_id} in testsuite name")
                return case_id, []
        validation_errors.append(
            f"BDD Error: No case ID found for feature '{testsuite.name}'.\n"
            f" Add case ID using one of:\n"
            f" - Testsuite property: <property name='testrail_case_id' value='C42'/>\n"
            f" - Testcase names: 'Scenario name C42'\n"
            f" - Testcase property: <property name='test_id' value='C42'/>"
        )
        return None, validation_errors
    # Check consistency - all must be the same
    unique_case_ids = set(cid for _, cid in testcase_case_ids)
    if len(unique_case_ids) > 1:
        validation_errors.append(
            f"BDD Error: Multiple different case IDs found in feature '{testsuite.name}'.\n"
            f" In BDD mode, all scenarios must map to the SAME TestRail case.\n"
            f" Found case IDs: {sorted(unique_case_ids)}\n"
            f" Scenarios:\n"
            + "\n".join(f" - '{name}' → C{cid}" for name, cid in testcase_case_ids)
            + f"\n\n If these should be separate test cases, remove --special-parser bdd flag."
        )
        return None, validation_errors
    case_id = testcase_case_ids[0][1]
    self.env.vlog(
        f"BDD: Found consistent case ID C{case_id} across {len(testcase_case_ids)} scenario(s) "
        f"in feature '{testsuite.name}'"
    )
    return case_id, []
def _validate_bdd_case_exists(self, case_id: int, feature_name: str) -> tuple:
    """Validate that case exists in TestRail AND is a BDD test case

    A valid BDD test case MUST:
    - Exist in TestRail (case ID is valid)
    - Have the BDD scenario custom field populated (field name is
      resolved dynamically via the API handler)

    Args:
        case_id: TestRail case ID to validate
        feature_name: Feature/testsuite name for error context
    Returns:
        Tuple of (is_valid: bool, error_message: str, case_data: dict)
    """
    try:
        # Import here to avoid circular dependency
        # NOTE(review): ApiRequestHandler is imported but never referenced
        # in this method — candidate for removal.
        from trcli.api.api_request_handler import ApiRequestHandler
        from trcli.api.project_based_client import ProjectBasedClient
        from trcli.data_classes.dataclass_testrail import TestRailSuite
        # Get API client
        # A throwaway suite object is built only to obtain an authenticated
        # api_request_handler via ProjectBasedClient.
        temp_suite = TestRailSuite(name="temp", suite_id=1)
        project_client = ProjectBasedClient(environment=self.env, suite=temp_suite)
        api_handler = project_client.api_request_handler
        # Step 1: Get case from TestRail
        response = api_handler.client.send_get(f"get_case/{case_id}")
        if response.error_message:
            return (
                False,
                (
                    f"BDD Validation Error: Case C{case_id} does not exist in TestRail.\n"
                    f"Feature: '{feature_name}'\n"
                    f"API Error: {response.error_message}\n\n"
                    f"Action Required:\n"
                    f" 1. Verify case C{case_id} exists in TestRail\n"
                    f" 2. Ensure you have permission to access this case\n"
                    f" 3. Create the BDD test case if it doesn't exist:\n"
                    f" trcli import_gherkin -f {feature_name}.feature --section-id <ID>"
                ),
                {},
            )
        # response_text holds the parsed JSON body (dict) — evidenced by
        # the .get() calls below.
        case_data = response.response_text
        # Step 2: Validate it's a BDD test case
        # Resolve BDD case field name dynamically
        bdd_field_name = api_handler.get_bdd_case_field_name()
        bdd_scenario_field = case_data.get(bdd_field_name)
        if not bdd_scenario_field:
            return (
                False,
                (
                    f"BDD Validation Error: Case C{case_id} is NOT a BDD test case.\n"
                    f"Feature: '{feature_name}'\n"
                    f"Case Title: '{case_data.get('title', 'Unknown')}'\n\n"
                    f"Reason: The '{bdd_field_name}' field is empty or null.\n"
                    f"This indicates the case is using a regular template, not the BDD template.\n\n"
                    f"Action Required:\n"
                    f" Option 1: Upload this case using standard mode (remove --special-parser bdd)\n"
                    f" Option 2: Create a proper BDD test case with:\n"
                    f" trcli import_gherkin -f {feature_name}.feature --section-id <ID>\n"
                    f" Option 3: Convert existing case to BDD template in TestRail UI"
                ),
                case_data,
            )
        # Success!
        self.env.vlog(
            f"BDD: Validated case C{case_id} is a valid BDD test case\n"
            f" - Title: '{case_data.get('title')}'\n"
            f" - Template ID: {case_data.get('template_id')}\n"
            f" - Has BDD scenarios: Yes"
        )
        return True, "", case_data
    except Exception as e:
        # Deliberate broad catch: any connectivity/construction failure is
        # reported as a validation-failure tuple instead of propagating.
        return (
            False,
            (
                f"BDD Validation Error: Unable to validate case C{case_id}.\n"
                f"Feature: '{feature_name}'\n"
                f"Error: {str(e)}\n\n"
                f"Action Required: Verify your TestRail connection and case access permissions."
            ),
            {},
        )
def _aggregate_scenario_statuses(self, scenario_statuses: List[int]) -> int:
    """Collapse per-scenario statuses into one feature status (fail-fast).

    Precedence:
    - Any Failed (5) makes the whole feature Failed (5).
    - Otherwise any Skipped (4) makes it Skipped (4).
    - Otherwise the feature is Passed (1).

    Note: an empty list aggregates to Passed (1); callers are expected to
    provide at least one status.

    Args:
        scenario_statuses: List of TestRail status IDs
    Returns:
        Aggregated status ID
    """
    for dominant in (5, 4):
        if dominant in scenario_statuses:
            return dominant
    return 1
def _format_failure_message(self, scenario_name: str, result_obj) -> str:
    """Render one scenario's failure as a multi-line comment fragment.

    Args:
        scenario_name: Clean scenario name
        result_obj: JUnit result object (failure/error element)
    Returns:
        Formatted failure message (scenario header plus any of
        type/message/details that are present; details capped at 500 chars)
    """
    parts = [f"Scenario: {scenario_name}"]
    failure_type = result_obj.type
    if failure_type:
        parts.append(f" Type: {failure_type}")
    failure_message = result_obj.message
    if failure_message:
        parts.append(f" Message: {failure_message}")
    raw_text = result_obj.text
    if raw_text:
        details = raw_text.strip()
        # Keep the comment readable — cap long tracebacks.
        if len(details) > 500:
            details = details[:500] + "\n ... (truncated)"
        parts.append(f" Details:\n {details}")
    return "\n".join(parts)
def _parse_bdd_feature_as_single_case(self, testsuite) -> Union[TestRailCase, None]:
    """Parse all scenarios in a testsuite as a single BDD test case

    Enhanced validation:
    1. Case ID exists
    2. All scenarios have same case ID
    3. Case exists in TestRail
    4. Case is actually a BDD test case (has the BDD scenario custom field)

    Args:
        testsuite: JUnit testsuite containing multiple scenarios
    Returns:
        Single TestRailCase with aggregated scenario results, or None if
        ID extraction/consistency validation fails
    Raises:
        ValidationException: when the case ID resolves but the TestRail
        case is missing or is not a BDD-template case
    """
    feature_name = testsuite.name
    # Step 1: Extract and validate case ID consistency
    case_id, validation_errors = self._extract_and_validate_bdd_case_id(testsuite)
    if validation_errors:
        for error in validation_errors:
            self.env.elog(error)
        return None
    if not case_id:
        self.env.elog(f"BDD Error: No valid case ID found for feature '{feature_name}'. " f"Skipping this feature.")
        return None
    # Step 2: Validate case exists AND is a BDD case
    is_valid, error_message, case_data = self._validate_bdd_case_exists(case_id, feature_name)
    if not is_valid:
        self.env.elog(error_message)
        # Raise exception to stop processing
        from trcli.data_classes.validation_exception import ValidationException
        raise ValidationException(
            field_name="case_id",
            class_name="BDD Feature",
            reason=f"Case C{case_id} validation failed. See error above for details.",
        )
    self.env.log(f"BDD: Case C{case_id} validated as BDD test case for feature '{feature_name}'")
    # Step 3: Parse all scenarios
    bdd_scenario_results = []
    scenario_statuses = []
    total_time = 0
    failure_messages = []
    for idx, testcase in enumerate(testsuite, 1):
        scenario_name = testcase.name
        # Clean case ID from name
        _, clean_scenario_name = MatchersParser.parse_name_with_id(scenario_name)
        if not clean_scenario_name:
            clean_scenario_name = scenario_name
        scenario_time = float(testcase.time or 0)
        total_time += scenario_time
        # Determine scenario status (TestRail IDs: 1=passed, 4=skipped, 5=failed)
        if testcase.is_passed:
            scenario_status = 1
            scenario_status_label = "PASSED"
        elif testcase.is_skipped:
            scenario_status = 4
            scenario_status_label = "SKIPPED"
        else:  # Failed
            scenario_status = 5
            scenario_status_label = "FAILED"
            # Capture failure details
            # NOTE(review): indentation reconstructed — failure details are
            # assumed to be collected for failed scenarios only; confirm
            # against the original file.
            if testcase.result:
                result_obj = testcase.result[0]
                error_msg = self._format_failure_message(clean_scenario_name, result_obj)
                failure_messages.append(error_msg)
        # Track status for aggregation
        scenario_statuses.append(scenario_status)
        # Create BDD scenario result (matches Cucumber parser format)
        step = TestRailSeparatedStep(content=clean_scenario_name)
        step.status_id = scenario_status
        bdd_scenario_results.append(step)
        self.env.vlog(f" - Scenario {idx}: {clean_scenario_name} → {scenario_status_label} " f"({scenario_time}s)")
    # Step 4: Aggregate statuses (fail-fast: any 5 wins, then 4, else 1)
    overall_status = self._aggregate_scenario_statuses(scenario_statuses)
    status_labels = {1: "PASSED", 4: "SKIPPED", 5: "FAILED"}
    overall_status_label = status_labels.get(overall_status, "UNKNOWN")
    # Step 5: Create comment with summary
    passed_count = scenario_statuses.count(1)
    failed_count = scenario_statuses.count(5)
    skipped_count = scenario_statuses.count(4)
    total_count = len(scenario_statuses)
    summary = (
        f"Feature Summary:\n"
        f" Total Scenarios: {total_count}\n"
        f" Passed: {passed_count}\n"
        f" Failed: {failed_count}\n"
        f" Skipped: {skipped_count}\n"
    )
    if failure_messages:
        comment = f"{summary}\n{'='*50}\nFailure Details:\n\n" + "\n\n".join(failure_messages)
    else:
        comment = summary
    # Step 6: Create aggregated result
    # Get API handler to resolve BDD result field name
    # NOTE(review): this builds a second ProjectBasedClient per feature
    # (one was already built in _validate_bdd_case_exists) — potential
    # candidate for caching.
    from trcli.api.project_based_client import ProjectBasedClient
    from trcli.data_classes.dataclass_testrail import TestRailSuite as TRSuite
    temp_suite = TRSuite(name="temp", suite_id=1)
    project_client = ProjectBasedClient(environment=self.env, suite=temp_suite)
    bdd_result_field_name = project_client.api_request_handler.get_bdd_result_field_name()
    result = TestRailResult(
        case_id=case_id,
        status_id=overall_status,
        elapsed=total_time if total_time > 0 else None,  # Pass numeric value, not formatted string
        comment=comment,
    )
    # Add BDD scenario results to result_fields dict (for serialization)
    # Convert TestRailSeparatedStep objects to dicts for API
    result.result_fields[bdd_result_field_name] = [
        {"content": step.content, "status_id": step.status_id} for step in bdd_scenario_results
    ]
    # Step 7: Create test case
    test_case = TestRailCase(
        title=feature_name,
        case_id=case_id,
        result=result,
    )
    self.env.log(
        f"BDD: Grouped {total_count} scenario(s) under case C{case_id} "
        f"'{feature_name}' → {overall_status_label}"
    )
    self.env.log(f" Breakdown: {passed_count} passed, {failed_count} failed, " f"{skipped_count} skipped")
    return test_case
def parse_file(self) -> List[TestRailSuite]:

@@ -310,7 +729,9 @@ self.env.log("Parsing JUnit report.")

testrail_suites.append(TestRailSuite(
suite_name,
testsections=testrail_sections,
source=self.filename,
))
testrail_suites.append(
TestRailSuite(
suite_name,
testsections=testrail_sections,
source=self.filename,
)
)

@@ -325,5 +746,5 @@ return testrail_suites

continue
divider_index = section.name.find('-')
divider_index = section.name.find("-")
subsuite_name = section.name[:divider_index].strip()
section.name = section.name[divider_index + 1:].strip()
section.name = section.name[divider_index + 1 :].strip()
new_xml = JUnitXml(subsuite_name)

@@ -360,3 +781,4 @@ if subsuite_name not in subsuites.keys():

if __name__ == '__main__':
if __name__ == "__main__":
pass

Sorry, the diff of this file is too big to display

Sorry, the diff of this file is too big to display