e-data
Advanced tools
| """Constants file.""" | ||
| PROG_NAME = "edata" |
| """Collection of utilities.""" | ||
| import logging | ||
| from datetime import datetime | ||
| from pathlib import Path | ||
| import holidays | ||
| from edata.models.supply import Contract | ||
| TARIFF_BY_HOUR = [ | ||
| [10, 11, 12, 13, 18, 19, 20, 21], # p1 | ||
| [8, 9, 14, 15, 16, 17, 22, 23], # p2 | ||
| [0, 1, 2, 3, 4, 5, 6, 7], # p3 | ||
| ] | ||
| TARIFF_BY_WEEKDAY = [[], [], [5, 6]] | ||
| _LOGGER = logging.getLogger(__name__) | ||
| def get_tariff(dt: datetime) -> int: | ||
| """Return the tariff for the selected datetime.""" | ||
| hdays = holidays.country_holidays("ES") | ||
| hour = dt.hour | ||
| weekday = dt.weekday() | ||
| if dt.date() in hdays: | ||
| # holidays are p3 | ||
| return 3 | ||
| for idx, weekdays in enumerate(TARIFF_BY_WEEKDAY): | ||
| if weekday in weekdays: | ||
| return idx + 1 | ||
| for idx, hours in enumerate(TARIFF_BY_HOUR): | ||
| if hour in hours: | ||
| return idx + 1 | ||
| # we shouldn't get here | ||
| _LOGGER.error("Cannot decide the tariff for %s", dt.isoformat()) | ||
| return 0 | ||
| def get_contract_for_dt(contracts: list[Contract], date: datetime): | ||
| """Return the active contract for a provided datetime.""" | ||
| for contract in contracts: | ||
| if contract.date_start <= date <= contract.date_end: | ||
| return contract | ||
| return None | ||
| def get_month(dt: datetime) -> datetime: | ||
| """Return a datetime that represents the month start for a provided datetime.""" | ||
| return dt.replace(day=1, hour=0, minute=0, second=0, microsecond=0) | ||
| def get_day(dt: datetime) -> datetime: | ||
| """Return a datetime that represents the day start for a provided datetime.""" | ||
| return dt.replace(hour=0, minute=0, second=0, microsecond=0) | ||
| def redacted_cups(cups: str) -> str: | ||
| """Return an anonymized version of the cups identifier.""" | ||
| return cups[-5:] | ||
| def get_db_path(storage_dir: str) -> str: | ||
| """Return the database path for a given root storage dir.""" | ||
| return str(Path(storage_dir).absolute() / "edata.db") |
| from edata.database.controller import EdataDB |
| import logging | ||
| import os | ||
| import typing | ||
| from datetime import datetime | ||
| from sqlalchemy import Select, insert | ||
| from sqlalchemy.exc import IntegrityError | ||
| from sqlalchemy.ext.asyncio import AsyncEngine, create_async_engine | ||
| from sqlmodel import SQLModel | ||
| from sqlmodel.ext.asyncio.session import AsyncSession | ||
| from sqlmodel.sql.expression import SelectOfScalar | ||
| import edata.database.queries as q | ||
| from edata.database.models import ( | ||
| BillModel, | ||
| ContractModel, | ||
| EnergyModel, | ||
| PowerModel, | ||
| PVPCModel, | ||
| StatisticsModel, | ||
| SupplyModel, | ||
| ) | ||
| from edata.models import Contract, Energy, Power, Statistics, Supply | ||
| from edata.models.bill import Bill, EnergyPrice | ||
| _LOGGER = logging.getLogger(__name__) | ||
| T = typing.TypeVar("T", bound=SQLModel) | ||
| class EdataDB: | ||
| _instance = None | ||
| _engine: AsyncEngine | None = None | ||
| _db_url: str | None = None | ||
| def __new__(cls, sqlite_path: str): | ||
| db_url = f"sqlite+aiosqlite:////{os.path.abspath(sqlite_path)}" | ||
| if cls._instance is None: | ||
| cls._instance = super().__new__(cls) | ||
| cls._db_url = db_url | ||
| cls._engine = create_async_engine(db_url, future=True) | ||
| # Ensure parent directory exists | ||
| dir_path = os.path.dirname(os.path.abspath(sqlite_path)) | ||
| os.makedirs(dir_path, exist_ok=True) | ||
| cls._instance._tables_initialized = False | ||
| elif db_url != cls._db_url: | ||
| raise ValueError("EdataDB already initialized with a different db_url") | ||
| return cls._instance | ||
| @property | ||
| def engine(self): | ||
| """Return the async database engine.""" | ||
| return self._engine | ||
| async def _ensure_tables(self): | ||
| """Create tables if not already created (lazy init).""" | ||
| if self._tables_initialized: | ||
| return | ||
| if self.engine: | ||
| async with self.engine.begin() as conn: | ||
| await conn.run_sync(SQLModel.metadata.create_all) | ||
| self._tables_initialized = True | ||
| async def _add_one( | ||
| self, | ||
| session: AsyncSession, | ||
| record: T, | ||
| commit: bool = True, | ||
| ) -> T: | ||
| """Add a single record into the database.""" | ||
| session.add(record) | ||
| if commit: | ||
| await session.commit() | ||
| await session.refresh(record) | ||
| else: | ||
| await session.flush() | ||
| return record | ||
| async def _update_one( | ||
| self, | ||
| session: AsyncSession, | ||
| query: SelectOfScalar, | ||
| data, | ||
| commit: bool = True, | ||
| overrides: dict[str, typing.Any] | None = None, | ||
| ) -> T | None: # type: ignore | ||
| """Updates a single record in the database.""" | ||
| result = await session.exec(query) | ||
| existing = result.first() | ||
| if existing and getattr(existing, "data") == data and not overrides: | ||
| return existing | ||
| setattr(existing, "data", data) | ||
| if overrides: | ||
| for key, value in overrides.items(): | ||
| setattr(existing, key, value) | ||
| if commit: | ||
| await session.commit() | ||
| await session.refresh(existing) | ||
| else: | ||
| await session.flush() | ||
| return existing | ||
| async def _add_or_update_one( | ||
| self, | ||
| session: AsyncSession, | ||
| query: SelectOfScalar, | ||
| record: T, | ||
| commit: bool = True, | ||
| override: list[str] | None = None, | ||
| ) -> T | None: | ||
| """Add a single record into the database and fallback to update safely.""" | ||
| try: | ||
| async with session.begin_nested(): | ||
| session.add(record) | ||
| await session.flush() | ||
| if commit: | ||
| await session.commit() | ||
| await session.refresh(record) | ||
| return record | ||
| except IntegrityError: | ||
| new_data = getattr(record, "data") | ||
| override_dict = None | ||
| record_json = record.model_dump() | ||
| if override: | ||
| override_dict = { | ||
| x: record_json[x] for x in record.model_dump() if x in override | ||
| } | ||
| return await self._update_one( | ||
| session, query, new_data, commit=commit, overrides=override_dict | ||
| ) | ||
| async def _add_or_update_many( | ||
| self, | ||
| session: AsyncSession, | ||
| queries: list[SelectOfScalar], | ||
| records: list[T], | ||
| batch_size: int = 100, | ||
| override: list[str] | None = None, | ||
| ) -> list[T]: | ||
| """Updates many records in the database""" | ||
| if not records: | ||
| return [] | ||
| for i in range(0, len(records), batch_size): | ||
| chunk_records = records[i : i + batch_size] | ||
| chunk_queries = queries[i : i + batch_size] | ||
| try: | ||
| async with session.begin_nested(): | ||
| session.add_all(chunk_records) | ||
| await session.flush() | ||
| except IntegrityError: | ||
| for j, record in enumerate(chunk_records): | ||
| await self._add_or_update_one( | ||
| session, | ||
| chunk_queries[j], | ||
| record, | ||
| commit=False, | ||
| override=override, | ||
| ) | ||
| await session.commit() | ||
| return records | ||
| async def get_supply(self, cups: str) -> SupplyModel | None: | ||
| """Get a supply record by cups.""" | ||
| await self._ensure_tables() | ||
| async with AsyncSession(self.engine) as session: | ||
| result = await session.exec(q.get_supply(cups)) | ||
| return result.first() | ||
| async def get_contract( | ||
| self, cups: str, date_start: datetime | None = None | ||
| ) -> ContractModel | None: | ||
| """Get a contract record by cups.""" | ||
| await self._ensure_tables() | ||
| async with AsyncSession(self.engine) as session: | ||
| result = await session.exec(q.get_contract(cups, date_start)) | ||
| return result.first() | ||
| async def get_last_energy(self, cups: str) -> EnergyModel | None: | ||
| """Get the most recent Energy record by cups.""" | ||
| await self._ensure_tables() | ||
| async with AsyncSession(self.engine) as session: | ||
| result = await session.exec(q.get_last_energy(cups)) | ||
| return result.first() | ||
| async def get_last_pvpc(self) -> PVPCModel | None: | ||
| """Get the most recent pvpc.""" | ||
| await self._ensure_tables() | ||
| async with AsyncSession(self.engine) as session: | ||
| result = await session.exec(q.get_last_pvpc()) | ||
| return result.first() | ||
| async def get_last_bill(self, cups: str) -> BillModel | None: | ||
| """Get the most recent bill record by cups.""" | ||
| await self._ensure_tables() | ||
| async with AsyncSession(self.engine) as session: | ||
| result = await session.exec(q.get_last_bill(cups)) | ||
| return result.first() | ||
| async def add_contract(self, cups: str, contract: Contract) -> ContractModel | None: | ||
| """Add or update a contract record.""" | ||
| await self._ensure_tables() | ||
| record = ContractModel(cups=cups, date_start=contract.date_start, data=contract) | ||
| async with AsyncSession(self.engine) as session: | ||
| return await self._add_or_update_one( | ||
| session, q.get_contract(cups, contract.date_start), record | ||
| ) | ||
| async def add_supply(self, supply: Supply) -> SupplyModel | None: | ||
| """Add or update a supply record.""" | ||
| await self._ensure_tables() | ||
| record = SupplyModel(cups=supply.cups, data=supply) | ||
| async with AsyncSession(self.engine) as session: | ||
| return await self._add_or_update_one( | ||
| session, q.get_supply(supply.cups), record | ||
| ) | ||
| async def add_energy(self, cups: str, energy: Energy) -> EnergyModel | None: | ||
| """Add or update an energy record.""" | ||
| await self._ensure_tables() | ||
| record = EnergyModel( | ||
| cups=cups, delta_h=energy.delta_h, datetime=energy.datetime, data=energy | ||
| ) | ||
| async with AsyncSession(self.engine) as session: | ||
| return await self._add_or_update_one( | ||
| session, q.get_energy(cups, energy.datetime), record | ||
| ) | ||
| async def add_energy_list(self, cups: str, energy: list[Energy]): | ||
| """Add or update a list of energy records.""" | ||
| await self._ensure_tables() | ||
| async with AsyncSession(self.engine) as session: | ||
| unique_map = {item.datetime: item for item in energy} | ||
| unique = list(unique_map.values()) | ||
| queries = [q.get_energy(cups, x.datetime) for x in unique] | ||
| items = [ | ||
| EnergyModel(cups=cups, delta_h=x.delta_h, datetime=x.datetime, data=x) | ||
| for x in unique | ||
| ] | ||
| await self._add_or_update_many(session, queries, items) | ||
| async def add_power(self, cups: str, power: Power) -> PowerModel | None: | ||
| """Add or update a power record for a given CUPS and Power instance.""" | ||
| await self._ensure_tables() | ||
| record = PowerModel(cups=cups, datetime=power.datetime, data=power) | ||
| async with AsyncSession(self.engine) as session: | ||
| return await self._add_or_update_one( | ||
| session, q.get_power(cups, power.datetime), record | ||
| ) | ||
| async def add_power_list(self, cups: str, power: list[Power]): | ||
| """Add or update a list of power records.""" | ||
| await self._ensure_tables() | ||
| async with AsyncSession(self.engine) as session: | ||
| unique_map = {item.datetime: item for item in power} | ||
| unique = list(unique_map.values()) | ||
| queries = [q.get_power(cups, x.datetime) for x in unique] | ||
| items = [PowerModel(cups=cups, datetime=x.datetime, data=x) for x in unique] | ||
| await self._add_or_update_many(session, queries, items) | ||
| async def add_pvpc(self, pvpc: EnergyPrice) -> PVPCModel | None: | ||
| """Add or update a pvpc record.""" | ||
| await self._ensure_tables() | ||
| record = PVPCModel(datetime=pvpc.datetime, data=pvpc) | ||
| async with AsyncSession(self.engine) as session: | ||
| return await self._add_or_update_one( | ||
| session, q.get_pvpc(pvpc.datetime), record | ||
| ) | ||
| async def add_pvpc_list(self, pvpc: list[EnergyPrice]): | ||
| """Add or update a list of pvpc records.""" | ||
| await self._ensure_tables() | ||
| async with AsyncSession(self.engine) as session: | ||
| unique_map = {item.datetime: item for item in pvpc} | ||
| unique = list(unique_map.values()) | ||
| queries = [q.get_pvpc(x.datetime) for x in unique] | ||
| items = [PVPCModel(datetime=x.datetime, data=x) for x in unique] | ||
| await self._add_or_update_many(session, queries, items) | ||
| async def add_statistics( | ||
| self, | ||
| cups: str, | ||
| type_: typing.Literal["day", "month"], | ||
| data: Statistics, | ||
| complete: bool = False, | ||
| ) -> StatisticsModel | None: | ||
| """Add or update a statistics record.""" | ||
| await self._ensure_tables() | ||
| record = StatisticsModel( | ||
| cups=cups, datetime=data.datetime, type=type_, data=data, complete=complete | ||
| ) | ||
| async with AsyncSession(self.engine) as session: | ||
| return await self._add_or_update_one( | ||
| session, | ||
| q.get_statistics(cups, type_, data.datetime), | ||
| record, | ||
| override=["complete"], | ||
| ) | ||
| async def add_bill( | ||
| self, | ||
| cups: str, | ||
| type_: typing.Literal["hour", "day", "month"], | ||
| data: Bill, | ||
| confhash: str, | ||
| complete: bool, | ||
| ) -> BillModel | None: | ||
| """Add or update a bill record.""" | ||
| await self._ensure_tables() | ||
| record = BillModel( | ||
| cups=cups, | ||
| datetime=data.datetime, | ||
| type=type_, | ||
| complete=complete, | ||
| confhash=confhash, | ||
| data=data, | ||
| ) | ||
| async with AsyncSession(self.engine) as session: | ||
| return await self._add_or_update_one( | ||
| session, | ||
| q.get_bill(cups, type_, data.datetime), | ||
| record, | ||
| override=["complete", "confhash"], | ||
| ) | ||
| async def add_bill_list( | ||
| self, | ||
| cups: str, | ||
| type_: typing.Literal["hour", "day", "month"], | ||
| confhash: str, | ||
| complete: bool, | ||
| bill: list[Bill], | ||
| ): | ||
| """Add or update a list of bill records.""" | ||
| await self._ensure_tables() | ||
| async with AsyncSession(self.engine) as session: | ||
| unique_map = {item.datetime: item for item in bill} | ||
| unique = list(unique_map.values()) | ||
| queries = [q.get_bill(cups, type_, x.datetime) for x in unique] | ||
| items = [ | ||
| BillModel( | ||
| cups=cups, | ||
| datetime=x.datetime, | ||
| type=type_, | ||
| confhash=confhash, | ||
| complete=complete, | ||
| data=x, | ||
| ) | ||
| for x in unique | ||
| ] | ||
| await self._add_or_update_many( | ||
| session, queries, items, override=["complete", "confhash"] | ||
| ) | ||
| async def list_supplies(self): | ||
| """List all supply records.""" | ||
| await self._ensure_tables() | ||
| async with AsyncSession(self.engine) as session: | ||
| result = await session.exec(q.list_supply()) | ||
| return result.all() | ||
| async def list_contracts(self, cups: str | None = None): | ||
| """List all contract records.""" | ||
| await self._ensure_tables() | ||
| async with AsyncSession(self.engine) as session: | ||
| result = await session.exec(q.list_contract(cups)) | ||
| return result.all() | ||
| async def list_energy( | ||
| self, | ||
| cups: str, | ||
| date_from: datetime | None = None, | ||
| date_to: datetime | None = None, | ||
| ): | ||
| """List energy records.""" | ||
| await self._ensure_tables() | ||
| async with AsyncSession(self.engine) as session: | ||
| result = await session.exec(q.list_energy(cups, date_from, date_to)) | ||
| return result.all() | ||
| async def list_power( | ||
| self, | ||
| cups: str, | ||
| date_from: datetime | None = None, | ||
| date_to: datetime | None = None, | ||
| ): | ||
| """List power records.""" | ||
| await self._ensure_tables() | ||
| async with AsyncSession(self.engine) as session: | ||
| result = await session.exec(q.list_power(cups, date_from, date_to)) | ||
| return result.all() | ||
| async def list_pvpc( | ||
| self, | ||
| date_from: datetime | None = None, | ||
| date_to: datetime | None = None, | ||
| ): | ||
| """List pvpc records.""" | ||
| await self._ensure_tables() | ||
| async with AsyncSession(self.engine) as session: | ||
| result = await session.exec(q.list_pvpc(date_from, date_to)) | ||
| return result.all() | ||
| async def list_statistics( | ||
| self, | ||
| cups: str, | ||
| type_: typing.Literal["day", "month"], | ||
| date_from: datetime | None = None, | ||
| date_to: datetime | None = None, | ||
| complete: bool | None = None, | ||
| ): | ||
| """List statistics records filtered by type ('day' or 'month') and date range.""" | ||
| await self._ensure_tables() | ||
| async with AsyncSession(self.engine) as session: | ||
| result = await session.exec( | ||
| q.list_statistics(cups, type_, date_from, date_to, complete) | ||
| ) | ||
| return result.all() | ||
| async def list_bill( | ||
| self, | ||
| cups: str, | ||
| type_: typing.Literal["hour", "day", "month"], | ||
| date_from: datetime | None = None, | ||
| date_to: datetime | None = None, | ||
| complete: bool | None = None, | ||
| ): | ||
| """List bill records filtered by type ('hour', 'day' or 'month') and date range.""" | ||
| await self._ensure_tables() | ||
| async with AsyncSession(self.engine) as session: | ||
| result = await session.exec( | ||
| q.list_bill(cups, type_, date_from, date_to, complete) | ||
| ) | ||
| return result.all() |
| import typing | ||
| from datetime import datetime as dt | ||
| from sqlmodel import AutoString, Column, Field, SQLModel, UniqueConstraint | ||
| from edata.database.utils import PydanticJSON | ||
| from edata.models import Bill, Contract, Energy, EnergyPrice, Power, Statistics, Supply | ||
| class SupplyModel(SQLModel, table=True): | ||
| __tablename__ = "supply" # type: ignore | ||
| __table_args__ = {"extend_existing": True} | ||
| cups: str = Field(default=None, primary_key=True) | ||
| data: Supply = Field(sa_column=Column(PydanticJSON(Supply))) | ||
| version: int = Field(default=1) | ||
| created_at: dt = Field(default_factory=dt.now, nullable=False) | ||
| updated_at: dt = Field( | ||
| default_factory=dt.now, nullable=False, sa_column_kwargs={"onupdate": dt.now} | ||
| ) | ||
| class ContractModel(SQLModel, table=True): | ||
| __tablename__ = "contract" # type: ignore | ||
| __table_args__ = ( | ||
| UniqueConstraint("cups", "date_start", name="uq_contract_cups_start"), | ||
| {"extend_existing": True}, | ||
| ) | ||
| id: int | None = Field(default=None, primary_key=True) | ||
| cups: str = Field(foreign_key="supply.cups", index=True) | ||
| date_start: dt = Field(index=True) | ||
| data: Contract = Field(sa_column=Column(PydanticJSON(Contract))) | ||
| version: int = Field(default=1) | ||
| created_at: dt = Field(default_factory=dt.now, nullable=False) | ||
| updated_at: dt = Field( | ||
| default_factory=dt.now, nullable=False, sa_column_kwargs={"onupdate": dt.now} | ||
| ) | ||
| class EnergyModel(SQLModel, table=True): | ||
| __tablename__ = "energy" # type: ignore | ||
| __table_args__ = ( | ||
| UniqueConstraint( | ||
| "cups", "delta_h", "datetime", name="uq_energy_cups_delta_datetime" | ||
| ), | ||
| {"extend_existing": True}, | ||
| ) | ||
| id: int | None = Field(default=None, primary_key=True) | ||
| cups: str = Field(foreign_key="supply.cups", index=True) | ||
| delta_h: float | ||
| datetime: dt = Field(index=True) | ||
| data: Energy = Field(sa_column=Column(PydanticJSON(Energy))) | ||
| version: int = Field(default=1) | ||
| created_at: dt = Field(default_factory=dt.now, nullable=False) | ||
| updated_at: dt = Field( | ||
| default_factory=dt.now, nullable=False, sa_column_kwargs={"onupdate": dt.now} | ||
| ) | ||
| class PowerModel(SQLModel, table=True): | ||
| __tablename__ = "power" # type: ignore | ||
| __table_args__ = ( | ||
| UniqueConstraint("cups", "datetime", name="uq_power_cups_datetime"), | ||
| {"extend_existing": True}, | ||
| ) | ||
| id: int | None = Field(default=None, primary_key=True) | ||
| cups: str = Field(foreign_key="supply.cups", index=True) | ||
| datetime: dt = Field(index=True) | ||
| data: Power = Field(sa_column=Column(PydanticJSON(Power))) | ||
| version: int = Field(default=1) | ||
| created_at: dt = Field(default_factory=dt.now, nullable=False) | ||
| updated_at: dt = Field( | ||
| default_factory=dt.now, nullable=False, sa_column_kwargs={"onupdate": dt.now} | ||
| ) | ||
| class StatisticsModel(SQLModel, table=True): | ||
| __tablename__ = "statistics" # type: ignore | ||
| __table_args__ = ( | ||
| UniqueConstraint( | ||
| "cups", | ||
| "datetime", | ||
| "type", | ||
| name="uq_statistics_datetime_type", | ||
| ), | ||
| {"extend_existing": True}, | ||
| ) | ||
| id: int | None = Field(default=None, primary_key=True) | ||
| cups: str = Field(foreign_key="supply.cups", index=True) | ||
| datetime: dt = Field(index=True) | ||
| type: typing.Literal["day", "month"] = Field(index=True, sa_type=AutoString) | ||
| complete: bool = Field(False) | ||
| data: Statistics = Field(sa_column=Column(PydanticJSON(Statistics))) | ||
| version: int = Field(default=1) | ||
| created_at: dt = Field(default_factory=dt.now, nullable=False) | ||
| updated_at: dt = Field( | ||
| default_factory=dt.now, nullable=False, sa_column_kwargs={"onupdate": dt.now} | ||
| ) | ||
| class PVPCModel(SQLModel, table=True): | ||
| __tablename__ = "pvpc" # type: ignore | ||
| id: int | None = Field(default=None, primary_key=True) | ||
| datetime: dt = Field(index=True, unique=True) | ||
| data: EnergyPrice = Field(sa_column=Column(PydanticJSON(EnergyPrice))) | ||
| version: int = Field(default=1) | ||
| created_at: dt = Field(default_factory=dt.now, nullable=False) | ||
| updated_at: dt = Field( | ||
| default_factory=dt.now, nullable=False, sa_column_kwargs={"onupdate": dt.now} | ||
| ) | ||
| class BillModel(SQLModel, table=True): | ||
| __tablename__ = "bill" # type: ignore | ||
| __table_args__ = ( | ||
| UniqueConstraint( | ||
| "cups", | ||
| "datetime", | ||
| "type", | ||
| name="uq_bill_datetime_type", | ||
| ), | ||
| {"extend_existing": True}, | ||
| ) | ||
| id: int | None = Field(default=None, primary_key=True) | ||
| cups: str = Field(foreign_key="supply.cups", index=True) | ||
| datetime: dt = Field(index=True) | ||
| type: typing.Literal["hour", "day", "month"] = Field(index=True, sa_type=AutoString) | ||
| complete: bool = Field(False) | ||
| confhash: str | ||
| data: Bill = Field(sa_column=Column(PydanticJSON(Bill))) | ||
| version: int = Field(default=1) | ||
| created_at: dt = Field(default_factory=dt.now, nullable=False) | ||
| updated_at: dt = Field( | ||
| default_factory=dt.now, nullable=False, sa_column_kwargs={"onupdate": dt.now} | ||
| ) |
| import typing | ||
| from datetime import datetime | ||
| from sqlmodel import asc, desc, select | ||
| from sqlmodel.sql.expression import SelectOfScalar | ||
| from edata.database.models import ( | ||
| BillModel, | ||
| ContractModel, | ||
| EnergyModel, | ||
| PowerModel, | ||
| PVPCModel, | ||
| StatisticsModel, | ||
| SupplyModel, | ||
| ) | ||
| # Queries for "supply" table | ||
| def get_supply(cups: str) -> SelectOfScalar[SupplyModel]: | ||
| """Query that selects a Supply.""" | ||
| return select(SupplyModel).where(SupplyModel.cups == cups) | ||
| def list_supply() -> SelectOfScalar[SupplyModel]: | ||
| """Query that selects all Supply data.""" | ||
| return select(SupplyModel) | ||
| # Queries for "contract" table | ||
| def get_contract( | ||
| cups: str, date_start: datetime | None = None | ||
| ) -> SelectOfScalar[ContractModel]: | ||
| """Query that selects a Contract.""" | ||
| query = select(ContractModel).where((ContractModel.cups == cups)) | ||
| if date_start: | ||
| query = query.where(ContractModel.date_start == date_start) | ||
| else: | ||
| query = query.order_by(desc(ContractModel.date_start)) | ||
| return query | ||
| def list_contract(cups: str | None = None) -> SelectOfScalar[ContractModel]: | ||
| """Query that selects all Contract data.""" | ||
| query = select(ContractModel) | ||
| if cups is not None: | ||
| query = query.where(ContractModel.cups == cups) | ||
| query = query.order_by(asc(ContractModel.date_start)) | ||
| return query | ||
| # Queries for "energy" table | ||
| def get_energy( | ||
| cups: str, datetime_: datetime | None = None | ||
| ) -> SelectOfScalar[EnergyModel]: | ||
| """Query that selects a Energy.""" | ||
| query = select(EnergyModel).where(EnergyModel.cups == cups) | ||
| if datetime_ is not None: | ||
| query = query.where(EnergyModel.datetime == datetime_) | ||
| return query | ||
| def list_energy( | ||
| cups: str, date_from: datetime | None = None, date_to: datetime | None = None | ||
| ) -> SelectOfScalar[EnergyModel]: | ||
| """Query that selects all Energy data.""" | ||
| query = select(EnergyModel).where(EnergyModel.cups == cups) | ||
| if date_from: | ||
| query = query.where(EnergyModel.datetime >= date_from) | ||
| if date_to: | ||
| query = query.where(EnergyModel.datetime <= date_to) | ||
| query = query.order_by(asc(EnergyModel.datetime)) | ||
| return query | ||
| def get_last_energy( | ||
| cups: str, | ||
| ) -> SelectOfScalar[EnergyModel]: | ||
| """Query that selects the most recent Energy record.""" | ||
| query = select(EnergyModel).where(EnergyModel.cups == cups) | ||
| query = query.order_by(desc(EnergyModel.datetime)) | ||
| query = query.limit(1) | ||
| return query | ||
| # Queries for "power" table | ||
| def get_power( | ||
| cups: str, datetime_: datetime | None = None | ||
| ) -> SelectOfScalar[PowerModel]: | ||
| """Query that selects a power.""" | ||
| query = select(PowerModel).where(PowerModel.cups == cups) | ||
| if datetime_ is not None: | ||
| query = query.where(PowerModel.datetime == datetime_) | ||
| return query | ||
| def list_power( | ||
| cups: str, date_from: datetime | None = None, date_to: datetime | None = None | ||
| ) -> SelectOfScalar[PowerModel]: | ||
| """Query that selects all power data.""" | ||
| query = select(PowerModel).where(PowerModel.cups == cups) | ||
| if date_from: | ||
| query = query.where(PowerModel.datetime >= date_from) | ||
| if date_to: | ||
| query = query.where(PowerModel.datetime <= date_to) | ||
| query = query.order_by(asc(PowerModel.datetime)) | ||
| return query | ||
| # Queries for "statistics" table | ||
| def get_statistics( | ||
| cups: str, type_: typing.Literal["day", "month"], datetime_: datetime | None = None | ||
| ) -> SelectOfScalar[StatisticsModel]: | ||
| """Query that selects a statistics.""" | ||
| query = select(StatisticsModel).where(StatisticsModel.cups == cups) | ||
| query = query.where(StatisticsModel.type == type_) | ||
| if datetime_ is not None: | ||
| query = query.where(StatisticsModel.datetime == datetime_) | ||
| return query | ||
| def list_statistics( | ||
| cups: str, | ||
| type_: typing.Literal["day", "month"], | ||
| date_from: datetime | None = None, | ||
| date_to: datetime | None = None, | ||
| complete: bool | None = None, | ||
| ) -> SelectOfScalar[StatisticsModel]: | ||
| """Query that selects all statistics data.""" | ||
| query = select(StatisticsModel).where(StatisticsModel.cups == cups) | ||
| query = query.where(StatisticsModel.type == type_) | ||
| if date_from: | ||
| query = query.where(StatisticsModel.datetime >= date_from) | ||
| if date_to: | ||
| query = query.where(StatisticsModel.datetime <= date_to) | ||
| if complete is not None: | ||
| query = query.where(StatisticsModel.complete == complete) | ||
| query = query.order_by(asc(StatisticsModel.datetime)) | ||
| return query | ||
| def list_bill( | ||
| cups: str, | ||
| type_: typing.Literal["hour", "day", "month"], | ||
| date_from: datetime | None = None, | ||
| date_to: datetime | None = None, | ||
| complete: bool | None = None, | ||
| ) -> SelectOfScalar[BillModel]: | ||
| """Query that selects all bill data.""" | ||
| query = select(BillModel).where(BillModel.cups == cups) | ||
| query = query.where(BillModel.type == type_) | ||
| if date_from: | ||
| query = query.where(BillModel.datetime >= date_from) | ||
| if date_to: | ||
| query = query.where(BillModel.datetime <= date_to) | ||
| if complete is not None: | ||
| query = query.where(BillModel.complete == complete) | ||
| query = query.order_by(asc(BillModel.datetime)) | ||
| return query | ||
| def get_last_bill( | ||
| cups: str, | ||
| ) -> SelectOfScalar[BillModel]: | ||
| """Query that selects the most recent bill record.""" | ||
| query = select(BillModel).where(BillModel.cups == cups) | ||
| query = query.order_by(desc(BillModel.datetime)) | ||
| query = query.limit(1) | ||
| return query | ||
| # Queries for "pvpc" table | ||
| def get_pvpc(datetime_: datetime) -> SelectOfScalar[PVPCModel]: | ||
| """Query that selects a PVPC record.""" | ||
| query = select(PVPCModel).where(PVPCModel.datetime == datetime_) | ||
| return query | ||
| def get_last_pvpc() -> SelectOfScalar[PVPCModel]: | ||
| """Query that selects the most recent pvpc record.""" | ||
| query = select(PVPCModel) | ||
| query = query.order_by(desc(PVPCModel.datetime)) | ||
| query = query.limit(1) | ||
| return query | ||
| def list_pvpc( | ||
| date_from: datetime | None = None, date_to: datetime | None = None | ||
| ) -> SelectOfScalar[PVPCModel]: | ||
| """Query that selects a PVPC record.""" | ||
| query = select(PVPCModel) | ||
| if date_from: | ||
| query = query.where(PVPCModel.datetime >= date_from) | ||
| if date_to: | ||
| query = query.where(PVPCModel.datetime <= date_to) | ||
| return query | ||
| def get_bill(cups, type_, datetime_): | ||
| """Query that selects a bill.""" | ||
| query = select(BillModel).where(BillModel.cups == cups) | ||
| query = query.where(BillModel.type == type_) | ||
| if datetime_ is not None: | ||
| query = query.where(BillModel.datetime == datetime_) | ||
| return query |
| from typing import Any, Type, TypeVar | ||
| from pydantic import BaseModel | ||
| from sqlalchemy.types import JSON, TypeDecorator | ||
| T = TypeVar("T", bound=BaseModel) | ||
| class PydanticJSON(TypeDecorator): | ||
| """ | ||
| Tipo de SQLAlchemy para guardar modelos Pydantic como JSON. | ||
| Automáticamente serializa al guardar y deserializa al leer. | ||
| """ | ||
| impl = JSON | ||
| cache_ok = True | ||
| def __init__(self, pydantic_model: Type[T]): | ||
| super().__init__() | ||
| self.pydantic_model = pydantic_model | ||
| def process_bind_param(self, value: T | None, dialect: Any) -> Any: | ||
| # Python -> Base de Datos | ||
| if value is None: | ||
| return None | ||
| # Aquí ocurre la magia: mode='json' convierte datetime a string ISO | ||
| return value.model_dump(mode="json") | ||
| def process_result_value(self, value: Any, dialect: Any) -> T | None: | ||
| # Base de Datos -> Python | ||
| if value is None: | ||
| return None | ||
| # Reconstruimos el objeto Pydantic desde el diccionario | ||
| return self.pydantic_model.model_validate(value) # type: ignore |
| from edata.models.bill import Bill, EnergyPrice | ||
| from edata.models.data import Energy, Power, Statistics | ||
| from edata.models.supply import Contract, Supply |
| """Models for billing-related data""" | ||
| from datetime import datetime | ||
| from pydantic import BaseModel, Field | ||
| class EnergyPrice(BaseModel): | ||
| """Data structure to represent pricing data.""" | ||
| datetime: datetime | ||
| value_eur_kWh: float | ||
| delta_h: float | ||
| class Bill(BaseModel): | ||
| """Data structure to represent a bill during a period.""" | ||
| datetime: datetime | ||
| delta_h: float | ||
| value_eur: float = Field(default=0.0) | ||
| energy_term: float = Field(default=0.0) | ||
| power_term: float = Field(default=0.0) | ||
| others_term: float = Field(default=0.0) | ||
| surplus_term: float = Field(default=0.0) | ||
| class BillingRules(BaseModel): | ||
| """Data structure to represent a generic billing rule.""" | ||
| p1_kw_year_eur: float | ||
| p2_kw_year_eur: float | ||
| p1_kwh_eur: float | ||
| p2_kwh_eur: float | ||
| p3_kwh_eur: float | ||
| surplus_p1_kwh_eur: float | None = Field(default=None) | ||
| surplus_p2_kwh_eur: float | None = Field(default=None) | ||
| surplus_p3_kwh_eur: float | None = Field(default=None) | ||
| meter_month_eur: float = Field(default=0.81) | ||
| market_kw_year_eur: float = Field(default=3.113) | ||
| electricity_tax: float = Field(default=1.0511300560) | ||
| iva_tax: float = Field(default=1.21) | ||
| energy_formula: str = Field(default="electricity_tax * iva_tax * kwh_eur * kwh") | ||
| power_formula: str = Field( | ||
| default="electricity_tax * iva_tax * (p1_kw * (p1_kw_year_eur + market_kw_year_eur) + p2_kw * p2_kw_year_eur) / 365 / 24" | ||
| ) | ||
| others_formula: str = Field(default="iva_tax * meter_month_eur / 30 / 24") | ||
| surplus_formula: str | None = Field(default=None) | ||
| cycle_start_day: int = Field(default=1) | ||
| class PVPCBillingRules(BillingRules): | ||
| """Data structure to represent a PVPC billing rule.""" | ||
| p1_kw_year_eur: float = Field(default=30.67266) | ||
| p2_kw_year_eur: float = Field(default=1.4243591) | ||
| p1_kwh_eur: None = Field(default=None) | ||
| p2_kwh_eur: None = Field(default=None) | ||
| p3_kwh_eur: None = Field(default=None) | ||
| surplus_p1_kwh_eur: None = Field(default=None) | ||
| surplus_p2_kwh_eur: None = Field(default=None) | ||
| surplus_p3_kwh_eur: None = Field(default=None) | ||
| meter_month_eur: float = Field(default=0.81) | ||
| market_kw_year_eur: float = Field(default=3.113) | ||
| electricity_tax: float = Field(default=1.0511300560) | ||
| iva_tax: float = Field(default=1.21) | ||
| energy_formula: str = Field(default="electricity_tax * iva_tax * kwh_eur * kwh") | ||
| power_formula: str = Field( | ||
| default="electricity_tax * iva_tax * (p1_kw * (p1_kw_year_eur + market_kw_year_eur) + p2_kw * p2_kw_year_eur) / 365 / 24" | ||
| ) | ||
| others_formula: str = Field(default="iva_tax * meter_month_eur / 30 / 24") | ||
| surplus_formula: str | None = Field(default=None) | ||
| cycle_start_day: int = Field(default=1) |
| """Models for telemetry data""" | ||
| from datetime import datetime | ||
| from pydantic import BaseModel, Field | ||
| class Energy(BaseModel): | ||
| """Data structure to represent energy consumption and/or surplus measurements.""" | ||
| datetime: datetime | ||
| delta_h: float | ||
| value_kWh: float | ||
| surplus_kWh: float = Field(0) | ||
| real: bool | ||
| class Power(BaseModel): | ||
| """Data structure to represent power measurements.""" | ||
| datetime: datetime | ||
| value_kW: float | ||
| class Statistics(BaseModel): | ||
| """Data structure to represent aggregated energy/surplus data.""" | ||
| datetime: datetime | ||
| delta_h: float = Field(0) | ||
| value_kWh: float = Field(0) | ||
| value_p1_kWh: float = Field(0) | ||
| value_p2_kWh: float = Field(0) | ||
| value_p3_kWh: float = Field(0) | ||
| surplus_kWh: float = Field(0) | ||
| surplus_p1_kWh: float = Field(0) | ||
| surplus_p2_kWh: float = Field(0) | ||
| surplus_p3_kWh: float = Field(0) |
| """Models for contractual data""" | ||
| from datetime import datetime | ||
| from pydantic import BaseModel | ||
| class Supply(BaseModel): | ||
| """Data model of a Supply.""" | ||
| cups: str | ||
| date_start: datetime | ||
| date_end: datetime | ||
| address: str | None | ||
| postal_code: str | None | ||
| province: str | None | ||
| municipality: str | None | ||
| distributor: str | None | ||
| pointType: int | ||
| distributorCode: str | ||
| class Contract(BaseModel): | ||
| """Data model of a Contract.""" | ||
| date_start: datetime | ||
| date_end: datetime | ||
| marketer: str | ||
| distributorCode: str | ||
| power_p1: float | None | ||
| power_p2: float | None |
| from edata.providers.datadis import DatadisConnector | ||
| from edata.providers.redata import REDataConnector |
| """Datadis API connector. | ||
| To fetch data from datadis.es private API. | ||
| There a few issues that are workarounded: | ||
| - You have to wait 24h between two identical requests. | ||
| - Datadis server does not like ranges greater than 1 month. | ||
| """ | ||
| import asyncio | ||
| import contextlib | ||
| import hashlib | ||
| import logging | ||
| import os | ||
| import tempfile | ||
| import typing | ||
| from datetime import datetime, timedelta | ||
| import aiohttp | ||
| import diskcache | ||
| from edata.models import Contract, Energy, Power, Supply | ||
| _LOGGER = logging.getLogger(__name__) | ||
| # Token-related constants | ||
| URL_TOKEN = "https://datadis.es/nikola-auth/tokens/login" | ||
| TOKEN_USERNAME = "username" | ||
| TOKEN_PASSWD = "password" | ||
| # Supplies-related constants | ||
| URL_GET_SUPPLIES = "https://datadis.es/api-private/api/get-supplies" | ||
| GET_SUPPLIES_MANDATORY_FIELDS = [ | ||
| "cups", | ||
| "validDateFrom", | ||
| "validDateTo", | ||
| "pointType", | ||
| "distributorCode", | ||
| ] | ||
| # Contracts-related constants | ||
| URL_GET_CONTRACT_DETAIL = "https://datadis.es/api-private/api/get-contract-detail" | ||
| GET_CONTRACT_DETAIL_MANDATORY_FIELDS = [ | ||
| "startDate", | ||
| "endDate", | ||
| "marketer", | ||
| "contractedPowerkW", | ||
| ] | ||
| # Consumption-related constants | ||
| URL_GET_CONSUMPTION_DATA = "https://datadis.es/api-private/api/get-consumption-data" | ||
| GET_CONSUMPTION_DATA_MANDATORY_FIELDS = [ | ||
| "time", | ||
| "date", | ||
| "consumptionKWh", | ||
| "obtainMethod", | ||
| ] | ||
| # Maximeter-related constants | ||
| URL_GET_MAX_POWER = "https://datadis.es/api-private/api/get-max-power" | ||
| GET_MAX_POWER_MANDATORY_FIELDS = ["time", "date", "maxPower"] | ||
| # Timing constants | ||
| TIMEOUT = 3 * 60 # requests timeout | ||
| QUERY_LIMIT = timedelta(hours=24) # a datadis limitation, again... | ||
| # Cache-related constants | ||
| RECENT_CACHE_SUBDIR = "cache" | ||
| def migrate_storage(storage_dir): | ||
| """Migrate storage from older versions.""" | ||
| with contextlib.suppress(FileNotFoundError): | ||
| os.remove(os.path.join(storage_dir, "edata_recent_queries.json")) | ||
| os.remove(os.path.join(storage_dir, "edata_recent_queries_cache.json")) | ||
| class DatadisConnector: | ||
| """A Datadis private API connector.""" | ||
| def __init__( | ||
| self, | ||
| username: str, | ||
| password: str, | ||
| enable_smart_fetch: bool = True, | ||
| storage_path: str | None = None, | ||
| ) -> None: | ||
| self._usr = username | ||
| self._pwd = password | ||
| self._token = {} | ||
| self._smart_fetch = enable_smart_fetch | ||
| self._recent_queries = {} | ||
| self._recent_cache = {} | ||
| self._warned_queries = [] | ||
| if storage_path is not None: | ||
| self._recent_cache_dir = os.path.join(storage_path, RECENT_CACHE_SUBDIR) | ||
| migrate_storage(storage_path) | ||
| else: | ||
| self._recent_cache_dir = os.path.join( | ||
| tempfile.gettempdir(), RECENT_CACHE_SUBDIR | ||
| ) | ||
| os.makedirs(self._recent_cache_dir, exist_ok=True) | ||
| self._cache = diskcache.Cache(self._recent_cache_dir) | ||
| def _get_hash(self, item: str): | ||
| """Return a hash.""" | ||
| return hashlib.md5(item.encode()).hexdigest() | ||
| def _set_cache(self, key: str, data: dict | None = None) -> None: | ||
| """Cache a successful query to avoid exceeding query limits (diskcache).""" | ||
| hash_query = self._get_hash(key) | ||
| try: | ||
| self._cache.set(hash_query, data, expire=QUERY_LIMIT.total_seconds()) | ||
| _LOGGER.debug("Updating cache item '%s'", hash_query) | ||
| except Exception as e: | ||
| _LOGGER.warning("Unknown error while updating cache: %s", e) | ||
| def _is_cached(self, key: str) -> bool: | ||
| """Check if a query has been done recently to avoid exceeding query limits (diskcache).""" | ||
| hash_query = self._get_hash(key) | ||
| return hash_query in self._cache | ||
| def _get_cache(self, key: str): | ||
| """Return cached response for a query (diskcache).""" | ||
| hash_query = self._get_hash(key) | ||
| try: | ||
| return self._cache.get(hash_query, default=None) | ||
| except Exception: | ||
| return None | ||
| async def _async_get_token(self): | ||
| """Private async method that fetches a new token if needed.""" | ||
| _LOGGER.debug("No token found, fetching a new one") | ||
| is_valid_token = False | ||
| timeout = aiohttp.ClientTimeout(total=TIMEOUT) | ||
| async with aiohttp.ClientSession(timeout=timeout) as session: | ||
| try: | ||
| async with session.post( | ||
| URL_TOKEN, | ||
| data={ | ||
| TOKEN_USERNAME: self._usr, | ||
| TOKEN_PASSWD: self._pwd, | ||
| }, | ||
| ) as response: | ||
| text = await response.text() | ||
| if response.status == 200: | ||
| self._token["encoded"] = text | ||
| self._token["headers"] = { | ||
| "Authorization": "Bearer " + self._token["encoded"] | ||
| } | ||
| is_valid_token = True | ||
| else: | ||
| _LOGGER.error( | ||
| "Unknown error while retrieving token, got %s", text | ||
| ) | ||
| except Exception as e: | ||
| _LOGGER.error("Exception while retrieving token: %s", e) | ||
| return is_valid_token | ||
| async def async_login(self): | ||
| """Test to login with provided credentials (async).""" | ||
| return await self._async_get_token() | ||
| def login(self): | ||
| """Test to login with provided credentials (sync wrapper).""" | ||
| return asyncio.run(self.async_login()) | ||
| async def _async_get( | ||
| self, | ||
| url: str, | ||
| request_data: dict | None = None, | ||
| refresh_token: bool = False, | ||
| is_retry: bool = False, | ||
| ignore_cache: bool = False, | ||
| ) -> list[dict[str, typing.Any]]: | ||
| """Async get request for Datadis API.""" | ||
| if request_data is None: | ||
| data = {} | ||
| else: | ||
| data = request_data | ||
| is_valid_token = False | ||
| response = [] | ||
| if refresh_token: | ||
| is_valid_token = await self._async_get_token() | ||
| if is_valid_token or not refresh_token: | ||
| params = "?" if len(data) > 0 else "" | ||
| for param in data: | ||
| key = param | ||
| value = data[param] | ||
| params = params + f"{key}={value}&" | ||
| anonym_params = "?" if len(data) > 0 else "" | ||
| for anonym_param in data: | ||
| key = anonym_param | ||
| if key == "cups": | ||
| value = "xxxx" + str(data[anonym_param])[-5:] | ||
| elif key == "authorizedNif": | ||
| value = "xxxx" | ||
| else: | ||
| value = data[anonym_param] | ||
| anonym_params = anonym_params + f"{key}={value}&" | ||
| is_recent_query = await asyncio.to_thread(self._is_cached, url + params) | ||
| if not ignore_cache and is_recent_query: | ||
| _cache = await asyncio.to_thread(self._get_cache, url + params) | ||
| if _cache is not None: | ||
| _LOGGER.info("CACHED %s", url + anonym_params) | ||
| return _cache # type: ignore | ||
| return [] | ||
| try: | ||
| _LOGGER.info("GET %s", url + anonym_params) | ||
| headers = {"Accept-Encoding": "identity"} | ||
| if self._token.get("headers"): | ||
| headers.update(self._token["headers"]) | ||
| timeout = aiohttp.ClientTimeout(total=TIMEOUT) | ||
| async with aiohttp.ClientSession(timeout=timeout) as session: | ||
| async with session.get( | ||
| url + params, | ||
| headers=headers, | ||
| ) as reply: | ||
| text = await reply.text() | ||
| if reply.status == 200: | ||
| try: | ||
| json_data = await reply.json(content_type=None) | ||
| if json_data: | ||
| response = json_data | ||
| if not ignore_cache: | ||
| await asyncio.to_thread( | ||
| self._set_cache, | ||
| url + params, | ||
| response, | ||
| ) | ||
| else: | ||
| _LOGGER.info("200 OK but empty response") | ||
| if not ignore_cache: | ||
| await asyncio.to_thread( | ||
| self._set_cache, url + params | ||
| ) | ||
| except Exception as e: | ||
| _LOGGER.warning( | ||
| "200 OK but failed to parse the response" | ||
| ) | ||
| elif reply.status == 401 and not refresh_token: | ||
| response = await self._async_get( | ||
| url, | ||
| request_data=data, | ||
| refresh_token=True, | ||
| ignore_cache=ignore_cache, | ||
| ) | ||
| elif reply.status == 429: | ||
| _LOGGER.warning( | ||
| "%s with message '%s'", | ||
| reply.status, | ||
| text, | ||
| ) | ||
| if not ignore_cache: | ||
| await asyncio.to_thread(self._set_cache, url + params) | ||
| elif is_retry: | ||
| if (url + params) not in self._warned_queries: | ||
| _LOGGER.warning( | ||
| "%s with message '%s'. %s. %s", | ||
| reply.status, | ||
| text, | ||
| "Query temporary disabled", | ||
| "Future 500 code errors for this query will be silenced until restart", | ||
| ) | ||
| if not ignore_cache: | ||
| await asyncio.to_thread(self._set_cache, url + params) | ||
| self._warned_queries.append(url + params) | ||
| else: | ||
| response = await self._async_get( | ||
| url, | ||
| request_data, | ||
| is_retry=True, | ||
| ignore_cache=ignore_cache, | ||
| ) | ||
| except asyncio.TimeoutError: | ||
| _LOGGER.warning("Timeout at %s", url + anonym_params) | ||
| return [] | ||
| except Exception as e: | ||
| _LOGGER.warning("Exception at %s: %s", url + anonym_params, e) | ||
| return [] | ||
| return response | ||
| async def async_get_supplies( | ||
| self, authorized_nif: str | None = None | ||
| ) -> list[Supply]: | ||
| data = {} | ||
| if authorized_nif is not None: | ||
| data["authorizedNif"] = authorized_nif | ||
| response = await self._async_get( | ||
| URL_GET_SUPPLIES, request_data=data, ignore_cache=True | ||
| ) | ||
| supplies = [] | ||
| tomorrow_str = (datetime.today() + timedelta(days=1)).strftime("%Y/%m/%d") | ||
| for i in response: | ||
| if all(k in i for k in GET_SUPPLIES_MANDATORY_FIELDS): | ||
| supplies.append( | ||
| Supply( | ||
| cups=i["cups"], | ||
| date_start=datetime.strptime( | ||
| ( | ||
| i["validDateFrom"] | ||
| if i["validDateFrom"] != "" | ||
| else "1970/01/01" | ||
| ), | ||
| "%Y/%m/%d", | ||
| ), | ||
| date_end=datetime.strptime( | ||
| ( | ||
| i["validDateTo"] | ||
| if i["validDateTo"] != "" | ||
| else tomorrow_str | ||
| ), | ||
| "%Y/%m/%d", | ||
| ), | ||
| address=i.get("address", None), | ||
| postal_code=i.get("postalCode", None), | ||
| province=i.get("province", None), | ||
| municipality=i.get("municipality", None), | ||
| distributor=i.get("distributor", None), | ||
| pointType=i["pointType"], | ||
| distributorCode=i["distributorCode"], | ||
| ) | ||
| ) | ||
| else: | ||
| _LOGGER.warning( | ||
| "Weird data structure while fetching supplies data, got %s", | ||
| response, | ||
| ) | ||
| return supplies | ||
| def get_supplies(self, authorized_nif: str | None = None): | ||
| """Datadis 'get_supplies' query (sync wrapper).""" | ||
| return asyncio.run(self.async_get_supplies(authorized_nif=authorized_nif)) | ||
| async def async_get_contract_detail( | ||
| self, cups: str, distributor_code: str, authorized_nif: str | None = None | ||
| ) -> list[Contract]: | ||
| data = {"cups": cups, "distributorCode": distributor_code} | ||
| if authorized_nif is not None: | ||
| data["authorizedNif"] = authorized_nif | ||
| response = await self._async_get( | ||
| URL_GET_CONTRACT_DETAIL, request_data=data, ignore_cache=True | ||
| ) | ||
| contracts = [] | ||
| tomorrow_str = (datetime.today() + timedelta(days=1)).strftime("%Y/%m/%d") | ||
| for i in response: | ||
| if all(k in i for k in GET_CONTRACT_DETAIL_MANDATORY_FIELDS): | ||
| contracts.append( | ||
| Contract( | ||
| date_start=datetime.strptime( | ||
| i["startDate"] if i["startDate"] != "" else "1970/01/01", | ||
| "%Y/%m/%d", | ||
| ), | ||
| date_end=datetime.strptime( | ||
| i["endDate"] if i["endDate"] != "" else tomorrow_str, | ||
| "%Y/%m/%d", | ||
| ), | ||
| marketer=i["marketer"], | ||
| distributorCode=distributor_code, | ||
| power_p1=( | ||
| i["contractedPowerkW"][0] | ||
| if isinstance(i["contractedPowerkW"], list) | ||
| else None | ||
| ), | ||
| power_p2=( | ||
| i["contractedPowerkW"][1] | ||
| if (len(i["contractedPowerkW"]) > 1) | ||
| else None | ||
| ), | ||
| ) | ||
| ) | ||
| else: | ||
| _LOGGER.warning( | ||
| "Weird data structure while fetching contracts data, got %s", | ||
| response, | ||
| ) | ||
| return contracts | ||
| def get_contract_detail( | ||
| self, cups: str, distributor_code: str, authorized_nif: str | None = None | ||
| ): | ||
| """Datadis get_contract_detail query (sync wrapper).""" | ||
| return asyncio.run( | ||
| self.async_get_contract_detail(cups, distributor_code, authorized_nif) | ||
| ) | ||
| async def async_get_consumption_data( | ||
| self, | ||
| cups: str, | ||
| distributor_code: str, | ||
| start_date: datetime, | ||
| end_date: datetime, | ||
| measurement_type: str, | ||
| point_type: int, | ||
| authorized_nif: str | None = None, | ||
| ) -> list[Energy]: | ||
| data = { | ||
| "cups": cups, | ||
| "distributorCode": distributor_code, | ||
| "startDate": datetime.strftime(start_date, "%Y/%m"), | ||
| "endDate": datetime.strftime(end_date, "%Y/%m"), | ||
| "measurementType": measurement_type, | ||
| "pointType": point_type, | ||
| } | ||
| if authorized_nif is not None: | ||
| data["authorizedNif"] = authorized_nif | ||
| response = await self._async_get(URL_GET_CONSUMPTION_DATA, request_data=data) | ||
| consumptions = [] | ||
| for i in response: | ||
| if "consumptionKWh" in i: | ||
| if all(k in i for k in GET_CONSUMPTION_DATA_MANDATORY_FIELDS): | ||
| hour = str(int(i["time"].split(":")[0]) - 1) | ||
| date_as_dt = datetime.strptime( | ||
| f"{i['date']} {hour.zfill(2)}:00", "%Y/%m/%d %H:%M" | ||
| ) | ||
| if not (start_date <= date_as_dt <= end_date): | ||
| continue # skip element if dt is out of range | ||
| _surplus = i.get("surplusEnergyKWh", 0) | ||
| if _surplus is None: | ||
| _surplus = 0 | ||
| consumptions.append( | ||
| Energy( | ||
| datetime=date_as_dt, | ||
| delta_h=1, | ||
| value_kWh=i["consumptionKWh"], | ||
| surplus_kWh=_surplus, | ||
| real=i["obtainMethod"] == "Real", | ||
| ) | ||
| ) | ||
| else: | ||
| _LOGGER.warning( | ||
| "Weird data structure while fetching consumption data, got %s", | ||
| response, | ||
| ) | ||
| return consumptions | ||
| def get_consumption_data( | ||
| self, | ||
| cups: str, | ||
| distributor_code: str, | ||
| start_date: datetime, | ||
| end_date: datetime, | ||
| measurement_type: str, | ||
| point_type: int, | ||
| authorized_nif: str | None = None, | ||
| ): | ||
| """Datadis get_consumption_data query (sync wrapper).""" | ||
| return asyncio.run( | ||
| self.async_get_consumption_data( | ||
| cups, | ||
| distributor_code, | ||
| start_date, | ||
| end_date, | ||
| measurement_type, | ||
| point_type, | ||
| authorized_nif, | ||
| ) | ||
| ) | ||
| async def async_get_max_power( | ||
| self, | ||
| cups: str, | ||
| distributor_code: str, | ||
| start_date: datetime, | ||
| end_date: datetime, | ||
| authorized_nif: str | None = None, | ||
| ) -> list[Power]: | ||
| data = { | ||
| "cups": cups, | ||
| "distributorCode": distributor_code, | ||
| "startDate": datetime.strftime(start_date, "%Y/%m"), | ||
| "endDate": datetime.strftime(end_date, "%Y/%m"), | ||
| } | ||
| if authorized_nif is not None: | ||
| data["authorizedNif"] = authorized_nif | ||
| response = await self._async_get(URL_GET_MAX_POWER, request_data=data) | ||
| maxpower_values = [] | ||
| for i in response: | ||
| if all(k in i for k in GET_MAX_POWER_MANDATORY_FIELDS): | ||
| maxpower_values.append( | ||
| Power( | ||
| datetime=datetime.strptime( | ||
| f"{i['date']} {i['time']}", "%Y/%m/%d %H:%M" | ||
| ), | ||
| value_kW=i["maxPower"], | ||
| ) | ||
| ) | ||
| else: | ||
| _LOGGER.warning( | ||
| "Weird data structure while fetching maximeter data, got %s", | ||
| response, | ||
| ) | ||
| return maxpower_values | ||
| def get_max_power( | ||
| self, | ||
| cups: str, | ||
| distributor_code: str, | ||
| start_date: datetime, | ||
| end_date: datetime, | ||
| authorized_nif: str | None = None, | ||
| ): | ||
| """Datadis get_max_power query (sync wrapper).""" | ||
| return asyncio.run( | ||
| self.async_get_max_power( | ||
| cups, | ||
| distributor_code, | ||
| start_date, | ||
| end_date, | ||
| authorized_nif, | ||
| ) | ||
| ) |
| """A REData API connector""" | ||
| import asyncio | ||
| import datetime as dt | ||
| import logging | ||
| import aiohttp | ||
| from dateutil import parser | ||
| from edata.models import EnergyPrice | ||
| _LOGGER = logging.getLogger(__name__) | ||
| REQUESTS_TIMEOUT = 15 | ||
| URL_REALTIME_PRICES = ( | ||
| "https://apidatos.ree.es/es/datos/mercados/precios-mercados-tiempo-real" | ||
| "?time_trunc=hour" | ||
| "&geo_ids={geo_id}" | ||
| "&start_date={start:%Y-%m-%dT%H:%M}&end_date={end:%Y-%m-%dT%H:%M}" | ||
| ) | ||
| class REDataConnector: | ||
| """Main class for REData connector""" | ||
| def __init__( | ||
| self, | ||
| ) -> None: | ||
| """Init method for REDataConnector""" | ||
| async def async_get_realtime_prices( | ||
| self, dt_from: dt.datetime, dt_to: dt.datetime, is_ceuta_melilla: bool = False | ||
| ) -> list[EnergyPrice]: | ||
| """GET query to fetch realtime pvpc prices, historical data is limited to current month (async)""" | ||
| url = URL_REALTIME_PRICES.format( | ||
| geo_id=8744 if is_ceuta_melilla else 8741, | ||
| start=dt_from, | ||
| end=dt_to, | ||
| ) | ||
| data = [] | ||
| _LOGGER.info("GET %s", url) | ||
| timeout = aiohttp.ClientTimeout(total=REQUESTS_TIMEOUT) | ||
| async with aiohttp.ClientSession(timeout=timeout) as session: | ||
| try: | ||
| async with session.get(url) as res: | ||
| text = await res.text() | ||
| if res.status == 200: | ||
| try: | ||
| res_json = await res.json() | ||
| res_list = res_json["included"][0]["attributes"]["values"] | ||
| except (IndexError, KeyError): | ||
| _LOGGER.error( | ||
| "%s returned a malformed response: %s ", | ||
| url, | ||
| text, | ||
| ) | ||
| return data | ||
| for element in res_list: | ||
| data.append( | ||
| EnergyPrice( | ||
| datetime=parser.parse(element["datetime"]).replace( | ||
| tzinfo=None | ||
| ), | ||
| value_eur_kWh=element["value"] / 1000, | ||
| delta_h=1, | ||
| ) | ||
| ) | ||
| else: | ||
| _LOGGER.error( | ||
| "%s returned %s with code %s", | ||
| url, | ||
| text, | ||
| res.status, | ||
| ) | ||
| except Exception as e: | ||
| _LOGGER.error("Exception fetching realtime prices: %s", e) | ||
| return data | ||
| def get_realtime_prices( | ||
| self, dt_from: dt.datetime, dt_to: dt.datetime, is_ceuta_melilla: bool = False | ||
| ) -> list: | ||
| """GET query to fetch realtime pvpc prices, historical data is limited to current month (sync wrapper)""" | ||
| return asyncio.run( | ||
| self.async_get_realtime_prices(dt_from, dt_to, is_ceuta_melilla) | ||
| ) |
| import asyncio | ||
| import calendar | ||
| import logging | ||
| import os | ||
| import typing | ||
| from datetime import datetime, timedelta | ||
| from pathlib import Path | ||
| from tempfile import gettempdir | ||
| from dateutil import relativedelta | ||
| from jinja2 import Environment | ||
| from edata.core.utils import ( | ||
| get_db_path, | ||
| get_contract_for_dt, | ||
| get_day, | ||
| get_month, | ||
| get_tariff, | ||
| redacted_cups, | ||
| ) | ||
| from edata.database.controller import EdataDB | ||
| from edata.models import Contract, Energy, EnergyPrice | ||
| from edata.models.bill import Bill, BillingRules, PVPCBillingRules | ||
| _LOGGER = logging.getLogger(__name__) | ||
| class BillService: | ||
| "Definition of a bill service for energy supplies." | ||
| def __init__(self, cups: str, storage_path: str) -> None: | ||
| self._cups = cups | ||
| self._scups = redacted_cups(cups) | ||
| self.db = EdataDB(get_db_path(storage_path)) | ||
| async def get_bills( | ||
| self, | ||
| start: datetime | None = None, | ||
| end: datetime | None = None, | ||
| type_: typing.Literal["hour", "day", "month"] = "hour", | ||
| ) -> list[Bill]: | ||
| """Return the list of bills.""" | ||
| res = await self.db.list_bill(self._cups, type_, start, end) | ||
| return [x.data for x in res] | ||
| async def update( | ||
| self, | ||
| start: datetime | None = None, | ||
| end: datetime | None = None, | ||
| billing_rules: BillingRules | PVPCBillingRules | None = None, | ||
| is_pvpc: bool = True, | ||
| ) -> None: | ||
| """Update all missing billing data within optional date ranges.""" | ||
| if not billing_rules: | ||
| # assuming pvpc | ||
| _LOGGER.info("%s non explicit billing rules, assuming PVPC", self._scups) | ||
| billing_rules = PVPCBillingRules() | ||
| is_pvpc = True | ||
| # fetch cups | ||
| supply = await self.db.get_supply(self._cups) | ||
| if not supply: | ||
| _LOGGER.warning( | ||
| "%s the selected cups does not exist, please fetch data first", | ||
| self._scups, | ||
| ) | ||
| return | ||
| _LOGGER.info( | ||
| "%s the selected supply is available from %s to %s", | ||
| self._scups, | ||
| supply.data.date_start, | ||
| supply.data.date_end, | ||
| ) | ||
| if not start: | ||
| _LOGGER.debug( | ||
| "%s automatically setting start date as last hourly bill", self._scups | ||
| ) | ||
| start = await self._get_last_bill_dt() | ||
| if not start: | ||
| _LOGGER.debug( | ||
| "%s there are no bills for this cups, building since the start of the supply", | ||
| self._scups, | ||
| ) | ||
| start = supply.data.date_start | ||
| if not end: | ||
| _LOGGER.debug( | ||
| "%s automatically setting end date as supply date end", self._scups | ||
| ) | ||
| end = supply.data.date_end | ||
| _LOGGER.info( | ||
| "%s data will be updated from %s to %s", | ||
| self._scups, | ||
| start, | ||
| end, | ||
| ) | ||
| # fetch contracts | ||
| contracts = await self._get_contracts() | ||
| # fetch and filter energy items | ||
| energy = await self._get_energy() | ||
| _LOGGER.debug("%s compiling missing hourly bills", self._scups) | ||
| if is_pvpc: | ||
| pvpc = await self._get_pvpc() | ||
| billing_rules = PVPCBillingRules(**billing_rules.model_dump()) | ||
| bills = await asyncio.to_thread( | ||
| self._compile_pvpc, contracts, energy, pvpc, billing_rules | ||
| ) | ||
| confighash = f"pvpc-{hash(billing_rules.model_dump_json())}" | ||
| else: | ||
| bills = await asyncio.to_thread( | ||
| self._compile_j2, contracts, energy, billing_rules | ||
| ) | ||
| confighash = f"custom-{hash(billing_rules.model_dump_json())}" | ||
| _LOGGER.debug("%s pushing hourly bills", self._scups) | ||
| await self.db.add_bill_list( | ||
| cups=self._cups, | ||
| type_="hour", | ||
| confhash=confighash, | ||
| complete=True, | ||
| bill=bills, | ||
| ) | ||
| _LOGGER.debug("%s updating daily and monthly bills", self._scups) | ||
| await self.update_statistics(start, end) | ||
| await self.fix_missing_statistics() | ||
| async def update_statistics(self, start: datetime, end: datetime): | ||
| """Update the statistics during a period.""" | ||
| await self._update_daily_statistics(start, end) | ||
| await self._update_monthly_statistics(start, end) | ||
| async def _update_daily_statistics(self, start: datetime, end: datetime): | ||
| """Update daily statistics within a date range.""" | ||
| day_start = get_day(start) | ||
| day_end = end | ||
| daily = await self.db.list_bill(self._cups, "day", day_start, day_end) | ||
| complete = [] | ||
| for stat in daily: | ||
| if stat.complete: | ||
| complete.append(stat.datetime) | ||
| continue | ||
| data = await self.get_bills(day_start, day_end) | ||
| stats = await asyncio.to_thread( | ||
| self._compile_statistics, data, get_day, skip=complete | ||
| ) | ||
| for stat in stats: | ||
| is_complete = stat.delta_h == 24 | ||
| await self.db.add_bill(self._cups, "day", stat, "mix", is_complete) | ||
| if not is_complete: | ||
| _LOGGER.info( | ||
| "%s daily statistics for %s are incomplete", | ||
| self._scups, | ||
| stat.datetime.date(), | ||
| ) | ||
| async def _update_monthly_statistics(self, start: datetime, end: datetime): | ||
| """Update monthly statistics within a date range.""" | ||
| month_start = get_month(start) | ||
| month_end = end | ||
| monthly = await self.db.list_bill(self._cups, "month", month_start, month_end) | ||
| complete = [] | ||
| for stat in monthly: | ||
| if stat.complete: | ||
| complete.append(stat.datetime) | ||
| continue | ||
| data = await self.get_bills(month_start, month_end) | ||
| stats = await asyncio.to_thread( | ||
| self._compile_statistics, data, get_month, skip=complete | ||
| ) | ||
| for stat in stats: | ||
| target_hours = ( | ||
| calendar.monthrange(stat.datetime.year, stat.datetime.month)[1] * 24 | ||
| ) | ||
| is_complete = stat.delta_h == target_hours | ||
| await self.db.add_bill(self._cups, "month", stat, "mix", is_complete) | ||
| if not is_complete: | ||
| _LOGGER.info( | ||
| "%s monthly statistics for %s are incomplete", | ||
| self._scups, | ||
| stat.datetime.date(), | ||
| ) | ||
| async def _find_missing_stats(self): | ||
| """Return the list of days that are missing billing data.""" | ||
| stats = await self.db.list_bill(self._cups, "day", complete=False) | ||
| return [x.datetime for x in stats] | ||
| def _compile_statistics( | ||
| self, | ||
| data: list[Bill], | ||
| agg: typing.Callable[[datetime], datetime], | ||
| wanted: list[datetime] | None = None, | ||
| skip: list[datetime] | None = None, | ||
| ) -> list[Bill]: | ||
| """Return the aggregated bill data.""" | ||
| if not wanted: | ||
| wanted = [] | ||
| if not skip: | ||
| skip = [] | ||
| agg_data = {} | ||
| for item in data: | ||
| agg_dt = agg(item.datetime) | ||
| is_wanted = agg_dt in wanted or agg_dt not in skip | ||
| if not is_wanted: | ||
| continue | ||
| if agg_dt not in agg_data: | ||
| agg_data[agg_dt] = Bill( | ||
| datetime=agg_dt, | ||
| delta_h=0, | ||
| ) | ||
| ref = agg_data[agg_dt] | ||
| ref.delta_h += item.delta_h | ||
| ref.value_eur += item.value_eur | ||
| ref.energy_term += item.energy_term | ||
| ref.power_term += item.power_term | ||
| ref.others_term += item.others_term | ||
| ref.surplus_term += item.surplus_term | ||
| return [agg_data[x] for x in agg_data] | ||
| async def fix_missing_statistics(self): | ||
| """Recompile statistics to fix missing data.""" | ||
| missing = await self._find_missing_stats() | ||
| for day in missing: | ||
| _LOGGER.debug("%s updating daily statistics for date %s", self._scups, day) | ||
| end = day + relativedelta.relativedelta(day=1) - timedelta(minutes=1) | ||
| await self._update_daily_statistics(day, end) | ||
| missing_months = list(set([get_month(x) for x in missing])) | ||
| for month in missing_months: | ||
| _LOGGER.debug( | ||
| "%s updating monthly statistics for date %s", self._scups, month | ||
| ) | ||
| end = month + relativedelta.relativedelta(months=1) - timedelta(minutes=1) | ||
| await self._update_monthly_statistics(month, end) | ||
| def _compile_pvpc( | ||
| self, | ||
| contracts: list[Contract], | ||
| energy: list[Energy], | ||
| pvpc: list[EnergyPrice], | ||
| rules: PVPCBillingRules, | ||
| ) -> list[Bill]: | ||
| """Compile bills assuming PVPC billing.""" | ||
| energy_dt = [x.datetime for x in energy] | ||
| pvpc_dt = [x.datetime for x in pvpc] | ||
| # reduce computation range to valid periods | ||
| pvpc_valid_dt = [] | ||
| for contract in contracts: | ||
| for dt in pvpc_dt: | ||
| if dt in pvpc_valid_dt: | ||
| continue | ||
| if contract.date_start <= dt <= contract.date_end: | ||
| pvpc_valid_dt.append(dt) | ||
| pvpc_valid_dt = [x for x in pvpc_valid_dt if x in energy_dt] | ||
| e = {x.datetime: x for x in energy if x.datetime in pvpc_valid_dt} | ||
| p = {x.datetime: x for x in pvpc if x.datetime in pvpc_valid_dt} | ||
| b: dict[datetime, Bill] = {} | ||
| for dt in e.keys(): | ||
| c = get_contract_for_dt(contracts, dt) | ||
| if not c: | ||
| continue | ||
| p1_kw = c.power_p1 | ||
| p2_kw = c.power_p2 | ||
| if not p1_kw or not p2_kw: | ||
| continue | ||
| bill = Bill(datetime=dt, delta_h=1) | ||
| bill.energy_term = ( | ||
| rules.electricity_tax | ||
| * rules.iva_tax | ||
| * p[dt].value_eur_kWh | ||
| * e[dt].value_kWh | ||
| ) | ||
| bill.power_term = ( | ||
| rules.electricity_tax | ||
| * rules.iva_tax | ||
| * ( | ||
| p1_kw * (rules.p1_kw_year_eur + rules.market_kw_year_eur) | ||
| + p2_kw * rules.p2_kw_year_eur | ||
| ) | ||
| / 365 | ||
| / 24 | ||
| ) | ||
| bill.others_term = rules.iva_tax * rules.meter_month_eur / 30 / 24 | ||
| bill.value_eur = bill.energy_term + bill.power_term + bill.others_term | ||
| b[dt] = bill | ||
| return [x for x in b.values()] | ||
| def _compile_j2( | ||
| self, | ||
| contracts: list[Contract], | ||
| energy: list[Energy], | ||
| rules: BillingRules, | ||
| ): | ||
| """Compile bills from custom rules.""" | ||
| e = {x.datetime: x for x in energy} | ||
| b: dict[datetime, Bill] = {} | ||
| env = Environment() | ||
| energy_expr = env.compile_expression(f"({rules.energy_formula})|float") | ||
| power_expr = env.compile_expression(f"({rules.power_formula})|float") | ||
| others_expr = env.compile_expression(f"({rules.others_formula})|float") | ||
| for dt in e.keys(): | ||
| c = get_contract_for_dt(contracts, dt) | ||
| if not c: | ||
| continue | ||
| p1_kw = c.power_p1 | ||
| p2_kw = c.power_p2 | ||
| if not p1_kw or not p2_kw: | ||
| continue | ||
| bill = Bill(datetime=dt, delta_h=1) | ||
| params = rules.model_dump() | ||
| params["p1_kw"] = p1_kw | ||
| params["p2_kw"] = p2_kw | ||
| params["kwh"] = e[dt].value_kWh | ||
| tariff = get_tariff(dt) | ||
| if tariff == 1: | ||
| params["kwh_eur"] = rules.p1_kwh_eur | ||
| elif tariff == 2: | ||
| params["kwh_eur"] = rules.p2_kwh_eur | ||
| elif tariff == 3: | ||
| params["kwh_eur"] = rules.p3_kwh_eur | ||
| energy_term = energy_expr(**params) | ||
| power_term = power_expr(**params) | ||
| others_term = others_expr(**params) | ||
| if energy_term: | ||
| bill.energy_term = round(energy_term, 6) | ||
| if power_term: | ||
| bill.power_term = round(power_term, 6) | ||
| if others_term: | ||
| bill.others_term = round(others_term, 6) | ||
| bill.value_eur = bill.energy_term + bill.power_term + bill.others_term | ||
| b[dt] = bill | ||
| return [x for x in b.values()] | ||
| async def _get_contracts(self) -> list[Contract]: | ||
| res = await self.db.list_contracts(self._cups) | ||
| return [x.data for x in res] | ||
| async def _get_energy( | ||
| self, start: datetime | None = None, end: datetime | None = None | ||
| ) -> list[Energy]: | ||
| res = await self.db.list_energy(self._cups, start, end) | ||
| return [x.data for x in res] | ||
| async def _get_pvpc( | ||
| self, start: datetime | None = None, end: datetime | None = None | ||
| ) -> list[EnergyPrice]: | ||
| res = await self.db.list_pvpc(start, end) | ||
| return [x.data for x in res] | ||
| async def _get_last_bill_dt(self) -> datetime | None: | ||
| """Return the timestamp of the latest bill record.""" | ||
| last_record = await self.db.get_last_bill(self._cups) | ||
| if last_record: | ||
| return last_record.datetime |
| """Definition of a service for telemetry data handling.""" | ||
| import asyncio | ||
| import calendar | ||
| import logging | ||
| import os | ||
| import typing | ||
| from datetime import datetime, timedelta | ||
| from pathlib import Path | ||
| from tempfile import gettempdir | ||
| from dateutil import relativedelta | ||
| from edata.core.utils import get_db_path, get_day, get_month, get_tariff, redacted_cups | ||
| from edata.database.controller import EdataDB | ||
| from edata.models import Contract, Energy, Power, Statistics, Supply | ||
| from edata.models.bill import EnergyPrice | ||
| from edata.providers import DatadisConnector, REDataConnector | ||
| _LOGGER = logging.getLogger(__name__) | ||
| class DataService: | ||
| "Definition of an energy and power data service based on Datadis." | ||
| def __init__( | ||
| self, | ||
| cups: str, | ||
| datadis_user: str, | ||
| datadis_pwd: str, | ||
| storage_path: str, | ||
| datadis_authorized_nif: str | None = None, | ||
| ) -> None: | ||
| self.datadis = DatadisConnector( | ||
| datadis_user, datadis_pwd, storage_path=storage_path | ||
| ) | ||
| self.redata = REDataConnector() | ||
| # params | ||
| self._cups = cups | ||
| self._scups = redacted_cups(cups) | ||
| self._authorized_nif = datadis_authorized_nif | ||
| self._measurement_type = "0" | ||
| if self._authorized_nif and self._authorized_nif == datadis_user: | ||
| _LOGGER.warning( | ||
| "Ignoring authorized NIF parameter since it matches the username" | ||
| ) | ||
| self._authorized_nif = None | ||
| self.db = EdataDB(get_db_path(storage_path)) | ||
| # data (in-memory cache) | ||
| self._supplies: list[Supply] = [] | ||
| self._contracts: list[Contract] = [] | ||
| async def get_supplies(self) -> list[Supply]: | ||
| """Return the list of supplies.""" | ||
| res = await self.db.list_supplies() | ||
| return [x.data for x in res] | ||
| async def get_supply(self) -> Supply | None: | ||
| res = await self.db.get_supply(self._cups) | ||
| if res: | ||
| return res.data | ||
| return None | ||
| async def get_contracts(self) -> list[Contract]: | ||
| """Return a list of contracts for the selected cups.""" | ||
| res = await self.db.list_contracts(self._cups) | ||
| return [x.data for x in res] | ||
| async def get_energy( | ||
| self, start: datetime | None = None, end: datetime | None = None | ||
| ) -> list[Energy]: | ||
| """Return a list of energy records for the selected cups.""" | ||
| res = await self.db.list_energy(self._cups, start, end) | ||
| return [x.data for x in res] | ||
| async def get_power( | ||
| self, start: datetime | None = None, end: datetime | None = None | ||
| ) -> list[Power]: | ||
| """Return a list of power records for the selected cups.""" | ||
| res = await self.db.list_power(self._cups, start, end) | ||
| return [x.data for x in res] | ||
| async def get_pvpc( | ||
| self, start: datetime | None = None, end: datetime | None = None | ||
| ) -> list[EnergyPrice]: | ||
| """Return a list of pvpc records (energy prices) for the selected cups.""" | ||
| res = await self.db.list_pvpc(start, end) | ||
| return [x.data for x in res] | ||
| async def get_statistics( | ||
| self, | ||
| type_: typing.Literal["day", "month"], | ||
| start: datetime | None = None, | ||
| end: datetime | None = None, | ||
| complete: bool | None = None, | ||
| ) -> list[Statistics]: | ||
| """Return a list of statistics records for the selected cups.""" | ||
| data = await self.db.list_statistics(self._cups, type_, start, end, complete) | ||
| return [x.data for x in data] | ||
| async def fix_missing_statistics(self): | ||
| """Recompile statistics to fix missing data.""" | ||
| missing = await self._find_missing_stats() | ||
| for day in missing: | ||
| _LOGGER.info("%s updating daily statistics for date %s", self._scups, day) | ||
| end = day + relativedelta.relativedelta(day=1) - timedelta(hours=1) | ||
| await self._update_daily_statistics(day, end) | ||
| missing_months = await self._find_missing_stats("month") | ||
| for month in missing_months: | ||
| _LOGGER.info( | ||
| "%s updating monthly statistics for date %s", self._scups, month | ||
| ) | ||
| end = month + relativedelta.relativedelta(month=1) - timedelta(hours=1) | ||
| await self._update_monthly_statistics(month, end) | ||
| async def update( | ||
| self, | ||
| start_date: datetime | None = None, | ||
| end_date: datetime | None = None, | ||
| ) -> bool: | ||
| """Update all missing data within optional date ranges.""" | ||
| await self._sync() | ||
| cups = self._cups | ||
| scups = self._scups | ||
| # update supplies | ||
| await self.update_supplies() | ||
| if not self._supplies: | ||
| _LOGGER.error( | ||
| "%s unable to retrieve any supplies, this is likely due to credential errors (check previous logs) or temporary Datadis unavailability", | ||
| scups, | ||
| ) | ||
| return False | ||
| # find requested cups in supplies | ||
| supply = self._find_supply_for_cups(cups) | ||
| if not supply: | ||
| _LOGGER.error( | ||
| ( | ||
| "%s the selected supply is not found in the provided account, got: %s." | ||
| "This may be expected if you have just registered this cups since it takes a while to appear," | ||
| "otherwise check Datadis website and copypaste the CUPS to avoid type errors" | ||
| ), | ||
| scups, | ||
| [redacted_cups(x.cups) for x in self._supplies], | ||
| ) | ||
| return False | ||
| _LOGGER.info( | ||
| "%s the selected supply is available from %s to %s", | ||
| scups, | ||
| supply.date_start, | ||
| supply.date_end, | ||
| ) | ||
| if not start_date: | ||
| start_date = supply.date_start | ||
| _LOGGER.debug( | ||
| "%s automatically setting start date as supply date start", self._scups | ||
| ) | ||
| if not end_date: | ||
| end_date = datetime.today() | ||
| _LOGGER.debug("%s automatically setting end date as today", self._scups) | ||
| _LOGGER.info( | ||
| "%s data will be updated from %s to %s", | ||
| scups, | ||
| start_date, | ||
| end_date, | ||
| ) | ||
| # update contracts to get valid periods | ||
| await self.update_contracts() | ||
| if not self._contracts: | ||
| _LOGGER.warning( | ||
| "%s unable to update contracts, edata will assume that the selected supply has no contractual issues", | ||
| scups, | ||
| ) | ||
| # update energy records | ||
| last_energy_dt = await self._get_last_energy_dt() | ||
| if last_energy_dt: | ||
| _LOGGER.info( | ||
| "%s the latest known energy timestamp is %s", | ||
| self._scups, | ||
| last_energy_dt, | ||
| ) | ||
| # look for missing dates | ||
| missing_days = await self._find_missing_stats() | ||
| missing_days = [ | ||
| x for x in missing_days if x >= start_date and x <= end_date | ||
| ] | ||
| _LOGGER.info( | ||
| "%s the following days are missing energy data %s", | ||
| self._scups, | ||
| missing_days, | ||
| ) | ||
| for day in missing_days: | ||
| _LOGGER.info("%s trying to update energy for %s", scups, day) | ||
| await self.update_energy(day, day + timedelta(days=1)) | ||
| # and recreate statistics | ||
| await self.fix_missing_statistics() | ||
| # fetch upstream records | ||
| await self.update_energy(last_energy_dt + timedelta(hours=1), end_date) | ||
| else: | ||
| # we have no data yet, fetch from start | ||
| await self.update_energy(start_date, end_date) | ||
| # update power records | ||
| await self.update_power(start_date, end_date) | ||
| # fetch pvpc data | ||
| await self.update_pvpc(start_date, end_date) | ||
| # update new statistics | ||
| await self.update_statistics(start_date, end_date) | ||
| return True | ||
| async def login(self): | ||
| """Test login at Datadis.""" | ||
| return await self.datadis.async_login() | ||
| async def update_supplies(self): | ||
| """Update the list of supplies for the configured user.""" | ||
| self._supplies = await self.datadis.async_get_supplies(self._authorized_nif) | ||
| for s in self._supplies: | ||
| await self.db.add_supply(s) | ||
| async def update_contracts(self) -> bool: | ||
| """Update the list of contracts for the selected cups.""" | ||
| cups = self._cups | ||
| supply = self._find_supply_for_cups(cups) | ||
| if supply: | ||
| self._contracts = await self.datadis.async_get_contract_detail( | ||
| cups, supply.distributorCode, self._authorized_nif | ||
| ) | ||
| for c in self._contracts: | ||
| await self.db.add_contract(self._cups, c) | ||
| return True | ||
| _LOGGER.warning("Unable to fetch contract details for %s", self._scups) | ||
| return False | ||
| async def update_energy(self, start: datetime, end: datetime): | ||
| """Update the list of energy consumptions for the selected cups.""" | ||
| cups = self._cups | ||
| supply = self._find_supply_for_cups(cups) | ||
| if supply: | ||
| data = await self.datadis.async_get_consumption_data( | ||
| cups, | ||
| supply.distributorCode, | ||
| start, | ||
| end, | ||
| self._measurement_type, | ||
| supply.pointType, | ||
| self._authorized_nif, | ||
| ) | ||
| await self.db.add_energy_list(self._cups, data) | ||
| return True | ||
| _LOGGER.warning("Unable to fetch energy data for %s", self._scups) | ||
| return False | ||
| async def update_power(self, start: datetime, end: datetime): | ||
| """Update the list of power peaks for the selected cups.""" | ||
| cups = self._cups | ||
| supply = self._find_supply_for_cups(cups) | ||
| if supply: | ||
| data = await self.datadis.async_get_max_power( | ||
| cups, | ||
| supply.distributorCode, | ||
| start, | ||
| end, | ||
| self._authorized_nif, | ||
| ) | ||
| await self.db.add_power_list(self._cups, data) | ||
| return True | ||
| _LOGGER.warning("Unable to fetch power data for %s", self._scups) | ||
| return False | ||
| async def update_pvpc(self, start: datetime, end: datetime): | ||
| """Update recent pvpc prices.""" | ||
| cups = self._cups | ||
| min_date = get_day(datetime.now()) - timedelta(days=28) | ||
| if start < min_date: | ||
| start = min_date | ||
| if end < min_date: | ||
| # end date out of bounds | ||
| return False | ||
| if _pvpc_dt := await self._get_last_pvpc_dt(): | ||
| start = _pvpc_dt + timedelta(hours=1) | ||
| if start >= end: | ||
| _LOGGER.info("%s pvpc prices are already synced", self._scups) | ||
| # data is already synced | ||
| return True | ||
| end = get_day(end) + timedelta(hours=23, minutes=59) | ||
| prices = await self.redata.async_get_realtime_prices(start, end) | ||
| if prices: | ||
| await self.db.add_pvpc_list(prices) | ||
| return True | ||
| _LOGGER.warning("%s unable to fetch pvpc prices", self._scups) | ||
| return False | ||
| async def update_statistics(self, start: datetime, end: datetime): | ||
| """Update the statistics during a period.""" | ||
| await self._update_daily_statistics(start, end) | ||
| await self._update_monthly_statistics(start, end) | ||
| async def _update_daily_statistics(self, start: datetime, end: datetime): | ||
| """Update daily statistics within a date range.""" | ||
| day_start = get_day(start) | ||
| day_end = end | ||
| daily = await self.db.list_statistics(self._cups, "day", day_start, day_end) | ||
| complete = [] | ||
| for stat in daily: | ||
| if stat.complete: | ||
| complete.append(stat.datetime) | ||
| continue | ||
| data = await self.get_energy(day_start, day_end) | ||
| stats = await asyncio.to_thread( | ||
| self._compile_statistics, data, get_day, skip=complete | ||
| ) | ||
| for stat in stats: | ||
| is_complete = stat.delta_h == 24 | ||
| await self.db.add_statistics(self._cups, "day", stat, is_complete) | ||
| if not is_complete: | ||
| _LOGGER.info( | ||
| "%s daily statistics for %s are incomplete", | ||
| self._scups, | ||
| stat.datetime.date(), | ||
| ) | ||
| async def _update_monthly_statistics(self, start: datetime, end: datetime): | ||
| """Update monthly statistics within a date range.""" | ||
| month_start = get_month(start) | ||
| month_end = end | ||
| monthly = await self.db.list_statistics( | ||
| self._cups, "month", month_start, month_end | ||
| ) | ||
| complete = [] | ||
| for stat in monthly: | ||
| if stat.complete: | ||
| complete.append(stat.datetime) | ||
| continue | ||
| data = await self.get_energy(month_start, month_end) | ||
| stats = await asyncio.to_thread( | ||
| self._compile_statistics, data, get_month, skip=complete | ||
| ) | ||
| for stat in stats: | ||
| target_hours = ( | ||
| calendar.monthrange(stat.datetime.year, stat.datetime.month)[1] * 24 | ||
| ) | ||
| is_complete = stat.delta_h == target_hours | ||
| await self.db.add_statistics(self._cups, "month", stat, is_complete) | ||
| if not is_complete: | ||
| _LOGGER.info( | ||
| "%s monthly statistics for %s are incomplete", | ||
| self._scups, | ||
| stat.datetime.date(), | ||
| ) | ||
| def _compile_statistics( | ||
| self, | ||
| data: list[Energy], | ||
| agg: typing.Callable[[datetime], datetime], | ||
| wanted: list[datetime] | None = None, | ||
| skip: list[datetime] | None = None, | ||
| ) -> list[Statistics]: | ||
| """Return the aggregated energy data.""" | ||
| if not wanted: | ||
| wanted = [] | ||
| if not skip: | ||
| skip = [] | ||
| agg_data = {} | ||
| for item in data: | ||
| agg_dt = agg(item.datetime) | ||
| is_wanted = agg_dt in wanted or agg_dt not in skip | ||
| if not is_wanted: | ||
| continue | ||
| tariff = get_tariff(item.datetime) | ||
| if agg_dt not in agg_data: | ||
| agg_data[agg_dt] = Statistics( | ||
| datetime=agg_dt, | ||
| delta_h=0, | ||
| value_kWh=0, | ||
| value_p1_kWh=0, | ||
| value_p2_kWh=0, | ||
| value_p3_kWh=0, | ||
| surplus_kWh=0, | ||
| surplus_p1_kWh=0, | ||
| surplus_p2_kWh=0, | ||
| surplus_p3_kWh=0, | ||
| ) | ||
| ref = agg_data[agg_dt] | ||
| ref.delta_h += item.delta_h | ||
| ref.value_kWh += item.value_kWh | ||
| ref.surplus_kWh += item.surplus_kWh | ||
| if 1 == tariff: | ||
| ref.value_p1_kWh += item.value_kWh | ||
| ref.surplus_p1_kWh += item.surplus_kWh | ||
| elif 2 == tariff: | ||
| ref.value_p2_kWh += item.value_kWh | ||
| ref.surplus_p2_kWh += item.surplus_kWh | ||
| elif 3 == tariff: | ||
| ref.value_p3_kWh += item.value_kWh | ||
| ref.surplus_p3_kWh += item.surplus_kWh | ||
| return [agg_data[x] for x in agg_data] | ||
| async def _find_missing_stats(self, agg: typing.Literal["day", "month"] = "day"): | ||
| """Return the list of days that are missing energy data.""" | ||
| stats = await self.get_statistics(agg, complete=False) | ||
| return [x.datetime for x in stats] | ||
| def _find_supply_for_cups(self, cups: str) -> Supply | None: | ||
| """Return the supply that matches the provided cups.""" | ||
| for supply in self._supplies: | ||
| if supply.cups == cups: | ||
| return supply | ||
| async def _get_last_energy_dt(self) -> datetime | None: | ||
| """Return the timestamp of the latest energy record.""" | ||
| last_record = await self.db.get_last_energy(self._cups) | ||
| if last_record: | ||
| return last_record.datetime | ||
| async def _get_last_pvpc_dt(self) -> datetime | None: | ||
| """Return the timestamp of the latest pvpc record.""" | ||
| last_record = await self.db.get_last_pvpc() | ||
| if last_record: | ||
| return last_record.datetime | ||
| async def _sync(self): | ||
| """Load state.""" | ||
| self._supplies = await self.get_supplies() | ||
| self._contracts = await self.get_contracts() |
| import json | ||
| import os | ||
| from tempfile import gettempdir | ||
| from unittest.mock import AsyncMock, patch | ||
| import pytest | ||
| import pytest_asyncio | ||
| from syrupy.assertion import SnapshotAssertion | ||
| from edata.models.bill import BillingRules | ||
| from edata.models.data import Energy, Power | ||
| from edata.models.supply import Contract, Supply | ||
| from edata.services.bill_service import BillService | ||
| from edata.services.data_service import DataService | ||
| ASSETS = os.path.join(os.path.dirname(__file__), "assets") | ||
| def load_json(filename): | ||
| with open(os.path.join(ASSETS, filename), encoding="utf-8") as f: | ||
| return json.load(f) | ||
| def load_models(filename, model): | ||
| return [model(**d) for d in load_json(filename)] | ||
| @pytest.fixture(scope="module") | ||
| def storage_dir(): | ||
| return gettempdir() | ||
| @pytest.fixture(scope="module") | ||
| def supplies(): | ||
| return load_models("supplies.json", Supply) | ||
| @pytest.fixture(scope="module") | ||
| def contracts(): | ||
| return load_models("contracts.json", Contract) | ||
| @pytest.fixture(scope="module") | ||
| def energy(): | ||
| return load_models("energy.json", Energy) | ||
| @pytest.fixture(scope="module") | ||
| def power(): | ||
| return load_models("power.json", Power) | ||
| @pytest.fixture(scope="module") | ||
| def mock_connector(supplies, contracts, energy, power): | ||
| mock = AsyncMock() | ||
| mock.async_login.return_value = True | ||
| mock.async_get_supplies.return_value = supplies | ||
| mock.async_get_contract_detail.return_value = contracts | ||
| mock.async_get_consumption_data.return_value = sorted( | ||
| energy, key=lambda x: x.datetime | ||
| ) | ||
| mock.async_get_max_power.return_value = power | ||
| return mock | ||
| @pytest_asyncio.fixture(scope="module", loop_scope="module") | ||
| async def populated_data_service(mock_connector, energy, power, storage_dir): | ||
| with patch("edata.services.data_service.DatadisConnector") as mock_connector_class: | ||
| mock_connector_class.side_effect = lambda *a, **k: mock_connector | ||
| ds = DataService( | ||
| "ESXXXXXXXXXXXXXXXXTEST", "user", "pwd", storage_path=storage_dir | ||
| ) | ||
| await ds.login() | ||
| await ds.update_supplies() | ||
| await ds.update_contracts() | ||
| all_energy = sorted(energy, key=lambda x: x.datetime) | ||
| start_date = all_energy[0].datetime | ||
| end_date = all_energy[-1].datetime | ||
| await ds.update_energy(start_date, end_date) | ||
| await ds.update_power(start_date, end_date) | ||
| await ds._update_daily_statistics(start_date, end_date) | ||
| await ds._update_monthly_statistics(start_date, end_date) | ||
| return ds | ||
| @pytest.mark.asyncio | ||
| async def test_data_fetch( | ||
| populated_data_service, snapshot: SnapshotAssertion, energy, power | ||
| ): | ||
| ds = populated_data_service | ||
| assert len(await ds.get_energy()) == len(energy) | ||
| assert len(await ds.get_power()) == len(power) | ||
| result_all = { | ||
| "supplies": ds._supplies, | ||
| "contracts": ds._contracts, | ||
| "energy": await ds.get_energy(), | ||
| "power": await ds.get_power(), | ||
| } | ||
| assert result_all == snapshot | ||
| @pytest.mark.asyncio | ||
| async def test_get_daily_energy(populated_data_service, snapshot: SnapshotAssertion): | ||
| ds = populated_data_service | ||
| daily = await ds.get_statistics("day") | ||
| assert daily == snapshot | ||
| @pytest.mark.asyncio | ||
| async def test_get_monthly_energy(populated_data_service, snapshot: SnapshotAssertion): | ||
| ds = populated_data_service | ||
| monthly = await ds.get_statistics("month") | ||
| assert monthly == snapshot | ||
| @pytest.mark.asyncio | ||
| async def test_bill_service( | ||
| populated_data_service, snapshot: SnapshotAssertion, energy, storage_dir | ||
| ): | ||
| # Ensure data service is populated | ||
| ds = populated_data_service | ||
| # Initialize BillService with same configuration | ||
| bs = BillService("ESXXXXXXXXXXXXXXXXTEST", storage_path=storage_dir) | ||
| all_energy = sorted(energy, key=lambda x: x.datetime) | ||
| start_date = all_energy[0].datetime | ||
| end_date = all_energy[-1].datetime | ||
| rules = BillingRules( | ||
| p1_kwh_eur=0.20, | ||
| p2_kwh_eur=0.20, | ||
| p3_kwh_eur=0.20, | ||
| p1_kw_year_eur=20, | ||
| p2_kw_year_eur=10, | ||
| ) | ||
| # Run update | ||
| await bs.update(start_date, end_date, rules, is_pvpc=False) | ||
| # Verify bills were created | ||
| bills = await bs.get_bills(start_date, end_date) | ||
| assert len(bills) > 0 | ||
| # Verify statistics were updated | ||
| daily_stats = await bs.get_bills(start_date, end_date, "day") | ||
| assert len(daily_stats) > 0 | ||
| monthly_stats = await bs.get_bills(start_date, end_date, "month") | ||
| assert len(monthly_stats) > 0 | ||
| assert monthly_stats == snapshot |
+120
-76
| Metadata-Version: 2.4 | ||
| Name: e-data | ||
| Version: 1.3.3 | ||
| Version: 2.0.0.dev112 | ||
| Summary: Python library for managing spanish energy data from various web providers | ||
@@ -684,19 +684,21 @@ Author-email: VMG <vmayorg@outlook.es> | ||
| Classifier: Programming Language :: Python | ||
| Classifier: Programming Language :: Python :: 3.11 | ||
| Classifier: Programming Language :: Python :: 3.12 | ||
| Classifier: Programming Language :: Python :: Implementation :: CPython | ||
| Classifier: Programming Language :: Python :: Implementation :: PyPy | ||
| Requires-Python: >=3.11.0 | ||
| Requires-Python: >=3.12.0 | ||
| Description-Content-Type: text/markdown | ||
| License-File: LICENSE | ||
| Requires-Dist: dateparser>=1.1.2 | ||
| Requires-Dist: freezegun>=1.2.1 | ||
| Requires-Dist: holidays>=0.14.2 | ||
| Requires-Dist: pytest>=7.1.2 | ||
| Requires-Dist: python_dateutil>=2.8.2 | ||
| Requires-Dist: requests>=2.28.1 | ||
| Requires-Dist: voluptuous>=0.13.1 | ||
| Requires-Dist: Jinja2>=3.1.2 | ||
| Requires-Dist: diskcache>=5.6.3 | ||
| Requires-Dist: aiohttp>=3.12.15 | ||
| Requires-Dist: aiohttp<4,>=3.12 | ||
| Requires-Dist: dateparser<2,>=1.2 | ||
| Requires-Dist: diskcache<6,>=5.6 | ||
| Requires-Dist: freezegun<2,>=1.5 | ||
| Requires-Dist: holidays<1,>=0.50 | ||
| Requires-Dist: Jinja2<4,>=3.1 | ||
| Requires-Dist: pydantic<3,>=2.10 | ||
| Requires-Dist: python_dateutil<3,>=2.8 | ||
| Requires-Dist: Requests<3,>=2.31 | ||
| Requires-Dist: SQLAlchemy<3,>=2.0 | ||
| Requires-Dist: sqlmodel<0.1,>=0.0.22 | ||
| Requires-Dist: typer<1,>=0.12 | ||
| Requires-Dist: aiosqlite<1,>=0.21 | ||
| Dynamic: license-file | ||
@@ -710,3 +712,3 @@ | ||
| Este paquete proporciona herramientas para la descarga de tus datos de consumo eléctrico (desde Datadis.es) y su posterior procesado. La motivación principal es que conocer el consumo puede ayudarnos a reducirlo, e incluso a elegir una tarifa que mejor se adapte a nuestras necesidades. A día de hoy sus capacidades de facturación (€) son limitadas, soporta PVPC (según disponibilidad de datos de REData) y tarificación fija por tramos. Es el corazón de la integración [homeassistant-edata](https://github.com/uvejota/homeassistant-edata). | ||
| Este paquete proporciona herramientas para la descarga de tus datos de consumo eléctrico (desde Datadis.es) y su posterior procesado. La motivación principal es que conocer el consumo puede ayudarnos a reducirlo, e incluso a elegir una tarifa que mejor se adapte a nuestras necesidades. Soporta facturación con PVPC (según disponibilidad de datos de REData) y tarificación fija personalizable mediante fórmulas. Todos los datos se almacenan localmente en una base de datos SQLite. Es el corazón de la integración [homeassistant-edata](https://github.com/uvejota/homeassistant-edata). | ||
@@ -731,81 +733,123 @@ _**Esta herramienta no mantiene ningún tipo de vinculación con los proveedores de datos anteriormente mencionados, simplemente consulta la información disponible y facilita su posterior análisis.**_ | ||
| El paquete consta de tres módulos diferenciados: | ||
| El paquete consta de varios módulos: | ||
| * **Conectores** (módulo `connectors`), para definir los métodos de consulta a los diferentes proveedores: Datadis y REData. | ||
| * **Procesadores** (módulo `processors`), para procesar datos de consumo, maxímetro, o coste (tarificación). Ahora mismo consta de tres procesadores: `billing`, `consumption` y `maximeter`, además de algunas utilidades ubicadas en `utils`. Los procesadores deben heredar de la clase Processor definida en `base.py` | ||
| * **Ayudantes** (módulo `helpers`), para ayudar en el uso y gestión de los anteriores, presentando de momento un único ayudante llamado `EdataHelper` que te permite recopilar `X` días de datos (por defecto 365) y automáticamente procesarlos. Los datos son almacenados en la variable `data`, mientras que los atributos autocalculados son almacenados en la variable `attributes`. Por lo general, primero utilizan los conectores y luego procesan los datos, gestionando varias tareas de recuperación (principalmente para Datadis). | ||
| * **Proveedores** (`providers`): Conectores para consultar diferentes fuentes de datos. | ||
| - `datadis.py`: Conector para la API privada de Datadis (datos de consumo, potencia, contratos y suministros). | ||
| - `redata.py`: Conector para la API pública de REData (precios PVPC en tiempo real). | ||
| Estos módulos corresponden a la siguiente estructura del paquete: | ||
| * **Servicios** (`services`): Capa de lógica de negocio que gestiona la sincronización y procesamiento de datos. | ||
| - `data_service.py`: Servicio para gestionar datos de telemetría (consumo, potencia, estadísticas). | ||
| - `bill_service.py`: Servicio para gestionar la facturación y cálculo de costes. | ||
| ``` | ||
| edata/ | ||
| · __init__.py | ||
| · connectors/ | ||
| · __init__.py | ||
| · datadis.py | ||
| · redata.py | ||
| · processors/ | ||
| · __init__.py | ||
| · base.py | ||
| · billing.py | ||
| · consumption.py | ||
| · maximeter.py | ||
| · utils.py | ||
| · helpers.py | ||
| ``` | ||
| * **Modelos** (`models`): Definiciones de estructuras de datos usando Pydantic. | ||
| - `supply.py`: Modelos de suministros (`Supply`) y contratos (`Contract`). | ||
| - `data.py`: Modelos de datos de telemetría (`Energy`, `Power`, `Statistics`). | ||
| - `bill.py`: Modelos de facturación (`Bill`, `EnergyPrice`, `BillingRules`, `PVPCBillingRules`). | ||
| * **Base de datos** (`database`): Gestión de persistencia local con SQLite. | ||
| - `controller.py`: Controlador principal de la base de datos (`EdataDB`). | ||
| - `models.py`: Modelos SQLModel para las tablas de la base de datos. | ||
| - `queries.py`: Consultas SQL predefinidas. | ||
| * **Utilidades** (`core`): Funciones auxiliares para cálculos de tarifas, fechas, etc. | ||
| * **CLI** (`cli.py`): Interfaz de línea de comandos para operaciones básicas. | ||
| ## Ejemplo de uso | ||
| Partimos de que tenemos credenciales en Datadis.es. Algunas aclaraciones: | ||
| ### Usando los servicios (recomendado) | ||
| Partimos de que tenemos credenciales en [Datadis.es](https://datadis.es). Algunas aclaraciones: | ||
| * No es necesario solicitar API pública en el registro (se utilizará la API privada habilitada por defecto) | ||
| * El username suele ser el NIF del titular | ||
| * Copie el CUPS de la web de Datadis, algunas comercializadoras adhieren caracteres adicionales en el CUPS mostrado en su factura. | ||
| * La herramienta acepta el uso de NIF autorizado para consultar el suministro de otro titular. | ||
| * Copie el CUPS de la web de Datadis, algunas comercializadoras adhieren caracteres adicionales en el CUPS mostrado en su factura | ||
| * La herramienta acepta el uso de NIF autorizado para consultar el suministro de otro titular | ||
| ``` python | ||
| import asyncio | ||
| from datetime import datetime | ||
| import json | ||
| from edata.services.data_service import DataService | ||
| from edata.services.bill_service import BillService | ||
| from edata.models.bill import PVPCBillingRules | ||
| # importamos definiciones de datos que nos interesen | ||
| from edata.definitions import PricingRules | ||
| # importamos el ayudante | ||
| from edata.helpers import EdataHelper | ||
| # importamos el procesador de utilidades | ||
| from edata.processors import utils | ||
| async def main(): | ||
| # Crear el servicio de datos | ||
| data_service = DataService( | ||
| cups="ES0000000000000000XX", # Tu CUPS | ||
| datadis_user="12345678A", # Tu NIF/usuario | ||
| datadis_pwd="tu_password", | ||
| storage_path="./my_data" # Directorio para la BD | ||
| datadis_authorized_nif=None, # NIF autorizado (opcional) | ||
| ) | ||
| # Actualizar todos los datos disponibles | ||
| await data_service.update() | ||
| # Obtener suministros y contratos | ||
| supplies = await data_service.get_supplies() | ||
| contracts = await data_service.get_contracts() | ||
| # Obtener datos de consumo y potencia | ||
| energy = await data_service.get_energy() | ||
| power = await data_service.get_power() | ||
| # Obtener estadísticas agregadas (diarias o mensuales) | ||
| daily_stats = await data_service.get_statistics("day") | ||
| monthly_stats = await data_service.get_statistics("month") | ||
| # Crear el servicio de facturación | ||
| bill_service = BillService( | ||
| cups="ES0000000000000000XX", | ||
| storage_path="./my_data" # Mismo directorio que data_service | ||
| ) | ||
| # Definir reglas de facturación PVPC (valores por defecto españoles) | ||
| pvpc_rules = PVPCBillingRules( | ||
| p1_kw_year_eur=30.67266, # Término potencia P1 (€/kW/año) | ||
| p2_kw_year_eur=1.4243591, # Término potencia P2 (€/kW/año) | ||
| meter_month_eur=0.81, # Alquiler contador (€/mes) | ||
| market_kw_year_eur=3.113, # Otros cargos (€/kW/año) | ||
| electricity_tax=1.0511300560, # Impuesto eléctrico | ||
| iva_tax=1.21 # IVA (21%) | ||
| ) | ||
| # Actualizar facturación con PVPC | ||
| start_date = datetime(2024, 1, 1) | ||
| end_date = datetime(2024, 12, 31) | ||
| await bill_service.update(start_date, end_date, billing_rules=pvpc_rules, is_pvpc=True) | ||
| # Obtener facturas calculadas | ||
| bills = await bill_service.get_bills(start_date, end_date, type_="hour") | ||
| daily_bills = await bill_service.get_bills(start_date, end_date, type_="day") | ||
| monthly_bills = await bill_service.get_bills(start_date, end_date, type_="month") | ||
| # Mostrar resumen | ||
| total_cost = sum(b.value_eur for b in bills) | ||
| print(f"Coste total: {total_cost:.2f} €") | ||
| print(f"Consumo total: {sum(e.value_kWh for e in energy):.2f} kWh") | ||
| # Preparar reglas de tarificación (si se quiere) | ||
| PRICING_RULES_PVPC = PricingRules( | ||
| p1_kw_year_eur=30.67266, | ||
| p2_kw_year_eur=1.4243591, | ||
| meter_month_eur=0.81, | ||
| market_kw_year_eur=3.113, | ||
| electricity_tax=1.0511300560, | ||
| iva_tax=1.05, | ||
| # podemos rellenar los siguientes campos si quisiéramos precio fijo (y no pvpc) | ||
| p1_kwh_eur=None, | ||
| p2_kwh_eur=None, | ||
| p3_kwh_eur=None, | ||
| ) | ||
| # Ejecutar | ||
| asyncio.run(main()) | ||
| ``` | ||
| # Instanciar el helper | ||
| # 'authorized_nif' permite indicar el NIF de la persona que nos autoriza a consultar su CUPS. | ||
| # 'data' permite "cargar" al helper datos anteriores (resultado edata.data de una ejecución anterior), para evitar volver a consultar los mismos. | ||
| edata = EdataHelper( | ||
| "datadis_user", | ||
| "datadis_password", | ||
| "cups", | ||
| datadis_authorized_nif=None, | ||
| pricing_rules=PRICING_RULES_PVPC, # si se le pasa None, no aplica tarificación | ||
| data=None, # aquí podríamos cargar datos anteriores | ||
| ) | ||
| ### Usando la CLI | ||
| # Solicitar actualización de todo el histórico (se almacena en edata.data) | ||
| edata.update(date_from=datetime(1970, 1, 1), date_to=datetime.today()) | ||
| El paquete incluye una interfaz de línea de comandos para operaciones básicas: | ||
| # volcamos todo lo obtenido a un fichero | ||
| with open("backup.json", "w") as file: | ||
| json.dump(utils.serialize_dict(edata.data), file) # se puede utilizar deserialize_dict para la posterior lectura del backup | ||
| ``` bash | ||
| # Ver suministros disponibles | ||
| python -m edata.cli show-supplies <username> | ||
| # Imprimir atributos | ||
| print(edata.attributes) | ||
| # Descargar todos los datos de un CUPS | ||
| python -m edata.cli download-all --cups ES0000000000000000XX <username> | ||
| # Actualizar facturación con tarifa fija | ||
| python -m edata.cli update-bill \ | ||
| --cups ES0000000000000000XX \ | ||
| --p1-kw-year-eur 30.67 \ | ||
| --p2-kw-year-eur 1.42 \ | ||
| --p1-kwh-eur 0.15 \ | ||
| --p2-kwh-eur 0.10 \ | ||
| --p3-kwh-eur 0.08 \ | ||
| --meter-month-eur 0.81 | ||
| ``` |
@@ -1,10 +0,13 @@ | ||
| dateparser>=1.1.2 | ||
| freezegun>=1.2.1 | ||
| holidays>=0.14.2 | ||
| pytest>=7.1.2 | ||
| python_dateutil>=2.8.2 | ||
| requests>=2.28.1 | ||
| voluptuous>=0.13.1 | ||
| Jinja2>=3.1.2 | ||
| diskcache>=5.6.3 | ||
| aiohttp>=3.12.15 | ||
| aiohttp<4,>=3.12 | ||
| dateparser<2,>=1.2 | ||
| diskcache<6,>=5.6 | ||
| freezegun<2,>=1.5 | ||
| holidays<1,>=0.50 | ||
| Jinja2<4,>=3.1 | ||
| pydantic<3,>=2.10 | ||
| python_dateutil<3,>=2.8 | ||
| Requests<3,>=2.31 | ||
| SQLAlchemy<3,>=2.0 | ||
| sqlmodel<0.1,>=0.0.22 | ||
| typer<1,>=0.12 | ||
| aiosqlite<1,>=0.21 |
@@ -12,19 +12,22 @@ LICENSE | ||
| edata/cli.py | ||
| edata/const.py | ||
| edata/definitions.py | ||
| edata/helpers.py | ||
| edata/storage.py | ||
| edata/connectors/__init__.py | ||
| edata/connectors/datadis.py | ||
| edata/connectors/redata.py | ||
| edata/processors/__init__.py | ||
| edata/processors/base.py | ||
| edata/processors/billing.py | ||
| edata/processors/consumption.py | ||
| edata/processors/maximeter.py | ||
| edata/processors/utils.py | ||
| edata/core/__init__.py | ||
| edata/core/const.py | ||
| edata/core/utils.py | ||
| edata/database/__init__.py | ||
| edata/database/controller.py | ||
| edata/database/models.py | ||
| edata/database/queries.py | ||
| edata/database/utils.py | ||
| edata/models/__init__.py | ||
| edata/models/bill.py | ||
| edata/models/data.py | ||
| edata/models/supply.py | ||
| edata/providers/__init__.py | ||
| edata/providers/datadis.py | ||
| edata/providers/redata.py | ||
| edata/services/bill_service.py | ||
| edata/services/data_service.py | ||
| edata/tests/__init__.py | ||
| edata/tests/test_datadis_connector.py | ||
| edata/tests/test_helpers.py | ||
| edata/tests/test_processors.py | ||
| edata/tests/test_redata_connector.py | ||
| edata/tests/test_redata_connector.py | ||
| edata/tests/test_services.py |
+134
-13
@@ -1,24 +0,145 @@ | ||
| import typer | ||
| import asyncio | ||
| import logging | ||
| from getpass import getpass | ||
| from edata.connectors.datadis import DatadisConnector | ||
| import json | ||
| from typing import Annotated | ||
| def main(): | ||
| """CLI básica para mostrar supplies y contracts de Datadis.""" | ||
| username = typer.prompt("Usuario (NIF)") | ||
| password = getpass("Contraseña: ") | ||
| import typer | ||
| from edata.models.bill import BillingRules | ||
| from edata.providers.datadis import DatadisConnector | ||
| from edata.services.bill_service import BillService | ||
| from edata.services.data_service import DataService | ||
| logging.basicConfig(level=logging.INFO) | ||
| logger = logging.getLogger(__name__) | ||
| app = typer.Typer() | ||
| @app.command() | ||
| def show_supplies(username: str): | ||
| """Show supplies and contracts for a given datadis user.""" | ||
| password = getpass() | ||
| connector = DatadisConnector(username, password) | ||
| supplies = connector.get_supplies() | ||
| typer.echo("\nSupplies:") | ||
| typer.echo(json.dumps(supplies, indent=2, default=str)) | ||
| for supply in supplies: | ||
| typer.echo(supply.model_dump_json()) | ||
| if supplies: | ||
| cups = supplies[0]["cups"] | ||
| distributor = supplies[0]["distributorCode"] | ||
| cups = supplies[0].cups | ||
| distributor = supplies[0].distributorCode | ||
| contracts = connector.get_contract_detail(cups, distributor) | ||
| typer.echo("\nContracts:") | ||
| typer.echo(json.dumps(contracts, indent=2, default=str)) | ||
| for contract in contracts: | ||
| typer.echo(contract.model_dump_json()) | ||
| else: | ||
| typer.echo("No se encontraron supplies.") | ||
| typer.echo("We found no supplies.") | ||
| async def _download_all( | ||
| nif: str, | ||
| cups: str, | ||
| authorized_nif: str | None = None, | ||
| ): | ||
| """Download all data for a given datadis account and CUPS.""" | ||
| password = getpass() | ||
| service = DataService( | ||
| cups, | ||
| nif, | ||
| password, | ||
| datadis_authorized_nif=authorized_nif, | ||
| storage_path="./edata_cli", | ||
| ) | ||
| await service.update_supplies() | ||
| supply = await service.get_supply() | ||
| if supply: | ||
| await service.update() | ||
| @app.command() | ||
| def download_all( | ||
| nif: str, | ||
| cups: Annotated[str, typer.Option(help="The identifier of the Supply")], | ||
| authorized_nif: str | None = None, | ||
| ): | ||
| """Download all data for a given datadis account and CUPS.""" | ||
| asyncio.run(_download_all(nif, cups, authorized_nif)) | ||
| async def _update_custom_bill( | ||
| cups: str, | ||
| p1_kw_year_eur: float, | ||
| p2_kw_year_eur: float, | ||
| p1_kwh_eur: float, | ||
| p2_kwh_eur: float, | ||
| p3_kwh_eur: float, | ||
| meter_month_eur: float, | ||
| ): | ||
| """Download all data for a given datadis account and CUPS.""" | ||
| bs = BillService(cups, storage_path="./edata_cli") | ||
| rules = BillingRules( | ||
| p1_kw_year_eur=p1_kw_year_eur, | ||
| p2_kw_year_eur=p2_kw_year_eur, | ||
| p1_kwh_eur=p1_kwh_eur, | ||
| p2_kwh_eur=p2_kwh_eur, | ||
| p3_kwh_eur=p3_kwh_eur, | ||
| meter_month_eur=meter_month_eur, | ||
| ) | ||
| await bs.update(billing_rules=rules, is_pvpc=False) | ||
| @app.command() | ||
| def update_custom_bill( | ||
| cups: Annotated[str, typer.Option(help="The identifier of the Supply")], | ||
| p1_kw_year_eur: Annotated[ | ||
| float, typer.Option(help="Price per kW at P1 tariff (by year)") | ||
| ], | ||
| p2_kw_year_eur: Annotated[ | ||
| float, typer.Option(help="Price per kW at P2 tariff (by year)") | ||
| ], | ||
| p1_kwh_eur: Annotated[float, typer.Option(help="Price per kWh at P1 tariff")], | ||
| p2_kwh_eur: Annotated[float, typer.Option(help="Price per kWh at P2 tariff")], | ||
| p3_kwh_eur: Annotated[float, typer.Option(help="Price per kWh at P3 tariff")], | ||
| meter_month_eur: Annotated[float, typer.Option(help="Monthly cost of the meter")], | ||
| ): | ||
| """Download all data for a given datadis account and CUPS.""" | ||
| asyncio.run( | ||
| _update_custom_bill( | ||
| cups, | ||
| p1_kw_year_eur, | ||
| p2_kw_year_eur, | ||
| p1_kwh_eur, | ||
| p2_kwh_eur, | ||
| p3_kwh_eur, | ||
| meter_month_eur, | ||
| ) | ||
| ) | ||
| async def _update_pvpc_bill(cups: str): | ||
| """Download all data for a given datadis account and CUPS.""" | ||
| bs = BillService(cups, storage_path="./edata_cli") | ||
| await bs.update(is_pvpc=True) | ||
| @app.command() | ||
| def update_pvpc_bill( | ||
| cups: Annotated[str, typer.Option(help="The identifier of the Supply")], | ||
| ): | ||
| """Download all data for a given datadis account and CUPS.""" | ||
| asyncio.run( | ||
| _update_pvpc_bill( | ||
| cups, | ||
| ) | ||
| ) | ||
| if __name__ == "__main__": | ||
| typer.run(main) | ||
| app() |
| """Tests for DatadisConnector (offline).""" | ||
| import datetime | ||
| import datetime | ||
| from unittest.mock import patch, AsyncMock, MagicMock | ||
| from ..connectors.datadis import DatadisConnector | ||
| from unittest.mock import AsyncMock, MagicMock, patch | ||
| from ..connectors.datadis import DatadisConnector | ||
| from edata.providers.datadis import DatadisConnector | ||
@@ -73,6 +71,9 @@ MOCK_USERNAME = "USERNAME" | ||
| @patch("aiohttp.ClientSession.get") | ||
| @patch.object(DatadisConnector, "_async_get_token", new_callable=AsyncMock, return_value=True) | ||
| @patch.object( | ||
| DatadisConnector, "_async_get_token", new_callable=AsyncMock, return_value=True | ||
| ) | ||
| def test_get_supplies(mock_token, mock_get, snapshot): | ||
| """Test a successful 'get_supplies' query (syrupy snapshot).""" | ||
| """Test a successful 'get_supplies' query.""" | ||
| mock_response = MagicMock() | ||
@@ -87,7 +88,8 @@ mock_response.status = 200 | ||
| @patch("aiohttp.ClientSession.get") | ||
| @patch.object(DatadisConnector, "_async_get_token", new_callable=AsyncMock, return_value=True) | ||
| @patch.object( | ||
| DatadisConnector, "_async_get_token", new_callable=AsyncMock, return_value=True | ||
| ) | ||
| def test_get_contract_detail(mock_token, mock_get, snapshot): | ||
| """Test a successful 'get_contract_detail' query (syrupy snapshot).""" | ||
| """Test a successful 'get_contract_detail' query.""" | ||
| mock_response = MagicMock() | ||
@@ -102,7 +104,8 @@ mock_response.status = 200 | ||
| @patch("aiohttp.ClientSession.get") | ||
| @patch.object(DatadisConnector, "_async_get_token", new_callable=AsyncMock, return_value=True) | ||
| @patch.object( | ||
| DatadisConnector, "_async_get_token", new_callable=AsyncMock, return_value=True | ||
| ) | ||
| def test_get_consumption_data(mock_token, mock_get, snapshot): | ||
| """Test a successful 'get_consumption_data' query (syrupy snapshot).""" | ||
| """Test a successful 'get_consumption_data' query.""" | ||
| mock_response = MagicMock() | ||
@@ -114,17 +117,21 @@ mock_response.status = 200 | ||
| connector = DatadisConnector(MOCK_USERNAME, MOCK_PASSWORD) | ||
| assert connector.get_consumption_data( | ||
| "ESXXXXXXXXXXXXXXXXTEST", | ||
| "2", | ||
| datetime.datetime(2022, 10, 22, 0, 0, 0), | ||
| datetime.datetime(2022, 10, 22, 2, 0, 0), | ||
| "0", | ||
| 5, | ||
| ) == snapshot | ||
| assert ( | ||
| connector.get_consumption_data( | ||
| "ESXXXXXXXXXXXXXXXXTEST", | ||
| "2", | ||
| datetime.datetime(2022, 10, 22, 0, 0, 0), | ||
| datetime.datetime(2022, 10, 22, 2, 0, 0), | ||
| "0", | ||
| 5, | ||
| ) | ||
| == snapshot | ||
| ) | ||
| @patch("aiohttp.ClientSession.get") | ||
| @patch.object(DatadisConnector, "_async_get_token", new_callable=AsyncMock, return_value=True) | ||
| @patch.object( | ||
| DatadisConnector, "_async_get_token", new_callable=AsyncMock, return_value=True | ||
| ) | ||
| def test_get_max_power(mock_token, mock_get, snapshot): | ||
| """Test a successful 'get_max_power' query (syrupy snapshot).""" | ||
| """Test a successful 'get_max_power' query.""" | ||
| mock_response = MagicMock() | ||
@@ -136,15 +143,20 @@ mock_response.status = 200 | ||
| connector = DatadisConnector(MOCK_USERNAME, MOCK_PASSWORD) | ||
| assert connector.get_max_power( | ||
| "ESXXXXXXXXXXXXXXXXTEST", | ||
| "2", | ||
| datetime.datetime(2022, 3, 1, 0, 0, 0), | ||
| datetime.datetime(2022, 4, 1, 0, 0, 0), | ||
| None, | ||
| ) == snapshot | ||
| assert ( | ||
| connector.get_max_power( | ||
| "ESXXXXXXXXXXXXXXXXTEST", | ||
| "2", | ||
| datetime.datetime(2022, 3, 1, 0, 0, 0), | ||
| datetime.datetime(2022, 4, 1, 0, 0, 0), | ||
| None, | ||
| ) | ||
| == snapshot | ||
| ) | ||
| @patch("aiohttp.ClientSession.get") | ||
| @patch.object(DatadisConnector, "_async_get_token", new_callable=AsyncMock, return_value=True) | ||
| @patch.object( | ||
| DatadisConnector, "_async_get_token", new_callable=AsyncMock, return_value=True | ||
| ) | ||
| def test_get_supplies_empty_response(mock_token, mock_get, snapshot): | ||
| """Test get_supplies with empty response (syrupy snapshot).""" | ||
| """Test get_supplies with empty response.""" | ||
| mock_response = MagicMock() | ||
@@ -160,3 +172,5 @@ mock_response.status = 200 | ||
| @patch("aiohttp.ClientSession.get") | ||
| @patch.object(DatadisConnector, "_async_get_token", new_callable=AsyncMock, return_value=True) | ||
| @patch.object( | ||
| DatadisConnector, "_async_get_token", new_callable=AsyncMock, return_value=True | ||
| ) | ||
| def test_get_supplies_malformed_response(mock_token, mock_get, snapshot): | ||
@@ -175,5 +189,7 @@ """Test get_supplies with malformed response (missing required fields, syrupy snapshot).""" | ||
| @patch("aiohttp.ClientSession.get") | ||
| @patch.object(DatadisConnector, "_async_get_token", new_callable=AsyncMock, return_value=True) | ||
| @patch.object( | ||
| DatadisConnector, "_async_get_token", new_callable=AsyncMock, return_value=True | ||
| ) | ||
| def test_get_supplies_partial_response(mock_token, mock_get, snapshot): | ||
| """Test get_supplies with partial valid/invalid response (syrupy snapshot).""" | ||
| """Test get_supplies with partial valid/invalid response.""" | ||
| partial = [ | ||
@@ -192,5 +208,6 @@ SUPPLIES_RESPONSE[0], | ||
| @patch("aiohttp.ClientSession.get") | ||
| @patch.object(DatadisConnector, "_async_get_token", new_callable=AsyncMock, return_value=True) | ||
| @patch.object( | ||
| DatadisConnector, "_async_get_token", new_callable=AsyncMock, return_value=True | ||
| ) | ||
| def test_get_consumption_data_cache(mock_token, mock_get, snapshot): | ||
@@ -205,20 +222,26 @@ """Test get_consumption_data uses cache on second call (should not call HTTP again, syrupy snapshot).""" | ||
| # First call populates cache | ||
| assert connector.get_consumption_data( | ||
| "ESXXXXXXXXXXXXXXXXTEST", | ||
| "2", | ||
| datetime.datetime(2022, 10, 22, 0, 0, 0), | ||
| datetime.datetime(2022, 10, 22, 2, 0, 0), | ||
| "0", | ||
| 5, | ||
| ) == snapshot | ||
| assert ( | ||
| connector.get_consumption_data( | ||
| "ESXXXXXXXXXXXXXXXXTEST", | ||
| "2", | ||
| datetime.datetime(2022, 10, 22, 0, 0, 0), | ||
| datetime.datetime(2022, 10, 22, 2, 0, 0), | ||
| "0", | ||
| 5, | ||
| ) | ||
| == snapshot | ||
| ) | ||
| # Second call should use cache, not call HTTP again | ||
| mock_get.reset_mock() | ||
| assert connector.get_consumption_data( | ||
| "ESXXXXXXXXXXXXXXXXTEST", | ||
| "2", | ||
| datetime.datetime(2022, 10, 22, 0, 0, 0), | ||
| datetime.datetime(2022, 10, 22, 2, 0, 0), | ||
| "0", | ||
| 5, | ||
| ) == snapshot | ||
| assert ( | ||
| connector.get_consumption_data( | ||
| "ESXXXXXXXXXXXXXXXXTEST", | ||
| "2", | ||
| datetime.datetime(2022, 10, 22, 0, 0, 0), | ||
| datetime.datetime(2022, 10, 22, 2, 0, 0), | ||
| "0", | ||
| 5, | ||
| ) | ||
| == snapshot | ||
| ) | ||
| mock_get.assert_not_called() | ||
@@ -228,5 +251,7 @@ | ||
| @patch("aiohttp.ClientSession.get") | ||
| @patch.object(DatadisConnector, "_async_get_token", new_callable=AsyncMock, return_value=True) | ||
| @patch.object( | ||
| DatadisConnector, "_async_get_token", new_callable=AsyncMock, return_value=True | ||
| ) | ||
| def test_get_supplies_optional_fields_none(mock_token, mock_get, snapshot): | ||
| """Test get_supplies with optional fields as None (syrupy snapshot).""" | ||
| """Test get_supplies with optional fields as None.""" | ||
| response = [ | ||
@@ -252,2 +277,2 @@ { | ||
| connector = DatadisConnector(MOCK_USERNAME, MOCK_PASSWORD) | ||
| assert connector.get_supplies() == snapshot | ||
| assert connector.get_supplies() == snapshot |
@@ -1,7 +0,8 @@ | ||
| """Tests for REData (online)""" | ||
| """Tests for REDataConnector (online)""" | ||
| from datetime import datetime, timedelta | ||
| from ..connectors.redata import REDataConnector | ||
| from edata.providers.redata import REDataConnector | ||
| def test_get_realtime_prices(): | ||
@@ -8,0 +9,0 @@ """Test a successful 'get_realtime_prices' query""" |
+120
-76
| Metadata-Version: 2.4 | ||
| Name: e-data | ||
| Version: 1.3.3 | ||
| Version: 2.0.0.dev112 | ||
| Summary: Python library for managing spanish energy data from various web providers | ||
@@ -684,19 +684,21 @@ Author-email: VMG <vmayorg@outlook.es> | ||
| Classifier: Programming Language :: Python | ||
| Classifier: Programming Language :: Python :: 3.11 | ||
| Classifier: Programming Language :: Python :: 3.12 | ||
| Classifier: Programming Language :: Python :: Implementation :: CPython | ||
| Classifier: Programming Language :: Python :: Implementation :: PyPy | ||
| Requires-Python: >=3.11.0 | ||
| Requires-Python: >=3.12.0 | ||
| Description-Content-Type: text/markdown | ||
| License-File: LICENSE | ||
| Requires-Dist: dateparser>=1.1.2 | ||
| Requires-Dist: freezegun>=1.2.1 | ||
| Requires-Dist: holidays>=0.14.2 | ||
| Requires-Dist: pytest>=7.1.2 | ||
| Requires-Dist: python_dateutil>=2.8.2 | ||
| Requires-Dist: requests>=2.28.1 | ||
| Requires-Dist: voluptuous>=0.13.1 | ||
| Requires-Dist: Jinja2>=3.1.2 | ||
| Requires-Dist: diskcache>=5.6.3 | ||
| Requires-Dist: aiohttp>=3.12.15 | ||
| Requires-Dist: aiohttp<4,>=3.12 | ||
| Requires-Dist: dateparser<2,>=1.2 | ||
| Requires-Dist: diskcache<6,>=5.6 | ||
| Requires-Dist: freezegun<2,>=1.5 | ||
| Requires-Dist: holidays<1,>=0.50 | ||
| Requires-Dist: Jinja2<4,>=3.1 | ||
| Requires-Dist: pydantic<3,>=2.10 | ||
| Requires-Dist: python_dateutil<3,>=2.8 | ||
| Requires-Dist: Requests<3,>=2.31 | ||
| Requires-Dist: SQLAlchemy<3,>=2.0 | ||
| Requires-Dist: sqlmodel<0.1,>=0.0.22 | ||
| Requires-Dist: typer<1,>=0.12 | ||
| Requires-Dist: aiosqlite<1,>=0.21 | ||
| Dynamic: license-file | ||
@@ -710,3 +712,3 @@ | ||
| Este paquete proporciona herramientas para la descarga de tus datos de consumo eléctrico (desde Datadis.es) y su posterior procesado. La motivación principal es que conocer el consumo puede ayudarnos a reducirlo, e incluso a elegir una tarifa que mejor se adapte a nuestras necesidades. A día de hoy sus capacidades de facturación (€) son limitadas, soporta PVPC (según disponibilidad de datos de REData) y tarificación fija por tramos. Es el corazón de la integración [homeassistant-edata](https://github.com/uvejota/homeassistant-edata). | ||
| Este paquete proporciona herramientas para la descarga de tus datos de consumo eléctrico (desde Datadis.es) y su posterior procesado. La motivación principal es que conocer el consumo puede ayudarnos a reducirlo, e incluso a elegir una tarifa que mejor se adapte a nuestras necesidades. Soporta facturación con PVPC (según disponibilidad de datos de REData) y tarificación fija personalizable mediante fórmulas. Todos los datos se almacenan localmente en una base de datos SQLite. Es el corazón de la integración [homeassistant-edata](https://github.com/uvejota/homeassistant-edata). | ||
@@ -731,81 +733,123 @@ _**Esta herramienta no mantiene ningún tipo de vinculación con los proveedores de datos anteriormente mencionados, simplemente consulta la información disponible y facilita su posterior análisis.**_ | ||
| El paquete consta de tres módulos diferenciados: | ||
| El paquete consta de varios módulos: | ||
| * **Conectores** (módulo `connectors`), para definir los métodos de consulta a los diferentes proveedores: Datadis y REData. | ||
| * **Procesadores** (módulo `processors`), para procesar datos de consumo, maxímetro, o coste (tarificación). Ahora mismo consta de tres procesadores: `billing`, `consumption` y `maximeter`, además de algunas utilidades ubicadas en `utils`. Los procesadores deben heredar de la clase Processor definida en `base.py` | ||
| * **Ayudantes** (módulo `helpers`), para ayudar en el uso y gestión de los anteriores, presentando de momento un único ayudante llamado `EdataHelper` que te permite recopilar `X` días de datos (por defecto 365) y automáticamente procesarlos. Los datos son almacenados en la variable `data`, mientras que los atributos autocalculados son almacenados en la variable `attributes`. Por lo general, primero utilizan los conectores y luego procesan los datos, gestionando varias tareas de recuperación (principalmente para Datadis). | ||
| * **Proveedores** (`providers`): Conectores para consultar diferentes fuentes de datos. | ||
| - `datadis.py`: Conector para la API privada de Datadis (datos de consumo, potencia, contratos y suministros). | ||
| - `redata.py`: Conector para la API pública de REData (precios PVPC en tiempo real). | ||
| Estos módulos corresponden a la siguiente estructura del paquete: | ||
| * **Servicios** (`services`): Capa de lógica de negocio que gestiona la sincronización y procesamiento de datos. | ||
| - `data_service.py`: Servicio para gestionar datos de telemetría (consumo, potencia, estadísticas). | ||
| - `bill_service.py`: Servicio para gestionar la facturación y cálculo de costes. | ||
| ``` | ||
| edata/ | ||
| · __init__.py | ||
| · connectors/ | ||
| · __init__.py | ||
| · datadis.py | ||
| · redata.py | ||
| · processors/ | ||
| · __init__.py | ||
| · base.py | ||
| · billing.py | ||
| · consumption.py | ||
| · maximeter.py | ||
| · utils.py | ||
| · helpers.py | ||
| ``` | ||
| * **Modelos** (`models`): Definiciones de estructuras de datos usando Pydantic. | ||
| - `supply.py`: Modelos de suministros (`Supply`) y contratos (`Contract`). | ||
| - `data.py`: Modelos de datos de telemetría (`Energy`, `Power`, `Statistics`). | ||
| - `bill.py`: Modelos de facturación (`Bill`, `EnergyPrice`, `BillingRules`, `PVPCBillingRules`). | ||
| * **Base de datos** (`database`): Gestión de persistencia local con SQLite. | ||
| - `controller.py`: Controlador principal de la base de datos (`EdataDB`). | ||
| - `models.py`: Modelos SQLModel para las tablas de la base de datos. | ||
| - `queries.py`: Consultas SQL predefinidas. | ||
| * **Utilidades** (`core`): Funciones auxiliares para cálculos de tarifas, fechas, etc. | ||
| * **CLI** (`cli.py`): Interfaz de línea de comandos para operaciones básicas. | ||
| ## Ejemplo de uso | ||
| Partimos de que tenemos credenciales en Datadis.es. Algunas aclaraciones: | ||
| ### Usando los servicios (recomendado) | ||
| Partimos de que tenemos credenciales en [Datadis.es](https://datadis.es). Algunas aclaraciones: | ||
| * No es necesario solicitar API pública en el registro (se utilizará la API privada habilitada por defecto) | ||
| * El username suele ser el NIF del titular | ||
| * Copie el CUPS de la web de Datadis, algunas comercializadoras adhieren caracteres adicionales en el CUPS mostrado en su factura. | ||
| * La herramienta acepta el uso de NIF autorizado para consultar el suministro de otro titular. | ||
| * Copie el CUPS de la web de Datadis, algunas comercializadoras adhieren caracteres adicionales en el CUPS mostrado en su factura | ||
| * La herramienta acepta el uso de NIF autorizado para consultar el suministro de otro titular | ||
| ``` python | ||
| import asyncio | ||
| from datetime import datetime | ||
| import json | ||
| from edata.services.data_service import DataService | ||
| from edata.services.bill_service import BillService | ||
| from edata.models.bill import PVPCBillingRules | ||
| # importamos definiciones de datos que nos interesen | ||
| from edata.definitions import PricingRules | ||
| # importamos el ayudante | ||
| from edata.helpers import EdataHelper | ||
| # importamos el procesador de utilidades | ||
| from edata.processors import utils | ||
| async def main(): | ||
| # Crear el servicio de datos | ||
| data_service = DataService( | ||
| cups="ES0000000000000000XX", # Tu CUPS | ||
| datadis_user="12345678A", # Tu NIF/usuario | ||
| datadis_pwd="tu_password", | ||
| storage_path="./my_data" # Directorio para la BD | ||
| datadis_authorized_nif=None, # NIF autorizado (opcional) | ||
| ) | ||
| # Actualizar todos los datos disponibles | ||
| await data_service.update() | ||
| # Obtener suministros y contratos | ||
| supplies = await data_service.get_supplies() | ||
| contracts = await data_service.get_contracts() | ||
| # Obtener datos de consumo y potencia | ||
| energy = await data_service.get_energy() | ||
| power = await data_service.get_power() | ||
| # Obtener estadísticas agregadas (diarias o mensuales) | ||
| daily_stats = await data_service.get_statistics("day") | ||
| monthly_stats = await data_service.get_statistics("month") | ||
| # Crear el servicio de facturación | ||
| bill_service = BillService( | ||
| cups="ES0000000000000000XX", | ||
| storage_path="./my_data" # Mismo directorio que data_service | ||
| ) | ||
| # Definir reglas de facturación PVPC (valores por defecto españoles) | ||
| pvpc_rules = PVPCBillingRules( | ||
| p1_kw_year_eur=30.67266, # Término potencia P1 (€/kW/año) | ||
| p2_kw_year_eur=1.4243591, # Término potencia P2 (€/kW/año) | ||
| meter_month_eur=0.81, # Alquiler contador (€/mes) | ||
| market_kw_year_eur=3.113, # Otros cargos (€/kW/año) | ||
| electricity_tax=1.0511300560, # Impuesto eléctrico | ||
| iva_tax=1.21 # IVA (21%) | ||
| ) | ||
| # Actualizar facturación con PVPC | ||
| start_date = datetime(2024, 1, 1) | ||
| end_date = datetime(2024, 12, 31) | ||
| await bill_service.update(start_date, end_date, billing_rules=pvpc_rules, is_pvpc=True) | ||
| # Obtener facturas calculadas | ||
| bills = await bill_service.get_bills(start_date, end_date, type_="hour") | ||
| daily_bills = await bill_service.get_bills(start_date, end_date, type_="day") | ||
| monthly_bills = await bill_service.get_bills(start_date, end_date, type_="month") | ||
| # Mostrar resumen | ||
| total_cost = sum(b.value_eur for b in bills) | ||
| print(f"Coste total: {total_cost:.2f} €") | ||
| print(f"Consumo total: {sum(e.value_kWh for e in energy):.2f} kWh") | ||
| # Preparar reglas de tarificación (si se quiere) | ||
| PRICING_RULES_PVPC = PricingRules( | ||
| p1_kw_year_eur=30.67266, | ||
| p2_kw_year_eur=1.4243591, | ||
| meter_month_eur=0.81, | ||
| market_kw_year_eur=3.113, | ||
| electricity_tax=1.0511300560, | ||
| iva_tax=1.05, | ||
| # podemos rellenar los siguientes campos si quisiéramos precio fijo (y no pvpc) | ||
| p1_kwh_eur=None, | ||
| p2_kwh_eur=None, | ||
| p3_kwh_eur=None, | ||
| ) | ||
| # Ejecutar | ||
| asyncio.run(main()) | ||
| ``` | ||
| # Instanciar el helper | ||
| # 'authorized_nif' permite indicar el NIF de la persona que nos autoriza a consultar su CUPS. | ||
| # 'data' permite "cargar" al helper datos anteriores (resultado edata.data de una ejecución anterior), para evitar volver a consultar los mismos. | ||
| edata = EdataHelper( | ||
| "datadis_user", | ||
| "datadis_password", | ||
| "cups", | ||
| datadis_authorized_nif=None, | ||
| pricing_rules=PRICING_RULES_PVPC, # si se le pasa None, no aplica tarificación | ||
| data=None, # aquí podríamos cargar datos anteriores | ||
| ) | ||
| ### Usando la CLI | ||
| # Solicitar actualización de todo el histórico (se almacena en edata.data) | ||
| edata.update(date_from=datetime(1970, 1, 1), date_to=datetime.today()) | ||
| El paquete incluye una interfaz de línea de comandos para operaciones básicas: | ||
| # volcamos todo lo obtenido a un fichero | ||
| with open("backup.json", "w") as file: | ||
| json.dump(utils.serialize_dict(edata.data), file) # se puede utilizar deserialize_dict para la posterior lectura del backup | ||
| ``` bash | ||
| # Ver suministros disponibles | ||
| python -m edata.cli show-supplies <username> | ||
| # Imprimir atributos | ||
| print(edata.attributes) | ||
| # Descargar todos los datos de un CUPS | ||
| python -m edata.cli download-all --cups ES0000000000000000XX <username> | ||
| # Actualizar facturación con tarifa fija | ||
| python -m edata.cli update-bill \ | ||
| --cups ES0000000000000000XX \ | ||
| --p1-kw-year-eur 30.67 \ | ||
| --p2-kw-year-eur 1.42 \ | ||
| --p1-kwh-eur 0.15 \ | ||
| --p2-kwh-eur 0.10 \ | ||
| --p3-kwh-eur 0.08 \ | ||
| --meter-month-eur 0.81 | ||
| ``` |
+15
-13
@@ -7,3 +7,3 @@ [build-system] | ||
| name = "e-data" | ||
| version = "1.3.3" | ||
| version = "2.0.0.dev112" | ||
| description = "Python library for managing spanish energy data from various web providers" | ||
@@ -15,7 +15,6 @@ readme = "README.md" | ||
| license = { file = "LICENSE" } | ||
| requires-python = ">=3.11.0" | ||
| requires-python = ">=3.12.0" | ||
| classifiers = [ | ||
| "License :: OSI Approved :: GNU General Public License v3 (GPLv3)", | ||
| "Programming Language :: Python", | ||
| "Programming Language :: Python :: 3.11", | ||
| "Programming Language :: Python :: 3.12", | ||
@@ -26,12 +25,15 @@ "Programming Language :: Python :: Implementation :: CPython", | ||
| dependencies = [ | ||
| "dateparser>=1.1.2", | ||
| "freezegun>=1.2.1", | ||
| "holidays>=0.14.2", | ||
| "pytest>=7.1.2", | ||
| "python_dateutil>=2.8.2", | ||
| "requests>=2.28.1", | ||
| "voluptuous>=0.13.1", | ||
| "Jinja2>=3.1.2", | ||
| "diskcache>=5.6.3", | ||
| "aiohttp>=3.12.15" | ||
| "aiohttp>=3.12,<4", | ||
| "dateparser>=1.2,<2", | ||
| "diskcache>=5.6,<6", | ||
| "freezegun>=1.5,<2", | ||
| "holidays>=0.50,<1", | ||
| "Jinja2>=3.1,<4", | ||
| "pydantic>=2.10,<3", | ||
| "python_dateutil>=2.8,<3", | ||
| "Requests>=2.31,<3", | ||
| "SQLAlchemy>=2.0,<3", | ||
| "sqlmodel>=0.0.22,<0.1", | ||
| "typer>=0.12,<1", | ||
| "aiosqlite>=0.21,<1", | ||
| ] | ||
@@ -38,0 +40,0 @@ |
+105
-63
@@ -7,3 +7,3 @@ [](https://pepy.tech/project/e-data) | ||
| Este paquete proporciona herramientas para la descarga de tus datos de consumo eléctrico (desde Datadis.es) y su posterior procesado. La motivación principal es que conocer el consumo puede ayudarnos a reducirlo, e incluso a elegir una tarifa que mejor se adapte a nuestras necesidades. A día de hoy sus capacidades de facturación (€) son limitadas, soporta PVPC (según disponibilidad de datos de REData) y tarificación fija por tramos. Es el corazón de la integración [homeassistant-edata](https://github.com/uvejota/homeassistant-edata). | ||
| Este paquete proporciona herramientas para la descarga de tus datos de consumo eléctrico (desde Datadis.es) y su posterior procesado. La motivación principal es que conocer el consumo puede ayudarnos a reducirlo, e incluso a elegir una tarifa que mejor se adapte a nuestras necesidades. Soporta facturación con PVPC (según disponibilidad de datos de REData) y tarificación fija personalizable mediante fórmulas. Todos los datos se almacenan localmente en una base de datos SQLite. Es el corazón de la integración [homeassistant-edata](https://github.com/uvejota/homeassistant-edata). | ||
@@ -28,81 +28,123 @@ _**Esta herramienta no mantiene ningún tipo de vinculación con los proveedores de datos anteriormente mencionados, simplemente consulta la información disponible y facilita su posterior análisis.**_ | ||
| El paquete consta de tres módulos diferenciados: | ||
| El paquete consta de varios módulos: | ||
| * **Conectores** (módulo `connectors`), para definir los métodos de consulta a los diferentes proveedores: Datadis y REData. | ||
| * **Procesadores** (módulo `processors`), para procesar datos de consumo, maxímetro, o coste (tarificación). Ahora mismo consta de tres procesadores: `billing`, `consumption` y `maximeter`, además de algunas utilidades ubicadas en `utils`. Los procesadores deben heredar de la clase Processor definida en `base.py` | ||
| * **Ayudantes** (módulo `helpers`), para ayudar en el uso y gestión de los anteriores, presentando de momento un único ayudante llamado `EdataHelper` que te permite recopilar `X` días de datos (por defecto 365) y automáticamente procesarlos. Los datos son almacenados en la variable `data`, mientras que los atributos autocalculados son almacenados en la variable `attributes`. Por lo general, primero utilizan los conectores y luego procesan los datos, gestionando varias tareas de recuperación (principalmente para Datadis). | ||
| * **Proveedores** (`providers`): Conectores para consultar diferentes fuentes de datos. | ||
| - `datadis.py`: Conector para la API privada de Datadis (datos de consumo, potencia, contratos y suministros). | ||
| - `redata.py`: Conector para la API pública de REData (precios PVPC en tiempo real). | ||
| Estos módulos corresponden a la siguiente estructura del paquete: | ||
| * **Servicios** (`services`): Capa de lógica de negocio que gestiona la sincronización y procesamiento de datos. | ||
| - `data_service.py`: Servicio para gestionar datos de telemetría (consumo, potencia, estadísticas). | ||
| - `bill_service.py`: Servicio para gestionar la facturación y cálculo de costes. | ||
| ``` | ||
| edata/ | ||
| · __init__.py | ||
| · connectors/ | ||
| · __init__.py | ||
| · datadis.py | ||
| · redata.py | ||
| · processors/ | ||
| · __init__.py | ||
| · base.py | ||
| · billing.py | ||
| · consumption.py | ||
| · maximeter.py | ||
| · utils.py | ||
| · helpers.py | ||
| ``` | ||
| * **Modelos** (`models`): Definiciones de estructuras de datos usando Pydantic. | ||
| - `supply.py`: Modelos de suministros (`Supply`) y contratos (`Contract`). | ||
| - `data.py`: Modelos de datos de telemetría (`Energy`, `Power`, `Statistics`). | ||
| - `bill.py`: Modelos de facturación (`Bill`, `EnergyPrice`, `BillingRules`, `PVPCBillingRules`). | ||
| * **Base de datos** (`database`): Gestión de persistencia local con SQLite. | ||
| - `controller.py`: Controlador principal de la base de datos (`EdataDB`). | ||
| - `models.py`: Modelos SQLModel para las tablas de la base de datos. | ||
| - `queries.py`: Consultas SQL predefinidas. | ||
| * **Utilidades** (`core`): Funciones auxiliares para cálculos de tarifas, fechas, etc. | ||
| * **CLI** (`cli.py`): Interfaz de línea de comandos para operaciones básicas. | ||
| ## Ejemplo de uso | ||
| Partimos de que tenemos credenciales en Datadis.es. Algunas aclaraciones: | ||
| ### Usando los servicios (recomendado) | ||
| Partimos de que tenemos credenciales en [Datadis.es](https://datadis.es). Algunas aclaraciones: | ||
| * No es necesario solicitar API pública en el registro (se utilizará la API privada habilitada por defecto) | ||
| * El username suele ser el NIF del titular | ||
| * Copie el CUPS de la web de Datadis, algunas comercializadoras adhieren caracteres adicionales en el CUPS mostrado en su factura. | ||
| * La herramienta acepta el uso de NIF autorizado para consultar el suministro de otro titular. | ||
| * Copie el CUPS de la web de Datadis, algunas comercializadoras adhieren caracteres adicionales en el CUPS mostrado en su factura | ||
| * La herramienta acepta el uso de NIF autorizado para consultar el suministro de otro titular | ||
| ``` python | ||
| import asyncio | ||
| from datetime import datetime | ||
| import json | ||
| from edata.services.data_service import DataService | ||
| from edata.services.bill_service import BillService | ||
| from edata.models.bill import PVPCBillingRules | ||
| # importamos definiciones de datos que nos interesen | ||
| from edata.definitions import PricingRules | ||
| # importamos el ayudante | ||
| from edata.helpers import EdataHelper | ||
| # importamos el procesador de utilidades | ||
| from edata.processors import utils | ||
| async def main(): | ||
| # Crear el servicio de datos | ||
| data_service = DataService( | ||
| cups="ES0000000000000000XX", # Tu CUPS | ||
| datadis_user="12345678A", # Tu NIF/usuario | ||
| datadis_pwd="tu_password", | ||
| storage_path="./my_data" # Directorio para la BD | ||
| datadis_authorized_nif=None, # NIF autorizado (opcional) | ||
| ) | ||
| # Actualizar todos los datos disponibles | ||
| await data_service.update() | ||
| # Obtener suministros y contratos | ||
| supplies = await data_service.get_supplies() | ||
| contracts = await data_service.get_contracts() | ||
| # Obtener datos de consumo y potencia | ||
| energy = await data_service.get_energy() | ||
| power = await data_service.get_power() | ||
| # Obtener estadísticas agregadas (diarias o mensuales) | ||
| daily_stats = await data_service.get_statistics("day") | ||
| monthly_stats = await data_service.get_statistics("month") | ||
| # Crear el servicio de facturación | ||
| bill_service = BillService( | ||
| cups="ES0000000000000000XX", | ||
| storage_path="./my_data" # Mismo directorio que data_service | ||
| ) | ||
| # Definir reglas de facturación PVPC (valores por defecto españoles) | ||
| pvpc_rules = PVPCBillingRules( | ||
| p1_kw_year_eur=30.67266, # Término potencia P1 (€/kW/año) | ||
| p2_kw_year_eur=1.4243591, # Término potencia P2 (€/kW/año) | ||
| meter_month_eur=0.81, # Alquiler contador (€/mes) | ||
| market_kw_year_eur=3.113, # Otros cargos (€/kW/año) | ||
| electricity_tax=1.0511300560, # Impuesto eléctrico | ||
| iva_tax=1.21 # IVA (21%) | ||
| ) | ||
| # Actualizar facturación con PVPC | ||
| start_date = datetime(2024, 1, 1) | ||
| end_date = datetime(2024, 12, 31) | ||
| await bill_service.update(start_date, end_date, billing_rules=pvpc_rules, is_pvpc=True) | ||
| # Obtener facturas calculadas | ||
| bills = await bill_service.get_bills(start_date, end_date, type_="hour") | ||
| daily_bills = await bill_service.get_bills(start_date, end_date, type_="day") | ||
| monthly_bills = await bill_service.get_bills(start_date, end_date, type_="month") | ||
| # Mostrar resumen | ||
| total_cost = sum(b.value_eur for b in bills) | ||
| print(f"Coste total: {total_cost:.2f} €") | ||
| print(f"Consumo total: {sum(e.value_kWh for e in energy):.2f} kWh") | ||
| # Preparar reglas de tarificación (si se quiere) | ||
| PRICING_RULES_PVPC = PricingRules( | ||
| p1_kw_year_eur=30.67266, | ||
| p2_kw_year_eur=1.4243591, | ||
| meter_month_eur=0.81, | ||
| market_kw_year_eur=3.113, | ||
| electricity_tax=1.0511300560, | ||
| iva_tax=1.05, | ||
| # podemos rellenar los siguientes campos si quisiéramos precio fijo (y no pvpc) | ||
| p1_kwh_eur=None, | ||
| p2_kwh_eur=None, | ||
| p3_kwh_eur=None, | ||
| ) | ||
| # Ejecutar | ||
| asyncio.run(main()) | ||
| ``` | ||
| # Instanciar el helper | ||
| # 'authorized_nif' permite indicar el NIF de la persona que nos autoriza a consultar su CUPS. | ||
| # 'data' permite "cargar" al helper datos anteriores (resultado edata.data de una ejecución anterior), para evitar volver a consultar los mismos. | ||
| edata = EdataHelper( | ||
| "datadis_user", | ||
| "datadis_password", | ||
| "cups", | ||
| datadis_authorized_nif=None, | ||
| pricing_rules=PRICING_RULES_PVPC, # si se le pasa None, no aplica tarificación | ||
| data=None, # aquí podríamos cargar datos anteriores | ||
| ) | ||
| ### Usando la CLI | ||
| # Solicitar actualización de todo el histórico (se almacena en edata.data) | ||
| edata.update(date_from=datetime(1970, 1, 1), date_to=datetime.today()) | ||
| El paquete incluye una interfaz de línea de comandos para operaciones básicas: | ||
| # volcamos todo lo obtenido a un fichero | ||
| with open("backup.json", "w") as file: | ||
| json.dump(utils.serialize_dict(edata.data), file) # se puede utilizar deserialize_dict para la posterior lectura del backup | ||
| ``` bash | ||
| # Ver suministros disponibles | ||
| python -m edata.cli show-supplies <username> | ||
| # Imprimir atributos | ||
| print(edata.attributes) | ||
| # Descargar todos los datos de un CUPS | ||
| python -m edata.cli download-all --cups ES0000000000000000XX <username> | ||
| # Actualizar facturación con tarifa fija | ||
| python -m edata.cli update-bill \ | ||
| --cups ES0000000000000000XX \ | ||
| --p1-kw-year-eur 30.67 \ | ||
| --p2-kw-year-eur 1.42 \ | ||
| --p1-kwh-eur 0.15 \ | ||
| --p2-kwh-eur 0.10 \ | ||
| --p3-kwh-eur 0.08 \ | ||
| --meter-month-eur 0.81 | ||
| ``` |
| """Datadis API connector. | ||
| To fetch data from datadis.es private API. | ||
| There a few issues that are workarounded: | ||
| - You have to wait 24h between two identical requests. | ||
| - Datadis server does not like ranges greater than 1 month. | ||
| """ | ||
| import contextlib | ||
| from datetime import datetime, timedelta | ||
| import hashlib | ||
| import logging | ||
| import os | ||
| import tempfile | ||
| import diskcache | ||
| from dateutil.relativedelta import relativedelta | ||
| import aiohttp | ||
| import asyncio | ||
| from ..definitions import ConsumptionData, ContractData, MaxPowerData, SupplyData | ||
| from ..processors import utils | ||
| _LOGGER = logging.getLogger(__name__) | ||
| # Token-related constants | ||
| URL_TOKEN = "https://datadis.es/nikola-auth/tokens/login" | ||
| TOKEN_USERNAME = "username" | ||
| TOKEN_PASSWD = "password" | ||
| # Supplies-related constants | ||
| URL_GET_SUPPLIES = "https://datadis.es/api-private/api/get-supplies" | ||
| GET_SUPPLIES_MANDATORY_FIELDS = [ | ||
| "cups", | ||
| "validDateFrom", | ||
| "validDateTo", | ||
| "pointType", | ||
| "distributorCode", | ||
| ] | ||
| # Contracts-related constants | ||
| URL_GET_CONTRACT_DETAIL = "https://datadis.es/api-private/api/get-contract-detail" | ||
| GET_CONTRACT_DETAIL_MANDATORY_FIELDS = [ | ||
| "startDate", | ||
| "endDate", | ||
| "marketer", | ||
| "contractedPowerkW", | ||
| ] | ||
| # Consumption-related constants | ||
| URL_GET_CONSUMPTION_DATA = "https://datadis.es/api-private/api/get-consumption-data" | ||
| GET_CONSUMPTION_DATA_MANDATORY_FIELDS = [ | ||
| "time", | ||
| "date", | ||
| "consumptionKWh", | ||
| "obtainMethod", | ||
| ] | ||
| MAX_CONSUMPTIONS_MONTHS = ( | ||
| 1 # max consumptions in a single request (fixed to 1 due to datadis limitations) | ||
| ) | ||
| # Maximeter-related constants | ||
| URL_GET_MAX_POWER = "https://datadis.es/api-private/api/get-max-power" | ||
| GET_MAX_POWER_MANDATORY_FIELDS = ["time", "date", "maxPower"] | ||
| # Timing constants | ||
| TIMEOUT = 3 * 60 # requests timeout | ||
| QUERY_LIMIT = timedelta(hours=24) # a datadis limitation, again... | ||
| # Cache-related constants | ||
| RECENT_CACHE_SUBDIR = "cache" | ||
| def migrate_storage(storage_dir): | ||
| """Migrate storage from older versions.""" | ||
| with contextlib.suppress(FileNotFoundError): | ||
| os.remove(os.path.join(storage_dir, "edata_recent_queries.json")) | ||
| os.remove(os.path.join(storage_dir, "edata_recent_queries_cache.json")) | ||
| class DatadisConnector: | ||
| """A Datadis private API connector.""" | ||
| def __init__( | ||
| self, | ||
| username: str, | ||
| password: str, | ||
| enable_smart_fetch: bool = True, | ||
| storage_path: str | None = None, | ||
| ) -> None: | ||
| self._usr = username | ||
| self._pwd = password | ||
| self._token = {} | ||
| self._smart_fetch = enable_smart_fetch | ||
| self._recent_queries = {} | ||
| self._recent_cache = {} | ||
| self._warned_queries = [] | ||
| if storage_path is not None: | ||
| self._recent_cache_dir = os.path.join(storage_path, RECENT_CACHE_SUBDIR) | ||
| migrate_storage(storage_path) | ||
| else: | ||
| self._recent_cache_dir = os.path.join( | ||
| tempfile.gettempdir(), RECENT_CACHE_SUBDIR | ||
| ) | ||
| os.makedirs(self._recent_cache_dir, exist_ok=True) | ||
| self._cache = diskcache.Cache(self._recent_cache_dir) | ||
| def _update_recent_queries(self, query: str, data: dict | None = None) -> None: | ||
| """Cache a successful query to avoid exceeding query limits (diskcache).""" | ||
| hash_query = hashlib.md5(query.encode()).hexdigest() | ||
| try: | ||
| self._cache.set(hash_query, data, expire=QUERY_LIMIT.total_seconds()) | ||
| _LOGGER.info("Updating cache item '%s'", hash_query) | ||
| except Exception as e: | ||
| _LOGGER.warning("Unknown error while updating cache: %s", e) | ||
| def _is_recent_query(self, query: str) -> bool: | ||
| """Check if a query has been done recently to avoid exceeding query limits (diskcache).""" | ||
| hash_query = hashlib.md5(query.encode()).hexdigest() | ||
| return hash_query in self._cache | ||
| def _get_cache_for_query(self, query: str) -> dict | None: | ||
| """Return cached response for a query (diskcache).""" | ||
| hash_query = hashlib.md5(query.encode()).hexdigest() | ||
| try: | ||
| return self._cache.get(hash_query, default=None) | ||
| except Exception: | ||
| return None | ||
| async def _async_get_token(self): | ||
| """Private async method that fetches a new token if needed.""" | ||
| _LOGGER.info("No token found, fetching a new one") | ||
| is_valid_token = False | ||
| timeout = aiohttp.ClientTimeout(total=TIMEOUT) | ||
| async with aiohttp.ClientSession(timeout=timeout) as session: | ||
| try: | ||
| async with session.post( | ||
| URL_TOKEN, | ||
| data={ | ||
| TOKEN_USERNAME: self._usr, | ||
| TOKEN_PASSWD: self._pwd, | ||
| }, | ||
| ) as response: | ||
| text = await response.text() | ||
| if response.status == 200: | ||
| self._token["encoded"] = text | ||
| self._token["headers"] = {"Authorization": "Bearer " + self._token["encoded"]} | ||
| is_valid_token = True | ||
| else: | ||
| _LOGGER.error("Unknown error while retrieving token, got %s", text) | ||
| except Exception as e: | ||
| _LOGGER.error("Exception while retrieving token: %s", e) | ||
| return is_valid_token | ||
| def login(self): | ||
| """Test to login with provided credentials (sync wrapper).""" | ||
| return asyncio.run(self._async_get_token()) | ||
| async def _async_get( | ||
| self, | ||
| url: str, | ||
| request_data: dict | None = None, | ||
| refresh_token: bool = False, | ||
| is_retry: bool = False, | ||
| ignore_recent_queries: bool = False, | ||
| ): | ||
| """Async get request for Datadis API.""" | ||
| if request_data is None: | ||
| data = {} | ||
| else: | ||
| data = request_data | ||
| is_valid_token = False | ||
| response = [] | ||
| if refresh_token: | ||
| is_valid_token = await self._async_get_token() | ||
| if is_valid_token or not refresh_token: | ||
| params = "?" if len(data) > 0 else "" | ||
| for param in data: | ||
| key = param | ||
| value = data[param] | ||
| params = params + f"{key}={value}&" | ||
| anonym_params = "?" if len(data) > 0 else "" | ||
| for anonym_param in data: | ||
| key = anonym_param | ||
| if key == "cups": | ||
| value = "xxxx" + str(data[anonym_param])[-5:] | ||
| elif key == "authorizedNif": | ||
| value = "xxxx" | ||
| else: | ||
| value = data[anonym_param] | ||
| anonym_params = anonym_params + f"{key}={value}&" | ||
| if not ignore_recent_queries and self._is_recent_query(url + params): | ||
| _cache = self._get_cache_for_query(url + params) | ||
| if _cache is not None: | ||
| _LOGGER.info( | ||
| "Returning cached response for '%s'", url + anonym_params | ||
| ) | ||
| return _cache | ||
| return [] | ||
| try: | ||
| _LOGGER.info("GET %s", url + anonym_params) | ||
| headers = {"Accept-Encoding": "identity"} | ||
| if self._token.get("headers"): | ||
| headers.update(self._token["headers"]) | ||
| timeout = aiohttp.ClientTimeout(total=TIMEOUT) | ||
| async with aiohttp.ClientSession(timeout=timeout) as session: | ||
| async with session.get( | ||
| url + params, | ||
| headers=headers, | ||
| ) as reply: | ||
| text = await reply.text() | ||
| if reply.status == 200: | ||
| _LOGGER.info("Got 200 OK") | ||
| try: | ||
| json_data = await reply.json(content_type=None) | ||
| if json_data: | ||
| response = json_data | ||
| if not ignore_recent_queries: | ||
| self._update_recent_queries(url + params, response) | ||
| else: | ||
| _LOGGER.info("Got an empty response") | ||
| if not ignore_recent_queries: | ||
| self._update_recent_queries(url + params) | ||
| except Exception as e: | ||
| _LOGGER.warning("Failed to parse JSON response") | ||
| elif reply.status == 401 and not refresh_token: | ||
| response = await self._async_get( | ||
| url, | ||
| request_data=data, | ||
| refresh_token=True, | ||
| ignore_recent_queries=ignore_recent_queries, | ||
| ) | ||
| elif reply.status == 429: | ||
| _LOGGER.warning( | ||
| "Got status code '%s' with message '%s'", | ||
| reply.status, | ||
| text, | ||
| ) | ||
| if not ignore_recent_queries: | ||
| self._update_recent_queries(url + params) | ||
| elif is_retry: | ||
| if (url + params) not in self._warned_queries: | ||
| _LOGGER.warning( | ||
| "Got status code '%s' with message '%s'. %s. %s", | ||
| reply.status, | ||
| text, | ||
| "Query temporary disabled", | ||
| "Future 500 code errors for this query will be silenced until restart", | ||
| ) | ||
| if not ignore_recent_queries: | ||
| self._update_recent_queries(url + params) | ||
| self._warned_queries.append(url + params) | ||
| else: | ||
| response = await self._async_get( | ||
| url, | ||
| request_data, | ||
| is_retry=True, | ||
| ignore_recent_queries=ignore_recent_queries, | ||
| ) | ||
| except asyncio.TimeoutError: | ||
| _LOGGER.warning("Timeout at %s", url + anonym_params) | ||
| return [] | ||
| except Exception as e: | ||
| _LOGGER.warning("Exception at %s: %s", url + anonym_params, e) | ||
| return [] | ||
| return response | ||
| async def async_get_supplies(self, authorized_nif: str | None = None): | ||
| data = {} | ||
| if authorized_nif is not None: | ||
| data["authorizedNif"] = authorized_nif | ||
| response = await self._async_get( | ||
| URL_GET_SUPPLIES, request_data=data, ignore_recent_queries=True | ||
| ) | ||
| supplies = [] | ||
| tomorrow_str = (datetime.today() + timedelta(days=1)).strftime("%Y/%m/%d") | ||
| for i in response: | ||
| if all(k in i for k in GET_SUPPLIES_MANDATORY_FIELDS): | ||
| supplies.append( | ||
| SupplyData( | ||
| cups=i["cups"], | ||
| date_start=datetime.strptime( | ||
| ( | ||
| i["validDateFrom"] | ||
| if i["validDateFrom"] != "" | ||
| else "1970/01/01" | ||
| ), | ||
| "%Y/%m/%d", | ||
| ), | ||
| date_end=datetime.strptime( | ||
| ( | ||
| i["validDateTo"] | ||
| if i["validDateTo"] != "" | ||
| else tomorrow_str | ||
| ), | ||
| "%Y/%m/%d", | ||
| ), | ||
| address=i.get("address", None), | ||
| postal_code=i.get("postalCode", None), | ||
| province=i.get("province", None), | ||
| municipality=i.get("municipality", None), | ||
| distributor=i.get("distributor", None), | ||
| pointType=i["pointType"], | ||
| distributorCode=i["distributorCode"], | ||
| ) | ||
| ) | ||
| else: | ||
| _LOGGER.warning( | ||
| "Weird data structure while fetching supplies data, got %s", | ||
| response, | ||
| ) | ||
| return supplies | ||
| def get_supplies(self, authorized_nif: str | None = None): | ||
| """Datadis 'get_supplies' query (sync wrapper).""" | ||
| return asyncio.run(self.async_get_supplies(authorized_nif=authorized_nif)) | ||
| async def async_get_contract_detail( | ||
| self, cups: str, distributor_code: str, authorized_nif: str | None = None | ||
| ): | ||
| data = {"cups": cups, "distributorCode": distributor_code} | ||
| if authorized_nif is not None: | ||
| data["authorizedNif"] = authorized_nif | ||
| response = await self._async_get( | ||
| URL_GET_CONTRACT_DETAIL, request_data=data, ignore_recent_queries=True | ||
| ) | ||
| contracts = [] | ||
| tomorrow_str = (datetime.today() + timedelta(days=1)).strftime("%Y/%m/%d") | ||
| for i in response: | ||
| if all(k in i for k in GET_CONTRACT_DETAIL_MANDATORY_FIELDS): | ||
| contracts.append( | ||
| ContractData( | ||
| date_start=datetime.strptime( | ||
| i["startDate"] if i["startDate"] != "" else "1970/01/01", | ||
| "%Y/%m/%d", | ||
| ), | ||
| date_end=datetime.strptime( | ||
| i["endDate"] if i["endDate"] != "" else tomorrow_str, | ||
| "%Y/%m/%d", | ||
| ), | ||
| marketer=i["marketer"], | ||
| distributorCode=distributor_code, | ||
| power_p1=( | ||
| i["contractedPowerkW"][0] | ||
| if isinstance(i["contractedPowerkW"], list) | ||
| else None | ||
| ), | ||
| power_p2=( | ||
| i["contractedPowerkW"][1] | ||
| if (len(i["contractedPowerkW"]) > 1) | ||
| else None | ||
| ), | ||
| ) | ||
| ) | ||
| else: | ||
| _LOGGER.warning( | ||
| "Weird data structure while fetching contracts data, got %s", | ||
| response, | ||
| ) | ||
| return contracts | ||
| def get_contract_detail( | ||
| self, cups: str, distributor_code: str, authorized_nif: str | None = None | ||
| ): | ||
| """Datadis get_contract_detail query (sync wrapper).""" | ||
| return asyncio.run(self.async_get_contract_detail(cups, distributor_code, authorized_nif)) | ||
| async def async_get_consumption_data( | ||
| self, | ||
| cups: str, | ||
| distributor_code: str, | ||
| start_date: datetime, | ||
| end_date: datetime, | ||
| measurement_type: str, | ||
| point_type: int, | ||
| authorized_nif: str | None = None, | ||
| is_smart_fetch: bool = False, | ||
| ): | ||
| if self._smart_fetch and not is_smart_fetch: | ||
| _start = start_date | ||
| consumptions = [] | ||
| while _start < end_date: | ||
| _end = min( | ||
| _start + relativedelta(months=MAX_CONSUMPTIONS_MONTHS), end_date | ||
| ) | ||
| sub_consumptions = await self.async_get_consumption_data( | ||
| cups, | ||
| distributor_code, | ||
| _start, | ||
| _end, | ||
| measurement_type, | ||
| point_type, | ||
| authorized_nif, | ||
| is_smart_fetch=True, | ||
| ) | ||
| consumptions = utils.extend_by_key( | ||
| consumptions, | ||
| sub_consumptions, | ||
| "datetime", | ||
| ) | ||
| _start = _end | ||
| return consumptions | ||
| data = { | ||
| "cups": cups, | ||
| "distributorCode": distributor_code, | ||
| "startDate": datetime.strftime(start_date, "%Y/%m"), | ||
| "endDate": datetime.strftime(end_date, "%Y/%m"), | ||
| "measurementType": measurement_type, | ||
| "pointType": point_type, | ||
| } | ||
| if authorized_nif is not None: | ||
| data["authorizedNif"] = authorized_nif | ||
| response = await self._async_get(URL_GET_CONSUMPTION_DATA, request_data=data) | ||
| consumptions = [] | ||
| for i in response: | ||
| if "consumptionKWh" in i: | ||
| if all(k in i for k in GET_CONSUMPTION_DATA_MANDATORY_FIELDS): | ||
| hour = str(int(i["time"].split(":")[0]) - 1) | ||
| date_as_dt = datetime.strptime( | ||
| f"{i['date']} {hour.zfill(2)}:00", "%Y/%m/%d %H:%M" | ||
| ) | ||
| if not (start_date <= date_as_dt <= end_date): | ||
| continue # skip element if dt is out of range | ||
| _surplus = i.get("surplusEnergyKWh", 0) | ||
| if _surplus is None: | ||
| _surplus = 0 | ||
| consumptions.append( | ||
| ConsumptionData( | ||
| datetime=date_as_dt, | ||
| delta_h=1, | ||
| value_kWh=i["consumptionKWh"], | ||
| surplus_kWh=_surplus, | ||
| real=i["obtainMethod"] == "Real", | ||
| ) | ||
| ) | ||
| else: | ||
| _LOGGER.warning( | ||
| "Weird data structure while fetching consumption data, got %s", | ||
| response, | ||
| ) | ||
| return consumptions | ||
| def get_consumption_data( | ||
| self, | ||
| cups: str, | ||
| distributor_code: str, | ||
| start_date: datetime, | ||
| end_date: datetime, | ||
| measurement_type: str, | ||
| point_type: int, | ||
| authorized_nif: str | None = None, | ||
| is_smart_fetch: bool = False, | ||
| ): | ||
| """Datadis get_consumption_data query (sync wrapper).""" | ||
| return asyncio.run(self.async_get_consumption_data( | ||
| cups, | ||
| distributor_code, | ||
| start_date, | ||
| end_date, | ||
| measurement_type, | ||
| point_type, | ||
| authorized_nif, | ||
| is_smart_fetch, | ||
| )) | ||
| async def async_get_max_power( | ||
| self, | ||
| cups: str, | ||
| distributor_code: str, | ||
| start_date: datetime, | ||
| end_date: datetime, | ||
| authorized_nif: str | None = None, | ||
| ): | ||
| data = { | ||
| "cups": cups, | ||
| "distributorCode": distributor_code, | ||
| "startDate": datetime.strftime(start_date, "%Y/%m"), | ||
| "endDate": datetime.strftime(end_date, "%Y/%m"), | ||
| } | ||
| if authorized_nif is not None: | ||
| data["authorizedNif"] = authorized_nif | ||
| response = await self._async_get(URL_GET_MAX_POWER, request_data=data) | ||
| maxpower_values = [] | ||
| for i in response: | ||
| if all(k in i for k in GET_MAX_POWER_MANDATORY_FIELDS): | ||
| maxpower_values.append( | ||
| MaxPowerData( | ||
| datetime=datetime.strptime( | ||
| f"{i['date']} {i['time']}", "%Y/%m/%d %H:%M" | ||
| ), | ||
| value_kW=i["maxPower"], | ||
| ) | ||
| ) | ||
| else: | ||
| _LOGGER.warning( | ||
| "Weird data structure while fetching maximeter data, got %s", | ||
| response, | ||
| ) | ||
| return maxpower_values | ||
| def get_max_power( | ||
| self, | ||
| cups: str, | ||
| distributor_code: str, | ||
| start_date: datetime, | ||
| end_date: datetime, | ||
| authorized_nif: str | None = None, | ||
| ): | ||
| """Datadis get_max_power query (sync wrapper).""" | ||
| return asyncio.run(self.async_get_max_power( | ||
| cups, | ||
| distributor_code, | ||
| start_date, | ||
| end_date, | ||
| authorized_nif, | ||
| )) |
| """A REData API connector""" | ||
| import datetime as dt | ||
| import logging | ||
| import aiohttp | ||
| import asyncio | ||
| from dateutil import parser | ||
| from ..definitions import PricingData | ||
| _LOGGER = logging.getLogger(__name__) | ||
| REQUESTS_TIMEOUT = 15 | ||
| URL_REALTIME_PRICES = ( | ||
| "https://apidatos.ree.es/es/datos/mercados/precios-mercados-tiempo-real" | ||
| "?time_trunc=hour" | ||
| "&geo_ids={geo_id}" | ||
| "&start_date={start:%Y-%m-%dT%H:%M}&end_date={end:%Y-%m-%dT%H:%M}" | ||
| ) | ||
| class REDataConnector: | ||
| """Main class for REData connector""" | ||
| def __init__( | ||
| self, | ||
| ) -> None: | ||
| """Init method for REDataConnector""" | ||
| async def async_get_realtime_prices( | ||
| self, dt_from: dt.datetime, dt_to: dt.datetime, is_ceuta_melilla: bool = False | ||
| ) -> list: | ||
| """GET query to fetch realtime pvpc prices, historical data is limited to current month (async)""" | ||
| url = URL_REALTIME_PRICES.format( | ||
| geo_id=8744 if is_ceuta_melilla else 8741, | ||
| start=dt_from, | ||
| end=dt_to, | ||
| ) | ||
| data = [] | ||
| timeout = aiohttp.ClientTimeout(total=REQUESTS_TIMEOUT) | ||
| async with aiohttp.ClientSession(timeout=timeout) as session: | ||
| try: | ||
| async with session.get(url) as res: | ||
| text = await res.text() | ||
| if res.status == 200: | ||
| try: | ||
| res_json = await res.json() | ||
| res_list = res_json["included"][0]["attributes"]["values"] | ||
| except (IndexError, KeyError): | ||
| _LOGGER.error( | ||
| "%s returned a malformed response: %s ", | ||
| url, | ||
| text, | ||
| ) | ||
| return data | ||
| for element in res_list: | ||
| data.append( | ||
| PricingData( | ||
| datetime=parser.parse(element["datetime"]).replace(tzinfo=None), | ||
| value_eur_kWh=element["value"] / 1000, | ||
| delta_h=1, | ||
| ) | ||
| ) | ||
| else: | ||
| _LOGGER.error( | ||
| "%s returned %s with code %s", | ||
| url, | ||
| text, | ||
| res.status, | ||
| ) | ||
| except Exception as e: | ||
| _LOGGER.error("Exception fetching realtime prices: %s", e) | ||
| return data | ||
| def get_realtime_prices( | ||
| self, dt_from: dt.datetime, dt_to: dt.datetime, is_ceuta_melilla: bool = False | ||
| ) -> list: | ||
| """GET query to fetch realtime pvpc prices, historical data is limited to current month (sync wrapper)""" | ||
| return asyncio.run(self.async_get_realtime_prices(dt_from, dt_to, is_ceuta_melilla)) |
| """Constants file.""" | ||
| PROG_NAME = "edata" |
| """Definitions for data structures.""" | ||
| import voluptuous as vol | ||
| import datetime as dt | ||
| from typing import TypedDict | ||
| ATTRIBUTES = { | ||
| "cups": None, | ||
| "contract_p1_kW": "kW", | ||
| "contract_p2_kW": "kW", | ||
| "yesterday_kWh": "kWh", | ||
| "yesterday_hours": "h", | ||
| "yesterday_p1_kWh": "kWh", | ||
| "yesterday_p2_kWh": "kWh", | ||
| "yesterday_p3_kWh": "kWh", | ||
| "yesterday_surplus_kWh": "kWh", | ||
| "yesterday_surplus_p1_kWh": "kWh", | ||
| "yesterday_surplus_p2_kWh": "kWh", | ||
| "yesterday_surplus_p3_kWh": "kWh", | ||
| "last_registered_date": None, | ||
| "last_registered_day_kWh": "kWh", | ||
| "last_registered_day_hours": "h", | ||
| "last_registered_day_p1_kWh": "kWh", | ||
| "last_registered_day_p2_kWh": "kWh", | ||
| "last_registered_day_p3_kWh": "kWh", | ||
| "last_registered_day_surplus_kWh": "kWh", | ||
| "last_registered_day_surplus_p1_kWh": "kWh", | ||
| "last_registered_day_surplus_p2_kWh": "kWh", | ||
| "last_registered_day_surplus_p3_kWh": "kWh", | ||
| "month_kWh": "kWh", | ||
| "month_daily_kWh": "kWh", | ||
| "month_days": "d", | ||
| "month_p1_kWh": "kWh", | ||
| "month_p2_kWh": "kWh", | ||
| "month_p3_kWh": "kWh", | ||
| "month_surplus_kWh": "kWh", | ||
| "month_surplus_p1_kWh": "kWh", | ||
| "month_surplus_p2_kWh": "kWh", | ||
| "month_surplus_p3_kWh": "kWh", | ||
| "month_€": "€", | ||
| "last_month_kWh": "kWh", | ||
| "last_month_daily_kWh": "kWh", | ||
| "last_month_days": "d", | ||
| "last_month_p1_kWh": "kWh", | ||
| "last_month_p2_kWh": "kWh", | ||
| "last_month_p3_kWh": "kWh", | ||
| "last_month_surplus_kWh": "kWh", | ||
| "last_month_surplus_p1_kWh": "kWh", | ||
| "last_month_surplus_p2_kWh": "kWh", | ||
| "last_month_surplus_p3_kWh": "kWh", | ||
| "last_month_€": "€", | ||
| "max_power_kW": "kW", | ||
| "max_power_date": None, | ||
| "max_power_mean_kW": "kW", | ||
| "max_power_90perc_kW": "kW", | ||
| } | ||
| # Energy term with taxes | ||
| DEFAULT_BILLING_ENERGY_FORMULA = "electricity_tax * iva_tax * kwh_eur * kwh" | ||
| # Power term with taxes | ||
| DEFAULT_BILLING_POWER_FORMULA = "electricity_tax * iva_tax * (p1_kw * (p1_kw_year_eur + market_kw_year_eur) + p2_kw * p2_kw_year_eur) / 365 / 24" | ||
| # Others term with taxes | ||
| DEFAULT_BILLING_OTHERS_FORMULA = "iva_tax * meter_month_eur / 30 / 24" | ||
| # Surplus term with taxes | ||
| DEFAULT_BILLING_SURPLUS_FORMULA = ( | ||
| "electricity_tax * iva_tax * surplus_kwh * surplus_kwh_eur" | ||
| ) | ||
| # Sum energy and power terms, and substract surplus until 0. | ||
| # An alternative would be "[(energy_term + power_term - surplus_term), 0]|max + others_term" | ||
| DEFAULT_BILLING_MAIN_FORMULA = "energy_term + power_term + others_term" | ||
| class SupplyData(TypedDict): | ||
| """Data structure to represent a supply.""" | ||
| cups: str | ||
| date_start: dt.datetime | ||
| date_end: dt.datetime | ||
| address: str | None | ||
| postal_code: str | None | ||
| province: str | None | ||
| municipality: str | None | ||
| distributor: str | None | ||
| pointType: int | ||
| distributorCode: str | ||
| SupplySchema = vol.Schema( | ||
| { | ||
| vol.Required("cups"): str, | ||
| vol.Required("date_start"): dt.datetime, | ||
| vol.Required("date_end"): dt.datetime, | ||
| vol.Required("address"): vol.Union(str, None), | ||
| vol.Required("postal_code"): vol.Union(str, None), | ||
| vol.Required("province"): vol.Union(str, None), | ||
| vol.Required("municipality"): vol.Union(str, None), | ||
| vol.Required("distributor"): vol.Union(str, None), | ||
| vol.Required("pointType"): int, | ||
| vol.Required("distributorCode"): str, | ||
| } | ||
| ) | ||
| class ContractData(TypedDict): | ||
| """Data structure to represent a contract.""" | ||
| date_start: dt.datetime | ||
| date_end: dt.datetime | ||
| marketer: str | ||
| distributorCode: str | ||
| power_p1: float | None | ||
| power_p2: float | None | ||
| ContractSchema = vol.Schema( | ||
| { | ||
| vol.Required("date_start"): dt.datetime, | ||
| vol.Required("date_end"): dt.datetime, | ||
| vol.Required("marketer"): str, | ||
| vol.Required("distributorCode"): str, | ||
| vol.Required("power_p1"): vol.Union(vol.Coerce(float), None), | ||
| vol.Required("power_p2"): vol.Union(vol.Coerce(float), None), | ||
| } | ||
| ) | ||
| class ConsumptionData(TypedDict): | ||
| """Data structure to represent a consumption.""" | ||
| datetime: dt.datetime | ||
| delta_h: float | ||
| value_kWh: float | ||
| surplus_kWh: float | ||
| real: bool | ||
| ConsumptionSchema = vol.Schema( | ||
| { | ||
| vol.Required("datetime"): dt.datetime, | ||
| vol.Required("delta_h"): vol.Coerce(float), | ||
| vol.Required("value_kWh"): vol.Coerce(float), | ||
| vol.Optional("surplus_kWh", default=0): vol.Coerce(float), | ||
| vol.Required("real"): bool, | ||
| } | ||
| ) | ||
| class MaxPowerData(TypedDict): | ||
| """Data structure to represent a MaxPower.""" | ||
| datetime: dt.datetime | ||
| value_kW: float | ||
| MaxPowerSchema = vol.Schema( | ||
| { | ||
| vol.Required("datetime"): dt.datetime, | ||
| vol.Required("value_kW"): vol.Coerce(float), | ||
| } | ||
| ) | ||
| class PricingData(TypedDict): | ||
| """Data structure to represent pricing data.""" | ||
| datetime: dt.datetime | ||
| value_eur_kWh: float | ||
| delta_h: float | ||
| PricingSchema = vol.Schema( | ||
| { | ||
| vol.Required("datetime"): dt.datetime, | ||
| vol.Required("value_eur_kWh"): vol.Coerce(float), | ||
| vol.Required("delta_h"): vol.Coerce(float), | ||
| } | ||
| ) | ||
| class PricingRules(TypedDict): | ||
| """Data structure to represent custom pricing rules.""" | ||
| p1_kw_year_eur: float | ||
| p2_kw_year_eur: float | ||
| p1_kwh_eur: float | None | ||
| p2_kwh_eur: float | None | ||
| p3_kwh_eur: float | None | ||
| surplus_p1_kwh_eur: float | None | ||
| surplus_p2_kwh_eur: float | None | ||
| surplus_p3_kwh_eur: float | None | ||
| meter_month_eur: float | ||
| market_kw_year_eur: float | ||
| electricity_tax: float | ||
| iva_tax: float | ||
| energy_formula: str | None | ||
| power_formula: str | None | ||
| others_formula: str | None | ||
| surplus_formula: str | None | ||
| cycle_start_day: int | None | ||
| PricingRulesSchema = vol.Schema( | ||
| { | ||
| vol.Required("p1_kw_year_eur"): vol.Coerce(float), | ||
| vol.Required("p2_kw_year_eur"): vol.Coerce(float), | ||
| vol.Optional("p1_kwh_eur", default=None): vol.Union(vol.Coerce(float), None), | ||
| vol.Optional("p2_kwh_eur", default=None): vol.Union(vol.Coerce(float), None), | ||
| vol.Optional("p3_kwh_eur", default=None): vol.Union(vol.Coerce(float), None), | ||
| vol.Optional("surplus_p1_kwh_eur", default=None): vol.Union( | ||
| vol.Coerce(float), None | ||
| ), | ||
| vol.Optional("surplus_p2_kwh_eur", default=None): vol.Union( | ||
| vol.Coerce(float), None | ||
| ), | ||
| vol.Optional("surplus_p3_kwh_eur", default=None): vol.Union( | ||
| vol.Coerce(float), None | ||
| ), | ||
| vol.Required("meter_month_eur"): vol.Coerce(float), | ||
| vol.Required("market_kw_year_eur"): vol.Coerce(float), | ||
| vol.Required("electricity_tax"): vol.Coerce(float), | ||
| vol.Required("iva_tax"): vol.Coerce(float), | ||
| vol.Optional("energy_formula", default=DEFAULT_BILLING_ENERGY_FORMULA): str, | ||
| vol.Optional("power_formula", default=DEFAULT_BILLING_POWER_FORMULA): str, | ||
| vol.Optional("others_formula", default=DEFAULT_BILLING_OTHERS_FORMULA): str, | ||
| vol.Optional("surplus_formula", default=DEFAULT_BILLING_SURPLUS_FORMULA): str, | ||
| vol.Optional("main_formula", default=DEFAULT_BILLING_MAIN_FORMULA): str, | ||
| vol.Optional("cycle_start_day", default=1): vol.Range(1, 30), | ||
| } | ||
| ) | ||
| DEFAULT_PVPC_RULES = PricingRules( | ||
| p1_kw_year_eur=30.67266, | ||
| p2_kw_year_eur=1.4243591, | ||
| meter_month_eur=0.81, | ||
| market_kw_year_eur=3.113, | ||
| electricity_tax=1.0511300560, | ||
| iva_tax=1.05, | ||
| ) | ||
| class ConsumptionAggData(TypedDict): | ||
| """A dict holding a Consumption item.""" | ||
| datetime: dt.datetime | ||
| value_kWh: float | ||
| value_p1_kWh: float | ||
| value_p2_kWh: float | ||
| value_p3_kWh: float | ||
| surplus_kWh: float | ||
| surplus_p1_kWh: float | ||
| surplus_p2_kWh: float | ||
| surplus_p3_kWh: float | ||
| delta_h: float | ||
| ConsumptionAggSchema = vol.Schema( | ||
| { | ||
| vol.Required("datetime"): dt.datetime, | ||
| vol.Required("value_kWh"): vol.Coerce(float), | ||
| vol.Required("value_p1_kWh"): vol.Coerce(float), | ||
| vol.Required("value_p2_kWh"): vol.Coerce(float), | ||
| vol.Required("value_p3_kWh"): vol.Coerce(float), | ||
| vol.Optional("surplus_kWh", default=0): vol.Coerce(float), | ||
| vol.Optional("surplus_p1_kWh", default=0): vol.Coerce(float), | ||
| vol.Optional("surplus_p2_kWh", default=0): vol.Coerce(float), | ||
| vol.Optional("surplus_p3_kWh", default=0): vol.Coerce(float), | ||
| vol.Required("delta_h"): vol.Coerce(float), | ||
| } | ||
| ) | ||
| class PricingAggData(TypedDict): | ||
| """A dict holding a Billing item.""" | ||
| datetime: dt.datetime | ||
| value_eur: float | ||
| energy_term: float | ||
| power_term: float | ||
| others_term: float | ||
| surplus_term: float | ||
| delta_h: float | ||
| PricingAggSchema = vol.Schema( | ||
| { | ||
| vol.Required("datetime"): dt.datetime, | ||
| vol.Required("value_eur"): vol.Coerce(float), | ||
| vol.Required("energy_term"): vol.Coerce(float), | ||
| vol.Required("power_term"): vol.Coerce(float), | ||
| vol.Required("others_term"): vol.Coerce(float), | ||
| vol.Optional("surplus_term", default=0): vol.Coerce(float), | ||
| vol.Optional("delta_h", default=1): vol.Coerce(float), | ||
| }, | ||
| ) | ||
| class EdataData(TypedDict): | ||
| """A Typed Dict to handle Edata Aggregated Data.""" | ||
| supplies: list[SupplyData] | ||
| contracts: list[ContractData] | ||
| consumptions: list[ConsumptionData] | ||
| maximeter: list[MaxPowerData] | ||
| pvpc: list[PricingData] | ||
| consumptions_daily_sum: list[ConsumptionAggData] | ||
| consumptions_monthly_sum: list[ConsumptionAggData] | ||
| cost_hourly_sum: list[PricingAggData] | ||
| cost_daily_sum: list[PricingAggData] | ||
| cost_monthly_sum: list[PricingAggData] | ||
| EdataSchema = vol.Schema( | ||
| { | ||
| vol.Required("supplies"): [SupplySchema], | ||
| vol.Required("contracts"): [ContractSchema], | ||
| vol.Required("consumptions"): [ConsumptionSchema], | ||
| vol.Required("maximeter"): [MaxPowerSchema], | ||
| vol.Optional("pvpc", default=[]): [PricingSchema], | ||
| vol.Optional("consumptions_daily_sum", []): [ConsumptionAggSchema], | ||
| vol.Optional("consumptions_monthly_sum", []): [ConsumptionAggSchema], | ||
| vol.Optional("cost_hourly_sum", default=[]): [PricingAggSchema], | ||
| vol.Optional("cost_daily_sum", default=[]): [PricingAggSchema], | ||
| vol.Optional("cost_monthly_sum", default=[]): [PricingAggSchema], | ||
| } | ||
| ) |
-769
| """A module for edata helpers.""" | ||
| import asyncio | ||
| import contextlib | ||
| from datetime import datetime, timedelta | ||
| import logging | ||
| import os | ||
| from dateutil.relativedelta import relativedelta | ||
| import requests | ||
| from . import const | ||
| from .connectors.datadis import DatadisConnector | ||
| from .connectors.redata import REDataConnector | ||
| from .definitions import ATTRIBUTES, EdataData, PricingRules | ||
| from .processors import utils | ||
| from .processors.billing import BillingInput, BillingProcessor | ||
| from .processors.consumption import ConsumptionProcessor | ||
| from .processors.maximeter import MaximeterProcessor | ||
| from .storage import check_storage_integrity, dump_storage, load_storage | ||
| _LOGGER = logging.getLogger(__name__) | ||
| def acups(cups): | ||
| """Print an abbreviated and anonymized CUPS.""" | ||
| return cups[-5:] | ||
| class EdataHelper: | ||
| """Main EdataHelper class.""" | ||
| UPDATE_INTERVAL = timedelta(hours=1) | ||
| def __init__( | ||
| self, | ||
| datadis_username: str, | ||
| datadis_password: str, | ||
| cups: str, | ||
| datadis_authorized_nif: str | None = None, | ||
| pricing_rules: PricingRules | None = None, | ||
| storage_dir_path: str | None = None, | ||
| data: EdataData | None = None, | ||
| ) -> None: | ||
| self.data = EdataData( | ||
| supplies=[], | ||
| contracts=[], | ||
| consumptions=[], | ||
| maximeter=[], | ||
| pvpc=[], | ||
| consumptions_daily_sum=[], | ||
| consumptions_monthly_sum=[], | ||
| cost_hourly_sum=[], | ||
| cost_daily_sum=[], | ||
| cost_monthly_sum=[], | ||
| ) | ||
| self.attributes = {} | ||
| self._storage_dir = storage_dir_path | ||
| self._cups = cups | ||
| self._scups = acups(cups) | ||
| self._authorized_nif = datadis_authorized_nif | ||
| self.last_update = {x: datetime(1970, 1, 1) for x in self.data} | ||
| self._date_from = datetime(1970, 1, 1) | ||
| self._date_to = datetime.today() | ||
| self._must_dump = True | ||
| self._incremental_update = True | ||
| if data is not None: | ||
| data = check_storage_integrity(data) | ||
| self.data = data | ||
| else: | ||
| with contextlib.suppress(Exception): | ||
| self.data = load_storage(self._cups, self._storage_dir) | ||
| for attr in ATTRIBUTES: | ||
| self.attributes[attr] = None | ||
| self.datadis_api = DatadisConnector( | ||
| datadis_username, | ||
| datadis_password, | ||
| storage_path=( | ||
| os.path.join(storage_dir_path, const.PROG_NAME) | ||
| if storage_dir_path is not None | ||
| else None | ||
| ), | ||
| ) | ||
| self.redata_api = REDataConnector() | ||
| self.pricing_rules = pricing_rules | ||
| if self.pricing_rules is not None: | ||
| self.enable_billing = True | ||
| if not all( | ||
| x in self.pricing_rules and self.pricing_rules[x] is not None | ||
| for x in ("p1_kwh_eur", "p2_kwh_eur", "p3_kwh_eur") | ||
| ): | ||
| self.is_pvpc = True | ||
| else: | ||
| self.is_pvpc = False | ||
| else: | ||
| self.enable_billing = False | ||
| self.is_pvpc = False | ||
| async def async_update( | ||
| self, | ||
| date_from: datetime = datetime(1970, 1, 1), | ||
| date_to: datetime = datetime.today(), | ||
| incremental_update: bool = True, | ||
| ): | ||
| """Async call of update method.""" | ||
| _LOGGER.info( | ||
| "%s: update triggered", | ||
| self._scups, | ||
| ) | ||
| self._date_from = date_from | ||
| self._date_to = date_to | ||
| # update datadis resources | ||
| await self.update_datadis(self._cups, date_from, date_to) | ||
| # update redata resources if pvpc is requested | ||
| if self.is_pvpc: | ||
| try: | ||
| await asyncio.to_thread(self.update_redata, date_from, date_to) | ||
| except requests.exceptions.Timeout: | ||
| _LOGGER.error("Timeout exception while updating from REData") | ||
| await asyncio.to_thread(self.process_data, incremental_update=incremental_update) | ||
| if self._must_dump: | ||
| await asyncio.to_thread(dump_storage, self._cups, self.data, self._storage_dir) | ||
| def update( | ||
| self, | ||
| date_from: datetime = datetime(1970, 1, 1), | ||
| date_to: datetime = datetime.today(), | ||
| incremental_update: bool = True, | ||
| ): | ||
| """Update synchronously.""" | ||
| asyncio.run(self.async_update(date_from, date_to, incremental_update)) | ||
| async def update_supplies(self): | ||
| """Update supplies.""" | ||
| _LOGGER.debug("%s: supplies update triggered", self._scups) | ||
| if datetime.today().date() != self.last_update["supplies"].date(): | ||
| # if supplies haven't been updated today | ||
| supplies = await self.datadis_api.async_get_supplies( | ||
| authorized_nif=self._authorized_nif | ||
| ) # fetch supplies | ||
| if len(supplies) > 0: | ||
| self.data["supplies"] = supplies | ||
| # if we got something, update last_update flag | ||
| self.last_update["supplies"] = datetime.now() | ||
| _LOGGER.info("%s: supplies update succeeded", self._scups) | ||
| else: | ||
| _LOGGER.info("%s: supplies are already updated (skipping)", self._scups) | ||
| async def update_contracts(self, cups: str, distributor_code: str): | ||
| """Update contracts.""" | ||
| _LOGGER.debug("%s: contracts update triggered", self._scups) | ||
| if datetime.today().date() != self.last_update["contracts"].date(): | ||
| # if contracts haven't been updated today | ||
| contracts = await self.datadis_api.async_get_contract_detail( | ||
| cups, distributor_code, authorized_nif=self._authorized_nif | ||
| ) | ||
| if len(contracts) > 0: | ||
| self.data["contracts"] = utils.extend_by_key( | ||
| self.data["contracts"], contracts, "date_start" | ||
| ) # extend contracts data with new ones | ||
| # if we got something, update last_update flag | ||
| self.last_update["contracts"] = datetime.now() | ||
| _LOGGER.info("%s: contracts update succeeded", self._scups) | ||
| else: | ||
| _LOGGER.info("%s: contracts are already updated (skipping)", self._scups) | ||
| async def update_consumptions( | ||
| self, | ||
| cups: str, | ||
| distributor_code: str, | ||
| start_date: datetime, | ||
| end_date: datetime, | ||
| measurement_type: str, | ||
| point_type: int, | ||
| ): | ||
| """Update consumptions.""" | ||
| _LOGGER.debug("%s: consumptions update triggered", self._scups) | ||
| if (datetime.now() - self.last_update["consumptions"]) > self.UPDATE_INTERVAL: | ||
| consumptions = await self.datadis_api.async_get_consumption_data( | ||
| cups, | ||
| distributor_code, | ||
| start_date, | ||
| end_date, | ||
| measurement_type, | ||
| point_type, | ||
| authorized_nif=self._authorized_nif, | ||
| ) | ||
| if len(consumptions) > 0: | ||
| _LOGGER.info( | ||
| "%s: got consumptions from %s to %s", | ||
| self._scups, | ||
| consumptions[0]["datetime"].isoformat(), | ||
| consumptions[-1]["datetime"].isoformat(), | ||
| ) | ||
| self.data["consumptions"] = utils.extend_by_key( | ||
| self.data["consumptions"], consumptions, "datetime" | ||
| ) | ||
| self.last_update["consumptions"] = datetime.now() | ||
| else: | ||
| _LOGGER.info("%s: consumptions are up to date", self._scups) | ||
| else: | ||
| _LOGGER.info("%s: consumptions are already updated (skipping)", self._scups) | ||
| async def update_maximeter(self, cups, distributor_code, start_date, end_date): | ||
| """Update maximeter.""" | ||
| _LOGGER.debug("%s: maximeter update triggered", self._scups) | ||
| if (datetime.now() - self.last_update["maximeter"]) > self.UPDATE_INTERVAL: | ||
| maximeter = await self.datadis_api.async_get_max_power( | ||
| cups, | ||
| distributor_code, | ||
| start_date, | ||
| end_date, | ||
| authorized_nif=self._authorized_nif, | ||
| ) | ||
| if len(maximeter) > 0: | ||
| _LOGGER.info( | ||
| "%s: maximeter update succeeded", | ||
| self._scups, | ||
| ) | ||
| self.data["maximeter"] = utils.extend_by_key( | ||
| self.data["maximeter"], maximeter, "datetime" | ||
| ) | ||
| self.last_update["maximeter"] = datetime.now() | ||
| else: | ||
| _LOGGER.info("%s: maximeter is up to date", self._scups) | ||
| else: | ||
| _LOGGER.info("%s: maximeter is already updated (skipping)", self._scups) | ||
| async def update_datadis( | ||
| self, | ||
| cups: str, | ||
| date_from: datetime = datetime(1970, 1, 1), | ||
| date_to: datetime = datetime.today(), | ||
| ): | ||
| """Update all data from Datadis.""" | ||
| _LOGGER.info( | ||
| "%s: datadis update triggered (from %s to %s)", | ||
| self._scups, | ||
| date_from.isoformat(), | ||
| date_to.isoformat(), | ||
| ) | ||
| # update supplies and get distributorCode | ||
| await self.update_supplies() | ||
| if len(self.data["supplies"]) == 0: | ||
| # return if no supplies were discovered | ||
| _LOGGER.warning( | ||
| "%s: supplies update failed or no supplies found in the provided account", | ||
| self._scups, | ||
| ) | ||
| return False | ||
| # find requested cups in supplies | ||
| supply = utils.get_by_key(self.data["supplies"], "cups", cups) | ||
| if supply is None: | ||
| # return if specified cups seems not valid | ||
| _LOGGER.error( | ||
| "%s: CUPS not found. Got: %s", | ||
| self._scups, | ||
| [acups(x["cups"]) for x in self.data["supplies"]], | ||
| ) | ||
| return False | ||
| _LOGGER.info("%s: CUPS found in account", self._scups) | ||
| # get some supply-related data | ||
| supply_date_start = supply["date_start"] | ||
| distributor_code = supply["distributorCode"] | ||
| point_type = supply["pointType"] | ||
| _LOGGER.info( | ||
| "%s: CUPS start date is %s", self._scups, supply_date_start.isoformat() | ||
| ) | ||
| _LOGGER.info( | ||
| "%s: CUPS end date is %s", self._scups, supply["date_end"].isoformat() | ||
| ) | ||
| # update contracts to get valid periods | ||
| await self.update_contracts(cups, distributor_code) | ||
| if len(self.data["contracts"]) == 0: | ||
| _LOGGER.warning( | ||
| "%s: contracts update failed or no contracts found in the provided account", | ||
| self._scups, | ||
| ) | ||
| # return False | ||
| # filter consumptions and maximeter, and log gaps | ||
| def sort_and_filter(dt_from, dt_to): | ||
| self.data["consumptions"], miss_cons = utils.extract_dt_ranges( | ||
| self.data["consumptions"], | ||
| dt_from, | ||
| dt_to, | ||
| gap_interval=timedelta(hours=6), | ||
| ) | ||
| self.data["maximeter"], miss_maxim = utils.extract_dt_ranges( | ||
| self.data["maximeter"], | ||
| dt_from, | ||
| dt_to, | ||
| gap_interval=timedelta(days=60), | ||
| ) | ||
| return miss_cons, miss_maxim | ||
| miss_cons, miss_maxim = sort_and_filter(date_from, date_to) | ||
| # update consumptions | ||
| _LOGGER.info( | ||
| "%s: missing consumptions: %s", | ||
| self._scups, | ||
| ", ".join( | ||
| [ | ||
| "from " | ||
| + (x["from"] + timedelta(hours=1)).isoformat() | ||
| + " to " | ||
| + x["to"].isoformat() | ||
| for x in miss_cons | ||
| ] | ||
| ), | ||
| ) | ||
| for gap in miss_cons: | ||
| if not ( | ||
| gap["to"] < supply["date_start"] or gap["from"] > supply["date_end"] | ||
| ): | ||
| # fetch consumptions for each consumptions gap in valid periods | ||
| start = max([gap["from"] + timedelta(hours=1), supply["date_start"]]) | ||
| end = min([gap["to"], supply["date_end"]]) | ||
| _LOGGER.info( | ||
| "%s: requesting consumptions from %s to %s", | ||
| self._scups, | ||
| start.isoformat(), | ||
| end.isoformat(), | ||
| ) | ||
| await self.update_consumptions( | ||
| cups, | ||
| distributor_code, | ||
| start, | ||
| end, | ||
| "0", | ||
| point_type, | ||
| ) | ||
| # update maximeter | ||
| _LOGGER.info( | ||
| "%s: missing maximeter: %s", | ||
| self._scups, | ||
| ", ".join( | ||
| [ | ||
| "from " + x["from"].isoformat() + " to " + x["to"].isoformat() | ||
| for x in miss_maxim | ||
| ] | ||
| ), | ||
| ) | ||
| for gap in miss_maxim: | ||
| if not (date_to < supply["date_start"] or date_from > supply["date_end"]): | ||
| # fetch maximeter for each maximeter gap in valid periods | ||
| start = max( | ||
| [gap["from"], supply["date_start"] + relativedelta(months=1)] | ||
| ) | ||
| end = min([gap["to"], supply["date_end"]]) | ||
| start = min([start, end]) | ||
| _LOGGER.info( | ||
| "%s: requesting maximeter from %s to %s", | ||
| self._scups, | ||
| start.isoformat(), | ||
| end.isoformat(), | ||
| ) | ||
| await self.update_maximeter(cups, distributor_code, start, end) | ||
| miss_cons, miss_maxim = sort_and_filter(date_from, date_to) | ||
| return True | ||
| def update_redata( | ||
| self, | ||
| date_from: datetime = (datetime.today() - timedelta(days=30)).replace( | ||
| hour=0, minute=0 | ||
| ), | ||
| date_to: datetime = (datetime.today() + timedelta(days=2)).replace( | ||
| hour=0, minute=0 | ||
| ), | ||
| ): | ||
| """Fetch PVPC prices using REData API.""" | ||
| _LOGGER.info( | ||
| "%s: updating PVPC prices", | ||
| self._scups, | ||
| ) | ||
| self.data["pvpc"], missing = utils.extract_dt_ranges( | ||
| self.data["pvpc"], | ||
| date_from, | ||
| date_to, | ||
| gap_interval=timedelta(hours=1), | ||
| ) | ||
| for gap in missing: | ||
| prices = [] | ||
| gap["from"] = max( | ||
| (datetime.today() - timedelta(days=30)).replace(hour=0, minute=0), | ||
| gap["from"], | ||
| ) | ||
| while len(prices) == 0 and gap["from"] < gap["to"]: | ||
| prices = self.redata_api.get_realtime_prices(gap["from"], gap["to"]) | ||
| gap["from"] = gap["from"] + timedelta(days=1) | ||
| self.data["pvpc"] = utils.extend_by_key( | ||
| self.data["pvpc"], prices, "datetime" | ||
| ) | ||
| return True | ||
| def process_data(self, incremental_update: bool = True): | ||
| """Process all raw data.""" | ||
| self._incremental_update = incremental_update | ||
| for process_method in [ | ||
| self.process_supplies, | ||
| self.process_contracts, | ||
| self.process_consumptions, | ||
| self.process_maximeter, | ||
| self.process_cost, | ||
| ]: | ||
| try: | ||
| process_method() | ||
| except Exception as ex: # pylint: disable=broad-except | ||
| _LOGGER.error("Unhandled exception while updating attributes") | ||
| _LOGGER.exception(ex) | ||
| for attribute in self.attributes: | ||
| if attribute in ATTRIBUTES and ATTRIBUTES[attribute] is not None: | ||
| self.attributes[attribute] = ( | ||
| round(self.attributes[attribute], 2) | ||
| if self.attributes[attribute] is not None | ||
| else None | ||
| ) | ||
| if not incremental_update: | ||
| dump_storage(self._cups, self.data, self._storage_dir) | ||
| def process_supplies(self): | ||
| """Process supplies data.""" | ||
| for i in self.data["supplies"]: | ||
| if i["cups"] == self._cups: | ||
| self.attributes["cups"] = self._cups | ||
| break | ||
| def process_contracts(self): | ||
| """Process contracts data.""" | ||
| most_recent_date = datetime(1970, 1, 1) | ||
| for i in self.data["contracts"]: | ||
| if i["date_end"] > most_recent_date: | ||
| most_recent_date = i["date_end"] | ||
| self.attributes["contract_p1_kW"] = i.get("power_p1", None) | ||
| self.attributes["contract_p2_kW"] = i.get("power_p2", None) | ||
| break | ||
| def process_consumptions(self): | ||
| """Process consumptions data.""" | ||
| if len(self.data["consumptions"]) > 0: | ||
| new_data_from = self._date_from | ||
| if self._incremental_update: | ||
| with contextlib.suppress(Exception): | ||
| new_data_from = self.data["consumptions_monthly_sum"][-1][ | ||
| "datetime" | ||
| ] | ||
| proc = ConsumptionProcessor( | ||
| { | ||
| "consumptions": [ | ||
| x | ||
| for x in self.data["consumptions"] | ||
| if x["datetime"] >= new_data_from | ||
| ], | ||
| "cycle_start_day": 1, | ||
| } | ||
| ) | ||
| today_starts = datetime( | ||
| datetime.today().year, | ||
| datetime.today().month, | ||
| datetime.today().day, | ||
| 0, | ||
| 0, | ||
| 0, | ||
| ) | ||
| month_starts = datetime( | ||
| datetime.today().year, datetime.today().month, 1, 0, 0, 0 | ||
| ) | ||
| # append new data | ||
| self.data["consumptions_daily_sum"] = utils.extend_and_filter( | ||
| self.data["consumptions_daily_sum"], | ||
| proc.output["daily"], | ||
| "datetime", | ||
| self._date_from, | ||
| self._date_to, | ||
| ) | ||
| self.data["consumptions_monthly_sum"] = utils.extend_and_filter( | ||
| self.data["consumptions_monthly_sum"], | ||
| proc.output["monthly"], | ||
| "datetime", | ||
| self._date_from, | ||
| self._date_to, | ||
| ) | ||
| yday = utils.get_by_key( | ||
| self.data["consumptions_daily_sum"], | ||
| "datetime", | ||
| today_starts - timedelta(days=1), | ||
| ) | ||
| self.attributes["yesterday_kWh"] = ( | ||
| yday.get("value_kWh", None) if yday is not None else None | ||
| ) | ||
| for tariff in (1, 2, 3): | ||
| self.attributes[f"yesterday_p{tariff}_kWh"] = ( | ||
| yday.get(f"value_p{tariff}_kWh", None) if yday is not None else None | ||
| ) | ||
| self.attributes["yesterday_surplus_kWh"] = ( | ||
| yday.get("surplus_kWh", None) if yday is not None else None | ||
| ) | ||
| for tariff in (1, 2, 3): | ||
| self.attributes[f"yesterday_surplus_p{tariff}_kWh"] = ( | ||
| yday.get(f"surplus_p{tariff}_kWh", None) | ||
| if yday is not None | ||
| else None | ||
| ) | ||
| self.attributes["yesterday_hours"] = ( | ||
| yday.get("delta_h", None) if yday is not None else None | ||
| ) | ||
| month = utils.get_by_key( | ||
| self.data["consumptions_monthly_sum"], "datetime", month_starts | ||
| ) | ||
| self.attributes["month_kWh"] = ( | ||
| month.get("value_kWh", None) if month is not None else None | ||
| ) | ||
| self.attributes["month_surplus_kWh"] = ( | ||
| month.get("surplus_kWh", None) if month is not None else None | ||
| ) | ||
| self.attributes["month_days"] = ( | ||
| month.get("delta_h", 0) / 24 if month is not None else None | ||
| ) | ||
| self.attributes["month_daily_kWh"] = ( | ||
| ( | ||
| (self.attributes["month_kWh"] / self.attributes["month_days"]) | ||
| if self.attributes["month_days"] > 0 | ||
| else 0 | ||
| ) | ||
| if month is not None | ||
| else None | ||
| ) | ||
| for tariff in (1, 2, 3): | ||
| self.attributes[f"month_p{tariff}_kWh"] = ( | ||
| month.get(f"value_p{tariff}_kWh", None) | ||
| if month is not None | ||
| else None | ||
| ) | ||
| self.attributes[f"month_surplus_p{tariff}_kWh"] = ( | ||
| month.get(f"surplus_p{tariff}_kWh", None) | ||
| if month is not None | ||
| else None | ||
| ) | ||
| last_month = utils.get_by_key( | ||
| self.data["consumptions_monthly_sum"], | ||
| "datetime", | ||
| (month_starts - relativedelta(months=1)), | ||
| ) | ||
| self.attributes["last_month_kWh"] = ( | ||
| last_month.get("value_kWh", None) if last_month is not None else None | ||
| ) | ||
| self.attributes["last_month_surplus_kWh"] = ( | ||
| last_month.get("surplus_kWh", None) if last_month is not None else None | ||
| ) | ||
| self.attributes["last_month_days"] = ( | ||
| last_month.get("delta_h", 0) / 24 if last_month is not None else None | ||
| ) | ||
| self.attributes["last_month_daily_kWh"] = ( | ||
| ( | ||
| ( | ||
| self.attributes["last_month_kWh"] | ||
| / self.attributes["last_month_days"] | ||
| ) | ||
| if self.attributes["last_month_days"] > 0 | ||
| else 0 | ||
| ) | ||
| if last_month is not None | ||
| else None | ||
| ) | ||
| for tariff in (1, 2, 3): | ||
| self.attributes[f"last_month_p{tariff}_kWh"] = ( | ||
| last_month.get(f"value_p{tariff}_kWh", None) | ||
| if last_month is not None | ||
| else None | ||
| ) | ||
| self.attributes[f"last_month_surplus_p{tariff}_kWh"] = ( | ||
| last_month.get(f"surplus_p{tariff}_kWh", None) | ||
| if last_month is not None | ||
| else None | ||
| ) | ||
| if len(self.data["consumptions"]) > 0: | ||
| self.attributes["last_registered_date"] = self.data["consumptions"][-1][ | ||
| "datetime" | ||
| ] | ||
| if len(self.data["consumptions_daily_sum"]) > 0: | ||
| last_day = self.data["consumptions_daily_sum"][-1] | ||
| self.attributes["last_registered_day_kWh"] = ( | ||
| last_day.get("value_kWh", None) | ||
| if last_day is not None | ||
| else None | ||
| ) | ||
| self.attributes["last_registered_day_surplus_kWh"] = ( | ||
| last_day.get("surplus_kWh", None) | ||
| if last_day is not None | ||
| else None | ||
| ) | ||
| for tariff in (1, 2, 3): | ||
| self.attributes[f"last_registered_day_p{tariff}_kWh"] = ( | ||
| last_day.get(f"value_p{tariff}_kWh", None) | ||
| if last_day is not None | ||
| else None | ||
| ) | ||
| self.attributes[ | ||
| f"last_registered_day_surplus_p{tariff}_kWh" | ||
| ] = ( | ||
| last_day.get(f"surplus_p{tariff}_kWh", None) | ||
| if last_day is not None | ||
| else None | ||
| ) | ||
| self.attributes["last_registered_day_hours"] = ( | ||
| last_day.get("delta_h", None) if last_day is not None else None | ||
| ) | ||
| def process_maximeter(self): | ||
| """Process maximeter data.""" | ||
| if len(self.data["maximeter"]) > 0: | ||
| processor = MaximeterProcessor(self.data["maximeter"]) | ||
| last_relative_year = processor.output["stats"] | ||
| self.attributes["max_power_kW"] = last_relative_year.get( | ||
| "value_max_kW", None | ||
| ) | ||
| self.attributes["max_power_date"] = last_relative_year.get("date_max", None) | ||
| self.attributes["max_power_mean_kW"] = last_relative_year.get( | ||
| "value_mean_kW", None | ||
| ) | ||
| self.attributes["max_power_90perc_kW"] = last_relative_year.get( | ||
| "value_tile90_kW", None | ||
| ) | ||
| def process_cost(self): | ||
| """Process costs.""" | ||
| if self.enable_billing: | ||
| new_data_from = self._date_from | ||
| if self._incremental_update: | ||
| with contextlib.suppress(Exception): | ||
| new_data_from = self.data["cost_monthly_sum"][-1]["datetime"] | ||
| proc = BillingProcessor( | ||
| BillingInput( | ||
| contracts=self.data["contracts"], | ||
| consumptions=[ | ||
| x | ||
| for x in self.data["consumptions"] | ||
| if x["datetime"] >= new_data_from | ||
| ], | ||
| prices=( | ||
| [x for x in self.data["pvpc"] if x["datetime"] >= new_data_from] | ||
| if self.is_pvpc | ||
| else None | ||
| ), | ||
| rules=self.pricing_rules, | ||
| ) | ||
| ) | ||
| month_starts = datetime( | ||
| datetime.today().year, datetime.today().month, 1, 0, 0, 0 | ||
| ) | ||
| # append new data | ||
| hourly = proc.output["hourly"] | ||
| self.data["cost_hourly_sum"] = utils.extend_and_filter( | ||
| self.data["cost_hourly_sum"], | ||
| hourly, | ||
| "datetime", | ||
| self._date_from, | ||
| self._date_to, | ||
| ) | ||
| daily = proc.output["daily"] | ||
| self.data["cost_daily_sum"] = utils.extend_and_filter( | ||
| self.data["cost_daily_sum"], | ||
| daily, | ||
| "datetime", | ||
| self._date_from, | ||
| self._date_to, | ||
| ) | ||
| monthly = proc.output["monthly"] | ||
| self.data["cost_monthly_sum"] = utils.extend_and_filter( | ||
| self.data["cost_monthly_sum"], | ||
| monthly, | ||
| "datetime", | ||
| self._date_from, | ||
| self._date_to, | ||
| ) | ||
| this_month = utils.get_by_key( | ||
| self.data["cost_monthly_sum"], | ||
| "datetime", | ||
| month_starts, | ||
| ) | ||
| last_month = utils.get_by_key( | ||
| self.data["cost_monthly_sum"], | ||
| "datetime", | ||
| (month_starts - relativedelta(months=1)), | ||
| ) | ||
| if this_month is not None: | ||
| self.attributes["month_€"] = this_month.get("value_eur", None) | ||
| if last_month is not None: | ||
| self.attributes["last_month_€"] = last_month.get("value_eur", None) | ||
| def reset(self): | ||
| """Reset in-mem objects.""" | ||
| self.data = EdataData( | ||
| supplies=[], | ||
| contracts=[], | ||
| consumptions=[], | ||
| maximeter=[], | ||
| pvpc=[], | ||
| consumptions_daily_sum=[], | ||
| consumptions_monthly_sum=[], | ||
| cost_hourly_sum=[], | ||
| cost_daily_sum=[], | ||
| cost_monthly_sum=[], | ||
| ) | ||
| for attr in ATTRIBUTES: | ||
| self.attributes[attr] = None | ||
| self.last_update = {x: datetime(1970, 1, 1) for x in self.data} |
| """Base definitions for processors.""" | ||
| from abc import ABC, abstractmethod | ||
| from copy import deepcopy | ||
| from typing import Any | ||
| class Processor(ABC): | ||
| """A base class for data processors.""" | ||
| _LABEL = "Processor" | ||
| def __init__(self, input_data: Any, auto: bool = True) -> None: | ||
| """Init method.""" | ||
| self._input = deepcopy(input_data) | ||
| self._output = None | ||
| if auto: | ||
| self.do_process() | ||
| @abstractmethod | ||
| def do_process(self): | ||
| """Process method.""" | ||
| @property | ||
| def output(self): | ||
| """An output property.""" | ||
| return deepcopy(self._output) |
| """Billing data processors.""" | ||
| import contextlib | ||
| from datetime import datetime, timedelta | ||
| import logging | ||
| from typing import Optional, TypedDict | ||
| from jinja2 import Environment | ||
| import voluptuous | ||
| from ..definitions import ( | ||
| ConsumptionData, | ||
| ConsumptionSchema, | ||
| ContractData, | ||
| ContractSchema, | ||
| PricingAggData, | ||
| PricingData, | ||
| PricingRules, | ||
| PricingRulesSchema, | ||
| PricingSchema, | ||
| ) | ||
| from ..processors import utils | ||
| from ..processors.base import Processor | ||
| _LOGGER = logging.getLogger(__name__) | ||
| class BillingOutput(TypedDict): | ||
| """A dict holding BillingProcessor output property.""" | ||
| hourly: list[PricingAggData] | ||
| daily: list[PricingAggData] | ||
| monthly: list[PricingAggData] | ||
| class BillingInput(TypedDict): | ||
| """A dict holding BillingProcessor input data.""" | ||
| contracts: list[ContractData] | ||
| consumptions: list[ConsumptionData] | ||
| prices: Optional[list[PricingData]] | ||
| rules: PricingRules | ||
| class BillingProcessor(Processor): | ||
| """A billing processor for edata.""" | ||
| def do_process(self): | ||
| """Process billing and get hourly/daily/monthly metrics.""" | ||
| self._output = BillingOutput(hourly=[], daily=[], monthly=[]) | ||
| _schema = voluptuous.Schema( | ||
| { | ||
| voluptuous.Required("contracts"): [ContractSchema], | ||
| voluptuous.Required("consumptions"): [ConsumptionSchema], | ||
| voluptuous.Optional("prices", default=None): voluptuous.Union( | ||
| [voluptuous.Union(PricingSchema)], None | ||
| ), | ||
| voluptuous.Required("rules"): PricingRulesSchema, | ||
| } | ||
| ) | ||
| self._input = _schema(self._input) | ||
| self._cycle_offset = self._input["rules"]["cycle_start_day"] - 1 | ||
| # joint data by datetime | ||
| _data = { | ||
| x["datetime"]: { | ||
| "datetime": x["datetime"], | ||
| "kwh": x["value_kWh"], | ||
| "surplus_kwh": x["surplus_kWh"] if x["surplus_kWh"] is not None else 0, | ||
| } | ||
| for x in self._input["consumptions"] | ||
| } | ||
| for contract in self._input["contracts"]: | ||
| start = contract["date_start"] | ||
| end = contract["date_end"] | ||
| finish = False | ||
| while not finish: | ||
| if start in _data: | ||
| _data[start]["p1_kw"] = contract["power_p1"] | ||
| _data[start]["p2_kw"] = contract["power_p2"] | ||
| start = start + timedelta(hours=1) | ||
| finish = not (end > start) | ||
| if self._input["prices"]: | ||
| for x in self._input["prices"]: | ||
| start = x["datetime"] | ||
| if start in _data: | ||
| _data[start]["kwh_eur"] = x["value_eur_kWh"] | ||
| env = Environment() | ||
| energy_expr = env.compile_expression( | ||
| f'({self._input["rules"]["energy_formula"]})|float' | ||
| ) | ||
| power_expr = env.compile_expression( | ||
| f'({self._input["rules"]["power_formula"]})|float' | ||
| ) | ||
| others_expr = env.compile_expression( | ||
| f'({self._input["rules"]["others_formula"]})|float' | ||
| ) | ||
| surplus_expr = env.compile_expression( | ||
| f'({self._input["rules"]["surplus_formula"]})|float' | ||
| ) | ||
| main_expr = env.compile_expression( | ||
| f'({self._input["rules"]["main_formula"]})|float' | ||
| ) | ||
| _data = sorted([_data[x] for x in _data], key=lambda x: x["datetime"]) | ||
| hourly = [] | ||
| for x in _data: | ||
| x.update(self._input["rules"]) | ||
| tariff = utils.get_pvpc_tariff(x["datetime"]) | ||
| if "kwh_eur" not in x: | ||
| if tariff == "p1": | ||
| x["kwh_eur"] = x["p1_kwh_eur"] | ||
| elif tariff == "p2": | ||
| x["kwh_eur"] = x["p2_kwh_eur"] | ||
| elif tariff == "p3": | ||
| x["kwh_eur"] = x["p3_kwh_eur"] | ||
| if x["kwh_eur"] is None: | ||
| continue | ||
| if tariff == "p1": | ||
| x["surplus_kwh_eur"] = x["surplus_p1_kwh_eur"] | ||
| elif tariff == "p2": | ||
| x["surplus_kwh_eur"] = x["surplus_p2_kwh_eur"] | ||
| elif tariff == "p3": | ||
| x["surplus_kwh_eur"] = x["surplus_p3_kwh_eur"] | ||
| _energy_term = 0 | ||
| _power_term = 0 | ||
| _others_term = 0 | ||
| _surplus_term = 0 | ||
| with contextlib.suppress(Exception): | ||
| _energy_term = round(energy_expr(**x), 6) | ||
| _power_term = round(power_expr(**x), 6) | ||
| _others_term = round(others_expr(**x), 6) | ||
| _surplus_term = round(surplus_expr(**x), 6) | ||
| new_item = PricingAggData( | ||
| datetime=x["datetime"], | ||
| energy_term=_energy_term, | ||
| power_term=_power_term, | ||
| others_term=_others_term, | ||
| surplus_term=_surplus_term, | ||
| value_eur=0, | ||
| delta_h=1, | ||
| ) | ||
| hourly.append(new_item) | ||
| self._output["hourly"] = hourly | ||
| last_day_dt = None | ||
| last_month_dt = None | ||
| for hour in hourly: | ||
| curr_hour_dt: datetime = hour["datetime"] | ||
| curr_day_dt = curr_hour_dt.replace(hour=0, minute=0, second=0) | ||
| curr_month_dt = (curr_day_dt - timedelta(days=self._cycle_offset)).replace( | ||
| day=1 | ||
| ) | ||
| if last_day_dt is None or curr_day_dt != last_day_dt: | ||
| self._output["daily"].append( | ||
| PricingAggData( | ||
| datetime=curr_day_dt, | ||
| energy_term=hour["energy_term"], | ||
| power_term=hour["power_term"], | ||
| others_term=hour["others_term"], | ||
| surplus_term=hour["surplus_term"], | ||
| value_eur=hour["value_eur"], | ||
| delta_h=hour["delta_h"], | ||
| ) | ||
| ) | ||
| else: | ||
| self._output["daily"][-1]["energy_term"] += hour["energy_term"] | ||
| self._output["daily"][-1]["power_term"] += hour["power_term"] | ||
| self._output["daily"][-1]["others_term"] += hour["others_term"] | ||
| self._output["daily"][-1]["surplus_term"] += hour["surplus_term"] | ||
| self._output["daily"][-1]["delta_h"] += hour["delta_h"] | ||
| self._output["daily"][-1]["value_eur"] += hour["value_eur"] | ||
| if last_month_dt is None or curr_month_dt != last_month_dt: | ||
| self._output["monthly"].append( | ||
| PricingAggData( | ||
| datetime=curr_month_dt, | ||
| energy_term=hour["energy_term"], | ||
| power_term=hour["power_term"], | ||
| others_term=hour["others_term"], | ||
| surplus_term=hour["surplus_term"], | ||
| value_eur=hour["value_eur"], | ||
| delta_h=hour["delta_h"], | ||
| ) | ||
| ) | ||
| else: | ||
| self._output["monthly"][-1]["energy_term"] += hour["energy_term"] | ||
| self._output["monthly"][-1]["power_term"] += hour["power_term"] | ||
| self._output["monthly"][-1]["others_term"] += hour["others_term"] | ||
| self._output["monthly"][-1]["surplus_term"] += hour["surplus_term"] | ||
| self._output["monthly"][-1]["value_eur"] += hour["value_eur"] | ||
| self._output["monthly"][-1]["delta_h"] += hour["delta_h"] | ||
| last_day_dt = curr_day_dt | ||
| last_month_dt = curr_month_dt | ||
| for item in self._output: | ||
| for cost in self._output[item]: | ||
| cost["value_eur"] = round(main_expr(**cost, **self._input["rules"]), 6) | ||
| cost["energy_term"] = round(cost["energy_term"], 6) | ||
| cost["power_term"] = round(cost["power_term"], 6) | ||
| cost["others_term"] = round(cost["others_term"], 6) | ||
| cost["surplus_term"] = round(cost["surplus_term"], 6) |
| """Consumption data processors.""" | ||
| import logging | ||
| from collections.abc import Iterable | ||
| from typing import TypedDict | ||
| from datetime import datetime, timedelta | ||
| import voluptuous | ||
| from ..definitions import ConsumptionAggData, ConsumptionSchema | ||
| from . import utils | ||
| from .base import Processor | ||
| _LOGGER = logging.getLogger(__name__) | ||
| class ConsumptionOutput(TypedDict): | ||
| """A dict holding ConsumptionProcessor output property.""" | ||
| daily: Iterable[ConsumptionAggData] | ||
| monthly: Iterable[ConsumptionAggData] | ||
| class ConsumptionProcessor(Processor): | ||
| """A consumptions processor.""" | ||
| def do_process(self): | ||
| """Calculate daily and monthly consumption stats.""" | ||
| self._output = ConsumptionOutput(daily=[], monthly=[]) | ||
| last_day_dt = None | ||
| last_month_dt = None | ||
| _schema = voluptuous.Schema( | ||
| { | ||
| voluptuous.Required("consumptions"): [ConsumptionSchema], | ||
| voluptuous.Optional("cycle_start_day", default=1): voluptuous.Range( | ||
| 1, 30 | ||
| ), | ||
| } | ||
| ) | ||
| self._input = _schema(self._input) | ||
| self._cycle_offset = self._input["cycle_start_day"] - 1 | ||
| for consumption in self._input["consumptions"]: | ||
| curr_hour_dt: datetime = consumption["datetime"] | ||
| curr_day_dt = curr_hour_dt.replace(hour=0, minute=0, second=0) | ||
| curr_month_dt = (curr_day_dt - timedelta(days=self._cycle_offset)).replace( | ||
| day=1 | ||
| ) | ||
| tariff = utils.get_pvpc_tariff(curr_hour_dt) | ||
| kwh = consumption["value_kWh"] | ||
| surplus_kwh = consumption["surplus_kWh"] | ||
| delta_h = consumption["delta_h"] | ||
| kwh_by_tariff = [0, 0, 0] | ||
| surplus_kwh_by_tariff = [0, 0, 0] | ||
| match tariff: | ||
| case "p1": | ||
| kwh_by_tariff[0] = kwh | ||
| surplus_kwh_by_tariff[0] = surplus_kwh | ||
| case "p2": | ||
| kwh_by_tariff[1] = kwh | ||
| surplus_kwh_by_tariff[1] = surplus_kwh | ||
| case "p3": | ||
| kwh_by_tariff[2] = kwh | ||
| surplus_kwh_by_tariff[2] = surplus_kwh | ||
| if last_day_dt is None or curr_day_dt != last_day_dt: | ||
| self._output["daily"].append( | ||
| ConsumptionAggData( | ||
| datetime=curr_day_dt, | ||
| value_kWh=kwh, | ||
| delta_h=delta_h, | ||
| value_p1_kWh=kwh_by_tariff[0], | ||
| value_p2_kWh=kwh_by_tariff[1], | ||
| value_p3_kWh=kwh_by_tariff[2], | ||
| surplus_kWh=surplus_kwh, | ||
| surplus_p1_kWh=surplus_kwh_by_tariff[0], | ||
| surplus_p2_kWh=surplus_kwh_by_tariff[1], | ||
| surplus_p3_kWh=surplus_kwh_by_tariff[2], | ||
| ) | ||
| ) | ||
| else: | ||
| self._output["daily"][-1]["value_kWh"] += kwh | ||
| self._output["daily"][-1]["value_p1_kWh"] += kwh_by_tariff[0] | ||
| self._output["daily"][-1]["value_p2_kWh"] += kwh_by_tariff[1] | ||
| self._output["daily"][-1]["value_p3_kWh"] += kwh_by_tariff[2] | ||
| self._output["daily"][-1]["surplus_kWh"] += surplus_kwh | ||
| self._output["daily"][-1]["surplus_p1_kWh"] += surplus_kwh_by_tariff[0] | ||
| self._output["daily"][-1]["surplus_p2_kWh"] += surplus_kwh_by_tariff[1] | ||
| self._output["daily"][-1]["surplus_p3_kWh"] += surplus_kwh_by_tariff[2] | ||
| self._output["daily"][-1]["delta_h"] += delta_h | ||
| if last_month_dt is None or curr_month_dt != last_month_dt: | ||
| self._output["monthly"].append( | ||
| ConsumptionAggData( | ||
| datetime=curr_month_dt, | ||
| value_kWh=kwh, | ||
| delta_h=delta_h, | ||
| value_p1_kWh=kwh_by_tariff[0], | ||
| value_p2_kWh=kwh_by_tariff[1], | ||
| value_p3_kWh=kwh_by_tariff[2], | ||
| surplus_kWh=surplus_kwh, | ||
| surplus_p1_kWh=surplus_kwh_by_tariff[0], | ||
| surplus_p2_kWh=surplus_kwh_by_tariff[1], | ||
| surplus_p3_kWh=surplus_kwh_by_tariff[2], | ||
| ) | ||
| ) | ||
| else: | ||
| self._output["monthly"][-1]["value_kWh"] += kwh | ||
| self._output["monthly"][-1]["value_p1_kWh"] += kwh_by_tariff[0] | ||
| self._output["monthly"][-1]["value_p2_kWh"] += kwh_by_tariff[1] | ||
| self._output["monthly"][-1]["value_p3_kWh"] += kwh_by_tariff[2] | ||
| self._output["monthly"][-1]["surplus_kWh"] += surplus_kwh | ||
| self._output["monthly"][-1]["surplus_p1_kWh"] += surplus_kwh_by_tariff[ | ||
| 0 | ||
| ] | ||
| self._output["monthly"][-1]["surplus_p2_kWh"] += surplus_kwh_by_tariff[ | ||
| 1 | ||
| ] | ||
| self._output["monthly"][-1]["surplus_p3_kWh"] += surplus_kwh_by_tariff[ | ||
| 2 | ||
| ] | ||
| self._output["monthly"][-1]["delta_h"] += delta_h | ||
| last_day_dt = curr_day_dt | ||
| last_month_dt = curr_month_dt | ||
| # Round to two decimals | ||
| for item in self._output: | ||
| for cons in self._output[item]: | ||
| for key in cons: | ||
| if isinstance(cons[key], float): | ||
| cons[key] = round(cons[key], 2) |
| """Maximeter data processors.""" | ||
| import logging | ||
| from datetime import datetime | ||
| from typing import TypedDict | ||
| from dateparser import parse | ||
| import voluptuous | ||
| from edata.definitions import MaxPowerSchema | ||
| from edata.processors import utils | ||
| from .base import Processor | ||
| _LOGGER = logging.getLogger(__name__) | ||
| class MaximeterStats(TypedDict): | ||
| """A dict holding MaximeterProcessor stats.""" | ||
| value_max_kW: float | ||
| date_max: datetime | ||
| value_mean_kW: float | ||
| value_tile90_kW: float | ||
| class MaximeterOutput(TypedDict): | ||
| """A dict holding MaximeterProcessor output property.""" | ||
| stats: MaximeterStats | ||
| class MaximeterProcessor(Processor): | ||
| """A processor for Maximeter data.""" | ||
| def do_process(self): | ||
| """Calculate maximeter stats.""" | ||
| self._output = MaximeterOutput(stats={}) | ||
| _schema = voluptuous.Schema([MaxPowerSchema]) | ||
| self._input = _schema(self._input) | ||
| _values = [x["value_kW"] for x in self._input] | ||
| _max_kW = max(_values) | ||
| _dt_max_kW = parse(str(self._input[_values.index(_max_kW)]["datetime"])) | ||
| _mean_kW = sum(_values) / len(_values) | ||
| _tile90_kW = utils.percentile(_values, 0.9) | ||
| self._output["stats"] = MaximeterOutput( | ||
| value_max_kW=round(_max_kW, 2), | ||
| date_max=_dt_max_kW, | ||
| value_mean_kW=round(_mean_kW, 2), | ||
| value_tile90_kW=round(_tile90_kW, 2), | ||
| ) |
| """Generic utilities for processing data.""" | ||
| import json | ||
| import logging | ||
| from copy import deepcopy | ||
| from datetime import date, datetime, timedelta | ||
| from json import JSONEncoder | ||
| import holidays | ||
| import math | ||
| import functools | ||
| import contextlib | ||
| _LOGGER = logging.getLogger(__name__) | ||
| logging.basicConfig(level=logging.INFO) | ||
| HOURS_P1 = [10, 11, 12, 13, 18, 19, 20, 21] | ||
| HOURS_P2 = [8, 9, 14, 15, 16, 17, 22, 23] | ||
| WEEKDAYS_P3 = [5, 6] | ||
| def is_empty(lst): | ||
| """Check if a list is empty.""" | ||
| return len(lst) == 0 | ||
| def extract_dt_ranges(lst, dt_from, dt_to, gap_interval=timedelta(hours=1)): | ||
| """Filter a list of dicts between two datetimes.""" | ||
| new_lst = [] | ||
| missing = [] | ||
| oldest_dt = None | ||
| newest_dt = None | ||
| last_dt = None | ||
| if len(lst) > 0: | ||
| sorted_lst = sorted(lst, key=lambda i: i["datetime"]) | ||
| last_dt = dt_from | ||
| for i in sorted_lst: | ||
| if dt_from <= i["datetime"] <= dt_to: | ||
| if (i["datetime"] - last_dt) > gap_interval: | ||
| missing.append({"from": last_dt, "to": i["datetime"]}) | ||
| if i.get("value_kWh", 1) > 0: | ||
| if oldest_dt is None or i["datetime"] < oldest_dt: | ||
| oldest_dt = i["datetime"] | ||
| if newest_dt is None or i["datetime"] > newest_dt: | ||
| newest_dt = i["datetime"] | ||
| if i["datetime"] != last_dt: # remove duplicates | ||
| new_lst.append(i) | ||
| last_dt = i["datetime"] | ||
| if dt_to > last_dt: | ||
| missing.append({"from": last_dt, "to": dt_to}) | ||
| _LOGGER.debug("found data from %s to %s", oldest_dt, newest_dt) | ||
| else: | ||
| missing.append({"from": dt_from, "to": dt_to}) | ||
| return new_lst, missing | ||
| def extend_by_key(old_lst, new_lst, key): | ||
| """Extend a list of dicts by key.""" | ||
| lst = deepcopy(old_lst) | ||
| temp_list = [] | ||
| for new_element in new_lst: | ||
| for old_element in lst: | ||
| if new_element[key] == old_element[key]: | ||
| for i in old_element: | ||
| old_element[i] = new_element[i] | ||
| break | ||
| else: | ||
| temp_list.append(new_element) | ||
| lst.extend(temp_list) | ||
| return lst | ||
| def extend_and_filter(old_lst, new_lst, key, dt_from, dt_to): | ||
| data = extend_by_key(old_lst, new_lst, key) | ||
| data, _ = extract_dt_ranges( | ||
| data, | ||
| dt_from, | ||
| dt_to, | ||
| gap_interval=timedelta(days=365), # trick | ||
| ) | ||
| return data | ||
| def get_by_key(lst, key, value): | ||
| """Obtain an element of a list of dicts by key=value.""" | ||
| for i in lst: | ||
| if i[key] == value: | ||
| return i | ||
| return None | ||
| def get_pvpc_tariff(a_datetime): | ||
| """Evals the PVPC tariff for a given datetime.""" | ||
| hdays = holidays.country_holidays("ES") | ||
| hour = a_datetime.hour | ||
| weekday = a_datetime.weekday() | ||
| if weekday in WEEKDAYS_P3 or a_datetime.date() in hdays: | ||
| return "p3" | ||
| elif hour in HOURS_P1: | ||
| return "p1" | ||
| elif hour in HOURS_P2: | ||
| return "p2" | ||
| else: | ||
| return "p3" | ||
| def serialize_dict(data: dict) -> dict: | ||
| """Serialize dicts as json.""" | ||
| class DateTimeEncoder(JSONEncoder): | ||
| """Replace datetime objects with ISO strings.""" | ||
| def default(self, o): | ||
| if isinstance(o, (date, datetime)): | ||
| return o.isoformat() | ||
| return json.loads(json.dumps(data, cls=DateTimeEncoder)) | ||
| def deserialize_dict(serialized_dict: dict) -> dict: | ||
| """Deserializes a json replacing ISOTIME strings into datetime.""" | ||
| def datetime_parser(json_dict): | ||
| """Parse JSON while converting ISO strings into datetime objects.""" | ||
| for key, value in json_dict.items(): | ||
| if "date" in key: | ||
| with contextlib.suppress(Exception): | ||
| json_dict[key] = datetime.fromisoformat(value) | ||
| return json_dict | ||
| return json.loads(json.dumps(serialized_dict), object_hook=datetime_parser) | ||
| def percentile(N, percent, key=lambda x: x): | ||
| """Find the percentile of a list of values.""" | ||
| if not N: | ||
| return None | ||
| k = (len(N) - 1) * percent | ||
| f = math.floor(k) | ||
| c = math.ceil(k) | ||
| if f == c: | ||
| return key(N[int(k)]) | ||
| d0 = key(N[int(f)]) * (c - k) | ||
| d1 = key(N[int(c)]) * (k - f) | ||
| return d0 + d1 |
| import json | ||
| import logging | ||
| import os | ||
| from .processors import utils | ||
| from . import const as const | ||
| from . import definitions as defs | ||
| _LOGGER = logging.getLogger(__name__) | ||
| DEFAULT_STORAGE_DIR = os.getenv("HOME") | ||
| RECENT_CACHE_FILENAME = "edata_{id}.json" | ||
| compile_storage_id = lambda cups: cups.lower() | ||
| def check_storage_integrity(data: defs.EdataData): | ||
| """Check if an EdataData object follows a schema.""" | ||
| return defs.EdataSchema(data) | ||
| def load_storage(cups: str, storage_dir: str | None = None): | ||
| """Load EdataData storage from its config dir.""" | ||
| if storage_dir is None: | ||
| storage_dir = DEFAULT_STORAGE_DIR | ||
| _subdir = os.path.join(storage_dir, const.PROG_NAME) | ||
| _recent_cache = os.path.join( | ||
| _subdir, RECENT_CACHE_FILENAME.format(id=compile_storage_id(cups)) | ||
| ) | ||
| os.makedirs(_subdir, exist_ok=True) | ||
| with open(_recent_cache, encoding="utf-8") as f: | ||
| return check_storage_integrity(utils.deserialize_dict(json.load(f))) | ||
| def dump_storage(cups: str, storage: defs.EdataData, storage_dir: str | None = None): | ||
| """Update EdataData storage.""" | ||
| if storage_dir is None: | ||
| storage_dir = DEFAULT_STORAGE_DIR | ||
| _subdir = os.path.join(storage_dir, const.PROG_NAME) | ||
| _recent_cache = os.path.join( | ||
| _subdir, RECENT_CACHE_FILENAME.format(id=compile_storage_id(cups)) | ||
| ) | ||
| os.makedirs(_subdir, exist_ok=True) | ||
| with open(_recent_cache, "w", encoding="utf-8") as f: | ||
| json.dump(utils.serialize_dict(check_storage_integrity(storage)), f) |
| """A collection of tests for e-data processors""" | ||
| import json | ||
| import pathlib | ||
| from freezegun import freeze_time | ||
| from ..definitions import PricingRules | ||
| from ..helpers import EdataHelper | ||
| from ..processors import utils | ||
| AT_TIME = "2022-10-22" | ||
| TESTS_DIR = str(pathlib.Path(__file__).parent.resolve()) | ||
| TEST_GOOD_INPUT = TESTS_DIR + "/assets/helpers/edata.storage_TEST" | ||
| TEST_EXPECTATIONS_DATA = TESTS_DIR + "/assets/helpers/data.out" | ||
| TEST_EXPECTATIONS_ATTRIBUTES = ( | ||
| TESTS_DIR + f"/assets/helpers/attributes_at_{AT_TIME}.out" | ||
| ) | ||
| PRICING_RULES_PVPC = PricingRules( | ||
| p1_kw_year_eur=30.67266, | ||
| p2_kw_year_eur=1.4243591, | ||
| meter_month_eur=0.81, | ||
| market_kw_year_eur=3.113, | ||
| electricity_tax=1.0511300560, | ||
| iva_tax=1.05, | ||
| p1_kwh_eur=None, | ||
| p2_kwh_eur=None, | ||
| p3_kwh_eur=None, | ||
| ) | ||
| @freeze_time(AT_TIME) | ||
| def test_helper_offline(snapshot) -> None: | ||
| """Tests EdataHelper (syrupy snapshot)""" | ||
| with open(TEST_GOOD_INPUT, "r", encoding="utf-8") as original_file: | ||
| data = utils.deserialize_dict(json.load(original_file)) | ||
| helper = EdataHelper( | ||
| "USER", | ||
| "PASS", | ||
| "CUPS", | ||
| datadis_authorized_nif=None, | ||
| pricing_rules=PRICING_RULES_PVPC, | ||
| data=data, | ||
| ) | ||
| helper.process_data() | ||
| # Compara ambos outputs con snapshot | ||
| assert { | ||
| "data": utils.serialize_dict(helper.data), | ||
| "attributes": utils.serialize_dict(helper.attributes), | ||
| } == snapshot |
| """A collection of tests for e-data processors""" | ||
| import datetime as dt | ||
| import json | ||
| import pathlib | ||
| import typing | ||
| from collections.abc import Iterable | ||
| import pytest | ||
| from ..definitions import PricingData, PricingRules | ||
| from ..processors import utils | ||
| from ..processors.base import Processor | ||
| from ..processors.billing import BillingProcessor | ||
| from ..processors.consumption import ConsumptionProcessor | ||
| from ..processors.maximeter import MaximeterProcessor | ||
| TESTS_DIR = str(pathlib.Path(__file__).parent.resolve()) | ||
| TEST_GOOD_INPUT = TESTS_DIR + "/assets/processors/edata.storage_TEST" | ||
| def _compare_processor_output( | ||
| source_filepath: str, | ||
| processor_class: Processor, | ||
| key: str, | ||
| snapshot, | ||
| ): | ||
| with open(source_filepath, encoding="utf-8") as original_file: | ||
| data = utils.deserialize_dict(json.load(original_file)) | ||
| if key == "consumptions": | ||
| processor = processor_class({"consumptions": data[key]}) | ||
| else: | ||
| processor = processor_class(data[key]) | ||
| assert utils.serialize_dict(processor.output) == snapshot | ||
| @pytest.mark.parametrize( | ||
| "processor, key", | ||
| [(ConsumptionProcessor, "consumptions"), (MaximeterProcessor, "maximeter")], | ||
| ) | ||
| def test_processor(processor: Processor, key: str, snapshot) -> None: | ||
| """Tests all processors but billing (syrupy snapshot)""" | ||
| _compare_processor_output( | ||
| TEST_GOOD_INPUT, | ||
| processor, | ||
| key, | ||
| snapshot, | ||
| ) | ||
| @pytest.mark.parametrize( | ||
| "_id, rules, prices", | ||
| [ | ||
| ( | ||
| "custom_prices", | ||
| PricingRules( | ||
| p1_kw_year_eur=30.67266, | ||
| p2_kw_year_eur=1.4243591, | ||
| meter_month_eur=0.81, | ||
| market_kw_year_eur=3.113, | ||
| electricity_tax=1.0511300560, | ||
| iva_tax=1.1, | ||
| p1_kwh_eur=None, | ||
| p2_kwh_eur=None, | ||
| p3_kwh_eur=None, | ||
| ), | ||
| [ | ||
| PricingData( | ||
| datetime=dt.datetime(2022, 10, 22, x, 0, 0), | ||
| value_eur_kWh=1, | ||
| delta_h=1, | ||
| ) | ||
| for x in range(0, 24) | ||
| ], | ||
| ), | ||
| ( | ||
| "constant_prices", | ||
| PricingRules( | ||
| p1_kw_year_eur=30.67266, | ||
| p2_kw_year_eur=1.4243591, | ||
| meter_month_eur=0.81, | ||
| market_kw_year_eur=3.113, | ||
| electricity_tax=1.0511300560, | ||
| iva_tax=1.1, | ||
| p1_kwh_eur=1, | ||
| p2_kwh_eur=1, | ||
| p3_kwh_eur=1, | ||
| ), | ||
| None, | ||
| ), | ||
| ], | ||
| ) | ||
| def test_processor_billing( | ||
| _id: str, rules: PricingRules, prices: typing.Optional[Iterable[PricingData]], snapshot | ||
| ): | ||
| """Tests billing processor (syrupy snapshot)""" | ||
| with open(TEST_GOOD_INPUT, "r", encoding="utf-8") as original_file: | ||
| data = utils.deserialize_dict(json.load(original_file)) | ||
| processor = BillingProcessor( | ||
| { | ||
| "consumptions": data["consumptions"], | ||
| "contracts": data["contracts"], | ||
| "prices": prices, | ||
| "rules": rules, | ||
| } | ||
| ) | ||
| assert utils.serialize_dict(processor.output) == snapshot |
Alert delta unavailable
Currently unable to show alert delta for PyPI packages.
245954
5.34%34
9.68%2634
9.25%