e-data
Advanced tools
| """Datadis API connector. | ||
| To fetch data from datadis.es private API. | ||
| There a few issues that are workarounded: | ||
| - You have to wait 24h between two identical requests. | ||
| - Datadis server does not like ranges greater than 1 month. | ||
| """ | ||
| import asyncio | ||
| import hashlib | ||
| import logging | ||
| import os | ||
| import tempfile | ||
| from datetime import datetime, timedelta | ||
| import aiohttp | ||
| import diskcache | ||
| from dateutil.relativedelta import relativedelta | ||
| from edata import utils | ||
| from edata.models import Consumption, Contract, MaxPower, Supply | ||
_LOGGER = logging.getLogger(__name__)

# Timeout (seconds) for each individual HTTP request made by this connector.
REQUESTS_TIMEOUT = 30

# Token-related constants: login endpoint and the form-field names it expects.
URL_TOKEN = "https://datadis.es/nikola-auth/tokens/login"
TOKEN_USERNAME = "username"
TOKEN_PASSWD = "password"

# Supplies-related constants
URL_GET_SUPPLIES = "https://datadis.es/api-private/api/get-supplies"
# Keys a supply record must carry to be considered well-formed.
GET_SUPPLIES_MANDATORY_FIELDS = [
    "cups",
    "validDateFrom",
    "validDateTo",
    "pointType",
    "distributorCode",
]

# Contracts-related constants
URL_GET_CONTRACT_DETAIL = "https://datadis.es/api-private/api/get-contract-detail"
# Keys a contract record must carry to be considered well-formed.
GET_CONTRACT_DETAIL_MANDATORY_FIELDS = [
    "startDate",
    "endDate",
    "marketer",
    "contractedPowerkW",
]

# Consumption-related constants
URL_GET_CONSUMPTION_DATA = "https://datadis.es/api-private/api/get-consumption-data"
# Keys a consumption record must carry to be considered well-formed.
GET_CONSUMPTION_DATA_MANDATORY_FIELDS = [
    "time",
    "date",
    "consumptionKWh",
    "obtainMethod",
]
MAX_CONSUMPTIONS_MONTHS = (
    1  # max consumptions in a single request (fixed to 1 due to datadis limitations)
)

# Maximeter-related constants
URL_GET_MAX_POWER = "https://datadis.es/api-private/api/get-max-power"
GET_MAX_POWER_MANDATORY_FIELDS = ["time", "date", "maxPower"]

# Timing constants
# NOTE(review): TIMEOUT is not referenced anywhere in this module's visible
# code (REQUESTS_TIMEOUT is used instead) — confirm whether it is still needed.
TIMEOUT = 3 * 60  # requests timeout
QUERY_LIMIT = timedelta(hours=24)  # a datadis limitation, again...

# Cache-related constants
RECENT_CACHE_SUBDIR = "cache"  # subdirectory (under storage/temp dir) for diskcache
class DatadisConnector:
    """A Datadis private API connector.

    Wraps the datadis.es private API with:
      - lazy token login (token fetched on demand, refreshed once on 401),
      - a persistent ``diskcache`` layer so identical queries are served from
        disk for 24h (Datadis rejects identical requests within that window),
      - optional "smart fetch" that splits consumption queries into 1-month
        batches (Datadis dislikes ranges greater than one month).
    """

    def __init__(
        self,
        username: str,
        password: str,
        enable_smart_fetch: bool = True,
        storage_path: str | None = None,
    ) -> None:
        """DatadisConnector constructor.

        Args:
            username: Datadis account username.
            password: Datadis account password.
            enable_smart_fetch: when True, consumption queries are split into
                monthly batches transparently.
            storage_path: base directory for the response cache; a system
                temp directory is used when not provided.
        """
        # initialize some things
        self._usr = username
        self._pwd = password
        # Holds {"encoded": <raw token string>} once a login has succeeded.
        self._token = {}
        self._smart_fetch = enable_smart_fetch
        # Queries already warned about, so repeated 500s are logged only once.
        self._warned_queries = []
        if storage_path is not None:
            self._recent_cache_dir = os.path.join(storage_path, RECENT_CACHE_SUBDIR)
        else:
            self._recent_cache_dir = os.path.join(
                tempfile.gettempdir(), RECENT_CACHE_SUBDIR
            )
        os.makedirs(self._recent_cache_dir, exist_ok=True)
        # Initialize diskcache for persistent caching
        self._cache = diskcache.Cache(
            self._recent_cache_dir,
            size_limit=100 * 1024 * 1024,  # 100MB limit
            eviction_policy="least-recently-used",
        )

    async def login(self):
        """Test login with the provided credentials.

        Returns:
            bool: True if a token was successfully fetched.
        """
        return await self._get_token()

    async def get_supplies(self, authorized_nif: str | None = None) -> list[Supply]:
        """Datadis 'get_supplies' query (async version).

        Args:
            authorized_nif: NIF of a third party that authorized access to
                their supplies; omitted for the account's own supplies.

        Returns:
            list[Supply]: one entry per well-formed supply in the response;
            malformed entries are logged and skipped.
        """
        data = {}
        # If authorized_nif is provided, we have to include it as parameter
        if authorized_nif is not None:
            data["authorizedNif"] = authorized_nif
        # Request the resource using get method.  Supplies change rarely but
        # cheaply, so the 24h recent-query cache is bypassed here.
        response = await self._get(
            URL_GET_SUPPLIES, request_data=data, ignore_recent_queries=True
        )
        # Response is a list of serialized supplies.
        # We will iter through them to transform them into Supply objects
        supplies = []
        # Build tomorrow Y/m/d string since we will use it as the 'date_end' of
        # active supplies
        tomorrow_str = (datetime.today() + timedelta(days=1)).strftime("%Y/%m/%d")
        for i in response:
            # check data integrity (maybe this can be supressed if datadis proves to be reliable)
            if all(k in i for k in GET_SUPPLIES_MANDATORY_FIELDS):
                supplies.append(
                    Supply(
                        cups=i["cups"],  # the supply identifier
                        date_start=datetime.strptime(
                            (
                                i["validDateFrom"]
                                if i["validDateFrom"] != ""
                                else "1970/01/01"
                            ),
                            "%Y/%m/%d",
                        ),  # start date of the supply. 1970/01/01 if unset.
                        date_end=datetime.strptime(
                            (
                                i["validDateTo"]
                                if i["validDateTo"] != ""
                                else tomorrow_str
                            ),
                            "%Y/%m/%d",
                        ),  # end date of the supply, tomorrow if unset
                        # the following parameters are not crucial, so they can be none
                        address=i.get("address", None),
                        postal_code=i.get("postalCode", None),
                        province=i.get("province", None),
                        municipality=i.get("municipality", None),
                        distributor=i.get("distributor", None),
                        # these two are mandatory, we will use them to fetch contracts data
                        point_type=i["pointType"],
                        distributor_code=i["distributorCode"],
                    )
                )
            else:
                _LOGGER.warning(
                    "Weird data structure while fetching supplies data, got %s",
                    response,
                )
        return supplies

    async def _get(
        self,
        url: str,
        request_data: dict | None = None,
        refresh_token: bool = False,
        is_retry: bool = False,
        ignore_recent_queries: bool = False,
    ):
        """GET request for the Datadis API (async version).

        Handles, in order: the 24h disk cache, token refresh (recursing once
        with ``refresh_token=True`` on a 401), rate-limit (429) warnings, and
        one retry (recursing with ``is_retry=True``) on other error codes.

        Args:
            url: endpoint to query.
            request_data: query parameters appended to the URL.
            refresh_token: internal flag — refresh the token before querying.
            is_retry: internal flag — this call is already a retry.
            ignore_recent_queries: bypass (and do not populate) the 24h cache.

        Returns:
            The decoded JSON body (list or dict), or ``[]`` on failure.
        """
        if request_data is None:
            data = {}
        else:
            data = request_data
        # build get parameters
        # NOTE(review): values are interpolated without URL-encoding — fine
        # for the CUPS/date/NIF values used here, but confirm if new
        # parameters with reserved characters are ever added.
        params = "?" if len(data) > 0 else ""
        for param in data:
            key = param
            value = data[param]
            params = params + f"{key}={value}&"
        anonym_params = "?" if len(data) > 0 else ""
        # build anonymized params for logging (mask CUPS and NIF values)
        for anonym_param in data:
            key = anonym_param
            if key == "cups":
                value = "xxxx" + data[anonym_param][-5:]
            elif key == "authorizedNif":
                value = "xxxx"
            else:
                value = data[anonym_param]
            anonym_params = anonym_params + f"{key}={value}&"
        # Check diskcache first (unless ignoring cache)
        if not ignore_recent_queries:
            # The cache key fingerprints the full request, including the
            # internal retry/refresh flags.
            cache_data = {
                "url": url,
                "request_data": request_data,
                "refresh_token": refresh_token,
                "is_retry": is_retry,
            }
            cache_key = hashlib.sha256(str(cache_data).encode()).hexdigest()
            try:
                # Run cache get operation in thread to avoid blocking
                cached_result = await asyncio.to_thread(self._cache.get, cache_key)
                if cached_result is not None and isinstance(
                    cached_result, (list, dict)
                ):
                    _LOGGER.info(
                        "Returning cached response for '%s'", url + anonym_params
                    )
                    return cached_result
            except Exception as e:
                # A broken cache must not break the request itself.
                _LOGGER.warning("Error reading cache: %s", e)
        # refresh token if needed (recursive approach)
        is_valid_token = False
        response = []
        if refresh_token:
            is_valid_token = await self._get_token()
        if is_valid_token or not refresh_token:
            # run the query
            timeout = aiohttp.ClientTimeout(total=REQUESTS_TIMEOUT)
            async with aiohttp.ClientSession(timeout=timeout) as session:
                try:
                    _LOGGER.info("GET %s", url + anonym_params)
                    headers = {"Accept-Encoding": "identity"}
                    # Ensure we have a token
                    if not self._token.get("encoded"):
                        await self._get_token()
                    headers["Authorization"] = f"Bearer {self._token['encoded']}"
                    async with session.get(url + params, headers=headers) as reply:
                        # eval response
                        if reply.status == 200:
                            # we're here if reply seems valid
                            _LOGGER.info("Got 200 OK")
                            try:
                                # content_type=None: Datadis may not set a
                                # JSON content-type header.
                                response_json = await reply.json(content_type=None)
                                if response_json:
                                    response = response_json
                                    # Store in diskcache with 24h TTL
                                    if not ignore_recent_queries and isinstance(
                                        response, (list, dict)
                                    ):
                                        try:
                                            cache_data = {
                                                "url": url,
                                                "request_data": request_data,
                                                "refresh_token": refresh_token,
                                                "is_retry": is_retry,
                                            }
                                            cache_key = hashlib.sha256(
                                                str(cache_data).encode()
                                            ).hexdigest()
                                            # TTL mirrors Datadis' own 24h
                                            # identical-query limit.
                                            ttl_seconds = int(
                                                QUERY_LIMIT.total_seconds()
                                            )
                                            # Run cache set operation in thread to avoid blocking
                                            await asyncio.to_thread(
                                                self._cache.set,
                                                cache_key,
                                                response,
                                                expire=ttl_seconds,
                                            )
                                            _LOGGER.info(
                                                "Cached response for %s with TTL %d seconds",
                                                url,
                                                ttl_seconds,
                                            )
                                        except Exception as e:
                                            _LOGGER.warning(
                                                "Error storing in cache: %s", e
                                            )
                                else:
                                    # this mostly happens when datadis provides an empty response
                                    _LOGGER.info("Got an empty response")
                            except Exception as e:
                                # Handle non-JSON responses
                                _LOGGER.info("Got an empty or non-JSON response")
                                _LOGGER.exception(e)
                        elif reply.status == 401 and not refresh_token:
                            # we're here if we were unauthorized so we will refresh the token
                            response = await self._get(
                                url,
                                request_data=data,
                                refresh_token=True,
                                ignore_recent_queries=ignore_recent_queries,
                            )
                        elif reply.status == 429:
                            # we're here if we exceeded datadis API rates (24h)
                            _LOGGER.warning(
                                "Got status code '%s' with message '%s'",
                                reply.status,
                                await reply.text(),
                            )
                        elif is_retry:
                            # otherwise, if this was a retried request... warn the user
                            if (url + params) not in self._warned_queries:
                                _LOGGER.warning(
                                    "Got status code '%s' with message '%s'. %s. %s",
                                    reply.status,
                                    await reply.text(),
                                    "Query temporary disabled",
                                    "Future 500 code errors for this query will be silenced until restart",
                                )
                                self._warned_queries.append(url + params)
                        else:
                            # finally, retry since an unexpected error took place (mostly 500 errors - server fault)
                            response = await self._get(
                                url,
                                request_data,
                                is_retry=True,
                                ignore_recent_queries=ignore_recent_queries,
                            )
                except asyncio.TimeoutError:
                    _LOGGER.warning("Timeout at %s", url + anonym_params)
                    return []
                except Exception as e:
                    _LOGGER.error(
                        "Error during async request to %s: %s", url + anonym_params, e
                    )
                    return []
        return response

    async def get_contract_detail(
        self, cups: str, distributor_code: str, authorized_nif: str | None = None
    ) -> list[Contract]:
        """Datadis get_contract_detail query (async version).

        Args:
            cups: supply point identifier.
            distributor_code: distributor company code for that supply.
            authorized_nif: optional third-party NIF that granted access.

        Returns:
            list[Contract]: well-formed contracts; malformed entries are
            logged and skipped.
        """
        data = {"cups": cups, "distributorCode": distributor_code}
        if authorized_nif is not None:
            data["authorizedNif"] = authorized_nif
        response = await self._get(
            URL_GET_CONTRACT_DETAIL, request_data=data, ignore_recent_queries=True
        )
        contracts = []
        # Tomorrow is used as the end date of still-active contracts.
        tomorrow_str = (datetime.today() + timedelta(days=1)).strftime("%Y/%m/%d")
        for i in response:
            if all(k in i for k in GET_CONTRACT_DETAIL_MANDATORY_FIELDS):
                contracts.append(
                    Contract(
                        date_start=datetime.strptime(
                            i["startDate"] if i["startDate"] != "" else "1970/01/01",
                            "%Y/%m/%d",
                        ),
                        date_end=datetime.strptime(
                            i["endDate"] if i["endDate"] != "" else tomorrow_str,
                            "%Y/%m/%d",
                        ),
                        marketer=i["marketer"],
                        distributor_code=distributor_code,
                        # contractedPowerkW is expected to be a list of
                        # per-period powers; P1 / P2 are its first two items.
                        power_p1=(
                            i["contractedPowerkW"][0]
                            if isinstance(i["contractedPowerkW"], list)
                            else None
                        ),
                        power_p2=(
                            i["contractedPowerkW"][1]
                            if (len(i["contractedPowerkW"]) > 1)
                            else None
                        ),
                    )
                )
            else:
                _LOGGER.warning(
                    "Weird data structure while fetching contracts data, got %s",
                    response,
                )
        return contracts

    async def get_consumption_data(
        self,
        cups: str,
        distributor_code: str,
        start_date: datetime,
        end_date: datetime,
        measurement_type: str,
        point_type: int,
        authorized_nif: str | None = None,
        is_smart_fetch: bool = False,
    ) -> list[Consumption]:
        """Datadis get_consumption_data query (async version).

        When smart fetch is enabled, the range is split into batches of
        MAX_CONSUMPTIONS_MONTHS and this method recurses once per batch with
        ``is_smart_fetch=True`` (which disables further splitting).

        Args:
            cups: supply point identifier.
            distributor_code: distributor company code.
            start_date: inclusive range start.
            end_date: inclusive range end.
            measurement_type: Datadis measurement type parameter.
            point_type: supply point type (from get_supplies).
            authorized_nif: optional third-party NIF that granted access.
            is_smart_fetch: internal flag — this call is already a batch.

        Returns:
            list[Consumption]: hourly consumption items within the range.
        """
        if self._smart_fetch and not is_smart_fetch:
            _start = start_date
            consumptions_dicts = []
            while _start < end_date:
                _end = min(
                    _start + relativedelta(months=MAX_CONSUMPTIONS_MONTHS), end_date
                )
                batch_consumptions = await self.get_consumption_data(
                    cups,
                    distributor_code,
                    _start,
                    _end,
                    measurement_type,
                    point_type,
                    authorized_nif,
                    is_smart_fetch=True,
                )
                # Convert to dicts for extend_by_key function
                batch_dicts = [c.model_dump() for c in batch_consumptions]
                # Merge batches, deduplicating on the 'datetime' key.
                consumptions_dicts = utils.extend_by_key(
                    consumptions_dicts,
                    batch_dicts,
                    "datetime",
                )
                _start = _end
            # Convert back to Pydantic models
            return [Consumption(**c) for c in consumptions_dicts]
        data = {
            "cups": cups,
            "distributorCode": distributor_code,
            # Datadis takes whole months (%Y/%m), not day-level ranges.
            "startDate": datetime.strftime(start_date, "%Y/%m"),
            "endDate": datetime.strftime(end_date, "%Y/%m"),
            "measurementType": measurement_type,
            "pointType": point_type,
        }
        if authorized_nif is not None:
            data["authorizedNif"] = authorized_nif
        response = await self._get(URL_GET_CONSUMPTION_DATA, request_data=data)
        consumptions = []
        for i in response:
            if "consumptionKWh" in i:
                if all(k in i for k in GET_CONSUMPTION_DATA_MANDATORY_FIELDS):
                    # Datadis reports hour-ending values ("1:00".."24:00");
                    # shift to hour-beginning (00..23) for the timestamp.
                    hour = str(int(i["time"].split(":")[0]) - 1)
                    date_as_dt = datetime.strptime(
                        f"{i['date']} {hour.zfill(2)}:00", "%Y/%m/%d %H:%M"
                    )
                    if not (start_date <= date_as_dt <= end_date):
                        continue  # skip element if dt is out of range
                    # surplusEnergyKWh may be missing or explicitly null.
                    _surplus = i.get("surplusEnergyKWh", 0)
                    if _surplus is None:
                        _surplus = 0
                    consumptions.append(
                        Consumption(
                            datetime=date_as_dt,
                            delta_h=1,
                            value_kwh=i["consumptionKWh"],
                            surplus_kwh=_surplus,
                            real=i["obtainMethod"] == "Real",
                        )
                    )
                else:
                    _LOGGER.warning(
                        "Weird data structure while fetching consumption data, got %s",
                        response,
                    )
        return consumptions

    async def get_max_power(
        self,
        cups: str,
        distributor_code: str,
        start_date: datetime,
        end_date: datetime,
        authorized_nif: str | None = None,
    ) -> list[MaxPower]:
        """Datadis get_max_power query (async version).

        Args:
            cups: supply point identifier.
            distributor_code: distributor company code.
            start_date: range start (month precision on the API side).
            end_date: range end (month precision on the API side).
            authorized_nif: optional third-party NIF that granted access.

        Returns:
            list[MaxPower]: maximeter readings; malformed entries are logged
            and skipped.
        """
        data = {
            "cups": cups,
            "distributorCode": distributor_code,
            "startDate": datetime.strftime(start_date, "%Y/%m"),
            "endDate": datetime.strftime(end_date, "%Y/%m"),
        }
        if authorized_nif is not None:
            data["authorizedNif"] = authorized_nif
        response = await self._get(URL_GET_MAX_POWER, request_data=data)
        maxpower_values = []
        for i in response:
            if all(k in i for k in GET_MAX_POWER_MANDATORY_FIELDS):
                maxpower_values.append(
                    MaxPower(
                        datetime=datetime.strptime(
                            f"{i['date']} {i['time']}", "%Y/%m/%d %H:%M"
                        ),
                        value_kw=i["maxPower"],
                    )
                )
            else:
                _LOGGER.warning(
                    "Weird data structure while fetching maximeter data, got %s",
                    response,
                )
        return maxpower_values

    async def _get_token(self):
        """Fetch a new token (async version).

        Posts the credentials as a URL-encoded form and, on success, stores
        the raw response body under ``self._token["encoded"]``.

        Returns:
            bool: True if a token was retrieved.
        """
        _LOGGER.info("Fetching token for async requests")
        is_valid_token = False
        timeout = aiohttp.ClientTimeout(total=REQUESTS_TIMEOUT)
        # Prepare data as URL-encoded string, same as sync version
        form_data = {
            TOKEN_USERNAME: self._usr,
            TOKEN_PASSWD: self._pwd,
        }
        async with aiohttp.ClientSession(timeout=timeout) as session:
            try:
                async with session.post(
                    URL_TOKEN,
                    data=form_data,
                    headers={"Content-Type": "application/x-www-form-urlencoded"},
                ) as response:
                    if response.status == 200:
                        # store token encoded (the body *is* the token)
                        self._token["encoded"] = await response.text()
                        is_valid_token = True
                    else:
                        _LOGGER.error(
                            "Unknown error while retrieving async token, got %s",
                            await response.text(),
                        )
            except Exception as e:
                _LOGGER.error("Error during async token fetch: %s", e)
        return is_valid_token
| """A REData API connector""" | ||
| import asyncio | ||
| import datetime as dt | ||
| import logging | ||
| import aiohttp | ||
| from dateutil import parser | ||
| from edata.models.pricing import PricingData | ||
_LOGGER = logging.getLogger(__name__)

# Timeout (seconds) for each REData HTTP request.
REQUESTS_TIMEOUT = 15

# Hourly realtime-price endpoint template.
# geo_ids selects the zone (8741 = Peninsula, 8744 = Ceuta/Melilla, as used
# by get_realtime_prices below); start/end are formatted inline.
URL_REALTIME_PRICES = (
    "https://apidatos.ree.es/es/datos/mercados/precios-mercados-tiempo-real"
    "?time_trunc=hour"
    "&geo_ids={geo_id}"
    "&start_date={start:%Y-%m-%dT%H:%M}&end_date={end:%Y-%m-%dT%H:%M}"
)
class REDataConnector:
    """Main class for REData connector"""

    def __init__(
        self,
    ) -> None:
        """Init method for REDataConnector"""

    async def get_realtime_prices(
        self, dt_from: dt.datetime, dt_to: dt.datetime, is_ceuta_melilla: bool = False
    ) -> list:
        """GET query to fetch realtime pvpc prices, historical data is limited to current month (async version)"""
        # Fill the endpoint template: geo_id picks the pricing zone.
        url = URL_REALTIME_PRICES.format(
            geo_id=8744 if is_ceuta_melilla else 8741,
            start=dt_from,
            end=dt_to,
        )
        prices: list = []
        session_timeout = aiohttp.ClientTimeout(total=REQUESTS_TIMEOUT)
        async with aiohttp.ClientSession(timeout=session_timeout) as session:
            try:
                async with session.get(url) as response:
                    if response.status == 200:
                        payload = await response.json()
                        if payload:
                            # Dig out the hourly values; a missing path means
                            # the API answered with an unexpected shape.
                            try:
                                raw_values = payload["included"][0]["attributes"][
                                    "values"
                                ]
                            except (IndexError, KeyError):
                                _LOGGER.error(
                                    "%s returned a malformed response: %s ",
                                    url,
                                    await response.text(),
                                )
                                return prices
                            # API values are EUR/MWh; convert to EUR/kWh and
                            # strip the timezone to keep datetimes naive.
                            prices.extend(
                                PricingData(
                                    datetime=parser.parse(item["datetime"]).replace(
                                        tzinfo=None
                                    ),
                                    value_eur_kwh=item["value"] / 1000,
                                    delta_h=1,
                                )
                                for item in raw_values
                            )
                    else:
                        _LOGGER.error(
                            "%s returned %s with code %s",
                            url,
                            await response.text(),
                            response.status,
                        )
            except asyncio.TimeoutError:
                _LOGGER.error("Timeout error when fetching data from %s", url)
            except aiohttp.ClientError as e:
                _LOGGER.error(
                    "HTTP client error when fetching data from %s: %s", url, e
                )
            except Exception as e:
                _LOGGER.error("Unexpected error when fetching data from %s: %s", url, e)
        return prices
"""Pydantic models for edata.

This module contains all data models using Pydantic for robust validation,
serialization and better developer experience.
"""

from edata.models.consumption import Consumption, ConsumptionAggregated
from edata.models.contract import Contract
from edata.models.maximeter import MaxPower
from edata.models.pricing import PricingAggregated, PricingData, PricingRules
from edata.models.supply import Supply

# Public API of the models package: re-export every model so callers can use
# `from edata.models import X` without knowing the submodule layout.
__all__ = [
    "Supply",
    "Contract",
    "Consumption",
    "ConsumptionAggregated",
    "PricingData",
    "PricingRules",
    "PricingAggregated",
    "MaxPower",
]
| """Base models and common functionality for edata Pydantic models.""" | ||
| from datetime import datetime | ||
| from typing import Any, Dict | ||
| from pydantic import BaseModel, ConfigDict, field_validator | ||
class EdataBaseModel(BaseModel):
    """Base model for all edata entities with common configuration.

    Every edata model inherits this config so validation behavior is
    consistent package-wide.
    """

    model_config = ConfigDict(
        # Validate assignments to ensure data integrity
        validate_assignment=True,
        # Use enum values instead of enum objects for serialization
        use_enum_values=True,
        # Extra fields are forbidden to catch typos and ensure schema compliance
        extra="forbid",
        # Validate default values
        validate_default=True,
        # Disallow arbitrary (non-pydantic-aware) field types
        arbitrary_types_allowed=False,
        # Strip surrounding whitespace from string inputs
        str_strip_whitespace=True,
    )

    def model_dump_for_storage(self) -> Dict[str, Any]:
        """Serialize model for storage, handling special types like datetime.

        mode="json" renders datetimes (and similar) as JSON-safe strings.
        """
        return self.model_dump(mode="json")

    @classmethod
    def from_storage(cls, data: Dict[str, Any]):
        """Create model instance from storage data (inverse of
        model_dump_for_storage)."""
        return cls.model_validate(data)
class TimestampMixin(BaseModel):
    """Mixin for models with date/datetime fields.

    Any field whose name contains "date" or "datetime" accepts ISO-like
    strings, which are parsed to datetime objects before validation.
    """

    @field_validator("*", mode="before")
    @classmethod
    def validate_datetime_fields(cls, v, info):
        """Convert datetime strings to datetime objects if needed."""
        name = info.field_name
        looks_temporal = bool(name) and ("datetime" in name or "date" in name)
        if looks_temporal and isinstance(v, str):
            try:
                from dateutil import parser

                return parser.parse(v)
            except (ValueError, TypeError):
                # Unparseable strings fall through to pydantic's own handling.
                pass
        return v
class EnergyMixin(BaseModel):
    """Mixin for models with energy/power values.

    Rejects negative values on any field whose name mentions kW or kWh.
    """

    @field_validator("*", mode="before")
    @classmethod
    def validate_energy_fields(cls, v, info):
        """Validate energy-related fields."""
        name = info.field_name
        if not name:
            return v
        lowered = name.lower()
        if ("kwh" in lowered or "kw" in lowered) and v is not None and v < 0:
            raise ValueError(f"{name} cannot be negative")
        return v
def validate_cups(v: str) -> str:
    """Normalize and validate a CUPS (Spanish supply point code).

    Spaces are removed and the code is upper-cased before checking the
    "ES" prefix and the 20-22 character total length.
    NOTE(review): the original comment mentions alphanumeric validation,
    but character classes are not actually checked — confirm if needed.

    Raises:
        ValueError: on empty input, wrong prefix, or wrong length.
    """
    if not v:
        raise ValueError("CUPS cannot be empty")
    # Normalize: drop spaces, force uppercase.
    normalized = v.replace(" ", "").upper()
    if not normalized.startswith("ES"):
        raise ValueError("CUPS must start with 'ES'")
    if not 20 <= len(normalized) <= 22:
        raise ValueError("CUPS must be 20-22 characters long")
    return normalized
def validate_positive_number(v: float) -> float:
    """Validate that a number is not negative (None and 0 pass through)."""
    if v is None:
        return v
    if v < 0:
        raise ValueError("Value must be positive")
    return v
def validate_reasonable_datetime(v: datetime) -> datetime:
    """Validate that a datetime falls within plausible bounds.

    Rejects anything before year 2000. Future dates are allowed — contracts
    and supplies can legitimately run into the future — but anything more
    than 50 years ahead is rejected as unreasonable.
    """
    if v.year < 2000:
        raise ValueError("Date cannot be before year 2000")
    horizon = datetime.now().year + 50
    if v.year > horizon:
        raise ValueError("Date cannot be more than 50 years in the future")
    return v
| """Consumption (consumo) related Pydantic models.""" | ||
| from datetime import datetime as dt | ||
| from pydantic import Field, field_validator | ||
| from edata.models.base import ( | ||
| EdataBaseModel, | ||
| EnergyMixin, | ||
| TimestampMixin, | ||
| validate_positive_number, | ||
| validate_reasonable_datetime, | ||
| ) | ||
class Consumption(EdataBaseModel, TimestampMixin, EnergyMixin):
    """Pydantic model for hourly electricity consumption data.

    One instance represents one measurement interval (delta_h hours long)
    starting at `datetime`.
    """

    # Timestamp of the consumption measurement (interval start).
    datetime: dt = Field(..., description="Timestamp of the consumption measurement")
    # Interval length in hours; bounded to (0, 24].
    delta_h: float = Field(
        ..., description="Time interval in hours for this measurement", gt=0, le=24
    )
    # Energy drawn from the grid (non-negative).
    value_kwh: float = Field(..., description="Energy consumption in kWh", ge=0)
    # Energy exported to the grid (self-generation), defaults to 0.
    surplus_kwh: float = Field(
        default=0.0, description="Energy surplus/generation in kWh", ge=0
    )
    # True for metered values, False for estimations.
    real: bool = Field(
        default=True, description="Whether this is a real measurement or estimated"
    )

    @field_validator("datetime")
    @classmethod
    def validate_datetime_range(cls, v: dt) -> dt:
        """Validate datetime is reasonable (year 2000 .. ~now+50y)."""
        return validate_reasonable_datetime(v)

    @field_validator("value_kwh", "surplus_kwh")
    @classmethod
    def validate_energy_values(cls, v: float) -> float:
        """Validate energy values are not negative."""
        return validate_positive_number(v)

    def __str__(self) -> str:
        """String representation."""
        return f"Consumption({self.datetime}, {self.value_kwh}kWh)"

    def __repr__(self) -> str:
        """Developer representation."""
        return f"Consumption(datetime={self.datetime}, value_kwh={self.value_kwh}, real={self.real})"
class ConsumptionAggregated(EdataBaseModel, TimestampMixin, EnergyMixin):
    """Pydantic model for aggregated consumption data (daily/monthly summaries).

    Totals plus per-tariff-period (P1/P2/P3) breakdowns for one aggregation
    window of `delta_h` hours starting at `datetime`.
    """

    # Start of the aggregation period.
    datetime: dt = Field(
        ..., description="Timestamp representing the start of the aggregation period"
    )
    # Total consumption over the period.
    value_kwh: float = Field(
        ..., description="Total energy consumption in kWh for the period", ge=0
    )
    # Per-period consumption breakdown (P1/P2/P3 tariff periods).
    value_p1_kwh: float = Field(
        default=0.0, description="Energy consumption in period P1 (kWh)", ge=0
    )
    value_p2_kwh: float = Field(
        default=0.0, description="Energy consumption in period P2 (kWh)", ge=0
    )
    value_p3_kwh: float = Field(
        default=0.0, description="Energy consumption in period P3 (kWh)", ge=0
    )
    # Total surplus (exported energy) over the period.
    surplus_kwh: float = Field(
        default=0.0,
        description="Total energy surplus/generation in kWh for the period",
        ge=0,
    )
    # Per-period surplus breakdown.
    surplus_p1_kwh: float = Field(
        default=0.0, description="Energy surplus in period P1 (kWh)", ge=0
    )
    surplus_p2_kwh: float = Field(
        default=0.0, description="Energy surplus in period P2 (kWh)", ge=0
    )
    surplus_p3_kwh: float = Field(
        default=0.0, description="Energy surplus in period P3 (kWh)", ge=0
    )
    # Length of the aggregation window in hours (24 = daily, ~720 = monthly).
    delta_h: float = Field(
        ..., description="Duration of the aggregation period in hours", gt=0
    )

    @field_validator("datetime")
    @classmethod
    def validate_datetime_range(cls, v: dt) -> dt:
        """Validate datetime is reasonable (year 2000 .. ~now+50y)."""
        return validate_reasonable_datetime(v)

    def __str__(self) -> str:
        """String representation."""
        # <=24h is treated as a daily aggregate, anything longer as monthly.
        period = "day" if self.delta_h <= 24 else "month"
        return f"ConsumptionAgg({self.datetime.date()}, {self.value_kwh}kWh/{period})"

    def __repr__(self) -> str:
        """Developer representation."""
        return f"ConsumptionAggregated(datetime={self.datetime}, value_kwh={self.value_kwh}, delta_h={self.delta_h})"
| """Contract (contrato) related Pydantic models.""" | ||
| from datetime import datetime | ||
| from typing import Optional | ||
| from pydantic import Field, field_validator | ||
| from edata.models.base import ( | ||
| EdataBaseModel, | ||
| TimestampMixin, | ||
| validate_positive_number, | ||
| validate_reasonable_datetime, | ||
| ) | ||
class Contract(EdataBaseModel, TimestampMixin):
    """Pydantic model for electricity contract data.

    Describes one contract term: validity window, marketer, distributor and
    contracted power per tariff period.
    """

    # Contract validity window.
    date_start: datetime = Field(..., description="Contract start date")
    date_end: datetime = Field(..., description="Contract end date")
    # Companies involved: retailer name and distributor code.
    marketer: str = Field(..., description="Energy marketer company name", min_length=1)
    distributor_code: str = Field(
        ..., description="Distributor company code", min_length=1
    )
    # Contracted power per tariff period; None when not reported.
    power_p1: Optional[float] = Field(
        None, description="Contracted power for period P1 (kW)", ge=0
    )
    power_p2: Optional[float] = Field(
        None, description="Contracted power for period P2 (kW)", ge=0
    )

    @field_validator("date_start", "date_end")
    @classmethod
    def validate_date_range(cls, v: datetime) -> datetime:
        """Validate date is reasonable (year 2000 .. ~now+50y)."""
        return validate_reasonable_datetime(v)

    @field_validator("power_p1", "power_p2")
    @classmethod
    def validate_power_values(cls, v: Optional[float]) -> Optional[float]:
        """Validate power values are not negative (None is allowed)."""
        if v is not None:
            return validate_positive_number(v)
        return v

    def __str__(self) -> str:
        """String representation."""
        return f"Contract(marketer={self.marketer}, power_p1={self.power_p1}kW)"

    def __repr__(self) -> str:
        """Developer representation."""
        return f"Contract(marketer={self.marketer}, date_start={self.date_start}, date_end={self.date_end})"
| from datetime import datetime as DateTime | ||
| from typing import List, Optional | ||
| from pydantic import Field | ||
| from sqlalchemy import UniqueConstraint | ||
| from sqlmodel import Field, Relationship, SQLModel | ||
| from edata.models import Consumption, Contract, MaxPower, PricingData, Supply | ||
class SupplyModel(Supply, SQLModel, table=True):
    """SQLModel for electricity supply data inheriting from Pydantic model.

    Table `supplies`; one row per CUPS, parent of contracts, consumptions
    and maximeter rows.
    """

    __tablename__: str = "supplies"

    # Override cups field to make it the primary key (CUPS is globally unique)
    cups: str = Field(primary_key=True, min_length=20, max_length=22)

    # Row bookkeeping timestamps.
    # NOTE(review): DateTime.now gives naive local time, and updated_at is not
    # auto-refreshed on UPDATE — callers must set it; confirm intended.
    created_at: DateTime = Field(default_factory=DateTime.now)
    updated_at: DateTime = Field(default_factory=DateTime.now)

    # Relationships (children reference supplies.cups)
    contracts: List["ContractModel"] = Relationship(back_populates="supply")
    consumptions: List["ConsumptionModel"] = Relationship(back_populates="supply")
    maximeter: List["MaxPowerModel"] = Relationship(back_populates="supply")
class ContractModel(Contract, SQLModel, table=True):
    """SQLModel for electricity contract data inheriting from Pydantic model.

    Table `contracts`; a supply may have many contracts, unique per
    (cups, date_start).
    """

    __tablename__: str = "contracts"
    __table_args__ = (UniqueConstraint("cups", "date_start"),)

    # Surrogate primary key for the database row
    id: Optional[int] = Field(default=None, primary_key=True)

    # Owning supply (FK to supplies.cups)
    cups: str = Field(foreign_key="supplies.cups")

    # Row bookkeeping timestamps (naive local time; see SupplyModel note).
    created_at: DateTime = Field(default_factory=DateTime.now)
    updated_at: DateTime = Field(default_factory=DateTime.now)

    # Relationships
    supply: Optional["SupplyModel"] = Relationship(back_populates="contracts")
class ConsumptionModel(Consumption, SQLModel, table=True):
    """SQLModel for electricity consumption data inheriting from Pydantic model.

    Table `consumptions`; one row per supply and hour, unique per
    (cups, datetime).
    """

    __tablename__: str = "consumptions"
    __table_args__ = (UniqueConstraint("cups", "datetime"),)

    # Surrogate primary key for the database row
    id: Optional[int] = Field(default=None, primary_key=True)

    # Owning supply (FK to supplies.cups)
    cups: str = Field(foreign_key="supplies.cups")

    # Row bookkeeping timestamps (naive local time; see SupplyModel note).
    created_at: DateTime = Field(default_factory=DateTime.now)
    updated_at: DateTime = Field(default_factory=DateTime.now)

    # Relationships
    supply: Optional["SupplyModel"] = Relationship(back_populates="consumptions")
class MaxPowerModel(MaxPower, SQLModel, table=True):
    """SQLModel for maximum power demand data inheriting from Pydantic model.

    Table `maximeter`; one row per supply and reading timestamp, unique per
    (cups, datetime).
    """

    __tablename__: str = "maximeter"
    __table_args__ = (UniqueConstraint("cups", "datetime"),)

    # Surrogate primary key for the database row
    id: Optional[int] = Field(default=None, primary_key=True)

    # Owning supply (FK to supplies.cups)
    cups: str = Field(foreign_key="supplies.cups")

    # Row bookkeeping timestamps (naive local time; see SupplyModel note).
    created_at: DateTime = Field(default_factory=DateTime.now)
    updated_at: DateTime = Field(default_factory=DateTime.now)

    # Relationships
    supply: Optional["SupplyModel"] = Relationship(back_populates="maximeter")
class PVPCPricesModel(PricingData, SQLModel, table=True):
    """SQLModel for PVPC pricing data inheriting from Pydantic model.

    PVPC prices are national (not per-CUPS), so uniqueness is keyed on the
    hour plus the geographic zone instead of a supply point.
    """

    __tablename__: str = "pvpc_prices"
    # One price per hour and geographic zone.
    __table_args__ = (UniqueConstraint("datetime", "geo_id"),)
    # Add ID field for database (surrogate primary key)
    id: Optional[int] = Field(default=None, primary_key=True)
    # Add database-specific audit fields
    created_at: DateTime = Field(default_factory=DateTime.now)
    updated_at: DateTime = Field(default_factory=DateTime.now)
    # Add required fields for geographic specificity
    geo_id: int = Field(
        description="Geographic identifier (8741=Peninsula, 8744=Ceuta/Melilla)"
    )
class BillingModel(SQLModel, table=True):
    """SQLModel for billing calculations per hour.

    Each row holds the computed cost terms for one CUPS, one hour, and one
    pricing configuration; the ``pricing_config_hash`` in the unique key lets
    multiple pricing configurations coexist for the same hour.
    """

    __tablename__: str = "billing"
    # One billing row per CUPS, hour and pricing configuration.
    __table_args__ = (UniqueConstraint("cups", "datetime", "pricing_config_hash"),)
    # Primary key
    id: Optional[int] = Field(default=None, primary_key=True)
    # Foreign key to supply
    cups: str = Field(foreign_key="supplies.cups")
    datetime: DateTime = Field(description="Hour of the billing calculation")
    # Calculated cost terms (the essential billing data)
    energy_term: float = Field(default=0.0, description="Energy cost term in €")
    power_term: float = Field(default=0.0, description="Power cost term in €")
    others_term: float = Field(default=0.0, description="Other costs term in €")
    surplus_term: float = Field(default=0.0, description="Surplus income term in €")
    total_eur: float = Field(default=0.0, description="Total cost in €")
    # Metadata
    tariff: Optional[str] = Field(
        default=None, description="Tariff period (p1, p2, p3)"
    )
    pricing_config_hash: str = Field(description="Hash of pricing rules configuration")
    # Audit fields
    created_at: DateTime = Field(default_factory=DateTime.now)
    updated_at: DateTime = Field(default_factory=DateTime.now)
| """Maximeter (maxímetro) related Pydantic models.""" | ||
| from datetime import datetime as dt | ||
| from pydantic import Field, field_validator | ||
| from edata.models.base import ( | ||
| EdataBaseModel, | ||
| TimestampMixin, | ||
| validate_positive_number, | ||
| validate_reasonable_datetime, | ||
| ) | ||
class MaxPower(EdataBaseModel, TimestampMixin):
    """Pydantic model for maximum power demand data.

    Represents a single maximeter reading: the timestamp at which the
    maximum power demand was recorded and its value in kW.
    """

    datetime: dt = Field(..., description="Timestamp when maximum power was recorded")
    # ge=0 already rejects negatives at the field level; the validator below
    # re-checks via the shared helper for consistency with other models.
    value_kw: float = Field(..., description="Maximum power demand in kW", ge=0)

    @field_validator("datetime")
    @classmethod
    def validate_datetime_range(cls, v: dt) -> dt:
        """Validate datetime is reasonable (delegates to shared helper)."""
        return validate_reasonable_datetime(v)

    @field_validator("value_kw")
    @classmethod
    def validate_power_value(cls, v: float) -> float:
        """Validate power value is positive (delegates to shared helper)."""
        return validate_positive_number(v)

    def __str__(self) -> str:
        """String representation."""
        return f"MaxPower({self.datetime}, {self.value_kw}kW)"

    def __repr__(self) -> str:
        """Developer representation."""
        return f"MaxPower(datetime={self.datetime}, value_kw={self.value_kw})"
| """Pricing related Pydantic models.""" | ||
| from datetime import datetime as dt | ||
| from typing import Optional | ||
| from pydantic import Field, field_validator | ||
| from edata.models.base import ( | ||
| EdataBaseModel, | ||
| TimestampMixin, | ||
| validate_positive_number, | ||
| validate_reasonable_datetime, | ||
| ) | ||
class PricingData(EdataBaseModel, TimestampMixin):
    """Pydantic model for electricity pricing data (PVPC prices).

    One price point: the €/kWh rate that applies starting at ``datetime``
    for ``delta_h`` hours.
    """

    datetime: dt = Field(..., description="Timestamp of the price data")
    value_eur_kwh: float = Field(..., description="Price in EUR per kWh", ge=0)
    # Typically 1.0 (hourly prices); bounded to at most one day.
    delta_h: float = Field(
        default=1.0, description="Duration this price applies (hours)", gt=0, le=24
    )

    @field_validator("datetime")
    @classmethod
    def validate_datetime_range(cls, v: dt) -> dt:
        """Validate datetime is reasonable (delegates to shared helper)."""
        return validate_reasonable_datetime(v)

    @field_validator("value_eur_kwh")
    @classmethod
    def validate_price_value(cls, v: float) -> float:
        """Validate price value is positive (delegates to shared helper)."""
        return validate_positive_number(v)

    def __str__(self) -> str:
        """String representation."""
        return f"Price({self.datetime}, {self.value_eur_kwh:.4f}€/kWh)"

    def __repr__(self) -> str:
        """Developer representation."""
        return (
            f"PricingData(datetime={self.datetime}, value_eur_kwh={self.value_eur_kwh})"
        )
class PricingRules(EdataBaseModel):
    """Pydantic model for custom pricing rules configuration.

    Holds the tariff parameters needed to compute a bill: power-term and
    energy-term rates per period (p1/p2/p3), surplus compensation, fixed
    costs, tax multipliers, and optional Jinja2 formulas that override the
    default cost calculations.  When all three energy-term rates are None
    the configuration is treated as PVPC (variable market pricing).
    """

    # Power term costs (yearly costs in EUR per kW)
    p1_kw_year_eur: float = Field(
        ..., description="P1 power term cost (EUR/kW/year)", ge=0
    )
    p2_kw_year_eur: float = Field(
        ..., description="P2 power term cost (EUR/kW/year)", ge=0
    )
    # Energy term costs (optional for fixed pricing; all None means PVPC)
    p1_kwh_eur: Optional[float] = Field(
        None, description="P1 energy term cost (EUR/kWh) - None for PVPC", ge=0
    )
    p2_kwh_eur: Optional[float] = Field(
        None, description="P2 energy term cost (EUR/kWh) - None for PVPC", ge=0
    )
    p3_kwh_eur: Optional[float] = Field(
        None, description="P3 energy term cost (EUR/kWh) - None for PVPC", ge=0
    )
    # Surplus compensation (optional; for self-consumption exports)
    surplus_p1_kwh_eur: Optional[float] = Field(
        None, description="P1 surplus compensation (EUR/kWh)", ge=0
    )
    surplus_p2_kwh_eur: Optional[float] = Field(
        None, description="P2 surplus compensation (EUR/kWh)", ge=0
    )
    surplus_p3_kwh_eur: Optional[float] = Field(
        None, description="P3 surplus compensation (EUR/kWh)", ge=0
    )
    # Fixed costs
    meter_month_eur: float = Field(
        ..., description="Monthly meter rental cost (EUR/month)", ge=0
    )
    market_kw_year_eur: float = Field(
        ..., description="Market operator cost (EUR/kW/year)", ge=0
    )
    # Tax multipliers (>= 1.0: a multiplier of 1.0 means no tax)
    electricity_tax: float = Field(
        ..., description="Electricity tax multiplier (e.g., 1.05113 for 5.113%)", ge=1.0
    )
    iva_tax: float = Field(
        ..., description="VAT tax multiplier (e.g., 1.21 for 21%)", ge=1.0
    )
    # Custom formulas (optional; Jinja2 expressions evaluated per hour)
    energy_formula: Optional[str] = Field(
        "electricity_tax * iva_tax * kwh_eur * kwh",
        description="Custom energy cost formula (Jinja2 template)",
    )
    # Default spreads the yearly power cost evenly over every hour of the year.
    power_formula: Optional[str] = Field(
        "electricity_tax * iva_tax * (p1_kw * (p1_kw_year_eur + market_kw_year_eur) + p2_kw * p2_kw_year_eur) / 365 / 24",
        description="Custom power cost formula (Jinja2 template)",
    )
    # Default spreads the monthly meter rental over a 30-day month.
    others_formula: Optional[str] = Field(
        "iva_tax * meter_month_eur / 30 / 24",
        description="Custom other costs formula (Jinja2 template)",
    )
    surplus_formula: Optional[str] = Field(
        "electricity_tax * iva_tax * surplus_kwh * surplus_kwh_eur",
        description="Custom surplus compensation formula (Jinja2 template)",
    )
    # NOTE: the default main formula does not subtract surplus_term — presumably
    # intentional (compensation handled elsewhere); confirm with billing logic.
    main_formula: Optional[str] = Field(
        "energy_term + power_term + others_term",
        description="Main cost calculation formula (Jinja2 template)",
    )
    # Billing cycle
    # NOTE(review): le=30 excludes day 31 — looks deliberate (not every month
    # has a 31st), but confirm against billing-cycle handling.
    cycle_start_day: int = Field(
        default=1, description="Day of month when billing cycle starts", ge=1, le=30
    )

    @property
    def is_pvpc(self) -> bool:
        """Check if this configuration uses PVPC (variable pricing)."""
        return all(
            price is None
            for price in [self.p1_kwh_eur, self.p2_kwh_eur, self.p3_kwh_eur]
        )

    def __str__(self) -> str:
        """String representation."""
        pricing_type = "PVPC" if self.is_pvpc else "Fixed"
        return f"PricingRules({pricing_type}, P1={self.p1_kw_year_eur}€/kW/year)"

    def __repr__(self) -> str:
        """Developer representation."""
        return f"PricingRules(p1_kw_year_eur={self.p1_kw_year_eur}, is_pvpc={self.is_pvpc})"
class PricingAggregated(EdataBaseModel, TimestampMixin):
    """Pydantic model for aggregated pricing/billing data.

    Cost totals (energy/power/others/surplus and grand total) for a billing
    window that starts at ``datetime`` and spans ``delta_h`` hours.
    """

    datetime: dt = Field(
        ..., description="Timestamp representing the start of the billing period"
    )
    value_eur: float = Field(..., description="Total cost in EUR for the period", ge=0)
    energy_term: float = Field(default=0.0, description="Energy term cost (EUR)", ge=0)
    power_term: float = Field(default=0.0, description="Power term cost (EUR)", ge=0)
    others_term: float = Field(default=0.0, description="Other costs term (EUR)", ge=0)
    surplus_term: float = Field(
        default=0.0, description="Surplus compensation term (EUR)", ge=0
    )
    # Unlike PricingData.delta_h this has no upper bound: periods may span months.
    delta_h: float = Field(
        default=1.0, description="Duration of the billing period in hours", gt=0
    )

    @field_validator("datetime")
    @classmethod
    def validate_datetime_range(cls, v: dt) -> dt:
        """Validate datetime is reasonable (delegates to shared helper)."""
        return validate_reasonable_datetime(v)

    def __str__(self) -> str:
        """String representation."""
        # Label the period by its duration: <=1h -> hour, <=24h -> day, else month.
        period = (
            "hour" if self.delta_h <= 1 else "day" if self.delta_h <= 24 else "month"
        )
        return f"Billing({self.datetime.date()}, {self.value_eur:.2f}€/{period})"

    def __repr__(self) -> str:
        """Developer representation."""
        return f"PricingAggregated(datetime={self.datetime}, value_eur={self.value_eur}, delta_h={self.delta_h})"
| """Supply (suministro) related Pydantic models.""" | ||
| from datetime import datetime | ||
| from typing import Optional | ||
| from pydantic import Field, field_validator | ||
| from edata.models.base import ( | ||
| EdataBaseModel, | ||
| TimestampMixin, | ||
| validate_cups, | ||
| validate_reasonable_datetime, | ||
| ) | ||
class Supply(EdataBaseModel, TimestampMixin):
    """Pydantic model for electricity supply data (suministro eléctrico).

    Identifies a supply point by its CUPS code together with its validity
    window, location metadata and distributor information.
    """

    cups: str = Field(
        ...,
        description="CUPS (Código Universal de Punto de Suministro) - Universal Supply Point Code",
        min_length=20,
        max_length=22,
    )
    date_start: datetime = Field(..., description="Supply contract start date")
    date_end: datetime = Field(..., description="Supply contract end date")
    address: Optional[str] = Field(None, description="Supply point address")
    postal_code: Optional[str] = Field(
        None, description="Postal code of the supply point", pattern=r"^\d{5}$"
    )
    province: Optional[str] = Field(None, description="Province name")
    municipality: Optional[str] = Field(None, description="Municipality name")
    distributor: Optional[str] = Field(
        None, description="Electricity distributor company name"
    )
    point_type: int = Field(..., description="Type of supply point", ge=1, le=5)
    distributor_code: str = Field(
        ..., description="Distributor company code", min_length=1
    )

    @field_validator("cups")
    @classmethod
    def validate_cups_format(cls, v: str) -> str:
        """Validate CUPS format (delegates to shared helper)."""
        return validate_cups(v)

    @field_validator("date_start", "date_end")
    @classmethod
    def validate_date_range(cls, v: datetime) -> datetime:
        """Validate date is reasonable (delegates to shared helper)."""
        return validate_reasonable_datetime(v)

    @field_validator("date_end")
    @classmethod
    def validate_end_after_start(cls, v: datetime, info) -> datetime:
        """Validate that end date is after start date.

        Fix: ``info.data`` is a dict, so the previous
        ``hasattr(info.data, "date_start")`` check was always False and this
        validator could never raise.  Use key lookup instead; ``date_start``
        is absent from ``info.data`` when its own validation already failed.
        """
        date_start = info.data.get("date_start")
        if date_start and v <= date_start:
            raise ValueError("End date must be after start date")
        return v

    def __str__(self) -> str:
        """String representation showing anonymized CUPS."""
        return f"Supply(cups=...{self.cups[-5:]}, distributor={self.distributor})"

    def __repr__(self) -> str:
        """Developer representation."""
        return f"Supply(cups={self.cups}, point_type={self.point_type})"
| """Scripts de utilidades para edata.""" |
| #!/usr/bin/env python3 | ||
| """ | ||
| Script interactivo para hacer un dump completo de un CUPS a una base de datos. | ||
| """ | ||
| import argparse | ||
| import asyncio | ||
| import getpass | ||
| import logging | ||
| from datetime import datetime, timedelta | ||
| from typing import List, Optional | ||
| from edata.connectors.datadis import DatadisConnector | ||
| from edata.const import DEFAULT_STORAGE_DIR | ||
| from edata.helpers import EdataHelper | ||
| from edata.services.database import SupplyModel as DbSupply | ||
| from edata.services.supply import SupplyService | ||
| # Configure logging | ||
| logging.basicConfig( | ||
| level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s" | ||
| ) | ||
| _LOGGER = logging.getLogger(__name__) | ||
class DumpSupply:
    """Interactive helper that dumps all data for one CUPS into local storage.

    Workflow: ask for Datadis credentials, verify the connection, list the
    account's supplies, let the user pick one and a date range, then download
    everything through :class:`EdataHelper` and print summary statistics.
    """

    def __init__(self, storage_dir: Optional[str] = None):
        """Initialize the interactive dumper.

        Args:
            storage_dir: Storage directory; defaults to DEFAULT_STORAGE_DIR.
        """
        self.storage_dir = storage_dir or DEFAULT_STORAGE_DIR
        self.username: Optional[str] = None
        self.password: Optional[str] = None
        self.authorized_nif: Optional[str] = None
        self.connector: Optional[DatadisConnector] = None
        self.supplies: List[DbSupply] = []

    def get_credentials(self) -> bool:
        """Prompt the user for Datadis credentials.

        Returns:
            True when username and password were provided, False otherwise.

        Security fix: the previous version echoed the username AND the
        plaintext password back to stdout after reading them; those prints
        leaked credentials into terminal scrollback/logs and were removed.
        """
        print("\n🔐 Configuración de credenciales Datadis")
        print("=" * 50)
        try:
            self.username = input("📧 Usuario Datadis: ").strip()
            if not self.username:
                print("❌ El usuario es obligatorio")
                return False
            # getpass keeps the password off the screen while typing.
            self.password = getpass.getpass("🔑 Contraseña Datadis: ").strip()
            if not self.password:
                print("❌ La contraseña es obligatoria")
                return False
            nif_input = input(
                "🆔 NIF autorizado (opcional, Enter para omitir): "
            ).strip()
            self.authorized_nif = nif_input if nif_input else None
            return True
        except KeyboardInterrupt:
            print("\n❌ Operación cancelada por el usuario")
            return False
        except Exception as e:
            print(f"❌ Error obteniendo credenciales: {e}")
            return False

    async def test_connection(self) -> bool:
        """Try to authenticate against Datadis with the stored credentials."""
        print("\n🧪 Probando conexión con Datadis...")
        try:
            # Credentials must have been collected first.
            if not self.username or not self.password:
                print("❌ Credenciales no disponibles")
                return False
            self.connector = DatadisConnector(self.username, self.password)
            # Attempt a login; a falsy result means bad credentials.
            token_result = await self.connector.login()
            if not token_result:
                print("❌ Error de autenticación. Verifica tus credenciales.")
                return False
            print("✅ Conexión exitosa con Datadis")
            return True
        except Exception as e:
            print(f"❌ Error conectando con Datadis: {e}")
            return False

    async def fetch_supplies(self) -> bool:
        """Fetch the account's supplies and cache them on ``self.supplies``."""
        print("\n📋 Obteniendo suministros disponibles...")
        try:
            # Credentials must have been collected first.
            if not self.username or not self.password:
                print("❌ Credenciales no disponibles")
                return False
            supplies_service = SupplyService(
                DatadisConnector(
                    username=self.username,
                    password=self.password,
                    storage_path=self.storage_dir,
                ),
                storage_dir=self.storage_dir,
            )
            # Refresh supplies from the remote API into the local database.
            result = await supplies_service.update_supplies(
                authorized_nif=self.authorized_nif
            )
            if not result["success"]:
                print(
                    f"❌ Error obteniendo suministros: {result.get('error', 'Error desconocido')}"
                )
                return False
            # Read back every supply stored in the database.
            self.supplies = await supplies_service.get_supplies()
            if not self.supplies:
                print("❌ No se encontraron suministros en tu cuenta")
                return False
            print(f"✅ Encontrados {len(self.supplies)} suministros")
            return True
        except Exception as e:
            print(f"❌ Error obteniendo suministros: {e}")
            return False

    def display_supplies_menu(self) -> Optional[DbSupply]:
        """Show the supplies menu and return the selected supply (None to quit).

        Robustness fix: invalid input now re-prompts in a loop instead of
        recursing, which previously risked RecursionError on repeated bad
        input.
        """
        print("\n🏠 Selecciona un suministro para procesar:")
        print("=" * 70)
        for i, supply in enumerate(self.supplies, 1):
            # Show a short CUPS suffix plus a truncated address per entry.
            cups_short = supply.cups[-10:] if len(supply.cups) > 10 else supply.cups
            address = supply.address or "Dirección no disponible"
            if len(address) > 40:
                address = address[:40] + "..."
            print(f"{i:2d}. CUPS: {cups_short} | {address}")
            print(
                f" 📍 {supply.municipality or 'N/A'}, {supply.province or 'N/A'} ({supply.postal_code or 'N/A'})"
            )
            print(
                f" 📊 Tipo: {supply.point_type} | Distribuidor: {supply.distributor or 'N/A'}"
            )
            print(
                f" 📅 Válido: {supply.date_start.date()} - {supply.date_end.date()}"
            )
            print()
        while True:
            try:
                selection = input(
                    f"Selecciona un suministro (1-{len(self.supplies)}) o 'q' para salir: "
                ).strip()
                if selection.lower() == "q":
                    return None
                index = int(selection) - 1
                if 0 <= index < len(self.supplies):
                    return self.supplies[index]
                print(
                    f"❌ Selección inválida. Debe estar entre 1 y {len(self.supplies)}"
                )
            except ValueError:
                print("❌ Por favor introduce un número válido")
            except KeyboardInterrupt:
                print("\n❌ Operación cancelada")
                return None

    def get_date_range(self) -> tuple[datetime, datetime]:
        """Ask the user for a date range; default to the last two years."""
        print("\n📅 Configuración de fechas")
        print("=" * 30)
        print("Deja en blanco para usar valores por defecto (últimos 2 años)")
        try:
            date_from_str = input(
                "📅 Fecha inicio (YYYY-MM-DD) [Enter = 2 años atrás]: "
            ).strip()
            date_to_str = input("📅 Fecha fin (YYYY-MM-DD) [Enter = hoy]: ").strip()
            date_from = None
            date_to = None
            if date_from_str:
                try:
                    date_from = datetime.strptime(date_from_str, "%Y-%m-%d")
                except ValueError:
                    print(
                        "❌ Formato de fecha inicio inválido, usando valor por defecto"
                    )
            if date_to_str:
                try:
                    date_to = datetime.strptime(date_to_str, "%Y-%m-%d")
                except ValueError:
                    print("❌ Formato de fecha fin inválido, usando valor por defecto")
            # Defaults: roughly two years back until today.
            if date_from is None:
                date_from = datetime.now() - timedelta(days=730)
            if date_to is None:
                date_to = datetime.now()
            print(f"📊 Período seleccionado: {date_from.date()} a {date_to.date()}")
            return date_from, date_to
        except KeyboardInterrupt:
            print("❌ Usando valores por defecto")
            default_from = datetime.now() - timedelta(days=730)
            default_to = datetime.now()
            return default_from, default_to

    async def dump_selected_supply(
        self, supply: DbSupply, date_from: datetime, date_to: datetime
    ) -> bool:
        """Download all data for the selected supply in the given range."""
        print(f"🚀 Iniciando dump para CUPS {supply.cups[-10:]}")
        print("=" * 50)
        try:
            # Credentials must have been collected first.
            if not self.username or not self.password:
                print("❌ Credenciales no disponibles")
                return False
            # Build an EdataHelper bound to this CUPS.
            helper = EdataHelper(
                datadis_username=self.username,
                datadis_password=self.password,
                cups=supply.cups,
                datadis_authorized_nif=self.authorized_nif,
                storage_dir_path=self.storage_dir,
            )
            print(f"📅 Período: {date_from.date()} a {date_to.date()}")
            print("⏳ Descargando datos... (esto puede tomar varios minutos)")
            # Fetch and persist everything for the requested window.
            result = await helper.update(date_from=date_from, date_to=date_to)
            if not result:
                print("❌ Error durante la descarga de datos")
                return False
            print("✅ Datos descargados correctamente")
            # Show summary statistics for the completed dump.
            await self.display_final_statistics(helper)
            return True
        except Exception as e:
            print(f"❌ Error durante el dump: {e}")
            return False

    async def display_final_statistics(self, helper: EdataHelper):
        """Print summary statistics from the helper's attributes."""
        print("📊 Estadísticas del dump completado:")
        print("=" * 50)
        summary = helper.attributes
        print(f"🏠 CUPS: {summary.get('cups', 'N/A')}")
        # Contract information
        if summary.get("contract_p1_kW") is not None:
            print(
                f"⚡ Potencia contratada P1: {summary.get('contract_p1_kW', 'N/A')} kW"
            )
        if summary.get("contract_p2_kW") is not None:
            print(
                f"⚡ Potencia contratada P2: {summary.get('contract_p2_kW', 'N/A')} kW"
            )
        # Consumption information
        if summary.get("yesterday_kWh") is not None:
            print(f"📈 Consumo ayer: {summary.get('yesterday_kWh', 'N/A')} kWh")
        if summary.get("month_kWh") is not None:
            print(f"📈 Consumo mes actual: {summary.get('month_kWh', 'N/A')} kWh")
        if summary.get("last_month_kWh") is not None:
            print(
                f"📈 Consumo mes anterior: {summary.get('last_month_kWh', 'N/A')} kWh"
            )
        # Maximum power information
        if summary.get("max_power_kW") is not None:
            print(
                f"🔋 Potencia máxima registrada: {summary.get('max_power_kW', 'N/A')} kW"
            )
        # Cost information (when available)
        if summary.get("month_€") is not None:
            print(f"💰 Coste mes actual: {summary.get('month_€', 'N/A')} €")
        if summary.get("last_month_€") is not None:
            print(f"💰 Coste mes anterior: {summary.get('last_month_€', 'N/A')} €")
        print(f"\n💾 Datos almacenados en: {self.storage_dir}")

    async def run_interactive_session(self) -> bool:
        """Run the full interactive session; return True on success."""
        print("🏠 Extractor interactivo de datos eléctricos")
        print("=" * 50)
        print(
            "Este script te ayudará a extraer todos los datos de tu suministro eléctrico"
        )
        print()
        try:
            # 1. Collect credentials
            if not self.get_credentials():
                return False
            # 2. Verify the connection
            if not await self.test_connection():
                return False
            # 3. Fetch supplies
            if not await self.fetch_supplies():
                return False
            # 4. Show menu and pick a supply
            selected_supply = self.display_supplies_menu()
            if not selected_supply:
                print("👋 Operación cancelada")
                return False
            print(
                f"\n✅ Seleccionado: {selected_supply.cups[-10:]} - {selected_supply.address or 'Sin dirección'}"
            )
            # 5. Configure the date range
            date_from, date_to = self.get_date_range()
            # 6. Run the dump
            success = await self.dump_selected_supply(
                selected_supply, date_from, date_to
            )
            if success:
                print("\n🎉 ¡Dump completado exitosamente!")
                print("Todos los datos han sido almacenados en la base de datos local.")
            return success
        except KeyboardInterrupt:
            print("\n\n👋 Operación cancelada por el usuario")
            return False
        except Exception as e:
            print(f"\n❌ Error durante la sesión interactiva: {e}")
            return False
async def main():
    """Entry point: parse CLI arguments and run the interactive session."""
    parser = argparse.ArgumentParser(
        description="Extractor interactivo de datos eléctricos",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Ejemplo de uso:
  # Modo interactivo
  python -m edata.scripts.dump
  # Con directorio personalizado
  python -m edata.scripts.dump --storage-dir /ruta/datos
""",
    )
    parser.add_argument(
        "--storage-dir",
        default=".",
        help="Directorio de almacenamiento (por defecto: directorio actual)",
    )
    args = parser.parse_args()
    # Build the dumper and run the interactive flow.
    dumper = DumpSupply(storage_dir=args.storage_dir)
    success = await dumper.run_interactive_session()
    # Fix: exit() is the interactive-only site builtin and may be absent when
    # run with -S; raise SystemExit (what sys.exit does) with the status code.
    raise SystemExit(0 if success else 1)


if __name__ == "__main__":
    asyncio.run(main())
| """Services package for edata.""" | ||
| from edata.services.billing import BillingService | ||
| from edata.services.consumption import ConsumptionService | ||
| from edata.services.contract import ContractService | ||
| from edata.services.database import DatabaseService, get_database_service | ||
| from edata.services.maximeter import MaximeterService | ||
| from edata.services.supply import SupplyService | ||
| __all__ = [ | ||
| "DatabaseService", | ||
| "get_database_service", | ||
| "SupplyService", | ||
| "ContractService", | ||
| "ConsumptionService", | ||
| "MaximeterService", | ||
| "BillingService", | ||
| ] |
| """Billing service for managing energy prices and billing calculations.""" | ||
| import contextlib | ||
| import logging | ||
| from datetime import datetime, timedelta | ||
| from typing import Any, Dict, List, Optional | ||
| from jinja2 import Environment | ||
| from edata.connectors.redata import REDataConnector | ||
| from edata.models.pricing import PricingAggregated, PricingData, PricingRules | ||
| from edata.services.database import PVPCPricesModel, get_database_service | ||
| _LOGGER = logging.getLogger(__name__) | ||
| class BillingService: | ||
| """Service for managing energy pricing and billing data.""" | ||
| def __init__(self, storage_dir: Optional[str] = None): | ||
| """Initialize billing service. | ||
| Args: | ||
| storage_dir: Directory for database storage | ||
| """ | ||
| self._redata = REDataConnector() | ||
| self._storage_dir = storage_dir | ||
| self._db_service = None | ||
| async def _get_db_service(self): | ||
| """Get database service, initializing if needed.""" | ||
| if self._db_service is None: | ||
| self._db_service = await get_database_service(self._storage_dir) | ||
| return self._db_service | ||
    async def update_pvpc_prices(
        self, start_date: datetime, end_date: datetime, is_ceuta_melilla: bool = False
    ) -> Dict[str, Any]:
        """Update PVPC prices from REData API.

        Incrementally fetches only the hours newer than the latest stored
        price for the requested zone, then persists them one by one.

        Args:
            start_date: Start date for price data
            end_date: End date for price data
            is_ceuta_melilla: Whether to get prices for Ceuta/Melilla (True) or Peninsula (False)

        Returns:
            Dict with operation results and statistics
        """
        # Zone identifiers used by REData: 8741 Peninsula, 8744 Ceuta/Melilla.
        geo_id = 8744 if is_ceuta_melilla else 8741
        region = "Ceuta/Melilla" if is_ceuta_melilla else "Peninsula"
        _LOGGER.info(
            f"Updating PVPC prices for {region} from {start_date.date()} to {end_date.date()}"
        )
        # Determine actual start date based on existing data
        actual_start_date = start_date
        db_service = await self._get_db_service()
        last_price_record = await db_service.get_latest_pvpc_price(geo_id=geo_id)
        if last_price_record:
            # Start from the hour after the last price record (never earlier
            # than the caller's start_date).
            actual_start_date = max(
                start_date, last_price_record.datetime + timedelta(hours=1)
            )
            _LOGGER.info(
                f"Found existing price data up to {last_price_record.datetime.date()}, fetching from {actual_start_date.date()}"
            )
        else:
            _LOGGER.info(
                f"No existing price data found for {region}, fetching all data"
            )
        # If actual start date is beyond end date, no new data needed
        if actual_start_date >= end_date:
            _LOGGER.info(f"No new price data needed for {region} (up to date)")
            return {
                "success": True,
                "region": region,
                "geo_id": geo_id,
                "period": {
                    "start": start_date.isoformat(),
                    "end": end_date.isoformat(),
                    "actual_start": actual_start_date.isoformat(),
                },
                "stats": {
                    "fetched": 0,
                    "saved": 0,
                    "updated": 0,
                    "skipped": "up_to_date",
                },
                "message": "Price data is up to date",
            }
        try:
            # Fetch price data from REData (only missing data)
            prices = await self._redata.get_realtime_prices(
                dt_from=actual_start_date,
                dt_to=end_date,
                is_ceuta_melilla=is_ceuta_melilla,
            )
            # Save to database, counting inserts vs updates separately.
            saved_count = 0
            updated_count = 0
            for price in prices:
                price_dict = price.model_dump()
                price_dict["geo_id"] = geo_id
                # Check if price already exists for this specific datetime and geo_id
                existing = await db_service.get_pvpc_prices(
                    start_date=price.datetime, end_date=price.datetime, geo_id=geo_id
                )
                if existing:
                    updated_count += 1
                else:
                    saved_count += 1
                # save_pvpc_price is called either way — presumably it upserts;
                # confirm against DatabaseService.
                await db_service.save_pvpc_price(price_dict)
            result = {
                "success": True,
                "region": region,
                "geo_id": geo_id,
                "period": {
                    "start": start_date.isoformat(),
                    "end": end_date.isoformat(),
                    "actual_start": actual_start_date.isoformat(),
                },
                "stats": {
                    "fetched": len(prices),
                    "saved": saved_count,
                    "updated": updated_count,
                },
            }
            if actual_start_date > start_date:
                result["message"] = (
                    f"Fetched only missing price data from {actual_start_date.date()}"
                )
            _LOGGER.info(
                f"PVPC price update completed: {len(prices)} fetched, "
                f"{saved_count} saved, {updated_count} updated"
            )
            return result
        except Exception as e:
            _LOGGER.error(f"Error updating PVPC prices for {region}: {str(e)}")
            return {
                "success": False,
                "region": region,
                "geo_id": geo_id,
                "error": str(e),
                "period": {
                    "start": start_date.isoformat(),
                    "end": end_date.isoformat(),
                    # NOTE: actual_start_date is assigned before the try block,
                    # so this locals() guard is defensive; the fallback branch
                    # should be unreachable.
                    "actual_start": (
                        actual_start_date.isoformat()
                        if "actual_start_date" in locals()
                        else start_date.isoformat()
                    ),
                },
            }
| def get_custom_prices( | ||
| self, pricing_rules: PricingRules, start_date: datetime, end_date: datetime | ||
| ) -> List[PricingData]: | ||
| """Calculate custom energy prices dynamically based on pricing rules. | ||
| Args: | ||
| pricing_rules: Custom pricing configuration | ||
| start_date: Start date for price data | ||
| end_date: End date for price data | ||
| Returns: | ||
| List of PricingData objects calculated on-the-fly | ||
| """ | ||
| if pricing_rules.is_pvpc: | ||
| raise ValueError("Use get_stored_pvpc_prices() for PVPC pricing rules") | ||
| _LOGGER.info( | ||
| f"Calculating custom prices from {start_date.date()} to {end_date.date()}" | ||
| ) | ||
| try: | ||
| # Import here to avoid circular imports | ||
| from edata.utils import get_pvpc_tariff | ||
| prices = [] | ||
| # Generate hourly prices based on custom rules | ||
| current_dt = start_date | ||
| while current_dt < end_date: | ||
| # Determine tariff period for this hour | ||
| tariff = get_pvpc_tariff(current_dt) | ||
| # Get the appropriate price based on tariff period | ||
| if tariff == "p1" and pricing_rules.p1_kwh_eur is not None: | ||
| price_eur_kwh = pricing_rules.p1_kwh_eur | ||
| elif tariff == "p2" and pricing_rules.p2_kwh_eur is not None: | ||
| price_eur_kwh = pricing_rules.p2_kwh_eur | ||
| elif tariff == "p3" and pricing_rules.p3_kwh_eur is not None: | ||
| price_eur_kwh = pricing_rules.p3_kwh_eur | ||
| else: | ||
| # Skip if no price defined for this period | ||
| current_dt += timedelta(hours=1) | ||
| continue | ||
| # Create PricingData object | ||
| price_data = PricingData( | ||
| datetime=current_dt, value_eur_kwh=price_eur_kwh, delta_h=1.0 | ||
| ) | ||
| prices.append(price_data) | ||
| current_dt += timedelta(hours=1) | ||
| _LOGGER.info(f"Generated {len(prices)} custom price points") | ||
| return prices | ||
| except Exception as e: | ||
| _LOGGER.error(f"Error calculating custom prices: {str(e)}") | ||
| raise | ||
| async def get_stored_pvpc_prices( | ||
| self, | ||
| start_date: Optional[datetime] = None, | ||
| end_date: Optional[datetime] = None, | ||
| geo_id: Optional[int] = None, | ||
| ) -> List[PVPCPricesModel]: | ||
| """Get stored PVPC prices from database. | ||
| Args: | ||
| start_date: Optional start date filter | ||
| end_date: Optional end date filter | ||
| geo_id: Optional geographic filter | ||
| Returns: | ||
| List of PVPCPrices objects | ||
| """ | ||
| db_service = await self._get_db_service() | ||
| return await db_service.get_pvpc_prices(start_date, end_date, geo_id) | ||
| async def get_prices( | ||
| self, | ||
| pricing_rules: PricingRules, | ||
| start_date: datetime, | ||
| end_date: datetime, | ||
| is_ceuta_melilla: bool = False, | ||
| ) -> Optional[List[PricingData]]: | ||
| """Get prices automatically based on pricing rules configuration. | ||
| Args: | ||
| pricing_rules: Pricing configuration | ||
| start_date: Start date for price data | ||
| end_date: End date for price data | ||
| is_ceuta_melilla: Whether to get PVPC prices for Ceuta/Melilla | ||
| Returns: | ||
| List of PricingData objects or None if missing required data | ||
| """ | ||
| if pricing_rules.is_pvpc: | ||
| # Get stored PVPC prices from database | ||
| geo_id = 8744 if is_ceuta_melilla else 8741 | ||
| pvpc_prices = await self.get_stored_pvpc_prices( | ||
| start_date, end_date, geo_id | ||
| ) | ||
| # Return None if no PVPC prices found | ||
| if not pvpc_prices: | ||
| _LOGGER.warning( | ||
| f"No PVPC prices found for geo_id {geo_id} from {start_date.date()} to {end_date.date()}" | ||
| ) | ||
| return None | ||
| # Convert PVPCPrices to PricingData | ||
| return [ | ||
| PricingData( | ||
| datetime=price.datetime, | ||
| value_eur_kwh=price.value_eur_kwh, | ||
| delta_h=price.delta_h, | ||
| ) | ||
| for price in pvpc_prices | ||
| ] | ||
| else: | ||
| # Check if custom pricing rules have required data | ||
| if ( | ||
| pricing_rules.p1_kwh_eur is None | ||
| and pricing_rules.p2_kwh_eur is None | ||
| and pricing_rules.p3_kwh_eur is None | ||
| ): | ||
| _LOGGER.warning("No custom energy prices defined in pricing rules") | ||
| return None | ||
| # Calculate custom prices dynamically | ||
| try: | ||
| custom_prices = self.get_custom_prices( | ||
| pricing_rules, start_date, end_date | ||
| ) | ||
| # Return None if no prices could be generated | ||
| if not custom_prices: | ||
| _LOGGER.warning( | ||
| f"No custom prices could be generated for period {start_date.date()} to {end_date.date()}" | ||
| ) | ||
| return None | ||
| return custom_prices | ||
| except Exception as e: | ||
| _LOGGER.error(f"Error generating custom prices: {str(e)}") | ||
| return None | ||
    async def get_cost(
        self,
        cups: str,
        pricing_rules: PricingRules,
        start_date: datetime,
        end_date: datetime,
        is_ceuta_melilla: bool = False,
    ) -> PricingAggregated:
        """Get billing cost for a period based on pricing rules.

        First checks the billing table for existing data with the pricing rules hash.
        If not found, calls update_missing_costs to calculate and store the data.
        Then returns the aggregated cost from the billing table.

        Args:
            cups: CUPS identifier for consumption data
            pricing_rules: Pricing configuration
            start_date: Start date for cost calculation
            end_date: End date for cost calculation
            is_ceuta_melilla: Whether to use Ceuta/Melilla PVPC prices

        Returns:
            PricingAggregated object with cost breakdown for the period. On a
            failed cost update, a zeroed aggregate is returned (no exception).

        Raises:
            Exception: Re-raises any unexpected error after logging it.
        """
        _LOGGER.info(
            f"Getting cost for CUPS {cups} from {start_date.date()} to {end_date.date()}"
        )
        try:
            # Generate pricing configuration hash: billing rows are keyed by
            # this hash so results from different rule sets never mix.
            db_service = await self._get_db_service()
            pricing_config_hash = db_service.generate_pricing_config_hash(
                pricing_rules.model_dump()
            )
            # Check if billing data already exists by looking for the latest billing record
            latest_billing = await db_service.get_latest_billing(
                cups=cups, pricing_config_hash=pricing_config_hash
            )
            # Determine if we need to calculate missing costs.
            # NOTE(review): completeness is judged only from the LATEST row's
            # timestamp — gaps before that timestamp, or requested data older
            # than the stored range, are not detected here. Confirm this is
            # acceptable for the callers.
            needs_calculation = False
            actual_start_date = start_date
            if not latest_billing:
                # No billing data exists for this configuration
                needs_calculation = True
                _LOGGER.info(
                    f"No billing data found for hash {pricing_config_hash[:8]}..., calculating all costs"
                )
            elif latest_billing.datetime < end_date - timedelta(hours=1):
                # Billing data exists but is incomplete for the requested period;
                # resume one hour after the last stored row (never before start_date).
                needs_calculation = True
                actual_start_date = max(
                    start_date, latest_billing.datetime + timedelta(hours=1)
                )
                _LOGGER.info(
                    f"Found billing data up to {latest_billing.datetime.date()}, calculating from {actual_start_date.date()}"
                )
            # Calculate missing costs if needed
            if needs_calculation:
                update_result = await self.update_missing_costs(
                    cups,
                    pricing_rules,
                    actual_start_date,
                    end_date,
                    is_ceuta_melilla,
                    force_recalculate=False,
                )
                if not update_result["success"]:
                    _LOGGER.error(
                        f"Failed to update costs: {update_result.get('error', 'Unknown error')}"
                    )
                    # Degrade gracefully: report a zero-cost aggregate spanning
                    # the requested period instead of raising.
                    return PricingAggregated(
                        datetime=start_date,
                        value_eur=0.0,
                        energy_term=0.0,
                        power_term=0.0,
                        others_term=0.0,
                        surplus_term=0.0,
                        delta_h=(end_date - start_date).total_seconds() / 3600,
                    )
            # Get the complete billing data for the requested period
            existing_billing = await db_service.get_billing(
                cups=cups,
                start_date=start_date,
                end_date=end_date,
                pricing_config_hash=pricing_config_hash,
            )
            # Aggregate the billing data; each row represents one hour, so the
            # row count doubles as the delta_h of the aggregate.
            total_value_eur = 0.0
            total_energy_term = 0.0
            total_power_term = 0.0
            total_others_term = 0.0
            total_surplus_term = 0.0
            total_hours = len(existing_billing)
            for billing in existing_billing:
                # `or 0.0` guards against NULL columns in stored rows.
                total_value_eur += billing.total_eur or 0.0
                total_energy_term += billing.energy_term or 0.0
                total_power_term += billing.power_term or 0.0
                total_others_term += billing.others_term or 0.0
                total_surplus_term += billing.surplus_term or 0.0
            result = PricingAggregated(
                datetime=start_date,
                value_eur=round(total_value_eur, 6),
                energy_term=round(total_energy_term, 6),
                power_term=round(total_power_term, 6),
                others_term=round(total_others_term, 6),
                surplus_term=round(total_surplus_term, 6),
                delta_h=total_hours,
            )
            _LOGGER.info(
                f"Cost calculation completed for CUPS {cups}: "
                f"€{total_value_eur:.2f} for {total_hours} hours"
            )
            return result
        except Exception as e:
            _LOGGER.error(f"Error getting cost for CUPS {cups}: {str(e)}")
            raise
| async def update_missing_costs( | ||
| self, | ||
| cups: str, | ||
| pricing_rules: PricingRules, | ||
| start_date: datetime, | ||
| end_date: datetime, | ||
| is_ceuta_melilla: bool = False, | ||
| force_recalculate: bool = False, | ||
| ) -> Dict[str, Any]: | ||
| """Calculate and store billing costs in the database. | ||
| Args: | ||
| cups: CUPS identifier for consumption data | ||
| pricing_rules: Pricing configuration | ||
| start_date: Start date for cost calculation | ||
| end_date: End date for cost calculation | ||
| is_ceuta_melilla: Whether to use Ceuta/Melilla PVPC prices | ||
| force_recalculate: If True, recalculate even if billing data exists | ||
| Returns: | ||
| Dict with operation results and statistics | ||
| """ | ||
| _LOGGER.info( | ||
| f"Updating costs for CUPS {cups} from {start_date.date()} to {end_date.date()}" | ||
| ) | ||
| try: | ||
| # Generate pricing configuration hash | ||
| db_service = await self._get_db_service() | ||
| pricing_config_hash = db_service.generate_pricing_config_hash( | ||
| pricing_rules.model_dump() | ||
| ) | ||
| # Get existing billing data if not forcing recalculation | ||
| existing_billing = [] | ||
| if not force_recalculate: | ||
| existing_billing = await db_service.get_billing( | ||
| cups=cups, | ||
| start_date=start_date, | ||
| end_date=end_date, | ||
| pricing_config_hash=pricing_config_hash, | ||
| ) | ||
| # Create set of existing datetime for quick lookup | ||
| existing_hours = {billing.datetime for billing in existing_billing} | ||
| # Get consumption data | ||
| consumptions = await db_service.get_consumptions(cups, start_date, end_date) | ||
| if not consumptions: | ||
| _LOGGER.warning( | ||
| f"No consumption data found for CUPS {cups} in the specified period" | ||
| ) | ||
| return { | ||
| "success": False, | ||
| "error": "No consumption data found", | ||
| "cups": cups, | ||
| "period": { | ||
| "start": start_date.isoformat(), | ||
| "end": end_date.isoformat(), | ||
| }, | ||
| } | ||
| # Get contract data for power terms | ||
| contracts = await db_service.get_contracts(cups) | ||
| if not contracts: | ||
| _LOGGER.warning( | ||
| f"No contract data found for CUPS {cups}, using defaults" | ||
| ) | ||
| # Use default power values if no contracts found | ||
| default_contract = { | ||
| "power_p1": 3.45, # Default residential power | ||
| "power_p2": 3.45, | ||
| "date_start": start_date, | ||
| "date_end": end_date, | ||
| } | ||
| contracts = [type("MockContract", (), default_contract)()] | ||
| # Get pricing data | ||
| prices = await self.get_prices( | ||
| pricing_rules, start_date, end_date, is_ceuta_melilla | ||
| ) | ||
| if prices is None: | ||
| _LOGGER.warning( | ||
| f"No pricing data available for CUPS {cups} in the specified period" | ||
| ) | ||
| return { | ||
| "success": False, | ||
| "error": "No pricing data available", | ||
| "cups": cups, | ||
| "period": { | ||
| "start": start_date.isoformat(), | ||
| "end": end_date.isoformat(), | ||
| }, | ||
| } | ||
| # Create price lookup by datetime | ||
| price_lookup = {price.datetime: price.value_eur_kwh for price in prices} | ||
| # Build data structure similar to billing processor | ||
| data = {} | ||
| for consumption in consumptions: | ||
| data[consumption.datetime] = { | ||
| "datetime": consumption.datetime, | ||
| "kwh": consumption.value_kwh, | ||
| "surplus_kwh": ( | ||
| consumption.surplus_kwh | ||
| if hasattr(consumption, "surplus_kwh") | ||
| and consumption.surplus_kwh is not None | ||
| else 0 | ||
| ), | ||
| } | ||
| # Add contract power data | ||
| for contract in contracts: | ||
| start_dt = getattr(contract, "date_start", start_date) | ||
| end_dt = getattr(contract, "date_end", end_date) | ||
| current = start_dt | ||
| while current <= end_dt and current <= end_date: | ||
| if current in data: | ||
| data[current]["p1_kw"] = getattr(contract, "power_p1", 3.45) | ||
| data[current]["p2_kw"] = getattr(contract, "power_p2", 3.45) | ||
| current += timedelta(hours=1) | ||
| # Add pricing data | ||
| for dt, kwh_eur in price_lookup.items(): | ||
| if dt in data: | ||
| data[dt]["kwh_eur"] = kwh_eur | ||
| # Prepare Jinja2 expressions for cost calculation | ||
| env = Environment() | ||
| energy_expr = env.compile_expression( | ||
| f"({pricing_rules.energy_formula})|float" | ||
| ) | ||
| power_expr = env.compile_expression( | ||
| f"({pricing_rules.power_formula})|float" | ||
| ) | ||
| others_expr = env.compile_expression( | ||
| f"({pricing_rules.others_formula})|float" | ||
| ) | ||
| surplus_expr = env.compile_expression( | ||
| f"({pricing_rules.surplus_formula})|float" | ||
| ) | ||
| main_expr = env.compile_expression(f"({pricing_rules.main_formula})|float") | ||
| # Calculate and save costs for each hour | ||
| saved_count = 0 | ||
| updated_count = 0 | ||
| skipped_count = 0 | ||
| for dt in sorted(data.keys()): | ||
| # Skip if already exists and not forcing recalculation | ||
| if not force_recalculate and dt in existing_hours: | ||
| skipped_count += 1 | ||
| continue | ||
| hour_data = data[dt] | ||
| # Add pricing rules to hour data | ||
| hour_data.update(pricing_rules.model_dump()) | ||
| # Import here to avoid circular imports | ||
| from edata.utils import get_pvpc_tariff | ||
| tariff = get_pvpc_tariff(hour_data["datetime"]) | ||
| # Set energy price if not already set | ||
| if "kwh_eur" not in hour_data: | ||
| if tariff == "p1" and pricing_rules.p1_kwh_eur is not None: | ||
| hour_data["kwh_eur"] = pricing_rules.p1_kwh_eur | ||
| elif tariff == "p2" and pricing_rules.p2_kwh_eur is not None: | ||
| hour_data["kwh_eur"] = pricing_rules.p2_kwh_eur | ||
| elif tariff == "p3" and pricing_rules.p3_kwh_eur is not None: | ||
| hour_data["kwh_eur"] = pricing_rules.p3_kwh_eur | ||
| else: | ||
| continue # Skip if no price available | ||
| # Set surplus price based on tariff | ||
| if tariff == "p1": | ||
| hour_data["surplus_kwh_eur"] = pricing_rules.surplus_p1_kwh_eur or 0 | ||
| elif tariff == "p2": | ||
| hour_data["surplus_kwh_eur"] = pricing_rules.surplus_p2_kwh_eur or 0 | ||
| elif tariff == "p3": | ||
| hour_data["surplus_kwh_eur"] = pricing_rules.surplus_p3_kwh_eur or 0 | ||
| # Calculate individual cost terms | ||
| energy_term = 0.0 | ||
| power_term = 0.0 | ||
| others_term = 0.0 | ||
| surplus_term = 0.0 | ||
| with contextlib.suppress(Exception): | ||
| result = energy_expr(**hour_data) | ||
| energy_term = round(float(result), 6) if result is not None else 0.0 | ||
| result = power_expr(**hour_data) | ||
| power_term = round(float(result), 6) if result is not None else 0.0 | ||
| result = others_expr(**hour_data) | ||
| others_term = round(float(result), 6) if result is not None else 0.0 | ||
| result = surplus_expr(**hour_data) | ||
| surplus_term = ( | ||
| round(float(result), 6) if result is not None else 0.0 | ||
| ) | ||
| # Calculate total using main formula | ||
| cost_data = { | ||
| "energy_term": energy_term, | ||
| "power_term": power_term, | ||
| "others_term": others_term, | ||
| "surplus_term": surplus_term, | ||
| **pricing_rules.model_dump(), | ||
| } | ||
| total_eur = 0.0 | ||
| with contextlib.suppress(Exception): | ||
| result = main_expr(**cost_data) | ||
| total_eur = round(float(result), 6) if result is not None else 0.0 | ||
| # Prepare billing data (only calculated terms, not raw data) | ||
| billing_data = { | ||
| "cups": cups, | ||
| "datetime": dt, | ||
| "energy_term": energy_term, | ||
| "power_term": power_term, | ||
| "others_term": others_term, | ||
| "surplus_term": surplus_term, | ||
| "total_eur": total_eur, | ||
| "tariff": tariff, | ||
| "pricing_config_hash": pricing_config_hash, | ||
| } | ||
| # Save to database | ||
| await db_service.save_billing(billing_data) | ||
| if dt in existing_hours: | ||
| updated_count += 1 | ||
| else: | ||
| saved_count += 1 | ||
| result = { | ||
| "success": True, | ||
| "cups": cups, | ||
| "pricing_config_hash": pricing_config_hash, | ||
| "period": { | ||
| "start": start_date.isoformat(), | ||
| "end": end_date.isoformat(), | ||
| }, | ||
| "stats": { | ||
| "total_consumptions": len(consumptions), | ||
| "saved": saved_count, | ||
| "updated": updated_count, | ||
| "skipped": skipped_count, | ||
| "processed": saved_count + updated_count, | ||
| }, | ||
| } | ||
| _LOGGER.info( | ||
| f"Billing cost update completed for CUPS {cups}: " | ||
| f"{saved_count} saved, {updated_count} updated, {skipped_count} skipped" | ||
| ) | ||
| return result | ||
| except Exception as e: | ||
| _LOGGER.error(f"Error updating costs for CUPS {cups}: {str(e)}") | ||
| return { | ||
| "success": False, | ||
| "error": str(e), | ||
| "cups": cups, | ||
| "period": { | ||
| "start": start_date.isoformat(), | ||
| "end": end_date.isoformat(), | ||
| }, | ||
| } | ||
| async def get_daily_costs( | ||
| self, | ||
| cups: str, | ||
| pricing_rules: PricingRules, | ||
| start_date: datetime, | ||
| end_date: datetime, | ||
| is_ceuta_melilla: bool = False, | ||
| ) -> List[PricingAggregated]: | ||
| """Get daily aggregated billing costs for a period. | ||
| Args: | ||
| cups: CUPS identifier for consumption data | ||
| pricing_rules: Pricing configuration | ||
| start_date: Start date for cost calculation | ||
| end_date: End date for cost calculation | ||
| is_ceuta_melilla: Whether to use Ceuta/Melilla PVPC prices | ||
| Returns: | ||
| List of PricingAggregated objects, one per day | ||
| """ | ||
| _LOGGER.info( | ||
| f"Getting daily costs for CUPS {cups} from {start_date.date()} to {end_date.date()}" | ||
| ) | ||
| try: | ||
| # Generate pricing configuration hash | ||
| db_service = await self._get_db_service() | ||
| pricing_config_hash = db_service.generate_pricing_config_hash( | ||
| pricing_rules.model_dump() | ||
| ) | ||
| # Get billing data for the period | ||
| billing_records = await db_service.get_billing( | ||
| cups=cups, | ||
| start_date=start_date, | ||
| end_date=end_date, | ||
| pricing_config_hash=pricing_config_hash, | ||
| ) | ||
| # If no billing data exists, calculate and store it first | ||
| if not billing_records: | ||
| _LOGGER.info(f"No billing data found, calculating costs first") | ||
| update_result = await self.update_missing_costs( | ||
| cups, | ||
| pricing_rules, | ||
| start_date, | ||
| end_date, | ||
| is_ceuta_melilla, | ||
| force_recalculate=False, | ||
| ) | ||
| if not update_result["success"]: | ||
| _LOGGER.error( | ||
| f"Failed to update costs: {update_result.get('error', 'Unknown error')}" | ||
| ) | ||
| return [] | ||
| # Get the newly calculated billing data | ||
| billing_records = await db_service.get_billing( | ||
| cups=cups, | ||
| start_date=start_date, | ||
| end_date=end_date, | ||
| pricing_config_hash=pricing_config_hash, | ||
| ) | ||
| # Group by day and aggregate | ||
| daily_aggregates = {} | ||
| for billing in billing_records: | ||
| # Get the date (without time) as key | ||
| date_key = billing.datetime.date() | ||
| if date_key not in daily_aggregates: | ||
| daily_aggregates[date_key] = { | ||
| "datetime": datetime.combine(date_key, datetime.min.time()), | ||
| "total_eur": 0.0, | ||
| "energy_term": 0.0, | ||
| "power_term": 0.0, | ||
| "others_term": 0.0, | ||
| "surplus_term": 0.0, | ||
| "hours": 0, | ||
| } | ||
| # Add this hour's costs | ||
| daily_aggregates[date_key]["total_eur"] += billing.total_eur or 0.0 | ||
| daily_aggregates[date_key]["energy_term"] += billing.energy_term or 0.0 | ||
| daily_aggregates[date_key]["power_term"] += billing.power_term or 0.0 | ||
| daily_aggregates[date_key]["others_term"] += billing.others_term or 0.0 | ||
| daily_aggregates[date_key]["surplus_term"] += ( | ||
| billing.surplus_term or 0.0 | ||
| ) | ||
| daily_aggregates[date_key]["hours"] += 1 | ||
| # Convert to PricingAggregated objects | ||
| result = [] | ||
| for date_key in sorted(daily_aggregates.keys()): | ||
| agg = daily_aggregates[date_key] | ||
| pricing_agg = PricingAggregated( | ||
| datetime=agg["datetime"], | ||
| value_eur=round(agg["total_eur"], 6), | ||
| energy_term=round(agg["energy_term"], 6), | ||
| power_term=round(agg["power_term"], 6), | ||
| others_term=round(agg["others_term"], 6), | ||
| surplus_term=round(agg["surplus_term"], 6), | ||
| delta_h=agg["hours"], | ||
| ) | ||
| result.append(pricing_agg) | ||
| _LOGGER.info(f"Generated {len(result)} daily cost aggregates") | ||
| return result | ||
| except Exception as e: | ||
| _LOGGER.error(f"Error getting daily costs for CUPS {cups}: {str(e)}") | ||
| raise | ||
| async def get_monthly_costs( | ||
| self, | ||
| cups: str, | ||
| pricing_rules: PricingRules, | ||
| start_date: datetime, | ||
| end_date: datetime, | ||
| is_ceuta_melilla: bool = False, | ||
| ) -> List[PricingAggregated]: | ||
| """Get monthly aggregated billing costs for a period. | ||
| Args: | ||
| cups: CUPS identifier for consumption data | ||
| pricing_rules: Pricing configuration | ||
| start_date: Start date for cost calculation | ||
| end_date: End date for cost calculation | ||
| is_ceuta_melilla: Whether to use Ceuta/Melilla PVPC prices | ||
| Returns: | ||
| List of PricingAggregated objects, one per month | ||
| """ | ||
| _LOGGER.info( | ||
| f"Getting monthly costs for CUPS {cups} from {start_date.date()} to {end_date.date()}" | ||
| ) | ||
| try: | ||
| # Generate pricing configuration hash | ||
| db_service = await self._get_db_service() | ||
| pricing_config_hash = db_service.generate_pricing_config_hash( | ||
| pricing_rules.model_dump() | ||
| ) | ||
| # Get billing data for the period | ||
| billing_records = await db_service.get_billing( | ||
| cups=cups, | ||
| start_date=start_date, | ||
| end_date=end_date, | ||
| pricing_config_hash=pricing_config_hash, | ||
| ) | ||
| # If no billing data exists, calculate and store it first | ||
| if not billing_records: | ||
| _LOGGER.info(f"No billing data found, calculating costs first") | ||
| update_result = await self.update_missing_costs( | ||
| cups, | ||
| pricing_rules, | ||
| start_date, | ||
| end_date, | ||
| is_ceuta_melilla, | ||
| force_recalculate=False, | ||
| ) | ||
| if not update_result["success"]: | ||
| _LOGGER.error( | ||
| f"Failed to update costs: {update_result.get('error', 'Unknown error')}" | ||
| ) | ||
| return [] | ||
| # Get the newly calculated billing data | ||
| billing_records = await db_service.get_billing( | ||
| cups=cups, | ||
| start_date=start_date, | ||
| end_date=end_date, | ||
| pricing_config_hash=pricing_config_hash, | ||
| ) | ||
| # Group by month and aggregate | ||
| monthly_aggregates = {} | ||
| for billing in billing_records: | ||
| # Get year-month as key | ||
| month_key = (billing.datetime.year, billing.datetime.month) | ||
| if month_key not in monthly_aggregates: | ||
| # Create datetime for first day of month | ||
| month_start = datetime(month_key[0], month_key[1], 1) | ||
| monthly_aggregates[month_key] = { | ||
| "datetime": month_start, | ||
| "total_eur": 0.0, | ||
| "energy_term": 0.0, | ||
| "power_term": 0.0, | ||
| "others_term": 0.0, | ||
| "surplus_term": 0.0, | ||
| "hours": 0, | ||
| } | ||
| # Add this hour's costs | ||
| monthly_aggregates[month_key]["total_eur"] += billing.total_eur or 0.0 | ||
| monthly_aggregates[month_key]["energy_term"] += ( | ||
| billing.energy_term or 0.0 | ||
| ) | ||
| monthly_aggregates[month_key]["power_term"] += billing.power_term or 0.0 | ||
| monthly_aggregates[month_key]["others_term"] += ( | ||
| billing.others_term or 0.0 | ||
| ) | ||
| monthly_aggregates[month_key]["surplus_term"] += ( | ||
| billing.surplus_term or 0.0 | ||
| ) | ||
| monthly_aggregates[month_key]["hours"] += 1 | ||
| # Convert to PricingAggregated objects | ||
| result = [] | ||
| for month_key in sorted(monthly_aggregates.keys()): | ||
| agg = monthly_aggregates[month_key] | ||
| pricing_agg = PricingAggregated( | ||
| datetime=agg["datetime"], | ||
| value_eur=round(agg["total_eur"], 6), | ||
| energy_term=round(agg["energy_term"], 6), | ||
| power_term=round(agg["power_term"], 6), | ||
| others_term=round(agg["others_term"], 6), | ||
| surplus_term=round(agg["surplus_term"], 6), | ||
| delta_h=agg["hours"], | ||
| ) | ||
| result.append(pricing_agg) | ||
| _LOGGER.info(f"Generated {len(result)} monthly cost aggregates") | ||
| return result | ||
| except Exception as e: | ||
| _LOGGER.error(f"Error getting monthly costs for CUPS {cups}: {str(e)}") | ||
| raise | ||
| async def get_billing_summary( | ||
| self, | ||
| cups: str, | ||
| pricing_rules: PricingRules, | ||
| target_date: Optional[datetime] = None, | ||
| is_ceuta_melilla: bool = False, | ||
| ) -> Dict[str, Any]: | ||
| """Get billing summary data compatible with EdataHelper attributes. | ||
| Args: | ||
| cups: CUPS identifier | ||
| pricing_rules: Pricing configuration | ||
| target_date: Reference date for calculations (defaults to today) | ||
| is_ceuta_melilla: Whether to use Ceuta/Melilla PVPC prices | ||
| Returns: | ||
| Dict with summary attributes matching EdataHelper format | ||
| """ | ||
| from dateutil.relativedelta import relativedelta | ||
| if target_date is None: | ||
| target_date = datetime.now() | ||
| # Calculate date ranges | ||
| month_starts = target_date.replace( | ||
| day=1, hour=0, minute=0, second=0, microsecond=0 | ||
| ) | ||
| last_month_starts = month_starts - relativedelta(months=1) | ||
| # Initialize summary attributes | ||
| summary: Dict[str, Any] = {"month_€": None, "last_month_€": None} | ||
| try: | ||
| # Get current month cost | ||
| current_month_costs = await self.get_monthly_costs( | ||
| cups=cups, | ||
| pricing_rules=pricing_rules, | ||
| start_date=month_starts, | ||
| end_date=month_starts + relativedelta(months=1), | ||
| is_ceuta_melilla=is_ceuta_melilla, | ||
| ) | ||
| if current_month_costs: | ||
| current_month_data = next( | ||
| ( | ||
| c | ||
| for c in current_month_costs | ||
| if c.datetime.year == month_starts.year | ||
| and c.datetime.month == month_starts.month | ||
| ), | ||
| None, | ||
| ) | ||
| if current_month_data: | ||
| summary["month_€"] = current_month_data.value_eur | ||
| # Get last month cost | ||
| last_month_costs = await self.get_monthly_costs( | ||
| cups=cups, | ||
| pricing_rules=pricing_rules, | ||
| start_date=last_month_starts, | ||
| end_date=month_starts, | ||
| is_ceuta_melilla=is_ceuta_melilla, | ||
| ) | ||
| if last_month_costs: | ||
| last_month_data = next( | ||
| ( | ||
| c | ||
| for c in last_month_costs | ||
| if c.datetime.year == last_month_starts.year | ||
| and c.datetime.month == last_month_starts.month | ||
| ), | ||
| None, | ||
| ) | ||
| if last_month_data: | ||
| summary["last_month_€"] = last_month_data.value_eur | ||
| except Exception as e: | ||
| _LOGGER.warning( | ||
| f"Error calculating billing summary for CUPS {cups}: {str(e)}" | ||
| ) | ||
| # Round numeric values to 2 decimal places | ||
| for key, value in summary.items(): | ||
| if isinstance(value, float): | ||
| summary[key] = round(value, 2) | ||
| return summary |
| """Consumption service for fetching and updating consumption data.""" | ||
| import logging | ||
| from datetime import datetime, timedelta | ||
| from typing import Any, Dict, List, Optional | ||
| from edata.connectors.datadis import DatadisConnector | ||
| from edata.models.consumption import Consumption, ConsumptionAggregated | ||
| from edata.services.database import ConsumptionModel as DbConsumption | ||
| from edata.services.database import DatabaseService, get_database_service | ||
| from edata.utils import get_pvpc_tariff | ||
| _LOGGER = logging.getLogger(__name__) | ||
| class ConsumptionService: | ||
| """Service for managing consumption data fetching and storage.""" | ||
| def __init__( | ||
| self, | ||
| datadis_connector: DatadisConnector, | ||
| storage_dir: Optional[str] = None, | ||
| ): | ||
| """Initialize consumption service. | ||
| Args: | ||
| datadis_connector: Configured Datadis connector instance | ||
| storage_dir: Directory for database and cache storage | ||
| """ | ||
| self._datadis = datadis_connector | ||
| self._storage_dir = storage_dir | ||
| self._db_service = None | ||
| async def _get_db_service(self) -> DatabaseService: | ||
| """Get database service, initializing if needed.""" | ||
| if self._db_service is None: | ||
| self._db_service = await get_database_service(self._storage_dir) | ||
| return self._db_service | ||
| async def update_consumptions( | ||
| self, | ||
| cups: str, | ||
| distributor_code: str, | ||
| start_date: datetime, | ||
| end_date: datetime, | ||
| measurement_type: str = "0", | ||
| point_type: int = 5, | ||
| authorized_nif: Optional[str] = None, | ||
| force_full_update: bool = False, | ||
| ) -> Dict[str, Any]: | ||
| """Update consumption data for a CUPS in the specified date range. | ||
| Args: | ||
| cups: CUPS identifier | ||
| distributor_code: Distributor company code | ||
| start_date: Start date for consumption data | ||
| end_date: End date for consumption data | ||
| measurement_type: Type of measurement (default "0" for hourly) | ||
| point_type: Type of supply point (default 5) | ||
| authorized_nif: Authorized NIF if accessing on behalf of someone | ||
| force_full_update: If True, fetch all data ignoring existing records | ||
| Returns: | ||
| Dict with operation results and statistics | ||
| """ | ||
| _LOGGER.info( | ||
| f"Updating consumptions for CUPS {cups[-5:]:>5} from {start_date.date()} to {end_date.date()}" | ||
| ) | ||
| # Determine actual start date based on existing data | ||
| actual_start_date = start_date | ||
| if not force_full_update: | ||
| last_consumption_date = await self.get_last_consumption_date(cups) | ||
| if last_consumption_date: | ||
| # Start from the day after the last consumption | ||
| actual_start_date = max( | ||
| start_date, last_consumption_date + timedelta(hours=1) | ||
| ) | ||
| _LOGGER.info( | ||
| f"Found existing data up to {last_consumption_date.date()}, fetching from {actual_start_date.date()}" | ||
| ) | ||
| else: | ||
| _LOGGER.info( | ||
| f"No existing consumption data found for CUPS {cups[-5:]:>5}, fetching all data" | ||
| ) | ||
| # If actual start date is beyond end date, no new data needed | ||
| if actual_start_date >= end_date: | ||
| _LOGGER.info( | ||
| f"No new consumption data needed for CUPS {cups[-5:]:>5} (up to date)" | ||
| ) | ||
| return { | ||
| "success": True, | ||
| "cups": cups, | ||
| "period": { | ||
| "start": start_date.isoformat(), | ||
| "end": end_date.isoformat(), | ||
| "actual_start": actual_start_date.isoformat(), | ||
| }, | ||
| "stats": { | ||
| "fetched": 0, | ||
| "saved": 0, | ||
| "updated": 0, | ||
| "skipped": "up_to_date", | ||
| }, | ||
| "message": "Data is up to date", | ||
| } | ||
| try: | ||
| # Fetch consumption data from datadis (only missing data) | ||
| consumptions = await self._datadis.get_consumption_data( | ||
| cups=cups, | ||
| distributor_code=distributor_code, | ||
| start_date=actual_start_date, | ||
| end_date=end_date, | ||
| measurement_type=measurement_type, | ||
| point_type=point_type, | ||
| authorized_nif=authorized_nif, | ||
| ) | ||
| # Save to database | ||
| saved_count = 0 | ||
| updated_count = 0 | ||
| for consumption in consumptions: | ||
| # Convert Pydantic model to dict and add CUPS | ||
| consumption_dict = consumption.model_dump() | ||
| consumption_dict["cups"] = cups | ||
| # Check if consumption already exists | ||
| db_service = await self._get_db_service() | ||
| existing = await db_service.get_consumptions( | ||
| cups=cups, | ||
| start_date=consumption.datetime, | ||
| end_date=consumption.datetime, | ||
| ) | ||
| if existing: | ||
| updated_count += 1 | ||
| else: | ||
| saved_count += 1 | ||
| await db_service.save_consumption(consumption_dict) | ||
| result = { | ||
| "success": True, | ||
| "cups": cups, | ||
| "period": { | ||
| "start": start_date.isoformat(), | ||
| "end": end_date.isoformat(), | ||
| "actual_start": actual_start_date.isoformat(), | ||
| }, | ||
| "stats": { | ||
| "fetched": len(consumptions), | ||
| "saved": saved_count, | ||
| "updated": updated_count, | ||
| }, | ||
| } | ||
| if actual_start_date > start_date: | ||
| result["message"] = ( | ||
| f"Fetched only missing data from {actual_start_date.date()}" | ||
| ) | ||
| _LOGGER.info( | ||
| f"Consumption update completed: {len(consumptions)} fetched, " | ||
| f"{saved_count} saved, {updated_count} updated" | ||
| ) | ||
| return result | ||
| except Exception as e: | ||
| _LOGGER.error(f"Error updating consumptions for CUPS {cups}: {str(e)}") | ||
| return { | ||
| "success": False, | ||
| "cups": cups, | ||
| "error": str(e), | ||
| "period": { | ||
| "start": start_date.isoformat(), | ||
| "end": end_date.isoformat(), | ||
| "actual_start": ( | ||
| actual_start_date.isoformat() | ||
| if "actual_start_date" in locals() | ||
| else start_date.isoformat() | ||
| ), | ||
| }, | ||
| } | ||
| async def update_consumption_range_by_months( | ||
| self, | ||
| cups: str, | ||
| distributor_code: str, | ||
| start_date: datetime, | ||
| end_date: datetime, | ||
| measurement_type: str = "0", | ||
| point_type: int = 5, | ||
| authorized_nif: Optional[str] = None, | ||
| force_full_update: bool = False, | ||
| ) -> Dict[str, Any]: | ||
| """Update consumption data month by month to respect datadis limits. | ||
| Args: | ||
| cups: CUPS identifier | ||
| distributor_code: Distributor company code | ||
| start_date: Start date for consumption data | ||
| end_date: End date for consumption data | ||
| measurement_type: Type of measurement (default "0" for hourly) | ||
| point_type: Type of supply point (default 5) | ||
| authorized_nif: Authorized NIF if accessing on behalf of someone | ||
| force_full_update: If True, fetch all data ignoring existing records | ||
| Returns: | ||
| Dict with operation results and statistics for all months | ||
| """ | ||
| _LOGGER.info( | ||
| f"Updating consumption range for CUPS {cups[-5:]:>5} " | ||
| f"from {start_date.date()} to {end_date.date()} by months" | ||
| ) | ||
| results = [] | ||
| current_date = start_date | ||
| while current_date < end_date: | ||
| # Calculate month end | ||
| if current_date.month == 12: | ||
| month_end = current_date.replace( | ||
| year=current_date.year + 1, month=1, day=1 | ||
| ) | ||
| else: | ||
| month_end = current_date.replace(month=current_date.month + 1, day=1) | ||
| # Don't go past the requested end date | ||
| actual_end = min(month_end, end_date) | ||
| # Update consumptions for this month | ||
| consumption_result = await self.update_consumptions( | ||
| cups=cups, | ||
| distributor_code=distributor_code, | ||
| start_date=current_date, | ||
| end_date=actual_end, | ||
| measurement_type=measurement_type, | ||
| point_type=point_type, | ||
| authorized_nif=authorized_nif, | ||
| force_full_update=force_full_update, | ||
| ) | ||
| result_entry = { | ||
| "month": current_date.strftime("%Y-%m"), | ||
| "consumption": consumption_result, | ||
| } | ||
| results.append(result_entry) | ||
| current_date = month_end | ||
| # Calculate totals | ||
| total_consumptions_fetched = sum( | ||
| r["consumption"]["stats"]["fetched"] | ||
| for r in results | ||
| if r["consumption"]["success"] | ||
| ) | ||
| total_consumptions_saved = sum( | ||
| r["consumption"]["stats"]["saved"] | ||
| for r in results | ||
| if r["consumption"]["success"] | ||
| ) | ||
| total_consumptions_updated = sum( | ||
| r["consumption"]["stats"]["updated"] | ||
| for r in results | ||
| if r["consumption"]["success"] | ||
| ) | ||
| summary = { | ||
| "success": all(r["consumption"]["success"] for r in results), | ||
| "cups": cups, | ||
| "period": {"start": start_date.isoformat(), "end": end_date.isoformat()}, | ||
| "months_processed": len(results), | ||
| "total_stats": { | ||
| "consumptions_fetched": total_consumptions_fetched, | ||
| "consumptions_saved": total_consumptions_saved, | ||
| "consumptions_updated": total_consumptions_updated, | ||
| }, | ||
| "monthly_results": results, | ||
| } | ||
| _LOGGER.info( | ||
| f"Consumption range update completed: {len(results)} months processed, " | ||
| f"{total_consumptions_fetched} consumptions fetched" | ||
| ) | ||
| return summary | ||
| async def get_stored_consumptions( | ||
| self, | ||
| cups: str, | ||
| start_date: Optional[datetime] = None, | ||
| end_date: Optional[datetime] = None, | ||
| ) -> List[DbConsumption]: | ||
| """Get stored consumptions from database. | ||
| Args: | ||
| cups: CUPS identifier | ||
| start_date: Optional start date filter | ||
| end_date: Optional end date filter | ||
| Returns: | ||
| List of database Consumption objects | ||
| """ | ||
| db_service = await self._get_db_service() | ||
| return await db_service.get_consumptions(cups, start_date, end_date) | ||
| async def get_last_consumption_date(self, cups: str) -> Optional[datetime]: | ||
| """Get the date of the last consumption record in the database. | ||
| Args: | ||
| cups: CUPS identifier | ||
| Returns: | ||
| Datetime of last consumption or None if no data exists | ||
| """ | ||
| db_service = await self._get_db_service() | ||
| latest_consumption = await db_service.get_latest_consumption(cups) | ||
| if latest_consumption: | ||
| return latest_consumption.datetime | ||
| return None | ||
| async def get_daily_consumptions( | ||
| self, cups: str, start_date: datetime, end_date: datetime | ||
| ) -> List[ConsumptionAggregated]: | ||
| """Calculate daily consumption aggregations. | ||
| Args: | ||
| cups: CUPS identifier | ||
| start_date: Start date for aggregation | ||
| end_date: End date for aggregation | ||
| Returns: | ||
| List of daily consumption aggregations | ||
| """ | ||
| # Get hourly consumptions from database | ||
| db_service = await self._get_db_service() | ||
| db_consumptions = await db_service.get_consumptions(cups, start_date, end_date) | ||
| # Convert to Pydantic models for processing | ||
| consumptions = [] | ||
| for db_cons in db_consumptions: | ||
| cons = Consumption( | ||
| datetime=db_cons.datetime, | ||
| delta_h=db_cons.delta_h, | ||
| value_kwh=db_cons.value_kwh, | ||
| surplus_kwh=db_cons.surplus_kwh or 0.0, | ||
| real=db_cons.real or True, | ||
| ) | ||
| consumptions.append(cons) | ||
| # Sort by datetime | ||
| consumptions.sort(key=lambda x: x.datetime) | ||
| # Aggregate by day | ||
| daily_aggregations = {} | ||
| for consumption in consumptions: | ||
| curr_day = consumption.datetime.replace( | ||
| hour=0, minute=0, second=0, microsecond=0 | ||
| ) | ||
| # Determine tariff period | ||
| tariff = get_pvpc_tariff(consumption.datetime) | ||
| # Initialize daily aggregation if not exists | ||
| if curr_day not in daily_aggregations: | ||
| daily_aggregations[curr_day] = { | ||
| "datetime": curr_day, | ||
| "value_kwh": 0.0, | ||
| "value_p1_kwh": 0.0, | ||
| "value_p2_kwh": 0.0, | ||
| "value_p3_kwh": 0.0, | ||
| "surplus_kwh": 0.0, | ||
| "surplus_p1_kwh": 0.0, | ||
| "surplus_p2_kwh": 0.0, | ||
| "surplus_p3_kwh": 0.0, | ||
| "delta_h": 0.0, | ||
| } | ||
| # Add consumption values | ||
| daily_aggregations[curr_day]["value_kwh"] += consumption.value_kwh | ||
| daily_aggregations[curr_day]["surplus_kwh"] += consumption.surplus_kwh | ||
| daily_aggregations[curr_day]["delta_h"] += consumption.delta_h | ||
| # Add by tariff period | ||
| if tariff == "p1": | ||
| daily_aggregations[curr_day]["value_p1_kwh"] += consumption.value_kwh | ||
| daily_aggregations[curr_day][ | ||
| "surplus_p1_kwh" | ||
| ] += consumption.surplus_kwh | ||
| elif tariff == "p2": | ||
| daily_aggregations[curr_day]["value_p2_kwh"] += consumption.value_kwh | ||
| daily_aggregations[curr_day][ | ||
| "surplus_p2_kwh" | ||
| ] += consumption.surplus_kwh | ||
| elif tariff == "p3": | ||
| daily_aggregations[curr_day]["value_p3_kwh"] += consumption.value_kwh | ||
| daily_aggregations[curr_day][ | ||
| "surplus_p3_kwh" | ||
| ] += consumption.surplus_kwh | ||
| # Convert to ConsumptionAggregated objects and round values | ||
| result = [] | ||
| for day_data in sorted( | ||
| daily_aggregations.values(), key=lambda x: x["datetime"] | ||
| ): | ||
| # Round all float values to 2 decimal places | ||
| for key, value in day_data.items(): | ||
| if isinstance(value, float): | ||
| day_data[key] = round(value, 2) | ||
| aggregated = ConsumptionAggregated(**day_data) | ||
| result.append(aggregated) | ||
| return result | ||
| async def get_monthly_consumptions( | ||
| self, | ||
| cups: str, | ||
| start_date: datetime, | ||
| end_date: datetime, | ||
| cycle_start_day: int = 1, | ||
| ) -> List[ConsumptionAggregated]: | ||
| """Calculate monthly consumption aggregations. | ||
| Args: | ||
| cups: CUPS identifier | ||
| start_date: Start date for aggregation | ||
| end_date: End date for aggregation | ||
| cycle_start_day: Day of month when billing cycle starts (1-30) | ||
| Returns: | ||
| List of monthly consumption aggregations | ||
| """ | ||
| # Get hourly consumptions from database | ||
| db_service = await self._get_db_service() | ||
| db_consumptions = await db_service.get_consumptions(cups, start_date, end_date) | ||
| # Convert to Pydantic models for processing | ||
| consumptions = [] | ||
| for db_cons in db_consumptions: | ||
| cons = Consumption( | ||
| datetime=db_cons.datetime, | ||
| delta_h=db_cons.delta_h, | ||
| value_kwh=db_cons.value_kwh, | ||
| surplus_kwh=db_cons.surplus_kwh or 0.0, | ||
| real=db_cons.real or True, | ||
| ) | ||
| consumptions.append(cons) | ||
| # Sort by datetime | ||
| consumptions.sort(key=lambda x: x.datetime) | ||
| # Calculate cycle offset | ||
| cycle_offset = cycle_start_day - 1 | ||
| # Aggregate by month (considering billing cycle) | ||
| monthly_aggregations = {} | ||
| for consumption in consumptions: | ||
| curr_day = consumption.datetime.replace( | ||
| hour=0, minute=0, second=0, microsecond=0 | ||
| ) | ||
| # Adjust for billing cycle start day | ||
| billing_month_date = (curr_day - timedelta(days=cycle_offset)).replace( | ||
| day=1 | ||
| ) | ||
| # Determine tariff period | ||
| tariff = get_pvpc_tariff(consumption.datetime) | ||
| # Initialize monthly aggregation if not exists | ||
| if billing_month_date not in monthly_aggregations: | ||
| monthly_aggregations[billing_month_date] = { | ||
| "datetime": billing_month_date, | ||
| "value_kwh": 0.0, | ||
| "value_p1_kwh": 0.0, | ||
| "value_p2_kwh": 0.0, | ||
| "value_p3_kwh": 0.0, | ||
| "surplus_kwh": 0.0, | ||
| "surplus_p1_kwh": 0.0, | ||
| "surplus_p2_kwh": 0.0, | ||
| "surplus_p3_kwh": 0.0, | ||
| "delta_h": 0.0, | ||
| } | ||
| # Add consumption values | ||
| monthly_aggregations[billing_month_date][ | ||
| "value_kwh" | ||
| ] += consumption.value_kwh | ||
| monthly_aggregations[billing_month_date][ | ||
| "surplus_kwh" | ||
| ] += consumption.surplus_kwh | ||
| monthly_aggregations[billing_month_date]["delta_h"] += consumption.delta_h | ||
| # Add by tariff period | ||
| if tariff == "p1": | ||
| monthly_aggregations[billing_month_date][ | ||
| "value_p1_kwh" | ||
| ] += consumption.value_kwh | ||
| monthly_aggregations[billing_month_date][ | ||
| "surplus_p1_kwh" | ||
| ] += consumption.surplus_kwh | ||
| elif tariff == "p2": | ||
| monthly_aggregations[billing_month_date][ | ||
| "value_p2_kwh" | ||
| ] += consumption.value_kwh | ||
| monthly_aggregations[billing_month_date][ | ||
| "surplus_p2_kwh" | ||
| ] += consumption.surplus_kwh | ||
| elif tariff == "p3": | ||
| monthly_aggregations[billing_month_date][ | ||
| "value_p3_kwh" | ||
| ] += consumption.value_kwh | ||
| monthly_aggregations[billing_month_date][ | ||
| "surplus_p3_kwh" | ||
| ] += consumption.surplus_kwh | ||
| # Convert to ConsumptionAggregated objects and round values | ||
| result = [] | ||
| for month_data in sorted( | ||
| monthly_aggregations.values(), key=lambda x: x["datetime"] | ||
| ): | ||
| # Round all float values to 2 decimal places | ||
| for key, value in month_data.items(): | ||
| if isinstance(value, float): | ||
| month_data[key] = round(value, 2) | ||
| aggregated = ConsumptionAggregated(**month_data) | ||
| result.append(aggregated) | ||
| return result | ||
    async def get_consumption_summary(
        self, cups: str, target_date: Optional[datetime] = None
    ) -> Dict[str, Any]:
        """Get consumption summary data compatible with EdataHelper attributes.

        Builds a flat attribute dict covering yesterday, the current month,
        the previous month, and the last registered day, from the daily and
        monthly aggregations stored in the database. Attributes for periods
        with no data remain None.

        Args:
            cups: CUPS identifier
            target_date: Reference date for calculations (defaults to today)

        Returns:
            Dict with summary attributes matching EdataHelper format
        """
        # Local imports keep this helper self-contained. NOTE(review):
        # `timedelta` is already used at module scope by sibling methods, so
        # these are redundant but harmless.
        from datetime import timedelta
        from dateutil.relativedelta import relativedelta
        if target_date is None:
            target_date = datetime.now()
        # Calculate date ranges (all anchored at midnight of target_date)
        today_starts = target_date.replace(hour=0, minute=0, second=0, microsecond=0)
        yesterday_starts = today_starts - timedelta(days=1)
        month_starts = target_date.replace(
            day=1, hour=0, minute=0, second=0, microsecond=0
        )
        last_month_starts = month_starts - relativedelta(months=1)
        # Get daily and monthly aggregations covering the windows above
        daily_consumptions = await self.get_daily_consumptions(
            cups=cups, start_date=yesterday_starts, end_date=today_starts
        )
        monthly_consumptions = await self.get_monthly_consumptions(
            cups=cups,
            start_date=last_month_starts,
            end_date=month_starts + relativedelta(months=1),
        )
        # Get all consumptions to find last registered data
        all_consumptions = await self.get_stored_consumptions(cups=cups)
        # Initialize summary attributes (None = no data for that period)
        summary: Dict[str, Any] = {
            # Yesterday consumption
            "yesterday_kWh": None,
            "yesterday_hours": None,
            "yesterday_p1_kWh": None,
            "yesterday_p2_kWh": None,
            "yesterday_p3_kWh": None,
            "yesterday_surplus_kWh": None,
            "yesterday_surplus_p1_kWh": None,
            "yesterday_surplus_p2_kWh": None,
            "yesterday_surplus_p3_kWh": None,
            # Current month consumption
            "month_kWh": None,
            "month_surplus_kWh": None,
            "month_days": None,
            "month_daily_kWh": None,
            "month_p1_kWh": None,
            "month_p2_kWh": None,
            "month_p3_kWh": None,
            "month_surplus_p1_kWh": None,
            "month_surplus_p2_kWh": None,
            "month_surplus_p3_kWh": None,
            # Last month consumption
            "last_month_kWh": None,
            "last_month_surplus_kWh": None,
            "last_month_days": None,
            "last_month_daily_kWh": None,
            "last_month_p1_kWh": None,
            "last_month_p2_kWh": None,
            "last_month_p3_kWh": None,
            "last_month_surplus_p1_kWh": None,
            "last_month_surplus_p2_kWh": None,
            "last_month_surplus_p3_kWh": None,
            # Last registered data
            "last_registered_date": None,
            "last_registered_day_kWh": None,
            "last_registered_day_surplus_kWh": None,
            "last_registered_day_hours": None,
            "last_registered_day_p1_kWh": None,
            "last_registered_day_p2_kWh": None,
            "last_registered_day_p3_kWh": None,
            "last_registered_day_surplus_p1_kWh": None,
            "last_registered_day_surplus_p2_kWh": None,
            "last_registered_day_surplus_p3_kWh": None,
        }
        # Fill yesterday data (first daily aggregation matching yesterday)
        yesterday_data = next(
            (
                d
                for d in daily_consumptions
                if d.datetime.date() == yesterday_starts.date()
            ),
            None,
        )
        if yesterday_data:
            summary["yesterday_kWh"] = yesterday_data.value_kwh
            summary["yesterday_hours"] = yesterday_data.delta_h
            summary["yesterday_p1_kWh"] = yesterday_data.value_p1_kwh
            summary["yesterday_p2_kWh"] = yesterday_data.value_p2_kwh
            summary["yesterday_p3_kWh"] = yesterday_data.value_p3_kwh
            summary["yesterday_surplus_kWh"] = yesterday_data.surplus_kwh
            summary["yesterday_surplus_p1_kWh"] = yesterday_data.surplus_p1_kwh
            summary["yesterday_surplus_p2_kWh"] = yesterday_data.surplus_p2_kwh
            summary["yesterday_surplus_p3_kWh"] = yesterday_data.surplus_p3_kwh
        # Fill current month data (match by year + month of the aggregation)
        current_month_data = next(
            (
                m
                for m in monthly_consumptions
                if m.datetime.year == month_starts.year
                and m.datetime.month == month_starts.month
            ),
            None,
        )
        if current_month_data:
            summary["month_kWh"] = current_month_data.value_kwh
            summary["month_surplus_kWh"] = current_month_data.surplus_kwh
            # delta_h is a sum of covered hours, so hours / 24 = days covered
            summary["month_days"] = (
                current_month_data.delta_h / 24 if current_month_data.delta_h else None
            )
            summary["month_daily_kWh"] = (
                (current_month_data.value_kwh / (current_month_data.delta_h / 24))
                if current_month_data.delta_h and current_month_data.delta_h > 0
                else None
            )
            summary["month_p1_kWh"] = current_month_data.value_p1_kwh
            summary["month_p2_kWh"] = current_month_data.value_p2_kwh
            summary["month_p3_kWh"] = current_month_data.value_p3_kwh
            summary["month_surplus_p1_kWh"] = current_month_data.surplus_p1_kwh
            summary["month_surplus_p2_kWh"] = current_month_data.surplus_p2_kwh
            summary["month_surplus_p3_kWh"] = current_month_data.surplus_p3_kwh
        # Fill last month data (same matching strategy as current month)
        last_month_data = next(
            (
                m
                for m in monthly_consumptions
                if m.datetime.year == last_month_starts.year
                and m.datetime.month == last_month_starts.month
            ),
            None,
        )
        if last_month_data:
            summary["last_month_kWh"] = last_month_data.value_kwh
            summary["last_month_surplus_kWh"] = last_month_data.surplus_kwh
            summary["last_month_days"] = (
                last_month_data.delta_h / 24 if last_month_data.delta_h else None
            )
            summary["last_month_daily_kWh"] = (
                (last_month_data.value_kwh / (last_month_data.delta_h / 24))
                if last_month_data.delta_h and last_month_data.delta_h > 0
                else None
            )
            summary["last_month_p1_kWh"] = last_month_data.value_p1_kwh
            summary["last_month_p2_kWh"] = last_month_data.value_p2_kwh
            summary["last_month_p3_kWh"] = last_month_data.value_p3_kwh
            summary["last_month_surplus_p1_kWh"] = last_month_data.surplus_p1_kwh
            summary["last_month_surplus_p2_kWh"] = last_month_data.surplus_p2_kwh
            summary["last_month_surplus_p3_kWh"] = last_month_data.surplus_p3_kwh
        # Fill last registered data
        if all_consumptions:
            # Sort by datetime and get the last one
            last_consumption = max(all_consumptions, key=lambda c: c.datetime)
            summary["last_registered_date"] = last_consumption.datetime
            # Get the last day's aggregated data (midnight..midnight window)
            last_day_start = last_consumption.datetime.replace(
                hour=0, minute=0, second=0, microsecond=0
            )
            last_day_end = last_day_start + timedelta(days=1)
            last_day_daily = await self.get_daily_consumptions(
                cups=cups, start_date=last_day_start, end_date=last_day_end
            )
            if last_day_daily:
                last_day_data = last_day_daily[0]
                summary["last_registered_day_kWh"] = last_day_data.value_kwh
                summary["last_registered_day_surplus_kWh"] = last_day_data.surplus_kwh
                summary["last_registered_day_hours"] = last_day_data.delta_h
                summary["last_registered_day_p1_kWh"] = last_day_data.value_p1_kwh
                summary["last_registered_day_p2_kWh"] = last_day_data.value_p2_kwh
                summary["last_registered_day_p3_kWh"] = last_day_data.value_p3_kwh
                summary["last_registered_day_surplus_p1_kWh"] = (
                    last_day_data.surplus_p1_kwh
                )
                summary["last_registered_day_surplus_p2_kWh"] = (
                    last_day_data.surplus_p2_kwh
                )
                summary["last_registered_day_surplus_p3_kWh"] = (
                    last_day_data.surplus_p3_kwh
                )
        # Round numeric values to 2 decimal places (datetimes are untouched)
        for key, value in summary.items():
            if isinstance(value, float):
                summary[key] = round(value, 2)
        return summary
| """Contract service for fetching and managing contract data.""" | ||
| import logging | ||
| from datetime import datetime | ||
| from typing import Any, Dict, List, Optional | ||
| from edata.connectors.datadis import DatadisConnector | ||
| from edata.services.database import ContractModel, DatabaseService, get_database_service | ||
| _LOGGER = logging.getLogger(__name__) | ||
class ContractService:
    """Service for managing contract data fetching and storage.

    Wraps a DatadisConnector for remote fetches and a DatabaseService for
    local persistence. Contracts are deduplicated per CUPS by their
    (date_start, date_end) period when counting saves vs. updates.
    """
    def __init__(
        self,
        datadis_connector: DatadisConnector,
        storage_dir: Optional[str] = None,
    ):
        """Initialize contract service.

        Args:
            datadis_connector: Configured Datadis connector instance
            storage_dir: Directory for database and cache storage
        """
        self._datadis = datadis_connector
        self._storage_dir = storage_dir
        # Lazily created on first use; see _get_db_service().
        self._db_service: Optional[DatabaseService] = None
    async def _get_db_service(self) -> DatabaseService:
        """Get database service, initializing it on first call."""
        if self._db_service is None:
            self._db_service = await get_database_service(self._storage_dir)
        return self._db_service
    async def update_contracts(
        self, cups: str, distributor_code: str, authorized_nif: Optional[str] = None
    ) -> Dict[str, Any]:
        """Update contract data for a CUPS.

        Fetches contract details from Datadis and upserts them into the local
        database, classifying each as "saved" (new period) or "updated"
        (period already stored).

        Args:
            cups: CUPS identifier
            distributor_code: Distributor code for the CUPS
            authorized_nif: Optional authorized NIF for access

        Returns:
            Dict with operation results and statistics; on failure,
            ``success`` is False and ``error`` holds the message
        """
        _LOGGER.info(f"Updating contracts for CUPS {cups[-5:]}")
        try:
            # Fetch contract data from Datadis
            contracts_data = await self._datadis.get_contract_detail(
                cups=cups,
                distributor_code=distributor_code,
                authorized_nif=authorized_nif,
            )
            if not contracts_data:
                # Nothing fetched is still a successful (empty) update
                _LOGGER.warning(f"No contract data found for CUPS {cups[-5:]}")
                return {
                    "success": True,
                    "stats": {
                        "fetched": 0,
                        "saved": 0,
                        "updated": 0,
                        "total_stored": 0,
                    },
                }
            # Get existing contracts to avoid counting duplicates as new
            db_service = await self._get_db_service()
            existing = await db_service.get_contracts(cups=cups)
            existing_periods = {(c.date_start, c.date_end) for c in existing}
            # Save contracts to database
            saved_count = 0
            updated_count = 0
            for contract in contracts_data:
                contract_dict = contract.model_dump()
                contract_dict["cups"] = cups
                # Check if this contract period already exists
                period_key = (contract.date_start, contract.date_end)
                if period_key in existing_periods:
                    updated_count += 1
                    _LOGGER.debug(
                        f"Updating existing contract for CUPS {cups[-5:]} "
                        f"period {contract.date_start.date()}-{contract.date_end.date()}"
                    )
                else:
                    saved_count += 1
                    _LOGGER.debug(
                        f"Saving new contract for CUPS {cups[-5:]} "
                        f"period {contract.date_start.date()}-{contract.date_end.date()}"
                    )
                # Save to database (save_contract performs the actual upsert)
                await db_service.save_contract(contract_dict)
            # Get total contracts stored for this CUPS after the upserts
            all_contracts = await db_service.get_contracts(cups=cups)
            total_stored = len(all_contracts)
            result = {
                "success": True,
                "stats": {
                    "fetched": len(contracts_data),
                    "saved": saved_count,
                    "updated": updated_count,
                    "total_stored": total_stored,
                },
            }
            _LOGGER.info(
                f"Contract update completed for CUPS {cups[-5:]}: "
                f"{len(contracts_data)} fetched, {saved_count} saved, {updated_count} updated"
            )
            return result
        except Exception as e:
            _LOGGER.error(f"Error updating contracts for CUPS {cups[-5:]}: {str(e)}")
            return {
                "success": False,
                "error": str(e),
                "stats": {"fetched": 0, "saved": 0, "updated": 0, "total_stored": 0},
            }
    async def get_contracts(
        self,
        cups: str,
        start_date: Optional[datetime] = None,
        end_date: Optional[datetime] = None,
    ) -> List[ContractModel]:
        """Get stored contract data for a CUPS.

        Args:
            cups: CUPS identifier
            start_date: Optional start date filter
            end_date: Optional end date filter

        Returns:
            List of Contract objects; empty list on database errors
        """
        _LOGGER.debug(
            f"Getting contracts for CUPS {cups[-5:]}"
            f"{f' from {start_date.date()}' if start_date else ''}"
            f"{f' to {end_date.date()}' if end_date else ''}"
        )
        try:
            db_service = await self._get_db_service()
            contracts = await db_service.get_contracts(cups=cups)
            # Apply date filters if provided
            if start_date or end_date:
                filtered_contracts = []
                for contract in contracts:
                    # Keep contracts whose period OVERLAPS the requested
                    # window (not only those fully contained in it)
                    if start_date and contract.date_end < start_date:
                        continue
                    if end_date and contract.date_start > end_date:
                        continue
                    filtered_contracts.append(contract)
                contracts = filtered_contracts
            _LOGGER.debug(f"Found {len(contracts)} contracts for CUPS {cups[-5:]}")
            return contracts
        except Exception as e:
            _LOGGER.error(f"Error getting contracts for CUPS {cups[-5:]}: {str(e)}")
            return []
    async def get_active_contract(
        self, cups: str, reference_date: Optional[datetime] = None
    ) -> Optional[ContractModel]:
        """Get the active contract for a CUPS at a specific date.

        Args:
            cups: CUPS identifier
            reference_date: Date to check for active contract (defaults to now)

        Returns:
            First contract whose period contains reference_date, else None
        """
        if reference_date is None:
            reference_date = datetime.now()
        _LOGGER.debug(
            f"Getting active contract for CUPS {cups[-5:]} at {reference_date.date()}"
        )
        try:
            contracts = await self.get_contracts(cups=cups)
            for contract in contracts:
                # Period bounds are inclusive on both ends
                if contract.date_start <= reference_date <= contract.date_end:
                    _LOGGER.debug(
                        f"Found active contract for CUPS {cups[-5:]} "
                        f"period {contract.date_start.date()}-{contract.date_end.date()}"
                    )
                    return contract
            _LOGGER.warning(
                f"No active contract found for CUPS {cups[-5:]} at {reference_date.date()}"
            )
            return None
        except Exception as e:
            _LOGGER.error(
                f"Error getting active contract for CUPS {cups[-5:]}: {str(e)}"
            )
            return None
    async def get_latest_contract(self, cups: str) -> Optional[ContractModel]:
        """Get the most recent contract for a CUPS.

        "Most recent" means the contract with the latest end date.

        Args:
            cups: CUPS identifier

        Returns:
            Latest contract if found, None otherwise
        """
        _LOGGER.debug(f"Getting latest contract for CUPS {cups[-5:]}")
        try:
            contracts = await self.get_contracts(cups=cups)
            if not contracts:
                _LOGGER.warning(f"No contracts found for CUPS {cups[-5:]}")
                return None
            # Pick the contract with the greatest end date
            latest_contract = max(contracts, key=lambda c: c.date_end)
            _LOGGER.debug(
                f"Found latest contract for CUPS {cups[-5:]} "
                f"period {latest_contract.date_start.date()}-{latest_contract.date_end.date()}"
            )
            return latest_contract
        except Exception as e:
            _LOGGER.error(
                f"Error getting latest contract for CUPS {cups[-5:]}: {str(e)}"
            )
            return None
    async def get_contract_summary(self, cups: str) -> Dict[str, Any]:
        """Get contract summary attributes for a CUPS.

        Args:
            cups: CUPS identifier

        Returns:
            Dict with contracted power per period (None values when no
            contract is stored or on error)
        """
        _LOGGER.debug(f"Getting contract summary for CUPS {cups[-5:]}")
        try:
            # Summary is based on the most recent contract only
            latest_contract = await self.get_latest_contract(cups)
            if not latest_contract:
                _LOGGER.warning(f"No contracts found for CUPS {cups[-5:]}")
                return {
                    "contract_p1_kW": None,
                    "contract_p2_kW": None,
                }
            summary = {
                "contract_p1_kW": latest_contract.power_p1,
                "contract_p2_kW": latest_contract.power_p2,
                # Add other contract-related summary attributes here as needed
            }
            _LOGGER.debug(f"Contract summary calculated for CUPS {cups[-5:]}")
            return summary
        except Exception as e:
            _LOGGER.error(
                f"Error getting contract summary for CUPS {cups[-5:]}: {str(e)}"
            )
            return {
                "contract_p1_kW": None,
                "contract_p2_kW": None,
            }
    async def get_contract_stats(self, cups: str) -> Dict[str, Any]:
        """Get statistics about contracts for a CUPS.

        Args:
            cups: CUPS identifier

        Returns:
            Dict with total count, overall date range, and min/max contracted
            power per period; empty dict on error
        """
        _LOGGER.debug(f"Getting contract statistics for CUPS {cups[-5:]}")
        try:
            contracts = await self.get_contracts(cups=cups)
            if not contracts:
                return {
                    "total_contracts": 0,
                    "date_range": None,
                    "power_ranges": {},
                }
            # Calculate date range across all stored contracts
            earliest_start = min(c.date_start for c in contracts)
            latest_end = max(c.date_end for c in contracts)
            # Calculate power ranges, skipping contracts with missing powers
            p1_powers = [c.power_p1 for c in contracts if c.power_p1 is not None]
            p2_powers = [c.power_p2 for c in contracts if c.power_p2 is not None]
            power_ranges = {}
            if p1_powers:
                power_ranges["p1_kw"] = {"min": min(p1_powers), "max": max(p1_powers)}
            if p2_powers:
                power_ranges["p2_kw"] = {"min": min(p2_powers), "max": max(p2_powers)}
            stats = {
                "total_contracts": len(contracts),
                "date_range": {
                    "earliest_start": earliest_start,
                    "latest_end": latest_end,
                },
                "power_ranges": power_ranges,
            }
            _LOGGER.debug(f"Contract statistics calculated for CUPS {cups[-5:]}")
            return stats
        except Exception as e:
            _LOGGER.error(
                f"Error getting contract statistics for CUPS {cups[-5:]}: {str(e)}"
            )
            return {}
| """Database service for edata using SQLModel and SQLite with async support.""" | ||
| import hashlib | ||
| import os | ||
| from datetime import datetime as DateTime | ||
| from typing import List, Optional | ||
| from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine | ||
| from sqlmodel import SQLModel, desc, select | ||
| from edata.const import DEFAULT_STORAGE_DIR | ||
| from edata.models.database import ( | ||
| BillingModel, | ||
| ConsumptionModel, | ||
| ContractModel, | ||
| MaxPowerModel, | ||
| PVPCPricesModel, | ||
| SupplyModel, | ||
| ) | ||
class DatabaseService:
    """Service for managing the SQLite database with async support.

    Exposes upsert-style ``save_*`` methods and filtered ``get_*`` query
    helpers for every edata model (supplies, contracts, consumptions,
    maximeter readings, PVPC prices and billing records).
    """

    def __init__(self, storage_dir: Optional[str] = None):
        """Initialize database service.

        Args:
            storage_dir: Directory to store database, defaults to same as cache
        """
        if storage_dir is None:
            storage_dir = DEFAULT_STORAGE_DIR
        self._db_dir = os.path.join(storage_dir)
        os.makedirs(self._db_dir, exist_ok=True)
        db_path = os.path.join(self._db_dir, "edata.db")
        # Use aiosqlite for async SQLite operations
        self._engine = create_async_engine(f"sqlite+aiosqlite:///{db_path}")

    async def create_tables(self):
        """Create all model tables asynchronously (no-op if they already exist)."""
        async with self._engine.begin() as conn:
            await conn.run_sync(SQLModel.metadata.create_all)

    def get_session(self) -> AsyncSession:
        """Get an async database session."""
        return AsyncSession(self._engine)

    async def _upsert(self, model_cls, data: dict, *conditions, protected: tuple = ()):
        """Insert or update a single record matched by *conditions*.

        Args:
            model_cls: SQLModel class to persist.
            data: Field values to write.
            conditions: WHERE clauses identifying the existing record.
            protected: Field names never overwritten on update (identity keys).

        Returns:
            The persisted and refreshed record.
        """
        async with self.get_session() as session:
            result = await session.execute(select(model_cls).where(*conditions))
            record = result.scalars().first()
            if record:
                # Update existing record, keeping protected identity fields intact
                for key, value in data.items():
                    if hasattr(record, key) and key not in protected:
                        setattr(record, key, value)
                record.updated_at = DateTime.now()
            else:
                # Create new record
                record = model_cls(**data)
            session.add(record)
            await session.commit()
            await session.refresh(record)
            return record

    async def _all(self, stmt) -> List:
        """Execute *stmt* and return every scalar result as a list."""
        async with self.get_session() as session:
            result = await session.execute(stmt)
            return list(result.scalars().all())

    async def _first(self, stmt):
        """Return the first row of an (ordered) *stmt*, or None.

        Uses LIMIT 1 + ``first()`` instead of ``scalar_one_or_none()``,
        which raises MultipleResultsFound as soon as two rows match.
        """
        async with self.get_session() as session:
            result = await session.execute(stmt.limit(1))
            return result.scalars().first()

    async def save_supply(self, supply_data: dict) -> SupplyModel:
        """Save or update a supply record (keyed by CUPS)."""
        return await self._upsert(
            SupplyModel,
            supply_data,
            SupplyModel.cups == supply_data["cups"],
            protected=("cups",),
        )

    async def save_contract(self, contract_data: dict) -> ContractModel:
        """Save or update a contract record (keyed by CUPS + start date)."""
        return await self._upsert(
            ContractModel,
            contract_data,
            ContractModel.cups == contract_data["cups"],
            ContractModel.date_start == contract_data["date_start"],
        )

    async def save_consumption(self, consumption_data: dict) -> ConsumptionModel:
        """Save or update a consumption record (keyed by CUPS + datetime)."""
        return await self._upsert(
            ConsumptionModel,
            consumption_data,
            ConsumptionModel.cups == consumption_data["cups"],
            ConsumptionModel.datetime == consumption_data["datetime"],
        )

    async def save_maxpower(self, maxpower_data: dict) -> MaxPowerModel:
        """Save or update a maxpower record (keyed by CUPS + datetime)."""
        return await self._upsert(
            MaxPowerModel,
            maxpower_data,
            MaxPowerModel.cups == maxpower_data["cups"],
            MaxPowerModel.datetime == maxpower_data["datetime"],
        )

    async def get_supply(self, cups: str) -> Optional[SupplyModel]:
        """Get a supply by CUPS (primary-key lookup)."""
        async with self.get_session() as session:
            return await session.get(SupplyModel, cups)

    async def get_supplies(self, cups: Optional[str] = None) -> List[SupplyModel]:
        """Get supplies, optionally filtered by CUPS."""
        stmt = select(SupplyModel)
        if cups:
            stmt = stmt.where(SupplyModel.cups == cups)
        return await self._all(stmt)

    async def get_latest_supply(
        self, cups: Optional[str] = None
    ) -> Optional[SupplyModel]:
        """Get the most recently updated supply, optionally filtered by CUPS."""
        stmt = select(SupplyModel)
        if cups:
            stmt = stmt.where(SupplyModel.cups == cups)
        return await self._first(stmt.order_by(desc(SupplyModel.updated_at)))

    async def get_contracts(self, cups: str) -> List[ContractModel]:
        """Get all contracts for a CUPS."""
        return await self._all(select(ContractModel).where(ContractModel.cups == cups))

    async def get_latest_contract(self, cups: str) -> Optional[ContractModel]:
        """Get the most recently started contract for a CUPS."""
        stmt = (
            select(ContractModel)
            .where(ContractModel.cups == cups)
            .order_by(desc(ContractModel.date_start))
        )
        return await self._first(stmt)

    async def get_consumptions(
        self,
        cups: str,
        start_date: Optional[DateTime] = None,
        end_date: Optional[DateTime] = None,
    ) -> List[ConsumptionModel]:
        """Get consumptions for a CUPS within an optional (inclusive) date range."""
        stmt = select(ConsumptionModel).where(ConsumptionModel.cups == cups)
        if start_date:
            stmt = stmt.where(ConsumptionModel.datetime >= start_date)
        if end_date:
            stmt = stmt.where(ConsumptionModel.datetime <= end_date)
        return await self._all(stmt)

    async def get_latest_consumption(self, cups: str) -> Optional[ConsumptionModel]:
        """Get the most recent consumption record for a CUPS."""
        stmt = (
            select(ConsumptionModel)
            .where(ConsumptionModel.cups == cups)
            .order_by(desc(ConsumptionModel.datetime))
        )
        return await self._first(stmt)

    async def get_maxpower_readings(
        self,
        cups: str,
        start_date: Optional[DateTime] = None,
        end_date: Optional[DateTime] = None,
    ) -> List[MaxPowerModel]:
        """Get maxpower readings for a CUPS within an optional (inclusive) date range."""
        stmt = select(MaxPowerModel).where(MaxPowerModel.cups == cups)
        if start_date:
            stmt = stmt.where(MaxPowerModel.datetime >= start_date)
        if end_date:
            stmt = stmt.where(MaxPowerModel.datetime <= end_date)
        return await self._all(stmt)

    async def get_latest_maxpower(self, cups: str) -> Optional[MaxPowerModel]:
        """Get the most recent maxpower reading for a CUPS."""
        stmt = (
            select(MaxPowerModel)
            .where(MaxPowerModel.cups == cups)
            .order_by(desc(MaxPowerModel.datetime))
        )
        return await self._first(stmt)

    async def save_pvpc_price(self, price_data: dict) -> PVPCPricesModel:
        """Save or update a PVPC price record (keyed by datetime and, if given, geo_id)."""
        conditions = [PVPCPricesModel.datetime == price_data["datetime"]]
        if "geo_id" in price_data:
            conditions.append(PVPCPricesModel.geo_id == price_data["geo_id"])
        return await self._upsert(PVPCPricesModel, price_data, *conditions)

    async def get_pvpc_prices(
        self,
        start_date: Optional[DateTime] = None,
        end_date: Optional[DateTime] = None,
        geo_id: Optional[int] = None,
    ) -> List[PVPCPricesModel]:
        """Get PVPC prices within an optional date range and geo zone."""
        stmt = select(PVPCPricesModel)
        if start_date:
            stmt = stmt.where(PVPCPricesModel.datetime >= start_date)
        if end_date:
            stmt = stmt.where(PVPCPricesModel.datetime <= end_date)
        if geo_id is not None:
            stmt = stmt.where(PVPCPricesModel.geo_id == geo_id)
        return await self._all(stmt)

    async def get_latest_pvpc_price(
        self, geo_id: Optional[int] = None
    ) -> Optional[PVPCPricesModel]:
        """Get the most recent PVPC price, optionally filtered by geo_id."""
        stmt = select(PVPCPricesModel)
        if geo_id is not None:
            stmt = stmt.where(PVPCPricesModel.geo_id == geo_id)
        return await self._first(stmt.order_by(desc(PVPCPricesModel.datetime)))

    async def save_billing(self, billing_data: dict) -> BillingModel:
        """Save or update a billing record (keyed by CUPS + datetime + pricing hash)."""
        return await self._upsert(
            BillingModel,
            billing_data,
            BillingModel.cups == billing_data["cups"],
            BillingModel.datetime == billing_data["datetime"],
            BillingModel.pricing_config_hash == billing_data["pricing_config_hash"],
        )

    async def get_billing(
        self,
        cups: str,
        start_date: Optional[DateTime] = None,
        end_date: Optional[DateTime] = None,
        pricing_config_hash: Optional[str] = None,
    ) -> List[BillingModel]:
        """Get billing records for a CUPS within an optional date range."""
        stmt = select(BillingModel).where(BillingModel.cups == cups)
        if start_date:
            stmt = stmt.where(BillingModel.datetime >= start_date)
        if end_date:
            stmt = stmt.where(BillingModel.datetime <= end_date)
        if pricing_config_hash:
            stmt = stmt.where(BillingModel.pricing_config_hash == pricing_config_hash)
        return await self._all(stmt)

    async def get_latest_billing(
        self, cups: str, pricing_config_hash: Optional[str] = None
    ) -> Optional[BillingModel]:
        """Get the most recent billing record for a CUPS."""
        stmt = select(BillingModel).where(BillingModel.cups == cups)
        if pricing_config_hash:
            stmt = stmt.where(BillingModel.pricing_config_hash == pricing_config_hash)
        return await self._first(stmt.order_by(desc(BillingModel.datetime)))

    async def delete_billing(
        self,
        cups: str,
        pricing_config_hash: str,
        start_date: Optional[DateTime] = None,
        end_date: Optional[DateTime] = None,
    ) -> int:
        """Delete billing records for a pricing configuration.

        Args:
            cups: CUPS identifier.
            pricing_config_hash: Pricing configuration hash to purge.
            start_date: Optional inclusive lower datetime bound.
            end_date: Optional inclusive upper datetime bound.

        Returns:
            Number of deleted records.
        """
        async with self.get_session() as session:
            stmt = select(BillingModel).where(
                BillingModel.cups == cups,
                BillingModel.pricing_config_hash == pricing_config_hash,
            )
            if start_date:
                stmt = stmt.where(BillingModel.datetime >= start_date)
            if end_date:
                stmt = stmt.where(BillingModel.datetime <= end_date)
            result = await session.execute(stmt)
            records = list(result.scalars().all())
            for record in records:
                await session.delete(record)
            await session.commit()
            return len(records)

    @staticmethod
    def generate_pricing_config_hash(pricing_rules_dict: dict) -> str:
        """Generate a 16-char hash identifying a pricing-rules configuration.

        NOTE(review): relies on ``str()`` of the sorted top-level items, so
        nested dicts hash by insertion order — kept unchanged to preserve
        compatibility with hashes already stored in the database.
        """
        config_str = str(sorted(pricing_rules_dict.items()))
        return hashlib.sha256(config_str.encode()).hexdigest()[:16]

    async def save_from_pydantic_models(
        self,
        cups: str,
        supplies: List,
        contracts: List,
        consumptions: List,
        maximeter: List,
    ):
        """Save data from Pydantic models to database.

        Args:
            cups: CUPS identifier attached to contracts/consumptions/maximeter.
            supplies: Supply models (carry their own CUPS).
            contracts: Contract models.
            consumptions: Consumption models.
            maximeter: MaxPower models.
        """
        for supply in supplies:
            await self.save_supply(supply.model_dump())
        for contract in contracts:
            contract_dict = contract.model_dump()
            contract_dict["cups"] = cups
            await self.save_contract(contract_dict)
        for consumption in consumptions:
            consumption_dict = consumption.model_dump()
            consumption_dict["cups"] = cups
            await self.save_consumption(consumption_dict)
        for maxpower in maximeter:
            maxpower_dict = maxpower.model_dump()
            maxpower_dict["cups"] = cups
            await self.save_maxpower(maxpower_dict)
# Global database service instance (lazy singleton)
_db_service: Optional[DatabaseService] = None


async def get_database_service(storage_dir: Optional[str] = None) -> DatabaseService:
    """Get the global database service instance.

    Tables are created on first successful access. The global is published
    only AFTER ``create_tables()`` succeeds; otherwise a failed first call
    would leave a half-initialized service that later calls return as-is.

    Args:
        storage_dir: Directory for the SQLite file; honoured only on the
            first call that actually constructs the service.

    Returns:
        The shared DatabaseService instance.
    """
    global _db_service
    if _db_service is None:
        service = DatabaseService(storage_dir)
        # Initialize tables before exposing the instance globally
        await service.create_tables()
        _db_service = service
    return _db_service
| """Maximeter service for fetching and updating maximum power data.""" | ||
| import logging | ||
| from datetime import datetime, timedelta | ||
| from typing import Any, Dict, List, Optional | ||
| from edata.connectors.datadis import DatadisConnector | ||
| from edata.models.maximeter import MaxPower | ||
| from edata.services.database import DatabaseService, get_database_service | ||
| _LOGGER = logging.getLogger(__name__) | ||
class MaximeterService:
    """Service for managing maximum power (maximeter) data fetching and storage."""

    def __init__(
        self,
        datadis_connector: DatadisConnector,
        storage_dir: Optional[str] = None,
    ):
        """Initialize maximeter service.

        Args:
            datadis_connector: Configured Datadis connector instance
            storage_dir: Directory for database and cache storage
        """
        self._datadis = datadis_connector
        self._storage_dir = storage_dir
        self._db_service = None  # created lazily by _get_db_service()

    async def _get_db_service(self) -> DatabaseService:
        """Get database service, initializing it lazily on first use."""
        if self._db_service is None:
            self._db_service = await get_database_service(self._storage_dir)
        return self._db_service

    async def update_maxpower(
        self,
        cups: str,
        distributor_code: str,
        start_date: datetime,
        end_date: datetime,
        authorized_nif: Optional[str] = None,
        force_full_update: bool = False,
    ) -> Dict[str, Any]:
        """Update maximeter (maximum power) data for a CUPS in the specified date range.

        Args:
            cups: CUPS identifier
            distributor_code: Distributor company code
            start_date: Start date for maxpower data
            end_date: End date for maxpower data
            authorized_nif: Authorized NIF if accessing on behalf of someone
            force_full_update: If True, fetch all data ignoring existing records

        Returns:
            Dict with operation results and statistics
        """
        _LOGGER.info(
            f"Updating maxpower for CUPS {cups[-5:]:>5} from {start_date.date()} to {end_date.date()}"
        )
        # Determine actual start date based on existing data
        actual_start_date = start_date
        if not force_full_update:
            last_maxpower_date = await self.get_last_maxpower_date(cups)
            if last_maxpower_date:
                # Resume one hour after the last stored reading
                actual_start_date = max(
                    start_date, last_maxpower_date + timedelta(hours=1)
                )
                _LOGGER.info(
                    f"Found existing maxpower data up to {last_maxpower_date.date()}, fetching from {actual_start_date.date()}"
                )
            else:
                _LOGGER.info(
                    f"No existing maxpower data found for CUPS {cups[-5:]:>5}, fetching all data"
                )
        # If actual start date is beyond end date, no new data needed
        if actual_start_date >= end_date:
            _LOGGER.info(
                f"No new maxpower data needed for CUPS {cups[-5:]:>5} (up to date)"
            )
            return {
                "success": True,
                "cups": cups,
                "period": {
                    "start": start_date.isoformat(),
                    "end": end_date.isoformat(),
                    "actual_start": actual_start_date.isoformat(),
                },
                "stats": {
                    "fetched": 0,
                    "saved": 0,
                    "updated": 0,
                    "skipped": "up_to_date",
                },
                "message": "Maxpower data is up to date",
            }
        try:
            # Fetch maxpower data from datadis (only missing data)
            maxpower_readings = await self._datadis.get_max_power(
                cups=cups,
                distributor_code=distributor_code,
                start_date=actual_start_date,
                end_date=end_date,
                authorized_nif=authorized_nif,
            )
            # Resolve the DB service once; it is invariant across readings
            db_service = await self._get_db_service()
            saved_count = 0
            updated_count = 0
            for maxpower in maxpower_readings:
                # Convert Pydantic model to dict and add CUPS
                maxpower_dict = maxpower.model_dump()
                maxpower_dict["cups"] = cups
                # Check if a reading already exists at this exact timestamp
                existing = await db_service.get_maxpower_readings(
                    cups=cups, start_date=maxpower.datetime, end_date=maxpower.datetime
                )
                if existing:
                    updated_count += 1
                else:
                    saved_count += 1
                await db_service.save_maxpower(maxpower_dict)
            result = {
                "success": True,
                "cups": cups,
                "period": {
                    "start": start_date.isoformat(),
                    "end": end_date.isoformat(),
                    "actual_start": actual_start_date.isoformat(),
                },
                "stats": {
                    "fetched": len(maxpower_readings),
                    "saved": saved_count,
                    "updated": updated_count,
                },
            }
            if actual_start_date > start_date:
                result["message"] = (
                    f"Fetched only missing maxpower data from {actual_start_date.date()}"
                )
            _LOGGER.info(
                f"Maxpower update completed: {len(maxpower_readings)} fetched, "
                f"{saved_count} saved, {updated_count} updated"
            )
            return result
        except Exception as e:
            _LOGGER.error(f"Error updating maxpower for CUPS {cups}: {str(e)}")
            return {
                "success": False,
                "cups": cups,
                "error": str(e),
                "period": {
                    "start": start_date.isoformat(),
                    "end": end_date.isoformat(),
                    # actual_start_date is always bound before the try block
                    "actual_start": actual_start_date.isoformat(),
                },
            }

    async def update_maxpower_range_by_months(
        self,
        cups: str,
        distributor_code: str,
        start_date: datetime,
        end_date: datetime,
        authorized_nif: Optional[str] = None,
        force_full_update: bool = False,
    ) -> Dict[str, Any]:
        """Update maxpower data month by month to respect datadis limits.

        Args:
            cups: CUPS identifier
            distributor_code: Distributor company code
            start_date: Start date for maxpower data
            end_date: End date for maxpower data
            authorized_nif: Authorized NIF if accessing on behalf of someone
            force_full_update: If True, fetch all data ignoring existing records

        Returns:
            Dict with operation results and statistics for all months
        """
        _LOGGER.info(
            f"Updating maxpower range for CUPS {cups[-5:]:>5} "
            f"from {start_date.date()} to {end_date.date()} by months"
        )
        results = []
        current_date = start_date
        while current_date < end_date:
            # Compute the first day of the next month
            if current_date.month == 12:
                month_end = current_date.replace(
                    year=current_date.year + 1, month=1, day=1
                )
            else:
                month_end = current_date.replace(month=current_date.month + 1, day=1)
            # Don't go past the requested end date
            actual_end = min(month_end, end_date)
            # Update maxpower for this month
            maxpower_result = await self.update_maxpower(
                cups=cups,
                distributor_code=distributor_code,
                start_date=current_date,
                end_date=actual_end,
                authorized_nif=authorized_nif,
                force_full_update=force_full_update,
            )
            results.append(
                {
                    "month": current_date.strftime("%Y-%m"),
                    "maxpower": maxpower_result,
                }
            )
            current_date = month_end
        # Aggregate totals over the successful months only (failed months
        # carry no "stats" key)
        total_maxpower_fetched = sum(
            r["maxpower"]["stats"]["fetched"]
            for r in results
            if r["maxpower"]["success"]
        )
        total_maxpower_saved = sum(
            r["maxpower"]["stats"]["saved"] for r in results if r["maxpower"]["success"]
        )
        total_maxpower_updated = sum(
            r["maxpower"]["stats"]["updated"]
            for r in results
            if r["maxpower"]["success"]
        )
        summary = {
            "success": all(r["maxpower"]["success"] for r in results),
            "cups": cups,
            "period": {"start": start_date.isoformat(), "end": end_date.isoformat()},
            "months_processed": len(results),
            "total_stats": {
                "maxpower_fetched": total_maxpower_fetched,
                "maxpower_saved": total_maxpower_saved,
                "maxpower_updated": total_maxpower_updated,
            },
            "monthly_results": results,
        }
        _LOGGER.info(
            f"Maxpower range update completed: {len(results)} months processed, "
            f"{total_maxpower_fetched} maxpower readings fetched"
        )
        return summary

    async def get_stored_maxpower(
        self,
        cups: str,
        start_date: Optional[datetime] = None,
        end_date: Optional[datetime] = None,
    ) -> List:
        """Get stored maxpower readings from database.

        Args:
            cups: CUPS identifier
            start_date: Optional start date filter
            end_date: Optional end date filter

        Returns:
            List of MaxPower objects from database
        """
        db_service = await self._get_db_service()
        return await db_service.get_maxpower_readings(cups, start_date, end_date)

    async def get_last_maxpower_date(self, cups: str) -> Optional[datetime]:
        """Get the date of the last maxpower record in the database.

        Args:
            cups: CUPS identifier

        Returns:
            Datetime of last maxpower reading or None if no data exists
        """
        db_service = await self._get_db_service()
        latest_maxpower = await db_service.get_latest_maxpower(cups)
        if latest_maxpower:
            return latest_maxpower.datetime
        return None

    async def get_peak_power_for_period(
        self, cups: str, start_date: datetime, end_date: datetime
    ) -> Optional[MaxPower]:
        """Get the peak power reading for a specific period.

        Args:
            cups: CUPS identifier
            start_date: Start date for search
            end_date: End date for search

        Returns:
            MaxPower object with highest value_kw in the period, or None if no data
        """
        readings = await self.get_stored_maxpower(cups, start_date, end_date)
        if not readings:
            return None
        # Find the reading with maximum power
        return max(readings, key=lambda r: r.value_kw)

    async def get_daily_peaks(
        self, cups: str, start_date: datetime, end_date: datetime
    ) -> Dict[str, MaxPower]:
        """Get daily peak power readings for a date range.

        Args:
            cups: CUPS identifier
            start_date: Start date
            end_date: End date

        Returns:
            Dict with ISO date strings as keys and MaxPower objects as values
        """
        readings = await self.get_stored_maxpower(cups, start_date, end_date)
        if not readings:
            return {}
        # Group by date and keep the highest reading of each day
        daily_peaks = {}
        for reading in readings:
            date_key = reading.datetime.date().isoformat()
            if (
                date_key not in daily_peaks
                or reading.value_kw > daily_peaks[date_key].value_kw
            ):
                daily_peaks[date_key] = reading
        return daily_peaks

    async def get_maximeter_summary(
        self,
        cups: str,
        start_date: Optional[datetime] = None,
        end_date: Optional[datetime] = None,
    ) -> Dict[str, Any]:
        """Get maximeter summary data compatible with EdataHelper attributes.

        Args:
            cups: CUPS identifier
            start_date: Optional start date filter
            end_date: Optional end date filter

        Returns:
            Dict with summary attributes matching EdataHelper format
        """
        maximeter_data = await self.get_stored_maxpower(cups, start_date, end_date)
        if not maximeter_data:
            return {
                "max_power_kW": None,
                "max_power_date": None,
                "max_power_mean_kW": None,
                "max_power_90perc_kW": None,
            }
        # Calculate summary statistics
        power_values = [m.value_kw for m in maximeter_data]
        max_power = max(power_values)
        mean_power = sum(power_values) / len(power_values)
        # Find date for (the first occurrence of) the max power
        max_power_date = next(
            m.datetime for m in maximeter_data if m.value_kw == max_power
        )
        # Calculate 90th percentile (nearest-rank on the sorted values)
        sorted_values = sorted(power_values)
        n = len(sorted_values)
        p90_index = int(0.9 * n)
        p90_power = sorted_values[p90_index] if p90_index < n else sorted_values[-1]
        return {
            "max_power_kW": round(max_power, 2),
            "max_power_date": max_power_date,
            "max_power_mean_kW": round(mean_power, 2),
            "max_power_90perc_kW": round(p90_power, 2),
        }
| """Supply service for fetching and managing supply data.""" | ||
| import logging | ||
| from datetime import datetime | ||
| from typing import Any, Dict, List, Optional | ||
| from edata.connectors.datadis import DatadisConnector | ||
| from edata.services.database import DatabaseService, SupplyModel, get_database_service | ||
| _LOGGER = logging.getLogger(__name__) | ||
| class SupplyService: | ||
| """Service for managing supply data fetching and storage.""" | ||
| def __init__( | ||
| self, | ||
| datadis_connector: DatadisConnector, | ||
| storage_dir: Optional[str] = None, | ||
| ): | ||
| """Initialize supply service. | ||
| Args: | ||
| datadis_connector: Configured Datadis connector instance | ||
| storage_dir: Directory for database and cache storage | ||
| """ | ||
| self._datadis = datadis_connector | ||
| self._storage_dir = storage_dir | ||
| self._db_service = None | ||
| async def _get_db_service(self) -> DatabaseService: | ||
| """Get database service, initializing if needed.""" | ||
| if self._db_service is None: | ||
| self._db_service = await get_database_service(self._storage_dir) | ||
| return self._db_service | ||
| async def update_supplies( | ||
| self, authorized_nif: Optional[str] = None | ||
| ) -> Dict[str, Any]: | ||
| """Update supply data from Datadis. | ||
| Args: | ||
| authorized_nif: Optional authorized NIF for access | ||
| Returns: | ||
| Dict with operation results and statistics | ||
| """ | ||
| _LOGGER.info("Updating supplies from Datadis") | ||
| try: | ||
| # Fetch supply data from Datadis | ||
| supplies_data = await self._datadis.get_supplies( | ||
| authorized_nif=authorized_nif | ||
| ) | ||
| if not supplies_data: | ||
| _LOGGER.warning("No supply data found") | ||
| return { | ||
| "success": True, | ||
| "stats": { | ||
| "fetched": 0, | ||
| "saved": 0, | ||
| "updated": 0, | ||
| "total_stored": 0, | ||
| }, | ||
| } | ||
| # Save supplies to database | ||
| saved_count = 0 | ||
| updated_count = 0 | ||
| db_service = await self._get_db_service() | ||
| for supply in supplies_data: | ||
| # Convert Pydantic model to dict for database storage | ||
| supply_dict = supply.model_dump() | ||
| # Check if supply already exists | ||
| existing = await db_service.get_supplies(cups=supply.cups) | ||
| if existing: | ||
| updated_count += 1 | ||
| _LOGGER.debug( | ||
| f"Updating existing supply for CUPS {supply.cups[-5:]}" | ||
| ) | ||
| else: | ||
| saved_count += 1 | ||
| _LOGGER.debug(f"Saving new supply for CUPS {supply.cups[-5:]}") | ||
| # Save to database | ||
| await db_service.save_supply(supply_dict) | ||
| # Get total supplies stored | ||
| all_supplies = await db_service.get_supplies() | ||
| total_stored = len(all_supplies) | ||
| result = { | ||
| "success": True, | ||
| "stats": { | ||
| "fetched": len(supplies_data), | ||
| "saved": saved_count, | ||
| "updated": updated_count, | ||
| "total_stored": total_stored, | ||
| }, | ||
| } | ||
| _LOGGER.info( | ||
| f"Supply update completed: " | ||
| f"{len(supplies_data)} fetched, {saved_count} saved, {updated_count} updated" | ||
| ) | ||
| return result | ||
| except Exception as e: | ||
| _LOGGER.error(f"Error updating supplies: {str(e)}") | ||
| return { | ||
| "success": False, | ||
| "error": str(e), | ||
| "stats": {"fetched": 0, "saved": 0, "updated": 0, "total_stored": 0}, | ||
| } | ||
| async def get_supplies(self, cups: Optional[str] = None) -> List[SupplyModel]: | ||
| """Get stored supply data. | ||
| Args: | ||
| cups: Optional CUPS identifier filter | ||
| Returns: | ||
| List of Supply objects | ||
| """ | ||
| _LOGGER.debug(f"Getting supplies{f' for CUPS {cups[-5:]}' if cups else ''}") | ||
| try: | ||
| db_service = await self._get_db_service() | ||
| supplies = await db_service.get_supplies(cups=cups) | ||
| _LOGGER.debug(f"Found {len(supplies)} supplies") | ||
| return supplies | ||
| except Exception as e: | ||
| _LOGGER.error(f"Error getting supplies: {str(e)}") | ||
| return [] | ||
| async def get_supply_by_cups(self, cups: str) -> Optional[SupplyModel]: | ||
| """Get a specific supply by CUPS. | ||
| Args: | ||
| cups: CUPS identifier | ||
| Returns: | ||
| Supply object if found, None otherwise | ||
| """ | ||
| _LOGGER.debug(f"Getting supply for CUPS {cups[-5:]}") | ||
| try: | ||
| db_service = await self._get_db_service() | ||
| supplies = await db_service.get_supplies(cups=cups) | ||
| if supplies: | ||
| _LOGGER.debug(f"Found supply for CUPS {cups[-5:]}") | ||
| return supplies[0] # Should be unique | ||
| _LOGGER.warning(f"No supply found for CUPS {cups[-5:]}") | ||
| return None | ||
| except Exception as e: | ||
| _LOGGER.error(f"Error getting supply for CUPS {cups[-5:]}: {str(e)}") | ||
| return None | ||
| async def get_cups_list(self) -> List[str]: | ||
| """Get list of all stored CUPS. | ||
| Returns: | ||
| List of CUPS identifiers | ||
| """ | ||
| _LOGGER.debug("Getting CUPS list") | ||
| try: | ||
| db_service = await self._get_db_service() | ||
| supplies = await db_service.get_supplies() | ||
| cups_list = [supply.cups for supply in supplies if supply.cups] | ||
| _LOGGER.debug(f"Found {len(cups_list)} CUPS") | ||
| return cups_list | ||
| except Exception as e: | ||
| _LOGGER.error(f"Error getting CUPS list: {str(e)}") | ||
| return [] | ||
| async def get_active_supplies( | ||
| self, reference_date: Optional[datetime] = None | ||
| ) -> List[SupplyModel]: | ||
| """Get supplies that are active at a given date. | ||
| Args: | ||
| reference_date: Date to check for active supplies (defaults to now) | ||
| Returns: | ||
| List of active supplies | ||
| """ | ||
| if reference_date is None: | ||
| reference_date = datetime.now() | ||
| _LOGGER.debug(f"Getting active supplies for date {reference_date.date()}") | ||
| try: | ||
| db_service = await self._get_db_service() | ||
| all_supplies = await db_service.get_supplies() | ||
| active_supplies = [] | ||
| for supply in all_supplies: | ||
| if supply.date_start <= reference_date <= supply.date_end: | ||
| active_supplies.append(supply) | ||
| _LOGGER.debug(f"Found {len(active_supplies)} active supplies") | ||
| return active_supplies | ||
| except Exception as e: | ||
| _LOGGER.error(f"Error getting active supplies: {str(e)}") | ||
| return [] | ||
| async def get_supply_stats(self) -> Dict[str, Any]: | ||
| """Get statistics about stored supplies. | ||
| Returns: | ||
| Dict with supply statistics | ||
| """ | ||
| _LOGGER.debug("Calculating supply statistics") | ||
| try: | ||
| db_service = await self._get_db_service() | ||
| supplies = await db_service.get_supplies() | ||
| if not supplies: | ||
| return { | ||
| "total_supplies": 0, | ||
| "total_cups": 0, | ||
| "date_range": None, | ||
| "distributors": {}, | ||
| "point_types": {}, | ||
| } | ||
| # Calculate date range | ||
| earliest_start = min(s.date_start for s in supplies) | ||
| latest_end = max(s.date_end for s in supplies) | ||
| # Count by distributor | ||
| distributors = {} | ||
| # Count by point type | ||
| point_types = {} | ||
| for supply in supplies: | ||
| # Count distributors | ||
| dist = supply.distributor or "Unknown" | ||
| distributors[dist] = distributors.get(dist, 0) + 1 | ||
| # Count point types | ||
| pt = supply.point_type or "Unknown" | ||
| point_types[pt] = point_types.get(pt, 0) + 1 | ||
| stats = { | ||
| "total_supplies": len(supplies), | ||
| "total_cups": len(set(s.cups for s in supplies)), | ||
| "date_range": { | ||
| "earliest_start": earliest_start, | ||
| "latest_end": latest_end, | ||
| }, | ||
| "distributors": distributors, | ||
| "point_types": point_types, | ||
| } | ||
| _LOGGER.debug(f"Supply statistics: {len(supplies)} total supplies") | ||
| return stats | ||
| except Exception as e: | ||
| _LOGGER.error(f"Error calculating supply statistics: {str(e)}") | ||
| return {} | ||
| async def validate_cups(self, cups: str) -> bool: | ||
| """Validate that a CUPS exists in stored supplies. | ||
| Args: | ||
| cups: CUPS identifier to validate | ||
| Returns: | ||
| True if CUPS exists, False otherwise | ||
| """ | ||
| _LOGGER.debug(f"Validating CUPS {cups[-5:]}") | ||
| try: | ||
| supply = await self.get_supply_by_cups(cups) | ||
| is_valid = supply is not None | ||
| if is_valid: | ||
| _LOGGER.debug(f"CUPS {cups[-5:]} is valid") | ||
| else: | ||
| _LOGGER.warning(f"CUPS {cups[-5:]} not found") | ||
| return is_valid | ||
| except Exception as e: | ||
| _LOGGER.error(f"Error validating CUPS {cups[-5:]}: {str(e)}") | ||
| return False | ||
| async def get_distributor_code(self, cups: str) -> Optional[str]: | ||
| """Get distributor code for a CUPS. | ||
| Args: | ||
| cups: CUPS identifier | ||
| Returns: | ||
| Distributor code if found, None otherwise | ||
| """ | ||
| _LOGGER.debug(f"Getting distributor code for CUPS {cups[-5:]}") | ||
| try: | ||
| supply = await self.get_supply_by_cups(cups) | ||
| if supply and supply.distributor_code: | ||
| _LOGGER.debug( | ||
| f"Found distributor code {supply.distributor_code} for CUPS {cups[-5:]}" | ||
| ) | ||
| return supply.distributor_code | ||
| _LOGGER.warning(f"No distributor code found for CUPS {cups[-5:]}") | ||
| return None | ||
| except Exception as e: | ||
| _LOGGER.error( | ||
| f"Error getting distributor code for CUPS {cups[-5:]}: {str(e)}" | ||
| ) | ||
| return None | ||
| async def get_point_type(self, cups: str) -> Optional[int]: | ||
| """Get point type for a CUPS. | ||
| Args: | ||
| cups: CUPS identifier | ||
| Returns: | ||
| Point type if found, None otherwise | ||
| """ | ||
| _LOGGER.debug(f"Getting point type for CUPS {cups[-5:]}") | ||
| try: | ||
| supply = await self.get_supply_by_cups(cups) | ||
| if supply and supply.point_type is not None: | ||
| _LOGGER.debug( | ||
| f"Found point type {supply.point_type} for CUPS {cups[-5:]}" | ||
| ) | ||
| return supply.point_type | ||
| _LOGGER.warning(f"No point type found for CUPS {cups[-5:]}") | ||
| return None | ||
| except Exception as e: | ||
| _LOGGER.error(f"Error getting point type for CUPS {cups[-5:]}: {str(e)}") | ||
| return None | ||
| async def get_supply_summary(self, cups: str) -> Dict[str, Any]: | ||
| """Get supply summary attributes for a CUPS. | ||
| Args: | ||
| cups: CUPS identifier | ||
| Returns: | ||
| Dict with supply summary attributes | ||
| """ | ||
| _LOGGER.debug(f"Getting supply summary for CUPS {cups[-5:]}") | ||
| try: | ||
| supply = await self.get_supply_by_cups(cups) | ||
| if not supply: | ||
| _LOGGER.warning(f"No supply found for CUPS {cups[-5:]}") | ||
| return {"cups": None} | ||
| summary = { | ||
| "cups": supply.cups, | ||
| # Add other supply-related summary attributes here as needed | ||
| # These would be used by EdataHelper for calculating summary attributes | ||
| } | ||
| _LOGGER.debug(f"Supply summary calculated for CUPS {cups[-5:]}") | ||
| return summary | ||
| except Exception as e: | ||
| _LOGGER.error( | ||
| f"Error getting supply summary for CUPS {cups[-5:]}: {str(e)}" | ||
| ) | ||
| return {"cups": None} |
@@ -11,2 +11,23 @@ .python-version | ||
| edata/utils.py | ||
| edata/connectors/__init__.py | ||
| edata/connectors/datadis.py | ||
| edata/connectors/redata.py | ||
| edata/models/__init__.py | ||
| edata/models/base.py | ||
| edata/models/consumption.py | ||
| edata/models/contract.py | ||
| edata/models/database.py | ||
| edata/models/maximeter.py | ||
| edata/models/pricing.py | ||
| edata/models/supply.py | ||
| edata/scripts/__init__.py | ||
| edata/scripts/__main__.py | ||
| edata/scripts/dump.py | ||
| edata/services/__init__.py | ||
| edata/services/billing.py | ||
| edata/services/consumption.py | ||
| edata/services/contract.py | ||
| edata/services/database.py | ||
| edata/services/maximeter.py | ||
| edata/services/supply.py | ||
| edata/tests/__init__.py | ||
@@ -13,0 +34,0 @@ edata/tests/test_helpers.py |
+4
-2
| Metadata-Version: 2.4 | ||
| Name: e-data | ||
| Version: 2.0.0b1 | ||
| Version: 2.0.0b2 | ||
| Summary: Python library for managing spanish energy data from various web providers | ||
| Author-email: VMG <vmayorg@outlook.es> | ||
| License: GPL-3.0-or-later | ||
| License-Expression: GPL-3.0-or-later | ||
| Project-URL: Homepage, https://github.com/uvejota/python-edata | ||
@@ -34,2 +34,4 @@ Project-URL: Repository, https://github.com/uvejota/python-edata | ||
| Requires-Dist: sqlalchemy[asyncio]>=2.0.0 | ||
| Requires-Dist: aiohttp>=3.8.0 | ||
| Requires-Dist: diskcache>=5.4.0 | ||
| Provides-Extra: dev | ||
@@ -36,0 +38,0 @@ Requires-Dist: pytest>=7.1.2; extra == "dev" |
+8
-3
@@ -7,3 +7,3 @@ [build-system] | ||
| name = "e-data" | ||
| version = "2.0.0b1" | ||
| version = "2.0.0b2" | ||
| authors = [ | ||
@@ -14,3 +14,3 @@ {name = "VMG", email = "vmayorg@outlook.es"}, | ||
| readme = "README.md" | ||
| license = {text = "GPL-3.0-or-later"} | ||
| license = "GPL-3.0-or-later" | ||
| requires-python = ">=3.8" | ||
@@ -41,2 +41,4 @@ classifiers = [ | ||
| "sqlalchemy[asyncio]>=2.0.0", | ||
| "aiohttp>=3.8.0", | ||
| "diskcache>=5.4.0", | ||
| ] | ||
@@ -67,4 +69,7 @@ | ||
| [tool.setuptools] | ||
| packages = ["edata"] | ||
| include-package-data = true | ||
| [tool.setuptools.packages.find] | ||
| exclude = ["tests*", "*.tests*", "*.tests", "edata.tests*"] | ||
| [tool.setuptools.package-data] | ||
@@ -71,0 +76,0 @@ edata = ["py.typed"] |
Alert delta unavailable
Currently unable to show alert delta for PyPI packages.
409210
87.22%46
84%8109
115.15%