Latest Threat Research:SANDWORM_MODE: Shai-Hulud-Style npm Worm Hijacks CI Workflows and Poisons AI Toolchains.Details
Socket
Book a DemoInstallSign in
Socket

e-data

Package Overview
Dependencies
Maintainers
1
Versions
85
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

e-data - npm Package Compare versions

Comparing version
2.0.0b1
to
2.0.0b2
edata/connectors/__init__.py
+544
"""Datadis API connector.
To fetch data from datadis.es private API.
There are a few issues that are worked around:
- You have to wait 24h between two identical requests.
- Datadis server does not like ranges greater than 1 month.
"""
import asyncio
import hashlib
import logging
import os
import tempfile
from datetime import datetime, timedelta
import aiohttp
import diskcache
from dateutil.relativedelta import relativedelta
from edata import utils
from edata.models import Consumption, Contract, MaxPower, Supply
_LOGGER = logging.getLogger(__name__)
# Request timeout constant
REQUESTS_TIMEOUT = 30
# Token-related constants
URL_TOKEN = "https://datadis.es/nikola-auth/tokens/login"
TOKEN_USERNAME = "username"
TOKEN_PASSWD = "password"
# Supplies-related constants
URL_GET_SUPPLIES = "https://datadis.es/api-private/api/get-supplies"
GET_SUPPLIES_MANDATORY_FIELDS = [
"cups",
"validDateFrom",
"validDateTo",
"pointType",
"distributorCode",
]
# Contracts-related constants
URL_GET_CONTRACT_DETAIL = "https://datadis.es/api-private/api/get-contract-detail"
GET_CONTRACT_DETAIL_MANDATORY_FIELDS = [
"startDate",
"endDate",
"marketer",
"contractedPowerkW",
]
# Consumption-related constants
URL_GET_CONSUMPTION_DATA = "https://datadis.es/api-private/api/get-consumption-data"
GET_CONSUMPTION_DATA_MANDATORY_FIELDS = [
"time",
"date",
"consumptionKWh",
"obtainMethod",
]
MAX_CONSUMPTIONS_MONTHS = (
1 # max consumptions in a single request (fixed to 1 due to datadis limitations)
)
# Maximeter-related constants
URL_GET_MAX_POWER = "https://datadis.es/api-private/api/get-max-power"
GET_MAX_POWER_MANDATORY_FIELDS = ["time", "date", "maxPower"]
# Timing constants
TIMEOUT = 3 * 60 # requests timeout
QUERY_LIMIT = timedelta(hours=24) # a datadis limitation, again...
# Cache-related constants
RECENT_CACHE_SUBDIR = "cache"
class DatadisConnector:
    """A Datadis private API connector.

    Works around two known datadis.es limitations:
    - Identical requests are rate limited to one per 24 h, so successful
      responses are cached on disk and replayed within that window.
    - Ranges greater than one month are rejected for consumption queries,
      so long ranges are split into monthly chunks ("smart fetch").
    """

    def __init__(
        self,
        username: str,
        password: str,
        enable_smart_fetch: bool = True,
        storage_path: str | None = None,
    ) -> None:
        """DatadisConnector constructor.

        Args:
            username: datadis.es account username.
            password: datadis.es account password.
            enable_smart_fetch: if True, split consumption queries into
                chunks of MAX_CONSUMPTIONS_MONTHS (a datadis limitation).
            storage_path: base directory for the response cache; the
                system temp directory is used when None.
        """
        self._usr = username
        self._pwd = password
        self._token = {}  # bearer token stored under key "encoded" once fetched
        self._smart_fetch = enable_smart_fetch
        # queries whose failures were already reported, to warn only once
        self._warned_queries = []
        if storage_path is not None:
            self._recent_cache_dir = os.path.join(storage_path, RECENT_CACHE_SUBDIR)
        else:
            self._recent_cache_dir = os.path.join(
                tempfile.gettempdir(), RECENT_CACHE_SUBDIR
            )
        os.makedirs(self._recent_cache_dir, exist_ok=True)
        # Persistent cache so identical queries within QUERY_LIMIT are
        # served locally instead of hitting the 24h server-side limit
        self._cache = diskcache.Cache(
            self._recent_cache_dir,
            size_limit=100 * 1024 * 1024,  # 100MB limit
            eviction_policy="least-recently-used",
        )

    async def login(self):
        """Test to login with provided credentials.

        Returns True when a token could be fetched, False otherwise.
        """
        return await self._get_token()

    async def get_supplies(self, authorized_nif: str | None = None) -> list[Supply]:
        """Datadis 'get_supplies' query (async version).

        Args:
            authorized_nif: optional NIF of a third party that authorized
                access to their supplies.

        Returns:
            A list of Supply objects (malformed entries are skipped with
            a warning).
        """
        data = {}
        # If authorized_nif is provided, we have to include it as parameter
        if authorized_nif is not None:
            data["authorizedNif"] = authorized_nif
        # Request the resource using get method (supplies are always
        # fetched fresh, bypassing the recent-queries cache)
        response = await self._get(
            URL_GET_SUPPLIES, request_data=data, ignore_recent_queries=True
        )
        # Response is a list of serialized supplies.
        # We will iter through them to transform them into Supply objects
        supplies = []
        # Build tomorrow Y/m/d string since we will use it as the 'date_end' of
        # active supplies
        tomorrow_str = (datetime.today() + timedelta(days=1)).strftime("%Y/%m/%d")
        for i in response:
            # check data integrity (maybe this can be supressed if datadis proves to be reliable)
            if all(k in i for k in GET_SUPPLIES_MANDATORY_FIELDS):
                supplies.append(
                    Supply(
                        cups=i["cups"],  # the supply identifier
                        date_start=datetime.strptime(
                            (
                                i["validDateFrom"]
                                if i["validDateFrom"] != ""
                                else "1970/01/01"
                            ),
                            "%Y/%m/%d",
                        ),  # start date of the supply. 1970/01/01 if unset.
                        date_end=datetime.strptime(
                            (
                                i["validDateTo"]
                                if i["validDateTo"] != ""
                                else tomorrow_str
                            ),
                            "%Y/%m/%d",
                        ),  # end date of the supply, tomorrow if unset
                        # the following parameters are not crucial, so they can be none
                        address=i.get("address", None),
                        postal_code=i.get("postalCode", None),
                        province=i.get("province", None),
                        municipality=i.get("municipality", None),
                        distributor=i.get("distributor", None),
                        # these two are mandatory, we will use them to fetch contracts data
                        point_type=i["pointType"],
                        distributor_code=i["distributorCode"],
                    )
                )
            else:
                _LOGGER.warning(
                    "Weird data structure while fetching supplies data, got %s",
                    response,
                )
        return supplies

    @staticmethod
    def _build_cache_key(
        url: str,
        request_data: dict | None,
        refresh_token: bool,
        is_retry: bool,
    ) -> str:
        """Return a deterministic cache key for a request.

        The key is the SHA-256 of the stringified request descriptor, so
        identical requests map to the same cache entry.
        """
        cache_data = {
            "url": url,
            "request_data": request_data,
            "refresh_token": refresh_token,
            "is_retry": is_retry,
        }
        return hashlib.sha256(str(cache_data).encode()).hexdigest()

    async def _get(
        self,
        url: str,
        request_data: dict | None = None,
        refresh_token: bool = False,
        is_retry: bool = False,
        ignore_recent_queries: bool = False,
    ):
        """Get request for Datadis API (async version).

        Args:
            url: endpoint URL (query string is appended here).
            request_data: query parameters.
            refresh_token: when True, refresh the bearer token before the
                request (set internally on 401 responses).
            is_retry: whether this call is already a retry (set internally
                on unexpected status codes, mostly 500).
            ignore_recent_queries: bypass the 24h on-disk response cache.

        Returns:
            The decoded JSON response (list/dict), or [] on failure.
        """
        if request_data is None:
            data = {}
        else:
            data = request_data
        # build get parameters
        params = "?" if len(data) > 0 else ""
        for param in data:
            key = param
            value = data[param]
            params = params + f"{key}={value}&"
        anonym_params = "?" if len(data) > 0 else ""
        # build anonymized params for logging (mask CUPS and NIF)
        for anonym_param in data:
            key = anonym_param
            if key == "cups":
                value = "xxxx" + data[anonym_param][-5:]
            elif key == "authorizedNif":
                value = "xxxx"
            else:
                value = data[anonym_param]
            anonym_params = anonym_params + f"{key}={value}&"
        # Check diskcache first (unless ignoring cache). The key is built
        # once here and reused when storing the response below.
        cache_key = None
        if not ignore_recent_queries:
            cache_key = self._build_cache_key(url, request_data, refresh_token, is_retry)
            try:
                # Run cache get operation in thread to avoid blocking
                cached_result = await asyncio.to_thread(self._cache.get, cache_key)
                if cached_result is not None and isinstance(
                    cached_result, (list, dict)
                ):
                    _LOGGER.info(
                        "Returning cached response for '%s'", url + anonym_params
                    )
                    return cached_result
            except Exception as e:
                _LOGGER.warning("Error reading cache: %s", e)
        # refresh token if needed (recursive approach)
        is_valid_token = False
        response = []
        if refresh_token:
            is_valid_token = await self._get_token()
        if is_valid_token or not refresh_token:
            # run the query
            timeout = aiohttp.ClientTimeout(total=REQUESTS_TIMEOUT)
            async with aiohttp.ClientSession(timeout=timeout) as session:
                try:
                    _LOGGER.info("GET %s", url + anonym_params)
                    headers = {"Accept-Encoding": "identity"}
                    # Ensure we have a token
                    if not self._token.get("encoded"):
                        await self._get_token()
                    headers["Authorization"] = f"Bearer {self._token['encoded']}"
                    async with session.get(url + params, headers=headers) as reply:
                        # eval response
                        if reply.status == 200:
                            # we're here if reply seems valid
                            _LOGGER.info("Got 200 OK")
                            try:
                                response_json = await reply.json(content_type=None)
                                if response_json:
                                    response = response_json
                                    # Store in diskcache with a TTL matching
                                    # the datadis 24h query limit
                                    if cache_key is not None and isinstance(
                                        response, (list, dict)
                                    ):
                                        try:
                                            ttl_seconds = int(
                                                QUERY_LIMIT.total_seconds()
                                            )
                                            # Run cache set operation in thread to avoid blocking
                                            await asyncio.to_thread(
                                                self._cache.set,
                                                cache_key,
                                                response,
                                                expire=ttl_seconds,
                                            )
                                            _LOGGER.info(
                                                "Cached response for %s with TTL %d seconds",
                                                url,
                                                ttl_seconds,
                                            )
                                        except Exception as e:
                                            _LOGGER.warning(
                                                "Error storing in cache: %s", e
                                            )
                                else:
                                    # this mostly happens when datadis provides an empty response
                                    _LOGGER.info("Got an empty response")
                            except Exception as e:
                                # Handle non-JSON responses
                                _LOGGER.info("Got an empty or non-JSON response")
                                _LOGGER.exception(e)
                        elif reply.status == 401 and not refresh_token:
                            # we're here if we were unauthorized so we will refresh the token
                            response = await self._get(
                                url,
                                request_data=data,
                                refresh_token=True,
                                ignore_recent_queries=ignore_recent_queries,
                            )
                        elif reply.status == 429:
                            # we're here if we exceeded datadis API rates (24h)
                            _LOGGER.warning(
                                "Got status code '%s' with message '%s'",
                                reply.status,
                                await reply.text(),
                            )
                        elif is_retry:
                            # otherwise, if this was a retried request... warn the user
                            if (url + params) not in self._warned_queries:
                                _LOGGER.warning(
                                    "Got status code '%s' with message '%s'. %s. %s",
                                    reply.status,
                                    await reply.text(),
                                    "Query temporary disabled",
                                    "Future 500 code errors for this query will be silenced until restart",
                                )
                                self._warned_queries.append(url + params)
                        else:
                            # finally, retry since an unexpected error took place (mostly 500 errors - server fault)
                            response = await self._get(
                                url,
                                request_data,
                                is_retry=True,
                                ignore_recent_queries=ignore_recent_queries,
                            )
                except asyncio.TimeoutError:
                    _LOGGER.warning("Timeout at %s", url + anonym_params)
                    return []
                except Exception as e:
                    _LOGGER.error(
                        "Error during async request to %s: %s", url + anonym_params, e
                    )
                    return []
        return response

    async def get_contract_detail(
        self, cups: str, distributor_code: str, authorized_nif: str | None = None
    ) -> list[Contract]:
        """Datadis get_contract_detail query (async version).

        Args:
            cups: supply identifier.
            distributor_code: distributor company code for the supply.
            authorized_nif: optional NIF of an authorizing third party.

        Returns:
            A list of Contract objects (malformed entries are skipped
            with a warning).
        """
        data = {"cups": cups, "distributorCode": distributor_code}
        if authorized_nif is not None:
            data["authorizedNif"] = authorized_nif
        response = await self._get(
            URL_GET_CONTRACT_DETAIL, request_data=data, ignore_recent_queries=True
        )
        contracts = []
        # open-ended contracts get 'tomorrow' as their end date
        tomorrow_str = (datetime.today() + timedelta(days=1)).strftime("%Y/%m/%d")
        for i in response:
            if all(k in i for k in GET_CONTRACT_DETAIL_MANDATORY_FIELDS):
                contracts.append(
                    Contract(
                        date_start=datetime.strptime(
                            i["startDate"] if i["startDate"] != "" else "1970/01/01",
                            "%Y/%m/%d",
                        ),
                        date_end=datetime.strptime(
                            i["endDate"] if i["endDate"] != "" else tomorrow_str,
                            "%Y/%m/%d",
                        ),
                        marketer=i["marketer"],
                        distributor_code=distributor_code,
                        power_p1=(
                            i["contractedPowerkW"][0]
                            if isinstance(i["contractedPowerkW"], list)
                            else None
                        ),
                        # BUGFIX: guard with isinstance as well, otherwise a
                        # non-list 'contractedPowerkW' raised TypeError on len()
                        power_p2=(
                            i["contractedPowerkW"][1]
                            if (
                                isinstance(i["contractedPowerkW"], list)
                                and len(i["contractedPowerkW"]) > 1
                            )
                            else None
                        ),
                    )
                )
            else:
                _LOGGER.warning(
                    "Weird data structure while fetching contracts data, got %s",
                    response,
                )
        return contracts

    async def get_consumption_data(
        self,
        cups: str,
        distributor_code: str,
        start_date: datetime,
        end_date: datetime,
        measurement_type: str,
        point_type: int,
        authorized_nif: str | None = None,
        is_smart_fetch: bool = False,
    ) -> list[Consumption]:
        """Datadis get_consumption_data query (async version).

        When smart fetch is enabled, the requested range is split into
        chunks of MAX_CONSUMPTIONS_MONTHS and fetched recursively, since
        datadis rejects ranges greater than one month.

        Args:
            cups: supply identifier.
            distributor_code: distributor company code.
            start_date: range start (inclusive).
            end_date: range end (inclusive).
            measurement_type: datadis measurement type parameter.
            point_type: datadis point type parameter.
            authorized_nif: optional NIF of an authorizing third party.
            is_smart_fetch: internal flag marking recursive chunk calls.

        Returns:
            A list of Consumption objects within [start_date, end_date].
        """
        if self._smart_fetch and not is_smart_fetch:
            _start = start_date
            consumptions_dicts = []
            while _start < end_date:
                _end = min(
                    _start + relativedelta(months=MAX_CONSUMPTIONS_MONTHS), end_date
                )
                batch_consumptions = await self.get_consumption_data(
                    cups,
                    distributor_code,
                    _start,
                    _end,
                    measurement_type,
                    point_type,
                    authorized_nif,
                    is_smart_fetch=True,
                )
                # Convert to dicts for extend_by_key function
                batch_dicts = [c.model_dump() for c in batch_consumptions]
                consumptions_dicts = utils.extend_by_key(
                    consumptions_dicts,
                    batch_dicts,
                    "datetime",
                )
                _start = _end
            # Convert back to Pydantic models
            return [Consumption(**c) for c in consumptions_dicts]
        data = {
            "cups": cups,
            "distributorCode": distributor_code,
            "startDate": datetime.strftime(start_date, "%Y/%m"),
            "endDate": datetime.strftime(end_date, "%Y/%m"),
            "measurementType": measurement_type,
            "pointType": point_type,
        }
        if authorized_nif is not None:
            data["authorizedNif"] = authorized_nif
        response = await self._get(URL_GET_CONSUMPTION_DATA, request_data=data)
        consumptions = []
        for i in response:
            if "consumptionKWh" in i:
                if all(k in i for k in GET_CONSUMPTION_DATA_MANDATORY_FIELDS):
                    # shift the reported hour back by one; datadis times look
                    # 1-based hour-ending — TODO confirm against the API docs
                    hour = str(int(i["time"].split(":")[0]) - 1)
                    date_as_dt = datetime.strptime(
                        f"{i['date']} {hour.zfill(2)}:00", "%Y/%m/%d %H:%M"
                    )
                    if not (start_date <= date_as_dt <= end_date):
                        continue  # skip element if dt is out of range
                    _surplus = i.get("surplusEnergyKWh", 0)
                    if _surplus is None:
                        _surplus = 0
                    consumptions.append(
                        Consumption(
                            datetime=date_as_dt,
                            delta_h=1,
                            value_kwh=i["consumptionKWh"],
                            surplus_kwh=_surplus,
                            real=i["obtainMethod"] == "Real",
                        )
                    )
            else:
                _LOGGER.warning(
                    "Weird data structure while fetching consumption data, got %s",
                    response,
                )
        return consumptions

    async def get_max_power(
        self,
        cups: str,
        distributor_code: str,
        start_date: datetime,
        end_date: datetime,
        authorized_nif: str | None = None,
    ) -> list[MaxPower]:
        """Datadis get_max_power query (async version).

        Args:
            cups: supply identifier.
            distributor_code: distributor company code.
            start_date: range start (month resolution).
            end_date: range end (month resolution).
            authorized_nif: optional NIF of an authorizing third party.

        Returns:
            A list of MaxPower (maximeter) readings.
        """
        data = {
            "cups": cups,
            "distributorCode": distributor_code,
            "startDate": datetime.strftime(start_date, "%Y/%m"),
            "endDate": datetime.strftime(end_date, "%Y/%m"),
        }
        if authorized_nif is not None:
            data["authorizedNif"] = authorized_nif
        response = await self._get(URL_GET_MAX_POWER, request_data=data)
        maxpower_values = []
        for i in response:
            if all(k in i for k in GET_MAX_POWER_MANDATORY_FIELDS):
                maxpower_values.append(
                    MaxPower(
                        datetime=datetime.strptime(
                            f"{i['date']} {i['time']}", "%Y/%m/%d %H:%M"
                        ),
                        value_kw=i["maxPower"],
                    )
                )
            else:
                _LOGGER.warning(
                    "Weird data structure while fetching maximeter data, got %s",
                    response,
                )
        return maxpower_values

    async def _get_token(self):
        """Private method that fetches a new token if needed (async version).

        Stores the encoded token in self._token["encoded"] on success.

        Returns:
            True when a token was fetched, False otherwise.
        """
        _LOGGER.info("Fetching token for async requests")
        is_valid_token = False
        timeout = aiohttp.ClientTimeout(total=REQUESTS_TIMEOUT)
        # Prepare data as URL-encoded string, same as sync version
        form_data = {
            TOKEN_USERNAME: self._usr,
            TOKEN_PASSWD: self._pwd,
        }
        async with aiohttp.ClientSession(timeout=timeout) as session:
            try:
                async with session.post(
                    URL_TOKEN,
                    data=form_data,
                    headers={"Content-Type": "application/x-www-form-urlencoded"},
                ) as response:
                    if response.status == 200:
                        # store token encoded
                        self._token["encoded"] = await response.text()
                        is_valid_token = True
                    else:
                        _LOGGER.error(
                            "Unknown error while retrieving async token, got %s",
                            await response.text(),
                        )
            except Exception as e:
                _LOGGER.error("Error during async token fetch: %s", e)
        return is_valid_token
"""A REData API connector"""
import asyncio
import datetime as dt
import logging
import aiohttp
from dateutil import parser
from edata.models.pricing import PricingData
_LOGGER = logging.getLogger(__name__)
REQUESTS_TIMEOUT = 15
URL_REALTIME_PRICES = (
"https://apidatos.ree.es/es/datos/mercados/precios-mercados-tiempo-real"
"?time_trunc=hour"
"&geo_ids={geo_id}"
"&start_date={start:%Y-%m-%dT%H:%M}&end_date={end:%Y-%m-%dT%H:%M}"
)
class REDataConnector:
    """Main class for REData connector"""

    def __init__(self) -> None:
        """Init method for REDataConnector"""

    async def get_realtime_prices(
        self, dt_from: dt.datetime, dt_to: dt.datetime, is_ceuta_melilla: bool = False
    ) -> list:
        """GET query to fetch realtime pvpc prices, historical data is limited to current month (async version)"""
        # Geo id 8744 selects Ceuta/Melilla prices, 8741 the mainland ones.
        query_url = URL_REALTIME_PRICES.format(
            geo_id=8744 if is_ceuta_melilla else 8741,
            start=dt_from,
            end=dt_to,
        )
        prices: list = []
        client_timeout = aiohttp.ClientTimeout(total=REQUESTS_TIMEOUT)
        async with aiohttp.ClientSession(timeout=client_timeout) as session:
            try:
                async with session.get(query_url) as response:
                    if response.status != 200:
                        _LOGGER.error(
                            "%s returned %s with code %s",
                            query_url,
                            await response.text(),
                            response.status,
                        )
                    else:
                        payload = await response.json()
                        # An empty/falsy payload is silently treated as "no data".
                        if payload:
                            try:
                                raw_values = payload["included"][0]["attributes"][
                                    "values"
                                ]
                            except (IndexError, KeyError):
                                # Expected JSON shape is missing -> log and bail out.
                                _LOGGER.error(
                                    "%s returned a malformed response: %s ",
                                    query_url,
                                    await response.text(),
                                )
                                return prices
                            # REData reports EUR/MWh; convert to EUR/kWh and
                            # drop timezone info to keep naive datetimes.
                            prices.extend(
                                PricingData(
                                    datetime=parser.parse(item["datetime"]).replace(
                                        tzinfo=None
                                    ),
                                    value_eur_kwh=item["value"] / 1000,
                                    delta_h=1,
                                )
                                for item in raw_values
                            )
            except asyncio.TimeoutError:
                _LOGGER.error("Timeout error when fetching data from %s", query_url)
            except aiohttp.ClientError as e:
                _LOGGER.error(
                    "HTTP client error when fetching data from %s: %s", query_url, e
                )
            except Exception as e:
                _LOGGER.error("Unexpected error when fetching data from %s: %s", query_url, e)
        return prices
"""Pydantic models for edata.
This module contains all data models using Pydantic for robust validation,
serialization and better developer experience.
"""
from edata.models.consumption import Consumption, ConsumptionAggregated
from edata.models.contract import Contract
from edata.models.maximeter import MaxPower
from edata.models.pricing import PricingAggregated, PricingData, PricingRules
from edata.models.supply import Supply
__all__ = [
"Supply",
"Contract",
"Consumption",
"ConsumptionAggregated",
"PricingData",
"PricingRules",
"PricingAggregated",
"MaxPower",
]
"""Base models and common functionality for edata Pydantic models."""
from datetime import datetime
from typing import Any, Dict
from pydantic import BaseModel, ConfigDict, field_validator
class EdataBaseModel(BaseModel):
    """Base model for all edata entities with common configuration."""

    model_config = ConfigDict(
        # Validate assignments to ensure data integrity
        validate_assignment=True,
        # Use enum values instead of enum objects for serialization
        use_enum_values=True,
        # Extra fields are forbidden to catch typos and ensure schema compliance
        extra="forbid",
        # Validate default values
        validate_default=True,
        # Only Pydantic-supported field types are allowed
        arbitrary_types_allowed=False,
        # Strip leading/trailing whitespace from string inputs
        str_strip_whitespace=True,
    )

    def model_dump_for_storage(self) -> Dict[str, Any]:
        """Serialize model for storage, handling special types like datetime.

        JSON mode renders datetimes and other special types as plain
        JSON-compatible values.
        """
        return self.model_dump(mode="json")

    @classmethod
    def from_storage(cls, data: Dict[str, Any]):
        """Create model instance from storage data.

        Inverse of model_dump_for_storage: validates the stored dict back
        into a model instance.
        """
        return cls.model_validate(data)
class TimestampMixin(BaseModel):
    """Mixin for models that have datetime fields."""

    @field_validator("*", mode="before")
    @classmethod
    def validate_datetime_fields(cls, v, info):
        """Convert datetime strings to datetime objects if needed.

        Only applies to fields whose name mentions 'date'/'datetime';
        unparseable strings are passed through untouched for normal
        Pydantic validation to handle.
        """
        name = info.field_name
        looks_temporal = bool(name) and ("datetime" in name or "date" in name)
        if looks_temporal and isinstance(v, str):
            from dateutil import parser

            try:
                return parser.parse(v)
            except (ValueError, TypeError):
                pass
        return v
class EnergyMixin(BaseModel):
    """Mixin for models dealing with energy values."""

    @field_validator("*", mode="before")
    @classmethod
    def validate_energy_fields(cls, v, info):
        """Validate energy-related fields.

        Rejects negative values for any field whose name contains
        'kw'/'kwh' (case-insensitive); everything else passes through.
        """
        field_name = info.field_name
        lowered = field_name.lower() if field_name else ""
        is_energy = "kwh" in lowered or "kw" in lowered
        if is_energy and v is not None and v < 0:
            raise ValueError(f"{field_name} cannot be negative")
        return v
def validate_cups(v: str) -> str:
    """Validate CUPS (Spanish electricity supply point code) format.

    Normalizes the code (spaces removed, uppercased) and checks the
    'ES' prefix and the 20-22 character length.
    """
    if not v:
        raise ValueError("CUPS cannot be empty")
    # Normalize: drop spaces and uppercase before checking
    normalized = v.replace(" ", "").upper()
    if not normalized.startswith("ES"):
        raise ValueError("CUPS must start with 'ES'")
    if not 20 <= len(normalized) <= 22:
        raise ValueError("CUPS must be 20-22 characters long")
    return normalized
def validate_positive_number(v: float) -> float:
    """Validate that a number is positive.

    None and non-negative values (including 0) are returned unchanged;
    negative values raise ValueError.
    """
    if v is None or v >= 0:
        return v
    raise ValueError("Value must be positive")
def validate_reasonable_datetime(v: datetime) -> datetime:
    """Validate that datetime is within reasonable bounds.

    Accepts years from 2000 up to 50 years past the current year; future
    dates are allowed because contracts/supplies may be valid until then.
    """
    if v.year < 2000:
        raise ValueError("Date cannot be before year 2000")
    upper_bound = datetime.now().year + 50
    if v.year > upper_bound:
        raise ValueError("Date cannot be more than 50 years in the future")
    return v
"""Consumption (consumo) related Pydantic models."""
from datetime import datetime as dt
from pydantic import Field, field_validator
from edata.models.base import (
EdataBaseModel,
EnergyMixin,
TimestampMixin,
validate_positive_number,
validate_reasonable_datetime,
)
class Consumption(EdataBaseModel, TimestampMixin, EnergyMixin):
    """Pydantic model for electricity consumption data.

    One sample of consumption over a delta_h-hour interval, as fetched
    from the datadis API.
    """

    datetime: dt = Field(..., description="Timestamp of the consumption measurement")
    delta_h: float = Field(
        ..., description="Time interval in hours for this measurement", gt=0, le=24
    )
    value_kwh: float = Field(..., description="Energy consumption in kWh", ge=0)
    surplus_kwh: float = Field(
        default=0.0, description="Energy surplus/generation in kWh", ge=0
    )
    real: bool = Field(
        default=True, description="Whether this is a real measurement or estimated"
    )

    @field_validator("datetime")
    @classmethod
    def validate_datetime_range(cls, v: dt) -> dt:
        """Validate datetime is reasonable."""
        return validate_reasonable_datetime(v)

    @field_validator("value_kwh", "surplus_kwh")
    @classmethod
    def validate_energy_values(cls, v: float) -> float:
        """Validate energy values are positive."""
        return validate_positive_number(v)

    def __str__(self) -> str:
        """String representation."""
        return f"Consumption({self.datetime}, {self.value_kwh}kWh)"

    def __repr__(self) -> str:
        """Developer representation."""
        return f"Consumption(datetime={self.datetime}, value_kwh={self.value_kwh}, real={self.real})"
class ConsumptionAggregated(EdataBaseModel, TimestampMixin, EnergyMixin):
    """Pydantic model for aggregated consumption data (daily/monthly summaries).

    Totals plus per-tariff-period (P1/P2/P3) breakdowns for both
    consumption and surplus over the aggregation window.
    """

    datetime: dt = Field(
        ..., description="Timestamp representing the start of the aggregation period"
    )
    value_kwh: float = Field(
        ..., description="Total energy consumption in kWh for the period", ge=0
    )
    value_p1_kwh: float = Field(
        default=0.0, description="Energy consumption in period P1 (kWh)", ge=0
    )
    value_p2_kwh: float = Field(
        default=0.0, description="Energy consumption in period P2 (kWh)", ge=0
    )
    value_p3_kwh: float = Field(
        default=0.0, description="Energy consumption in period P3 (kWh)", ge=0
    )
    surplus_kwh: float = Field(
        default=0.0,
        description="Total energy surplus/generation in kWh for the period",
        ge=0,
    )
    surplus_p1_kwh: float = Field(
        default=0.0, description="Energy surplus in period P1 (kWh)", ge=0
    )
    surplus_p2_kwh: float = Field(
        default=0.0, description="Energy surplus in period P2 (kWh)", ge=0
    )
    surplus_p3_kwh: float = Field(
        default=0.0, description="Energy surplus in period P3 (kWh)", ge=0
    )
    delta_h: float = Field(
        ..., description="Duration of the aggregation period in hours", gt=0
    )

    @field_validator("datetime")
    @classmethod
    def validate_datetime_range(cls, v: dt) -> dt:
        """Validate datetime is reasonable."""
        return validate_reasonable_datetime(v)

    def __str__(self) -> str:
        """String representation."""
        # <= 24 hours is rendered as a daily aggregate, otherwise monthly
        period = "day" if self.delta_h <= 24 else "month"
        return f"ConsumptionAgg({self.datetime.date()}, {self.value_kwh}kWh/{period})"

    def __repr__(self) -> str:
        """Developer representation."""
        return f"ConsumptionAggregated(datetime={self.datetime}, value_kwh={self.value_kwh}, delta_h={self.delta_h})"
"""Contract (contrato) related Pydantic models."""
from datetime import datetime
from typing import Optional
from pydantic import Field, field_validator
from edata.models.base import (
EdataBaseModel,
TimestampMixin,
validate_positive_number,
validate_reasonable_datetime,
)
class Contract(EdataBaseModel, TimestampMixin):
    """Pydantic model for electricity contract data.

    power_p1/power_p2 are optional because datadis does not always
    report contracted power values.
    """

    date_start: datetime = Field(..., description="Contract start date")
    date_end: datetime = Field(..., description="Contract end date")
    marketer: str = Field(..., description="Energy marketer company name", min_length=1)
    distributor_code: str = Field(
        ..., description="Distributor company code", min_length=1
    )
    power_p1: Optional[float] = Field(
        None, description="Contracted power for period P1 (kW)", ge=0
    )
    power_p2: Optional[float] = Field(
        None, description="Contracted power for period P2 (kW)", ge=0
    )

    @field_validator("date_start", "date_end")
    @classmethod
    def validate_date_range(cls, v: datetime) -> datetime:
        """Validate date is reasonable."""
        return validate_reasonable_datetime(v)

    @field_validator("power_p1", "power_p2")
    @classmethod
    def validate_power_values(cls, v: Optional[float]) -> Optional[float]:
        """Validate power values are positive."""
        if v is not None:
            return validate_positive_number(v)
        return v

    def __str__(self) -> str:
        """String representation."""
        return f"Contract(marketer={self.marketer}, power_p1={self.power_p1}kW)"

    def __repr__(self) -> str:
        """Developer representation."""
        return f"Contract(marketer={self.marketer}, date_start={self.date_start}, date_end={self.date_end})"
from datetime import datetime as DateTime
from typing import List, Optional
from pydantic import Field
from sqlalchemy import UniqueConstraint
from sqlmodel import Field, Relationship, SQLModel
from edata.models import Consumption, Contract, MaxPower, PricingData, Supply
class SupplyModel(Supply, SQLModel, table=True):
    """SQLModel for electricity supply data inheriting from Pydantic model."""

    __tablename__: str = "supplies"
    # Override cups field to add primary key
    cups: str = Field(primary_key=True, min_length=20, max_length=22)
    # Add database-specific fields (naive local timestamps at insert time)
    created_at: DateTime = Field(default_factory=DateTime.now)
    updated_at: DateTime = Field(default_factory=DateTime.now)
    # Relationships — child rows reference this supply via its cups
    contracts: List["ContractModel"] = Relationship(back_populates="supply")
    consumptions: List["ConsumptionModel"] = Relationship(back_populates="supply")
    maximeter: List["MaxPowerModel"] = Relationship(back_populates="supply")
class ContractModel(Contract, SQLModel, table=True):
    """SQLModel for electricity contract data inheriting from Pydantic model."""

    __tablename__: str = "contracts"
    # One contract row per supply and start date
    __table_args__ = (UniqueConstraint("cups", "date_start"),)
    # Add ID field for database
    id: Optional[int] = Field(default=None, primary_key=True)
    # Add CUPS field for foreign key
    cups: str = Field(foreign_key="supplies.cups")
    # Add database-specific fields (naive local timestamps at insert time)
    created_at: DateTime = Field(default_factory=DateTime.now)
    updated_at: DateTime = Field(default_factory=DateTime.now)
    # Relationships
    supply: Optional["SupplyModel"] = Relationship(back_populates="contracts")
class ConsumptionModel(Consumption, SQLModel, table=True):
    """SQLModel for electricity consumption data inheriting from Pydantic model."""

    __tablename__: str = "consumptions"
    # One consumption row per supply and hour
    __table_args__ = (UniqueConstraint("cups", "datetime"),)
    # Add ID field for database
    id: Optional[int] = Field(default=None, primary_key=True)
    # Add CUPS field for foreign key
    cups: str = Field(foreign_key="supplies.cups")
    # Add database-specific fields (naive local timestamps at insert time)
    created_at: DateTime = Field(default_factory=DateTime.now)
    updated_at: DateTime = Field(default_factory=DateTime.now)
    # Relationships
    supply: Optional["SupplyModel"] = Relationship(back_populates="consumptions")
class MaxPowerModel(MaxPower, SQLModel, table=True):
    """SQLModel for maximum power demand data inheriting from Pydantic model."""

    __tablename__: str = "maximeter"
    # One maximeter reading per supply and timestamp
    __table_args__ = (UniqueConstraint("cups", "datetime"),)
    # Add ID field for database
    id: Optional[int] = Field(default=None, primary_key=True)
    # Add CUPS field for foreign key
    cups: str = Field(foreign_key="supplies.cups")
    # Add database-specific fields (naive local timestamps at insert time)
    created_at: DateTime = Field(default_factory=DateTime.now)
    updated_at: DateTime = Field(default_factory=DateTime.now)
    # Relationships
    supply: Optional["SupplyModel"] = Relationship(back_populates="maximeter")
class PVPCPricesModel(PricingData, SQLModel, table=True):
    """SQLModel for PVPC pricing data inheriting from Pydantic model."""

    __tablename__: str = "pvpc_prices"
    # One price row per hour and geographic zone
    __table_args__ = (UniqueConstraint("datetime", "geo_id"),)
    # Add ID field for database
    id: Optional[int] = Field(default=None, primary_key=True)
    # Add database-specific fields (naive local timestamps at insert time)
    created_at: DateTime = Field(default_factory=DateTime.now)
    updated_at: DateTime = Field(default_factory=DateTime.now)
    # Add required fields for geographic specificity
    geo_id: int = Field(
        description="Geographic identifier (8741=Peninsula, 8744=Ceuta/Melilla)"
    )
class BillingModel(SQLModel, table=True):
    """SQLModel for billing calculations per hour.

    Stores the computed cost terms for one supply/hour under one pricing
    configuration (identified by pricing_config_hash, so recalculations
    with different rules coexist).
    """

    __tablename__: str = "billing"
    __table_args__ = (UniqueConstraint("cups", "datetime", "pricing_config_hash"),)
    # Primary key
    id: Optional[int] = Field(default=None, primary_key=True)
    # Foreign key to supply
    cups: str = Field(foreign_key="supplies.cups")
    datetime: DateTime = Field(description="Hour of the billing calculation")
    # Calculated cost terms (the essential billing data)
    energy_term: float = Field(default=0.0, description="Energy cost term in €")
    power_term: float = Field(default=0.0, description="Power cost term in €")
    others_term: float = Field(default=0.0, description="Other costs term in €")
    surplus_term: float = Field(default=0.0, description="Surplus income term in €")
    total_eur: float = Field(default=0.0, description="Total cost in €")
    # Metadata
    tariff: Optional[str] = Field(
        default=None, description="Tariff period (p1, p2, p3)"
    )
    pricing_config_hash: str = Field(description="Hash of pricing rules configuration")
    # Audit fields (naive local timestamps at insert time)
    created_at: DateTime = Field(default_factory=DateTime.now)
    updated_at: DateTime = Field(default_factory=DateTime.now)
"""Maximeter (maxímetro) related Pydantic models."""
from datetime import datetime as dt
from pydantic import Field, field_validator
from edata.models.base import (
EdataBaseModel,
TimestampMixin,
validate_positive_number,
validate_reasonable_datetime,
)
class MaxPower(EdataBaseModel, TimestampMixin):
    """Pydantic model for maximum power demand data.

    One maximeter reading: peak power demand registered at a timestamp.
    """

    datetime: dt = Field(..., description="Timestamp when maximum power was recorded")
    value_kw: float = Field(..., description="Maximum power demand in kW", ge=0)

    @field_validator("datetime")
    @classmethod
    def validate_datetime_range(cls, v: dt) -> dt:
        """Validate datetime is reasonable."""
        return validate_reasonable_datetime(v)

    @field_validator("value_kw")
    @classmethod
    def validate_power_value(cls, v: float) -> float:
        """Validate power value is positive."""
        return validate_positive_number(v)

    def __str__(self) -> str:
        """String representation."""
        return f"MaxPower({self.datetime}, {self.value_kw}kW)"

    def __repr__(self) -> str:
        """Developer representation."""
        return f"MaxPower(datetime={self.datetime}, value_kw={self.value_kw})"
"""Pricing related Pydantic models."""
from datetime import datetime as dt
from typing import Optional
from pydantic import Field, field_validator
from edata.models.base import (
EdataBaseModel,
TimestampMixin,
validate_positive_number,
validate_reasonable_datetime,
)
class PricingData(EdataBaseModel, TimestampMixin):
    """Pydantic model for electricity pricing data (PVPC prices).

    One hourly (by default) price point in EUR/kWh.
    """

    datetime: dt = Field(..., description="Timestamp of the price data")
    value_eur_kwh: float = Field(..., description="Price in EUR per kWh", ge=0)
    delta_h: float = Field(
        default=1.0, description="Duration this price applies (hours)", gt=0, le=24
    )

    @field_validator("datetime")
    @classmethod
    def validate_datetime_range(cls, v: dt) -> dt:
        """Validate datetime is reasonable."""
        return validate_reasonable_datetime(v)

    @field_validator("value_eur_kwh")
    @classmethod
    def validate_price_value(cls, v: float) -> float:
        """Validate price value is positive."""
        return validate_positive_number(v)

    def __str__(self) -> str:
        """String representation."""
        return f"Price({self.datetime}, {self.value_eur_kwh:.4f}€/kWh)"

    def __repr__(self) -> str:
        """Developer representation."""
        return (
            f"PricingData(datetime={self.datetime}, value_eur_kwh={self.value_eur_kwh})"
        )
class PricingRules(EdataBaseModel):
    """Pydantic model for custom pricing rules configuration."""

    # --- Power term: yearly EUR per contracted kW, per tariff period ---
    p1_kw_year_eur: float = Field(
        ..., description="P1 power term cost (EUR/kW/year)", ge=0
    )
    p2_kw_year_eur: float = Field(
        ..., description="P2 power term cost (EUR/kW/year)", ge=0
    )
    # --- Energy term: EUR per kWh; all three left as None selects PVPC ---
    p1_kwh_eur: Optional[float] = Field(
        None, description="P1 energy term cost (EUR/kWh) - None for PVPC", ge=0
    )
    p2_kwh_eur: Optional[float] = Field(
        None, description="P2 energy term cost (EUR/kWh) - None for PVPC", ge=0
    )
    p3_kwh_eur: Optional[float] = Field(
        None, description="P3 energy term cost (EUR/kWh) - None for PVPC", ge=0
    )
    # --- Surplus compensation (solar export), optional per period ---
    surplus_p1_kwh_eur: Optional[float] = Field(
        None, description="P1 surplus compensation (EUR/kWh)", ge=0
    )
    surplus_p2_kwh_eur: Optional[float] = Field(
        None, description="P2 surplus compensation (EUR/kWh)", ge=0
    )
    surplus_p3_kwh_eur: Optional[float] = Field(
        None, description="P3 surplus compensation (EUR/kWh)", ge=0
    )
    # --- Fixed recurring costs ---
    meter_month_eur: float = Field(
        ..., description="Monthly meter rental cost (EUR/month)", ge=0
    )
    market_kw_year_eur: float = Field(
        ..., description="Market operator cost (EUR/kW/year)", ge=0
    )
    # --- Tax multipliers (>= 1.0, applied multiplicatively) ---
    electricity_tax: float = Field(
        ..., description="Electricity tax multiplier (e.g., 1.05113 for 5.113%)", ge=1.0
    )
    iva_tax: float = Field(
        ..., description="VAT tax multiplier (e.g., 1.21 for 21%)", ge=1.0
    )
    # --- Cost formulas: Jinja2 expressions evaluated per hour ---
    energy_formula: Optional[str] = Field(
        "electricity_tax * iva_tax * kwh_eur * kwh",
        description="Custom energy cost formula (Jinja2 template)",
    )
    power_formula: Optional[str] = Field(
        "electricity_tax * iva_tax * (p1_kw * (p1_kw_year_eur + market_kw_year_eur) + p2_kw * p2_kw_year_eur) / 365 / 24",
        description="Custom power cost formula (Jinja2 template)",
    )
    others_formula: Optional[str] = Field(
        "iva_tax * meter_month_eur / 30 / 24",
        description="Custom other costs formula (Jinja2 template)",
    )
    surplus_formula: Optional[str] = Field(
        "electricity_tax * iva_tax * surplus_kwh * surplus_kwh_eur",
        description="Custom surplus compensation formula (Jinja2 template)",
    )
    main_formula: Optional[str] = Field(
        "energy_term + power_term + others_term",
        description="Main cost calculation formula (Jinja2 template)",
    )
    # --- Billing cycle anchor day ---
    cycle_start_day: int = Field(
        default=1, description="Day of month when billing cycle starts", ge=1, le=30
    )

    @property
    def is_pvpc(self) -> bool:
        """Check if this configuration uses PVPC (variable pricing)."""
        # PVPC is selected by leaving every fixed energy price undefined.
        return (
            self.p1_kwh_eur is None
            and self.p2_kwh_eur is None
            and self.p3_kwh_eur is None
        )

    def __str__(self) -> str:
        """Human-readable representation."""
        if self.is_pvpc:
            kind = "PVPC"
        else:
            kind = "Fixed"
        return "PricingRules({}, P1={}€/kW/year)".format(kind, self.p1_kw_year_eur)

    def __repr__(self) -> str:
        """Developer representation."""
        return "PricingRules(p1_kw_year_eur={}, is_pvpc={})".format(
            self.p1_kw_year_eur, self.is_pvpc
        )
class PricingAggregated(EdataBaseModel, TimestampMixin):
    """Pydantic model for aggregated pricing/billing data."""

    # Aggregated cost over one billing span, broken down by term.
    datetime: dt = Field(
        ..., description="Timestamp representing the start of the billing period"
    )
    value_eur: float = Field(..., description="Total cost in EUR for the period", ge=0)
    energy_term: float = Field(default=0.0, description="Energy term cost (EUR)", ge=0)
    power_term: float = Field(default=0.0, description="Power term cost (EUR)", ge=0)
    others_term: float = Field(default=0.0, description="Other costs term (EUR)", ge=0)
    surplus_term: float = Field(
        default=0.0, description="Surplus compensation term (EUR)", ge=0
    )
    delta_h: float = Field(
        default=1.0, description="Duration of the billing period in hours", gt=0
    )

    @field_validator("datetime")
    @classmethod
    def validate_datetime_range(cls, value: dt) -> dt:
        """Reject timestamps outside the accepted range."""
        return validate_reasonable_datetime(value)

    def __str__(self) -> str:
        """Human-readable representation, labeled by period granularity."""
        if self.delta_h <= 1:
            period = "hour"
        elif self.delta_h <= 24:
            period = "day"
        else:
            period = "month"
        return "Billing({}, {:.2f}€/{})".format(
            self.datetime.date(), self.value_eur, period
        )

    def __repr__(self) -> str:
        """Developer representation."""
        return "PricingAggregated(datetime={}, value_eur={}, delta_h={})".format(
            self.datetime, self.value_eur, self.delta_h
        )
"""Supply (suministro) related Pydantic models."""
from datetime import datetime
from typing import Optional
from pydantic import Field, field_validator
from edata.models.base import (
EdataBaseModel,
TimestampMixin,
validate_cups,
validate_reasonable_datetime,
)
class Supply(EdataBaseModel, TimestampMixin):
    """Pydantic model for electricity supply data (suministro eléctrico)."""

    # Supply point identifier; format is validated below.
    cups: str = Field(
        ...,
        description="CUPS (Código Universal de Punto de Suministro) - Universal Supply Point Code",
        min_length=20,
        max_length=22,
    )
    # Contract validity window.
    date_start: datetime = Field(..., description="Supply contract start date")
    date_end: datetime = Field(..., description="Supply contract end date")
    # Optional location metadata.
    address: Optional[str] = Field(None, description="Supply point address")
    postal_code: Optional[str] = Field(
        None, description="Postal code of the supply point", pattern=r"^\d{5}$"
    )
    province: Optional[str] = Field(None, description="Province name")
    municipality: Optional[str] = Field(None, description="Municipality name")
    distributor: Optional[str] = Field(
        None, description="Electricity distributor company name"
    )
    point_type: int = Field(..., description="Type of supply point", ge=1, le=5)
    distributor_code: str = Field(
        ..., description="Distributor company code", min_length=1
    )

    @field_validator("cups")
    @classmethod
    def validate_cups_format(cls, v: str) -> str:
        """Validate CUPS format via the shared helper."""
        return validate_cups(v)

    @field_validator("date_start", "date_end")
    @classmethod
    def validate_date_range(cls, v: datetime) -> datetime:
        """Validate date is within a reasonable range."""
        return validate_reasonable_datetime(v)

    @field_validator("date_end")
    @classmethod
    def validate_end_after_start(cls, v: datetime, info) -> datetime:
        """Validate that end date is strictly after the start date.

        Bug fix: the previous implementation used
        ``hasattr(info.data, "date_start")`` — but ``info.data`` is a dict,
        so ``hasattr`` always returned False and this cross-field check
        never executed. Use a dict lookup instead.
        """
        date_start = info.data.get("date_start")
        if date_start is not None and v <= date_start:
            raise ValueError("End date must be after start date")
        return v

    def __str__(self) -> str:
        """String representation showing anonymized CUPS."""
        return f"Supply(cups=...{self.cups[-5:]}, distributor={self.distributor})"

    def __repr__(self) -> str:
        """Developer representation."""
        return f"Supply(cups={self.cups}, point_type={self.point_type})"
"""Scripts de utilidades para edata."""
#!/usr/bin/env python3
"""
Script interactivo para hacer un dump completo de un CUPS a una base de datos.
"""
import argparse
import asyncio
import getpass
import logging
from datetime import datetime, timedelta
from typing import List, Optional
from edata.connectors.datadis import DatadisConnector
from edata.const import DEFAULT_STORAGE_DIR
from edata.helpers import EdataHelper
from edata.services.database import SupplyModel as DbSupply
from edata.services.supply import SupplyService
# Configure logging for the whole script (INFO level, timestamped lines).
logging.basicConfig(
    level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)
# Module-level logger for this script.
_LOGGER = logging.getLogger(__name__)
class DumpSupply:
    """Interactive helper that dumps the complete data set of a CUPS.

    Walks the user through credential entry, connection testing, supply
    selection, date-range selection and finally a full data download via
    :class:`EdataHelper`. All user-facing prompts are in Spanish.
    """

    def __init__(self, storage_dir: Optional[str] = None):
        """Initialize the interactive dumper.

        Args:
            storage_dir: Directory for local storage; falls back to
                ``DEFAULT_STORAGE_DIR`` when not provided.
        """
        self.storage_dir = storage_dir or DEFAULT_STORAGE_DIR
        self.username: Optional[str] = None
        self.password: Optional[str] = None
        self.authorized_nif: Optional[str] = None
        self.connector: Optional[DatadisConnector] = None
        self.supplies: List[DbSupply] = []

    def get_credentials(self) -> bool:
        """Interactively ask the user for Datadis credentials.

        Returns:
            True when a username and password were captured, False on
            empty input, cancellation or error.
        """
        print("\n🔐 Configuración de credenciales Datadis")
        print("=" * 50)
        try:
            self.username = input("📧 Usuario Datadis: ").strip()
            if not self.username:
                print("❌ El usuario es obligatorio")
                return False
            self.password = getpass.getpass("🔑 Contraseña Datadis: ").strip()
            if not self.password:
                print("❌ La contraseña es obligatoria")
                return False
            # SECURITY FIX: previous versions echoed the username and the
            # password back to stdout here, defeating getpass' hidden input.
            # Credentials are intentionally never printed.
            nif_input = input(
                "🆔 NIF autorizado (opcional, Enter para omitir): "
            ).strip()
            self.authorized_nif = nif_input if nif_input else None
            return True
        except KeyboardInterrupt:
            print("\n❌ Operación cancelada por el usuario")
            return False
        except Exception as e:
            print(f"❌ Error obteniendo credenciales: {e}")
            return False

    async def test_connection(self) -> bool:
        """Verify the stored credentials by logging in to Datadis.

        Returns:
            True when authentication succeeds, False otherwise.
        """
        print("\n🧪 Probando conexión con Datadis...")
        try:
            # Credentials must have been collected first.
            if not self.username or not self.password:
                print("❌ Credenciales no disponibles")
                return False
            self.connector = DatadisConnector(self.username, self.password)
            # Attempt authentication against the Datadis token endpoint.
            token_result = await self.connector.login()
            if not token_result:
                print("❌ Error de autenticación. Verifica tus credenciales.")
                return False
            print("✅ Conexión exitosa con Datadis")
            return True
        except Exception as e:
            print(f"❌ Error conectando con Datadis: {e}")
            return False

    async def fetch_supplies(self) -> bool:
        """Fetch the available supplies and cache them on the instance.

        Returns:
            True when at least one supply was found, False otherwise.
        """
        print("\n📋 Obteniendo suministros disponibles...")
        try:
            # Credentials must have been collected first.
            if not self.username or not self.password:
                print("❌ Credenciales no disponibles")
                return False
            supplies_service = SupplyService(
                DatadisConnector(
                    username=self.username,
                    password=self.password,
                    storage_path=self.storage_dir,
                ),
                storage_dir=self.storage_dir,
            )
            # Refresh the supplies from the remote API into local storage.
            result = await supplies_service.update_supplies(
                authorized_nif=self.authorized_nif
            )
            if not result["success"]:
                print(
                    f"❌ Error obteniendo suministros: {result.get('error', 'Error desconocido')}"
                )
                return False
            # Read every supply back from the local database.
            self.supplies = await supplies_service.get_supplies()
            if not self.supplies:
                print("❌ No se encontraron suministros en tu cuenta")
                return False
            print(f"✅ Encontrados {len(self.supplies)} suministros")
            return True
        except Exception as e:
            print(f"❌ Error obteniendo suministros: {e}")
            return False

    def display_supplies_menu(self) -> Optional[DbSupply]:
        """Print a numbered menu of supplies and return the user's pick.

        Returns:
            The selected supply, or None when the user quits/cancels.
        """
        print("\n🏠 Selecciona un suministro para procesar:")
        print("=" * 70)
        for i, supply in enumerate(self.supplies, 1):
            # Show a trimmed CUPS and address so each entry fits one screen.
            cups_short = supply.cups[-10:] if len(supply.cups) > 10 else supply.cups
            address = supply.address or "Dirección no disponible"
            if len(address) > 40:
                address = address[:40] + "..."
            print(f"{i:2d}. CUPS: {cups_short} | {address}")
            print(
                f"    📍 {supply.municipality or 'N/A'}, {supply.province or 'N/A'} ({supply.postal_code or 'N/A'})"
            )
            print(
                f"    📊 Tipo: {supply.point_type} | Distribuidor: {supply.distributor or 'N/A'}"
            )
            print(
                f"    📅 Válido: {supply.date_start.date()} - {supply.date_end.date()}"
            )
            print()
        try:
            selection = input(
                f"Selecciona un suministro (1-{len(self.supplies)}) o 'q' para salir: "
            ).strip()
            if selection.lower() == "q":
                return None
            index = int(selection) - 1
            if 0 <= index < len(self.supplies):
                return self.supplies[index]
            else:
                print(
                    f"❌ Selección inválida. Debe estar entre 1 y {len(self.supplies)}"
                )
                # Re-prompt recursively on invalid input.
                return self.display_supplies_menu()
        except ValueError:
            print("❌ Por favor introduce un número válido")
            return self.display_supplies_menu()
        except KeyboardInterrupt:
            print("\n❌ Operación cancelada")
            return None

    def get_date_range(self) -> tuple[datetime, datetime]:
        """Ask the user for a date range, defaulting to the last 2 years.

        Returns:
            Tuple of (date_from, date_to); invalid or empty input falls
            back to (now - 730 days, now).
        """
        print("\n📅 Configuración de fechas")
        print("=" * 30)
        print("Deja en blanco para usar valores por defecto (últimos 2 años)")
        try:
            date_from_str = input(
                "📅 Fecha inicio (YYYY-MM-DD) [Enter = 2 años atrás]: "
            ).strip()
            date_to_str = input("📅 Fecha fin (YYYY-MM-DD) [Enter = hoy]: ").strip()
            date_from = None
            date_to = None
            if date_from_str:
                try:
                    date_from = datetime.strptime(date_from_str, "%Y-%m-%d")
                except ValueError:
                    print(
                        "❌ Formato de fecha inicio inválido, usando valor por defecto"
                    )
            if date_to_str:
                try:
                    date_to = datetime.strptime(date_to_str, "%Y-%m-%d")
                except ValueError:
                    print("❌ Formato de fecha fin inválido, usando valor por defecto")
            # Apply defaults for anything left unset.
            if date_from is None:
                date_from = datetime.now() - timedelta(days=730)
            if date_to is None:
                date_to = datetime.now()
            print(f"📊 Período seleccionado: {date_from.date()} a {date_to.date()}")
            return date_from, date_to
        except KeyboardInterrupt:
            print("❌ Usando valores por defecto")
            default_from = datetime.now() - timedelta(days=730)
            default_to = datetime.now()
            return default_from, default_to

    async def dump_selected_supply(
        self, supply: DbSupply, date_from: datetime, date_to: datetime
    ) -> bool:
        """Download the full data set for one supply over the given range.

        Args:
            supply: The supply record to dump.
            date_from: Start of the requested period.
            date_to: End of the requested period.

        Returns:
            True on success, False on any failure.
        """
        print(f"🚀 Iniciando dump para CUPS {supply.cups[-10:]}")
        print("=" * 50)
        try:
            # Credentials must have been collected first.
            if not self.username or not self.password:
                print("❌ Credenciales no disponibles")
                return False
            # Build an EdataHelper bound to this CUPS.
            helper = EdataHelper(
                datadis_username=self.username,
                datadis_password=self.password,
                cups=supply.cups,
                datadis_authorized_nif=self.authorized_nif,
                storage_dir_path=self.storage_dir,
            )
            print(f"📅 Período: {date_from.date()} a {date_to.date()}")
            print("⏳ Descargando datos... (esto puede tomar varios minutos)")
            # Fetch everything (supplies, contracts, consumptions, ...).
            result = await helper.update(date_from=date_from, date_to=date_to)
            if not result:
                print("❌ Error durante la descarga de datos")
                return False
            print("✅ Datos descargados correctamente")
            # Show a summary of what was stored.
            await self.display_final_statistics(helper)
            return True
        except Exception as e:
            print(f"❌ Error durante el dump: {e}")
            return False

    async def display_final_statistics(self, helper: EdataHelper):
        """Print a summary of the completed dump from helper attributes.

        Args:
            helper: The EdataHelper whose ``attributes`` hold the summary.
        """
        print("📊 Estadísticas del dump completado:")
        print("=" * 50)
        summary = helper.attributes
        print(f"🏠 CUPS: {summary.get('cups', 'N/A')}")
        # Contract information.
        if summary.get("contract_p1_kW") is not None:
            print(
                f"⚡ Potencia contratada P1: {summary.get('contract_p1_kW', 'N/A')} kW"
            )
        if summary.get("contract_p2_kW") is not None:
            print(
                f"⚡ Potencia contratada P2: {summary.get('contract_p2_kW', 'N/A')} kW"
            )
        # Consumption information.
        if summary.get("yesterday_kWh") is not None:
            print(f"📈 Consumo ayer: {summary.get('yesterday_kWh', 'N/A')} kWh")
        if summary.get("month_kWh") is not None:
            print(f"📈 Consumo mes actual: {summary.get('month_kWh', 'N/A')} kWh")
        if summary.get("last_month_kWh") is not None:
            print(
                f"📈 Consumo mes anterior: {summary.get('last_month_kWh', 'N/A')} kWh"
            )
        # Maximum power information.
        if summary.get("max_power_kW") is not None:
            print(
                f"🔋 Potencia máxima registrada: {summary.get('max_power_kW', 'N/A')} kW"
            )
        # Cost information (when available).
        if summary.get("month_€") is not None:
            print(f"💰 Coste mes actual: {summary.get('month_€', 'N/A')} €")
        if summary.get("last_month_€") is not None:
            print(f"💰 Coste mes anterior: {summary.get('last_month_€', 'N/A')} €")
        print(f"\n💾 Datos almacenados en: {self.storage_dir}")

    async def run_interactive_session(self) -> bool:
        """Run the full interactive flow from credentials to dump.

        Returns:
            True when the dump completed successfully, False otherwise.
        """
        print("🏠 Extractor interactivo de datos eléctricos")
        print("=" * 50)
        print(
            "Este script te ayudará a extraer todos los datos de tu suministro eléctrico"
        )
        print()
        try:
            # 1. Collect credentials.
            if not self.get_credentials():
                return False
            # 2. Test the connection.
            if not await self.test_connection():
                return False
            # 3. Fetch available supplies.
            if not await self.fetch_supplies():
                return False
            # 4. Show the menu and pick a supply.
            selected_supply = self.display_supplies_menu()
            if not selected_supply:
                print("👋 Operación cancelada")
                return False
            print(
                f"\n✅ Seleccionado: {selected_supply.cups[-10:]} - {selected_supply.address or 'Sin dirección'}"
            )
            # 5. Configure the date range.
            date_from, date_to = self.get_date_range()
            # 6. Run the dump.
            success = await self.dump_selected_supply(
                selected_supply, date_from, date_to
            )
            if success:
                print("\n🎉 ¡Dump completado exitosamente!")
                print("Todos los datos han sido almacenados en la base de datos local.")
            return success
        except KeyboardInterrupt:
            print("\n\n👋 Operación cancelada por el usuario")
            return False
        except Exception as e:
            print(f"\n❌ Error durante la sesión interactiva: {e}")
            return False
async def main():
    """Entry point: parse CLI arguments and run the interactive session.

    Exits the process with status 0 on success and 1 on failure.
    """
    parser = argparse.ArgumentParser(
        description="Extractor interactivo de datos eléctricos",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Ejemplo de uso:
  # Modo interactivo
  python -m edata.scripts.dump
  # Con directorio personalizado
  python -m edata.scripts.dump --storage-dir /ruta/datos
        """,
    )
    parser.add_argument(
        "--storage-dir",
        default=".",
        help="Directorio de almacenamiento (por defecto: directorio actual)",
    )
    args = parser.parse_args()
    # Build the dumper bound to the chosen storage directory.
    dumper = DumpSupply(storage_dir=args.storage_dir)
    # Run the interactive flow.
    success = await dumper.run_interactive_session()
    # Fix: raise SystemExit directly instead of calling exit(), which is a
    # site-module convenience helper not guaranteed to be available.
    raise SystemExit(0 if success else 1)
if __name__ == "__main__":
asyncio.run(main())
"""Services package for edata."""
from edata.services.billing import BillingService
from edata.services.consumption import ConsumptionService
from edata.services.contract import ContractService
from edata.services.database import DatabaseService, get_database_service
from edata.services.maximeter import MaximeterService
from edata.services.supply import SupplyService
__all__ = [
"DatabaseService",
"get_database_service",
"SupplyService",
"ContractService",
"ConsumptionService",
"MaximeterService",
"BillingService",
]
"""Billing service for managing energy prices and billing calculations."""
import contextlib
import logging
from datetime import datetime, timedelta
from typing import Any, Dict, List, Optional
from jinja2 import Environment
from edata.connectors.redata import REDataConnector
from edata.models.pricing import PricingAggregated, PricingData, PricingRules
from edata.services.database import PVPCPricesModel, get_database_service
_LOGGER = logging.getLogger(__name__)
class BillingService:
"""Service for managing energy pricing and billing data."""
def __init__(self, storage_dir: Optional[str] = None):
"""Initialize billing service.
Args:
storage_dir: Directory for database storage
"""
self._redata = REDataConnector()
self._storage_dir = storage_dir
self._db_service = None
async def _get_db_service(self):
"""Get database service, initializing if needed."""
if self._db_service is None:
self._db_service = await get_database_service(self._storage_dir)
return self._db_service
async def update_pvpc_prices(
self, start_date: datetime, end_date: datetime, is_ceuta_melilla: bool = False
) -> Dict[str, Any]:
"""Update PVPC prices from REData API.
Args:
start_date: Start date for price data
end_date: End date for price data
is_ceuta_melilla: Whether to get prices for Ceuta/Melilla (True) or Peninsula (False)
Returns:
Dict with operation results and statistics
"""
geo_id = 8744 if is_ceuta_melilla else 8741
region = "Ceuta/Melilla" if is_ceuta_melilla else "Peninsula"
_LOGGER.info(
f"Updating PVPC prices for {region} from {start_date.date()} to {end_date.date()}"
)
# Determine actual start date based on existing data
actual_start_date = start_date
db_service = await self._get_db_service()
last_price_record = await db_service.get_latest_pvpc_price(geo_id=geo_id)
if last_price_record:
# Start from the hour after the last price record
actual_start_date = max(
start_date, last_price_record.datetime + timedelta(hours=1)
)
_LOGGER.info(
f"Found existing price data up to {last_price_record.datetime.date()}, fetching from {actual_start_date.date()}"
)
else:
_LOGGER.info(
f"No existing price data found for {region}, fetching all data"
)
# If actual start date is beyond end date, no new data needed
if actual_start_date >= end_date:
_LOGGER.info(f"No new price data needed for {region} (up to date)")
return {
"success": True,
"region": region,
"geo_id": geo_id,
"period": {
"start": start_date.isoformat(),
"end": end_date.isoformat(),
"actual_start": actual_start_date.isoformat(),
},
"stats": {
"fetched": 0,
"saved": 0,
"updated": 0,
"skipped": "up_to_date",
},
"message": "Price data is up to date",
}
try:
# Fetch price data from REData (only missing data)
prices = await self._redata.get_realtime_prices(
dt_from=actual_start_date,
dt_to=end_date,
is_ceuta_melilla=is_ceuta_melilla,
)
# Save to database
saved_count = 0
updated_count = 0
for price in prices:
price_dict = price.model_dump()
price_dict["geo_id"] = geo_id
# Check if price already exists for this specific datetime and geo_id
existing = await db_service.get_pvpc_prices(
start_date=price.datetime, end_date=price.datetime, geo_id=geo_id
)
if existing:
updated_count += 1
else:
saved_count += 1
await db_service.save_pvpc_price(price_dict)
result = {
"success": True,
"region": region,
"geo_id": geo_id,
"period": {
"start": start_date.isoformat(),
"end": end_date.isoformat(),
"actual_start": actual_start_date.isoformat(),
},
"stats": {
"fetched": len(prices),
"saved": saved_count,
"updated": updated_count,
},
}
if actual_start_date > start_date:
result["message"] = (
f"Fetched only missing price data from {actual_start_date.date()}"
)
_LOGGER.info(
f"PVPC price update completed: {len(prices)} fetched, "
f"{saved_count} saved, {updated_count} updated"
)
return result
except Exception as e:
_LOGGER.error(f"Error updating PVPC prices for {region}: {str(e)}")
return {
"success": False,
"region": region,
"geo_id": geo_id,
"error": str(e),
"period": {
"start": start_date.isoformat(),
"end": end_date.isoformat(),
"actual_start": (
actual_start_date.isoformat()
if "actual_start_date" in locals()
else start_date.isoformat()
),
},
}
def get_custom_prices(
self, pricing_rules: PricingRules, start_date: datetime, end_date: datetime
) -> List[PricingData]:
"""Calculate custom energy prices dynamically based on pricing rules.
Args:
pricing_rules: Custom pricing configuration
start_date: Start date for price data
end_date: End date for price data
Returns:
List of PricingData objects calculated on-the-fly
"""
if pricing_rules.is_pvpc:
raise ValueError("Use get_stored_pvpc_prices() for PVPC pricing rules")
_LOGGER.info(
f"Calculating custom prices from {start_date.date()} to {end_date.date()}"
)
try:
# Import here to avoid circular imports
from edata.utils import get_pvpc_tariff
prices = []
# Generate hourly prices based on custom rules
current_dt = start_date
while current_dt < end_date:
# Determine tariff period for this hour
tariff = get_pvpc_tariff(current_dt)
# Get the appropriate price based on tariff period
if tariff == "p1" and pricing_rules.p1_kwh_eur is not None:
price_eur_kwh = pricing_rules.p1_kwh_eur
elif tariff == "p2" and pricing_rules.p2_kwh_eur is not None:
price_eur_kwh = pricing_rules.p2_kwh_eur
elif tariff == "p3" and pricing_rules.p3_kwh_eur is not None:
price_eur_kwh = pricing_rules.p3_kwh_eur
else:
# Skip if no price defined for this period
current_dt += timedelta(hours=1)
continue
# Create PricingData object
price_data = PricingData(
datetime=current_dt, value_eur_kwh=price_eur_kwh, delta_h=1.0
)
prices.append(price_data)
current_dt += timedelta(hours=1)
_LOGGER.info(f"Generated {len(prices)} custom price points")
return prices
except Exception as e:
_LOGGER.error(f"Error calculating custom prices: {str(e)}")
raise
async def get_stored_pvpc_prices(
self,
start_date: Optional[datetime] = None,
end_date: Optional[datetime] = None,
geo_id: Optional[int] = None,
) -> List[PVPCPricesModel]:
"""Get stored PVPC prices from database.
Args:
start_date: Optional start date filter
end_date: Optional end date filter
geo_id: Optional geographic filter
Returns:
List of PVPCPrices objects
"""
db_service = await self._get_db_service()
return await db_service.get_pvpc_prices(start_date, end_date, geo_id)
async def get_prices(
self,
pricing_rules: PricingRules,
start_date: datetime,
end_date: datetime,
is_ceuta_melilla: bool = False,
) -> Optional[List[PricingData]]:
"""Get prices automatically based on pricing rules configuration.
Args:
pricing_rules: Pricing configuration
start_date: Start date for price data
end_date: End date for price data
is_ceuta_melilla: Whether to get PVPC prices for Ceuta/Melilla
Returns:
List of PricingData objects or None if missing required data
"""
if pricing_rules.is_pvpc:
# Get stored PVPC prices from database
geo_id = 8744 if is_ceuta_melilla else 8741
pvpc_prices = await self.get_stored_pvpc_prices(
start_date, end_date, geo_id
)
# Return None if no PVPC prices found
if not pvpc_prices:
_LOGGER.warning(
f"No PVPC prices found for geo_id {geo_id} from {start_date.date()} to {end_date.date()}"
)
return None
# Convert PVPCPrices to PricingData
return [
PricingData(
datetime=price.datetime,
value_eur_kwh=price.value_eur_kwh,
delta_h=price.delta_h,
)
for price in pvpc_prices
]
else:
# Check if custom pricing rules have required data
if (
pricing_rules.p1_kwh_eur is None
and pricing_rules.p2_kwh_eur is None
and pricing_rules.p3_kwh_eur is None
):
_LOGGER.warning("No custom energy prices defined in pricing rules")
return None
# Calculate custom prices dynamically
try:
custom_prices = self.get_custom_prices(
pricing_rules, start_date, end_date
)
# Return None if no prices could be generated
if not custom_prices:
_LOGGER.warning(
f"No custom prices could be generated for period {start_date.date()} to {end_date.date()}"
)
return None
return custom_prices
except Exception as e:
_LOGGER.error(f"Error generating custom prices: {str(e)}")
return None
    async def get_cost(
        self,
        cups: str,
        pricing_rules: PricingRules,
        start_date: datetime,
        end_date: datetime,
        is_ceuta_melilla: bool = False,
    ) -> PricingAggregated:
        """Get billing cost for a period based on pricing rules.

        First checks the billing table for existing data with the pricing rules hash.
        If not found, calls update_missing_costs to calculate and store the data.
        Then returns the aggregated cost from the billing table.

        Args:
            cups: CUPS identifier for consumption data
            pricing_rules: Pricing configuration
            start_date: Start date for cost calculation
            end_date: End date for cost calculation
            is_ceuta_melilla: Whether to use Ceuta/Melilla PVPC prices

        Returns:
            PricingAggregated object with cost breakdown for the period

        Raises:
            Exception: Any unexpected error is logged and re-raised.
        """
        _LOGGER.info(
            f"Getting cost for CUPS {cups} from {start_date.date()} to {end_date.date()}"
        )
        try:
            # Generate pricing configuration hash; cached billing rows are
            # keyed by this hash so different rule sets never mix.
            db_service = await self._get_db_service()
            pricing_config_hash = db_service.generate_pricing_config_hash(
                pricing_rules.model_dump()
            )
            # Check if billing data already exists by looking for the latest billing record
            latest_billing = await db_service.get_latest_billing(
                cups=cups, pricing_config_hash=pricing_config_hash
            )
            # Determine if we need to calculate missing costs
            needs_calculation = False
            actual_start_date = start_date
            if not latest_billing:
                # No billing data exists for this configuration
                needs_calculation = True
                _LOGGER.info(
                    f"No billing data found for hash {pricing_config_hash[:8]}..., calculating all costs"
                )
            elif latest_billing.datetime < end_date - timedelta(hours=1):
                # Billing data exists but is incomplete for the requested period;
                # only the missing tail is recalculated.
                needs_calculation = True
                actual_start_date = max(
                    start_date, latest_billing.datetime + timedelta(hours=1)
                )
                _LOGGER.info(
                    f"Found billing data up to {latest_billing.datetime.date()}, calculating from {actual_start_date.date()}"
                )
            # Calculate missing costs if needed
            if needs_calculation:
                update_result = await self.update_missing_costs(
                    cups,
                    pricing_rules,
                    actual_start_date,
                    end_date,
                    is_ceuta_melilla,
                    force_recalculate=False,
                )
                if not update_result["success"]:
                    _LOGGER.error(
                        f"Failed to update costs: {update_result.get('error', 'Unknown error')}"
                    )
                    # On failure, return an all-zero aggregate spanning the
                    # whole requested period instead of raising.
                    return PricingAggregated(
                        datetime=start_date,
                        value_eur=0.0,
                        energy_term=0.0,
                        power_term=0.0,
                        others_term=0.0,
                        surplus_term=0.0,
                        delta_h=(end_date - start_date).total_seconds() / 3600,
                    )
            # Get the complete billing data for the requested period
            existing_billing = await db_service.get_billing(
                cups=cups,
                start_date=start_date,
                end_date=end_date,
                pricing_config_hash=pricing_config_hash,
            )
            # Aggregate the billing data
            total_value_eur = 0.0
            total_energy_term = 0.0
            total_power_term = 0.0
            total_others_term = 0.0
            total_surplus_term = 0.0
            # NOTE(review): delta_h below is the row count, which assumes one
            # billing row per hour — confirm against the billing table schema.
            total_hours = len(existing_billing)
            for billing in existing_billing:
                # Missing terms are treated as zero.
                total_value_eur += billing.total_eur or 0.0
                total_energy_term += billing.energy_term or 0.0
                total_power_term += billing.power_term or 0.0
                total_others_term += billing.others_term or 0.0
                total_surplus_term += billing.surplus_term or 0.0
            result = PricingAggregated(
                datetime=start_date,
                value_eur=round(total_value_eur, 6),
                energy_term=round(total_energy_term, 6),
                power_term=round(total_power_term, 6),
                others_term=round(total_others_term, 6),
                surplus_term=round(total_surplus_term, 6),
                delta_h=total_hours,
            )
            _LOGGER.info(
                f"Cost calculation completed for CUPS {cups}: "
                f"€{total_value_eur:.2f} for {total_hours} hours"
            )
            return result
        except Exception as e:
            _LOGGER.error(f"Error getting cost for CUPS {cups}: {str(e)}")
            raise
async def update_missing_costs(
self,
cups: str,
pricing_rules: PricingRules,
start_date: datetime,
end_date: datetime,
is_ceuta_melilla: bool = False,
force_recalculate: bool = False,
) -> Dict[str, Any]:
"""Calculate and store billing costs in the database.
Args:
cups: CUPS identifier for consumption data
pricing_rules: Pricing configuration
start_date: Start date for cost calculation
end_date: End date for cost calculation
is_ceuta_melilla: Whether to use Ceuta/Melilla PVPC prices
force_recalculate: If True, recalculate even if billing data exists
Returns:
Dict with operation results and statistics
"""
_LOGGER.info(
f"Updating costs for CUPS {cups} from {start_date.date()} to {end_date.date()}"
)
try:
# Generate pricing configuration hash
db_service = await self._get_db_service()
pricing_config_hash = db_service.generate_pricing_config_hash(
pricing_rules.model_dump()
)
# Get existing billing data if not forcing recalculation
existing_billing = []
if not force_recalculate:
existing_billing = await db_service.get_billing(
cups=cups,
start_date=start_date,
end_date=end_date,
pricing_config_hash=pricing_config_hash,
)
# Create set of existing datetime for quick lookup
existing_hours = {billing.datetime for billing in existing_billing}
# Get consumption data
consumptions = await db_service.get_consumptions(cups, start_date, end_date)
if not consumptions:
_LOGGER.warning(
f"No consumption data found for CUPS {cups} in the specified period"
)
return {
"success": False,
"error": "No consumption data found",
"cups": cups,
"period": {
"start": start_date.isoformat(),
"end": end_date.isoformat(),
},
}
# Get contract data for power terms
contracts = await db_service.get_contracts(cups)
if not contracts:
_LOGGER.warning(
f"No contract data found for CUPS {cups}, using defaults"
)
# Use default power values if no contracts found
default_contract = {
"power_p1": 3.45, # Default residential power
"power_p2": 3.45,
"date_start": start_date,
"date_end": end_date,
}
contracts = [type("MockContract", (), default_contract)()]
# Get pricing data
prices = await self.get_prices(
pricing_rules, start_date, end_date, is_ceuta_melilla
)
if prices is None:
_LOGGER.warning(
f"No pricing data available for CUPS {cups} in the specified period"
)
return {
"success": False,
"error": "No pricing data available",
"cups": cups,
"period": {
"start": start_date.isoformat(),
"end": end_date.isoformat(),
},
}
# Create price lookup by datetime
price_lookup = {price.datetime: price.value_eur_kwh for price in prices}
# Build data structure similar to billing processor
data = {}
for consumption in consumptions:
data[consumption.datetime] = {
"datetime": consumption.datetime,
"kwh": consumption.value_kwh,
"surplus_kwh": (
consumption.surplus_kwh
if hasattr(consumption, "surplus_kwh")
and consumption.surplus_kwh is not None
else 0
),
}
# Add contract power data
for contract in contracts:
start_dt = getattr(contract, "date_start", start_date)
end_dt = getattr(contract, "date_end", end_date)
current = start_dt
while current <= end_dt and current <= end_date:
if current in data:
data[current]["p1_kw"] = getattr(contract, "power_p1", 3.45)
data[current]["p2_kw"] = getattr(contract, "power_p2", 3.45)
current += timedelta(hours=1)
# Add pricing data
for dt, kwh_eur in price_lookup.items():
if dt in data:
data[dt]["kwh_eur"] = kwh_eur
# Prepare Jinja2 expressions for cost calculation
env = Environment()
energy_expr = env.compile_expression(
f"({pricing_rules.energy_formula})|float"
)
power_expr = env.compile_expression(
f"({pricing_rules.power_formula})|float"
)
others_expr = env.compile_expression(
f"({pricing_rules.others_formula})|float"
)
surplus_expr = env.compile_expression(
f"({pricing_rules.surplus_formula})|float"
)
main_expr = env.compile_expression(f"({pricing_rules.main_formula})|float")
# Calculate and save costs for each hour
saved_count = 0
updated_count = 0
skipped_count = 0
for dt in sorted(data.keys()):
# Skip if already exists and not forcing recalculation
if not force_recalculate and dt in existing_hours:
skipped_count += 1
continue
hour_data = data[dt]
# Add pricing rules to hour data
hour_data.update(pricing_rules.model_dump())
# Import here to avoid circular imports
from edata.utils import get_pvpc_tariff
tariff = get_pvpc_tariff(hour_data["datetime"])
# Set energy price if not already set
if "kwh_eur" not in hour_data:
if tariff == "p1" and pricing_rules.p1_kwh_eur is not None:
hour_data["kwh_eur"] = pricing_rules.p1_kwh_eur
elif tariff == "p2" and pricing_rules.p2_kwh_eur is not None:
hour_data["kwh_eur"] = pricing_rules.p2_kwh_eur
elif tariff == "p3" and pricing_rules.p3_kwh_eur is not None:
hour_data["kwh_eur"] = pricing_rules.p3_kwh_eur
else:
continue # Skip if no price available
# Set surplus price based on tariff
if tariff == "p1":
hour_data["surplus_kwh_eur"] = pricing_rules.surplus_p1_kwh_eur or 0
elif tariff == "p2":
hour_data["surplus_kwh_eur"] = pricing_rules.surplus_p2_kwh_eur or 0
elif tariff == "p3":
hour_data["surplus_kwh_eur"] = pricing_rules.surplus_p3_kwh_eur or 0
# Calculate individual cost terms
energy_term = 0.0
power_term = 0.0
others_term = 0.0
surplus_term = 0.0
with contextlib.suppress(Exception):
result = energy_expr(**hour_data)
energy_term = round(float(result), 6) if result is not None else 0.0
result = power_expr(**hour_data)
power_term = round(float(result), 6) if result is not None else 0.0
result = others_expr(**hour_data)
others_term = round(float(result), 6) if result is not None else 0.0
result = surplus_expr(**hour_data)
surplus_term = (
round(float(result), 6) if result is not None else 0.0
)
# Calculate total using main formula
cost_data = {
"energy_term": energy_term,
"power_term": power_term,
"others_term": others_term,
"surplus_term": surplus_term,
**pricing_rules.model_dump(),
}
total_eur = 0.0
with contextlib.suppress(Exception):
result = main_expr(**cost_data)
total_eur = round(float(result), 6) if result is not None else 0.0
# Prepare billing data (only calculated terms, not raw data)
billing_data = {
"cups": cups,
"datetime": dt,
"energy_term": energy_term,
"power_term": power_term,
"others_term": others_term,
"surplus_term": surplus_term,
"total_eur": total_eur,
"tariff": tariff,
"pricing_config_hash": pricing_config_hash,
}
# Save to database
await db_service.save_billing(billing_data)
if dt in existing_hours:
updated_count += 1
else:
saved_count += 1
result = {
"success": True,
"cups": cups,
"pricing_config_hash": pricing_config_hash,
"period": {
"start": start_date.isoformat(),
"end": end_date.isoformat(),
},
"stats": {
"total_consumptions": len(consumptions),
"saved": saved_count,
"updated": updated_count,
"skipped": skipped_count,
"processed": saved_count + updated_count,
},
}
_LOGGER.info(
f"Billing cost update completed for CUPS {cups}: "
f"{saved_count} saved, {updated_count} updated, {skipped_count} skipped"
)
return result
except Exception as e:
_LOGGER.error(f"Error updating costs for CUPS {cups}: {str(e)}")
return {
"success": False,
"error": str(e),
"cups": cups,
"period": {
"start": start_date.isoformat(),
"end": end_date.isoformat(),
},
}
async def get_daily_costs(
self,
cups: str,
pricing_rules: PricingRules,
start_date: datetime,
end_date: datetime,
is_ceuta_melilla: bool = False,
) -> List[PricingAggregated]:
"""Get daily aggregated billing costs for a period.
Args:
cups: CUPS identifier for consumption data
pricing_rules: Pricing configuration
start_date: Start date for cost calculation
end_date: End date for cost calculation
is_ceuta_melilla: Whether to use Ceuta/Melilla PVPC prices
Returns:
List of PricingAggregated objects, one per day
"""
_LOGGER.info(
f"Getting daily costs for CUPS {cups} from {start_date.date()} to {end_date.date()}"
)
try:
# Generate pricing configuration hash
db_service = await self._get_db_service()
pricing_config_hash = db_service.generate_pricing_config_hash(
pricing_rules.model_dump()
)
# Get billing data for the period
billing_records = await db_service.get_billing(
cups=cups,
start_date=start_date,
end_date=end_date,
pricing_config_hash=pricing_config_hash,
)
# If no billing data exists, calculate and store it first
if not billing_records:
_LOGGER.info(f"No billing data found, calculating costs first")
update_result = await self.update_missing_costs(
cups,
pricing_rules,
start_date,
end_date,
is_ceuta_melilla,
force_recalculate=False,
)
if not update_result["success"]:
_LOGGER.error(
f"Failed to update costs: {update_result.get('error', 'Unknown error')}"
)
return []
# Get the newly calculated billing data
billing_records = await db_service.get_billing(
cups=cups,
start_date=start_date,
end_date=end_date,
pricing_config_hash=pricing_config_hash,
)
# Group by day and aggregate
daily_aggregates = {}
for billing in billing_records:
# Get the date (without time) as key
date_key = billing.datetime.date()
if date_key not in daily_aggregates:
daily_aggregates[date_key] = {
"datetime": datetime.combine(date_key, datetime.min.time()),
"total_eur": 0.0,
"energy_term": 0.0,
"power_term": 0.0,
"others_term": 0.0,
"surplus_term": 0.0,
"hours": 0,
}
# Add this hour's costs
daily_aggregates[date_key]["total_eur"] += billing.total_eur or 0.0
daily_aggregates[date_key]["energy_term"] += billing.energy_term or 0.0
daily_aggregates[date_key]["power_term"] += billing.power_term or 0.0
daily_aggregates[date_key]["others_term"] += billing.others_term or 0.0
daily_aggregates[date_key]["surplus_term"] += (
billing.surplus_term or 0.0
)
daily_aggregates[date_key]["hours"] += 1
# Convert to PricingAggregated objects
result = []
for date_key in sorted(daily_aggregates.keys()):
agg = daily_aggregates[date_key]
pricing_agg = PricingAggregated(
datetime=agg["datetime"],
value_eur=round(agg["total_eur"], 6),
energy_term=round(agg["energy_term"], 6),
power_term=round(agg["power_term"], 6),
others_term=round(agg["others_term"], 6),
surplus_term=round(agg["surplus_term"], 6),
delta_h=agg["hours"],
)
result.append(pricing_agg)
_LOGGER.info(f"Generated {len(result)} daily cost aggregates")
return result
except Exception as e:
_LOGGER.error(f"Error getting daily costs for CUPS {cups}: {str(e)}")
raise
async def get_monthly_costs(
self,
cups: str,
pricing_rules: PricingRules,
start_date: datetime,
end_date: datetime,
is_ceuta_melilla: bool = False,
) -> List[PricingAggregated]:
"""Get monthly aggregated billing costs for a period.
Args:
cups: CUPS identifier for consumption data
pricing_rules: Pricing configuration
start_date: Start date for cost calculation
end_date: End date for cost calculation
is_ceuta_melilla: Whether to use Ceuta/Melilla PVPC prices
Returns:
List of PricingAggregated objects, one per month
"""
_LOGGER.info(
f"Getting monthly costs for CUPS {cups} from {start_date.date()} to {end_date.date()}"
)
try:
# Generate pricing configuration hash
db_service = await self._get_db_service()
pricing_config_hash = db_service.generate_pricing_config_hash(
pricing_rules.model_dump()
)
# Get billing data for the period
billing_records = await db_service.get_billing(
cups=cups,
start_date=start_date,
end_date=end_date,
pricing_config_hash=pricing_config_hash,
)
# If no billing data exists, calculate and store it first
if not billing_records:
_LOGGER.info(f"No billing data found, calculating costs first")
update_result = await self.update_missing_costs(
cups,
pricing_rules,
start_date,
end_date,
is_ceuta_melilla,
force_recalculate=False,
)
if not update_result["success"]:
_LOGGER.error(
f"Failed to update costs: {update_result.get('error', 'Unknown error')}"
)
return []
# Get the newly calculated billing data
billing_records = await db_service.get_billing(
cups=cups,
start_date=start_date,
end_date=end_date,
pricing_config_hash=pricing_config_hash,
)
# Group by month and aggregate
monthly_aggregates = {}
for billing in billing_records:
# Get year-month as key
month_key = (billing.datetime.year, billing.datetime.month)
if month_key not in monthly_aggregates:
# Create datetime for first day of month
month_start = datetime(month_key[0], month_key[1], 1)
monthly_aggregates[month_key] = {
"datetime": month_start,
"total_eur": 0.0,
"energy_term": 0.0,
"power_term": 0.0,
"others_term": 0.0,
"surplus_term": 0.0,
"hours": 0,
}
# Add this hour's costs
monthly_aggregates[month_key]["total_eur"] += billing.total_eur or 0.0
monthly_aggregates[month_key]["energy_term"] += (
billing.energy_term or 0.0
)
monthly_aggregates[month_key]["power_term"] += billing.power_term or 0.0
monthly_aggregates[month_key]["others_term"] += (
billing.others_term or 0.0
)
monthly_aggregates[month_key]["surplus_term"] += (
billing.surplus_term or 0.0
)
monthly_aggregates[month_key]["hours"] += 1
# Convert to PricingAggregated objects
result = []
for month_key in sorted(monthly_aggregates.keys()):
agg = monthly_aggregates[month_key]
pricing_agg = PricingAggregated(
datetime=agg["datetime"],
value_eur=round(agg["total_eur"], 6),
energy_term=round(agg["energy_term"], 6),
power_term=round(agg["power_term"], 6),
others_term=round(agg["others_term"], 6),
surplus_term=round(agg["surplus_term"], 6),
delta_h=agg["hours"],
)
result.append(pricing_agg)
_LOGGER.info(f"Generated {len(result)} monthly cost aggregates")
return result
except Exception as e:
_LOGGER.error(f"Error getting monthly costs for CUPS {cups}: {str(e)}")
raise
async def get_billing_summary(
self,
cups: str,
pricing_rules: PricingRules,
target_date: Optional[datetime] = None,
is_ceuta_melilla: bool = False,
) -> Dict[str, Any]:
"""Get billing summary data compatible with EdataHelper attributes.
Args:
cups: CUPS identifier
pricing_rules: Pricing configuration
target_date: Reference date for calculations (defaults to today)
is_ceuta_melilla: Whether to use Ceuta/Melilla PVPC prices
Returns:
Dict with summary attributes matching EdataHelper format
"""
from dateutil.relativedelta import relativedelta
if target_date is None:
target_date = datetime.now()
# Calculate date ranges
month_starts = target_date.replace(
day=1, hour=0, minute=0, second=0, microsecond=0
)
last_month_starts = month_starts - relativedelta(months=1)
# Initialize summary attributes
summary: Dict[str, Any] = {"month_€": None, "last_month_€": None}
try:
# Get current month cost
current_month_costs = await self.get_monthly_costs(
cups=cups,
pricing_rules=pricing_rules,
start_date=month_starts,
end_date=month_starts + relativedelta(months=1),
is_ceuta_melilla=is_ceuta_melilla,
)
if current_month_costs:
current_month_data = next(
(
c
for c in current_month_costs
if c.datetime.year == month_starts.year
and c.datetime.month == month_starts.month
),
None,
)
if current_month_data:
summary["month_€"] = current_month_data.value_eur
# Get last month cost
last_month_costs = await self.get_monthly_costs(
cups=cups,
pricing_rules=pricing_rules,
start_date=last_month_starts,
end_date=month_starts,
is_ceuta_melilla=is_ceuta_melilla,
)
if last_month_costs:
last_month_data = next(
(
c
for c in last_month_costs
if c.datetime.year == last_month_starts.year
and c.datetime.month == last_month_starts.month
),
None,
)
if last_month_data:
summary["last_month_€"] = last_month_data.value_eur
except Exception as e:
_LOGGER.warning(
f"Error calculating billing summary for CUPS {cups}: {str(e)}"
)
# Round numeric values to 2 decimal places
for key, value in summary.items():
if isinstance(value, float):
summary[key] = round(value, 2)
return summary
"""Consumption service for fetching and updating consumption data."""
import logging
from datetime import datetime, timedelta
from typing import Any, Dict, List, Optional
from edata.connectors.datadis import DatadisConnector
from edata.models.consumption import Consumption, ConsumptionAggregated
from edata.services.database import ConsumptionModel as DbConsumption
from edata.services.database import DatabaseService, get_database_service
from edata.utils import get_pvpc_tariff
_LOGGER = logging.getLogger(__name__)
class ConsumptionService:
    """Service for managing consumption data fetching and storage.

    Fetches hourly consumption data from Datadis, persists it through the
    database service, and exposes daily/monthly aggregations plus a summary
    compatible with EdataHelper attributes.
    """

    def __init__(
        self,
        datadis_connector: DatadisConnector,
        storage_dir: Optional[str] = None,
    ):
        """Initialize consumption service.

        Args:
            datadis_connector: Configured Datadis connector instance
            storage_dir: Directory for database and cache storage
        """
        self._datadis = datadis_connector
        self._storage_dir = storage_dir
        # Created lazily on first use (see _get_db_service)
        self._db_service: Optional[DatabaseService] = None

    async def _get_db_service(self) -> DatabaseService:
        """Get database service, initializing it on first access."""
        if self._db_service is None:
            self._db_service = await get_database_service(self._storage_dir)
        return self._db_service

    @staticmethod
    def _to_consumption_models(db_consumptions) -> List[Consumption]:
        """Convert DB consumption rows to Consumption models, sorted by time.

        A row whose ``real`` flag is unset (None) is treated as a real
        reading; an explicit False is preserved.  The previous expression
        ``db_cons.real or True`` always evaluated to True, silently losing
        the estimated-reading flag.
        """
        consumptions = [
            Consumption(
                datetime=row.datetime,
                delta_h=row.delta_h,
                value_kwh=row.value_kwh,
                surplus_kwh=row.surplus_kwh or 0.0,
                real=True if row.real is None else row.real,
            )
            for row in db_consumptions
        ]
        consumptions.sort(key=lambda c: c.datetime)
        return consumptions

    @staticmethod
    def _aggregate_consumptions(
        consumptions: List[Consumption], key_func
    ) -> List[ConsumptionAggregated]:
        """Aggregate hourly consumptions into buckets defined by key_func.

        Args:
            consumptions: Hourly Consumption models (any order)
            key_func: Maps a consumption datetime to its bucket datetime
                (e.g. start of day, or start of billing month)

        Returns:
            ConsumptionAggregated objects in chronological order, with all
            float values rounded to 2 decimal places.
        """
        aggregations: Dict[datetime, Dict[str, Any]] = {}
        for consumption in consumptions:
            bucket_key = key_func(consumption.datetime)
            # Tariff period (p1/p2/p3) decides the per-period sub-totals
            tariff = get_pvpc_tariff(consumption.datetime)
            bucket = aggregations.setdefault(
                bucket_key,
                {
                    "datetime": bucket_key,
                    "value_kwh": 0.0,
                    "value_p1_kwh": 0.0,
                    "value_p2_kwh": 0.0,
                    "value_p3_kwh": 0.0,
                    "surplus_kwh": 0.0,
                    "surplus_p1_kwh": 0.0,
                    "surplus_p2_kwh": 0.0,
                    "surplus_p3_kwh": 0.0,
                    "delta_h": 0.0,
                },
            )
            bucket["value_kwh"] += consumption.value_kwh
            bucket["surplus_kwh"] += consumption.surplus_kwh
            bucket["delta_h"] += consumption.delta_h
            # Per-tariff sub-totals; unknown tariff values are ignored, as
            # the original if/elif chain did.
            if tariff in ("p1", "p2", "p3"):
                bucket[f"value_{tariff}_kwh"] += consumption.value_kwh
                bucket[f"surplus_{tariff}_kwh"] += consumption.surplus_kwh
        result = []
        for bucket in sorted(aggregations.values(), key=lambda b: b["datetime"]):
            rounded = {
                key: round(value, 2) if isinstance(value, float) else value
                for key, value in bucket.items()
            }
            result.append(ConsumptionAggregated(**rounded))
        return result

    async def update_consumptions(
        self,
        cups: str,
        distributor_code: str,
        start_date: datetime,
        end_date: datetime,
        measurement_type: str = "0",
        point_type: int = 5,
        authorized_nif: Optional[str] = None,
        force_full_update: bool = False,
    ) -> Dict[str, Any]:
        """Update consumption data for a CUPS in the specified date range.

        Args:
            cups: CUPS identifier
            distributor_code: Distributor company code
            start_date: Start date for consumption data
            end_date: End date for consumption data
            measurement_type: Type of measurement (default "0" for hourly)
            point_type: Type of supply point (default 5)
            authorized_nif: Authorized NIF if accessing on behalf of someone
            force_full_update: If True, fetch all data ignoring existing records

        Returns:
            Dict with operation results and statistics
        """
        _LOGGER.info(
            "Updating consumptions for CUPS %5s from %s to %s",
            cups[-5:],
            start_date.date(),
            end_date.date(),
        )
        # Determine actual start date based on existing data
        actual_start_date = start_date
        if not force_full_update:
            last_consumption_date = await self.get_last_consumption_date(cups)
            if last_consumption_date:
                # Resume from the hour after the last stored consumption
                actual_start_date = max(
                    start_date, last_consumption_date + timedelta(hours=1)
                )
                _LOGGER.info(
                    "Found existing data up to %s, fetching from %s",
                    last_consumption_date.date(),
                    actual_start_date.date(),
                )
            else:
                _LOGGER.info(
                    "No existing consumption data found for CUPS %5s, fetching all data",
                    cups[-5:],
                )
        # If actual start date is beyond end date, no new data is needed
        if actual_start_date >= end_date:
            _LOGGER.info(
                "No new consumption data needed for CUPS %5s (up to date)",
                cups[-5:],
            )
            return {
                "success": True,
                "cups": cups,
                "period": {
                    "start": start_date.isoformat(),
                    "end": end_date.isoformat(),
                    "actual_start": actual_start_date.isoformat(),
                },
                "stats": {
                    "fetched": 0,
                    "saved": 0,
                    "updated": 0,
                    "skipped": "up_to_date",
                },
                "message": "Data is up to date",
            }
        try:
            # Fetch only the missing consumption data from datadis
            consumptions = await self._datadis.get_consumption_data(
                cups=cups,
                distributor_code=distributor_code,
                start_date=actual_start_date,
                end_date=end_date,
                measurement_type=measurement_type,
                point_type=point_type,
                authorized_nif=authorized_nif,
            )
            # Hoisted out of the loop: one service lookup, not one per record
            db_service = await self._get_db_service()
            saved_count = 0
            updated_count = 0
            for consumption in consumptions:
                # Convert Pydantic model to dict and add CUPS
                consumption_dict = consumption.model_dump()
                consumption_dict["cups"] = cups
                # An exact-datetime hit means this record is an update
                existing = await db_service.get_consumptions(
                    cups=cups,
                    start_date=consumption.datetime,
                    end_date=consumption.datetime,
                )
                if existing:
                    updated_count += 1
                else:
                    saved_count += 1
                await db_service.save_consumption(consumption_dict)
            result = {
                "success": True,
                "cups": cups,
                "period": {
                    "start": start_date.isoformat(),
                    "end": end_date.isoformat(),
                    "actual_start": actual_start_date.isoformat(),
                },
                "stats": {
                    "fetched": len(consumptions),
                    "saved": saved_count,
                    "updated": updated_count,
                },
            }
            if actual_start_date > start_date:
                result["message"] = (
                    f"Fetched only missing data from {actual_start_date.date()}"
                )
            _LOGGER.info(
                "Consumption update completed: %d fetched, %d saved, %d updated",
                len(consumptions),
                saved_count,
                updated_count,
            )
            return result
        except Exception as e:
            _LOGGER.error("Error updating consumptions for CUPS %s: %s", cups, e)
            # actual_start_date is always bound here: it is assigned before
            # the try block, so the old `"actual_start_date" in locals()`
            # guard was dead code.
            return {
                "success": False,
                "cups": cups,
                "error": str(e),
                "period": {
                    "start": start_date.isoformat(),
                    "end": end_date.isoformat(),
                    "actual_start": actual_start_date.isoformat(),
                },
            }

    async def update_consumption_range_by_months(
        self,
        cups: str,
        distributor_code: str,
        start_date: datetime,
        end_date: datetime,
        measurement_type: str = "0",
        point_type: int = 5,
        authorized_nif: Optional[str] = None,
        force_full_update: bool = False,
    ) -> Dict[str, Any]:
        """Update consumption data month by month to respect datadis limits.

        Args:
            cups: CUPS identifier
            distributor_code: Distributor company code
            start_date: Start date for consumption data
            end_date: End date for consumption data
            measurement_type: Type of measurement (default "0" for hourly)
            point_type: Type of supply point (default 5)
            authorized_nif: Authorized NIF if accessing on behalf of someone
            force_full_update: If True, fetch all data ignoring existing records

        Returns:
            Dict with operation results and statistics for all months
        """
        _LOGGER.info(
            "Updating consumption range for CUPS %5s from %s to %s by months",
            cups[-5:],
            start_date.date(),
            end_date.date(),
        )
        results = []
        current_date = start_date
        while current_date < end_date:
            # First day of the following month (datadis rejects ranges
            # longer than one month)
            if current_date.month == 12:
                month_end = current_date.replace(
                    year=current_date.year + 1, month=1, day=1
                )
            else:
                month_end = current_date.replace(month=current_date.month + 1, day=1)
            # Don't go past the requested end date
            actual_end = min(month_end, end_date)
            consumption_result = await self.update_consumptions(
                cups=cups,
                distributor_code=distributor_code,
                start_date=current_date,
                end_date=actual_end,
                measurement_type=measurement_type,
                point_type=point_type,
                authorized_nif=authorized_nif,
                force_full_update=force_full_update,
            )
            results.append(
                {
                    "month": current_date.strftime("%Y-%m"),
                    "consumption": consumption_result,
                }
            )
            current_date = month_end
        # Totals over the months that succeeded
        successful = [r["consumption"] for r in results if r["consumption"]["success"]]
        total_consumptions_fetched = sum(r["stats"]["fetched"] for r in successful)
        total_consumptions_saved = sum(r["stats"]["saved"] for r in successful)
        total_consumptions_updated = sum(r["stats"]["updated"] for r in successful)
        summary = {
            "success": all(r["consumption"]["success"] for r in results),
            "cups": cups,
            "period": {"start": start_date.isoformat(), "end": end_date.isoformat()},
            "months_processed": len(results),
            "total_stats": {
                "consumptions_fetched": total_consumptions_fetched,
                "consumptions_saved": total_consumptions_saved,
                "consumptions_updated": total_consumptions_updated,
            },
            "monthly_results": results,
        }
        _LOGGER.info(
            "Consumption range update completed: %d months processed, "
            "%d consumptions fetched",
            len(results),
            total_consumptions_fetched,
        )
        return summary

    async def get_stored_consumptions(
        self,
        cups: str,
        start_date: Optional[datetime] = None,
        end_date: Optional[datetime] = None,
    ) -> List[DbConsumption]:
        """Get stored consumptions from database.

        Args:
            cups: CUPS identifier
            start_date: Optional start date filter
            end_date: Optional end date filter

        Returns:
            List of database Consumption objects
        """
        db_service = await self._get_db_service()
        return await db_service.get_consumptions(cups, start_date, end_date)

    async def get_last_consumption_date(self, cups: str) -> Optional[datetime]:
        """Get the date of the last consumption record in the database.

        Args:
            cups: CUPS identifier

        Returns:
            Datetime of last consumption or None if no data exists
        """
        db_service = await self._get_db_service()
        latest_consumption = await db_service.get_latest_consumption(cups)
        if latest_consumption:
            return latest_consumption.datetime
        return None

    async def get_daily_consumptions(
        self, cups: str, start_date: datetime, end_date: datetime
    ) -> List[ConsumptionAggregated]:
        """Calculate daily consumption aggregations.

        Args:
            cups: CUPS identifier
            start_date: Start date for aggregation
            end_date: End date for aggregation

        Returns:
            List of daily consumption aggregations
        """
        db_service = await self._get_db_service()
        db_consumptions = await db_service.get_consumptions(cups, start_date, end_date)
        consumptions = self._to_consumption_models(db_consumptions)
        # Bucket key: midnight of the consumption's calendar day
        return self._aggregate_consumptions(
            consumptions,
            lambda dt: dt.replace(hour=0, minute=0, second=0, microsecond=0),
        )

    async def get_monthly_consumptions(
        self,
        cups: str,
        start_date: datetime,
        end_date: datetime,
        cycle_start_day: int = 1,
    ) -> List[ConsumptionAggregated]:
        """Calculate monthly consumption aggregations.

        Args:
            cups: CUPS identifier
            start_date: Start date for aggregation
            end_date: End date for aggregation
            cycle_start_day: Day of month when billing cycle starts (1-30)

        Returns:
            List of monthly consumption aggregations
        """
        db_service = await self._get_db_service()
        db_consumptions = await db_service.get_consumptions(cups, start_date, end_date)
        consumptions = self._to_consumption_models(db_consumptions)
        # Shift each day back by the cycle offset so a billing month that
        # starts on e.g. the 15th is bucketed under its cycle's month.
        cycle_offset = cycle_start_day - 1
        return self._aggregate_consumptions(
            consumptions,
            lambda dt: (
                dt.replace(hour=0, minute=0, second=0, microsecond=0)
                - timedelta(days=cycle_offset)
            ).replace(day=1),
        )

    async def get_consumption_summary(
        self, cups: str, target_date: Optional[datetime] = None
    ) -> Dict[str, Any]:
        """Get consumption summary data compatible with EdataHelper attributes.

        Args:
            cups: CUPS identifier
            target_date: Reference date for calculations (defaults to today)

        Returns:
            Dict with summary attributes matching EdataHelper format
        """
        # relativedelta is only needed here; timedelta comes from the
        # module-level import (the old local re-import was redundant).
        from dateutil.relativedelta import relativedelta

        if target_date is None:
            target_date = datetime.now()
        # Calculate date ranges
        today_starts = target_date.replace(hour=0, minute=0, second=0, microsecond=0)
        yesterday_starts = today_starts - timedelta(days=1)
        month_starts = target_date.replace(
            day=1, hour=0, minute=0, second=0, microsecond=0
        )
        last_month_starts = month_starts - relativedelta(months=1)
        # Get daily and monthly aggregations
        daily_consumptions = await self.get_daily_consumptions(
            cups=cups, start_date=yesterday_starts, end_date=today_starts
        )
        monthly_consumptions = await self.get_monthly_consumptions(
            cups=cups,
            start_date=last_month_starts,
            end_date=month_starts + relativedelta(months=1),
        )
        # Get all consumptions to find last registered data
        all_consumptions = await self.get_stored_consumptions(cups=cups)
        # Initialize summary attributes (None = no data available)
        summary: Dict[str, Any] = {
            # Yesterday consumption
            "yesterday_kWh": None,
            "yesterday_hours": None,
            "yesterday_p1_kWh": None,
            "yesterday_p2_kWh": None,
            "yesterday_p3_kWh": None,
            "yesterday_surplus_kWh": None,
            "yesterday_surplus_p1_kWh": None,
            "yesterday_surplus_p2_kWh": None,
            "yesterday_surplus_p3_kWh": None,
            # Current month consumption
            "month_kWh": None,
            "month_surplus_kWh": None,
            "month_days": None,
            "month_daily_kWh": None,
            "month_p1_kWh": None,
            "month_p2_kWh": None,
            "month_p3_kWh": None,
            "month_surplus_p1_kWh": None,
            "month_surplus_p2_kWh": None,
            "month_surplus_p3_kWh": None,
            # Last month consumption
            "last_month_kWh": None,
            "last_month_surplus_kWh": None,
            "last_month_days": None,
            "last_month_daily_kWh": None,
            "last_month_p1_kWh": None,
            "last_month_p2_kWh": None,
            "last_month_p3_kWh": None,
            "last_month_surplus_p1_kWh": None,
            "last_month_surplus_p2_kWh": None,
            "last_month_surplus_p3_kWh": None,
            # Last registered data
            "last_registered_date": None,
            "last_registered_day_kWh": None,
            "last_registered_day_surplus_kWh": None,
            "last_registered_day_hours": None,
            "last_registered_day_p1_kWh": None,
            "last_registered_day_p2_kWh": None,
            "last_registered_day_p3_kWh": None,
            "last_registered_day_surplus_p1_kWh": None,
            "last_registered_day_surplus_p2_kWh": None,
            "last_registered_day_surplus_p3_kWh": None,
        }
        # Fill yesterday data
        yesterday_data = next(
            (
                d
                for d in daily_consumptions
                if d.datetime.date() == yesterday_starts.date()
            ),
            None,
        )
        if yesterday_data:
            summary["yesterday_kWh"] = yesterday_data.value_kwh
            summary["yesterday_hours"] = yesterday_data.delta_h
            summary["yesterday_p1_kWh"] = yesterday_data.value_p1_kwh
            summary["yesterday_p2_kWh"] = yesterday_data.value_p2_kwh
            summary["yesterday_p3_kWh"] = yesterday_data.value_p3_kwh
            summary["yesterday_surplus_kWh"] = yesterday_data.surplus_kwh
            summary["yesterday_surplus_p1_kWh"] = yesterday_data.surplus_p1_kwh
            summary["yesterday_surplus_p2_kWh"] = yesterday_data.surplus_p2_kwh
            summary["yesterday_surplus_p3_kWh"] = yesterday_data.surplus_p3_kwh
        # Fill current month data
        current_month_data = next(
            (
                m
                for m in monthly_consumptions
                if m.datetime.year == month_starts.year
                and m.datetime.month == month_starts.month
            ),
            None,
        )
        if current_month_data:
            summary["month_kWh"] = current_month_data.value_kwh
            summary["month_surplus_kWh"] = current_month_data.surplus_kwh
            # delta_h counts aggregated hours, so /24 gives covered days
            summary["month_days"] = (
                current_month_data.delta_h / 24 if current_month_data.delta_h else None
            )
            summary["month_daily_kWh"] = (
                (current_month_data.value_kwh / (current_month_data.delta_h / 24))
                if current_month_data.delta_h and current_month_data.delta_h > 0
                else None
            )
            summary["month_p1_kWh"] = current_month_data.value_p1_kwh
            summary["month_p2_kWh"] = current_month_data.value_p2_kwh
            summary["month_p3_kWh"] = current_month_data.value_p3_kwh
            summary["month_surplus_p1_kWh"] = current_month_data.surplus_p1_kwh
            summary["month_surplus_p2_kWh"] = current_month_data.surplus_p2_kwh
            summary["month_surplus_p3_kWh"] = current_month_data.surplus_p3_kwh
        # Fill last month data
        last_month_data = next(
            (
                m
                for m in monthly_consumptions
                if m.datetime.year == last_month_starts.year
                and m.datetime.month == last_month_starts.month
            ),
            None,
        )
        if last_month_data:
            summary["last_month_kWh"] = last_month_data.value_kwh
            summary["last_month_surplus_kWh"] = last_month_data.surplus_kwh
            summary["last_month_days"] = (
                last_month_data.delta_h / 24 if last_month_data.delta_h else None
            )
            summary["last_month_daily_kWh"] = (
                (last_month_data.value_kwh / (last_month_data.delta_h / 24))
                if last_month_data.delta_h and last_month_data.delta_h > 0
                else None
            )
            summary["last_month_p1_kWh"] = last_month_data.value_p1_kwh
            summary["last_month_p2_kWh"] = last_month_data.value_p2_kwh
            summary["last_month_p3_kWh"] = last_month_data.value_p3_kwh
            summary["last_month_surplus_p1_kWh"] = last_month_data.surplus_p1_kwh
            summary["last_month_surplus_p2_kWh"] = last_month_data.surplus_p2_kwh
            summary["last_month_surplus_p3_kWh"] = last_month_data.surplus_p3_kwh
        # Fill last registered data
        if all_consumptions:
            last_consumption = max(all_consumptions, key=lambda c: c.datetime)
            summary["last_registered_date"] = last_consumption.datetime
            # Aggregate the last registered calendar day
            last_day_start = last_consumption.datetime.replace(
                hour=0, minute=0, second=0, microsecond=0
            )
            last_day_end = last_day_start + timedelta(days=1)
            last_day_daily = await self.get_daily_consumptions(
                cups=cups, start_date=last_day_start, end_date=last_day_end
            )
            if last_day_daily:
                last_day_data = last_day_daily[0]
                summary["last_registered_day_kWh"] = last_day_data.value_kwh
                summary["last_registered_day_surplus_kWh"] = last_day_data.surplus_kwh
                summary["last_registered_day_hours"] = last_day_data.delta_h
                summary["last_registered_day_p1_kWh"] = last_day_data.value_p1_kwh
                summary["last_registered_day_p2_kWh"] = last_day_data.value_p2_kwh
                summary["last_registered_day_p3_kWh"] = last_day_data.value_p3_kwh
                summary["last_registered_day_surplus_p1_kWh"] = (
                    last_day_data.surplus_p1_kwh
                )
                summary["last_registered_day_surplus_p2_kWh"] = (
                    last_day_data.surplus_p2_kwh
                )
                summary["last_registered_day_surplus_p3_kWh"] = (
                    last_day_data.surplus_p3_kwh
                )
        # Round numeric values to 2 decimal places
        for key, value in summary.items():
            if isinstance(value, float):
                summary[key] = round(value, 2)
        return summary
"""Contract service for fetching and managing contract data."""
import logging
from datetime import datetime
from typing import Any, Dict, List, Optional
from edata.connectors.datadis import DatadisConnector
from edata.services.database import ContractModel, DatabaseService, get_database_service
_LOGGER = logging.getLogger(__name__)
class ContractService:
    """Service for managing contract data fetching and storage.

    Pulls contract details from the Datadis private API through a
    configured connector and persists them via the shared database
    service, exposing query helpers over the stored records.
    """

    def __init__(
        self,
        datadis_connector: DatadisConnector,
        storage_dir: Optional[str] = None,
    ):
        """Initialize contract service.

        Args:
            datadis_connector: Configured Datadis connector instance
            storage_dir: Directory for database and cache storage
        """
        self._datadis = datadis_connector
        self._storage_dir = storage_dir
        self._db_service: Optional[DatabaseService] = None

    async def _get_db_service(self) -> DatabaseService:
        """Get database service, initializing lazily on first use."""
        if self._db_service is None:
            self._db_service = await get_database_service(self._storage_dir)
        return self._db_service

    async def update_contracts(
        self, cups: str, distributor_code: str, authorized_nif: Optional[str] = None
    ) -> Dict[str, Any]:
        """Update contract data for a CUPS.

        Args:
            cups: CUPS identifier
            distributor_code: Distributor code for the CUPS
            authorized_nif: Optional authorized NIF for access

        Returns:
            Dict with operation results and statistics
        """
        # Only the CUPS tail is logged, keeping the full identifier out of logs.
        _LOGGER.info("Updating contracts for CUPS %s", cups[-5:])
        try:
            # Fetch contract data from Datadis
            contracts_data = await self._datadis.get_contract_detail(
                cups=cups,
                distributor_code=distributor_code,
                authorized_nif=authorized_nif,
            )
            if not contracts_data:
                _LOGGER.warning("No contract data found for CUPS %s", cups[-5:])
                return {
                    "success": True,
                    "stats": {
                        "fetched": 0,
                        "saved": 0,
                        "updated": 0,
                        "total_stored": 0,
                    },
                }
            # Get existing contracts so new vs. updated records can be counted
            db_service = await self._get_db_service()
            existing = await db_service.get_contracts(cups=cups)
            # NOTE(review): duplicates are detected by (date_start, date_end);
            # the persistence layer may upsert on a different key — confirm
            # both sides agree so the saved/updated counts stay accurate.
            existing_periods = {(c.date_start, c.date_end) for c in existing}
            # Save contracts to database
            saved_count = 0
            updated_count = 0
            for contract in contracts_data:
                contract_dict = contract.model_dump()
                contract_dict["cups"] = cups
                # Check if this contract period already exists
                period_key = (contract.date_start, contract.date_end)
                if period_key in existing_periods:
                    updated_count += 1
                    _LOGGER.debug(
                        "Updating existing contract for CUPS %s period %s-%s",
                        cups[-5:],
                        contract.date_start.date(),
                        contract.date_end.date(),
                    )
                else:
                    saved_count += 1
                    _LOGGER.debug(
                        "Saving new contract for CUPS %s period %s-%s",
                        cups[-5:],
                        contract.date_start.date(),
                        contract.date_end.date(),
                    )
                # Save to database (insert or update is decided there)
                await db_service.save_contract(contract_dict)
            # Get total contracts stored for this CUPS
            all_contracts = await db_service.get_contracts(cups=cups)
            total_stored = len(all_contracts)
            result = {
                "success": True,
                "stats": {
                    "fetched": len(contracts_data),
                    "saved": saved_count,
                    "updated": updated_count,
                    "total_stored": total_stored,
                },
            }
            _LOGGER.info(
                "Contract update completed for CUPS %s: %d fetched, %d saved, %d updated",
                cups[-5:],
                len(contracts_data),
                saved_count,
                updated_count,
            )
            return result
        except Exception as e:
            # Boundary handler: failures are reported in the result dict
            # instead of propagating to the caller.
            _LOGGER.error(
                "Error updating contracts for CUPS %s: %s", cups[-5:], str(e)
            )
            return {
                "success": False,
                "error": str(e),
                "stats": {"fetched": 0, "saved": 0, "updated": 0, "total_stored": 0},
            }

    async def get_contracts(
        self,
        cups: str,
        start_date: Optional[datetime] = None,
        end_date: Optional[datetime] = None,
    ) -> List[ContractModel]:
        """Get stored contract data for a CUPS.

        Args:
            cups: CUPS identifier
            start_date: Optional start date filter
            end_date: Optional end date filter

        Returns:
            List of ContractModel objects (empty list on error)
        """
        _LOGGER.debug(
            "Getting contracts for CUPS %s%s%s",
            cups[-5:],
            f" from {start_date.date()}" if start_date else "",
            f" to {end_date.date()}" if end_date else "",
        )
        try:
            db_service = await self._get_db_service()
            contracts = await db_service.get_contracts(cups=cups)
            # Apply date filters if provided
            if start_date or end_date:
                filtered_contracts = []
                for contract in contracts:
                    # Keep contracts whose period overlaps the requested window
                    if start_date and contract.date_end < start_date:
                        continue
                    if end_date and contract.date_start > end_date:
                        continue
                    filtered_contracts.append(contract)
                contracts = filtered_contracts
            _LOGGER.debug("Found %d contracts for CUPS %s", len(contracts), cups[-5:])
            return contracts
        except Exception as e:
            _LOGGER.error(
                "Error getting contracts for CUPS %s: %s", cups[-5:], str(e)
            )
            return []

    async def get_active_contract(
        self, cups: str, reference_date: Optional[datetime] = None
    ) -> Optional[ContractModel]:
        """Get the active contract for a CUPS at a specific date.

        Args:
            cups: CUPS identifier
            reference_date: Date to check for active contract (defaults to now)

        Returns:
            Active ContractModel if found, None otherwise
        """
        if reference_date is None:
            reference_date = datetime.now()
        _LOGGER.debug(
            "Getting active contract for CUPS %s at %s",
            cups[-5:],
            reference_date.date(),
        )
        try:
            contracts = await self.get_contracts(cups=cups)
            for contract in contracts:
                # A contract is active when the reference date falls inside
                # its validity period (bounds inclusive).
                if contract.date_start <= reference_date <= contract.date_end:
                    _LOGGER.debug(
                        "Found active contract for CUPS %s period %s-%s",
                        cups[-5:],
                        contract.date_start.date(),
                        contract.date_end.date(),
                    )
                    return contract
            _LOGGER.warning(
                "No active contract found for CUPS %s at %s",
                cups[-5:],
                reference_date.date(),
            )
            return None
        except Exception as e:
            _LOGGER.error(
                "Error getting active contract for CUPS %s: %s", cups[-5:], str(e)
            )
            return None

    async def get_latest_contract(self, cups: str) -> Optional[ContractModel]:
        """Get the most recent contract for a CUPS.

        Args:
            cups: CUPS identifier

        Returns:
            Latest ContractModel (by end date) if found, None otherwise
        """
        _LOGGER.debug("Getting latest contract for CUPS %s", cups[-5:])
        try:
            contracts = await self.get_contracts(cups=cups)
            if not contracts:
                _LOGGER.warning("No contracts found for CUPS %s", cups[-5:])
                return None
            # Most recent contract is the one with the latest end date
            latest_contract = max(contracts, key=lambda c: c.date_end)
            _LOGGER.debug(
                "Found latest contract for CUPS %s period %s-%s",
                cups[-5:],
                latest_contract.date_start.date(),
                latest_contract.date_end.date(),
            )
            return latest_contract
        except Exception as e:
            _LOGGER.error(
                "Error getting latest contract for CUPS %s: %s", cups[-5:], str(e)
            )
            return None

    async def get_contract_summary(self, cups: str) -> Dict[str, Any]:
        """Get contract summary attributes for a CUPS.

        Args:
            cups: CUPS identifier

        Returns:
            Dict with contracted power attributes (None values when no
            contract is stored or on error)
        """
        _LOGGER.debug("Getting contract summary for CUPS %s", cups[-5:])
        try:
            # Summarize from the most recent contract only
            latest_contract = await self.get_latest_contract(cups)
            if not latest_contract:
                _LOGGER.warning("No contracts found for CUPS %s", cups[-5:])
                return {
                    "contract_p1_kW": None,
                    "contract_p2_kW": None,
                }
            summary = {
                "contract_p1_kW": latest_contract.power_p1,
                "contract_p2_kW": latest_contract.power_p2,
                # Add other contract-related summary attributes here as needed
            }
            _LOGGER.debug("Contract summary calculated for CUPS %s", cups[-5:])
            return summary
        except Exception as e:
            _LOGGER.error(
                "Error getting contract summary for CUPS %s: %s", cups[-5:], str(e)
            )
            return {
                "contract_p1_kW": None,
                "contract_p2_kW": None,
            }

    async def get_contract_stats(self, cups: str) -> Dict[str, Any]:
        """Get statistics about contracts for a CUPS.

        Args:
            cups: CUPS identifier

        Returns:
            Dict with total count, overall date range and min/max power
            per period (empty dict on error)
        """
        _LOGGER.debug("Getting contract statistics for CUPS %s", cups[-5:])
        try:
            contracts = await self.get_contracts(cups=cups)
            if not contracts:
                return {
                    "total_contracts": 0,
                    "date_range": None,
                    "power_ranges": {},
                }
            # Calculate overall date range across all contracts
            earliest_start = min(c.date_start for c in contracts)
            latest_end = max(c.date_end for c in contracts)
            # Calculate power ranges, skipping contracts without power data
            p1_powers = [c.power_p1 for c in contracts if c.power_p1 is not None]
            p2_powers = [c.power_p2 for c in contracts if c.power_p2 is not None]
            power_ranges = {}
            if p1_powers:
                power_ranges["p1_kw"] = {"min": min(p1_powers), "max": max(p1_powers)}
            if p2_powers:
                power_ranges["p2_kw"] = {"min": min(p2_powers), "max": max(p2_powers)}
            stats = {
                "total_contracts": len(contracts),
                "date_range": {
                    "earliest_start": earliest_start,
                    "latest_end": latest_end,
                },
                "power_ranges": power_ranges,
            }
            _LOGGER.debug("Contract statistics calculated for CUPS %s", cups[-5:])
            return stats
        except Exception as e:
            _LOGGER.error(
                "Error getting contract statistics for CUPS %s: %s", cups[-5:], str(e)
            )
            return {}
"""Database service for edata using SQLModel and SQLite with async support."""
import hashlib
import os
from datetime import datetime as DateTime
from typing import List, Optional
from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine
from sqlmodel import SQLModel, desc, select
from edata.const import DEFAULT_STORAGE_DIR
from edata.models.database import (
BillingModel,
ConsumptionModel,
ContractModel,
MaxPowerModel,
PVPCPricesModel,
SupplyModel,
)
class DatabaseService:
    """Service for managing the SQLite database with async support.

    Wraps an async SQLAlchemy engine (sqlite+aiosqlite) and provides
    upsert/query helpers for each SQLModel table. Every ``save_*`` method
    follows the same pattern: look up an existing row by its natural key,
    update it in place when found, otherwise insert a new row.
    """

    def __init__(self, storage_dir: Optional[str] = None):
        """Initialize database service.

        Args:
            storage_dir: Directory to store database, defaults to same as cache
        """
        if storage_dir is None:
            storage_dir = DEFAULT_STORAGE_DIR
        self._db_dir = os.path.join(storage_dir)
        # Create the storage directory if missing; no-op when it exists.
        os.makedirs(self._db_dir, exist_ok=True)
        db_path = os.path.join(self._db_dir, "edata.db")
        # Use aiosqlite for async SQLite operations
        self._engine = create_async_engine(f"sqlite+aiosqlite:///{db_path}")

    async def create_tables(self):
        """Create tables asynchronously (idempotent via SQLModel metadata)."""
        async with self._engine.begin() as conn:
            # create_all is sync API, so it runs through run_sync on the
            # engine's connection.
            await conn.run_sync(SQLModel.metadata.create_all)

    def get_session(self) -> AsyncSession:
        """Get an async database session.

        Callers are expected to use it as an async context manager so the
        session is closed deterministically.
        """
        return AsyncSession(self._engine)

    async def save_supply(self, supply_data: dict) -> SupplyModel:
        """Save or update a supply record.

        Natural key: ``cups`` (the model's primary key).
        """
        async with self.get_session() as session:
            # Check if supply exists
            existing = await session.get(SupplyModel, supply_data["cups"])
            if existing:
                # Update existing record in place; the primary key itself is
                # never overwritten.
                for key, value in supply_data.items():
                    if hasattr(existing, key) and key != "cups":
                        setattr(existing, key, value)
                existing.updated_at = DateTime.now()
                session.add(existing)
                await session.commit()
                await session.refresh(existing)
                return existing
            else:
                # Create new record
                supply = SupplyModel(**supply_data)
                session.add(supply)
                await session.commit()
                await session.refresh(supply)
                return supply

    async def save_contract(self, contract_data: dict) -> ContractModel:
        """Save or update a contract record.

        Natural key: ``(cups, date_start)``.
        NOTE(review): ``date_end`` is not part of the lookup, so two
        contracts sharing cups and start date would collide — confirm that
        cannot happen upstream.
        """
        async with self.get_session() as session:
            # Check if contract exists (by cups + date_start)
            stmt = select(ContractModel).where(
                ContractModel.cups == contract_data["cups"],
                ContractModel.date_start == contract_data["date_start"],
            )
            result = await session.execute(stmt)
            existing = result.scalar_one_or_none()
            if existing:
                # Update existing record
                for key, value in contract_data.items():
                    if hasattr(existing, key):
                        setattr(existing, key, value)
                existing.updated_at = DateTime.now()
                session.add(existing)
                await session.commit()
                await session.refresh(existing)
                return existing
            else:
                # Create new record
                contract = ContractModel(**contract_data)
                session.add(contract)
                await session.commit()
                await session.refresh(contract)
                return contract

    async def save_consumption(self, consumption_data: dict) -> ConsumptionModel:
        """Save or update a consumption record.

        Natural key: ``(cups, datetime)``.
        """
        async with self.get_session() as session:
            # Check if consumption exists (by cups + datetime)
            stmt = select(ConsumptionModel).where(
                ConsumptionModel.cups == consumption_data["cups"],
                ConsumptionModel.datetime == consumption_data["datetime"],
            )
            result = await session.execute(stmt)
            existing = result.scalar_one_or_none()
            if existing:
                # Update existing record
                for key, value in consumption_data.items():
                    if hasattr(existing, key):
                        setattr(existing, key, value)
                existing.updated_at = DateTime.now()
                session.add(existing)
                await session.commit()
                await session.refresh(existing)
                return existing
            else:
                # Create new record
                consumption = ConsumptionModel(**consumption_data)
                session.add(consumption)
                await session.commit()
                await session.refresh(consumption)
                return consumption

    async def save_maxpower(self, maxpower_data: dict) -> MaxPowerModel:
        """Save or update a maxpower record.

        Natural key: ``(cups, datetime)``.
        """
        async with self.get_session() as session:
            # Check if maxpower exists (by cups + datetime)
            stmt = select(MaxPowerModel).where(
                MaxPowerModel.cups == maxpower_data["cups"],
                MaxPowerModel.datetime == maxpower_data["datetime"],
            )
            result = await session.execute(stmt)
            existing = result.scalar_one_or_none()
            if existing:
                # Update existing record
                for key, value in maxpower_data.items():
                    if hasattr(existing, key):
                        setattr(existing, key, value)
                existing.updated_at = DateTime.now()
                session.add(existing)
                await session.commit()
                await session.refresh(existing)
                return existing
            else:
                # Create new record
                maxpower = MaxPowerModel(**maxpower_data)
                session.add(maxpower)
                await session.commit()
                await session.refresh(maxpower)
                return maxpower

    async def get_supply(self, cups: str) -> Optional[SupplyModel]:
        """Get a supply by CUPS (primary-key lookup)."""
        async with self.get_session() as session:
            return await session.get(SupplyModel, cups)

    async def get_supplies(self, cups: Optional[str] = None) -> List[SupplyModel]:
        """Get supplies, optionally filtered by CUPS."""
        async with self.get_session() as session:
            stmt = select(SupplyModel)
            if cups:
                stmt = stmt.where(SupplyModel.cups == cups)
            result = await session.execute(stmt)
            return list(result.scalars().all())

    async def get_latest_supply(
        self, cups: Optional[str] = None
    ) -> Optional[SupplyModel]:
        """Get the most recently updated supply, optionally filtered by CUPS."""
        async with self.get_session() as session:
            stmt = select(SupplyModel)
            if cups:
                stmt = stmt.where(SupplyModel.cups == cups)
            # "Latest" here means by updated_at, not by supply validity dates.
            stmt = stmt.order_by(desc(SupplyModel.updated_at))
            result = await session.execute(stmt)
            return result.scalar_one_or_none()

    async def get_contracts(self, cups: str) -> List[ContractModel]:
        """Get all contracts for a CUPS."""
        async with self.get_session() as session:
            stmt = select(ContractModel).where(ContractModel.cups == cups)
            result = await session.execute(stmt)
            return list(result.scalars().all())

    async def get_latest_contract(self, cups: str) -> Optional[ContractModel]:
        """Get the most recently started contract for a CUPS."""
        async with self.get_session() as session:
            stmt = select(ContractModel).where(ContractModel.cups == cups)
            stmt = stmt.order_by(desc(ContractModel.date_start))
            result = await session.execute(stmt)
            return result.scalar_one_or_none()

    async def get_consumptions(
        self,
        cups: str,
        start_date: Optional[DateTime] = None,
        end_date: Optional[DateTime] = None,
    ) -> List[ConsumptionModel]:
        """Get consumptions for a CUPS within date range (bounds inclusive)."""
        async with self.get_session() as session:
            stmt = select(ConsumptionModel).where(ConsumptionModel.cups == cups)
            if start_date:
                stmt = stmt.where(ConsumptionModel.datetime >= start_date)
            if end_date:
                stmt = stmt.where(ConsumptionModel.datetime <= end_date)
            result = await session.execute(stmt)
            return list(result.scalars().all())

    async def get_latest_consumption(self, cups: str) -> Optional[ConsumptionModel]:
        """Get the most recent consumption record for a CUPS."""
        async with self.get_session() as session:
            stmt = select(ConsumptionModel).where(ConsumptionModel.cups == cups)
            stmt = stmt.order_by(desc(ConsumptionModel.datetime))
            result = await session.execute(stmt)
            return result.scalar_one_or_none()

    async def get_maxpower_readings(
        self,
        cups: str,
        start_date: Optional[DateTime] = None,
        end_date: Optional[DateTime] = None,
    ) -> List[MaxPowerModel]:
        """Get maxpower readings for a CUPS within date range (bounds inclusive)."""
        async with self.get_session() as session:
            stmt = select(MaxPowerModel).where(MaxPowerModel.cups == cups)
            if start_date:
                stmt = stmt.where(MaxPowerModel.datetime >= start_date)
            if end_date:
                stmt = stmt.where(MaxPowerModel.datetime <= end_date)
            result = await session.execute(stmt)
            return list(result.scalars().all())

    async def get_latest_maxpower(self, cups: str) -> Optional[MaxPowerModel]:
        """Get the most recent maxpower reading for a CUPS."""
        async with self.get_session() as session:
            stmt = select(MaxPowerModel).where(MaxPowerModel.cups == cups)
            stmt = stmt.order_by(desc(MaxPowerModel.datetime))
            result = await session.execute(stmt)
            return result.scalar_one_or_none()

    async def save_pvpc_price(self, price_data: dict) -> PVPCPricesModel:
        """Save or update a PVPC price record.

        Natural key: ``(datetime, geo_id)`` when ``geo_id`` is present;
        otherwise the lookup matches on ``datetime`` alone.
        """
        async with self.get_session() as session:
            # Check if price exists (by datetime and geo_id)
            stmt = select(PVPCPricesModel).where(
                PVPCPricesModel.datetime == price_data["datetime"]
            )
            if "geo_id" in price_data:
                stmt = stmt.where(PVPCPricesModel.geo_id == price_data["geo_id"])
            result = await session.execute(stmt)
            existing = result.scalar_one_or_none()
            if existing:
                # Update existing record
                for key, value in price_data.items():
                    if hasattr(existing, key):
                        setattr(existing, key, value)
                existing.updated_at = DateTime.now()
                session.add(existing)
                await session.commit()
                await session.refresh(existing)
                return existing
            else:
                # Create new record
                price = PVPCPricesModel(**price_data)
                session.add(price)
                await session.commit()
                await session.refresh(price)
                return price

    async def get_pvpc_prices(
        self,
        start_date: Optional[DateTime] = None,
        end_date: Optional[DateTime] = None,
        geo_id: Optional[int] = None,
    ) -> List[PVPCPricesModel]:
        """Get PVPC prices within date range, optionally filtered by geo_id."""
        async with self.get_session() as session:
            stmt = select(PVPCPricesModel)
            if start_date:
                stmt = stmt.where(PVPCPricesModel.datetime >= start_date)
            if end_date:
                stmt = stmt.where(PVPCPricesModel.datetime <= end_date)
            # geo_id may legitimately be 0, so test against None explicitly.
            if geo_id is not None:
                stmt = stmt.where(PVPCPricesModel.geo_id == geo_id)
            result = await session.execute(stmt)
            return list(result.scalars().all())

    async def get_latest_pvpc_price(
        self, geo_id: Optional[int] = None
    ) -> Optional[PVPCPricesModel]:
        """Get the most recent PVPC price, optionally filtered by geo_id."""
        async with self.get_session() as session:
            stmt = select(PVPCPricesModel)
            if geo_id is not None:
                stmt = stmt.where(PVPCPricesModel.geo_id == geo_id)
            stmt = stmt.order_by(desc(PVPCPricesModel.datetime))
            result = await session.execute(stmt)
            return result.scalar_one_or_none()

    async def save_billing(self, billing_data: dict) -> BillingModel:
        """Save or update a billing record.

        Natural key: ``(cups, datetime, pricing_config_hash)`` — one row
        per hour per pricing configuration.
        """
        async with self.get_session() as session:
            # Check if billing exists (by cups + datetime + pricing_config_hash)
            stmt = select(BillingModel).where(
                BillingModel.cups == billing_data["cups"],
                BillingModel.datetime == billing_data["datetime"],
                BillingModel.pricing_config_hash == billing_data["pricing_config_hash"],
            )
            result = await session.execute(stmt)
            existing = result.scalar_one_or_none()
            if existing:
                # Update existing record
                for key, value in billing_data.items():
                    if hasattr(existing, key):
                        setattr(existing, key, value)
                existing.updated_at = DateTime.now()
                session.add(existing)
                await session.commit()
                await session.refresh(existing)
                return existing
            else:
                # Create new record
                billing = BillingModel(**billing_data)
                session.add(billing)
                await session.commit()
                await session.refresh(billing)
                return billing

    async def get_billing(
        self,
        cups: str,
        start_date: Optional[DateTime] = None,
        end_date: Optional[DateTime] = None,
        pricing_config_hash: Optional[str] = None,
    ) -> List[BillingModel]:
        """Get billing records for a CUPS within date range (bounds inclusive)."""
        async with self.get_session() as session:
            stmt = select(BillingModel).where(BillingModel.cups == cups)
            if start_date:
                stmt = stmt.where(BillingModel.datetime >= start_date)
            if end_date:
                stmt = stmt.where(BillingModel.datetime <= end_date)
            if pricing_config_hash:
                stmt = stmt.where(
                    BillingModel.pricing_config_hash == pricing_config_hash
                )
            result = await session.execute(stmt)
            return list(result.scalars().all())

    async def get_latest_billing(
        self, cups: str, pricing_config_hash: Optional[str] = None
    ) -> Optional[BillingModel]:
        """Get the most recent billing record for a CUPS."""
        async with self.get_session() as session:
            stmt = select(BillingModel).where(BillingModel.cups == cups)
            if pricing_config_hash:
                stmt = stmt.where(
                    BillingModel.pricing_config_hash == pricing_config_hash
                )
            stmt = stmt.order_by(desc(BillingModel.datetime))
            result = await session.execute(stmt)
            return result.scalar_one_or_none()

    async def delete_billing(
        self,
        cups: str,
        pricing_config_hash: str,
        start_date: Optional[DateTime] = None,
        end_date: Optional[DateTime] = None,
    ) -> int:
        """Delete billing records for a specific configuration and optional date range.

        Returns:
            Number of rows deleted.
        """
        async with self.get_session() as session:
            stmt = select(BillingModel).where(
                BillingModel.cups == cups,
                BillingModel.pricing_config_hash == pricing_config_hash,
            )
            if start_date:
                stmt = stmt.where(BillingModel.datetime >= start_date)
            if end_date:
                stmt = stmt.where(BillingModel.datetime <= end_date)
            result = await session.execute(stmt)
            # Rows are materialized first so the count is known before deleting.
            billing_records = list(result.scalars().all())
            count = len(billing_records)
            for record in billing_records:
                await session.delete(record)
            await session.commit()
            return count

    @staticmethod
    def generate_pricing_config_hash(pricing_rules_dict: dict) -> str:
        """Generate a short, deterministic hash for pricing rules configuration.

        Returns the first 16 hex chars of the SHA-256 of the sorted items.
        NOTE(review): only top-level keys are sorted; nested dicts keep
        insertion order — confirm pricing configs are flat, otherwise
        equivalent configs could hash differently.
        """
        # Create a normalized string representation for hashing
        config_str = str(sorted(pricing_rules_dict.items()))
        return hashlib.sha256(config_str.encode()).hexdigest()[:16]

    async def save_from_pydantic_models(
        self,
        cups: str,
        supplies: List,
        contracts: List,
        consumptions: List,
        maximeter: List,
    ):
        """Save data from Pydantic models to database.

        Each item is dumped to a dict and routed to the matching ``save_*``
        upsert; contracts/consumptions/maximeter entries get the CUPS added
        since their models do not carry it.
        """
        # Save supplies
        for supply in supplies:
            supply_dict = supply.model_dump()
            await self.save_supply(supply_dict)
        # Save contracts with CUPS
        for contract in contracts:
            contract_dict = contract.model_dump()
            contract_dict["cups"] = cups
            await self.save_contract(contract_dict)
        # Save consumptions with CUPS
        for consumption in consumptions:
            consumption_dict = consumption.model_dump()
            consumption_dict["cups"] = cups
            await self.save_consumption(consumption_dict)
        # Save maximeter readings with CUPS
        for maxpower in maximeter:
            maxpower_dict = maxpower.model_dump()
            maxpower_dict["cups"] = cups
            await self.save_maxpower(maxpower_dict)
# Global database service instance
# Module-level singleton shared by every service in the process.
_db_service: Optional[DatabaseService] = None


async def get_database_service(storage_dir: Optional[str] = None) -> DatabaseService:
    """Get the global database service instance.

    Args:
        storage_dir: Directory for the SQLite database. Only honored on the
            first call — NOTE(review): later calls with a different
            ``storage_dir`` silently reuse the existing instance; confirm
            callers never expect per-call directories.

    Returns:
        The shared, table-initialized DatabaseService.
    """
    global _db_service
    # NOTE(review): no lock around first initialization — two coroutines
    # racing here could each create an engine; confirm callers serialize
    # the first access.
    if _db_service is None:
        _db_service = DatabaseService(storage_dir)
        # Initialize tables on first access
        await _db_service.create_tables()
    return _db_service
"""Maximeter service for fetching and updating maximum power data."""
import logging
from datetime import datetime, timedelta
from typing import Any, Dict, List, Optional
from edata.connectors.datadis import DatadisConnector
from edata.models.maximeter import MaxPower
from edata.services.database import DatabaseService, get_database_service
_LOGGER = logging.getLogger(__name__)
class MaximeterService:
    """Service for managing maximum power data fetching and storage.

    Fetches maximeter readings from Datadis through a configured connector,
    persists them via the shared database service, and provides query and
    summary helpers over the stored readings.
    """

    def __init__(
        self,
        datadis_connector: DatadisConnector,
        storage_dir: Optional[str] = None,
    ):
        """Initialize maximeter service.

        Args:
            datadis_connector: Configured Datadis connector instance
            storage_dir: Directory for database and cache storage
        """
        self._datadis = datadis_connector
        self._storage_dir = storage_dir
        self._db_service: Optional[DatabaseService] = None

    async def _get_db_service(self) -> DatabaseService:
        """Get database service, initializing lazily on first use."""
        if self._db_service is None:
            self._db_service = await get_database_service(self._storage_dir)
        return self._db_service

    async def update_maxpower(
        self,
        cups: str,
        distributor_code: str,
        start_date: datetime,
        end_date: datetime,
        authorized_nif: Optional[str] = None,
        force_full_update: bool = False,
    ) -> Dict[str, Any]:
        """Update maximeter (maximum power) data for a CUPS in the specified date range.

        Args:
            cups: CUPS identifier
            distributor_code: Distributor company code
            start_date: Start date for maxpower data
            end_date: End date for maxpower data
            authorized_nif: Authorized NIF if accessing on behalf of someone
            force_full_update: If True, fetch all data ignoring existing records

        Returns:
            Dict with operation results and statistics
        """
        _LOGGER.info(
            "Updating maxpower for CUPS %5s from %s to %s",
            cups[-5:],
            start_date.date(),
            end_date.date(),
        )
        # Determine actual start date based on existing data
        actual_start_date = start_date
        if not force_full_update:
            last_maxpower_date = await self.get_last_maxpower_date(cups)
            if last_maxpower_date:
                # Start from the hour after the last stored reading so the
                # request covers only missing data.
                actual_start_date = max(
                    start_date, last_maxpower_date + timedelta(hours=1)
                )
                _LOGGER.info(
                    "Found existing maxpower data up to %s, fetching from %s",
                    last_maxpower_date.date(),
                    actual_start_date.date(),
                )
            else:
                _LOGGER.info(
                    "No existing maxpower data found for CUPS %5s, fetching all data",
                    cups[-5:],
                )
        # If actual start date is beyond end date, no new data needed
        if actual_start_date >= end_date:
            _LOGGER.info(
                "No new maxpower data needed for CUPS %5s (up to date)", cups[-5:]
            )
            return {
                "success": True,
                "cups": cups,
                "period": {
                    "start": start_date.isoformat(),
                    "end": end_date.isoformat(),
                    "actual_start": actual_start_date.isoformat(),
                },
                "stats": {
                    "fetched": 0,
                    "saved": 0,
                    "updated": 0,
                    "skipped": "up_to_date",
                },
                "message": "Maxpower data is up to date",
            }
        try:
            # Fetch maxpower data from datadis (only missing data)
            maxpower_readings = await self._datadis.get_max_power(
                cups=cups,
                distributor_code=distributor_code,
                start_date=actual_start_date,
                end_date=end_date,
                authorized_nif=authorized_nif,
            )
            # Resolve the database service once: it is loop-invariant, and
            # awaiting it per reading was wasted work.
            db_service = await self._get_db_service()
            # Save to database
            saved_count = 0
            updated_count = 0
            for maxpower in maxpower_readings:
                # Convert Pydantic model to dict and add CUPS
                maxpower_dict = maxpower.model_dump()
                maxpower_dict["cups"] = cups
                # Check if a reading already exists at this exact datetime
                existing = await db_service.get_maxpower_readings(
                    cups=cups, start_date=maxpower.datetime, end_date=maxpower.datetime
                )
                if existing:
                    updated_count += 1
                else:
                    saved_count += 1
                await db_service.save_maxpower(maxpower_dict)
            result = {
                "success": True,
                "cups": cups,
                "period": {
                    "start": start_date.isoformat(),
                    "end": end_date.isoformat(),
                    "actual_start": actual_start_date.isoformat(),
                },
                "stats": {
                    "fetched": len(maxpower_readings),
                    "saved": saved_count,
                    "updated": updated_count,
                },
            }
            if actual_start_date > start_date:
                result["message"] = (
                    f"Fetched only missing maxpower data from {actual_start_date.date()}"
                )
            _LOGGER.info(
                "Maxpower update completed: %d fetched, %d saved, %d updated",
                len(maxpower_readings),
                saved_count,
                updated_count,
            )
            return result
        except Exception as e:
            # Boundary handler: failures are reported in the result dict
            # instead of propagating to the caller.
            _LOGGER.error("Error updating maxpower for CUPS %s: %s", cups, str(e))
            return {
                "success": False,
                "cups": cups,
                "error": str(e),
                "period": {
                    "start": start_date.isoformat(),
                    "end": end_date.isoformat(),
                    # actual_start_date is always bound before the try block,
                    # so no locals() guard is needed here.
                    "actual_start": actual_start_date.isoformat(),
                },
            }

    async def update_maxpower_range_by_months(
        self,
        cups: str,
        distributor_code: str,
        start_date: datetime,
        end_date: datetime,
        authorized_nif: Optional[str] = None,
        force_full_update: bool = False,
    ) -> Dict[str, Any]:
        """Update maxpower data month by month to respect datadis limits.

        Args:
            cups: CUPS identifier
            distributor_code: Distributor company code
            start_date: Start date for maxpower data
            end_date: End date for maxpower data
            authorized_nif: Authorized NIF if accessing on behalf of someone
            force_full_update: If True, fetch all data ignoring existing records

        Returns:
            Dict with operation results and statistics for all months
        """
        _LOGGER.info(
            "Updating maxpower range for CUPS %5s from %s to %s by months",
            cups[-5:],
            start_date.date(),
            end_date.date(),
        )
        results = []
        current_date = start_date
        while current_date < end_date:
            # Compute the first day of the following month (each request may
            # span at most one month).
            if current_date.month == 12:
                month_end = current_date.replace(
                    year=current_date.year + 1, month=1, day=1
                )
            else:
                month_end = current_date.replace(month=current_date.month + 1, day=1)
            # Don't go past the requested end date
            actual_end = min(month_end, end_date)
            # Update maxpower for this month
            maxpower_result = await self.update_maxpower(
                cups=cups,
                distributor_code=distributor_code,
                start_date=current_date,
                end_date=actual_end,
                authorized_nif=authorized_nif,
                force_full_update=force_full_update,
            )
            result_entry = {
                "month": current_date.strftime("%Y-%m"),
                "maxpower": maxpower_result,
            }
            results.append(result_entry)
            current_date = month_end
        # Aggregate totals over the months that succeeded
        total_maxpower_fetched = sum(
            r["maxpower"]["stats"]["fetched"]
            for r in results
            if r["maxpower"]["success"]
        )
        total_maxpower_saved = sum(
            r["maxpower"]["stats"]["saved"] for r in results if r["maxpower"]["success"]
        )
        total_maxpower_updated = sum(
            r["maxpower"]["stats"]["updated"]
            for r in results
            if r["maxpower"]["success"]
        )
        summary = {
            "success": all(r["maxpower"]["success"] for r in results),
            "cups": cups,
            "period": {"start": start_date.isoformat(), "end": end_date.isoformat()},
            "months_processed": len(results),
            "total_stats": {
                "maxpower_fetched": total_maxpower_fetched,
                "maxpower_saved": total_maxpower_saved,
                "maxpower_updated": total_maxpower_updated,
            },
            "monthly_results": results,
        }
        _LOGGER.info(
            "Maxpower range update completed: %d months processed, %d maxpower readings fetched",
            len(results),
            total_maxpower_fetched,
        )
        return summary

    async def get_stored_maxpower(
        self,
        cups: str,
        start_date: Optional[datetime] = None,
        end_date: Optional[datetime] = None,
    ) -> List:
        """Get stored maxpower readings from database.

        Args:
            cups: CUPS identifier
            start_date: Optional start date filter
            end_date: Optional end date filter

        Returns:
            List of maxpower rows from the database (each exposing at least
            ``datetime`` and ``value_kw``)
        """
        db_service = await self._get_db_service()
        return await db_service.get_maxpower_readings(cups, start_date, end_date)

    async def get_last_maxpower_date(self, cups: str) -> Optional[datetime]:
        """Get the date of the last maxpower record in the database.

        Args:
            cups: CUPS identifier

        Returns:
            Datetime of last maxpower reading or None if no data exists
        """
        db_service = await self._get_db_service()
        latest_maxpower = await db_service.get_latest_maxpower(cups)
        if latest_maxpower:
            return latest_maxpower.datetime
        return None

    async def get_peak_power_for_period(
        self, cups: str, start_date: datetime, end_date: datetime
    ) -> Optional[MaxPower]:
        """Get the peak power reading for a specific period.

        Args:
            cups: CUPS identifier
            start_date: Start date for search
            end_date: End date for search

        Returns:
            Reading with the highest value_kw in the period, or None if no data
        """
        readings = await self.get_stored_maxpower(cups, start_date, end_date)
        if not readings:
            return None
        # Find the reading with maximum power (first one wins on ties)
        peak_reading = max(readings, key=lambda r: r.value_kw)
        return peak_reading

    async def get_daily_peaks(
        self, cups: str, start_date: datetime, end_date: datetime
    ) -> Dict[str, MaxPower]:
        """Get daily peak power readings for a date range.

        Args:
            cups: CUPS identifier
            start_date: Start date
            end_date: End date

        Returns:
            Dict with ISO date strings as keys and peak readings as values
        """
        readings = await self.get_stored_maxpower(cups, start_date, end_date)
        if not readings:
            return {}
        # Group by date and keep the highest reading for each day
        daily_peaks = {}
        for reading in readings:
            date_key = reading.datetime.date().isoformat()
            if (
                date_key not in daily_peaks
                or reading.value_kw > daily_peaks[date_key].value_kw
            ):
                daily_peaks[date_key] = reading
        return daily_peaks

    async def get_maximeter_summary(
        self,
        cups: str,
        start_date: Optional[datetime] = None,
        end_date: Optional[datetime] = None,
    ) -> Dict[str, Any]:
        """Get maximeter summary data compatible with EdataHelper attributes.

        Args:
            cups: CUPS identifier
            start_date: Optional start date filter
            end_date: Optional end date filter

        Returns:
            Dict with summary attributes matching EdataHelper format
            (all values None when no readings are stored)
        """
        maximeter_data = await self.get_stored_maxpower(cups, start_date, end_date)
        if not maximeter_data:
            return {
                "max_power_kW": None,
                "max_power_date": None,
                "max_power_mean_kW": None,
                "max_power_90perc_kW": None,
            }
        # Calculate summary statistics. A single max() over the readings
        # yields both the peak value and its timestamp (first maximum wins
        # ties, matching the previous scan order).
        power_values = [m.value_kw for m in maximeter_data]
        peak = max(maximeter_data, key=lambda m: m.value_kw)
        max_power = peak.value_kw
        max_power_date = peak.datetime
        mean_power = sum(power_values) / len(power_values)
        # Nearest-rank-style 90th percentile over the sorted values
        sorted_values = sorted(power_values)
        n = len(sorted_values)
        p90_index = int(0.9 * n)
        p90_power = sorted_values[p90_index] if p90_index < n else sorted_values[-1]
        return {
            "max_power_kW": round(max_power, 2),
            "max_power_date": max_power_date,
            "max_power_mean_kW": round(mean_power, 2),
            "max_power_90perc_kW": round(p90_power, 2),
        }
"""Supply service for fetching and managing supply data."""
import logging
from datetime import datetime
from typing import Any, Dict, List, Optional
from edata.connectors.datadis import DatadisConnector
from edata.services.database import DatabaseService, SupplyModel, get_database_service
_LOGGER = logging.getLogger(__name__)
class SupplyService:
    """Service for managing supply data fetching and storage.

    Wraps a Datadis connector (remote fetch) and the local database
    service (persistence), exposing read helpers used by higher layers.
    """

    def __init__(
        self,
        datadis_connector: DatadisConnector,
        storage_dir: Optional[str] = None,
    ):
        """Initialize supply service.

        Args:
            datadis_connector: Configured Datadis connector instance
            storage_dir: Directory for database and cache storage
        """
        self._datadis = datadis_connector
        self._storage_dir = storage_dir
        # Database service is created lazily on first use (async factory).
        self._db_service = None

    async def _get_db_service(self) -> DatabaseService:
        """Get database service, initializing it on first access."""
        if self._db_service is None:
            self._db_service = await get_database_service(self._storage_dir)
        return self._db_service

    async def update_supplies(
        self, authorized_nif: Optional[str] = None
    ) -> Dict[str, Any]:
        """Update supply data from Datadis.

        Fetches the supply list from the remote API and upserts each
        record into the local database.

        Args:
            authorized_nif: Optional authorized NIF for access

        Returns:
            Dict with operation results and statistics
        """
        _LOGGER.info("Updating supplies from Datadis")
        try:
            # Fetch supply data from Datadis
            supplies_data = await self._datadis.get_supplies(
                authorized_nif=authorized_nif
            )
            if not supplies_data:
                _LOGGER.warning("No supply data found")
                return {
                    "success": True,
                    "stats": {
                        "fetched": 0,
                        "saved": 0,
                        "updated": 0,
                        "total_stored": 0,
                    },
                }
            # Save supplies to database
            saved_count = 0
            updated_count = 0
            db_service = await self._get_db_service()
            for supply in supplies_data:
                # Convert Pydantic model to dict for database storage
                supply_dict = supply.model_dump()
                # Check if supply already exists (counts only; save_supply
                # performs the actual upsert either way)
                existing = await db_service.get_supplies(cups=supply.cups)
                if existing:
                    updated_count += 1
                    _LOGGER.debug(
                        f"Updating existing supply for CUPS {supply.cups[-5:]}"
                    )
                else:
                    saved_count += 1
                    _LOGGER.debug(f"Saving new supply for CUPS {supply.cups[-5:]}")
                # Save to database
                await db_service.save_supply(supply_dict)
            # Get total supplies stored
            all_supplies = await db_service.get_supplies()
            total_stored = len(all_supplies)
            result = {
                "success": True,
                "stats": {
                    "fetched": len(supplies_data),
                    "saved": saved_count,
                    "updated": updated_count,
                    "total_stored": total_stored,
                },
            }
            _LOGGER.info(
                f"Supply update completed: "
                f"{len(supplies_data)} fetched, {saved_count} saved, {updated_count} updated"
            )
            return result
        except Exception as e:
            # Best-effort: report the failure in the result instead of raising
            _LOGGER.error(f"Error updating supplies: {str(e)}")
            return {
                "success": False,
                "error": str(e),
                "stats": {"fetched": 0, "saved": 0, "updated": 0, "total_stored": 0},
            }

    async def get_supplies(self, cups: Optional[str] = None) -> List[SupplyModel]:
        """Get stored supply data.

        Args:
            cups: Optional CUPS identifier filter

        Returns:
            List of Supply objects (empty on error)
        """
        _LOGGER.debug(f"Getting supplies{f' for CUPS {cups[-5:]}' if cups else ''}")
        try:
            db_service = await self._get_db_service()
            supplies = await db_service.get_supplies(cups=cups)
            _LOGGER.debug(f"Found {len(supplies)} supplies")
            return supplies
        except Exception as e:
            _LOGGER.error(f"Error getting supplies: {str(e)}")
            return []

    async def get_supply_by_cups(self, cups: str) -> Optional[SupplyModel]:
        """Get a specific supply by CUPS.

        Args:
            cups: CUPS identifier

        Returns:
            Supply object if found, None otherwise
        """
        _LOGGER.debug(f"Getting supply for CUPS {cups[-5:]}")
        try:
            db_service = await self._get_db_service()
            supplies = await db_service.get_supplies(cups=cups)
            if supplies:
                _LOGGER.debug(f"Found supply for CUPS {cups[-5:]}")
                return supplies[0]  # Should be unique
            _LOGGER.warning(f"No supply found for CUPS {cups[-5:]}")
            return None
        except Exception as e:
            _LOGGER.error(f"Error getting supply for CUPS {cups[-5:]}: {str(e)}")
            return None

    async def get_cups_list(self) -> List[str]:
        """Get list of all stored CUPS.

        Returns:
            List of CUPS identifiers (empty on error)
        """
        _LOGGER.debug("Getting CUPS list")
        try:
            db_service = await self._get_db_service()
            supplies = await db_service.get_supplies()
            cups_list = [supply.cups for supply in supplies if supply.cups]
            _LOGGER.debug(f"Found {len(cups_list)} CUPS")
            return cups_list
        except Exception as e:
            _LOGGER.error(f"Error getting CUPS list: {str(e)}")
            return []

    async def get_active_supplies(
        self, reference_date: Optional[datetime] = None
    ) -> List[SupplyModel]:
        """Get supplies that are active at a given date.

        Args:
            reference_date: Date to check for active supplies (defaults to now)

        Returns:
            List of active supplies
        """
        if reference_date is None:
            reference_date = datetime.now()
        _LOGGER.debug(f"Getting active supplies for date {reference_date.date()}")
        try:
            db_service = await self._get_db_service()
            all_supplies = await db_service.get_supplies()
            active_supplies = []
            # NOTE(review): assumes date_start/date_end are always set and
            # naive datetimes comparable with datetime.now() — confirm in
            # SupplyModel before relying on open-ended contracts here.
            for supply in all_supplies:
                if supply.date_start <= reference_date <= supply.date_end:
                    active_supplies.append(supply)
            _LOGGER.debug(f"Found {len(active_supplies)} active supplies")
            return active_supplies
        except Exception as e:
            _LOGGER.error(f"Error getting active supplies: {str(e)}")
            return []

    async def get_supply_stats(self) -> Dict[str, Any]:
        """Get statistics about stored supplies.

        Returns:
            Dict with supply statistics (empty dict on error)
        """
        _LOGGER.debug("Calculating supply statistics")
        try:
            db_service = await self._get_db_service()
            supplies = await db_service.get_supplies()
            if not supplies:
                return {
                    "total_supplies": 0,
                    "total_cups": 0,
                    "date_range": None,
                    "distributors": {},
                    "point_types": {},
                }
            # Calculate date range
            earliest_start = min(s.date_start for s in supplies)
            latest_end = max(s.date_end for s in supplies)
            # Count by distributor
            distributors = {}
            # Count by point type
            point_types = {}
            for supply in supplies:
                # Count distributors (empty/missing names grouped as "Unknown")
                dist = supply.distributor or "Unknown"
                distributors[dist] = distributors.get(dist, 0) + 1
                # Count point types; compare against None explicitly so a
                # falsy point type (0) is not misclassified as "Unknown" —
                # consistent with get_point_type() below.
                pt = supply.point_type if supply.point_type is not None else "Unknown"
                point_types[pt] = point_types.get(pt, 0) + 1
            stats = {
                "total_supplies": len(supplies),
                "total_cups": len(set(s.cups for s in supplies)),
                "date_range": {
                    "earliest_start": earliest_start,
                    "latest_end": latest_end,
                },
                "distributors": distributors,
                "point_types": point_types,
            }
            _LOGGER.debug(f"Supply statistics: {len(supplies)} total supplies")
            return stats
        except Exception as e:
            _LOGGER.error(f"Error calculating supply statistics: {str(e)}")
            return {}

    async def validate_cups(self, cups: str) -> bool:
        """Validate that a CUPS exists in stored supplies.

        Args:
            cups: CUPS identifier to validate

        Returns:
            True if CUPS exists, False otherwise
        """
        _LOGGER.debug(f"Validating CUPS {cups[-5:]}")
        try:
            supply = await self.get_supply_by_cups(cups)
            is_valid = supply is not None
            if is_valid:
                _LOGGER.debug(f"CUPS {cups[-5:]} is valid")
            else:
                _LOGGER.warning(f"CUPS {cups[-5:]} not found")
            return is_valid
        except Exception as e:
            _LOGGER.error(f"Error validating CUPS {cups[-5:]}: {str(e)}")
            return False

    async def get_distributor_code(self, cups: str) -> Optional[str]:
        """Get distributor code for a CUPS.

        Args:
            cups: CUPS identifier

        Returns:
            Distributor code if found, None otherwise
        """
        _LOGGER.debug(f"Getting distributor code for CUPS {cups[-5:]}")
        try:
            supply = await self.get_supply_by_cups(cups)
            if supply and supply.distributor_code:
                _LOGGER.debug(
                    f"Found distributor code {supply.distributor_code} for CUPS {cups[-5:]}"
                )
                return supply.distributor_code
            _LOGGER.warning(f"No distributor code found for CUPS {cups[-5:]}")
            return None
        except Exception as e:
            _LOGGER.error(
                f"Error getting distributor code for CUPS {cups[-5:]}: {str(e)}"
            )
            return None

    async def get_point_type(self, cups: str) -> Optional[int]:
        """Get point type for a CUPS.

        Args:
            cups: CUPS identifier

        Returns:
            Point type if found, None otherwise
        """
        _LOGGER.debug(f"Getting point type for CUPS {cups[-5:]}")
        try:
            supply = await self.get_supply_by_cups(cups)
            # Explicit None check: point type 0 is a valid stored value
            if supply and supply.point_type is not None:
                _LOGGER.debug(
                    f"Found point type {supply.point_type} for CUPS {cups[-5:]}"
                )
                return supply.point_type
            _LOGGER.warning(f"No point type found for CUPS {cups[-5:]}")
            return None
        except Exception as e:
            _LOGGER.error(f"Error getting point type for CUPS {cups[-5:]}: {str(e)}")
            return None

    async def get_supply_summary(self, cups: str) -> Dict[str, Any]:
        """Get supply summary attributes for a CUPS.

        Args:
            cups: CUPS identifier

        Returns:
            Dict with supply summary attributes
        """
        _LOGGER.debug(f"Getting supply summary for CUPS {cups[-5:]}")
        try:
            supply = await self.get_supply_by_cups(cups)
            if not supply:
                _LOGGER.warning(f"No supply found for CUPS {cups[-5:]}")
                return {"cups": None}
            summary = {
                "cups": supply.cups,
                # Add other supply-related summary attributes here as needed
                # These would be used by EdataHelper for calculating summary attributes
            }
            _LOGGER.debug(f"Supply summary calculated for CUPS {cups[-5:]}")
            return summary
        except Exception as e:
            _LOGGER.error(
                f"Error getting supply summary for CUPS {cups[-5:]}: {str(e)}"
            )
            return {"cups": None}
+21
-0

@@ -11,2 +11,23 @@ .python-version

edata/utils.py
edata/connectors/__init__.py
edata/connectors/datadis.py
edata/connectors/redata.py
edata/models/__init__.py
edata/models/base.py
edata/models/consumption.py
edata/models/contract.py
edata/models/database.py
edata/models/maximeter.py
edata/models/pricing.py
edata/models/supply.py
edata/scripts/__init__.py
edata/scripts/__main__.py
edata/scripts/dump.py
edata/services/__init__.py
edata/services/billing.py
edata/services/consumption.py
edata/services/contract.py
edata/services/database.py
edata/services/maximeter.py
edata/services/supply.py
edata/tests/__init__.py

@@ -13,0 +34,0 @@ edata/tests/test_helpers.py

+4
-2
Metadata-Version: 2.4
Name: e-data
Version: 2.0.0b1
Version: 2.0.0b2
Summary: Python library for managing spanish energy data from various web providers
Author-email: VMG <vmayorg@outlook.es>
License: GPL-3.0-or-later
License-Expression: GPL-3.0-or-later
Project-URL: Homepage, https://github.com/uvejota/python-edata

@@ -34,2 +34,4 @@ Project-URL: Repository, https://github.com/uvejota/python-edata

Requires-Dist: sqlalchemy[asyncio]>=2.0.0
Requires-Dist: aiohttp>=3.8.0
Requires-Dist: diskcache>=5.4.0
Provides-Extra: dev

@@ -36,0 +38,0 @@ Requires-Dist: pytest>=7.1.2; extra == "dev"

@@ -7,3 +7,3 @@ [build-system]

name = "e-data"
version = "2.0.0b1"
version = "2.0.0b2"
authors = [

@@ -14,3 +14,3 @@ {name = "VMG", email = "vmayorg@outlook.es"},

readme = "README.md"
license = {text = "GPL-3.0-or-later"}
license = "GPL-3.0-or-later"
requires-python = ">=3.8"

@@ -41,2 +41,4 @@ classifiers = [

"sqlalchemy[asyncio]>=2.0.0",
"aiohttp>=3.8.0",
"diskcache>=5.4.0",
]

@@ -67,4 +69,7 @@

[tool.setuptools]
packages = ["edata"]
include-package-data = true
[tool.setuptools.packages.find]
exclude = ["tests*", "*.tests*", "*.tests", "edata.tests*"]
[tool.setuptools.package-data]

@@ -71,0 +76,0 @@ edata = ["py.typed"]