Latest Threat Research: SANDWORM_MODE: Shai-Hulud-Style npm Worm Hijacks CI Workflows and Poisons AI Toolchains. Details
Socket
Book a DemoInstallSign in
Socket

e-data

Package Overview
Dependencies
Maintainers
1
Versions
85
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

e-data - npm Package Compare versions

Comparing version
1.2.22
to
2.0.0b1
.python-version

Sorry, the diff of this file is not supported yet

+178
"""Datadis connector module testing."""
import datetime
from unittest.mock import AsyncMock, patch
import pytest
from edata.connectors.datadis import DatadisConnector
# Credentials handed to DatadisConnector below; the token call is mocked, so
# they are never sent anywhere.
MOCK_USERNAME = "fake_user"
MOCK_PASSWORD = "fake_password"
# Raw supply payload exactly as the Datadis API returns it
# (camelCase keys, slash-separated string dates).
SUPPLIES_RESPONSE = [
    {
        "cups": "ESXXXXXXXXXXXXXXXXTEST",
        "validDateFrom": "2022/01/01",
        "validDateTo": "",
        "pointType": 5,
        "distributorCode": "2",
        "address": "fake address, fake 12345",
        "postalCode": "12345",
        "province": "FAKE PROVINCE",
        "municipality": "FAKE MUNICIPALITY",
        "distributor": "FAKE DISTRIBUTOR",
    }
]
# Parsed supply records the connector is expected to produce
# (snake_case keys, real datetime objects).
SUPPLIES_EXPECTATIONS = [
    {
        "cups": "ESXXXXXXXXXXXXXXXXTEST",
        "date_start": datetime.datetime(2022, 1, 1, 0, 0),
        # An empty 'validDateTo' maps to tomorrow at midnight.
        # NOTE(review): evaluated once at import time — a test session that
        # crosses midnight would compare against a stale date.
        "date_end": datetime.datetime.now().replace(
            hour=0, minute=0, second=0, microsecond=0
        )
        + datetime.timedelta(days=1),
        "point_type": 5,
        "distributor_code": "2",
        "address": "fake address, fake 12345",
        "postal_code": "12345",
        "province": "FAKE PROVINCE",
        "municipality": "FAKE MUNICIPALITY",
        "distributor": "FAKE DISTRIBUTOR",
    }
]
# Raw contract payload from the API.
CONTRACTS_RESPONSE = [
    {
        "startDate": "2022/10/22",
        "endDate": "2022/10/22",
        "marketer": "fake_marketer",
        "contractedPowerkW": [1.5, 1.5],
    }
]
# Expected parsed contract: the two-element 'contractedPowerkW' list is
# split into power_p1 / power_p2.
CONTRACTS_EXPECTATIONS = [
    {
        "date_start": datetime.datetime(2022, 10, 22, 0, 0),
        "date_end": datetime.datetime(2022, 10, 22, 0, 0),
        "marketer": "fake_marketer",
        "distributor_code": "2",
        "power_p1": 1.5,
        "power_p2": 1.5,
    }
]
# Raw hourly consumption payload from the API.
CONSUMPTIONS_RESPONSE = [
    {
        "cups": "ESXXXXXXXXXXXXXXXXTEST",
        "date": "2022/10/22",
        "time": "01:00",
        "consumptionKWh": 1.0,
        "obtainMethod": "Real",
    },
    {
        "cups": "ESXXXXXXXXXXXXXXXXTEST",
        "date": "2022/10/22",
        "time": "02:00",
        "consumptionKWh": 1.0,
        "obtainMethod": "Real",
    },
]
# Expected parsed consumptions. Note the one-hour shift: the API stamp
# "01:00" marks the END of the hour, so the record starts at 00:00.
CONSUMPTIONS_EXPECTATIONS = [
    {
        "datetime": datetime.datetime(2022, 10, 22, 0, 0),
        "delta_h": 1,
        "value_kwh": 1.0,
        "surplus_kwh": 0,
        "real": True,
    },
    {
        "datetime": datetime.datetime(2022, 10, 22, 1, 0),
        "delta_h": 1,
        "value_kwh": 1.0,
        "surplus_kwh": 0,
        "real": True,
    },
]
# Raw maximeter (peak power) payload from the API.
MAXIMETER_RESPONSE = [
    {
        "cups": "ESXXXXXXXXXXXXXXXXTEST",
        "date": "2022/03/01",
        "time": "12:00",
        "maxPower": 1.0,
    }
]
# Expected parsed maximeter record (timestamp kept as-is, no hour shift).
MAXIMETER_EXPECTATIONS = [
    {
        "datetime": datetime.datetime(2022, 3, 1, 12, 0),
        "value_kw": 1.0,
    }
]
# Tests for async methods (now the only methods available)
@pytest.mark.asyncio
@patch.object(DatadisConnector, "_get_token", AsyncMock(return_value=True))
@patch.object(DatadisConnector, "_get", AsyncMock(return_value=SUPPLIES_RESPONSE))
async def test_get_supplies():
    """Test a successful 'get_supplies' query."""
    connector = DatadisConnector(MOCK_USERNAME, MOCK_PASSWORD)
    supplies = await connector.get_supplies()
    # The connector now yields Pydantic models; dump each one to a plain
    # dict so it can be compared against the literal expectations.
    dumped = [supply.model_dump() for supply in supplies]
    assert dumped == SUPPLIES_EXPECTATIONS
@pytest.mark.asyncio
@patch.object(DatadisConnector, "_get_token", AsyncMock(return_value=True))
@patch.object(DatadisConnector, "_get", AsyncMock(return_value=CONTRACTS_RESPONSE))
async def test_get_contract_detail():
    """Test a successful 'get_contract_detail' query."""
    connector = DatadisConnector(MOCK_USERNAME, MOCK_PASSWORD)
    contracts = await connector.get_contract_detail("ESXXXXXXXXXXXXXXXXTEST", "2")
    # Pydantic models are returned; compare their dict form to the fixture.
    assert [contract.model_dump() for contract in contracts] == CONTRACTS_EXPECTATIONS
@pytest.mark.asyncio
@patch.object(DatadisConnector, "_get_token", AsyncMock(return_value=True))
@patch.object(DatadisConnector, "_get", AsyncMock(return_value=CONSUMPTIONS_RESPONSE))
async def test_get_consumption_data():
    """Test a successful 'get_consumption_data' query."""
    # Import the stdlib module under a private alias: later in this file the
    # name ``datetime`` is rebound by ``from datetime import datetime``, which
    # turns the module-level ``datetime.datetime(...)`` lookup used here into
    # an AttributeError once this test runs after that rebinding.
    import datetime as dt

    connector = DatadisConnector(MOCK_USERNAME, MOCK_PASSWORD)
    result = await connector.get_consumption_data(
        "ESXXXXXXXXXXXXXXXXTEST",
        "2",
        dt.datetime(2022, 10, 22, 0, 0, 0),
        dt.datetime(2022, 10, 22, 2, 0, 0),
        "0",  # measurement_type as string
        5,
    )
    # Note: Now returns Pydantic models instead of dicts
    # Convert to dicts for comparison with expectations
    result_dicts = [consumption.model_dump() for consumption in result]
    assert result_dicts == CONSUMPTIONS_EXPECTATIONS
@pytest.mark.asyncio
@patch.object(DatadisConnector, "_get_token", AsyncMock(return_value=True))
@patch.object(DatadisConnector, "_get", AsyncMock(return_value=MAXIMETER_RESPONSE))
async def test_get_max_power():
    """Test a successful 'get_max_power' query."""
    # Local aliased import: the global name ``datetime`` is rebound to the
    # class by ``from datetime import datetime`` further down this file, so
    # relying on the module-level binding would raise AttributeError here.
    import datetime as dt

    connector = DatadisConnector(MOCK_USERNAME, MOCK_PASSWORD)
    result = await connector.get_max_power(
        "ESXXXXXXXXXXXXXXXXTEST",
        "2",
        dt.datetime(2022, 3, 1, 0, 0, 0),
        dt.datetime(2022, 4, 1, 0, 0, 0),
    )
    # Note: Now returns Pydantic models instead of dicts
    result_dicts = [maxpower.model_dump() for maxpower in result]
    assert result_dicts == MAXIMETER_EXPECTATIONS
"""Tests for REData (online)"""
from datetime import datetime, timedelta
import pytest
from edata.connectors.redata import REDataConnector
@pytest.mark.asyncio
async def test_get_realtime_prices():
    """Test a successful 'get_realtime_prices' query"""
    connector = REDataConnector()
    # Query yesterday's full day: from 00:00 up to (but excluding) today.
    day_start = datetime.now().replace(hour=0, minute=0, second=0) - timedelta(days=1)
    day_end = day_start + timedelta(days=1) - timedelta(minutes=1)
    prices = await connector.get_realtime_prices(day_start, day_end, False)
    # One PVPC price entry per hour of the day.
    assert len(prices) == 24
@pytest.mark.asyncio
async def test_async_get_realtime_prices():
    """Test a successful 'get_realtime_prices' query (legacy test name)"""
    connector = REDataConnector()
    # Same window as test_get_realtime_prices: all of yesterday.
    window_from = datetime.now().replace(hour=0, minute=0, second=0) - timedelta(days=1)
    window_to = window_from + timedelta(days=1) - timedelta(minutes=1)
    hourly_prices = await connector.get_realtime_prices(window_from, window_to, False)
    assert len(hourly_prices) == 24
"""Tests for BillingService."""
import shutil
import tempfile
from datetime import datetime, timedelta
from unittest.mock import AsyncMock, Mock, patch
import pytest
import pytest_asyncio
from edata.models.pricing import PricingData, PricingRules
from edata.services.billing import BillingService
class TestBillingService:
"""Test suite for BillingService."""
@pytest.fixture
def temp_dir(self):
    """Create a temporary directory for tests.

    Cleanup runs in a ``finally`` block so the directory is removed even
    if an exception is thrown into the fixture generator (e.g. when a
    dependent fixture's setup fails).
    """
    temp_dir = tempfile.mkdtemp()
    try:
        yield temp_dir
    finally:
        shutil.rmtree(temp_dir)
@pytest.fixture
def mock_redata_connector(self):
    """Mock REDataConnector for testing."""
    # Patch the class where the billing service looks it up, and hand the
    # test both the instance it will receive and the patched class itself.
    with patch("edata.services.billing.REDataConnector") as connector_cls:
        connector_instance = Mock()
        connector_cls.return_value = connector_instance
        yield connector_instance, connector_cls
@pytest.fixture
def mock_database_service(self):
    """Mock DatabaseService for testing.

    Patches the module-level factory so BillingService resolves this mock,
    with every data accessor pre-wired to an empty/neutral default.
    """
    with patch("edata.services.billing.get_database_service") as mock_get_db:
        mock_db = Mock()
        # Make the async methods return awaitables via AsyncMock
        mock_db.get_pvpc_prices = AsyncMock(return_value=[])
        mock_db.save_pvpc_price = AsyncMock(return_value=Mock())
        mock_db.get_billing = AsyncMock(return_value=[])
        mock_db.save_billing = AsyncMock(return_value=Mock())
        mock_db.get_consumptions = AsyncMock(return_value=[])
        mock_db.get_contracts = AsyncMock(return_value=[])
        # Plain Mock: the tests below use this method synchronously.
        mock_db.generate_pricing_config_hash = Mock(return_value="test_hash")
        mock_db.get_latest_pvpc_price = AsyncMock(return_value=None)
        mock_db.get_latest_billing = AsyncMock(return_value=None)
        mock_get_db.return_value = mock_db
        yield mock_db
@pytest_asyncio.fixture
async def billing_service(
    self, temp_dir, mock_redata_connector, mock_database_service
):
    """Create a BillingService instance for testing.

    Depends on the connector/database mocks so they are active before the
    service is constructed.
    """
    service = BillingService(storage_dir=temp_dir)
    return service
@pytest.fixture
def sample_pvpc_prices(self):
    """Sample PVPC price data for testing."""
    # Three consecutive hourly samples for 2024-06-17.
    hourly_samples = [(10, 0.12345), (11, 0.13456), (12, 0.14567)]
    return [
        PricingData(
            datetime=datetime(2024, 6, 17, hour, 0),
            value_eur_kwh=price,
            delta_h=1.0,
        )
        for hour, price in hourly_samples
    ]
@pytest.fixture
def sample_pricing_rules_pvpc(self):
    """Sample pricing rules for PVPC configuration.

    The three per-period energy prices are None, which makes the billing
    service fall back to stored PVPC market prices (see the PVPC tests
    below) instead of fixed custom rates.
    """
    return PricingRules(
        p1_kw_year_eur=30.67,
        p2_kw_year_eur=1.42,
        p1_kwh_eur=None,  # PVPC
        p2_kwh_eur=None,  # PVPC
        p3_kwh_eur=None,  # PVPC
        surplus_p1_kwh_eur=0.05,
        surplus_p2_kwh_eur=0.04,
        surplus_p3_kwh_eur=0.03,
        meter_month_eur=0.81,
        market_kw_year_eur=3.11,
        electricity_tax=1.05113,  # tax expressed as a multiplier
        iva_tax=1.21,  # VAT expressed as a multiplier
        # Formula strings evaluated by the billing service per hourly record.
        energy_formula="electricity_tax * iva_tax * kwh_eur * kwh",
        power_formula="electricity_tax * iva_tax * (p1_kw * (p1_kw_year_eur + market_kw_year_eur) + p2_kw * p2_kw_year_eur) / 365 / 24",
        others_formula="iva_tax * meter_month_eur / 30 / 24",
        surplus_formula="electricity_tax * iva_tax * surplus_kwh * surplus_kwh_eur",
        main_formula="energy_term + power_term + others_term",
    )
@pytest.fixture
def sample_pricing_rules_custom(self):
    """Sample pricing rules for custom pricing configuration.

    Unlike the PVPC fixture, fixed per-period energy prices are provided,
    so no stored PVPC data is needed to compute costs.
    """
    return PricingRules(
        p1_kw_year_eur=30.67,
        p2_kw_year_eur=1.42,
        p1_kwh_eur=0.15,  # Custom prices
        p2_kwh_eur=0.12,
        p3_kwh_eur=0.08,
        surplus_p1_kwh_eur=0.05,
        surplus_p2_kwh_eur=0.04,
        surplus_p3_kwh_eur=0.03,
        meter_month_eur=0.81,
        market_kw_year_eur=3.11,
        electricity_tax=1.05113,  # tax expressed as a multiplier
        iva_tax=1.21,  # VAT expressed as a multiplier
        # Formula strings evaluated by the billing service per hourly record.
        energy_formula="electricity_tax * iva_tax * kwh_eur * kwh",
        power_formula="electricity_tax * iva_tax * (p1_kw * (p1_kw_year_eur + market_kw_year_eur) + p2_kw * p2_kw_year_eur) / 365 / 24",
        others_formula="iva_tax * meter_month_eur / 30 / 24",
        surplus_formula="electricity_tax * iva_tax * surplus_kwh * surplus_kwh_eur",
        main_formula="energy_term + power_term + others_term",
    )
@pytest.mark.asyncio
async def test_initialization(
    self, temp_dir, mock_redata_connector, mock_database_service
):
    """Test BillingService initialization."""
    _, connector_cls = mock_redata_connector
    service = BillingService(storage_dir=temp_dir)
    # The REData connector is instantiated eagerly during construction.
    connector_cls.assert_called_once()
    # The database service is resolved lazily, on first use.
    resolved_db = await service._get_db_service()
    assert resolved_db is mock_database_service
@pytest.mark.asyncio
async def test_update_pvpc_prices_success(
    self,
    billing_service,
    mock_redata_connector,
    mock_database_service,
    sample_pvpc_prices,
):
    """Test successful PVPC price update."""
    connector, _ = mock_redata_connector
    day_start = datetime(2024, 6, 17, 0, 0)
    day_end = datetime(2024, 6, 17, 23, 59)
    # REData returns the sample prices; the database holds nothing yet.
    connector.get_realtime_prices = AsyncMock(return_value=sample_pvpc_prices)
    mock_database_service.get_pvpc_prices.return_value = []
    summary = await billing_service.update_pvpc_prices(
        start_date=day_start, end_date=day_end, is_ceuta_melilla=False
    )
    # The connector must be queried exactly once with the requested window.
    connector.get_realtime_prices.assert_called_once_with(
        dt_from=day_start, dt_to=day_end, is_ceuta_melilla=False
    )
    # One database save per fetched price.
    assert mock_database_service.save_pvpc_price.call_count == len(sample_pvpc_prices)
    # The summary reports a clean peninsular import: all fetched, all saved.
    assert summary["success"] is True
    assert summary["region"] == "Peninsula"
    assert summary["geo_id"] == 8741
    assert summary["stats"]["fetched"] == len(sample_pvpc_prices)
    assert summary["stats"]["saved"] == len(sample_pvpc_prices)
    assert summary["stats"]["updated"] == 0
@patch("edata.utils.get_pvpc_tariff")
def test_get_custom_prices_success(
    self,
    mock_get_pvpc_tariff,
    billing_service,
    mock_redata_connector,
    mock_database_service,
    sample_pricing_rules_custom,
):
    """Test successful custom price calculation."""
    window_start = datetime(2024, 6, 17, 10, 0)  # Monday 10:00
    window_end = datetime(2024, 6, 17, 13, 0)  # Monday 13:00 -> three hours
    # Every hour in the window resolves to the P1 tariff period.
    mock_get_pvpc_tariff.side_effect = ["p1", "p1", "p1"]
    # get_custom_prices is synchronous (no PVPC lookup needed).
    prices = billing_service.get_custom_prices(
        pricing_rules=sample_pricing_rules_custom,
        start_date=window_start,
        end_date=window_end,
    )
    # One tariff lookup and one PricingData entry per hour.
    assert mock_get_pvpc_tariff.call_count == 3
    assert len(prices) == 3
    p1_rate = sample_pricing_rules_custom.p1_kwh_eur
    for entry in prices:
        assert isinstance(entry, PricingData)
        assert entry.value_eur_kwh == p1_rate
@pytest.mark.asyncio
async def test_get_stored_pvpc_prices(
    self, billing_service, mock_redata_connector, mock_database_service
):
    """Test getting stored PVPC prices from database."""
    window_start = datetime(2024, 6, 17, 0, 0)
    window_end = datetime(2024, 6, 17, 23, 59)
    geo_id = 8741
    # Whatever the database returns must be passed straight through.
    stored_rows = [Mock(), Mock(), Mock()]
    mock_database_service.get_pvpc_prices.return_value = stored_rows
    fetched = await billing_service.get_stored_pvpc_prices(
        start_date=window_start, end_date=window_end, geo_id=geo_id
    )
    # The query must be forwarded with the exact window and geo id.
    mock_database_service.get_pvpc_prices.assert_called_once_with(
        window_start, window_end, geo_id
    )
    assert fetched == stored_rows
@pytest.mark.asyncio
async def test_get_prices_pvpc(
    self,
    billing_service,
    mock_redata_connector,
    mock_database_service,
    sample_pricing_rules_pvpc,
):
    """Test automatic price retrieval with PVPC configuration."""
    window_start = datetime(2024, 6, 17, 0, 0)
    window_end = datetime(2024, 6, 17, 23, 59)
    # Two stored PVPC rows cover part of the window.
    stored_pvpc = [
        Mock(datetime=datetime(2024, 6, 17, hour, 0), value_eur_kwh=rate, delta_h=1.0)
        for hour, rate in ((10, 0.15), (11, 0.16))
    ]
    mock_database_service.get_pvpc_prices.return_value = stored_pvpc
    prices = await billing_service.get_prices(
        pricing_rules=sample_pricing_rules_pvpc,
        start_date=window_start,
        end_date=window_end,
        is_ceuta_melilla=False,
    )
    # PVPC rules (no fixed kWh prices) must route through the database.
    mock_database_service.get_pvpc_prices.assert_called_once()
    assert len(prices) == 2
    assert all(isinstance(entry, PricingData) for entry in prices)
@patch("edata.utils.get_pvpc_tariff")
@pytest.mark.asyncio
async def test_get_prices_custom(
    self,
    mock_get_pvpc_tariff,
    billing_service,
    mock_redata_connector,
    mock_database_service,
    sample_pricing_rules_custom,
):
    """Test automatic price retrieval with custom configuration."""
    window_start = datetime(2024, 6, 17, 10, 0)
    window_end = datetime(2024, 6, 17, 11, 0)
    # The single hour in the window resolves to the P1 period.
    mock_get_pvpc_tariff.return_value = "p1"
    prices = await billing_service.get_prices(
        pricing_rules=sample_pricing_rules_custom,
        start_date=window_start,
        end_date=window_end,
    )
    # Fixed custom rates mean no database/PVPC lookup at all.
    mock_database_service.get_pvpc_prices.assert_not_called()
    assert len(prices) == 1
    first = prices[0]
    assert isinstance(first, PricingData)
    assert first.value_eur_kwh == sample_pricing_rules_custom.p1_kwh_eur
@pytest.mark.asyncio
async def test_get_prices_pvpc_no_data(
    self,
    billing_service,
    mock_redata_connector,
    mock_database_service,
    sample_pricing_rules_pvpc,
):
    """Test automatic price retrieval with PVPC configuration but no data."""
    window_start = datetime(2024, 6, 17, 0, 0)
    window_end = datetime(2024, 6, 17, 23, 59)
    # The database has no PVPC rows for the window.
    mock_database_service.get_pvpc_prices.return_value = []
    prices = await billing_service.get_prices(
        pricing_rules=sample_pricing_rules_pvpc,
        start_date=window_start,
        end_date=window_end,
        is_ceuta_melilla=False,
    )
    # With PVPC rules and an empty store, get_prices signals "no data"
    # with None after a single database lookup.
    assert prices is None
    mock_database_service.get_pvpc_prices.assert_called_once()
@pytest.mark.asyncio
async def test_get_prices_custom_no_prices_defined(
    self, billing_service, mock_redata_connector, mock_database_service
):
    """Test automatic price retrieval with custom configuration but no prices defined.

    Rules with all energy prices set to None are treated as PVPC, so the
    service must query the stored PVPC prices (and return None when the
    store is empty).
    """
    from edata.models.pricing import PricingRules
    mock_connector, mock_connector_class = mock_redata_connector
    start_date = datetime(2024, 6, 17, 10, 0)
    end_date = datetime(2024, 6, 17, 11, 0)
    # Create pricing rules with no energy prices defined
    empty_pricing_rules = PricingRules(
        p1_kw_year_eur=30.67,
        p2_kw_year_eur=1.42,
        p1_kwh_eur=None,  # No custom prices
        p2_kwh_eur=None,
        p3_kwh_eur=None,
        surplus_p1_kwh_eur=0.05,
        surplus_p2_kwh_eur=0.04,
        surplus_p3_kwh_eur=0.03,
        meter_month_eur=0.81,
        market_kw_year_eur=3.11,
        electricity_tax=1.05113,
        iva_tax=1.21,
        energy_formula="electricity_tax * iva_tax * kwh_eur * kwh",
        power_formula="electricity_tax * iva_tax * (p1_kw * (p1_kw_year_eur + market_kw_year_eur) + p2_kw * p2_kw_year_eur) / 365 / 24",
        others_formula="iva_tax * meter_month_eur / 30 / 24",
        surplus_formula="electricity_tax * iva_tax * surplus_kwh * surplus_kwh_eur",
        main_formula="energy_term + power_term + others_term",
    )
    # Mock empty PVPC prices since rules indicate PVPC usage
    mock_database_service.get_pvpc_prices.return_value = []
    # Execute automatic price retrieval with empty custom rules
    result = await billing_service.get_prices(
        pricing_rules=empty_pricing_rules, start_date=start_date, end_date=end_date
    )
    # Should return None when no PVPC prices available
    assert result is None
    # Should have tried to get PVPC prices since no custom prices are defined
    # (8741 is the default peninsular geo id, per the PVPC tests above).
    mock_database_service.get_pvpc_prices.assert_called_once_with(
        start_date, end_date, 8741
    )
@pytest.mark.asyncio
@patch("edata.utils.get_pvpc_tariff")
async def test_get_cost_calculation(
    self,
    mock_get_pvpc_tariff,
    billing_service,
    mock_redata_connector,
    mock_database_service,
    sample_pricing_rules_custom,
):
    """Test cost calculation functionality.

    Drives get_cost end-to-end with mocked consumptions/contracts and a
    scripted get_billing sequence, then checks that the hourly billing rows
    are aggregated (summed) into a single PricingAggregated result.
    """
    from datetime import datetime
    from edata.models.pricing import PricingAggregated
    mock_connector, mock_connector_class = mock_redata_connector
    cups = "ES0123456789012345AB"
    start_date = datetime(2024, 6, 17, 10, 0)
    end_date = datetime(2024, 6, 17, 12, 0)  # 2 hours
    # Mock no existing billing data initially
    mock_database_service.get_billing.return_value = []
    # Mock the pricing config hash generation
    mock_database_service.generate_pricing_config_hash.return_value = (
        "test_hash_12345678"
    )
    # Mock consumption data
    # (anonymous attribute-bag objects built with type(); only the three
    # attributes read by the service are provided)
    mock_consumptions = [
        type(
            "MockConsumption",
            (),
            {
                "datetime": datetime(2024, 6, 17, 10, 0),
                "value_kwh": 0.5,
                "surplus_kwh": 0.0,
            },
        )(),
        type(
            "MockConsumption",
            (),
            {
                "datetime": datetime(2024, 6, 17, 11, 0),
                "value_kwh": 0.6,
                "surplus_kwh": 0.0,
            },
        )(),
    ]
    mock_database_service.get_consumptions.return_value = mock_consumptions
    # Mock contract data
    mock_contracts = [
        type(
            "MockContract",
            (),
            {
                "power_p1": 4.0,
                "power_p2": 4.0,
                "date_start": datetime(2024, 6, 17, 0, 0),
                "date_end": datetime(2024, 6, 18, 0, 0),
            },
        )()
    ]
    mock_database_service.get_contracts.return_value = mock_contracts
    # Mock the save_billing method to return a success response
    mock_database_service.save_billing.return_value = type("MockBilling", (), {})()
    # Mock billing data after calculation
    mock_billing_results = [
        type(
            "MockBilling",
            (),
            {
                "datetime": datetime(2024, 6, 17, 10, 0),
                "total_eur": 0.05,
                "energy_term": 0.03,
                "power_term": 0.015,
                "others_term": 0.005,
                "surplus_term": 0.0,
            },
        )(),
        type(
            "MockBilling",
            (),
            {
                "datetime": datetime(2024, 6, 17, 11, 0),
                "total_eur": 0.06,
                "energy_term": 0.036,
                "power_term": 0.015,
                "others_term": 0.005,
                "surplus_term": 0.0,
            },
        )(),
    ]
    # Configure get_billing to return empty first, then billing results after update_missing_costs
    # NOTE: this side_effect sequence is order-sensitive — it encodes the
    # exact number of get_billing calls the service is expected to make.
    mock_database_service.get_billing.side_effect = [
        [],
        mock_billing_results,
        mock_billing_results,
    ]
    # Mock tariff calculation - need one call per hour in the data
    mock_get_pvpc_tariff.return_value = (
        "p1"  # Use return_value instead of side_effect
    )
    # Execute cost calculation
    result = await billing_service.get_cost(
        cups=cups,
        pricing_rules=sample_pricing_rules_custom,
        start_date=start_date,
        end_date=end_date,
    )
    # Validate result aggregation from mocked billing data
    assert isinstance(result, PricingAggregated)
    assert result.datetime == start_date
    assert result.value_eur == 0.11  # 0.05 + 0.06
    assert result.energy_term == 0.066  # 0.03 + 0.036
    assert result.power_term == 0.03  # 0.015 + 0.015
    assert result.others_term == 0.01  # 0.005 + 0.005
    assert result.surplus_term == 0.0
    assert result.delta_h == 2  # 2 billing records
    # Verify database calls
    mock_database_service.get_consumptions.assert_called_once_with(
        cups, start_date, end_date
    )
    mock_database_service.get_contracts.assert_called_once_with(cups)
@pytest.mark.asyncio
async def test_get_cost_no_consumption_data(
    self,
    billing_service,
    mock_redata_connector,
    mock_database_service,
    sample_pricing_rules_custom,
):
    """Test cost calculation with no consumption data."""
    from datetime import datetime
    from edata.models.pricing import PricingAggregated

    cups = "ES0123456789012345AB"
    window_start = datetime(2024, 6, 17, 10, 0)
    window_end = datetime(2024, 6, 17, 12, 0)
    # Nothing in the database: no billing rows, no consumptions.
    mock_database_service.get_billing.return_value = []
    mock_database_service.get_consumptions.return_value = []
    mock_database_service.generate_pricing_config_hash.return_value = (
        "test_hash_12345678"
    )
    aggregated = await billing_service.get_cost(
        cups=cups,
        pricing_rules=sample_pricing_rules_custom,
        start_date=window_start,
        end_date=window_end,
    )
    # With nothing to bill, the service still returns an aggregate — but
    # every monetary term is zero.
    assert isinstance(aggregated, PricingAggregated)
    for term in ("value_eur", "energy_term", "power_term", "others_term", "surplus_term"):
        assert getattr(aggregated, term) == 0.0
    # delta_h still spans the requested window: (end - start) in hours.
    assert aggregated.delta_h == 2.0
@pytest.mark.asyncio
async def test_get_cost_no_pricing_data(
    self, billing_service, mock_redata_connector, mock_database_service
):
    """Test cost calculation with no pricing data available.

    Consumptions and contracts exist, but the rules are PVPC and the PVPC
    store is empty — so no hourly cost can be computed and every monetary
    term of the aggregate must be zero.
    """
    from datetime import datetime
    from edata.models.pricing import PricingAggregated, PricingRules
    mock_connector, mock_connector_class = mock_redata_connector
    cups = "ES0123456789012345AB"
    start_date = datetime(2024, 6, 17, 10, 0)
    end_date = datetime(2024, 6, 17, 12, 0)
    # Mock no existing billing data initially
    mock_database_service.get_billing.return_value = []
    # Mock the pricing config hash generation
    mock_database_service.generate_pricing_config_hash.return_value = (
        "test_hash_12345678"
    )
    # Mock consumption data present
    # (anonymous attribute-bag objects; only the read attributes are set)
    mock_consumptions = [
        type(
            "MockConsumption",
            (),
            {
                "datetime": datetime(2024, 6, 17, 10, 0),
                "value_kwh": 0.5,
                "surplus_kwh": 0.0,
            },
        )()
    ]
    mock_database_service.get_consumptions.return_value = mock_consumptions
    # Mock contract data present
    mock_contracts = [
        type(
            "MockContract",
            (),
            {
                "power_p1": 4.0,
                "power_p2": 4.0,
                "date_start": datetime(2024, 6, 17, 0, 0),
                "date_end": datetime(2024, 6, 18, 0, 0),
            },
        )()
    ]
    mock_database_service.get_contracts.return_value = mock_contracts
    # Mock no PVPC prices available
    mock_database_service.get_pvpc_prices.return_value = []
    # Create PVPC pricing rules (all energy prices None -> PVPC lookup)
    pvpc_pricing_rules = PricingRules(
        p1_kw_year_eur=30.67,
        p2_kw_year_eur=1.42,
        p1_kwh_eur=None,  # PVPC
        p2_kwh_eur=None,
        p3_kwh_eur=None,
        surplus_p1_kwh_eur=0.05,
        surplus_p2_kwh_eur=0.04,
        surplus_p3_kwh_eur=0.03,
        meter_month_eur=0.81,
        market_kw_year_eur=3.11,
        electricity_tax=1.05113,
        iva_tax=1.21,
        energy_formula="electricity_tax * iva_tax * kwh_eur * kwh",
        power_formula="electricity_tax * iva_tax * (p1_kw * (p1_kw_year_eur + market_kw_year_eur) + p2_kw * p2_kw_year_eur) / 365 / 24",
        others_formula="iva_tax * meter_month_eur / 30 / 24",
        surplus_formula="electricity_tax * iva_tax * surplus_kwh * surplus_kwh_eur",
        main_formula="energy_term + power_term + others_term",
    )
    # Execute cost calculation
    result = await billing_service.get_cost(
        cups=cups,
        pricing_rules=pvpc_pricing_rules,
        start_date=start_date,
        end_date=end_date,
    )
    # Verify result when no pricing data available
    assert isinstance(result, PricingAggregated)
    assert result.datetime == start_date
    assert result.value_eur == 0.0
    assert result.energy_term == 0.0
    assert result.power_term == 0.0
    assert result.others_term == 0.0
    assert result.surplus_term == 0.0
@pytest.mark.asyncio
async def test_jinja2_formula_evaluation(
    self, billing_service, mock_redata_connector, mock_database_service
):
    """Test Jinja2 formula evaluation with predictable values.

    Uses round numbers for consumption (1 kWh), surplus (0.5 kWh),
    contracted power (5/3 kW), PVPC price (0.10 EUR/kWh) and taxes so each
    billing term can be recomputed by hand and compared to the aggregate.
    """
    from datetime import datetime
    from edata.models.pricing import PricingAggregated, PricingRules
    mock_connector, mock_connector_class = mock_redata_connector
    cups = "ES0123456789012345AB"
    start_date = datetime(2024, 6, 17, 10, 0)  # P1 period (Monday 10:00)
    end_date = datetime(2024, 6, 17, 11, 0)  # 1 hour
    # Mock no existing billing data initially
    mock_database_service.get_billing.return_value = []
    # Mock the pricing config hash generation
    mock_database_service.generate_pricing_config_hash.return_value = (
        "test_hash_12345678"
    )
    # Mock predictable consumption data: 1 kWh consumed, 0.5 kWh surplus
    mock_consumptions = [
        type(
            "MockConsumption",
            (),
            {
                "datetime": datetime(2024, 6, 17, 10, 0),
                "value_kwh": 1.0,
                "surplus_kwh": 0.5,
            },
        )()
    ]
    mock_database_service.get_consumptions.return_value = mock_consumptions
    # Mock predictable contract data: 5kW P1, 3kW P2
    mock_contracts = [
        type(
            "MockContract",
            (),
            {
                "power_p1": 5.0,
                "power_p2": 3.0,
                "date_start": datetime(2024, 6, 17, 0, 0),
                "date_end": datetime(2024, 6, 18, 0, 0),
            },
        )()
    ]
    mock_database_service.get_contracts.return_value = mock_contracts
    # Mock predictable PVPC prices: 0.10 €/kWh
    mock_pvpc_prices = [
        type(
            "MockPVPCPrice",
            (),
            {
                "datetime": datetime(2024, 6, 17, 10, 0),
                "value_eur_kwh": 0.10,
                "delta_h": 1.0,
            },
        )()
    ]
    mock_database_service.get_pvpc_prices.return_value = mock_pvpc_prices
    # Mock the save_billing method to return a success response
    mock_database_service.save_billing.return_value = type("MockBilling", (), {})()
    # Mock billing result after calculation (with predictable values)
    # Hand-computed expectations mirroring the formulas in the rules below:
    # Energy term: 1.05 * 1.21 * 0.10 * 1.0 = 0.12705
    expected_energy_term = 1.05 * 1.21 * 0.10 * 1.0
    # Power term: 1.05 * 1.21 * (5 * (40 + 4) + 3 * 20) / 365 / 24
    expected_power_term = 1.05 * 1.21 * (5 * (40 + 4) + 3 * 20) / 365 / 24
    # Others term: 1.21 * 3.0 / 30 / 24
    expected_others_term = 1.21 * 3.0 / 30 / 24
    # Surplus term: 1.05 * 1.21 * 0.5 * 0.06
    expected_surplus_term = 1.05 * 1.21 * 0.5 * 0.06
    # Total: energy + power + others - surplus (matches main_formula below)
    expected_total = (
        expected_energy_term
        + expected_power_term
        + expected_others_term
        - expected_surplus_term
    )
    mock_billing_result = [
        type(
            "MockBilling",
            (),
            {
                "datetime": datetime(2024, 6, 17, 10, 0),
                "total_eur": expected_total,
                "energy_term": expected_energy_term,
                "power_term": expected_power_term,
                "others_term": expected_others_term,
                "surplus_term": expected_surplus_term,
            },
        )()
    ]
    # Configure get_billing to return empty first, then billing results after update_missing_costs
    # (order-sensitive side_effect sequence: one entry per expected call)
    mock_database_service.get_billing.side_effect = [
        [],
        mock_billing_result,
        mock_billing_result,
    ]
    # Create PVPC pricing rules with simplified formulas for testing
    test_pricing_rules = PricingRules(
        p1_kw_year_eur=40.0,  # €40/kW/year
        p2_kw_year_eur=20.0,  # €20/kW/year
        p1_kwh_eur=None,  # Use PVPC (0.10 €/kWh)
        p2_kwh_eur=None,
        p3_kwh_eur=None,
        surplus_p1_kwh_eur=0.06,  # €0.06/kWh surplus in P1
        surplus_p2_kwh_eur=0.04,  # €0.04/kWh surplus in P2
        surplus_p3_kwh_eur=0.02,  # €0.02/kWh surplus in P3
        meter_month_eur=3.0,  # €3/month meter
        market_kw_year_eur=4.0,  # €4/kW/year market
        electricity_tax=1.05,  # 5% electricity tax
        iva_tax=1.21,  # 21% IVA
        # Simplified formulas for predictable calculation
        energy_formula="electricity_tax * iva_tax * kwh_eur * kwh",
        power_formula="electricity_tax * iva_tax * (p1_kw * (p1_kw_year_eur + market_kw_year_eur) + p2_kw * p2_kw_year_eur) / 365 / 24",
        others_formula="iva_tax * meter_month_eur / 30 / 24",
        surplus_formula="electricity_tax * iva_tax * surplus_kwh * surplus_kwh_eur",
        main_formula="energy_term + power_term + others_term - surplus_term",
    )
    # Execute cost calculation
    result = await billing_service.get_cost(
        cups=cups,
        pricing_rules=test_pricing_rules,
        start_date=start_date,
        end_date=end_date,
    )
    # Verify the result matches our mocked billing data
    assert isinstance(result, PricingAggregated)
    assert result.datetime == start_date
    assert result.delta_h == 1  # 1 billing record
    # Verify that the aggregated values match our expected calculations
    assert round(result.energy_term, 4) == round(expected_energy_term, 4)
    assert round(result.power_term, 4) == round(expected_power_term, 4)
    assert round(result.others_term, 4) == round(expected_others_term, 4)
    assert round(result.surplus_term, 4) == round(expected_surplus_term, 4)
    assert round(result.value_eur, 4) == round(expected_total, 4)
    # NOTE(review): this 5-digit check subsumes the 4-digit one above —
    # one of the two is redundant.
    assert round(result.value_eur, 5) == round(expected_total, 5)
@pytest.mark.asyncio
async def test_update_missing_costs(
    self, billing_service, mock_redata_connector, mock_database_service
):
    """Test update_missing_costs method.

    With two consumptions, one contract, custom (non-PVPC) rules and an
    empty billing store, every consumption should be billed and saved, and
    the returned summary must report the run.
    """
    from datetime import datetime
    from edata.models.pricing import PricingRules
    mock_connector, mock_connector_class = mock_redata_connector
    cups = "ES0123456789012345AB"
    start_date = datetime(2024, 6, 17, 10, 0)
    end_date = datetime(2024, 6, 17, 12, 0)
    # Mock consumption data (anonymous attribute-bag objects)
    mock_consumptions = [
        type(
            "MockConsumption",
            (),
            {
                "datetime": datetime(2024, 6, 17, 10, 0),
                "value_kwh": 0.5,
                "surplus_kwh": 0.0,
            },
        )(),
        type(
            "MockConsumption",
            (),
            {
                "datetime": datetime(2024, 6, 17, 11, 0),
                "value_kwh": 0.7,
                "surplus_kwh": 0.1,
            },
        )(),
    ]
    mock_database_service.get_consumptions.return_value = mock_consumptions
    # Mock contract data
    mock_contracts = [
        type(
            "MockContract",
            (),
            {
                "power_p1": 4.0,
                "power_p2": 4.0,
                "date_start": datetime(2024, 6, 17, 0, 0),
                "date_end": datetime(2024, 6, 18, 0, 0),
            },
        )()
    ]
    mock_database_service.get_contracts.return_value = mock_contracts
    # Mock no existing billing records
    mock_database_service.get_billing.return_value = []
    # Mock successful billing save
    mock_billing_record = type("MockBilling", (), {"id": 1})()
    mock_database_service.save_billing.return_value = mock_billing_record
    # Mock hash generation
    mock_database_service.generate_pricing_config_hash.return_value = (
        "test_hash_123"
    )
    # Create custom pricing rules (no PVPC)
    custom_pricing_rules = PricingRules(
        p1_kw_year_eur=30.0,
        p2_kw_year_eur=20.0,
        p1_kwh_eur=0.15,  # Custom prices - no PVPC
        p2_kwh_eur=0.12,
        p3_kwh_eur=0.10,
        surplus_p1_kwh_eur=0.06,
        surplus_p2_kwh_eur=0.04,
        surplus_p3_kwh_eur=0.02,
        meter_month_eur=3.0,
        market_kw_year_eur=4.0,
        electricity_tax=1.05,
        iva_tax=1.21,
        energy_formula="electricity_tax * iva_tax * kwh_eur * kwh",
        power_formula="electricity_tax * iva_tax * (p1_kw * (p1_kw_year_eur + market_kw_year_eur) + p2_kw * p2_kw_year_eur) / 365 / 24",
        others_formula="iva_tax * meter_month_eur / 30 / 24",
        surplus_formula="electricity_tax * iva_tax * surplus_kwh * surplus_kwh_eur",
        main_formula="energy_term + power_term + others_term - surplus_term",
    )
    # Execute update_missing_costs
    result = await billing_service.update_missing_costs(
        cups=cups,
        pricing_rules=custom_pricing_rules,
        start_date=start_date,
        end_date=end_date,
    )
    # Verify successful result
    assert result["success"] is True
    assert result["cups"] == cups
    assert result["pricing_config_hash"] == "test_hash_123"
    # Verify statistics
    stats = result["stats"]
    assert stats["total_consumptions"] == 2
    assert stats["processed"] > 0  # Should have processed some records
    # Verify database methods were called
    mock_database_service.get_consumptions.assert_called_once_with(
        cups, start_date, end_date
    )
    mock_database_service.get_contracts.assert_called_once_with(cups)
    mock_database_service.get_billing.assert_called_once()
    mock_database_service.generate_pricing_config_hash.assert_called_once()
    # Verify save_billing was called (at least once)
    assert mock_database_service.save_billing.call_count > 0
@pytest.mark.asyncio
async def test_get_daily_costs_with_existing_data(
    self, billing_service, mock_database_service, sample_pricing_rules_custom
):
    """Test get_daily_costs with existing billing data.

    Seeds 48 hourly billing rows (2 calendar days) and expects one
    daily aggregate per day, with the second day costing more because
    the hourly totals increase monotonically.
    """
    from edata.models.database import BillingModel

    # Create mock billing records for 2 days
    base_date = datetime(2024, 1, 1, 0, 0, 0)
    mock_billing_records = []
    # Create 48 hours of billing data (2 days)
    for hour in range(48):
        record = BillingModel(
            datetime=base_date + timedelta(hours=hour),
            cups="ES0012345678901234567890AB",
            pricing_config_hash="test_hash",
            total_eur=1.5 + (hour * 0.1),  # Varying costs
            energy_term=1.0 + (hour * 0.05),
            power_term=0.3,
            others_term=0.1,
            surplus_term=0.1 + (hour * 0.05),
        )
        mock_billing_records.append(record)
    # Setup mocks: stored hash matches the computed one, so the service
    # should use the stored rows rather than recomputing costs.
    mock_database_service.generate_pricing_config_hash.return_value = "test_hash"
    mock_database_service.get_billing.return_value = mock_billing_records
    # Test parameters
    cups = "ES0012345678901234567890AB"
    start_date = datetime(2024, 1, 1)
    end_date = datetime(2024, 1, 2, 23, 59, 59)
    # Call method
    result = await billing_service.get_daily_costs(
        cups, sample_pricing_rules_custom, start_date, end_date
    )
    # Assertions
    assert len(result) == 2  # Two days
    from edata.models.pricing import PricingAggregated

    assert all(isinstance(item, PricingAggregated) for item in result)
    # Check first day
    first_day = result[0]
    assert first_day.datetime.date() == datetime(2024, 1, 1).date()
    assert first_day.delta_h == 24  # 24 hours
    assert first_day.value_eur > 0
    # Check second day
    second_day = result[1]
    assert second_day.datetime.date() == datetime(2024, 1, 2).date()
    assert second_day.delta_h == 24  # 24 hours
    assert (
        second_day.value_eur > first_day.value_eur
    )  # Should be higher due to increasing pattern
@pytest.mark.asyncio
async def test_get_daily_costs_without_existing_data(
    self, billing_service, mock_database_service, sample_pricing_rules_custom
):
    """Test get_daily_costs when no billing data exists."""
    # Both billing lookups (before and after the refresh attempt) find nothing.
    mock_database_service.generate_pricing_config_hash.return_value = "test_hash"
    mock_database_service.get_billing.side_effect = [
        [],
        [],
    ]  # First call empty, second still empty
    # Force the fallback recomputation to report failure.
    with patch.object(billing_service, "update_missing_costs") as mock_update:
        mock_update.return_value = {"success": False, "error": "Test error"}

        result = await billing_service.get_daily_costs(
            "ES0012345678901234567890AB",
            sample_pricing_rules_custom,
            datetime(2024, 1, 1),
            datetime(2024, 1, 1, 23, 59, 59),
        )

        # A failed refresh must yield an empty result, not an exception.
        assert result == []
        mock_update.assert_called_once()
@pytest.mark.asyncio
async def test_get_monthly_costs_with_existing_data(
    self, billing_service, mock_database_service, sample_pricing_rules_custom
):
    """Test get_monthly_costs with existing billing data.

    48 hourly rows, all in January 2024, must collapse into a single
    monthly aggregate covering 48 hours.
    """
    from edata.models.database import BillingModel

    # Create mock billing records for 2 days in same month
    base_date = datetime(2024, 1, 1, 0, 0, 0)
    mock_billing_records = []
    # Create 48 hours of billing data (2 days)
    for hour in range(48):
        record = BillingModel(
            datetime=base_date + timedelta(hours=hour),
            cups="ES0012345678901234567890AB",
            pricing_config_hash="test_hash",
            total_eur=1.5 + (hour * 0.1),  # Varying costs
            energy_term=1.0 + (hour * 0.05),
            power_term=0.3,
            others_term=0.1,
            surplus_term=0.1 + (hour * 0.05),
        )
        mock_billing_records.append(record)
    # Setup mocks: matching hash means the stored rows are used as-is
    mock_database_service.generate_pricing_config_hash.return_value = "test_hash"
    mock_database_service.get_billing.return_value = mock_billing_records
    # Test parameters
    cups = "ES0012345678901234567890AB"
    start_date = datetime(2024, 1, 1)
    end_date = datetime(2024, 1, 31, 23, 59, 59)
    # Call method
    result = await billing_service.get_monthly_costs(
        cups, sample_pricing_rules_custom, start_date, end_date
    )
    # Assertions
    assert len(result) == 1  # One month
    from edata.models.pricing import PricingAggregated

    assert all(isinstance(item, PricingAggregated) for item in result)
    # Check month
    month_data = result[0]
    assert month_data.datetime.date() == datetime(2024, 1, 1).date()
    assert month_data.delta_h == 48  # 48 hours from mock data
    assert month_data.value_eur > 0
@pytest.mark.asyncio
async def test_get_monthly_costs_multiple_months(
    self, billing_service, mock_database_service, sample_pricing_rules_custom
):
    """Test get_monthly_costs with data spanning multiple months."""
    from edata.models.database import BillingModel

    def one_day_of_records(day_start, total_eur, energy_term, power_term):
        """Build 24 hourly billing rows starting at *day_start*."""
        return [
            BillingModel(
                datetime=day_start + timedelta(hours=offset),
                cups="ES0012345678901234567890AB",
                pricing_config_hash="test_hash",
                total_eur=total_eur,
                energy_term=energy_term,
                power_term=power_term,
                others_term=0.1,
                surplus_term=0.0,
            )
            for offset in range(24)
        ]

    # 24 hours in January at 1.0 EUR/h plus 24 hours in February at 1.2 EUR/h.
    records = one_day_of_records(datetime(2024, 1, 15), 1.0, 0.7, 0.2)
    records += one_day_of_records(datetime(2024, 2, 15), 1.2, 0.8, 0.3)

    mock_database_service.generate_pricing_config_hash.return_value = "test_hash"
    mock_database_service.get_billing.return_value = records

    result = await billing_service.get_monthly_costs(
        "ES0012345678901234567890AB",
        sample_pricing_rules_custom,
        datetime(2024, 1, 1),
        datetime(2024, 2, 28, 23, 59, 59),
    )

    # One aggregate per month, each anchored on the 1st.
    assert len(result) == 2
    jan_data, feb_data = result
    assert jan_data.datetime.date() == datetime(2024, 1, 1).date()
    assert jan_data.delta_h == 24
    assert jan_data.value_eur == 24.0  # 24 hours * 1.0 EUR
    assert feb_data.datetime.date() == datetime(2024, 2, 1).date()
    assert feb_data.delta_h == 24
    assert feb_data.value_eur == 28.8  # 24 hours * 1.2 EUR
@pytest.mark.asyncio
async def test_get_daily_costs_error_handling(
    self, billing_service, mock_database_service, sample_pricing_rules_custom
):
    """Test error handling in get_daily_costs."""
    # A database-layer failure must propagate to the caller unchanged.
    mock_database_service.generate_pricing_config_hash.side_effect = Exception(
        "Database error"
    )

    with pytest.raises(Exception, match="Database error"):
        await billing_service.get_daily_costs(
            "ES0012345678901234567890AB",
            sample_pricing_rules_custom,
            datetime(2024, 1, 1),
            datetime(2024, 1, 1, 23, 59, 59),
        )
@pytest.mark.asyncio
async def test_get_monthly_costs_error_handling(
    self, billing_service, mock_database_service, sample_pricing_rules_custom
):
    """Test error handling in get_monthly_costs."""
    # A database-layer failure must propagate to the caller unchanged.
    mock_database_service.generate_pricing_config_hash.side_effect = Exception(
        "Database error"
    )

    with pytest.raises(Exception, match="Database error"):
        await billing_service.get_monthly_costs(
            "ES0012345678901234567890AB",
            sample_pricing_rules_custom,
            datetime(2024, 1, 1),
            datetime(2024, 1, 31, 23, 59, 59),
        )
"""Tests for ConsumptionService."""
import shutil
import tempfile
from datetime import date, datetime, timedelta
from unittest.mock import AsyncMock, Mock, patch
import pytest
import pytest_asyncio
from edata.connectors.datadis import DatadisConnector
from edata.models.consumption import Consumption, ConsumptionAggregated
from edata.services.consumption import ConsumptionService
class TestConsumptionService:
"""Test suite for ConsumptionService."""
@pytest.fixture
def temp_dir(self):
    """Yield a fresh temporary directory, removed after the test."""
    path = tempfile.mkdtemp()
    try:
        yield path
    finally:
        # Guarantee cleanup even if teardown is reached via an error path.
        shutil.rmtree(path)
@pytest.fixture
def mock_datadis_connector(self):
    """Patch DatadisConnector inside the consumption service module.

    Yields an ``(instance, class)`` pair so tests can inspect both the
    mocked connector object and its patched constructor.
    """
    patcher = patch("edata.services.consumption.DatadisConnector")
    mock_connector_class = patcher.start()
    mock_connector = Mock(spec=DatadisConnector)
    mock_connector_class.return_value = mock_connector
    try:
        yield mock_connector, mock_connector_class
    finally:
        patcher.stop()
@pytest.fixture
def mock_database_service(self):
    """Mock DatabaseService for testing.

    Patches the module-level factory so the service under test receives
    this mock instead of a real database connection.
    """
    with patch("edata.services.consumption.get_database_service") as mock_get_db:
        mock_db = Mock()
        # Make the async methods awaitable by backing them with AsyncMock
        mock_db.get_consumptions = AsyncMock(return_value=[])
        mock_db.save_consumption = AsyncMock(return_value=Mock())
        mock_db.get_latest_consumption = AsyncMock(return_value=None)
        mock_get_db.return_value = mock_db
        yield mock_db
@pytest_asyncio.fixture
async def consumption_service(
    self, temp_dir, mock_datadis_connector, mock_database_service
):
    """Build a ConsumptionService wired to the mocked connector."""
    connector, _unused_class = mock_datadis_connector
    return ConsumptionService(datadis_connector=connector, storage_dir=temp_dir)
@pytest.fixture
def sample_consumptions(self):
    """Three consecutive hourly consumption readings for 2024-06-15."""
    hourly_values = ((10, 0.5), (11, 0.7), (12, 0.6))
    return [
        Consumption(
            datetime=datetime(2024, 6, 15, hour, 0),
            delta_h=1.0,
            value_kwh=kwh,
            surplus_kwh=0.0,
            real=True,
        )
        for hour, kwh in hourly_values
    ]
@pytest.mark.asyncio
async def test_initialization(
    self, temp_dir, mock_datadis_connector, mock_database_service
):
    """Test ConsumptionService initialization.

    Checks that constructor arguments are stored and that the database
    service is resolved lazily through the patched factory.
    """
    mock_connector, mock_connector_class = mock_datadis_connector
    service = ConsumptionService(
        datadis_connector=mock_connector,
        storage_dir=temp_dir,
    )
    # Verify service stores the connector and storage directory
    assert service._datadis == mock_connector
    assert service._storage_dir == temp_dir
    # Verify database service is obtained lazily by calling _get_db_service
    db_service = await service._get_db_service()
    assert db_service is mock_database_service
@pytest.mark.asyncio
async def test_update_consumptions_success(
    self,
    consumption_service,
    mock_datadis_connector,
    mock_database_service,
    sample_consumptions,
):
    """Test successful consumption update.

    With no previously stored data, every fetched consumption should be
    saved and the result stats should reflect saves only (no updates).
    """
    mock_connector, mock_connector_class = mock_datadis_connector
    cups = "ES1234567890123456789"
    distributor_code = "123"
    start_date = datetime(2024, 6, 15, 0, 0)
    end_date = datetime(2024, 6, 15, 23, 59)
    # Mock datadis connector response (now returns Pydantic models)
    mock_connector.get_consumption_data.return_value = sample_consumptions
    # Mock database service responses - no existing consumptions
    mock_database_service.get_consumptions.return_value = []
    # Execute update
    result = await consumption_service.update_consumptions(
        cups=cups,
        distributor_code=distributor_code,
        start_date=start_date,
        end_date=end_date,
    )
    # Verify datadis connector was called correctly (defaults applied for
    # measurement_type, point_type and authorized_nif)
    mock_connector.get_consumption_data.assert_called_once_with(
        cups=cups,
        distributor_code=distributor_code,
        start_date=start_date,
        end_date=end_date,
        measurement_type="0",
        point_type=5,
        authorized_nif=None,
    )
    # Verify database service was called for each consumption
    assert mock_database_service.save_consumption.call_count == len(
        sample_consumptions
    )
    # Verify result structure
    assert result["success"] is True
    assert result["cups"] == cups
    assert result["period"]["start"] == start_date.isoformat()
    assert result["period"]["end"] == end_date.isoformat()
    assert result["stats"]["fetched"] == len(sample_consumptions)
    assert result["stats"]["saved"] == len(sample_consumptions)
    assert result["stats"]["updated"] == 0
@pytest.mark.asyncio
async def test_update_consumptions_with_existing_data(
    self,
    consumption_service,
    mock_datadis_connector,
    mock_database_service,
    sample_consumptions,
):
    """Test consumption update with some existing data.

    The first fetched consumption already exists in the database, so it
    should count as "updated" while the other two count as "saved".
    """
    mock_connector, mock_connector_class = mock_datadis_connector
    cups = "ES1234567890123456789"
    distributor_code = "123"
    start_date = datetime(2024, 6, 15, 0, 0)
    end_date = datetime(2024, 6, 15, 23, 59)
    # Mock datadis connector response (now returns Pydantic models)
    mock_connector.get_consumption_data.return_value = sample_consumptions
    # Mock get_latest_consumption to return an existing consumption before the start date
    mock_latest = Mock()
    mock_latest.datetime = datetime(2024, 6, 14, 23, 0)  # Day before start_date
    mock_database_service.get_latest_consumption.return_value = mock_latest
    # Mock database service responses - first consumption exists, others don't

    def mock_get_consumptions(cups, start_date, end_date):
        # The service looks up each hour individually; only the first
        # sample's timestamp reports an existing row.
        if start_date == sample_consumptions[0].datetime:
            return [Mock()]  # Existing consumption
        return []  # No existing consumption

    mock_database_service.get_consumptions.side_effect = mock_get_consumptions
    # Execute update
    result = await consumption_service.update_consumptions(
        cups=cups,
        distributor_code=distributor_code,
        start_date=start_date,
        end_date=end_date,
    )
    # Verify result
    assert result["success"] is True
    assert result["stats"]["fetched"] == len(sample_consumptions)
    assert result["stats"]["saved"] == 2  # Two new consumptions
    assert result["stats"]["updated"] == 1  # One updated consumption
@pytest.mark.asyncio
async def test_update_consumptions_with_optional_parameters(
    self,
    consumption_service,
    mock_datadis_connector,
    mock_database_service,
    sample_consumptions,
):
    """Test consumption update with optional parameters.

    Verifies measurement_type, point_type and authorized_nif are passed
    through to the Datadis connector untouched.
    """
    mock_connector, mock_connector_class = mock_datadis_connector
    cups = "ES1234567890123456789"
    distributor_code = "123"
    start_date = datetime(2024, 6, 15, 0, 0)
    end_date = datetime(2024, 6, 15, 23, 59)
    measurement_type = "1"
    point_type = 3
    authorized_nif = "12345678A"
    # Mock datadis connector response (now returns Pydantic models)
    mock_connector.get_consumption_data.return_value = sample_consumptions
    mock_database_service.get_consumptions.return_value = []
    # Execute update with optional parameters
    result = await consumption_service.update_consumptions(
        cups=cups,
        distributor_code=distributor_code,
        start_date=start_date,
        end_date=end_date,
        measurement_type=measurement_type,
        point_type=point_type,
        authorized_nif=authorized_nif,
    )
    # Verify datadis connector was called with optional parameters
    mock_connector.get_consumption_data.assert_called_once_with(
        cups=cups,
        distributor_code=distributor_code,
        start_date=start_date,
        end_date=end_date,
        measurement_type=measurement_type,
        point_type=point_type,
        authorized_nif=authorized_nif,
    )
    assert result["success"] is True
@pytest.mark.asyncio
async def test_update_consumptions_error_handling(
    self, consumption_service, mock_datadis_connector, mock_database_service
):
    """Test consumption update error handling."""
    connector, _unused_class = mock_datadis_connector
    cups = "ES1234567890123456789"
    start = datetime(2024, 6, 15, 0, 0)
    end = datetime(2024, 6, 15, 23, 59)

    # The remote fetch blows up and there is no local data to fall back on.
    connector.get_consumption_data.side_effect = Exception("API connection failed")
    mock_database_service.get_latest_consumption.return_value = None

    result = await consumption_service.update_consumptions(
        cups=cups,
        distributor_code="123",
        start_date=start,
        end_date=end,
    )

    # The failure is reported in the result dict, not raised.
    assert result["success"] is False
    assert result["cups"] == cups
    assert result["error"] == "API connection failed"
    assert result["period"]["start"] == start.isoformat()
    assert result["period"]["end"] == end.isoformat()
    # Nothing must be persisted when the fetch fails.
    mock_database_service.save_consumption.assert_not_called()
@pytest.mark.asyncio
async def test_update_consumptions_with_force_full_update(
    self,
    consumption_service,
    mock_datadis_connector,
    mock_database_service,
    sample_consumptions,
):
    """Test consumption update with force_full_update=True ignores existing data.

    Even though the latest stored consumption falls inside the requested
    window, the connector must be queried from the original start_date.
    """
    mock_connector, mock_connector_class = mock_datadis_connector
    cups = "ES1234567890123456789"
    distributor_code = "123"
    start_date = datetime(2024, 6, 15, 0, 0)
    end_date = datetime(2024, 6, 15, 23, 59)
    # Mock datadis connector response
    mock_connector.get_consumption_data.return_value = sample_consumptions
    # Mock get_latest_consumption to return existing data
    mock_latest = Mock()
    mock_latest.datetime = datetime(
        2024, 6, 15, 12, 0
    )  # Within the requested range
    mock_database_service.get_latest_consumption.return_value = mock_latest
    # Mock database service responses - no existing consumptions
    mock_database_service.get_consumptions.return_value = []
    # Execute update with force_full_update=True
    result = await consumption_service.update_consumptions(
        cups=cups,
        distributor_code=distributor_code,
        start_date=start_date,
        end_date=end_date,
        force_full_update=True,
    )
    # Verify datadis connector was called with original start_date (not optimized)
    mock_connector.get_consumption_data.assert_called_once_with(
        cups=cups,
        distributor_code=distributor_code,
        start_date=start_date,  # Should use original start_date, not optimized
        end_date=end_date,
        measurement_type="0",
        point_type=5,
        authorized_nif=None,
    )
    # Verify result
    assert result["success"] is True
    assert result["stats"]["fetched"] == len(sample_consumptions)
@pytest.mark.asyncio
async def test_update_consumptions_incremental_optimization(
    self,
    consumption_service,
    mock_datadis_connector,
    mock_database_service,
    sample_consumptions,
):
    """Test that consumption update optimizes by starting from last consumption date.

    When stored data already covers part of the window, the service is
    expected to query only from one hour after the latest stored record.
    """
    mock_connector, mock_connector_class = mock_datadis_connector
    cups = "ES1234567890123456789"
    distributor_code = "123"
    start_date = datetime(2024, 6, 15, 0, 0)
    end_date = datetime(2024, 6, 15, 23, 59)
    # Mock datadis connector response
    mock_connector.get_consumption_data.return_value = sample_consumptions
    # Mock get_latest_consumption to return existing data
    mock_latest = Mock()
    mock_latest.datetime = datetime(2024, 6, 15, 8, 0)  # 8 AM on same day
    mock_database_service.get_latest_consumption.return_value = mock_latest
    # Mock database service responses - no existing consumptions for the new range
    mock_database_service.get_consumptions.return_value = []
    # Execute update
    result = await consumption_service.update_consumptions(
        cups=cups,
        distributor_code=distributor_code,
        start_date=start_date,
        end_date=end_date,
    )
    # Verify datadis connector was called with optimized start_date (9 AM)
    expected_optimized_start = datetime(2024, 6, 15, 9, 0)  # last + 1 hour
    mock_connector.get_consumption_data.assert_called_once_with(
        cups=cups,
        distributor_code=distributor_code,
        start_date=expected_optimized_start,  # Should be optimized
        end_date=end_date,
        measurement_type="0",
        point_type=5,
        authorized_nif=None,
    )
    # Verify result includes message about optimization
    assert result["success"] is True
    assert "message" in result
    assert "missing data" in result["message"]
@pytest.mark.asyncio
async def test_update_consumptions_up_to_date(
    self,
    consumption_service,
    mock_datadis_connector,
    mock_database_service,
):
    """Test consumption update when data is already up to date."""
    connector, _unused_class = mock_datadis_connector

    # Latest stored consumption already lies beyond the requested window.
    latest = Mock()
    latest.datetime = datetime(2024, 6, 16, 1, 0)
    mock_database_service.get_latest_consumption.return_value = latest

    result = await consumption_service.update_consumptions(
        cups="ES1234567890123456789",
        distributor_code="123",
        start_date=datetime(2024, 6, 15, 0, 0),
        end_date=datetime(2024, 6, 15, 23, 59),
    )

    # No remote call should be made when nothing is missing.
    connector.get_consumption_data.assert_not_called()
    # The result flags the skip explicitly.
    assert result["success"] is True
    assert result["stats"]["fetched"] == 0
    assert result["stats"]["skipped"] == "up_to_date"
    assert "up to date" in result["message"]
@pytest.mark.asyncio
async def test_update_consumption_range_by_months_single_month(
    self,
    consumption_service,
    mock_datadis_connector,
    mock_database_service,
    sample_consumptions,
):
    """Test consumption range update for a single month.

    A range entirely inside June 2024 should produce exactly one monthly
    batch whose stats mirror a plain update.
    """
    mock_connector, mock_connector_class = mock_datadis_connector
    cups = "ES1234567890123456789"
    distributor_code = "123"
    start_date = datetime(2024, 6, 1, 0, 0)
    end_date = datetime(2024, 6, 30, 23, 59)
    # Mock datadis connector response (now returns Pydantic models)
    mock_connector.get_consumption_data.return_value = sample_consumptions
    mock_database_service.get_consumptions.return_value = []
    # Execute range update
    result = await consumption_service.update_consumption_range_by_months(
        cups=cups,
        distributor_code=distributor_code,
        start_date=start_date,
        end_date=end_date,
    )
    # Verify result structure
    assert result["success"] is True
    assert result["cups"] == cups
    assert result["months_processed"] == 1
    assert result["total_stats"]["consumptions_fetched"] == len(sample_consumptions)
    assert result["total_stats"]["consumptions_saved"] == len(sample_consumptions)
    assert result["total_stats"]["consumptions_updated"] == 0
    assert len(result["monthly_results"]) == 1
    # Verify monthly result
    monthly_result = result["monthly_results"][0]
    assert monthly_result["month"] == "2024-06"
    assert monthly_result["consumption"]["success"] is True
@pytest.mark.asyncio
async def test_update_consumption_range_by_months_multiple_months(
    self,
    consumption_service,
    mock_datadis_connector,
    mock_database_service,
    sample_consumptions,
):
    """Test consumption range update for multiple months.

    A mid-May to mid-July range must be split into three monthly batches,
    each fetching the mocked sample set.
    """
    mock_connector, mock_connector_class = mock_datadis_connector
    cups = "ES1234567890123456789"
    distributor_code = "123"
    start_date = datetime(2024, 5, 15, 0, 0)
    end_date = datetime(2024, 7, 15, 23, 59)
    # Mock datadis connector response (now returns Pydantic models)
    mock_connector.get_consumption_data.return_value = sample_consumptions
    mock_database_service.get_consumptions.return_value = []
    # Execute range update
    result = await consumption_service.update_consumption_range_by_months(
        cups=cups,
        distributor_code=distributor_code,
        start_date=start_date,
        end_date=end_date,
    )
    # Should process 3 months: May (partial), June (full), July (partial)
    assert result["months_processed"] == 3
    assert len(result["monthly_results"]) == 3
    # Verify month identifiers
    months = [r["month"] for r in result["monthly_results"]]
    assert "2024-05" in months
    assert "2024-06" in months
    assert "2024-07" in months
    # Verify total stats
    expected_total_fetched = len(sample_consumptions) * 3
    assert result["total_stats"]["consumptions_fetched"] == expected_total_fetched
@pytest.mark.asyncio
async def test_update_consumption_range_by_months_with_errors(
    self,
    consumption_service,
    mock_datadis_connector,
    mock_database_service,
    sample_consumptions,
):
    """Test consumption range update with some months failing.

    The second of three monthly fetches raises; the range update should
    keep going, record the per-month failure, and report overall failure.
    """
    mock_connector, mock_connector_class = mock_datadis_connector
    cups = "ES1234567890123456789"
    distributor_code = "123"
    start_date = datetime(2024, 6, 1, 0, 0)
    end_date = datetime(2024, 8, 31, 23, 59)
    # Mock datadis connector to fail on second call
    call_count = 0

    def mock_get_consumption_data(*args, **kwargs):
        # Stateful side effect: counts invocations to fail exactly once.
        nonlocal call_count
        call_count += 1
        if call_count == 2:  # Second month fails
            raise Exception("API rate limit exceeded")
        return sample_consumptions

    mock_connector.get_consumption_data.side_effect = mock_get_consumption_data
    mock_database_service.get_consumptions.return_value = []
    # Execute range update
    result = await consumption_service.update_consumption_range_by_months(
        cups=cups,
        distributor_code=distributor_code,
        start_date=start_date,
        end_date=end_date,
    )
    # Should process 3 months but with one failure
    assert result["months_processed"] == 3
    assert result["success"] is False  # Overall failure due to one failed month
    # Check individual month results
    successful_months = [
        r for r in result["monthly_results"] if r["consumption"]["success"]
    ]
    failed_months = [
        r for r in result["monthly_results"] if not r["consumption"]["success"]
    ]
    assert len(successful_months) == 2
    assert len(failed_months) == 1
@pytest.mark.asyncio
async def test_update_consumption_range_year_boundary(
    self,
    consumption_service,
    mock_datadis_connector,
    mock_database_service,
    sample_consumptions,
):
    """Test consumption range update across year boundary."""
    connector, _unused_class = mock_datadis_connector

    # Every monthly fetch returns the same sample batch; nothing stored yet.
    connector.get_consumption_data.return_value = sample_consumptions
    mock_database_service.get_consumptions.return_value = []

    result = await consumption_service.update_consumption_range_by_months(
        cups="ES1234567890123456789",
        distributor_code="123",
        start_date=datetime(2023, 12, 1, 0, 0),
        end_date=datetime(2024, 2, 28, 23, 59),
    )

    # December 2023, January 2024 and February 2024 are each processed.
    assert result["months_processed"] == 3
    processed = [entry["month"] for entry in result["monthly_results"]]
    for expected_month in ("2023-12", "2024-01", "2024-02"):
        assert expected_month in processed
@pytest.mark.asyncio
async def test_get_stored_consumptions_no_filters(
    self, consumption_service, mock_database_service, sample_consumptions
):
    """Test getting stored consumptions without date filters."""
    cups = "ES1234567890123456789"
    mock_database_service.get_consumptions.return_value = sample_consumptions

    fetched = await consumption_service.get_stored_consumptions(cups)

    # Without filters, both date bounds are forwarded as None.
    mock_database_service.get_consumptions.assert_called_once_with(cups, None, None)
    assert fetched == sample_consumptions
@pytest.mark.asyncio
async def test_get_stored_consumptions_with_filters(
    self, consumption_service, mock_database_service, sample_consumptions
):
    """Test getting stored consumptions with date filters.

    The service is a thin pass-through: both date bounds must reach the
    database layer unchanged and the rows come back as-is.
    """
    cups = "ES1234567890123456789"
    start_date = datetime(2024, 6, 15, 0, 0)
    end_date = datetime(2024, 6, 15, 23, 59)
    # Mock database service response
    filtered_consumptions = sample_consumptions[:2]  # Return first two
    mock_database_service.get_consumptions.return_value = filtered_consumptions
    # Execute get stored consumptions with filters
    result = await consumption_service.get_stored_consumptions(
        cups=cups, start_date=start_date, end_date=end_date
    )
    # Verify database service was called correctly
    mock_database_service.get_consumptions.assert_called_once_with(
        cups, start_date, end_date
    )
    # Verify result
    assert result == filtered_consumptions
@pytest.mark.asyncio
async def test_initialization_default_parameters(
    self, temp_dir, mock_datadis_connector, mock_database_service
):
    """Test ConsumptionService initialization with default parameters."""
    connector, _unused_class = mock_datadis_connector

    service = ConsumptionService(datadis_connector=connector)

    # The connector is stored and storage_dir defaults to None when omitted.
    assert service._datadis == connector
    assert service._storage_dir is None
# NOTE: @patch sits above @pytest.mark.asyncio, so the patch is applied
# outermost and mock_logger is injected as the first non-self argument.
@patch("edata.services.consumption._LOGGER")
@pytest.mark.asyncio
async def test_logging_during_operations(
    self,
    mock_logger,
    consumption_service,
    mock_datadis_connector,
    mock_database_service,
    sample_consumptions,
):
    """Test that appropriate logging occurs during operations."""
    mock_connector, mock_connector_class = mock_datadis_connector
    cups = "ES1234567890123456789"
    distributor_code = "123"
    start_date = datetime(2024, 6, 15, 0, 0)
    end_date = datetime(2024, 6, 15, 23, 59)
    # Mock datadis connector response (now returns Pydantic models)
    mock_connector.get_consumption_data.return_value = sample_consumptions
    mock_database_service.get_consumptions.return_value = []
    # Execute update
    await consumption_service.update_consumptions(
        cups=cups,
        distributor_code=distributor_code,
        start_date=start_date,
        end_date=end_date,
    )
    # Verify logging calls
    assert mock_logger.info.call_count >= 2  # Start and completion logs
    # Verify log messages contain expected information
    log_calls = [call.args[0] for call in mock_logger.info.call_args_list]
    assert any("Updating consumptions" in msg for msg in log_calls)
    assert any("Consumption update completed" in msg for msg in log_calls)
@pytest.fixture
def sample_db_consumptions(self):
    """Sample database consumption data for aggregation testing.

    Two full days of hourly records starting on a Monday, with kWh
    values keyed to tariff periods and a small surplus every 10 hours.
    """
    from edata.services.database import ConsumptionModel as DbConsumption

    # Use Monday (weekday 0) instead of Saturday for proper tariff testing
    base_date = datetime(2024, 6, 17)  # Monday, June 17, 2024
    peak_hours = {10, 11, 12, 13, 18, 19, 20, 21}  # P1
    shoulder_hours = {8, 9, 14, 15, 16, 17, 22, 23}  # P2

    records = []
    for offset in range(48):  # 48 hourly records = 2 days
        moment = base_date + timedelta(hours=offset)
        if moment.hour in peak_hours:
            kwh = 1.5
        elif moment.hour in shoulder_hours:
            kwh = 1.0
        else:
            kwh = 0.5  # remaining (valley) hours are P3
        entry = Mock(spec=DbConsumption)
        entry.datetime = moment
        entry.delta_h = 1.0
        entry.value_kwh = kwh
        entry.surplus_kwh = 0.1 if offset % 10 == 0 else 0.0
        entry.real = True
        records.append(entry)
    return records
@pytest.mark.asyncio
async def test_get_daily_consumptions(
    self,
    consumption_service,
    mock_datadis_connector,
    mock_database_service,
    sample_db_consumptions,
):
    """Test daily consumption aggregation.

    Expectations are derived from the fixture's tariff layout: per day,
    8 P1 hours at 1.5 kWh, 8 P2 hours at 1.0 kWh, 8 P3 hours at 0.5 kWh.
    """
    mock_connector, mock_connector_class = mock_datadis_connector
    cups = "ES1234567890123456789"
    start_date = datetime(2024, 6, 17, 0, 0)  # Monday
    end_date = datetime(2024, 6, 18, 23, 59)  # Tuesday
    # Mock database service to return sample data
    mock_database_service.get_consumptions.return_value = sample_db_consumptions
    # Execute daily aggregation
    daily_consumptions = await consumption_service.get_daily_consumptions(
        cups=cups, start_date=start_date, end_date=end_date
    )
    # Verify database service was called correctly
    mock_database_service.get_consumptions.assert_called_once_with(
        cups, start_date, end_date
    )
    # Should have 2 days of data
    assert len(daily_consumptions) == 2
    # Verify first day aggregation
    day1 = daily_consumptions[0]
    assert isinstance(day1, ConsumptionAggregated)
    assert day1.datetime.date() == date(2024, 6, 17)  # Monday
    assert day1.delta_h == 24.0  # 24 hours
    # Verify total consumption (should be sum of all hourly values)
    expected_day1_total = (8 * 1.5) + (8 * 1.0) + (8 * 0.5)  # P1 + P2 + P3 hours
    assert day1.value_kwh == expected_day1_total
    # Verify P1 consumption (hours 10-13, 18-21)
    expected_p1 = 8 * 1.5  # 8 P1 hours at 1.5 kWh each
    assert day1.value_p1_kwh == expected_p1
    # Verify some surplus was recorded
    assert day1.surplus_kwh > 0
    # Verify second day
    day2 = daily_consumptions[1]
    assert day2.datetime.date() == date(2024, 6, 18)  # Tuesday
    assert day2.delta_h == 24.0
@pytest.mark.asyncio
async def test_get_monthly_consumptions(
    self,
    consumption_service,
    mock_datadis_connector,
    mock_database_service,
    sample_db_consumptions,
):
    """Test monthly consumption aggregation.

    Both fixture days fall in June 2024, so a single monthly aggregate
    covering 48 hours is expected.
    """
    mock_connector, mock_connector_class = mock_datadis_connector
    cups = "ES1234567890123456789"
    start_date = datetime(2024, 6, 17, 0, 0)  # Monday
    end_date = datetime(2024, 6, 18, 23, 59)  # Tuesday
    # Mock database service to return sample data
    mock_database_service.get_consumptions.return_value = sample_db_consumptions
    # Execute monthly aggregation
    monthly_consumptions = await consumption_service.get_monthly_consumptions(
        cups=cups, start_date=start_date, end_date=end_date
    )
    # Verify database service was called correctly
    mock_database_service.get_consumptions.assert_called_once_with(
        cups, start_date, end_date
    )
    # Should have 1 month of data (both days in same month)
    assert len(monthly_consumptions) == 1
    # Verify monthly aggregation
    month = monthly_consumptions[0]
    assert isinstance(month, ConsumptionAggregated)
    # NOTE(review): replace(day=1) forces the day to 1 before comparing, so
    # this only checks year+month, never the actual day-of-month the service
    # returns — confirm whether the stricter `month.datetime.date()` was meant.
    assert month.datetime.replace(day=1).date() == date(2024, 6, 1)
    assert month.delta_h == 48.0  # 48 hours total
    # Verify total consumption (should be sum of both days)
    expected_total = 2 * ((8 * 1.5) + (8 * 1.0) + (8 * 0.5))
    assert month.value_kwh == expected_total
    # Verify P1 consumption
    expected_p1 = 2 * (8 * 1.5)  # 2 days * 8 P1 hours * 1.5 kWh
    assert month.value_p1_kwh == expected_p1
    @pytest.mark.asyncio
    async def test_get_monthly_consumptions_with_cycle_start_day(
        self, consumption_service, mock_datadis_connector, mock_database_service
    ):
        """Test monthly consumption aggregation with custom cycle start day.

        With cycle_start_day=15, readings before the 15th belong to the
        previous month's billing period: the June 14th reading lands in the
        "May" bucket while the June 16th reading lands in the "June" bucket.
        """
        mock_connector, mock_connector_class = mock_datadis_connector
        cups = "ES1234567890123456789"
        start_date = datetime(2024, 6, 1, 0, 0)
        end_date = datetime(2024, 6, 30, 23, 59)
        # Create sample data spanning across billing cycle boundary
        from edata.services.database import ConsumptionModel as DbConsumption
        db_consumptions = []
        # Data on June 14th (before cycle start)
        dt1 = datetime(2024, 6, 14, 12, 0)
        db_cons1 = Mock(spec=DbConsumption)
        db_cons1.datetime = dt1
        db_cons1.delta_h = 1.0
        db_cons1.value_kwh = 2.0
        db_cons1.surplus_kwh = 0.0
        db_cons1.real = True
        db_consumptions.append(db_cons1)
        # Data on June 16th (after cycle start)
        dt2 = datetime(2024, 6, 16, 12, 0)
        db_cons2 = Mock(spec=DbConsumption)
        db_cons2.datetime = dt2
        db_cons2.delta_h = 1.0
        db_cons2.value_kwh = 3.0
        db_cons2.surplus_kwh = 0.0
        db_cons2.real = True
        db_consumptions.append(db_cons2)
        mock_database_service.get_consumptions.return_value = db_consumptions
        # Execute with cycle start day = 15
        monthly_consumptions = await consumption_service.get_monthly_consumptions(
            cups=cups, start_date=start_date, end_date=end_date, cycle_start_day=15
        )
        # Should have 2 months (May billing period and June billing period)
        assert len(monthly_consumptions) == 2
        # Verify the months
        months = sorted([m.datetime for m in monthly_consumptions])
        assert months[0].month == 5  # May billing period (for June 14th data)
        assert months[1].month == 6  # June billing period (for June 16th data)
@pytest.mark.asyncio
async def test_get_daily_consumptions_empty_data(
self, consumption_service, mock_datadis_connector, mock_database_service
):
"""Test daily consumption aggregation with no data."""
mock_connector, mock_connector_class = mock_datadis_connector
cups = "ES1234567890123456789"
start_date = datetime(2024, 6, 17, 0, 0) # Monday
end_date = datetime(2024, 6, 17, 23, 59) # Monday
# Mock database service to return empty data
mock_database_service.get_consumptions.return_value = []
# Execute daily aggregation
daily_consumptions = await consumption_service.get_daily_consumptions(
cups=cups, start_date=start_date, end_date=end_date
)
# Should return empty list
assert len(daily_consumptions) == 0
@pytest.mark.asyncio
async def test_get_monthly_consumptions_empty_data(
self, consumption_service, mock_datadis_connector, mock_database_service
):
"""Test monthly consumption aggregation with no data."""
mock_connector, mock_connector_class = mock_datadis_connector
cups = "ES1234567890123456789"
start_date = datetime(2024, 6, 15, 0, 0)
end_date = datetime(2024, 6, 15, 23, 59)
# Mock database service to return empty data
mock_database_service.get_consumptions.return_value = []
# Execute monthly aggregation
monthly_consumptions = await consumption_service.get_monthly_consumptions(
cups=cups, start_date=start_date, end_date=end_date
)
# Should return empty list
assert len(monthly_consumptions) == 0
    @pytest.mark.asyncio
    @patch("edata.services.consumption.get_pvpc_tariff")
    async def test_tariff_calculation_in_aggregations(
        self,
        mock_get_pvpc_tariff,
        consumption_service,
        mock_datadis_connector,
        mock_database_service,
    ):
        """Test that tariff calculation is used correctly in aggregations.

        get_pvpc_tariff is forced to return "p2", so the single reading's
        energy and surplus must be booked under the P2 fields while the
        P1/P3 fields stay at zero.
        """
        mock_connector, mock_connector_class = mock_datadis_connector
        cups = "ES1234567890123456789"
        start_date = datetime(2024, 6, 17, 0, 0)  # Monday
        end_date = datetime(2024, 6, 17, 23, 59)  # Monday
        # Create single consumption data
        from edata.services.database import ConsumptionModel as DbConsumption
        db_cons = Mock(spec=DbConsumption)
        db_cons.datetime = datetime(2024, 6, 17, 12, 0)  # Monday noon
        db_cons.delta_h = 1.0
        db_cons.value_kwh = 2.0
        db_cons.surplus_kwh = 0.1
        db_cons.real = True
        mock_database_service.get_consumptions.return_value = [db_cons]
        # Mock tariff calculation to return P2
        mock_get_pvpc_tariff.return_value = "p2"
        # Execute daily aggregation with await
        daily_consumptions = await consumption_service.get_daily_consumptions(
            cups=cups, start_date=start_date, end_date=end_date
        )
        # Verify tariff function was called with the reading's timestamp
        mock_get_pvpc_tariff.assert_called_with(datetime(2024, 6, 17, 12, 0))
        # Verify P2 values were set correctly
        assert len(daily_consumptions) == 1
        day = daily_consumptions[0]
        assert day.value_p2_kwh == 2.0
        assert day.surplus_p2_kwh == 0.1
        assert day.value_p1_kwh == 0.0
        assert day.value_p3_kwh == 0.0
"""Tests for ContractService."""
from datetime import datetime
from unittest.mock import AsyncMock, Mock, patch
import pytest
import pytest_asyncio
from edata.models.database import ContractModel
from edata.services.contract import ContractService
@pytest_asyncio.fixture
async def contract_service():
    """Create a contract service with mocked dependencies.

    The database factory is patched so the service never touches a real
    database; its awaitable methods are replaced with AsyncMocks.
    """
    with patch("edata.services.contract.get_database_service") as mock_db_factory:
        mock_db = Mock()
        # Make the database methods return AsyncMock so they can be awaited
        mock_db.get_contracts = AsyncMock(return_value=[])
        mock_db.save_contract = AsyncMock(return_value=Mock())
        mock_db_factory.return_value = mock_db
        # Create a mock DatadisConnector
        mock_datadis_connector = Mock()
        service = ContractService(datadis_connector=mock_datadis_connector)
        # Inject the mock directly in case the service cached a db reference
        service._db_service = mock_db
        return service
@pytest.fixture
def sample_contracts():
    """Sample contract data for testing.

    Two back-to-back yearly contracts for the same CUPS; the 2024 one is
    the most recent and carries the higher contracted power (5.0 kW).
    """
    return [
        ContractModel(
            cups="ES0012345678901234567890AB",
            date_start=datetime(2023, 1, 1),
            date_end=datetime(2023, 12, 31),
            marketer="Test Marketer 1",
            distributor_code="123",
            power_p1=4.6,
            power_p2=4.6,
        ),
        ContractModel(
            cups="ES0012345678901234567890AB",
            date_start=datetime(2024, 1, 1),
            date_end=datetime(2024, 12, 31),
            marketer="Test Marketer 2",
            distributor_code="123",
            power_p1=5.0,
            power_p2=5.0,
        ),
    ]
class TestContractService:
    """Test class for ContractService.

    All tests operate against the mocked database/connector wired by the
    contract_service fixture; no I/O is performed.
    """
    @pytest.mark.asyncio
    async def test_update_contracts_success(self, contract_service, sample_contracts):
        """Test successful contract update."""
        # Setup mocks - now returns Pydantic models instead of dicts
        contract_service._datadis.get_contract_detail = AsyncMock(
            return_value=[
                ContractModel(
                    cups="ES0012345678901234567890AB",
                    date_start=datetime(2024, 1, 1),
                    date_end=datetime(2024, 12, 31),
                    marketer="Test Marketer",
                    distributor_code="123",
                    power_p1=5.0,
                    power_p2=5.0,
                )
            ]
        )
        contract_service._db_service.get_contracts.return_value = []
        contract_service._db_service.save_contract.return_value = sample_contracts[0]
        # Execute
        result = await contract_service.update_contracts(
            cups="ES0012345678901234567890AB", distributor_code="123"
        )
        # Verify: one contract fetched from Datadis and saved as new
        assert result["success"] is True
        assert result["stats"]["fetched"] == 1
        assert result["stats"]["saved"] == 1
        assert result["stats"]["updated"] == 0
        contract_service._datadis.get_contract_detail.assert_called_once()
        contract_service._db_service.save_contract.assert_called_once()
    @pytest.mark.asyncio
    async def test_get_contracts(self, contract_service, sample_contracts):
        """Test getting contracts."""
        # Setup mocks
        contract_service._db_service.get_contracts.return_value = sample_contracts
        # Execute
        result = await contract_service.get_contracts("ES0012345678901234567890AB")
        # Verify
        assert len(result) == 2
        assert result[0].power_p1 == 4.6
        assert result[1].power_p1 == 5.0
        contract_service._db_service.get_contracts.assert_called_once_with(
            cups="ES0012345678901234567890AB"
        )
    @pytest.mark.asyncio
    async def test_get_active_contract(self, contract_service, sample_contracts):
        """Test getting active contract."""
        # Setup mocks
        contract_service._db_service.get_contracts.return_value = sample_contracts
        # Execute - test with date in 2024
        result = await contract_service.get_active_contract(
            "ES0012345678901234567890AB", datetime(2024, 6, 15)
        )
        # Verify
        assert result is not None
        assert result.power_p1 == 5.0  # Should return 2024 contract
        assert result.date_start.year == 2024
    @pytest.mark.asyncio
    async def test_get_most_recent_contract(self, contract_service, sample_contracts):
        """Test getting most recent contract."""
        # Setup mocks
        contract_service._db_service.get_contracts.return_value = sample_contracts
        # Execute - use the correct method name
        result = await contract_service.get_latest_contract(
            "ES0012345678901234567890AB"
        )
        # Verify
        assert result is not None
        assert result.power_p1 == 5.0  # Should return 2024 contract (most recent)
        assert result.date_start.year == 2024
    @pytest.mark.asyncio
    async def test_get_contract_stats(self, contract_service, sample_contracts):
        """Test getting contract statistics."""
        # Setup mocks
        contract_service._db_service.get_contracts.return_value = sample_contracts
        # Execute
        result = await contract_service.get_contract_stats("ES0012345678901234567890AB")
        # Verify
        assert result["total_contracts"] == 2
        assert result["power_ranges"]["p1_kw"]["min"] == 4.6
        assert result["power_ranges"]["p1_kw"]["max"] == 5.0
        assert result["power_ranges"]["p2_kw"]["min"] == 4.6
        assert result["power_ranges"]["p2_kw"]["max"] == 5.0
        assert result["date_range"]["earliest_start"] == datetime(2023, 1, 1)
        assert result["date_range"]["latest_end"] == datetime(2024, 12, 31)
    @pytest.mark.asyncio
    async def test_update_contracts_no_data(self, contract_service):
        """Test contract update with no data returned."""
        # Setup mocks
        contract_service._datadis.get_contract_detail = AsyncMock(return_value=[])
        # Execute
        result = await contract_service.update_contracts(
            cups="ES0012345678901234567890AB", distributor_code="123"
        )
        # Verify: an empty fetch is still a success, nothing saved
        assert result["success"] is True
        assert result["stats"]["fetched"] == 0
        assert result["stats"]["saved"] == 0
    @pytest.mark.asyncio
    async def test_update_contracts_error(self, contract_service):
        """Test contract update with error."""
        # Setup mocks
        contract_service._datadis.get_contract_detail = AsyncMock(
            side_effect=Exception("API Error")
        )
        # Execute
        result = await contract_service.update_contracts(
            cups="ES0012345678901234567890AB", distributor_code="123"
        )
        # Verify: connector errors are reported in the result, not raised
        assert result["success"] is False
        assert "error" in result
        assert result["error"] == "API Error"
    @pytest.mark.asyncio
    async def test_get_contract_summary(self, contract_service, sample_contracts):
        """Test getting contract summary."""
        # Setup mocks
        contract_service._db_service.get_contracts.return_value = sample_contracts
        # Execute - use the same CUPS as the rest of this class (the db mock
        # ignores the argument, but the inputs should stay consistent)
        result = await contract_service.get_contract_summary(
            "ES0012345678901234567890AB"
        )
        # Verify
        assert result["contract_p1_kW"] == 5.0  # From the most recent contract (2024)
        assert result["contract_p2_kW"] == 5.0
    @pytest.mark.asyncio
    async def test_get_contract_summary_no_data(self, contract_service):
        """Test getting contract summary with no data."""
        # Setup mocks
        contract_service._db_service.get_contracts.return_value = []
        # Execute
        result = await contract_service.get_contract_summary(
            "ES0012345678901234567890AB"
        )
        # Verify
        assert result["contract_p1_kW"] is None
        assert result["contract_p2_kW"] is None
"""Tests for DatabaseService."""
import os
import shutil
import tempfile
from datetime import datetime
from unittest.mock import patch
import pytest
import pytest_asyncio
from edata.models.database import (
ConsumptionModel,
ContractModel,
MaxPowerModel,
SupplyModel,
)
from edata.services.database import get_database_service
class TestDatabaseService:
    """Test suite for DatabaseService.

    Runs against a real SQLite database created in a per-test temporary
    directory, so upsert/unique-constraint behavior is exercised end-to-end.
    """
    @pytest.fixture
    def temp_dir(self):
        """Create a temporary directory for tests."""
        temp_dir = tempfile.mkdtemp()
        yield temp_dir
        shutil.rmtree(temp_dir)
    @pytest_asyncio.fixture
    async def db_service(self, temp_dir):
        """Create a database service for testing."""
        # Create a new instance directly instead of using the global singleton
        from edata.services.database import DatabaseService
        db_service = DatabaseService(temp_dir)
        await db_service.create_tables()
        yield db_service
    @pytest.fixture
    def sample_supply_data(self):
        """Sample supply data for testing."""
        return {
            "cups": "ES1234567890123456789",
            "date_start": datetime(2024, 1, 1),
            "date_end": datetime(2024, 12, 31),
            "address": "Test Address 123",
            "postal_code": "12345",
            "province": "Test Province",
            "municipality": "Test Municipality",
            "distributor": "Test Distributor",
            "point_type": 5,
            "distributor_code": "123",
        }
    @pytest.fixture
    def sample_contract_data(self):
        """Sample contract data for testing."""
        return {
            "cups": "ES1234567890123456789",
            "date_start": datetime(2024, 1, 1),
            "date_end": datetime(2024, 12, 31),
            "marketer": "Test Marketer",
            "distributor_code": "123",
            "power_p1": 4.4,
            "power_p2": 4.4,
        }
    @pytest.fixture
    def sample_consumption_data(self):
        """Sample consumption data for testing."""
        return {
            "cups": "ES1234567890123456789",
            "datetime": datetime(2024, 6, 15, 12, 0),
            "delta_h": 1.0,
            "value_kwh": 0.5,
            "surplus_kwh": 0.0,
            "real": True,
        }
    @pytest.fixture
    def sample_maxpower_data(self):
        """Sample maxpower data for testing."""
        return {
            "cups": "ES1234567890123456789",
            "datetime": datetime(2024, 6, 15, 15, 30),
            "value_kw": 3.2,
        }
    @pytest.mark.asyncio
    async def test_database_initialization(self, temp_dir):
        """Test database service initialization."""
        service = await get_database_service(storage_dir=temp_dir)
        # Check that database file was created
        expected_db_path = os.path.join(temp_dir, "edata.db")
        assert os.path.exists(expected_db_path)
        # Check that we can get a session
        session = service.get_session()
        assert session is not None
        await session.close()
    @pytest.mark.asyncio
    async def test_save_and_get_supply(self, db_service, sample_supply_data):
        """Test saving and retrieving supply data."""
        # Save supply
        saved_supply = await db_service.save_supply(sample_supply_data)
        assert saved_supply.cups == sample_supply_data["cups"]
        assert saved_supply.distributor == sample_supply_data["distributor"]
        assert saved_supply.point_type == sample_supply_data["point_type"]
        assert saved_supply.created_at is not None
        assert saved_supply.updated_at is not None
        # Get supply
        retrieved_supply = await db_service.get_supply(sample_supply_data["cups"])
        assert retrieved_supply is not None
        assert retrieved_supply.cups == sample_supply_data["cups"]
        assert retrieved_supply.distributor == sample_supply_data["distributor"]
    @pytest.mark.asyncio
    async def test_update_existing_supply(self, db_service, sample_supply_data):
        """Test updating an existing supply."""
        # Save initial supply
        await db_service.save_supply(sample_supply_data)
        # Update supply data
        updated_data = sample_supply_data.copy()
        updated_data["distributor"] = "Updated Distributor"
        # Save updated supply
        updated_supply = await db_service.save_supply(updated_data)
        assert updated_supply.distributor == "Updated Distributor"
        assert updated_supply.cups == sample_supply_data["cups"]
        # Verify only one supply exists
        retrieved_supply = await db_service.get_supply(sample_supply_data["cups"])
        assert retrieved_supply.distributor == "Updated Distributor"
    @pytest.mark.asyncio
    async def test_save_and_get_contract(
        self, db_service, sample_supply_data, sample_contract_data
    ):
        """Test saving and retrieving contract data."""
        # Save supply first (foreign key dependency)
        await db_service.save_supply(sample_supply_data)
        # Save contract
        saved_contract = await db_service.save_contract(sample_contract_data)
        assert saved_contract.cups == sample_contract_data["cups"]
        assert saved_contract.marketer == sample_contract_data["marketer"]
        assert saved_contract.power_p1 == sample_contract_data["power_p1"]
        assert saved_contract.id is not None
        # Get contracts
        contracts = await db_service.get_contracts(sample_contract_data["cups"])
        assert len(contracts) == 1
        assert contracts[0].marketer == sample_contract_data["marketer"]
    @pytest.mark.asyncio
    async def test_contract_unique_constraint(
        self, db_service, sample_supply_data, sample_contract_data
    ):
        """Test that contract unique constraint works (cups + date_start)."""
        # Save supply first
        await db_service.save_supply(sample_supply_data)
        # Save first contract
        await db_service.save_contract(sample_contract_data)
        # Try to save contract with same cups + date_start but different data
        updated_contract_data = sample_contract_data.copy()
        updated_contract_data["marketer"] = "Different Marketer"
        updated_contract_data["power_p1"] = 6.6
        # This should update, not create new
        await db_service.save_contract(updated_contract_data)
        # Should still have only one contract, but with updated data
        contracts = await db_service.get_contracts(sample_contract_data["cups"])
        assert len(contracts) == 1
        assert contracts[0].marketer == "Different Marketer"
        assert contracts[0].power_p1 == 6.6
    @pytest.mark.asyncio
    async def test_save_and_get_consumption(
        self, db_service, sample_supply_data, sample_consumption_data
    ):
        """Test saving and retrieving consumption data."""
        # Save supply first
        await db_service.save_supply(sample_supply_data)
        # Save consumption
        saved_consumption = await db_service.save_consumption(sample_consumption_data)
        assert saved_consumption.cups == sample_consumption_data["cups"]
        assert saved_consumption.value_kwh == sample_consumption_data["value_kwh"]
        assert saved_consumption.real == sample_consumption_data["real"]
        assert saved_consumption.id is not None
        # Get consumptions
        consumptions = await db_service.get_consumptions(
            sample_consumption_data["cups"]
        )
        assert len(consumptions) == 1
        assert consumptions[0].value_kwh == sample_consumption_data["value_kwh"]
    @pytest.mark.asyncio
    async def test_get_consumptions_with_date_filter(
        self, db_service, sample_supply_data, sample_consumption_data
    ):
        """Test getting consumptions with date range filter."""
        # Save supply first
        await db_service.save_supply(sample_supply_data)
        # Save multiple consumptions with different dates
        consumption1 = sample_consumption_data.copy()
        consumption1["datetime"] = datetime(2024, 6, 15, 10, 0)
        consumption2 = sample_consumption_data.copy()
        consumption2["datetime"] = datetime(2024, 6, 16, 10, 0)
        consumption3 = sample_consumption_data.copy()
        consumption3["datetime"] = datetime(2024, 6, 17, 10, 0)
        await db_service.save_consumption(consumption1)
        await db_service.save_consumption(consumption2)
        await db_service.save_consumption(consumption3)
        # Get consumptions with date filter
        start_date = datetime(2024, 6, 15, 12, 0)  # After first consumption
        end_date = datetime(2024, 6, 16, 12, 0)  # Before third consumption
        filtered_consumptions = await db_service.get_consumptions(
            cups=sample_consumption_data["cups"],
            start_date=start_date,
            end_date=end_date,
        )
        # Should only get the second consumption
        assert len(filtered_consumptions) == 1
        assert filtered_consumptions[0].datetime == datetime(2024, 6, 16, 10, 0)
    @pytest.mark.asyncio
    async def test_save_and_get_maxpower(
        self, db_service, sample_supply_data, sample_maxpower_data
    ):
        """Test saving and retrieving maxpower data."""
        # Save supply first
        await db_service.save_supply(sample_supply_data)
        # Save maxpower
        saved_maxpower = await db_service.save_maxpower(sample_maxpower_data)
        assert saved_maxpower.cups == sample_maxpower_data["cups"]
        assert saved_maxpower.value_kw == sample_maxpower_data["value_kw"]
        assert saved_maxpower.id is not None
        # Get maxpower readings
        maxpower_readings = await db_service.get_maxpower_readings(
            sample_maxpower_data["cups"]
        )
        assert len(maxpower_readings) == 1
        assert maxpower_readings[0].value_kw == sample_maxpower_data["value_kw"]
    @pytest.mark.asyncio
    async def test_consumption_unique_constraint(
        self, db_service, sample_supply_data, sample_consumption_data
    ):
        """Test that consumption unique constraint works (cups + datetime)."""
        # Save supply first
        await db_service.save_supply(sample_supply_data)
        # Save first consumption
        await db_service.save_consumption(sample_consumption_data)
        # Try to save consumption with same cups + datetime but different value
        updated_consumption = sample_consumption_data.copy()
        updated_consumption["value_kwh"] = 1.5
        # This should update, not create new
        await db_service.save_consumption(updated_consumption)
        # Should still have only one consumption, but with updated value
        consumptions = await db_service.get_consumptions(
            sample_consumption_data["cups"]
        )
        assert len(consumptions) == 1
        assert consumptions[0].value_kwh == 1.5
    @pytest.mark.asyncio
    async def test_save_from_pydantic_models(self, db_service):
        """Test saving data from Pydantic models."""
        cups = "ES1234567890123456789"
        # Create Pydantic models
        supply = SupplyModel(
            cups=cups,
            date_start=datetime(2024, 1, 1),
            date_end=datetime(2024, 12, 31),
            address="Test Address",
            postal_code="12345",
            province="Test Province",
            municipality="Test Municipality",
            distributor="Test Distributor",
            point_type=5,
            distributor_code="123",
        )
        contract = ContractModel(
            cups=cups,
            date_start=datetime(2024, 1, 1),
            date_end=datetime(2024, 12, 31),
            marketer="Test Marketer",
            distributor_code="123",
            power_p1=4.4,
            power_p2=4.4,
        )
        consumption = ConsumptionModel(
            cups=cups, datetime=datetime(2024, 6, 15, 12, 0), delta_h=1.0, value_kwh=0.5
        )
        maxpower = MaxPowerModel(
            cups=cups, datetime=datetime(2024, 6, 15, 15, 30), value_kw=3.2
        )
        # Save using the batch method
        await db_service.save_from_pydantic_models(
            cups=cups,
            supplies=[supply],
            contracts=[contract],
            consumptions=[consumption],
            maximeter=[maxpower],
        )
        # Verify data was saved
        saved_supply = await db_service.get_supply(cups)
        assert saved_supply is not None
        assert saved_supply.cups == cups
        saved_contracts = await db_service.get_contracts(cups)
        assert len(saved_contracts) == 1
        assert saved_contracts[0].marketer == "Test Marketer"
        saved_consumptions = await db_service.get_consumptions(cups)
        assert len(saved_consumptions) == 1
        assert saved_consumptions[0].value_kwh == 0.5
        saved_maxpower = await db_service.get_maxpower_readings(cups)
        assert len(saved_maxpower) == 1
        assert saved_maxpower[0].value_kw == 3.2
    @pytest.mark.asyncio
    async def test_database_relationships(
        self, db_service, sample_supply_data, sample_contract_data
    ):
        """Test that database relationships work correctly."""
        # Save supply and contract
        await db_service.save_supply(sample_supply_data)
        await db_service.save_contract(sample_contract_data)
        # Get supply with relationships (this would work if we load with relationships)
        supply = await db_service.get_supply(sample_supply_data["cups"])
        assert supply is not None
        assert supply.cups == sample_supply_data["cups"]
        # Verify foreign key constraint works
        contracts = await db_service.get_contracts(sample_supply_data["cups"])
        assert len(contracts) == 1
        assert contracts[0].cups == sample_supply_data["cups"]
    @pytest.mark.asyncio
    async def test_invalid_cups_foreign_key(self, db_service, sample_contract_data):
        """Test that foreign key constraint prevents orphaned records."""
        # Try to save contract without supply (should fail or handle gracefully)
        # Note: This depends on SQLite foreign key enforcement
        try:
            await db_service.save_contract(sample_contract_data)
            # If it doesn't raise an error, verify the record wasn't actually saved
            # or that the database handles it appropriately
        except Exception:
            # Expected if foreign key constraints are enforced
            pass
    @pytest.mark.asyncio
    async def test_default_storage_dir(self):
        """Test that default storage directory is used when none provided."""
        test_dir = tempfile.mkdtemp()
        import edata.services.database
        # Remember the current singleton so it can be restored afterwards;
        # otherwise this test leaks a service bound to a deleted directory
        # into every test that runs after it.
        previous_service = edata.services.database._db_service
        try:
            # Reset the global singleton to allow testing default directory
            edata.services.database._db_service = None
            with patch("edata.services.database.DEFAULT_STORAGE_DIR", test_dir):
                service = await get_database_service()
                # Check that service was created with the correct directory
                assert service._db_dir == test_dir
                assert os.path.exists(service._db_dir)
        finally:
            # Restore global state and clean up the temporary directory
            edata.services.database._db_service = previous_service
            if os.path.exists(test_dir):
                shutil.rmtree(test_dir)
"""Tests for MaximeterService."""
from datetime import datetime
from unittest.mock import AsyncMock, Mock, patch
import pytest
import pytest_asyncio
from edata.models.database import MaxPowerModel
from edata.services.maximeter import MaximeterService
@pytest_asyncio.fixture
async def maximeter_service():
    """Create a maximeter service with mocked dependencies.

    The database factory is patched so the service never touches a real
    database; its awaitable methods are replaced with AsyncMocks.
    """
    with patch("edata.services.maximeter.get_database_service") as mock_db_factory:
        mock_db = Mock()
        # Make the database methods return AsyncMock so they can be awaited
        # NOTE(review): DatabaseService exposes save_maxpower /
        # get_maxpower_readings elsewhere in this suite; confirm "get_maxpower"
        # is the attribute the service actually awaits (the tests below stub
        # get_stored_maxpower directly, so this mock is never exercised).
        mock_db.get_maxpower = AsyncMock(return_value=[])
        mock_db.save_maxpower = AsyncMock(return_value=Mock())
        mock_db_factory.return_value = mock_db
        # Create a mock DatadisConnector
        mock_datadis_connector = Mock()
        service = MaximeterService(datadis_connector=mock_datadis_connector)
        service._db_service = mock_db
        return service
@pytest.fixture
def sample_maximeter():
    """Three maximeter readings for one CUPS (max is the February one)."""
    readings = (
        (datetime(2023, 1, 15, 14, 30), 2.5),
        (datetime(2023, 2, 20, 16, 45), 3.2),
        (datetime(2023, 3, 10, 12, 15), 1.8),
    )
    return [
        MaxPowerModel(
            cups="ES001234567890123456AB",
            datetime=reading_dt,
            value_kw=reading_kw,
        )
        for reading_dt, reading_kw in readings
    ]
class TestMaximeterService:
    """Test class for MaximeterService."""
    @pytest.mark.asyncio
    async def test_get_maximeter_summary(self, maximeter_service, sample_maximeter):
        """Test getting maximeter summary."""
        # Feed the summary three known readings instead of hitting storage.
        maximeter_service.get_stored_maxpower = AsyncMock(return_value=sample_maximeter)
        summary = await maximeter_service.get_maximeter_summary(
            "ES001234567890123456AB"
        )
        # The February reading (3.2 kW) is the highest of the three samples.
        assert summary["max_power_kW"] == 3.2
        assert summary["max_power_date"] == datetime(2023, 2, 20, 16, 45)
        # Mean of 2.5, 3.2 and 1.8.
        assert summary["max_power_mean_kW"] == 2.5
        # With only three samples the 90th percentile equals the maximum.
        assert summary["max_power_90perc_kW"] == 3.2
    @pytest.mark.asyncio
    async def test_get_maximeter_summary_no_data(self, maximeter_service):
        """Test getting maximeter summary with no data."""
        maximeter_service.get_stored_maxpower = AsyncMock(return_value=[])
        summary = await maximeter_service.get_maximeter_summary(
            "ES001234567890123456AB"
        )
        # Every statistic is None when there are no readings.
        for stat_key in (
            "max_power_kW",
            "max_power_date",
            "max_power_mean_kW",
            "max_power_90perc_kW",
        ):
            assert summary[stat_key] is None
"""Tests for SupplyService."""
from datetime import datetime
from unittest.mock import AsyncMock, Mock, patch
import pytest
import pytest_asyncio
from edata.models.database import SupplyModel
from edata.services.supply import SupplyService
@pytest_asyncio.fixture
async def supply_service():
    """Create a supply service with mocked dependencies.

    The database factory is patched so the service never touches a real
    database; its awaitable methods are replaced with AsyncMocks.
    """
    with patch("edata.services.supply.get_database_service") as mock_db_factory:
        mock_db = Mock()
        # Make the database methods return AsyncMock so they can be awaited
        mock_db.get_supplies = AsyncMock(return_value=[])
        mock_db.save_supply = AsyncMock(return_value=Mock())
        mock_db_factory.return_value = mock_db
        # Create a mock DatadisConnector
        mock_datadis_connector = Mock()
        service = SupplyService(datadis_connector=mock_datadis_connector)
        service._db_service = mock_db
        return service
@pytest.fixture
def sample_supplies():
    """Sample supply data for testing.

    Two distinct supply points with different CUPS, distributors and point
    types; both are active during 2024.
    """
    return [
        SupplyModel(
            cups="ES001234567890123456AB",
            distributor_code="123",
            point_type=5,
            date_start=datetime(2023, 1, 1),
            date_end=datetime(2024, 12, 31),
            address="Test Address 1",
            postal_code="12345",
            province="Test Province 1",
            municipality="Test Municipality 1",
            distributor="Test Distributor 1",
        ),
        SupplyModel(
            cups="ES987654321098765432BA",
            distributor_code="456",
            point_type=4,
            date_start=datetime(2023, 6, 1),
            date_end=datetime(2025, 6, 1),
            address="Test Address 2",
            postal_code="67890",
            province="Test Province 2",
            municipality="Test Municipality 2",
            distributor="Test Distributor 2",
        ),
    ]
class TestSupplyService:
"""Test class for SupplyService."""
@pytest.mark.asyncio
async def test_update_supplies_success(self, supply_service):
"""Test successful supply update."""
# Setup mocks - now returns Pydantic models
supply_service._datadis.get_supplies = AsyncMock(
return_value=[
SupplyModel(
cups="ES001234567890123456AB",
distributor_code="123",
point_type=5,
date_start=datetime(2023, 1, 1),
date_end=datetime(2024, 12, 31),
address="Test Address",
postal_code="12345",
province="Test Province",
municipality="Test Municipality",
distributor="Test Distributor",
)
]
)
supply_service._db_service.get_supplies.side_effect = [
[],
[Mock()],
] # No existing, then 1 stored
supply_service._db_service.save_supply.return_value = Mock()
# Execute
result = await supply_service.update_supplies()
# Verify
assert result["success"] is True
assert result["stats"]["fetched"] == 1
assert result["stats"]["saved"] == 1
assert result["stats"]["updated"] == 0
supply_service._datadis.get_supplies.assert_called_once()
supply_service._db_service.save_supply.assert_called_once()
@pytest.mark.asyncio
async def test_get_supplies(self, supply_service, sample_supplies):
"""Test getting supplies."""
# Setup mocks
supply_service._db_service.get_supplies.return_value = sample_supplies
# Execute
result = await supply_service.get_supplies()
# Verify
assert len(result) == 2
assert result[0].cups == "ES001234567890123456AB"
assert result[1].cups == "ES987654321098765432BA"
supply_service._db_service.get_supplies.assert_called_once_with(cups=None)
@pytest.mark.asyncio
async def test_get_supply_by_cups(self, supply_service, sample_supplies):
"""Test getting supply by CUPS."""
# Setup mocks
supply_service._db_service.get_supplies.return_value = [sample_supplies[0]]
# Execute
result = await supply_service.get_supply_by_cups("ES001234567890123456AB")
# Verify
assert result is not None
assert result.cups == "ES001234567890123456AB"
assert result.distributor == "Test Distributor 1"
supply_service._db_service.get_supplies.assert_called_once_with(
cups="ES001234567890123456AB"
)
@pytest.mark.asyncio
async def test_get_cups_list(self, supply_service, sample_supplies):
"""Test getting CUPS list."""
# Setup mocks
supply_service._db_service.get_supplies.return_value = sample_supplies
# Execute
result = await supply_service.get_cups_list()
# Verify
assert len(result) == 2
assert "ES001234567890123456AB" in result
assert "ES987654321098765432BA" in result
@pytest.mark.asyncio
async def test_get_active_supplies(self, supply_service, sample_supplies):
"""Test getting active supplies."""
# Setup mocks
supply_service._db_service.get_supplies.return_value = sample_supplies
# Execute - test with date in 2024 (both should be active)
result = await supply_service.get_active_supplies(datetime(2024, 6, 15))
# Verify
assert len(result) == 2 # Both supplies should be active in 2024
for supply in result:
assert supply.date_start <= datetime(2024, 6, 15) <= supply.date_end
@pytest.mark.asyncio
async def test_get_supply_stats(self, supply_service, sample_supplies):
"""Test getting supply statistics."""
# Setup mocks
supply_service._db_service.get_supplies.return_value = sample_supplies
# Execute
result = await supply_service.get_supply_stats()
# Verify
# Verify
assert result["total_supplies"] == 2
assert result["total_cups"] == 2
assert result["date_range"]["earliest_start"] == datetime(2023, 1, 1)
assert result["date_range"]["latest_end"] == datetime(2025, 6, 1)
assert result["point_types"] == {5: 1, 4: 1}
assert result["distributors"] == {
"Test Distributor 1": 1,
"Test Distributor 2": 1,
}
@pytest.mark.asyncio
async def test_validate_cups(self, supply_service, sample_supplies):
"""Test CUPS validation."""
# Setup mocks
supply_service._db_service.get_supplies.return_value = [sample_supplies[0]]
# Execute
result = await supply_service.validate_cups("ES001234567890123456AB")
# Verify
assert result is True
# Test invalid CUPS
supply_service._db_service.get_supplies.return_value = []
result = await supply_service.validate_cups("INVALID_CUPS")
assert result is False
@pytest.mark.asyncio
async def test_get_distributor_code(self, supply_service, sample_supplies):
"""Test getting distributor code."""
# Setup mocks
supply_service._db_service.get_supplies.return_value = [sample_supplies[0]]
# Execute
result = await supply_service.get_distributor_code("ES001234567890123456AB")
# Verify
assert result == "123"
@pytest.mark.asyncio
async def test_get_point_type(self, supply_service, sample_supplies):
"""Test getting point type."""
# Setup mocks
supply_service._db_service.get_supplies.return_value = [sample_supplies[0]]
# Execute
result = await supply_service.get_point_type("ES001234567890123456AB")
# Verify
assert result == 5
@pytest.mark.asyncio
async def test_update_supplies_no_data(self, supply_service):
    """Test supply update with no data returned."""
    # The connector yields nothing: update succeeds but fetches/saves 0.
    supply_service._datadis.get_supplies = AsyncMock(return_value=[])
    outcome = await supply_service.update_supplies()
    assert outcome["success"] is True
    assert outcome["stats"]["fetched"] == 0
    assert outcome["stats"]["saved"] == 0
@pytest.mark.asyncio
async def test_update_supplies_error(self, supply_service):
    """Test supply update with error."""
    # The connector raises: the service must report a failed update
    # carrying the original error message instead of propagating.
    supply_service._datadis.get_supplies = AsyncMock(
        side_effect=Exception("API Error")
    )
    outcome = await supply_service.update_supplies()
    assert outcome["success"] is False
    assert "error" in outcome
    assert outcome["error"] == "API Error"
"""Integration tests for EdataHelper with service-based architecture."""
from datetime import datetime
from unittest.mock import AsyncMock, Mock, patch
import pytest
from freezegun import freeze_time
from edata.const import ATTRIBUTES
from edata.helpers import EdataHelper
from edata.models.consumption import Consumption
from edata.models.contract import Contract
from edata.models.maximeter import MaxPower
from edata.models.pricing import PricingRules
from edata.models.supply import Supply
# Test data constants (credentials are dummies; the CUPS is a
# syntactically plausible Spanish supply-point identifier)
TEST_CUPS = "ES1234000000000001JN0F"
TEST_USERNAME = "testuser"
TEST_PASSWORD = "testpass"
TEST_NIF = "12345678Z"
AT_TIME = "2023-10-15"  # frozen "today" used by freeze_time in these tests

# Sample pricing rules for testing
# PVPC variant: all per-kWh prices are None, which EdataHelper interprets
# as indexed (PVPC) pricing.
PRICING_RULES_PVPC = PricingRules(
    p1_kw_year_eur=30.67266,
    p2_kw_year_eur=1.4243591,
    meter_month_eur=0.81,
    market_kw_year_eur=3.113,
    electricity_tax=1.0511300560,
    iva_tax=1.05,
    p1_kwh_eur=None,  # PVPC mode
    p2_kwh_eur=None,
    p3_kwh_eur=None,
    surplus_p1_kwh_eur=None,
    surplus_p2_kwh_eur=None,
    surplus_p3_kwh_eur=None,
    energy_formula="electricity_tax * iva_tax * kwh_eur * kwh",
    power_formula="electricity_tax * iva_tax * (p1_kw * (p1_kw_year_eur + market_kw_year_eur) + p2_kw * p2_kw_year_eur) / 365 / 24",
    others_formula="iva_tax * meter_month_eur / 30 / 24",
    surplus_formula="electricity_tax * iva_tax * surplus_kwh * surplus_kwh_eur",
    main_formula="energy_term + power_term + others_term",
)

# Fixed-price variant: every per-kWh price is set, so is_pvpc is False.
PRICING_RULES_FIXED = PricingRules(
    p1_kw_year_eur=30.67266,
    p2_kw_year_eur=1.4243591,
    meter_month_eur=0.81,
    market_kw_year_eur=3.113,
    electricity_tax=1.0511300560,
    iva_tax=1.05,
    p1_kwh_eur=0.12,  # Fixed prices
    p2_kwh_eur=0.10,
    p3_kwh_eur=0.08,
    surplus_p1_kwh_eur=0.05,
    surplus_p2_kwh_eur=0.04,
    surplus_p3_kwh_eur=0.03,
    energy_formula="electricity_tax * iva_tax * kwh_eur * kwh",
    power_formula="electricity_tax * iva_tax * (p1_kw * (p1_kw_year_eur + market_kw_year_eur) + p2_kw * p2_kw_year_eur) / 365 / 24",
    others_formula="iva_tax * meter_month_eur / 30 / 24",
    surplus_formula="electricity_tax * iva_tax * surplus_kwh * surplus_kwh_eur",
    main_formula="energy_term + power_term + others_term",
)

# Sample supply data (valid 2020-01-01 through 2025-12-31)
SAMPLE_SUPPLY = Supply(
    cups=TEST_CUPS,
    distributor_code="0031",
    point_type=5,
    date_start=datetime(2020, 1, 1),
    date_end=datetime(2025, 12, 31),
    address="Test Address 123",
    postal_code="28001",
    province="Madrid",
    municipality="Madrid",
    distributor="Test Distributor",
)

# Sample contract data
SAMPLE_CONTRACT = Contract(
    distributor_code="0031",
    date_start=datetime(2023, 1, 1),
    date_end=datetime(2023, 12, 31),
    power_p1=5.75,
    power_p2=5.75,
    marketer="Test Marketer",
)

# Sample consumption data: 24 hourly points for 2023-10-14
SAMPLE_CONSUMPTIONS = [
    Consumption(
        datetime=datetime(2023, 10, 14, hour),
        delta_h=1.0,
        value_kwh=0.5 + hour * 0.1,
        surplus_kwh=0.0,
    )
    for hour in range(24)
]

# Sample maximeter data: 14 daily readings for October 2023
SAMPLE_MAXPOWER = [
    MaxPower(
        datetime=datetime(2023, 10, day),
        value_kw=4.5 + day * 0.1,
    )
    for day in range(1, 15)
]
class TestEdataHelperIntegration:
"""Integration tests for EdataHelper with mocked services."""
def test_initialization_pvpc(self):
    """Test EdataHelper initialization with PVPC pricing."""
    helper = EdataHelper(
        datadis_username=TEST_USERNAME,
        datadis_password=TEST_PASSWORD,
        cups=TEST_CUPS,
        datadis_authorized_nif=TEST_NIF,
        pricing_rules=PRICING_RULES_PVPC,
        storage_dir_path=None,
    )
    # Identity and pricing configuration.
    assert helper._cups == TEST_CUPS
    assert helper._scups == "1JN0F"
    assert helper._authorized_nif == TEST_NIF
    assert helper.pricing_rules == PRICING_RULES_PVPC
    assert helper.enable_billing is True
    assert helper.is_pvpc is True
    # All summary attributes start unset, and the legacy 'attributes'
    # alias points at the same dict as 'summary'.
    assert len(helper.attributes) == len(ATTRIBUTES)
    for name in ATTRIBUTES:
        assert helper.attributes[name] is None
    assert helper.attributes is helper.summary
    # Every backing service was constructed.
    assert helper._supply_service is not None
    assert helper._contract_service is not None
    assert helper._consumption_service is not None
    assert helper._maximeter_service is not None
    assert helper._billing_service is not None
def test_initialization_fixed_pricing(self):
    """Test EdataHelper initialization with fixed pricing."""
    helper = EdataHelper(
        datadis_username=TEST_USERNAME,
        datadis_password=TEST_PASSWORD,
        cups=TEST_CUPS,
        pricing_rules=PRICING_RULES_FIXED,
    )
    # Fixed per-kWh prices enable billing without PVPC mode.
    assert helper.enable_billing is True
    assert helper.is_pvpc is False
    assert helper._billing_service is not None
def test_initialization_no_billing(self):
    """Test EdataHelper initialization without billing."""
    helper = EdataHelper(
        datadis_username=TEST_USERNAME,
        datadis_password=TEST_PASSWORD,
        cups=TEST_CUPS,
        pricing_rules=None,
    )
    # No pricing rules: billing and PVPC are both disabled.
    assert helper.enable_billing is False
    assert helper.is_pvpc is False
@freeze_time(AT_TIME)
@patch("edata.helpers.SupplyService")
@patch("edata.helpers.ContractService")
@patch("edata.helpers.ConsumptionService")
@patch("edata.helpers.MaximeterService")
@patch("edata.helpers.BillingService")
@pytest.mark.asyncio
async def test_update_successful_flow_pvpc(
    self,
    # NOTE: @patch decorators apply bottom-up, so the mock for the patch
    # closest to the function (BillingService) is the first parameter.
    mock_billing_service,
    mock_maximeter_service,
    mock_consumption_service,
    mock_contract_service,
    mock_supply_service,
):
    """Test successful update flow with PVPC pricing."""
    # Setup mocks: every service succeeds and returns a small summary.
    mock_supply_instance = Mock()
    mock_supply_instance.update_supplies = AsyncMock(return_value={"success": True})
    mock_supply_instance.validate_cups = AsyncMock(return_value=True)
    mock_supply_instance.get_supply_by_cups = AsyncMock(return_value=SAMPLE_SUPPLY)
    mock_supply_instance.get_supply_summary = AsyncMock(
        return_value={"cups": TEST_CUPS}
    )
    mock_supply_service.return_value = mock_supply_instance
    mock_contract_instance = Mock()
    mock_contract_instance.update_contracts = AsyncMock(
        return_value={"success": True}
    )
    mock_contract_instance.get_contract_summary = AsyncMock(
        return_value={
            "contract_p1_kW": 5.75,
            "contract_p2_kW": 5.75,
        }
    )
    mock_contract_service.return_value = mock_contract_instance
    mock_consumption_instance = Mock()
    mock_consumption_instance.update_consumption_range_by_months = AsyncMock(
        return_value={"success": True}
    )
    mock_consumption_instance.get_consumption_summary = AsyncMock(
        return_value={
            "yesterday_kWh": 12.5,
            "month_kWh": 350.0,
            "last_month_kWh": 340.0,
            "last_registered_date": datetime(2023, 10, 14, 23),
        }
    )
    mock_consumption_service.return_value = mock_consumption_instance
    mock_maximeter_instance = Mock()
    mock_maximeter_instance.update_maxpower_range_by_months = AsyncMock(
        return_value={"success": True}
    )
    mock_maximeter_instance.get_maximeter_summary = AsyncMock(
        return_value={
            "max_power_kW": 5.8,
            "max_power_date": datetime(2023, 10, 10),
            "max_power_mean_kW": 4.5,
            "max_power_90perc_kW": 5.2,
        }
    )
    mock_maximeter_service.return_value = mock_maximeter_instance
    mock_billing_instance = Mock()
    mock_billing_instance.update_pvpc_prices = AsyncMock(
        return_value={"success": True}
    )
    mock_billing_instance.update_missing_costs = AsyncMock(
        return_value={"success": True}
    )
    mock_billing_instance.get_billing_summary = AsyncMock(
        return_value={
            "month_€": 45.67,
            "last_month_€": 43.21,
        }
    )
    mock_billing_service.return_value = mock_billing_instance
    # Test update
    helper = EdataHelper(
        datadis_username=TEST_USERNAME,
        datadis_password=TEST_PASSWORD,
        cups=TEST_CUPS,
        pricing_rules=PRICING_RULES_PVPC,
    )
    date_from = datetime(2023, 1, 1)
    date_to = datetime(2023, 10, 15)
    result = await helper.update(date_from=date_from, date_to=date_to)
    # Verify result
    assert result is True
    # Verify service calls: each service is driven exactly once with the
    # supply's distributor code / point type forwarded where relevant.
    mock_supply_instance.update_supplies.assert_called_once_with(
        authorized_nif=None
    )
    mock_supply_instance.validate_cups.assert_called_once_with(TEST_CUPS)
    mock_supply_instance.get_supply_by_cups.assert_called_once_with(TEST_CUPS)
    mock_contract_instance.update_contracts.assert_called_once_with(
        cups=TEST_CUPS, distributor_code="0031", authorized_nif=None
    )
    mock_consumption_instance.update_consumption_range_by_months.assert_called_once_with(
        cups=TEST_CUPS,
        distributor_code="0031",
        start_date=date_from,  # Use the original date_from since it's after supply start
        end_date=date_to,
        measurement_type="0",
        point_type=5,
        authorized_nif=None,
    )
    mock_maximeter_instance.update_maxpower_range_by_months.assert_called_once()
    mock_billing_instance.update_pvpc_prices.assert_called_once()
    mock_billing_instance.update_missing_costs.assert_called_once()
    # Verify summary attributes aggregated from all service summaries.
    assert helper.attributes["cups"] == TEST_CUPS
    assert helper.attributes["contract_p1_kW"] == 5.75
    assert helper.attributes["contract_p2_kW"] == 5.75
    assert helper.attributes["yesterday_kWh"] == 12.5
    assert helper.attributes["month_kWh"] == 350.0
    assert helper.attributes["last_month_kWh"] == 340.0
    assert helper.attributes["max_power_kW"] == 5.8
    assert helper.attributes["month_€"] == 45.67
    assert helper.attributes["last_month_€"] == 43.21
@freeze_time(AT_TIME)
@patch("edata.helpers.SupplyService")
@patch("edata.helpers.ContractService")
@patch("edata.helpers.ConsumptionService")
@patch("edata.helpers.MaximeterService")
@patch("edata.helpers.BillingService")
@pytest.mark.asyncio
async def test_update_with_service_failures(
    self,
    # @patch decorators apply bottom-up: BillingService's mock comes first.
    mock_billing_service,
    mock_maximeter_service,
    mock_consumption_service,
    mock_contract_service,
    mock_supply_service,
):
    """Test update flow with some service failures."""
    # Setup mocks with some failures: supply and maximeter succeed,
    # contracts, consumption and PVPC prices fail.
    mock_supply_instance = Mock()
    mock_supply_instance.update_supplies = AsyncMock(return_value={"success": True})
    mock_supply_instance.validate_cups = AsyncMock(return_value=True)
    mock_supply_instance.get_supply_by_cups = AsyncMock(return_value=SAMPLE_SUPPLY)
    mock_supply_instance.get_supply_summary = AsyncMock(
        return_value={"cups": TEST_CUPS}
    )
    mock_supply_service.return_value = mock_supply_instance
    mock_contract_instance = Mock()
    mock_contract_instance.update_contracts = AsyncMock(
        return_value={"success": False, "error": "Contract API down"}
    )
    mock_contract_instance.get_contract_summary = AsyncMock(return_value={})
    mock_contract_service.return_value = mock_contract_instance
    mock_consumption_instance = Mock()
    mock_consumption_instance.update_consumption_range_by_months = AsyncMock(
        return_value={"success": False}
    )
    mock_consumption_instance.get_consumption_summary = AsyncMock(return_value={})
    mock_consumption_service.return_value = mock_consumption_instance
    mock_maximeter_instance = Mock()
    mock_maximeter_instance.update_maxpower_range_by_months = AsyncMock(
        return_value={"success": True}
    )
    mock_maximeter_instance.get_maximeter_summary = AsyncMock(
        return_value={"max_power_kW": 5.8}
    )
    mock_maximeter_service.return_value = mock_maximeter_instance
    mock_billing_instance = Mock()
    mock_billing_instance.update_pvpc_prices = AsyncMock(
        return_value={"success": False, "error": "PVPC API error"}
    )
    mock_billing_instance.get_billing_summary = AsyncMock(return_value={})
    mock_billing_service.return_value = mock_billing_instance
    # Test update
    helper = EdataHelper(
        datadis_username=TEST_USERNAME,
        datadis_password=TEST_PASSWORD,
        cups=TEST_CUPS,
        pricing_rules=PRICING_RULES_PVPC,
    )
    result = await helper.update()
    # Update should still succeed even with some service failures
    assert result is True
    # Verify summary attributes include successful services
    assert helper.attributes["cups"] == TEST_CUPS
    assert helper.attributes["max_power_kW"] == 5.8
    # Failed services should have None values
    assert helper.attributes["contract_p1_kW"] is None
    assert helper.attributes["yesterday_kWh"] is None
@patch("edata.helpers.SupplyService")
@pytest.mark.asyncio
async def test_update_supply_failure(self, mock_supply_service):
    """Test update with supply service failure.

    ``EdataHelper.update`` awaits ``update_supplies`` (the other tests in
    this class mock it with AsyncMock), so the failure payload must come
    from an AsyncMock — setting ``.return_value`` on a plain Mock yields a
    non-awaitable object and the test would only pass via an unrelated
    TypeError.
    """
    mock_supply_instance = Mock()
    # AsyncMock so the awaited call resolves to the failure payload.
    mock_supply_instance.update_supplies = AsyncMock(
        return_value={
            "success": False,
            "error": "Authentication failed",
        }
    )
    mock_supply_service.return_value = mock_supply_instance
    helper = EdataHelper(
        datadis_username=TEST_USERNAME,
        datadis_password=TEST_PASSWORD,
        cups=TEST_CUPS,
    )
    result = await helper.update()
    # Should fail if supplies can't be updated
    assert result is False
@patch("edata.helpers.SupplyService")
@pytest.mark.asyncio
async def test_update_cups_not_found(self, mock_supply_service):
    """Test update when CUPS is not found in account.

    ``update_supplies`` and ``validate_cups`` are awaited by
    ``EdataHelper.update`` (sibling tests mock them with AsyncMock), so
    plain ``Mock.return_value`` assignments would hand the helper
    non-awaitable objects.
    """
    mock_supply_instance = Mock()
    # AsyncMocks so the awaited calls resolve to real payloads.
    mock_supply_instance.update_supplies = AsyncMock(
        return_value={"success": True}
    )
    mock_supply_instance.validate_cups = AsyncMock(return_value=False)
    mock_supply_service.return_value = mock_supply_instance
    helper = EdataHelper(
        datadis_username=TEST_USERNAME,
        datadis_password=TEST_PASSWORD,
        cups=TEST_CUPS,
    )
    result = await helper.update()
    # Should fail if CUPS is not found
    assert result is False
@patch("edata.helpers.SupplyService")
@patch("edata.helpers.ContractService")
@patch("edata.helpers.ConsumptionService")
@patch("edata.helpers.MaximeterService")
def test_calculate_summary_attributes_error_handling(
    self,
    # @patch decorators apply bottom-up: MaximeterService's mock first.
    mock_maximeter_service,
    mock_consumption_service,
    mock_contract_service,
    mock_supply_service,
):
    """Test error handling in summary calculation."""
    # Setup mock that raises exception when the supply summary is read.
    mock_supply_instance = Mock()
    mock_supply_instance.get_supply_summary.side_effect = Exception(
        "Database error"
    )
    mock_supply_service.return_value = mock_supply_instance
    mock_contract_instance = Mock()
    mock_contract_instance.get_contract_summary.return_value = {
        "contract_p1_kW": 5.75
    }
    mock_contract_service.return_value = mock_contract_instance
    mock_consumption_instance = Mock()
    mock_consumption_instance.get_consumption_summary.return_value = {
        "yesterday_kWh": 12.5
    }
    mock_consumption_service.return_value = mock_consumption_instance
    mock_maximeter_instance = Mock()
    mock_maximeter_instance.get_maximeter_summary.return_value = {
        "max_power_kW": 5.8
    }
    mock_maximeter_service.return_value = mock_maximeter_instance
    helper = EdataHelper(
        datadis_username=TEST_USERNAME,
        datadis_password=TEST_PASSWORD,
        cups=TEST_CUPS,
    )
    # Should not raise exception
    # Note: We can't actually test the exception handling easily in async context
    # but we can test that all attributes are None initially
    for attr in ATTRIBUTES:
        assert helper.attributes[attr] is None
@pytest.mark.asyncio
async def test_numeric_value_rounding(self):
    """Test that numeric values are properly rounded."""
    with patch("edata.helpers.SupplyService") as mock_supply_service, patch(
        "edata.helpers.ContractService"
    ) as mock_contract_service, patch(
        "edata.helpers.ConsumptionService"
    ) as mock_consumption_service, patch(
        "edata.helpers.MaximeterService"
    ) as mock_maximeter_service:
        # Setup mocks with unrounded values
        mock_supply_instance = Mock()
        mock_supply_instance.get_supply_summary = AsyncMock(
            return_value={"cups": TEST_CUPS}
        )
        mock_supply_service.return_value = mock_supply_instance
        mock_contract_instance = Mock()
        mock_contract_instance.get_contract_summary = AsyncMock(
            return_value={"contract_p1_kW": 5.7523456}
        )
        mock_contract_service.return_value = mock_contract_instance
        mock_consumption_instance = Mock()
        mock_consumption_instance.get_consumption_summary = AsyncMock(
            return_value={"yesterday_kWh": 12.54789}
        )
        mock_consumption_service.return_value = mock_consumption_instance
        mock_maximeter_instance = Mock()
        mock_maximeter_instance.get_maximeter_summary = AsyncMock(
            return_value={"max_power_kW": 5.87654321}
        )
        mock_maximeter_service.return_value = mock_maximeter_instance
        helper = EdataHelper(
            datadis_username=TEST_USERNAME,
            datadis_password=TEST_PASSWORD,
            cups=TEST_CUPS,
        )
        # Drive the summary aggregation directly (no network update).
        await helper._calculate_summary_attributes()
        # Check rounding: numeric values rounded to 2 decimals.
        assert helper.attributes["contract_p1_kW"] == 5.75
        assert helper.attributes["yesterday_kWh"] == 12.55
        assert helper.attributes["max_power_kW"] == 5.88
        assert (
            helper.attributes["cups"] == TEST_CUPS
        )  # String should not be affected
@pytest.mark.asyncio
async def test_date_range_adjustment(self):
    """Test that date ranges are properly adjusted to supply validity period."""
    with patch("edata.helpers.SupplyService") as mock_supply_service, patch(
        "edata.helpers.ContractService"
    ) as mock_contract_service, patch(
        "edata.helpers.ConsumptionService"
    ) as mock_consumption_service, patch(
        "edata.helpers.MaximeterService"
    ) as mock_maximeter_service:
        # Supply with limited date range (narrower than the request below)
        limited_supply = Supply(
            cups=TEST_CUPS,
            distributor_code="0031",
            point_type=5,
            date_start=datetime(2023, 6, 1),  # Later start
            date_end=datetime(2023, 9, 30),  # Earlier end
            address="Test Address",
            postal_code="28001",
            province="Madrid",
            municipality="Madrid",
            distributor="Test Distributor",
        )
        mock_supply_instance = Mock()
        mock_supply_instance.update_supplies = AsyncMock(
            return_value={"success": True}
        )
        mock_supply_instance.validate_cups = AsyncMock(return_value=True)
        mock_supply_instance.get_supply_by_cups = AsyncMock(
            return_value=limited_supply
        )
        mock_supply_instance.get_supply_summary = AsyncMock(
            return_value={"cups": TEST_CUPS}
        )
        mock_supply_service.return_value = mock_supply_instance
        mock_contract_instance = Mock()
        mock_contract_instance.update_contracts = AsyncMock(
            return_value={"success": True}
        )
        mock_contract_instance.get_contract_summary = AsyncMock(return_value={})
        mock_contract_service.return_value = mock_contract_instance
        mock_consumption_instance = Mock()
        mock_consumption_instance.update_consumption_range_by_months = AsyncMock(
            return_value={"success": True}
        )
        mock_consumption_instance.get_consumption_summary = AsyncMock(
            return_value={}
        )
        mock_consumption_service.return_value = mock_consumption_instance
        mock_maximeter_instance = Mock()
        mock_maximeter_instance.update_maxpower_range_by_months = AsyncMock(
            return_value={"success": True}
        )
        mock_maximeter_instance.get_maximeter_summary = AsyncMock(return_value={})
        mock_maximeter_service.return_value = mock_maximeter_instance
        helper = EdataHelper(
            datadis_username=TEST_USERNAME,
            datadis_password=TEST_PASSWORD,
            cups=TEST_CUPS,
        )
        # Request broader date range than the supply's validity window
        result = await helper.update(
            date_from=datetime(2023, 1, 1), date_to=datetime(2023, 12, 31)
        )
        assert result is True
        # Verify that consumption service was called with adjusted dates
        mock_consumption_instance.update_consumption_range_by_months.assert_called_once_with(
            cups=TEST_CUPS,
            distributor_code="0031",
            start_date=datetime(2023, 6, 1),  # Adjusted to supply start
            end_date=datetime(2023, 9, 30),  # Adjusted to supply end
            measurement_type="0",
            point_type=5,
            authorized_nif=None,
        )
"""Utility functions for edata package."""
import contextlib
import json
import logging
import math
from copy import deepcopy
from datetime import date, datetime, timedelta
from json import JSONEncoder
from typing import Any, Dict, List, Optional
import holidays
_LOGGER = logging.getLogger(__name__)

# PVPC tariff constants, consumed by get_pvpc_tariff below.
HOURS_P1 = [10, 11, 12, 13, 18, 19, 20, 21]  # hours mapped to period "p1"
HOURS_P2 = [8, 9, 14, 15, 16, 17, 22, 23]  # hours mapped to period "p2"
WEEKDAYS_P3 = [5, 6]  # datetime.weekday() values for Saturday and Sunday
def get_pvpc_tariff(a_datetime: datetime) -> str:
    """Evaluate the PVPC tariff for a given datetime.

    Args:
        a_datetime: The datetime to evaluate

    Returns:
        The tariff period: "p1", "p2", or "p3"
    """
    es_holidays = holidays.country_holidays("ES")
    # Weekends and Spanish national holidays are always period p3.
    if a_datetime.weekday() in WEEKDAYS_P3 or a_datetime.date() in es_holidays:
        return "p3"
    # On working days the hour decides the period; anything not listed
    # in the p1/p2 tables falls through to p3.
    if a_datetime.hour in HOURS_P1:
        return "p1"
    if a_datetime.hour in HOURS_P2:
        return "p2"
    return "p3"
def extend_by_key(
    old_lst: List[Dict[str, Any]], new_lst: List[Dict[str, Any]], key: str
) -> List[Dict[str, Any]]:
    """Merge new_lst into a deep copy of old_lst, matching elements on *key*.

    An element of new_lst whose key matches an existing element overwrites
    that element's fields in place; unmatched elements are appended at the
    end. The inputs are not mutated.
    """
    merged = deepcopy(old_lst)
    appended = []
    for candidate in new_lst:
        match = next((e for e in merged if e[key] == candidate[key]), None)
        if match is None:
            appended.append(candidate)
        else:
            # Overwrite only the fields the existing element already has.
            for field in match:
                match[field] = candidate[field]
    merged.extend(appended)
    return merged
def extract_dt_ranges(
    lst: List[Dict[str, Any]],
    dt_from: datetime,
    dt_to: datetime,
    gap_interval: timedelta = timedelta(hours=1),
) -> tuple:
    """Filter a list of dicts between two datetimes.

    Returns ``(new_lst, missing)``: ``new_lst`` holds the elements whose
    "datetime" lies in [dt_from, dt_to] (duplicate timestamps dropped),
    and ``missing`` lists ``{"from": ..., "to": ...}`` spans where
    consecutive kept elements are further apart than ``gap_interval``.
    """
    new_lst = []
    missing = []
    oldest_dt = None  # earliest in-range timestamp with a positive value
    newest_dt = None  # latest in-range timestamp with a positive value
    last_dt = None
    if len(lst) > 0:
        sorted_lst = sorted(lst, key=lambda i: i["datetime"])
        last_dt = dt_from
        for i in sorted_lst:
            if dt_from <= i["datetime"] <= dt_to:
                if (i["datetime"] - last_dt) > gap_interval:
                    missing.append({"from": last_dt, "to": i["datetime"]})
                # Elements without a "value_kWh" key default to 1, i.e.
                # they count as non-zero readings for the span tracking.
                if i.get("value_kWh", 1) > 0:
                    if oldest_dt is None or i["datetime"] < oldest_dt:
                        oldest_dt = i["datetime"]
                    if newest_dt is None or i["datetime"] > newest_dt:
                        newest_dt = i["datetime"]
                # NOTE(review): last_dt starts at dt_from, so an element
                # whose timestamp equals dt_from exactly is treated as a
                # duplicate and dropped — confirm this is intended.
                if i["datetime"] != last_dt:  # remove duplicates
                    new_lst.append(i)
                    last_dt = i["datetime"]
        if dt_to > last_dt:
            missing.append({"from": last_dt, "to": dt_to})
        _LOGGER.debug("found data from %s to %s", oldest_dt, newest_dt)
    else:
        # No input at all: the whole requested range is missing.
        missing.append({"from": dt_from, "to": dt_to})
    return new_lst, missing
def get_by_key(
    lst: List[Dict[str, Any]], key: str, value: Any
) -> Optional[Dict[str, Any]]:
    """Return the first element of *lst* whose *key* equals *value*, else None."""
    return next((element for element in lst if element[key] == value), None)
def serialize_dict(data: dict) -> dict:
    """Serialize a dict into JSON-compatible primitives.

    datetime/date values are converted to ISO-8601 strings. Any other
    non-JSON-serializable value now raises TypeError via the base
    encoder; previously ``default`` fell through and implicitly returned
    None, silently encoding unsupported values as null.
    """

    class DateTimeEncoder(JSONEncoder):
        """Replace datetime objects with ISO strings."""

        def default(self, o):
            if isinstance(o, (date, datetime)):
                return o.isoformat()
            # Defer to the base class, which raises TypeError for
            # genuinely unsupported types instead of emitting null.
            return super().default(o)

    return json.loads(json.dumps(data, cls=DateTimeEncoder))
def deserialize_dict(serialized_dict: dict) -> dict:
    """Deserializes a json replacing ISOTIME strings into datetime.

    Any value whose key contains "date" is parsed with
    datetime.fromisoformat; parse failures leave the value untouched.
    """

    def _revive_dates(obj):
        # object_hook: runs bottom-up on every decoded dict.
        for name in obj:
            if "date" in name:
                with contextlib.suppress(Exception):
                    obj[name] = datetime.fromisoformat(obj[name])
        return obj

    return json.loads(json.dumps(serialized_dict), object_hook=_revive_dates)
def percentile(N: List, percent: float, key=lambda x: x):
    """Find the percentile of a list of values.

    *N* is expected to be sorted; *percent* is in [0, 1]. Linear
    interpolation is used between the two nearest ranks. Returns None
    for an empty list.
    """
    if not N:
        return None
    rank = (len(N) - 1) * percent
    lower, upper = math.floor(rank), math.ceil(rank)
    if lower == upper:
        # Exact rank: no interpolation needed.
        return key(N[int(rank)])
    # Weighted blend of the two surrounding values.
    return key(N[int(lower)]) * (upper - rank) + key(N[int(upper)]) * (rank - lower)
def extend_and_filter(
    old_lst: List[Dict[str, Any]],
    new_lst: List[Dict[str, Any]],
    key: str,
    dt_from: datetime,
    dt_to: datetime,
) -> List[Dict[str, Any]]:
    """Merge two lists of dicts on *key*, then keep only [dt_from, dt_to].

    The huge gap_interval deliberately disables gap reporting — only the
    filtered list is of interest here, so the missing-ranges result of
    extract_dt_ranges is discarded.
    """
    merged = extend_by_key(old_lst, new_lst, key)
    filtered, _ = extract_dt_ranges(
        merged, dt_from, dt_to, gap_interval=timedelta(days=365)  # trick
    )
    return filtered
# Developer task runner; `make help` lists the documented (##) targets.
.PHONY: help install install-dev test lint format build clean publish publish-test
# NOTE(review): test-cov, format-check, dev-setup, pre-commit, release and
# release-test are not declared .PHONY — confirm that is intended.

help: ## Show this help message
	@grep -E '^[a-zA-Z_-]+:.*?## .*$$' $(MAKEFILE_LIST) | sort | awk 'BEGIN {FS = ":.*?## "}; {printf "\033[36m%-20s\033[0m %s\n", $$1, $$2}'

install: ## Install the package in development mode
	pip install -e .

install-dev: ## Install with development dependencies
	pip install -e ".[dev,test]"

test: ## Run tests
	pytest

test-cov: ## Run tests with coverage
	pytest --cov=edata --cov-report=html --cov-report=term

lint: ## Run linting checks
	flake8 edata/
	mypy edata/

format: ## Format code with black
	black edata/

format-check: ## Check if code is formatted
	black --check edata/

clean: ## Clean build artifacts
	rm -rf build/
	rm -rf dist/
	rm -rf *.egg-info/
	find . -type d -name __pycache__ -exec rm -rf {} +
	find . -type f -name "*.pyc" -delete

build: ## Build the package
	python -m build

publish-test: ## Publish to TestPyPI
	python -m twine upload --repository testpypi dist/*

publish: ## Publish to PyPI
	python -m twine upload dist/*

# Combined workflow commands
dev-setup: install-dev ## Setup development environment

pre-commit: format lint test ## Run all pre-commit checks

release: clean build publish ## Build and publish to PyPI

release-test: clean build publish-test ## Build and publish to TestPyPI
[build-system]
requires = ["setuptools>=61.0", "wheel"]
build-backend = "setuptools.build_meta"
[project]
name = "e-data"
version = "2.0.0b1"
authors = [
{name = "VMG", email = "vmayorg@outlook.es"},
]
description = "Python library for managing spanish energy data from various web providers"
readme = "README.md"
license = {text = "GPL-3.0-or-later"}
requires-python = ">=3.8"
classifiers = [
"Intended Audience :: Developers",
"Programming Language :: Python :: 3",
"Programming Language :: Python :: 3.8",
"Programming Language :: Python :: 3.9",
"Programming Language :: Python :: 3.10",
"Programming Language :: Python :: 3.11",
"Programming Language :: Python :: 3.12",
"Programming Language :: Python :: Implementation :: CPython",
"Topic :: Software Development :: Libraries :: Python Modules",
"Topic :: Utilities",
]
keywords = ["energy", "data", "spain", "electricity", "consumption"]
dependencies = [
"dateparser>=1.1.2",
"holidays>=0.14.2",
"python-dateutil>=2.8.2",
"requests>=2.28.1",
"voluptuous>=0.13.1",
"Jinja2>=3.1.2",
"pydantic>=2.0.0",
"sqlmodel>=0.0.24",
"aiosqlite>=0.20.0",
"sqlalchemy[asyncio]>=2.0.0",
]
[project.optional-dependencies]
dev = [
"pytest>=7.1.2",
"freezegun>=1.2.1",
"twine>=4.0.0",
"build>=0.10.0",
"black>=22.0.0",
"flake8>=5.0.0",
"mypy>=1.0.0",
]
test = [
"pytest>=7.1.2",
"freezegun>=1.2.1",
"pytest-cov>=4.0.0",
"pytest-asyncio>=0.20.0",
]
[project.urls]
Homepage = "https://github.com/uvejota/python-edata"
Repository = "https://github.com/uvejota/python-edata"
Issues = "https://github.com/uvejota/python-edata/issues"
[tool.setuptools]
packages = ["edata"]
[tool.setuptools.package-data]
edata = ["py.typed"]
# Development tooling configuration
[tool.black]
line-length = 88
target-version = ['py38']
include = '\.pyi?$'
extend-exclude = '''
/(
# directories
\.eggs
| \.git
| \.hg
| \.mypy_cache
| \.tox
| \.venv
| build
| dist
)/
'''
[tool.pytest.ini_options]
testpaths = ["edata/tests"]
python_files = ["test_*.py", "*_test.py"]
python_classes = ["Test*"]
python_functions = ["test_*"]
addopts = "-v --tb=short"
[tool.mypy]
python_version = "3.8"
warn_return_any = true
warn_unused_configs = true
disallow_untyped_defs = true
+16
-18

@@ -0,24 +1,22 @@

.python-version
LICENSE
MANIFEST.in
Makefile
README.md
setup.cfg
setup.py
e_data.egg-info/PKG-INFO
e_data.egg-info/SOURCES.txt
e_data.egg-info/dependency_links.txt
e_data.egg-info/requires.txt
e_data.egg-info/top_level.txt
pyproject.toml
edata/__init__.py
edata/const.py
edata/definitions.py
edata/helpers.py
edata/storage.py
edata/connectors/__init__.py
edata/connectors/datadis.py
edata/connectors/redata.py
edata/processors/__init__.py
edata/processors/base.py
edata/processors/billing.py
edata/processors/consumption.py
edata/processors/maximeter.py
edata/processors/utils.py
edata/utils.py
edata/tests/__init__.py
edata/tests/test_helpers.py
edata/tests/connectors/__init__.py
edata/tests/connectors/test_datadis_connector.py
edata/tests/connectors/test_redata_connector.py
edata/tests/services/__init__.py
edata/tests/services/test_billing_service.py
edata/tests/services/test_consumption_service.py
edata/tests/services/test_contract_service.py
edata/tests/services/test_database_service.py
edata/tests/services/test_maximeter_service.py
edata/tests/services/test_supply_service.py
"""Constants file."""
PROG_NAME = "edata"
DEFAULT_STORAGE_DIR = "edata.storage"
# Attributes definition for backward compatibility
ATTRIBUTES = {
"cups": None,
"contract_p1_kW": "kW",
"contract_p2_kW": "kW",
"yesterday_kWh": "kWh",
"yesterday_hours": "h",
"yesterday_p1_kWh": "kWh",
"yesterday_p2_kWh": "kWh",
"yesterday_p3_kWh": "kWh",
"yesterday_surplus_kWh": "kWh",
"yesterday_surplus_p1_kWh": "kWh",
"yesterday_surplus_p2_kWh": "kWh",
"yesterday_surplus_p3_kWh": "kWh",
"last_registered_date": None,
"last_registered_day_kWh": "kWh",
"last_registered_day_hours": "h",
"last_registered_day_p1_kWh": "kWh",
"last_registered_day_p2_kWh": "kWh",
"last_registered_day_p3_kWh": "kWh",
"last_registered_day_surplus_kWh": "kWh",
"last_registered_day_surplus_p1_kWh": "kWh",
"last_registered_day_surplus_p2_kWh": "kWh",
"last_registered_day_surplus_p3_kWh": "kWh",
"month_kWh": "kWh",
"month_daily_kWh": "kWh",
"month_days": "d",
"month_p1_kWh": "kWh",
"month_p2_kWh": "kWh",
"month_p3_kWh": "kWh",
"month_surplus_kWh": "kWh",
"month_surplus_p1_kWh": "kWh",
"month_surplus_p2_kWh": "kWh",
"month_surplus_p3_kWh": "kWh",
"month_€": "€",
"last_month_kWh": "kWh",
"last_month_daily_kWh": "kWh",
"last_month_days": "d",
"last_month_p1_kWh": "kWh",
"last_month_p2_kWh": "kWh",
"last_month_p3_kWh": "kWh",
"last_month_surplus_kWh": "kWh",
"last_month_surplus_p1_kWh": "kWh",
"last_month_surplus_p2_kWh": "kWh",
"last_month_surplus_p3_kWh": "kWh",
"last_month_€": "€",
"max_power_kW": "kW",
"max_power_date": None,
"max_power_mean_kW": "kW",
"max_power_90perc_kW": "kW",
}
"""A module for edata helpers."""
import asyncio
import contextlib
from datetime import datetime, timedelta
import logging
import os
from datetime import datetime
from typing import Any, Dict
from dateutil.relativedelta import relativedelta
import requests
from edata.connectors.datadis import DatadisConnector
from edata.const import ATTRIBUTES
from edata.models.pricing import PricingRules
from edata.services.billing import BillingService
from edata.services.consumption import ConsumptionService
from edata.services.contract import ContractService
from edata.services.maximeter import MaximeterService
from edata.services.supply import SupplyService
from . import const
from .connectors.datadis import DatadisConnector
from .connectors.redata import REDataConnector
from .definitions import ATTRIBUTES, EdataData, PricingRules
from .processors import utils
from .processors.billing import BillingInput, BillingProcessor
from .processors.consumption import ConsumptionProcessor
from .processors.maximeter import MaximeterProcessor
from .storage import check_storage_integrity, dump_storage, load_storage
_LOGGER = logging.getLogger(__name__)

@@ -27,3 +21,2 @@

"""Print an abbreviated and anonymized CUPS."""
return cups[-5:]

@@ -33,6 +26,4 @@

class EdataHelper:
"""Main EdataHelper class."""
"""Main EdataHelper class using service-based architecture."""
UPDATE_INTERVAL = timedelta(hours=1)
def __init__(

@@ -46,727 +37,279 @@ self,

storage_dir_path: str | None = None,
data: EdataData | None = None,
enable_smart_fetch: bool = True,
) -> None:
self.data = EdataData(
supplies=[],
contracts=[],
consumptions=[],
maximeter=[],
pvpc=[],
consumptions_daily_sum=[],
consumptions_monthly_sum=[],
cost_hourly_sum=[],
cost_daily_sum=[],
cost_monthly_sum=[],
)
"""Initialize EdataHelper with service-based architecture.
self.attributes = {}
self._storage_dir = storage_dir_path
Args:
datadis_username: Datadis username
datadis_password: Datadis password
cups: CUPS identifier
datadis_authorized_nif: Optional authorized NIF
pricing_rules: Pricing configuration
storage_dir_path: Directory for database and cache storage
enable_smart_fetch: Enable smart fetching in datadis connector
"""
self._cups = cups
self._scups = acups(cups)
self._authorized_nif = datadis_authorized_nif
self.last_update = {x: datetime(1970, 1, 1) for x in self.data}
self._date_from = datetime(1970, 1, 1)
self._date_to = datetime.today()
self._must_dump = True
self._incremental_update = True
self._storage_dir = storage_dir_path
self.pricing_rules = pricing_rules
if data is not None:
data = check_storage_integrity(data)
self.data = data
else:
with contextlib.suppress(Exception):
self.data = load_storage(self._cups, self._storage_dir)
# Initialize summary attributes
self.summary: Dict[str, Any] = {}
for attr in ATTRIBUTES:
self.attributes[attr] = None
self.summary[attr] = None
self.datadis_api = DatadisConnector(
datadis_username,
datadis_password,
storage_path=(
os.path.join(storage_dir_path, const.PROG_NAME)
if storage_dir_path is not None
else None
),
)
self.redata_api = REDataConnector()
# For backward compatibility, alias 'attributes' to 'summary'
self.attributes = self.summary
self.pricing_rules = pricing_rules
if self.pricing_rules is not None:
self.enable_billing = True
if not all(
x in self.pricing_rules and self.pricing_rules[x] is not None
# Determine if using PVPC pricing
self.enable_billing = pricing_rules is not None
if self.enable_billing:
self.is_pvpc = not all(
getattr(pricing_rules, x, None) is not None
for x in ("p1_kwh_eur", "p2_kwh_eur", "p3_kwh_eur")
):
self.is_pvpc = True
else:
self.is_pvpc = False
)
else:
self.enable_billing = False
self.is_pvpc = False
async def async_update(
self,
date_from: datetime = datetime(1970, 1, 1),
date_to: datetime = datetime.today(),
):
"""Async call of update method."""
asyncio.get_event_loop().run_in_executor(
None, self.update, *[date_from, date_to]
# Create shared Datadis connector
self._datadis_connector = DatadisConnector(
username=datadis_username,
password=datadis_password,
enable_smart_fetch=enable_smart_fetch,
storage_path=storage_dir_path,
)
def update(
self,
date_from: datetime = datetime(1970, 1, 1),
date_to: datetime = datetime.today(),
incremental_update: bool = True,
):
"""Update synchronously."""
_LOGGER.info(
"%s: update triggered",
self._scups,
# Initialize services with dependency injection
self._supply_service = SupplyService(
datadis_connector=self._datadis_connector,
storage_dir=storage_dir_path,
)
self._date_from = date_from
self._date_to = date_to
# update datadis resources
self.update_datadis(self._cups, date_from, date_to)
# update redata resources if pvpc is requested
if self.is_pvpc:
try:
self.update_redata(date_from, date_to)
except requests.exceptions.Timeout:
_LOGGER.error("Timeout exception while updating from REData")
self.process_data(incremental_update=incremental_update)
if self._must_dump:
dump_storage(self._cups, self.data, self._storage_dir)
def update_supplies(self):
"""Update supplies."""
_LOGGER.debug("%s: supplies update triggered", self._scups)
if datetime.today().date() != self.last_update["supplies"].date():
# if supplies haven't been updated today
supplies = self.datadis_api.get_supplies(
authorized_nif=self._authorized_nif
) # fetch supplies
if len(supplies) > 0:
self.data["supplies"] = supplies
# if we got something, update last_update flag
self.last_update["supplies"] = datetime.now()
_LOGGER.info("%s: supplies update succeeded", self._scups)
else:
_LOGGER.info("%s: supplies are already updated (skipping)", self._scups)
def update_contracts(self, cups: str, distributor_code: str):
"""Update contracts."""
_LOGGER.debug("%s: contracts update triggered", self._scups)
if datetime.today().date() != self.last_update["contracts"].date():
# if contracts haven't been updated today
contracts = self.datadis_api.get_contract_detail(
cups, distributor_code, authorized_nif=self._authorized_nif
)
if len(contracts) > 0:
self.data["contracts"] = utils.extend_by_key(
self.data["contracts"], contracts, "date_start"
) # extend contracts data with new ones
# if we got something, update last_update flag
self.last_update["contracts"] = datetime.now()
_LOGGER.info("%s: contracts update succeeded", self._scups)
else:
_LOGGER.info("%s: contracts are already updated (skipping)", self._scups)
def update_consumptions(
self,
cups: str,
distributor_code: str,
start_date: datetime,
end_date: datetime,
measurement_type: str,
point_type: int,
):
"""Update consumptions."""
_LOGGER.debug("%s: consumptions update triggered", self._scups)
if (datetime.now() - self.last_update["consumptions"]) > self.UPDATE_INTERVAL:
consumptions = self.datadis_api.get_consumption_data(
cups,
distributor_code,
start_date,
end_date,
measurement_type,
point_type,
authorized_nif=self._authorized_nif,
)
if len(consumptions) > 0:
_LOGGER.info(
"%s: got consumptions from %s to %s",
self._scups,
consumptions[0]["datetime"].isoformat(),
consumptions[-1]["datetime"].isoformat(),
)
self.data["consumptions"] = utils.extend_by_key(
self.data["consumptions"], consumptions, "datetime"
)
self.last_update["consumptions"] = datetime.now()
else:
_LOGGER.info("%s: consumptions are up to date", self._scups)
else:
_LOGGER.info("%s: consumptions are already updated (skipping)", self._scups)
    def update_maximeter(self, cups: str, distributor_code: str, start_date: datetime, end_date: datetime):
        """Refresh maximeter (max power) readings from Datadis.

        Fetches max-power data for the given CUPS and date range and merges
        any new records into ``self.data["maximeter"]``. The fetch is
        throttled: it is skipped when the last successful update happened
        less than ``UPDATE_INTERVAL`` ago.
        """
        _LOGGER.debug("%s: maximeter update triggered", self._scups)
        # Throttle: only hit the API when the data is older than UPDATE_INTERVAL
        if (datetime.now() - self.last_update["maximeter"]) > self.UPDATE_INTERVAL:
            maximeter = self.datadis_api.get_max_power(
                cups,
                distributor_code,
                start_date,
                end_date,
                authorized_nif=self._authorized_nif,
            )
            if len(maximeter) > 0:
                _LOGGER.info(
                    "%s: maximeter update succeeded",
                    self._scups,
                )
                # Merge new readings into the stored series, keyed by timestamp
                self.data["maximeter"] = utils.extend_by_key(
                    self.data["maximeter"], maximeter, "datetime"
                )
                # Record the refresh time so subsequent calls are throttled
                self.last_update["maximeter"] = datetime.now()
            else:
                _LOGGER.info("%s: maximeter is up to date", self._scups)
        else:
            _LOGGER.info("%s: maximeter is already updated (skipping)", self._scups)
def update_datadis(
self,
cups: str,
date_from: datetime = datetime(1970, 1, 1),
date_to: datetime = datetime.today(),
):
"""Update all data from Datadis."""
_LOGGER.info(
"%s: datadis update triggered (from %s to %s)",
self._scups,
date_from.isoformat(),
date_to.isoformat(),
self._contract_service = ContractService(
datadis_connector=self._datadis_connector,
storage_dir=storage_dir_path,
)
# update supplies and get distributorCode
self.update_supplies()
if len(self.data["supplies"]) == 0:
# return if no supplies were discovered
_LOGGER.warning(
"%s: supplies update failed or no supplies found in the provided account",
self._scups,
)
return False
# find requested cups in supplies
supply = utils.get_by_key(self.data["supplies"], "cups", cups)
if supply is None:
# return if specified cups seems not valid
_LOGGER.error(
"%s: CUPS not found. Got: %s",
self._scups,
[acups(x["cups"]) for x in self.data["supplies"]],
)
return False
_LOGGER.info("%s: CUPS found in account", self._scups)
# get some supply-related data
supply_date_start = supply["date_start"]
distributor_code = supply["distributorCode"]
point_type = supply["pointType"]
_LOGGER.info(
"%s: CUPS start date is %s", self._scups, supply_date_start.isoformat()
self._consumption_service = ConsumptionService(
datadis_connector=self._datadis_connector,
storage_dir=storage_dir_path,
)
_LOGGER.info(
"%s: CUPS end date is %s", self._scups, supply["date_end"].isoformat()
)
# update contracts to get valid periods
self.update_contracts(cups, distributor_code)
if len(self.data["contracts"]) == 0:
_LOGGER.warning(
"%s: contracts update failed or no contracts found in the provided account",
self._scups,
)
# return False
# filter consumptions and maximeter, and log gaps
def sort_and_filter(dt_from, dt_to):
self.data["consumptions"], miss_cons = utils.extract_dt_ranges(
self.data["consumptions"],
dt_from,
dt_to,
gap_interval=timedelta(hours=6),
)
self.data["maximeter"], miss_maxim = utils.extract_dt_ranges(
self.data["maximeter"],
dt_from,
dt_to,
gap_interval=timedelta(days=60),
)
return miss_cons, miss_maxim
miss_cons, miss_maxim = sort_and_filter(date_from, date_to)
# update consumptions
_LOGGER.info(
"%s: missing consumptions: %s",
self._scups,
", ".join(
[
"from "
+ (x["from"] + timedelta(hours=1)).isoformat()
+ " to "
+ x["to"].isoformat()
for x in miss_cons
]
),
self._maximeter_service = MaximeterService(
datadis_connector=self._datadis_connector,
storage_dir=storage_dir_path,
)
for gap in miss_cons:
if not (
gap["to"] < supply["date_start"] or gap["from"] > supply["date_end"]
):
# fetch consumptions for each consumptions gap in valid periods
start = max([gap["from"] + timedelta(hours=1), supply["date_start"]])
end = min([gap["to"], supply["date_end"]])
_LOGGER.info(
"%s: requesting consumptions from %s to %s",
self._scups,
start.isoformat(),
end.isoformat(),
)
self.update_consumptions(
cups,
distributor_code,
start,
end,
"0",
point_type,
)
# update maximeter
_LOGGER.info(
"%s: missing maximeter: %s",
self._scups,
", ".join(
[
"from " + x["from"].isoformat() + " to " + x["to"].isoformat()
for x in miss_maxim
]
),
)
for gap in miss_maxim:
if not (date_to < supply["date_start"] or date_from > supply["date_end"]):
# fetch maximeter for each maximeter gap in valid periods
start = max(
[gap["from"], supply["date_start"] + relativedelta(months=1)]
)
end = min([gap["to"], supply["date_end"]])
start = min([start, end])
_LOGGER.info(
"%s: requesting maximeter from %s to %s",
self._scups,
start.isoformat(),
end.isoformat(),
)
self.update_maximeter(cups, distributor_code, start, end)
if self.enable_billing:
self._billing_service = BillingService(storage_dir=storage_dir_path)
miss_cons, miss_maxim = sort_and_filter(date_from, date_to)
_LOGGER.info(f"EdataHelper initialized for CUPS {self._scups}")
return True
@property
def datadis_connector(self) -> DatadisConnector:
"""Get the shared Datadis connector instance."""
return self._datadis_connector
def update_redata(
async def update(
self,
date_from: datetime = (datetime.today() - timedelta(days=30)).replace(
hour=0, minute=0
),
date_to: datetime = (datetime.today() + timedelta(days=2)).replace(
hour=0, minute=0
),
date_from: datetime = datetime(1970, 1, 1),
date_to: datetime = datetime.today(),
):
"""Fetch PVPC prices using REData API."""
"""Update all data and calculate summary attributes.
Args:
date_from: Start date for data updates
date_to: End date for data updates
incremental_update: Whether to update incrementally (deprecated, ignored)
"""
_LOGGER.info(
"%s: updating PVPC prices",
self._scups,
f"{self._scups}: Starting update from {date_from.date()} to {date_to.date()}"
)
self.data["pvpc"], missing = utils.extract_dt_ranges(
self.data["pvpc"],
date_from,
date_to,
gap_interval=timedelta(hours=1),
)
for gap in missing:
prices = []
gap["from"] = max(
(datetime.today() - timedelta(days=30)).replace(hour=0, minute=0),
gap["from"],
try:
# Step 1: Update supplies
_LOGGER.info(f"{self._scups}: Updating supplies")
supply_result = await self._supply_service.update_supplies(
authorized_nif=self._authorized_nif
)
while len(prices) == 0 and gap["from"] < gap["to"]:
prices = self.redata_api.get_realtime_prices(gap["from"], gap["to"])
gap["from"] = gap["from"] + timedelta(days=1)
self.data["pvpc"] = utils.extend_by_key(
self.data["pvpc"], prices, "datetime"
)
return True
def process_data(self, incremental_update: bool = True):
"""Process all raw data."""
self._incremental_update = incremental_update
for process_method in [
self.process_supplies,
self.process_contracts,
self.process_consumptions,
self.process_maximeter,
self.process_cost,
]:
try:
process_method()
except Exception as ex: # pylint: disable=broad-except
_LOGGER.error("Unhandled exception while updating attributes")
_LOGGER.exception(ex)
for attribute in self.attributes:
if attribute in ATTRIBUTES and ATTRIBUTES[attribute] is not None:
self.attributes[attribute] = (
round(self.attributes[attribute], 2)
if self.attributes[attribute] is not None
else None
if not supply_result["success"]:
_LOGGER.error(
f"{self._scups}: Failed to update supplies: {supply_result.get('error', 'Unknown error')}"
)
return False
if not incremental_update:
dump_storage(self._cups, self.data, self._storage_dir)
# Validate that our CUPS exists
if not await self._supply_service.validate_cups(self._cups):
_LOGGER.error(f"{self._scups}: CUPS not found in account")
return False
def process_supplies(self):
"""Process supplies data."""
for i in self.data["supplies"]:
if i["cups"] == self._cups:
self.attributes["cups"] = self._cups
break
_LOGGER.info(f"{self._scups}: CUPS validated successfully")
def process_contracts(self):
"""Process contracts data."""
most_recent_date = datetime(1970, 1, 1)
for i in self.data["contracts"]:
if i["date_end"] > most_recent_date:
most_recent_date = i["date_end"]
self.attributes["contract_p1_kW"] = i.get("power_p1", None)
self.attributes["contract_p2_kW"] = i.get("power_p2", None)
break
# Get supply information
supply = await self._supply_service.get_supply_by_cups(self._cups)
if not supply:
_LOGGER.error(f"{self._scups}: Could not retrieve supply details")
return False
def process_consumptions(self):
"""Process consumptions data."""
if len(self.data["consumptions"]) > 0:
new_data_from = self._date_from
if self._incremental_update:
with contextlib.suppress(Exception):
new_data_from = self.data["consumptions_monthly_sum"][-1][
"datetime"
]
distributor_code = supply.distributor_code
point_type = supply.point_type
proc = ConsumptionProcessor(
{
"consumptions": [
x
for x in self.data["consumptions"]
if x["datetime"] >= new_data_from
],
"cycle_start_day": 1,
}
_LOGGER.info(
f"{self._scups}: Supply dates from {supply.date_start.date()} to {supply.date_end.date()}"
)
today_starts = datetime(
datetime.today().year,
datetime.today().month,
datetime.today().day,
0,
0,
0,
)
month_starts = datetime(
datetime.today().year, datetime.today().month, 1, 0, 0, 0
)
# Adjust date range to supply validity period
effective_start = max(date_from, supply.date_start)
effective_end = min(date_to, supply.date_end)
# append new data
self.data["consumptions_daily_sum"] = utils.extend_and_filter(
self.data["consumptions_daily_sum"],
proc.output["daily"],
"datetime",
self._date_from,
self._date_to,
# Step 2: Update contracts
_LOGGER.info(f"{self._scups}: Updating contracts")
contract_result = await self._contract_service.update_contracts(
cups=self._cups,
distributor_code=distributor_code,
authorized_nif=self._authorized_nif,
)
self.data["consumptions_monthly_sum"] = utils.extend_and_filter(
self.data["consumptions_monthly_sum"],
proc.output["monthly"],
"datetime",
self._date_from,
self._date_to,
)
yday = utils.get_by_key(
self.data["consumptions_daily_sum"],
"datetime",
today_starts - timedelta(days=1),
)
self.attributes["yesterday_kWh"] = (
yday.get("value_kWh", None) if yday is not None else None
)
if not contract_result["success"]:
_LOGGER.warning(
f"{self._scups}: Contract update failed: {contract_result.get('error', 'Unknown error')}"
)
for tariff in (1, 2, 3):
self.attributes[f"yesterday_p{tariff}_kWh"] = (
yday.get(f"value_p{tariff}_kWh", None) if yday is not None else None
# Step 3: Update consumptions in monthly chunks
_LOGGER.info(f"{self._scups}: Updating consumptions")
consumption_result = (
await self._consumption_service.update_consumption_range_by_months(
cups=self._cups,
distributor_code=distributor_code,
start_date=effective_start,
end_date=effective_end,
measurement_type="0",
point_type=point_type,
authorized_nif=self._authorized_nif,
)
self.attributes["yesterday_surplus_kWh"] = (
yday.get("surplus_kWh", None) if yday is not None else None
)
for tariff in (1, 2, 3):
self.attributes[f"yesterday_surplus_p{tariff}_kWh"] = (
yday.get(f"surplus_p{tariff}_kWh", None)
if yday is not None
else None
)
if not consumption_result["success"]:
_LOGGER.warning(f"{self._scups}: Consumption update failed")
self.attributes["yesterday_hours"] = (
yday.get("delta_h", None) if yday is not None else None
)
month = utils.get_by_key(
self.data["consumptions_monthly_sum"], "datetime", month_starts
)
self.attributes["month_kWh"] = (
month.get("value_kWh", None) if month is not None else None
)
self.attributes["month_surplus_kWh"] = (
month.get("surplus_kWh", None) if month is not None else None
)
self.attributes["month_days"] = (
month.get("delta_h", 0) / 24 if month is not None else None
)
self.attributes["month_daily_kWh"] = (
(
(self.attributes["month_kWh"] / self.attributes["month_days"])
if self.attributes["month_days"] > 0
else 0
# Step 4: Update maximeter data
_LOGGER.info(f"{self._scups}: Updating maximeter")
maximeter_result = (
await self._maximeter_service.update_maxpower_range_by_months(
cups=self._cups,
distributor_code=distributor_code,
start_date=effective_start,
end_date=effective_end,
authorized_nif=self._authorized_nif,
)
if month is not None
else None
)
for tariff in (1, 2, 3):
self.attributes[f"month_p{tariff}_kWh"] = (
month.get(f"value_p{tariff}_kWh", None)
if month is not None
else None
)
self.attributes[f"month_surplus_p{tariff}_kWh"] = (
month.get(f"surplus_p{tariff}_kWh", None)
if month is not None
else None
)
if not maximeter_result["success"]:
_LOGGER.warning(f"{self._scups}: Maximeter update failed")
last_month = utils.get_by_key(
self.data["consumptions_monthly_sum"],
"datetime",
(month_starts - relativedelta(months=1)),
)
self.attributes["last_month_kWh"] = (
last_month.get("value_kWh", None) if last_month is not None else None
)
self.attributes["last_month_surplus_kWh"] = (
last_month.get("surplus_kWh", None) if last_month is not None else None
)
self.attributes["last_month_days"] = (
last_month.get("delta_h", 0) / 24 if last_month is not None else None
)
self.attributes["last_month_daily_kWh"] = (
(
(
self.attributes["last_month_kWh"]
/ self.attributes["last_month_days"]
# Step 5: Update PVPC prices if needed
if self.enable_billing and self.is_pvpc:
_LOGGER.info(f"{self._scups}: Updating PVPC prices")
try:
pvpc_result = await self._billing_service.update_pvpc_prices(
start_date=effective_start,
end_date=effective_end,
is_ceuta_melilla=False, # Default to Peninsula
)
if self.attributes["last_month_days"] > 0
else 0
)
if last_month is not None
else None
)
for tariff in (1, 2, 3):
self.attributes[f"last_month_p{tariff}_kWh"] = (
last_month.get(f"value_p{tariff}_kWh", None)
if last_month is not None
else None
)
self.attributes[f"last_month_surplus_p{tariff}_kWh"] = (
last_month.get(f"surplus_p{tariff}_kWh", None)
if last_month is not None
else None
)
if len(self.data["consumptions"]) > 0:
self.attributes["last_registered_date"] = self.data["consumptions"][-1][
"datetime"
]
if not pvpc_result["success"]:
_LOGGER.warning(
f"{self._scups}: PVPC price update failed: {pvpc_result.get('error', 'Unknown error')}"
)
if len(self.data["consumptions_daily_sum"]) > 0:
last_day = self.data["consumptions_daily_sum"][-1]
self.attributes["last_registered_day_kWh"] = (
last_day.get("value_kWh", None)
if last_day is not None
else None
except Exception as e:
_LOGGER.warning(
f"{self._scups}: PVPC price update failed with exception: {str(e)}"
)
self.attributes["last_registered_day_surplus_kWh"] = (
last_day.get("surplus_kWh", None)
if last_day is not None
else None
# Step 6: Update billing costs if pricing rules are defined
if self.enable_billing and self.pricing_rules:
_LOGGER.info(f"{self._scups}: Updating billing costs")
try:
billing_result = await self._billing_service.update_missing_costs(
cups=self._cups,
pricing_rules=self.pricing_rules,
start_date=effective_start,
end_date=effective_end,
is_ceuta_melilla=False,
force_recalculate=False,
)
for tariff in (1, 2, 3):
self.attributes[f"last_registered_day_p{tariff}_kWh"] = (
last_day.get(f"value_p{tariff}_kWh", None)
if last_day is not None
else None
if not billing_result["success"]:
_LOGGER.warning(
f"{self._scups}: Billing cost update failed: {billing_result.get('error', 'Unknown error')}"
)
self.attributes[
f"last_registered_day_surplus_p{tariff}_kWh"
] = (
last_day.get(f"surplus_p{tariff}_kWh", None)
if last_day is not None
else None
)
self.attributes["last_registered_day_hours"] = (
last_day.get("delta_h", None) if last_day is not None else None
except Exception as e:
_LOGGER.warning(
f"{self._scups}: Billing cost update failed with exception: {str(e)}"
)
    def process_maximeter(self):
        """Derive max-power summary attributes from raw maximeter data.

        Runs the raw readings through ``MaximeterProcessor`` and copies the
        resulting statistics (peak value, peak date, mean and 90th
        percentile) into ``self.attributes``. No-op when there is no
        maximeter data.
        """
        if len(self.data["maximeter"]) > 0:
            processor = MaximeterProcessor(self.data["maximeter"])
            # NOTE(review): assumes the processor exposes a "stats" dict with
            # value_max_kW / date_max / value_mean_kW / value_tile90_kW keys —
            # confirm against MaximeterProcessor's output contract.
            last_relative_year = processor.output["stats"]
            self.attributes["max_power_kW"] = last_relative_year.get(
                "value_max_kW", None
            )
            self.attributes["max_power_date"] = last_relative_year.get("date_max", None)
            self.attributes["max_power_mean_kW"] = last_relative_year.get(
                "value_mean_kW", None
            )
            self.attributes["max_power_90perc_kW"] = last_relative_year.get(
                "value_tile90_kW", None
            )
# Step 7: Calculate summary attributes
_LOGGER.info(f"{self._scups}: Calculating summary attributes")
await self._calculate_summary_attributes()
def process_cost(self):
"""Process costs."""
if self.enable_billing:
new_data_from = self._date_from
if self._incremental_update:
with contextlib.suppress(Exception):
new_data_from = self.data["cost_monthly_sum"][-1]["datetime"]
_LOGGER.info(f"{self._scups}: Update completed successfully")
return True
proc = BillingProcessor(
BillingInput(
contracts=self.data["contracts"],
consumptions=[
x
for x in self.data["consumptions"]
if x["datetime"] >= new_data_from
],
prices=(
[x for x in self.data["pvpc"] if x["datetime"] >= new_data_from]
if self.is_pvpc
else None
),
rules=self.pricing_rules,
)
)
month_starts = datetime(
datetime.today().year, datetime.today().month, 1, 0, 0, 0
)
except Exception as e:
_LOGGER.error(f"{self._scups}: Update failed with exception: {str(e)}")
return False
# append new data
hourly = proc.output["hourly"]
self.data["cost_hourly_sum"] = utils.extend_and_filter(
self.data["cost_hourly_sum"],
hourly,
"datetime",
self._date_from,
self._date_to,
)
async def _calculate_summary_attributes(self):
"""Calculate summary attributes from all services."""
daily = proc.output["daily"]
self.data["cost_daily_sum"] = utils.extend_and_filter(
self.data["cost_daily_sum"],
daily,
"datetime",
self._date_from,
self._date_to,
)
# Reset all attributes
for attr in ATTRIBUTES:
self.summary[attr] = None
monthly = proc.output["monthly"]
self.data["cost_monthly_sum"] = utils.extend_and_filter(
self.data["cost_monthly_sum"],
monthly,
"datetime",
self._date_from,
self._date_to,
try:
# Get supply summary
supply_summary = await self._supply_service.get_supply_summary(self._cups)
self.summary.update(supply_summary)
# Get contract summary
contract_summary = await self._contract_service.get_contract_summary(
self._cups
)
self.summary.update(contract_summary)
this_month = utils.get_by_key(
self.data["cost_monthly_sum"],
"datetime",
month_starts,
# Get consumption summary
consumption_summary = (
await self._consumption_service.get_consumption_summary(self._cups)
)
self.summary.update(consumption_summary)
last_month = utils.get_by_key(
self.data["cost_monthly_sum"],
"datetime",
(month_starts - relativedelta(months=1)),
# Get maximeter summary
maximeter_summary = await self._maximeter_service.get_maximeter_summary(
self._cups
)
self.summary.update(maximeter_summary)
if this_month is not None:
self.attributes["month_€"] = this_month.get("value_eur", None)
# Get billing summary if enabled
if self.enable_billing and self.pricing_rules and self._billing_service:
billing_summary = await self._billing_service.get_billing_summary(
cups=self._cups,
pricing_rules=self.pricing_rules,
is_ceuta_melilla=False,
)
self.summary.update(billing_summary)
if last_month is not None:
self.attributes["last_month_€"] = last_month.get("value_eur", None)
# Round numeric values to 2 decimal places for consistency
for key, value in self.summary.items():
if isinstance(value, float):
self.summary[key] = round(value, 2)
def reset(self):
"""Reset in-mem objects."""
_LOGGER.debug(f"{self._scups}: Summary attributes calculated successfully")
self.data = EdataData(
supplies=[],
contracts=[],
consumptions=[],
maximeter=[],
pvpc=[],
consumptions_daily_sum=[],
consumptions_monthly_sum=[],
cost_hourly_sum=[],
cost_daily_sum=[],
cost_monthly_sum=[],
)
for attr in ATTRIBUTES:
self.attributes[attr] = None
self.last_update = {x: datetime(1970, 1, 1) for x in self.data}
except Exception as e:
_LOGGER.error(
f"{self._scups}: Error calculating summary attributes: {str(e)}"
)

@@ -0,13 +1,22 @@

# Configuration files
include pyproject.toml
include Makefile
include .python-version
# Include the README
# Documentation
include *.md
# Include the license file
include LICENSE.txt
# License
include LICENSE
# Include setup.py
include setup.py
# Include tests (setuptools auto-includes package files)
recursive-include edata/tests *.py
# Include the data files
recursive-include data *
# Exclude compiled files and caches
global-exclude *.pyc
global-exclude *.pyo
global-exclude __pycache__
global-exclude .DS_Store
prune build
prune dist
prune *.egg-info
+131
-53

@@ -1,31 +0,49 @@

Metadata-Version: 2.1
Metadata-Version: 2.4
Name: e-data
Version: 1.2.22
Version: 2.0.0b1
Summary: Python library for managing spanish energy data from various web providers
Home-page: https://github.com/uvejota/python-edata
Author: VMG
Author-email: vmayorg@outlook.es
License: GPLv3
Classifier: License :: OSI Approved :: GNU General Public License v3 (GPLv3)
Classifier: Programming Language :: Python
Author-email: VMG <vmayorg@outlook.es>
License: GPL-3.0-or-later
Project-URL: Homepage, https://github.com/uvejota/python-edata
Project-URL: Repository, https://github.com/uvejota/python-edata
Project-URL: Issues, https://github.com/uvejota/python-edata/issues
Keywords: energy,data,spain,electricity,consumption
Classifier: Intended Audience :: Developers
Classifier: Programming Language :: Python :: 3
Classifier: Programming Language :: Python :: 3.6
Classifier: Programming Language :: Python :: 3.7
Classifier: Programming Language :: Python :: 3.8
Classifier: Programming Language :: Python :: 3.9
Classifier: Programming Language :: Python :: 3.10
Classifier: Programming Language :: Python :: 3.11
Classifier: Programming Language :: Python :: 3.12
Classifier: Programming Language :: Python :: Implementation :: CPython
Classifier: Programming Language :: Python :: Implementation :: PyPy
Requires-Python: >=3.6.0
Classifier: Topic :: Software Development :: Libraries :: Python Modules
Classifier: Topic :: Utilities
Requires-Python: >=3.8
Description-Content-Type: text/markdown
License-File: LICENSE
Requires-Dist: dateparser>=1.1.2
Requires-Dist: freezegun>=1.2.1
Requires-Dist: holidays>=0.14.2
Requires-Dist: pytest>=7.1.2
Requires-Dist: python_dateutil>=2.8.2
Requires-Dist: python-dateutil>=2.8.2
Requires-Dist: requests>=2.28.1
Requires-Dist: voluptuous>=0.13.1
Requires-Dist: Jinja2>=3.1.2
Requires-Dist: pydantic>=2.0.0
Requires-Dist: sqlmodel>=0.0.24
Requires-Dist: aiosqlite>=0.20.0
Requires-Dist: sqlalchemy[asyncio]>=2.0.0
Provides-Extra: dev
Requires-Dist: pytest>=7.1.2; extra == "dev"
Requires-Dist: freezegun>=1.2.1; extra == "dev"
Requires-Dist: twine>=4.0.0; extra == "dev"
Requires-Dist: build>=0.10.0; extra == "dev"
Requires-Dist: black>=22.0.0; extra == "dev"
Requires-Dist: flake8>=5.0.0; extra == "dev"
Requires-Dist: mypy>=1.0.0; extra == "dev"
Provides-Extra: test
Requires-Dist: pytest>=7.1.2; extra == "test"
Requires-Dist: freezegun>=1.2.1; extra == "test"
Requires-Dist: pytest-cov>=4.0.0; extra == "test"
Requires-Dist: pytest-asyncio>=0.20.0; extra == "test"
Dynamic: license-file
[![Downloads](https://pepy.tech/badge/e-data)](https://pepy.tech/project/e-data)

@@ -52,3 +70,3 @@ [![Downloads](https://pepy.tech/badge/e-data/month)](https://pepy.tech/project/e-data)

``` bash
pip install -r requirements.txt
make install-dev
```

@@ -58,7 +76,8 @@

El paquete consta de tres módulos diferenciados:
El paquete utiliza una **arquitectura basada en servicios** con los siguientes módulos:
* **Conectores** (módulo `connectors`), para definir los métodos de consulta a los diferentes proveedores: Datadis y REData.
* **Procesadores** (módulo `processors`), para procesar datos de consumo, maxímetro, o coste (tarificación). Ahora mismo consta de tres procesadores: `billing`, `consumption` y `maximeter`, además de algunas utilidades ubicadas en `utils`. Los procesadores deben heredar de la clase Processor definida en `base.py`
* **Ayudantes** (módulo `helpers`), para ayudar en el uso y gestión de los anteriores, presentando de momento un único ayudante llamado `EdataHelper` que te permite recopilar `X` días de datos (por defecto 365) y automáticamente procesarlos. Los datos son almacenados en la variable `data`, mientras que los atributos autocalculados son almacenados en la variable `attributes`. Por lo general, primero utilizan los conectores y luego procesan los datos, gestionando varias tareas de recuperación (principalmente para Datadis).
* **Modelos** (módulo `models`), que definen las estructuras de datos usando Pydantic v2 para validación robusta. Incluye modelos para suministros, contratos, consumos, maxímetro, precios y base de datos.
* **Servicios** (módulo `services`), que implementan la lógica de negocio para cada dominio: gestión de suministros, contratos, consumos, maxímetro, facturación y base de datos SQLite.
* **Helper principal** (`helpers.py`), que orquesta todos los servicios y proporciona una interfaz simplificada. El `EdataHelper` permite descargar y procesar datos automáticamente, calculando más de 40 atributos de resumen.

@@ -70,16 +89,49 @@ Estos módulos corresponden a la siguiente estructura del paquete:

· __init__.py
· const.py # Constantes y definiciones de atributos
· utils.py # Utilidades generales
· helpers.py # Helper principal (EdataHelper)
· connectors/
· __init__.py
· datadis.py
· redata.py
· processors/
· datadis.py # Conector API Datadis
· redata.py # Conector API REData (PVPC)
· models/
· __init__.py
· base.py
· billing.py
· consumption.py
· maximeter.py
· utils.py
· helpers.py
· base.py # Modelos base con Pydantic
· supply.py # Modelo de suministros
· contract.py # Modelo de contratos
· consumption.py # Modelo de consumos
· maximeter.py # Modelo de maxímetro
· pricing.py # Modelo de reglas de precios
· database.py # Modelos para SQLite
· services/
· __init__.py
· database.py # Servicio de base de datos SQLite
· supply.py # Gestión de suministros
· contract.py # Gestión de contratos
· consumption.py # Gestión de consumos
· maximeter.py # Gestión de maxímetro
· billing.py # Gestión de facturación
· scripts/
· __init__.py
· dump.py # Script interactivo de descarga
```
## Script interactivo
El paquete incluye un script interactivo que facilita la descarga inicial de datos:
```bash
# Ejecutar el script interactivo
python -m edata.scripts.dump
# Con directorio personalizado
python -m edata.scripts.dump --storage-dir /ruta/personalizada
```
Este script te guiará paso a paso para:
1. Configurar credenciales de Datadis
2. Seleccionar el suministro a procesar
3. Definir el rango de fechas
4. Descargar y almacenar todos los datos
## Ejemplo de uso

@@ -94,11 +146,12 @@

``` python
import asyncio
from datetime import datetime
import json
# importamos definiciones de datos que nos interesen
from edata.definitions import PricingRules
# importamos el ayudante
# importamos el modelo de reglas de tarificación
from edata.models.pricing import PricingRules
# importamos el helper principal
from edata.helpers import EdataHelper
# importamos el procesador de utilidades
from edata.processors import utils
# importamos utilidades para serialización
from edata import utils

@@ -119,23 +172,48 @@ # Preparar reglas de tarificación (si se quiere)

# Instanciar el helper
# 'authorized_nif' permite indicar el NIF de la persona que nos autoriza a consultar su CUPS.
# 'data' permite "cargar" al helper datos anteriores (resultado edata.data de una ejecución anterior), para evitar volver a consultar los mismos.
edata = EdataHelper(
"datadis_user",
"datadis_password",
"cups",
datadis_authorized_nif=None,
pricing_rules=PRICING_RULES_PVPC, # si se le pasa None, no aplica tarificación
data=None, # aquí podríamos cargar datos anteriores
)
async def main():
# Instanciar el helper
# 'datadis_authorized_nif' permite indicar el NIF de la persona que nos autoriza a consultar su CUPS.
# 'storage_dir_path' permite especificar dónde almacenar la base de datos local
edata = EdataHelper(
"datadis_user",
"datadis_password",
"cups",
datadis_authorized_nif=None,
pricing_rules=PRICING_RULES_PVPC, # si se le pasa None, no aplica tarificación
storage_dir_path=None, # por defecto usa ./edata.storage/
)
# Solicitar actualización de todo el histórico (se almacena en edata.data)
edata.update(date_from=datetime(1970, 1, 1), date_to=datetime.today())
# Solicitar actualización de todo el histórico (los datos se almacenan en SQLite)
success = await edata.update(date_from=datetime(1970, 1, 1), date_to=datetime.today())
if success:
# Imprimir atributos resumen calculados
print("Atributos calculados:")
for key, value in edata.attributes.items():
if value is not None:
print(f" {key}: {value}")
# Los datos se almacenan automáticamente en la base de datos SQLite
# ubicada en edata.storage/edata.db (por defecto)
print(f"\nDatos almacenados en la base de datos local")
else:
print("Error durante la actualización de datos")
# volcamos todo lo obtenido a un fichero
with open("backup.json", "w") as file:
json.dump(utils.serialize_dict(edata.data), file) # se puede utilizar deserialize_dict para la posterior lectura del backup
# Ejecutar el ejemplo
if __name__ == "__main__":
asyncio.run(main())
```
# Imprimir atributos
print(edata.attributes)
```
## Contribuir
Este proyecto está en desarrollo activo. Las contribuciones son bienvenidas:
1. Fork del repositorio
2. Crear una rama para tu feature: `git checkout -b feature/nueva-funcionalidad`
3. Commit de tus cambios: `git commit -am 'Añadir nueva funcionalidad'`
4. Push a la rama: `git push origin feature/nueva-funcionalidad`
5. Crear un Pull Request
## Licencia
Este proyecto está licenciado bajo GPLv3. Ver el archivo [LICENSE](LICENSE) para más detalles.
+97
-37

@@ -22,3 +22,3 @@ [![Downloads](https://pepy.tech/badge/e-data)](https://pepy.tech/project/e-data)

``` bash
pip install -r requirements.txt
make install-dev
```

@@ -28,7 +28,8 @@

El paquete consta de tres módulos diferenciados:
El paquete utiliza una **arquitectura basada en servicios** con los siguientes módulos:
* **Conectores** (módulo `connectors`), para definir los métodos de consulta a los diferentes proveedores: Datadis y REData.
* **Procesadores** (módulo `processors`), para procesar datos de consumo, maxímetro, o coste (tarificación). Ahora mismo consta de tres procesadores: `billing`, `consumption` y `maximeter`, además de algunas utilidades ubicadas en `utils`. Los procesadores deben heredar de la clase Processor definida en `base.py`
* **Ayudantes** (módulo `helpers`), para ayudar en el uso y gestión de los anteriores, presentando de momento un único ayudante llamado `EdataHelper` que te permite recopilar `X` días de datos (por defecto 365) y automáticamente procesarlos. Los datos son almacenados en la variable `data`, mientras que los atributos autocalculados son almacenados en la variable `attributes`. Por lo general, primero utilizan los conectores y luego procesan los datos, gestionando varias tareas de recuperación (principalmente para Datadis).
* **Modelos** (módulo `models`), que definen las estructuras de datos usando Pydantic v2 para validación robusta. Incluye modelos para suministros, contratos, consumos, maxímetro, precios y base de datos.
* **Servicios** (módulo `services`), que implementan la lógica de negocio para cada dominio: gestión de suministros, contratos, consumos, maxímetro, facturación y base de datos SQLite.
* **Helper principal** (`helpers.py`), que orquesta todos los servicios y proporciona una interfaz simplificada. El `EdataHelper` permite descargar y procesar datos automáticamente, calculando más de 40 atributos de resumen.

@@ -40,16 +41,49 @@ Estos módulos corresponden a la siguiente estructura del paquete:

· __init__.py
· const.py # Constantes y definiciones de atributos
· utils.py # Utilidades generales
· helpers.py # Helper principal (EdataHelper)
· connectors/
· __init__.py
· datadis.py
· redata.py
· processors/
· datadis.py # Conector API Datadis
· redata.py # Conector API REData (PVPC)
· models/
· __init__.py
· base.py
· billing.py
· consumption.py
· maximeter.py
· utils.py
· helpers.py
· base.py # Modelos base con Pydantic
· supply.py # Modelo de suministros
· contract.py # Modelo de contratos
· consumption.py # Modelo de consumos
· maximeter.py # Modelo de maxímetro
· pricing.py # Modelo de reglas de precios
· database.py # Modelos para SQLite
· services/
· __init__.py
· database.py # Servicio de base de datos SQLite
· supply.py # Gestión de suministros
· contract.py # Gestión de contratos
· consumption.py # Gestión de consumos
· maximeter.py # Gestión de maxímetro
· billing.py # Gestión de facturación
· scripts/
· __init__.py
· dump.py # Script interactivo de descarga
```
## Script interactivo
El paquete incluye un script interactivo que facilita la descarga inicial de datos:
```bash
# Ejecutar el script interactivo
python -m edata.scripts.dump
# Con directorio personalizado
python -m edata.scripts.dump --storage-dir /ruta/personalizada
```
Este script te guiará paso a paso para:
1. Configurar credenciales de Datadis
2. Seleccionar el suministro a procesar
3. Definir el rango de fechas
4. Descargar y almacenar todos los datos
## Ejemplo de uso

@@ -64,11 +98,12 @@

``` python
import asyncio
from datetime import datetime
import json
# importamos definiciones de datos que nos interesen
from edata.definitions import PricingRules
# importamos el ayudante
# importamos el modelo de reglas de tarificación
from edata.models.pricing import PricingRules
# importamos el helper principal
from edata.helpers import EdataHelper
# importamos el procesador de utilidades
from edata.processors import utils
# importamos utilidades para serialización
from edata import utils

@@ -89,23 +124,48 @@ # Preparar reglas de tarificación (si se quiere)

# Instanciar el helper
# 'authorized_nif' permite indicar el NIF de la persona que nos autoriza a consultar su CUPS.
# 'data' permite "cargar" al helper datos anteriores (resultado edata.data de una ejecución anterior), para evitar volver a consultar los mismos.
edata = EdataHelper(
"datadis_user",
"datadis_password",
"cups",
datadis_authorized_nif=None,
pricing_rules=PRICING_RULES_PVPC, # si se le pasa None, no aplica tarificación
data=None, # aquí podríamos cargar datos anteriores
)
async def main():
# Instanciar el helper
# 'datadis_authorized_nif' permite indicar el NIF de la persona que nos autoriza a consultar su CUPS.
# 'storage_dir_path' permite especificar dónde almacenar la base de datos local
edata = EdataHelper(
"datadis_user",
"datadis_password",
"cups",
datadis_authorized_nif=None,
pricing_rules=PRICING_RULES_PVPC, # si se le pasa None, no aplica tarificación
storage_dir_path=None, # por defecto usa ./edata.storage/
)
# Solicitar actualización de todo el histórico (se almacena en edata.data)
edata.update(date_from=datetime(1970, 1, 1), date_to=datetime.today())
# Solicitar actualización de todo el histórico (los datos se almacenan en SQLite)
success = await edata.update(date_from=datetime(1970, 1, 1), date_to=datetime.today())
if success:
# Imprimir atributos resumen calculados
print("Atributos calculados:")
for key, value in edata.attributes.items():
if value is not None:
print(f" {key}: {value}")
# Los datos se almacenan automáticamente en la base de datos SQLite
# ubicada en edata.storage/edata.db (por defecto)
print("\nDatos almacenados en la base de datos local")
else:
print("Error durante la actualización de datos")
# volcamos todo lo obtenido a un fichero
with open("backup.json", "w") as file:
json.dump(utils.serialize_dict(edata.data), file) # se puede utilizar deserialize_dict para la posterior lectura del backup
# Ejecutar el ejemplo
if __name__ == "__main__":
asyncio.run(main())
```
# Imprimir atributos
print(edata.attributes)
```
## Contribuir
Este proyecto está en desarrollo activo. Las contribuciones son bienvenidas:
1. Fork del repositorio
2. Crear una rama para tu feature: `git checkout -b feature/nueva-funcionalidad`
3. Commit de tus cambios: `git commit -am 'Añadir nueva funcionalidad'`
4. Push a la rama: `git push origin feature/nueva-funcionalidad`
5. Crear un Pull Request
## Licencia
Este proyecto está licenciado bajo GPLv3. Ver el archivo [LICENSE](LICENSE) para más detalles.

@@ -1,3 +0,1 @@

[easy_install]
[egg_info]

@@ -4,0 +2,0 @@ tag_build =

Metadata-Version: 2.1
Name: e-data
Version: 1.2.22
Summary: Python library for managing spanish energy data from various web providers
Home-page: https://github.com/uvejota/python-edata
Author: VMG
Author-email: vmayorg@outlook.es
License: GPLv3
Classifier: License :: OSI Approved :: GNU General Public License v3 (GPLv3)
Classifier: Programming Language :: Python
Classifier: Programming Language :: Python :: 3
Classifier: Programming Language :: Python :: 3.6
Classifier: Programming Language :: Python :: 3.7
Classifier: Programming Language :: Python :: 3.8
Classifier: Programming Language :: Python :: 3.9
Classifier: Programming Language :: Python :: Implementation :: CPython
Classifier: Programming Language :: Python :: Implementation :: PyPy
Requires-Python: >=3.6.0
Description-Content-Type: text/markdown
License-File: LICENSE
Requires-Dist: dateparser>=1.1.2
Requires-Dist: freezegun>=1.2.1
Requires-Dist: holidays>=0.14.2
Requires-Dist: pytest>=7.1.2
Requires-Dist: python_dateutil>=2.8.2
Requires-Dist: requests>=2.28.1
Requires-Dist: voluptuous>=0.13.1
Requires-Dist: Jinja2>=3.1.2
[![Downloads](https://pepy.tech/badge/e-data)](https://pepy.tech/project/e-data)
[![Downloads](https://pepy.tech/badge/e-data/month)](https://pepy.tech/project/e-data)
[![Downloads](https://pepy.tech/badge/e-data/week)](https://pepy.tech/project/e-data)
# python-edata
Este paquete proporciona herramientas para la descarga de tus datos de consumo eléctrico (desde Datadis.es) y su posterior procesado. La motivación principal es que conocer el consumo puede ayudarnos a reducirlo, e incluso a elegir una tarifa que mejor se adapte a nuestras necesidades. A día de hoy sus capacidades de facturación (€) son limitadas, soporta PVPC (según disponibilidad de datos de REData) y tarificación fija por tramos. Es el corazón de la integración [homeassistant-edata](https://github.com/uvejota/homeassistant-edata).
_**Esta herramienta no mantiene ningún tipo de vinculación con los proveedores de datos anteriormente mencionados, simplemente consulta la información disponible y facilita su posterior análisis.**_
## Instalación
Puedes instalar la última versión estable mediante:
``` bash
pip install e-data
```
Si quieres probar la versión `dev` o contribuir a su desarrollo, clona este repositorio e instala manualmente las dependencias:
``` bash
pip install -r requirements.txt
```
## Estructura
El paquete consta de tres módulos diferenciados:
* **Conectores** (módulo `connectors`), para definir los métodos de consulta a los diferentes proveedores: Datadis y REData.
* **Procesadores** (módulo `processors`), para procesar datos de consumo, maxímetro, o coste (tarificación). Ahora mismo consta de tres procesadores: `billing`, `consumption` y `maximeter`, además de algunas utilidades ubicadas en `utils`. Los procesadores deben heredar de la clase Processor definida en `base.py`
* **Ayudantes** (módulo `helpers`), para ayudar en el uso y gestión de los anteriores, presentando de momento un único ayudante llamado `EdataHelper` que te permite recopilar `X` días de datos (por defecto 365) y automáticamente procesarlos. Los datos son almacenados en la variable `data`, mientras que los atributos autocalculados son almacenados en la variable `attributes`. Por lo general, primero utilizan los conectores y luego procesan los datos, gestionando varias tareas de recuperación (principalmente para Datadis).
Estos módulos corresponden a la siguiente estructura del paquete:
```
edata/
· __init__.py
· connectors/
· __init__.py
· datadis.py
· redata.py
· processors/
· __init__.py
· base.py
· billing.py
· consumption.py
· maximeter.py
· utils.py
· helpers.py
```
## Ejemplo de uso
Partimos de que tenemos credenciales en Datadis.es. Algunas aclaraciones:
* No es necesario solicitar API pública en el registro (se utilizará la API privada habilitada por defecto)
* El username suele ser el NIF del titular
* Copie el CUPS de la web de Datadis; algunas comercializadoras añaden caracteres adicionales en el CUPS mostrado en su factura.
* La herramienta acepta el uso de NIF autorizado para consultar el suministro de otro titular.
``` python
from datetime import datetime
import json
# importamos definiciones de datos que nos interesen
from edata.definitions import PricingRules
# importamos el ayudante
from edata.helpers import EdataHelper
# importamos el procesador de utilidades
from edata.processors import utils
# Preparar reglas de tarificación (si se quiere)
PRICING_RULES_PVPC = PricingRules(
p1_kw_year_eur=30.67266,
p2_kw_year_eur=1.4243591,
meter_month_eur=0.81,
market_kw_year_eur=3.113,
electricity_tax=1.0511300560,
iva_tax=1.05,
# podemos rellenar los siguientes campos si quisiéramos precio fijo (y no pvpc)
p1_kwh_eur=None,
p2_kwh_eur=None,
p3_kwh_eur=None,
)
# Instanciar el helper
# 'authorized_nif' permite indicar el NIF de la persona que nos autoriza a consultar su CUPS.
# 'data' permite "cargar" al helper datos anteriores (resultado edata.data de una ejecución anterior), para evitar volver a consultar los mismos.
edata = EdataHelper(
"datadis_user",
"datadis_password",
"cups",
datadis_authorized_nif=None,
pricing_rules=PRICING_RULES_PVPC, # si se le pasa None, no aplica tarificación
data=None, # aquí podríamos cargar datos anteriores
)
# Solicitar actualización de todo el histórico (se almacena en edata.data)
edata.update(date_from=datetime(1970, 1, 1), date_to=datetime.today())
# volcamos todo lo obtenido a un fichero
with open("backup.json", "w") as file:
json.dump(utils.serialize_dict(edata.data), file) # se puede utilizar deserialize_dict para la posterior lectura del backup
# Imprimir atributos
print(edata.attributes)
```
dateparser>=1.1.2
freezegun>=1.2.1
holidays>=0.14.2
pytest>=7.1.2
python_dateutil>=2.8.2
requests>=2.28.1
voluptuous>=0.13.1
Jinja2>=3.1.2
"""Datadis API connector.
To fetch data from datadis.es private API.
There are a few issues that are worked around:
- You have to wait 24h between two identical requests.
- Datadis server does not like ranges greater than 1 month.
"""
import contextlib
from datetime import datetime, timedelta
import glob
import hashlib
import json
import logging
import os
import tempfile
from dateutil.relativedelta import relativedelta
import requests
from ..definitions import ConsumptionData, ContractData, MaxPowerData, SupplyData
from ..processors import utils
_LOGGER = logging.getLogger(__name__)
# Token-related constants
URL_TOKEN = "https://datadis.es/nikola-auth/tokens/login"
TOKEN_USERNAME = "username"
TOKEN_PASSWD = "password"
# Supplies-related constants
URL_GET_SUPPLIES = "https://datadis.es/api-private/api/get-supplies"
GET_SUPPLIES_MANDATORY_FIELDS = [
"cups",
"validDateFrom",
"validDateTo",
"pointType",
"distributorCode",
]
# Contracts-related constants
URL_GET_CONTRACT_DETAIL = "https://datadis.es/api-private/api/get-contract-detail"
GET_CONTRACT_DETAIL_MANDATORY_FIELDS = [
"startDate",
"endDate",
"marketer",
"contractedPowerkW",
]
# Consumption-related constants
URL_GET_CONSUMPTION_DATA = "https://datadis.es/api-private/api/get-consumption-data"
GET_CONSUMPTION_DATA_MANDATORY_FIELDS = [
"time",
"date",
"consumptionKWh",
"obtainMethod",
]
MAX_CONSUMPTIONS_MONTHS = (
1 # max consumptions in a single request (fixed to 1 due to datadis limitations)
)
# Maximeter-related constants
URL_GET_MAX_POWER = "https://datadis.es/api-private/api/get-max-power"
GET_MAX_POWER_MANDATORY_FIELDS = ["time", "date", "maxPower"]
# Timing constants
TIMEOUT = 3 * 60 # requests timeout
QUERY_LIMIT = timedelta(hours=24) # a datadis limitation, again...
# Cache-related constants
RECENT_CACHE_SUBDIR = "cache"
def migrate_storage(storage_dir):
"""Migrate storage from older versions."""
with contextlib.suppress(FileNotFoundError):
os.remove(os.path.join(storage_dir, "edata_recent_queries.json"))
os.remove(os.path.join(storage_dir, "edata_recent_queries_cache.json"))
class DatadisConnector:
"""A Datadis private API connector."""
def __init__(
self,
username: str,
password: str,
enable_smart_fetch: bool = True,
storage_path: str | None = None,
) -> None:
"""DatadisConnector constructor."""
# initialize some things
self._usr = username
self._pwd = password
self._session = requests.Session()
self._token = {}
self._smart_fetch = enable_smart_fetch
self._recent_queries = {}
self._recent_cache = {}
self._warned_queries = []
if storage_path is not None:
self._recent_cache_dir = os.path.join(storage_path, RECENT_CACHE_SUBDIR)
migrate_storage(storage_path)
else:
self._recent_cache_dir = os.path.join(
tempfile.gettempdir(), RECENT_CACHE_SUBDIR
)
os.makedirs(self._recent_cache_dir, exist_ok=True)
def _update_recent_queries(self, query: str, data: dict | None = None) -> None:
"""Cache a successful query to avoid exceeding query limits."""
# identify the query by a md5 hash
hash_query = hashlib.md5(query.encode()).hexdigest()
# remove expired cache files
with contextlib.suppress(FileNotFoundError):
for cache_file in glob.glob(os.path.join(self._recent_cache_dir, "*")):
if (
datetime.now()
- datetime.fromtimestamp(os.path.getmtime(cache_file))
) > QUERY_LIMIT:
_LOGGER.info("Removing cache item '%s'", cache_file)
os.remove(cache_file)
# dump current cache to disk
try:
with open(
os.path.join(self._recent_cache_dir, hash_query),
"w",
encoding="utf8",
) as dst_file:
json.dump(data, dst_file)
_LOGGER.info("Updating cache item '%s'", hash_query)
except Exception as e:
_LOGGER.warning("Unknown error while updating cache: %s", e)
def _is_recent_query(self, query: str) -> bool:
"""Check if a query has been done recently to avoid exceeding query limits."""
hash_query = hashlib.md5(query.encode()).hexdigest()
cache_file = os.path.join(self._recent_cache_dir, hash_query)
return (
os.path.exists(cache_file)
and (datetime.now() - datetime.fromtimestamp(os.path.getmtime(cache_file)))
< QUERY_LIMIT
)
def _get_cache_for_query(self, query: str) -> dict | None:
"""Return cached response for a query."""
hash_query = hashlib.md5(query.encode()).hexdigest()
cache_file = os.path.join(self._recent_cache_dir, hash_query)
try:
with open(cache_file, encoding="utf8") as cache:
return json.load(cache)
except (FileNotFoundError, json.decoder.JSONDecodeError):
return None
def _get_token(self):
"""Private method that fetches a new token if needed."""
_LOGGER.info("No token found, fetching a new one")
is_valid_token = False
self._session = requests.Session()
response = self._session.post(
URL_TOKEN,
data={
TOKEN_USERNAME: self._usr.encode("utf-8"),
TOKEN_PASSWD: self._pwd.encode("utf-8"),
},
)
if response.status_code == 200:
# store token encoded
self._token["encoded"] = response.text
# prepare session authorization bearer
self._session.headers["Authorization"] = "Bearer " + self._token["encoded"]
is_valid_token = True
else:
_LOGGER.error("Unknown error while retrieving token, got %s", response.text)
return is_valid_token
def login(self):
"""Test to login with provided credentials."""
return self._get_token()
def _get(
self,
url: str,
request_data: dict | None = None,
refresh_token: bool = False,
is_retry: bool = False,
ignore_recent_queries: bool = False,
):
"""Get request for Datadis API."""
if request_data is None:
data = {}
else:
data = request_data
# refresh token if needed (recursive approach)
is_valid_token = False
response = []
if refresh_token:
is_valid_token = self._get_token()
if is_valid_token or not refresh_token:
# build get parameters
params = "?" if len(data) > 0 else ""
for param in data:
key = param
value = data[param]
params = params + f"{key}={value}&"
anonym_params = "?" if len(data) > 0 else ""
# build anonymized params for logging
for anonym_param in data:
key = anonym_param
if key == "cups":
value = "xxxx" + data[anonym_param][-5:]
elif key == "authorizedNif":
value = "xxxx"
else:
value = data[anonym_param]
anonym_params = anonym_params + f"{key}={value}&"
# check if query is already in cache
if not ignore_recent_queries and self._is_recent_query(url + params):
_cache = self._get_cache_for_query(url + params)
if _cache is not None:
_LOGGER.info(
"Returning cached response for '%s'", url + anonym_params
)
return _cache
return []
# run the query
try:
_LOGGER.info("GET %s", url + anonym_params)
reply = self._session.get(
url + params,
headers={"Accept-Encoding": "identity"},
timeout=TIMEOUT,
)
except requests.exceptions.Timeout:
_LOGGER.warning("Timeout at %s", url + anonym_params)
return []
# eval response
if reply.status_code == 200:
# we're here if reply seems valid
_LOGGER.info("Got 200 OK")
if reply.json():
response = reply.json()
if not ignore_recent_queries:
self._update_recent_queries(url + params, response)
else:
# this mostly happens when datadis provides an empty response
_LOGGER.info("Got an empty response")
if not ignore_recent_queries:
self._update_recent_queries(url + params)
elif reply.status_code == 401 and not refresh_token:
# we're here if we were unauthorized so we will refresh the token
response = self._get(
url,
request_data=data,
refresh_token=True,
ignore_recent_queries=ignore_recent_queries,
)
elif reply.status_code == 429:
# we're here if we exceeded datadis API rates (24h)
_LOGGER.warning(
"Got status code '%s' with message '%s'",
reply.status_code,
reply.text,
)
if not ignore_recent_queries:
self._update_recent_queries(url + params)
elif is_retry:
# otherwise, if this was a retried request... warn the user
if (url + params) not in self._warned_queries:
_LOGGER.warning(
"Got status code '%s' with message '%s'. %s. %s",
reply.status_code,
reply.text,
"Query temporary disabled",
"Future 500 code errors for this query will be silenced until restart",
)
if not ignore_recent_queries:
self._update_recent_queries(url + params)
self._warned_queries.append(url + params)
else:
# finally, retry since an unexpected error took place (mostly 500 errors - server fault)
response = self._get(
url,
request_data,
is_retry=True,
ignore_recent_queries=ignore_recent_queries,
)
return response
def get_supplies(self, authorized_nif: str | None = None):
"""Datadis 'get_supplies' query."""
data = {}
# If authorized_nif is provided, we have to include it as parameter
if authorized_nif is not None:
data["authorizedNif"] = authorized_nif
# Request the resource
response = self._get(
URL_GET_SUPPLIES, request_data=data, ignore_recent_queries=True
)
# Response is a list of serialized supplies.
# We will iter through them to transform them into SupplyData objects
supplies = []
# Build tomorrow Y/m/d string since we will use it as the 'date_end' of
# active supplies
tomorrow_str = (datetime.today() + timedelta(days=1)).strftime("%Y/%m/%d")
for i in response:
# check data integrity (maybe this can be supressed if datadis proves to be reliable)
if all(k in i for k in GET_SUPPLIES_MANDATORY_FIELDS):
supplies.append(
SupplyData(
cups=i["cups"], # the supply identifier
date_start=datetime.strptime(
(
i["validDateFrom"]
if i["validDateFrom"] != ""
else "1970/01/01"
),
"%Y/%m/%d",
), # start date of the supply. 1970/01/01 if unset.
date_end=datetime.strptime(
(
i["validDateTo"]
if i["validDateTo"] != ""
else tomorrow_str
),
"%Y/%m/%d",
), # end date of the supply, tomorrow if unset
# the following parameters are not crucial, so they can be none
address=i.get("address", None),
postal_code=i.get("postalCode", None),
province=i.get("province", None),
municipality=i.get("municipality", None),
distributor=i.get("distributor", None),
# these two are mandatory, we will use them to fetch contracts data
pointType=i["pointType"],
distributorCode=i["distributorCode"],
)
)
else:
_LOGGER.warning(
"Weird data structure while fetching supplies data, got %s",
response,
)
return supplies
def get_contract_detail(
self, cups: str, distributor_code: str, authorized_nif: str | None = None
):
"""Datadis get_contract_detail query."""
data = {"cups": cups, "distributorCode": distributor_code}
if authorized_nif is not None:
data["authorizedNif"] = authorized_nif
response = self._get(
URL_GET_CONTRACT_DETAIL, request_data=data, ignore_recent_queries=True
)
contracts = []
tomorrow_str = (datetime.today() + timedelta(days=1)).strftime("%Y/%m/%d")
for i in response:
if all(k in i for k in GET_CONTRACT_DETAIL_MANDATORY_FIELDS):
contracts.append(
ContractData(
date_start=datetime.strptime(
i["startDate"] if i["startDate"] != "" else "1970/01/01",
"%Y/%m/%d",
),
date_end=datetime.strptime(
i["endDate"] if i["endDate"] != "" else tomorrow_str,
"%Y/%m/%d",
),
marketer=i["marketer"],
distributorCode=distributor_code,
power_p1=(
i["contractedPowerkW"][0]
if isinstance(i["contractedPowerkW"], list)
else None
),
power_p2=(
i["contractedPowerkW"][1]
if (len(i["contractedPowerkW"]) > 1)
else None
),
)
)
else:
_LOGGER.warning(
"Weird data structure while fetching contracts data, got %s",
response,
)
return contracts
def get_consumption_data(
self,
cups: str,
distributor_code: str,
start_date: datetime,
end_date: datetime,
measurement_type: str,
point_type: int,
authorized_nif: str | None = None,
is_smart_fetch: bool = False,
):
"""Datadis get_consumption_data query."""
if self._smart_fetch and not is_smart_fetch:
_start = start_date
consumptions = []
while _start < end_date:
_end = min(
_start + relativedelta(months=MAX_CONSUMPTIONS_MONTHS), end_date
)
consumptions = utils.extend_by_key(
consumptions,
self.get_consumption_data(
cups,
distributor_code,
_start,
_end,
measurement_type,
point_type,
authorized_nif,
is_smart_fetch=True,
),
"datetime",
)
_start = _end
return consumptions
data = {
"cups": cups,
"distributorCode": distributor_code,
"startDate": datetime.strftime(start_date, "%Y/%m"),
"endDate": datetime.strftime(end_date, "%Y/%m"),
"measurementType": measurement_type,
"pointType": point_type,
}
if authorized_nif is not None:
data["authorizedNif"] = authorized_nif
response = self._get(URL_GET_CONSUMPTION_DATA, request_data=data)
consumptions = []
for i in response:
if "consumptionKWh" in i:
if all(k in i for k in GET_CONSUMPTION_DATA_MANDATORY_FIELDS):
hour = str(int(i["time"].split(":")[0]) - 1)
date_as_dt = datetime.strptime(
f"{i['date']} {hour.zfill(2)}:00", "%Y/%m/%d %H:%M"
)
if not (start_date <= date_as_dt <= end_date):
continue # skip element if dt is out of range
_surplus = i.get("surplusEnergyKWh", 0)
if _surplus is None:
_surplus = 0
consumptions.append(
ConsumptionData(
datetime=date_as_dt,
delta_h=1,
value_kWh=i["consumptionKWh"],
surplus_kWh=_surplus,
real=i["obtainMethod"] == "Real",
)
)
else:
_LOGGER.warning(
"Weird data structure while fetching consumption data, got %s",
response,
)
return consumptions
def get_max_power(
self,
cups: str,
distributor_code: str,
start_date: datetime,
end_date: datetime,
authorized_nif: str | None = None,
):
"""Datadis get_max_power query."""
data = {
"cups": cups,
"distributorCode": distributor_code,
"startDate": datetime.strftime(start_date, "%Y/%m"),
"endDate": datetime.strftime(end_date, "%Y/%m"),
}
if authorized_nif is not None:
data["authorizedNif"] = authorized_nif
response = self._get(URL_GET_MAX_POWER, request_data=data)
maxpower_values = []
for i in response:
if all(k in i for k in GET_MAX_POWER_MANDATORY_FIELDS):
maxpower_values.append(
MaxPowerData(
datetime=datetime.strptime(
f"{i['date']} {i['time']}", "%Y/%m/%d %H:%M"
),
value_kW=i["maxPower"],
)
)
else:
_LOGGER.warning(
"Weird data structure while fetching maximeter data, got %s",
response,
)
return maxpower_values
"""A REData API connector"""
import datetime as dt
import logging
import requests
from dateutil import parser
from ..definitions import PricingData
_LOGGER = logging.getLogger(__name__)
REQUESTS_TIMEOUT = 15
URL_REALTIME_PRICES = (
"https://apidatos.ree.es/es/datos/mercados/precios-mercados-tiempo-real"
"?time_trunc=hour"
"&geo_ids={geo_id}"
"&start_date={start:%Y-%m-%dT%H:%M}&end_date={end:%Y-%m-%dT%H:%M}"
)
class REDataConnector:
"""Main class for REData connector"""
def __init__(
self,
) -> None:
"""Init method for REDataConnector"""
def get_realtime_prices(
self, dt_from: dt.datetime, dt_to: dt.datetime, is_ceuta_melilla: bool = False
) -> list:
"""GET query to fetch realtime pvpc prices, historical data is limited to current month"""
url = URL_REALTIME_PRICES.format(
geo_id=8744 if is_ceuta_melilla else 8741,
start=dt_from,
end=dt_to,
)
data = []
res = requests.get(url, timeout=REQUESTS_TIMEOUT)
if res.status_code == 200 and res.json():
res_json = res.json()
try:
res_list = res_json["included"][0]["attributes"]["values"]
except IndexError:
_LOGGER.error(
"%s returned a malformed response: %s ",
url,
res.text,
)
return data
for element in res_list:
data.append(
PricingData(
datetime=parser.parse(element["datetime"]).replace(tzinfo=None),
value_eur_kWh=element["value"] / 1000,
delta_h=1,
)
)
else:
_LOGGER.error(
"%s returned %s with code %s",
url,
res.text,
res.status_code,
)
return data
"""Definitions for data structures."""
import voluptuous as vol
import datetime as dt
from typing import TypedDict
ATTRIBUTES = {
"cups": None,
"contract_p1_kW": "kW",
"contract_p2_kW": "kW",
"yesterday_kWh": "kWh",
"yesterday_hours": "h",
"yesterday_p1_kWh": "kWh",
"yesterday_p2_kWh": "kWh",
"yesterday_p3_kWh": "kWh",
"yesterday_surplus_kWh": "kWh",
"yesterday_surplus_p1_kWh": "kWh",
"yesterday_surplus_p2_kWh": "kWh",
"yesterday_surplus_p3_kWh": "kWh",
"last_registered_date": None,
"last_registered_day_kWh": "kWh",
"last_registered_day_hours": "h",
"last_registered_day_p1_kWh": "kWh",
"last_registered_day_p2_kWh": "kWh",
"last_registered_day_p3_kWh": "kWh",
"last_registered_day_surplus_kWh": "kWh",
"last_registered_day_surplus_p1_kWh": "kWh",
"last_registered_day_surplus_p2_kWh": "kWh",
"last_registered_day_surplus_p3_kWh": "kWh",
"month_kWh": "kWh",
"month_daily_kWh": "kWh",
"month_days": "d",
"month_p1_kWh": "kWh",
"month_p2_kWh": "kWh",
"month_p3_kWh": "kWh",
"month_surplus_kWh": "kWh",
"month_surplus_p1_kWh": "kWh",
"month_surplus_p2_kWh": "kWh",
"month_surplus_p3_kWh": "kWh",
"month_€": "€",
"last_month_kWh": "kWh",
"last_month_daily_kWh": "kWh",
"last_month_days": "d",
"last_month_p1_kWh": "kWh",
"last_month_p2_kWh": "kWh",
"last_month_p3_kWh": "kWh",
"last_month_surplus_kWh": "kWh",
"last_month_surplus_p1_kWh": "kWh",
"last_month_surplus_p2_kWh": "kWh",
"last_month_surplus_p3_kWh": "kWh",
"last_month_€": "€",
"max_power_kW": "kW",
"max_power_date": None,
"max_power_mean_kW": "kW",
"max_power_90perc_kW": "kW",
}
# Energy term with taxes
DEFAULT_BILLING_ENERGY_FORMULA = "electricity_tax * iva_tax * kwh_eur * kwh"
# Power term with taxes
DEFAULT_BILLING_POWER_FORMULA = "electricity_tax * iva_tax * (p1_kw * (p1_kw_year_eur + market_kw_year_eur) + p2_kw * p2_kw_year_eur) / 365 / 24"
# Others term with taxes
DEFAULT_BILLING_OTHERS_FORMULA = "iva_tax * meter_month_eur / 30 / 24"
# Surplus term with taxes
DEFAULT_BILLING_SURPLUS_FORMULA = (
"electricity_tax * iva_tax * surplus_kwh * surplus_kwh_eur"
)
# Sum energy and power terms, and substract surplus until 0.
# An alternative would be "[(energy_term + power_term - surplus_term), 0]|max + others_term"
DEFAULT_BILLING_MAIN_FORMULA = "energy_term + power_term + others_term"
class SupplyData(TypedDict):
"""Data structure to represent a supply."""
cups: str
date_start: dt.datetime
date_end: dt.datetime
address: str | None
postal_code: str | None
province: str | None
municipality: str | None
distributor: str | None
pointType: int
distributorCode: str
SupplySchema = vol.Schema(
{
vol.Required("cups"): str,
vol.Required("date_start"): dt.datetime,
vol.Required("date_end"): dt.datetime,
vol.Required("address"): vol.Union(str, None),
vol.Required("postal_code"): vol.Union(str, None),
vol.Required("province"): vol.Union(str, None),
vol.Required("municipality"): vol.Union(str, None),
vol.Required("distributor"): vol.Union(str, None),
vol.Required("pointType"): int,
vol.Required("distributorCode"): str,
}
)
class ContractData(TypedDict):
"""Data structure to represent a contract."""
date_start: dt.datetime
date_end: dt.datetime
marketer: str
distributorCode: str
power_p1: float | None
power_p2: float | None
ContractSchema = vol.Schema(
{
vol.Required("date_start"): dt.datetime,
vol.Required("date_end"): dt.datetime,
vol.Required("marketer"): str,
vol.Required("distributorCode"): str,
vol.Required("power_p1"): vol.Union(vol.Coerce(float), None),
vol.Required("power_p2"): vol.Union(vol.Coerce(float), None),
}
)
class ConsumptionData(TypedDict):
"""Data structure to represent a consumption."""
datetime: dt.datetime
delta_h: float
value_kWh: float
surplus_kWh: float
real: bool
ConsumptionSchema = vol.Schema(
{
vol.Required("datetime"): dt.datetime,
vol.Required("delta_h"): vol.Coerce(float),
vol.Required("value_kWh"): vol.Coerce(float),
vol.Optional("surplus_kWh", default=0): vol.Coerce(float),
vol.Required("real"): bool,
}
)
class MaxPowerData(TypedDict):
"""Data structure to represent a MaxPower."""
datetime: dt.datetime
value_kW: float
MaxPowerSchema = vol.Schema(
{
vol.Required("datetime"): dt.datetime,
vol.Required("value_kW"): vol.Coerce(float),
}
)
class PricingData(TypedDict):
"""Data structure to represent pricing data."""
datetime: dt.datetime
value_eur_kWh: float
delta_h: float
PricingSchema = vol.Schema(
{
vol.Required("datetime"): dt.datetime,
vol.Required("value_eur_kWh"): vol.Coerce(float),
vol.Required("delta_h"): vol.Coerce(float),
}
)
class PricingRules(TypedDict):
"""Data structure to represent custom pricing rules."""
p1_kw_year_eur: float
p2_kw_year_eur: float
p1_kwh_eur: float | None
p2_kwh_eur: float | None
p3_kwh_eur: float | None
surplus_p1_kwh_eur: float | None
surplus_p2_kwh_eur: float | None
surplus_p3_kwh_eur: float | None
meter_month_eur: float
market_kw_year_eur: float
electricity_tax: float
iva_tax: float
energy_formula: str | None
power_formula: str | None
others_formula: str | None
surplus_formula: str | None
cycle_start_day: int | None
# Voluptuous schema validating user-supplied pricing rules. Optional per-kWh
# prices default to None (meaning "use hourly prices"); formulas default to
# the built-in billing formulas.
PricingRulesSchema = vol.Schema(
    {
        vol.Required("p1_kw_year_eur"): vol.Coerce(float),
        vol.Required("p2_kw_year_eur"): vol.Coerce(float),
        vol.Optional("p1_kwh_eur", default=None): vol.Union(vol.Coerce(float), None),
        vol.Optional("p2_kwh_eur", default=None): vol.Union(vol.Coerce(float), None),
        vol.Optional("p3_kwh_eur", default=None): vol.Union(vol.Coerce(float), None),
        vol.Optional("surplus_p1_kwh_eur", default=None): vol.Union(
            vol.Coerce(float), None
        ),
        vol.Optional("surplus_p2_kwh_eur", default=None): vol.Union(
            vol.Coerce(float), None
        ),
        vol.Optional("surplus_p3_kwh_eur", default=None): vol.Union(
            vol.Coerce(float), None
        ),
        vol.Required("meter_month_eur"): vol.Coerce(float),
        vol.Required("market_kw_year_eur"): vol.Coerce(float),
        vol.Required("electricity_tax"): vol.Coerce(float),
        vol.Required("iva_tax"): vol.Coerce(float),
        vol.Optional("energy_formula", default=DEFAULT_BILLING_ENERGY_FORMULA): str,
        vol.Optional("power_formula", default=DEFAULT_BILLING_POWER_FORMULA): str,
        vol.Optional("others_formula", default=DEFAULT_BILLING_OTHERS_FORMULA): str,
        vol.Optional("surplus_formula", default=DEFAULT_BILLING_SURPLUS_FORMULA): str,
        vol.Optional("main_formula", default=DEFAULT_BILLING_MAIN_FORMULA): str,
        # NOTE(review): Range(1, 30) rejects 31 — presumably so the cycle
        # start day exists in (almost) every month; confirm intent.
        vol.Optional("cycle_start_day", default=1): vol.Range(1, 30),
    }
)
# Default pricing rules for the PVPC (regulated) tariff: power-term prices,
# meter rental, market margin and tax multipliers. Per-kWh prices are left
# unset so hourly PVPC prices are used instead. These are point-in-time
# constants — verify against current regulated prices periodically.
DEFAULT_PVPC_RULES = PricingRules(
    p1_kw_year_eur=30.67266,
    p2_kw_year_eur=1.4243591,
    meter_month_eur=0.81,
    market_kw_year_eur=3.113,
    electricity_tax=1.0511300560,
    iva_tax=1.05,
)
class ConsumptionAggData(TypedDict):
    """A dict holding a Consumption item."""
    # Bucket start (day, or billing-cycle month).
    datetime: dt.datetime
    # Total consumed energy and its split per tariff period (kWh).
    value_kWh: float
    value_p1_kWh: float
    value_p2_kWh: float
    value_p3_kWh: float
    # Total injected (surplus) energy and its split per tariff period (kWh).
    surplus_kWh: float
    surplus_p1_kWh: float
    surplus_p2_kWh: float
    surplus_p3_kWh: float
    # Hours of data accumulated in this bucket.
    delta_h: float
# Voluptuous schema validating one aggregated consumption record
# (see ConsumptionAggData). Surplus fields default to 0.
ConsumptionAggSchema = vol.Schema(
    {
        vol.Required("datetime"): dt.datetime,
        vol.Required("value_kWh"): vol.Coerce(float),
        vol.Required("value_p1_kWh"): vol.Coerce(float),
        vol.Required("value_p2_kWh"): vol.Coerce(float),
        vol.Required("value_p3_kWh"): vol.Coerce(float),
        vol.Optional("surplus_kWh", default=0): vol.Coerce(float),
        vol.Optional("surplus_p1_kWh", default=0): vol.Coerce(float),
        vol.Optional("surplus_p2_kWh", default=0): vol.Coerce(float),
        vol.Optional("surplus_p3_kWh", default=0): vol.Coerce(float),
        vol.Required("delta_h"): vol.Coerce(float),
    }
)
class PricingAggData(TypedDict):
    """A dict holding a Billing item."""
    # Bucket start (hour, day, or billing-cycle month).
    datetime: dt.datetime
    # Total cost in EUR, combining the individual terms below.
    value_eur: float
    # Individual billing terms, in EUR.
    energy_term: float
    power_term: float
    others_term: float
    surplus_term: float
    # Hours of data accumulated in this bucket.
    delta_h: float
# Voluptuous schema validating one aggregated cost record (see PricingAggData).
PricingAggSchema = vol.Schema(
    {
        vol.Required("datetime"): dt.datetime,
        vol.Required("value_eur"): vol.Coerce(float),
        vol.Required("energy_term"): vol.Coerce(float),
        vol.Required("power_term"): vol.Coerce(float),
        vol.Required("others_term"): vol.Coerce(float),
        vol.Optional("surplus_term", default=0): vol.Coerce(float),
        vol.Optional("delta_h", default=1): vol.Coerce(float),
    },
)
class EdataData(TypedDict):
    """A Typed Dict to handle Edata Aggregated Data."""
    # Raw datasets fetched from the providers.
    supplies: list[SupplyData]
    contracts: list[ContractData]
    consumptions: list[ConsumptionData]
    maximeter: list[MaxPowerData]
    pvpc: list[PricingData]
    # Derived aggregates produced by the processors.
    consumptions_daily_sum: list[ConsumptionAggData]
    consumptions_monthly_sum: list[ConsumptionAggData]
    cost_hourly_sum: list[PricingAggData]
    cost_daily_sum: list[PricingAggData]
    cost_monthly_sum: list[PricingAggData]
# Voluptuous schema validating a full EdataData storage payload.
EdataSchema = vol.Schema(
    {
        vol.Required("supplies"): [SupplySchema],
        vol.Required("contracts"): [ContractSchema],
        vol.Required("consumptions"): [ConsumptionSchema],
        vol.Required("maximeter"): [MaxPowerSchema],
        vol.Optional("pvpc", default=[]): [PricingSchema],
        # Fix: the two sums below passed [] positionally, which voluptuous
        # interprets as Optional's `msg` argument, not `default` — so these
        # keys actually had no default. Pass default=[] explicitly, matching
        # every other optional key in this schema.
        vol.Optional("consumptions_daily_sum", default=[]): [ConsumptionAggSchema],
        vol.Optional("consumptions_monthly_sum", default=[]): [ConsumptionAggSchema],
        vol.Optional("cost_hourly_sum", default=[]): [PricingAggSchema],
        vol.Optional("cost_daily_sum", default=[]): [PricingAggSchema],
        vol.Optional("cost_monthly_sum", default=[]): [PricingAggSchema],
    }
)
"""Base definitions for processors."""
from abc import ABC, abstractmethod
from copy import deepcopy
from typing import Any
class Processor(ABC):
    """Abstract base for edata data processors.

    Subclasses implement do_process(), which reads self._input and stores
    its result in self._output. Input and output are deep-copied so the
    processor's internal state and the caller's data never alias each other.
    """

    _LABEL = "Processor"

    def __init__(self, input_data: Any, auto: bool = True) -> None:
        """Keep a private copy of input_data; process immediately unless auto is False."""
        self._output = None
        self._input = deepcopy(input_data)
        if auto:
            self.do_process()

    @abstractmethod
    def do_process(self):
        """Transform self._input into self._output."""

    @property
    def output(self):
        """A deep copy of the processed result (None until processed)."""
        return deepcopy(self._output)
"""Billing data processors."""
import contextlib
from datetime import datetime, timedelta
import logging
from typing import Optional, TypedDict
from jinja2 import Environment
import voluptuous
from ..definitions import (
ConsumptionData,
ConsumptionSchema,
ContractData,
ContractSchema,
PricingAggData,
PricingData,
PricingRules,
PricingRulesSchema,
PricingSchema,
)
from ..processors import utils
from ..processors.base import Processor
_LOGGER = logging.getLogger(__name__)
class BillingOutput(TypedDict):
    """A dict holding BillingProcessor output property."""
    # Cost aggregates at each time granularity.
    hourly: list[PricingAggData]
    daily: list[PricingAggData]
    monthly: list[PricingAggData]
class BillingInput(TypedDict):
    """A dict holding BillingProcessor input data."""
    contracts: list[ContractData]
    consumptions: list[ConsumptionData]
    # Hourly energy prices (e.g. PVPC); None when fixed per-period prices
    # from the rules apply instead.
    prices: Optional[list[PricingData]]
    rules: PricingRules
class BillingProcessor(Processor):
    """A billing processor for edata.

    Consumes a BillingInput-shaped dict (contracts, consumptions, optional
    hourly prices and pricing rules) and produces hourly/daily/monthly cost
    terms evaluated from the user-provided Jinja2 formulas.
    """
    def do_process(self):
        """Process billing and get hourly/daily/monthly metrics."""
        self._output = BillingOutput(hourly=[], daily=[], monthly=[])
        # Validate and normalize the input (applies schema defaults).
        _schema = voluptuous.Schema(
            {
                voluptuous.Required("contracts"): [ContractSchema],
                voluptuous.Required("consumptions"): [ConsumptionSchema],
                voluptuous.Optional("prices", default=None): voluptuous.Union(
                    [voluptuous.Union(PricingSchema)], None
                ),
                voluptuous.Required("rules"): PricingRulesSchema,
            }
        )
        self._input = _schema(self._input)
        # Offset (in days) used to fold days into billing-cycle "months".
        self._cycle_offset = self._input["rules"]["cycle_start_day"] - 1
        # joint data by datetime
        _data = {
            x["datetime"]: {
                "datetime": x["datetime"],
                "kwh": x["value_kWh"],
                "surplus_kwh": x["surplus_kWh"] if x["surplus_kWh"] is not None else 0,
            }
            for x in self._input["consumptions"]
        }
        # Tag every hour inside a contract's validity window with its
        # contracted power (iterated hour by hour from date_start to date_end).
        for contract in self._input["contracts"]:
            start = contract["date_start"]
            end = contract["date_end"]
            finish = False
            while not finish:
                if start in _data:
                    _data[start]["p1_kw"] = contract["power_p1"]
                    _data[start]["p2_kw"] = contract["power_p2"]
                start = start + timedelta(hours=1)
                finish = not (end > start)
        # Attach explicit per-hour energy prices (e.g. PVPC) when provided.
        if self._input["prices"]:
            for x in self._input["prices"]:
                start = x["datetime"]
                if start in _data:
                    _data[start]["kwh_eur"] = x["value_eur_kWh"]
        # Compile every billing formula once; the "(...)|float" wrapper
        # coerces each Jinja2 expression result to a float.
        env = Environment()
        energy_expr = env.compile_expression(
            f'({self._input["rules"]["energy_formula"]})|float'
        )
        power_expr = env.compile_expression(
            f'({self._input["rules"]["power_formula"]})|float'
        )
        others_expr = env.compile_expression(
            f'({self._input["rules"]["others_formula"]})|float'
        )
        surplus_expr = env.compile_expression(
            f'({self._input["rules"]["surplus_formula"]})|float'
        )
        main_expr = env.compile_expression(
            f'({self._input["rules"]["main_formula"]})|float'
        )
        _data = sorted([_data[x] for x in _data], key=lambda x: x["datetime"])
        hourly = []
        for x in _data:
            # Expose every pricing rule as a template variable for the formulas.
            x.update(self._input["rules"])
            tariff = utils.get_pvpc_tariff(x["datetime"])
            # Fall back to the fixed per-period price when no explicit hourly
            # price was supplied; skip the hour entirely if neither exists.
            if "kwh_eur" not in x:
                if tariff == "p1":
                    x["kwh_eur"] = x["p1_kwh_eur"]
                elif tariff == "p2":
                    x["kwh_eur"] = x["p2_kwh_eur"]
                elif tariff == "p3":
                    x["kwh_eur"] = x["p3_kwh_eur"]
                if x["kwh_eur"] is None:
                    continue
            if tariff == "p1":
                x["surplus_kwh_eur"] = x["surplus_p1_kwh_eur"]
            elif tariff == "p2":
                x["surplus_kwh_eur"] = x["surplus_p2_kwh_eur"]
            elif tariff == "p3":
                x["surplus_kwh_eur"] = x["surplus_p3_kwh_eur"]
            _energy_term = 0
            _power_term = 0
            _others_term = 0
            _surplus_term = 0
            # NOTE(review): a failing formula (missing variable, bad
            # expression) silently leaves its term — and any later term in
            # this suppress block — at 0.
            with contextlib.suppress(Exception):
                _energy_term = round(energy_expr(**x), 6)
                _power_term = round(power_expr(**x), 6)
                _others_term = round(others_expr(**x), 6)
                _surplus_term = round(surplus_expr(**x), 6)
            # value_eur stays 0 here; it is computed from main_expr over the
            # aggregated terms at the end of this method.
            new_item = PricingAggData(
                datetime=x["datetime"],
                energy_term=_energy_term,
                power_term=_power_term,
                others_term=_others_term,
                surplus_term=_surplus_term,
                value_eur=0,
                delta_h=1,
            )
            hourly.append(new_item)
        self._output["hourly"] = hourly
        # Aggregate hourly terms into daily and cycle-shifted monthly buckets.
        # Items are sorted, so only the most recent bucket is ever updated.
        last_day_dt = None
        last_month_dt = None
        for hour in hourly:
            curr_hour_dt: datetime = hour["datetime"]
            curr_day_dt = curr_hour_dt.replace(hour=0, minute=0, second=0)
            # Shift by the cycle offset so a billing "month" starts on
            # cycle_start_day instead of the 1st.
            curr_month_dt = (curr_day_dt - timedelta(days=self._cycle_offset)).replace(
                day=1
            )
            if last_day_dt is None or curr_day_dt != last_day_dt:
                self._output["daily"].append(
                    PricingAggData(
                        datetime=curr_day_dt,
                        energy_term=hour["energy_term"],
                        power_term=hour["power_term"],
                        others_term=hour["others_term"],
                        surplus_term=hour["surplus_term"],
                        value_eur=hour["value_eur"],
                        delta_h=hour["delta_h"],
                    )
                )
            else:
                self._output["daily"][-1]["energy_term"] += hour["energy_term"]
                self._output["daily"][-1]["power_term"] += hour["power_term"]
                self._output["daily"][-1]["others_term"] += hour["others_term"]
                self._output["daily"][-1]["surplus_term"] += hour["surplus_term"]
                self._output["daily"][-1]["delta_h"] += hour["delta_h"]
                self._output["daily"][-1]["value_eur"] += hour["value_eur"]
            if last_month_dt is None or curr_month_dt != last_month_dt:
                self._output["monthly"].append(
                    PricingAggData(
                        datetime=curr_month_dt,
                        energy_term=hour["energy_term"],
                        power_term=hour["power_term"],
                        others_term=hour["others_term"],
                        surplus_term=hour["surplus_term"],
                        value_eur=hour["value_eur"],
                        delta_h=hour["delta_h"],
                    )
                )
            else:
                self._output["monthly"][-1]["energy_term"] += hour["energy_term"]
                self._output["monthly"][-1]["power_term"] += hour["power_term"]
                self._output["monthly"][-1]["others_term"] += hour["others_term"]
                self._output["monthly"][-1]["surplus_term"] += hour["surplus_term"]
                self._output["monthly"][-1]["value_eur"] += hour["value_eur"]
                self._output["monthly"][-1]["delta_h"] += hour["delta_h"]
            last_day_dt = curr_day_dt
            last_month_dt = curr_month_dt
        # Evaluate the main formula over each aggregate to obtain value_eur,
        # then round every term to 6 decimals.
        for item in self._output:
            for cost in self._output[item]:
                cost["value_eur"] = round(main_expr(**cost, **self._input["rules"]), 6)
                cost["energy_term"] = round(cost["energy_term"], 6)
                cost["power_term"] = round(cost["power_term"], 6)
                cost["others_term"] = round(cost["others_term"], 6)
                cost["surplus_term"] = round(cost["surplus_term"], 6)
"""Consumption data processors."""
import logging
from collections.abc import Iterable
from typing import TypedDict
from datetime import datetime, timedelta
import voluptuous
from ..definitions import ConsumptionAggData, ConsumptionSchema
from . import utils
from .base import Processor
_LOGGER = logging.getLogger(__name__)
class ConsumptionOutput(TypedDict):
    """A dict holding ConsumptionProcessor output property."""
    # Aggregated consumption per day and per billing-cycle month.
    daily: Iterable[ConsumptionAggData]
    monthly: Iterable[ConsumptionAggData]
class ConsumptionProcessor(Processor):
    """A consumptions processor.

    Aggregates hourly consumption records into daily and billing-cycle
    monthly totals, split by PVPC tariff period (p1/p2/p3).
    """
    def do_process(self):
        """Calculate daily and monthly consumption stats."""
        self._output = ConsumptionOutput(daily=[], monthly=[])
        last_day_dt = None
        last_month_dt = None
        # Validate input; cycle_start_day defaults to the 1st of the month.
        _schema = voluptuous.Schema(
            {
                voluptuous.Required("consumptions"): [ConsumptionSchema],
                voluptuous.Optional("cycle_start_day", default=1): voluptuous.Range(
                    1, 30
                ),
            }
        )
        self._input = _schema(self._input)
        self._cycle_offset = self._input["cycle_start_day"] - 1
        # NOTE(review): each row is compared only against the last opened
        # bucket, so the input is presumably sorted by datetime — confirm
        # with callers.
        for consumption in self._input["consumptions"]:
            curr_hour_dt: datetime = consumption["datetime"]
            curr_day_dt = curr_hour_dt.replace(hour=0, minute=0, second=0)
            # Shift by the cycle offset so a billing "month" starts on
            # cycle_start_day instead of the 1st.
            curr_month_dt = (curr_day_dt - timedelta(days=self._cycle_offset)).replace(
                day=1
            )
            tariff = utils.get_pvpc_tariff(curr_hour_dt)
            kwh = consumption["value_kWh"]
            surplus_kwh = consumption["surplus_kWh"]
            delta_h = consumption["delta_h"]
            # Route this hour's energy into the matching tariff slot
            # (index 0 -> p1, 1 -> p2, 2 -> p3).
            kwh_by_tariff = [0, 0, 0]
            surplus_kwh_by_tariff = [0, 0, 0]
            match tariff:
                case "p1":
                    kwh_by_tariff[0] = kwh
                    surplus_kwh_by_tariff[0] = surplus_kwh
                case "p2":
                    kwh_by_tariff[1] = kwh
                    surplus_kwh_by_tariff[1] = surplus_kwh
                case "p3":
                    kwh_by_tariff[2] = kwh
                    surplus_kwh_by_tariff[2] = surplus_kwh
            # Open a new daily bucket or accumulate into the current one.
            if last_day_dt is None or curr_day_dt != last_day_dt:
                self._output["daily"].append(
                    ConsumptionAggData(
                        datetime=curr_day_dt,
                        value_kWh=kwh,
                        delta_h=delta_h,
                        value_p1_kWh=kwh_by_tariff[0],
                        value_p2_kWh=kwh_by_tariff[1],
                        value_p3_kWh=kwh_by_tariff[2],
                        surplus_kWh=surplus_kwh,
                        surplus_p1_kWh=surplus_kwh_by_tariff[0],
                        surplus_p2_kWh=surplus_kwh_by_tariff[1],
                        surplus_p3_kWh=surplus_kwh_by_tariff[2],
                    )
                )
            else:
                self._output["daily"][-1]["value_kWh"] += kwh
                self._output["daily"][-1]["value_p1_kWh"] += kwh_by_tariff[0]
                self._output["daily"][-1]["value_p2_kWh"] += kwh_by_tariff[1]
                self._output["daily"][-1]["value_p3_kWh"] += kwh_by_tariff[2]
                self._output["daily"][-1]["surplus_kWh"] += surplus_kwh
                self._output["daily"][-1]["surplus_p1_kWh"] += surplus_kwh_by_tariff[0]
                self._output["daily"][-1]["surplus_p2_kWh"] += surplus_kwh_by_tariff[1]
                self._output["daily"][-1]["surplus_p3_kWh"] += surplus_kwh_by_tariff[2]
                self._output["daily"][-1]["delta_h"] += delta_h
            # Same for the (cycle-shifted) monthly bucket.
            if last_month_dt is None or curr_month_dt != last_month_dt:
                self._output["monthly"].append(
                    ConsumptionAggData(
                        datetime=curr_month_dt,
                        value_kWh=kwh,
                        delta_h=delta_h,
                        value_p1_kWh=kwh_by_tariff[0],
                        value_p2_kWh=kwh_by_tariff[1],
                        value_p3_kWh=kwh_by_tariff[2],
                        surplus_kWh=surplus_kwh,
                        surplus_p1_kWh=surplus_kwh_by_tariff[0],
                        surplus_p2_kWh=surplus_kwh_by_tariff[1],
                        surplus_p3_kWh=surplus_kwh_by_tariff[2],
                    )
                )
            else:
                self._output["monthly"][-1]["value_kWh"] += kwh
                self._output["monthly"][-1]["value_p1_kWh"] += kwh_by_tariff[0]
                self._output["monthly"][-1]["value_p2_kWh"] += kwh_by_tariff[1]
                self._output["monthly"][-1]["value_p3_kWh"] += kwh_by_tariff[2]
                self._output["monthly"][-1]["surplus_kWh"] += surplus_kwh
                self._output["monthly"][-1]["surplus_p1_kWh"] += surplus_kwh_by_tariff[
                    0
                ]
                self._output["monthly"][-1]["surplus_p2_kWh"] += surplus_kwh_by_tariff[
                    1
                ]
                self._output["monthly"][-1]["surplus_p3_kWh"] += surplus_kwh_by_tariff[
                    2
                ]
                self._output["monthly"][-1]["delta_h"] += delta_h
            last_day_dt = curr_day_dt
            last_month_dt = curr_month_dt
        # Round to two decimals
        for item in self._output:
            for cons in self._output[item]:
                for key in cons:
                    if isinstance(cons[key], float):
                        cons[key] = round(cons[key], 2)
"""Maximeter data processors."""
import logging
from datetime import datetime
from typing import TypedDict
from dateparser import parse
import voluptuous
from edata.definitions import MaxPowerSchema
from edata.processors import utils
from .base import Processor
_LOGGER = logging.getLogger(__name__)
class MaximeterStats(TypedDict):
    """A dict holding MaximeterProcessor stats."""
    # Absolute peak reading and when it happened.
    value_max_kW: float
    date_max: datetime
    # Mean and 90th percentile across all readings.
    value_mean_kW: float
    value_tile90_kW: float
class MaximeterOutput(TypedDict):
    """A dict holding MaximeterProcessor output property."""
    # Summary statistics over the full maximeter series.
    stats: MaximeterStats
class MaximeterProcessor(Processor):
    """A processor for Maximeter data.

    Computes max / mean / 90th-percentile stats over a list of MaxPowerData
    readings and stores them in the output's "stats" entry.
    """

    def do_process(self):
        """Calculate maximeter stats."""
        self._output = MaximeterOutput(stats={})
        _schema = voluptuous.Schema([MaxPowerSchema])
        self._input = _schema(self._input)
        # Robustness: an empty series would make max() raise ValueError and
        # the mean divide by zero; leave stats empty instead.
        if not self._input:
            return
        _values = [x["value_kW"] for x in self._input]
        _max_kW = max(_values)
        _dt_max_kW = parse(str(self._input[_values.index(_max_kW)]["datetime"]))
        _mean_kW = sum(_values) / len(_values)
        _tile90_kW = utils.percentile(_values, 0.9)
        # Fix: the stats entry is a MaximeterStats, not a MaximeterOutput.
        # The original built the wrong TypedDict; runtime-identical (both are
        # plain dicts) but type-incorrect and misleading.
        self._output["stats"] = MaximeterStats(
            value_max_kW=round(_max_kW, 2),
            date_max=_dt_max_kW,
            value_mean_kW=round(_mean_kW, 2),
            value_tile90_kW=round(_tile90_kW, 2),
        )
"""Generic utilities for processing data."""
import json
import logging
from copy import deepcopy
from datetime import date, datetime, timedelta
from json import JSONEncoder
import holidays
import math
import functools
import contextlib
_LOGGER = logging.getLogger(__name__)
# NOTE(review): calling basicConfig at import time is a side effect that
# configures the root logger for every consumer of this library — consider
# leaving logging configuration to the application.
logging.basicConfig(level=logging.INFO)
# Hourly tariff bands used by get_pvpc_tariff below: HOURS_P1/HOURS_P2 apply
# on working days; Saturdays/Sundays (weekday() 5 and 6) and holidays are
# entirely p3.
HOURS_P1 = [10, 11, 12, 13, 18, 19, 20, 21]
HOURS_P2 = [8, 9, 14, 15, 16, 17, 22, 23]
WEEKDAYS_P3 = [5, 6]
def is_empty(lst):
    """Return True when *lst* contains no elements."""
    return not len(lst)
def extract_dt_ranges(lst, dt_from, dt_to, gap_interval=timedelta(hours=1)):
    """Filter a list of dicts between two datetimes.

    Returns a tuple (filtered_list, missing) where `missing` is a list of
    {"from": dt, "to": dt} ranges in [dt_from, dt_to] with no data (gaps
    larger than gap_interval). Consecutive duplicate timestamps are dropped.
    """
    new_lst = []
    missing = []
    oldest_dt = None
    newest_dt = None
    last_dt = None
    if len(lst) > 0:
        sorted_lst = sorted(lst, key=lambda i: i["datetime"])
        last_dt = dt_from
        for i in sorted_lst:
            if dt_from <= i["datetime"] <= dt_to:
                # A jump larger than gap_interval means data is missing.
                if (i["datetime"] - last_dt) > gap_interval:
                    missing.append({"from": last_dt, "to": i["datetime"]})
                # Track the observed span of non-zero readings (items without
                # a "value_kWh" key count as non-zero).
                if i.get("value_kWh", 1) > 0:
                    if oldest_dt is None or i["datetime"] < oldest_dt:
                        oldest_dt = i["datetime"]
                    if newest_dt is None or i["datetime"] > newest_dt:
                        newest_dt = i["datetime"]
                if i["datetime"] != last_dt:  # remove duplicates
                    new_lst.append(i)
                last_dt = i["datetime"]
        # Anything between the last seen item and dt_to is also missing.
        if dt_to > last_dt:
            missing.append({"from": last_dt, "to": dt_to})
        _LOGGER.debug("found data from %s to %s", oldest_dt, newest_dt)
    else:
        missing.append({"from": dt_from, "to": dt_to})
    return new_lst, missing
def extend_by_key(old_lst, new_lst, key):
    """Extend a list of dicts by key.

    For each element of new_lst whose `key` matches an existing element,
    update that element's fields in place (only the fields the old element
    already has). Unmatched elements are appended. old_lst itself is not
    modified — a deep copy is returned.
    """
    merged = deepcopy(old_lst)
    appended = []
    for incoming in new_lst:
        target = next((e for e in merged if e[key] == incoming[key]), None)
        if target is None:
            appended.append(incoming)
        else:
            for field in target:
                target[field] = incoming[field]
    merged.extend(appended)
    return merged
def extend_and_filter(old_lst, new_lst, key, dt_from, dt_to):
    """Merge new_lst into old_lst by `key`, then filter to [dt_from, dt_to].

    Only the filtered list is of interest here, so the gap report from
    extract_dt_ranges is discarded.
    """
    data = extend_by_key(old_lst, new_lst, key)
    data, _ = extract_dt_ranges(
        data,
        dt_from,
        dt_to,
        gap_interval=timedelta(days=365),  # trick: effectively disables gap detection
    )
    return data
def get_by_key(lst, key, value):
    """Obtain an element of a list of dicts by key=value (None if absent)."""
    return next((element for element in lst if element[key] == value), None)
@functools.lru_cache(maxsize=1)
def _es_holidays():
    """Build the Spanish holiday calendar once and cache it."""
    return holidays.country_holidays("ES")


def get_pvpc_tariff(a_datetime):
    """Evals the PVPC tariff ("p1"/"p2"/"p3") for a given datetime.

    Weekends (see WEEKDAYS_P3) and Spanish national holidays are always "p3";
    otherwise the band is decided by the hour of day.
    """
    # Perf: this runs once per hourly sample, so the holiday calendar is
    # cached instead of being rebuilt on every call.
    hdays = _es_holidays()
    hour = a_datetime.hour
    weekday = a_datetime.weekday()
    if weekday in WEEKDAYS_P3 or a_datetime.date() in hdays:
        return "p3"
    elif hour in HOURS_P1:
        return "p1"
    elif hour in HOURS_P2:
        return "p2"
    else:
        return "p3"
def serialize_dict(data: dict) -> dict:
    """Serialize dicts as json (datetimes become ISO strings)."""

    def _default(o):
        # Mirrors the original encoder: dates/datetimes become ISO strings,
        # anything else falls through to None (serialized as JSON null).
        if isinstance(o, (date, datetime)):
            return o.isoformat()
        return None

    return json.loads(json.dumps(data, default=_default))
def deserialize_dict(serialized_dict: dict) -> dict:
    """Deserializes a json replacing ISOTIME strings into datetime."""

    def _revive(obj):
        """Object hook: parse ISO strings back into datetimes for date-ish keys."""
        for k in obj:
            # Keys containing "date" (e.g. "datetime", "date_start") are
            # candidates; unparseable values are left untouched.
            if "date" in k:
                with contextlib.suppress(Exception):
                    obj[k] = datetime.fromisoformat(obj[k])
        return obj

    return json.loads(json.dumps(serialized_dict), object_hook=_revive)
def percentile(N, percent, key=lambda x: x):
    """Find the percentile of a list of values.

    Uses linear interpolation between the two nearest ranks. N is presumed
    sorted ascending; returns None for an empty list.
    """
    if not N:
        return None
    rank = (len(N) - 1) * percent
    lower = math.floor(rank)
    upper = math.ceil(rank)
    if lower == upper:
        return key(N[int(rank)])
    # Interpolate between the two surrounding values.
    return key(N[lower]) * (upper - rank) + key(N[upper]) * (rank - lower)
import json
import logging
import os
from .processors import utils
from . import const as const
from . import definitions as defs
_LOGGER = logging.getLogger(__name__)
# Storage root defaults to the user's home directory.
DEFAULT_STORAGE_DIR = os.getenv("HOME")
# Cache filename template, keyed by the (lowercased) CUPS identifier.
RECENT_CACHE_FILENAME = "edata_{id}.json"


def compile_storage_id(cups: str) -> str:
    """Build the storage identifier for a CUPS (lowercased).

    Was a lambda assignment; converted to a def per PEP 8 (E731) — same
    callable name and behavior.
    """
    return cups.lower()
def check_storage_integrity(data: defs.EdataData):
    """Check if an EdataData object follows a schema.

    Returns the validated (and defaulted) data; raises on schema violations.
    """
    return defs.EdataSchema(data)
def load_storage(cups: str, storage_dir: str | None = None):
    """Load EdataData storage from its config dir.

    Reads <storage_dir>/<PROG_NAME>/edata_<cups>.json, deserializes ISO
    strings back into datetimes, and validates the result. Raises
    FileNotFoundError when no cache exists yet (makedirs only guarantees
    the directory, not the file).
    """
    if storage_dir is None:
        storage_dir = DEFAULT_STORAGE_DIR
    _subdir = os.path.join(storage_dir, const.PROG_NAME)
    _recent_cache = os.path.join(
        _subdir, RECENT_CACHE_FILENAME.format(id=compile_storage_id(cups))
    )
    # Ensure the storage directory exists before trying to read from it.
    os.makedirs(_subdir, exist_ok=True)
    with open(_recent_cache, encoding="utf-8") as f:
        return check_storage_integrity(utils.deserialize_dict(json.load(f)))
def dump_storage(cups: str, storage: defs.EdataData, storage_dir: str | None = None):
    """Update EdataData storage.

    Validates `storage`, serializes datetimes to ISO strings and writes the
    JSON cache for this CUPS, creating the directory if needed.
    """
    if storage_dir is None:
        storage_dir = DEFAULT_STORAGE_DIR
    _subdir = os.path.join(storage_dir, const.PROG_NAME)
    _recent_cache = os.path.join(
        _subdir, RECENT_CACHE_FILENAME.format(id=compile_storage_id(cups))
    )
    os.makedirs(_subdir, exist_ok=True)
    with open(_recent_cache, "w", encoding="utf-8") as f:
        json.dump(utils.serialize_dict(check_storage_integrity(storage)), f)
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Note: To use the 'upload' functionality of this file, you must:
# $ pipenv install twine --dev
import io
import os
import sys
from shutil import rmtree
from setuptools import Command, find_packages, setup
# Package meta-data.
NAME = "e-data"
DESCRIPTION = (
    "Python library for managing spanish energy data from various web providers"
)
URL = "https://github.com/uvejota/python-edata"
EMAIL = "vmayorg@outlook.es"
AUTHOR = "VMG"
# NOTE(review): 3.6 looks stale — the package uses `match` statements and
# `X | Y` annotations (3.10+); confirm and bump before release.
REQUIRES_PYTHON = ">=3.6.0"
VERSION = "1.2.22"
# What packages are required for this module to be executed?
# NOTE(review): pytest and freezegun look like test-only dependencies;
# consider moving them out of install_requires.
REQUIRED = [
    "dateparser>=1.1.2",
    "freezegun>=1.2.1",
    "holidays>=0.14.2",
    "pytest>=7.1.2",
    "python_dateutil>=2.8.2",
    "requests>=2.28.1",
    "voluptuous>=0.13.1",
    "Jinja2>=3.1.2",
]
# What packages are optional?
EXTRAS = {
    # 'fancy feature': ['django'],
}
# The rest you shouldn't have to touch too much :)
# ------------------------------------------------
# Except, perhaps the License and Trove Classifiers!
# If you do change the License, remember to change the Trove Classifier for that!
here = os.path.abspath(os.path.dirname(__file__))
# Import the README and use it as the long-description.
# Note: this will only work if 'README.md' is present in your MANIFEST.in file!
try:
    with io.open(os.path.join(here, "README.md"), encoding="utf-8") as f:
        long_description = "\n" + f.read()
except FileNotFoundError:
    long_description = DESCRIPTION
# Load the package's __version__.py module as a dictionary.
# (Only used when VERSION above is left empty.)
about = {}
if not VERSION:
    project_slug = NAME.lower().replace("-", "_").replace(" ", "_")
    with open(os.path.join(here, project_slug, "__version__.py")) as f:
        exec(f.read(), about)
else:
    about["__version__"] = VERSION
class UploadCommand(Command):
    """Support setup.py upload."""
    description = "Build and publish the package."
    user_options = []
    @staticmethod
    def status(s):
        """Prints things in bold."""
        print("\033[1m{0}\033[0m".format(s))
    def initialize_options(self):
        """No options to initialize (required by the Command interface)."""
        pass
    def finalize_options(self):
        """No options to finalize (required by the Command interface)."""
        pass
    def run(self):
        """Build sdist/wheel, upload via twine, and push a version git tag."""
        try:
            self.status("Removing previous builds…")
            rmtree(os.path.join(here, "dist"))
        except OSError:
            # No previous build directory — nothing to clean up.
            pass
        self.status("Building Source and Wheel (universal) distribution…")
        # NOTE(review): os.system with an interpolated string runs through the
        # shell; acceptable for a maintainer-run command, but subprocess.run
        # with an argument list would be safer.
        os.system("{0} setup.py sdist bdist_wheel --universal".format(sys.executable))
        self.status("Uploading the package to PyPI via Twine…")
        os.system("twine upload dist/*")
        self.status("Pushing git tags…")
        os.system("git tag v{0}".format(about["__version__"]))
        os.system("git push --tags")
        sys.exit()
# Where the magic happens:
setup(
    name=NAME,
    version=about["__version__"],
    description=DESCRIPTION,
    long_description=long_description,
    long_description_content_type="text/markdown",
    author=AUTHOR,
    author_email=EMAIL,
    python_requires=REQUIRES_PYTHON,
    url=URL,
    packages=find_packages(exclude=["tests", "*.tests", "*.tests.*", "tests.*"]),
    # If your package is a single module, use this instead of 'packages':
    py_modules=[],
    # entry_points={
    #     'console_scripts': ['mycli=mymodule:cli'],
    # },
    install_requires=REQUIRED,
    extras_require=EXTRAS,
    include_package_data=True,
    license="GPLv3",
    classifiers=[
        # Trove classifiers
        # Full list: https://pypi.python.org/pypi?%3Aaction=list_classifiers
        # NOTE(review): classifiers advertise 3.6-3.9; keep them in sync with
        # the actual supported versions (see REQUIRES_PYTHON note above the
        # metadata block).
        "License :: OSI Approved :: GNU General Public License v3 (GPLv3)",
        "Programming Language :: Python",
        "Programming Language :: Python :: 3",
        "Programming Language :: Python :: 3.6",
        "Programming Language :: Python :: 3.7",
        "Programming Language :: Python :: 3.8",
        "Programming Language :: Python :: 3.9",
        "Programming Language :: Python :: Implementation :: CPython",
        "Programming Language :: Python :: Implementation :: PyPy",
    ],
    # $ setup.py publish support.
    cmdclass={
        "upload": UploadCommand,
    },
)