Latest Threat Research: SANDWORM_MODE: Shai-Hulud-Style npm Worm Hijacks CI Workflows and Poisons AI Toolchains. Details
Socket
Book a DemoInstallSign in
Socket

e-data

Package Overview
Dependencies
Maintainers
1
Versions
85
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

e-data - npm Package Compare versions

Comparing version
1.3.3
to
2.0.0.dev112
edata/core/__init__.py
+3
"""Constants file."""
PROG_NAME = "edata"
"""Collection of utilities."""
import logging
from datetime import datetime
from pathlib import Path
import holidays
from edata.models.supply import Contract
# Hours (0-23) billed at each tariff period; index 0 -> p1, 1 -> p2, 2 -> p3
# (see get_tariff, which returns index + 1).
TARIFF_BY_HOUR = [
    [10, 11, 12, 13, 18, 19, 20, 21],  # p1
    [8, 9, 14, 15, 16, 17, 22, 23],  # p2
    [0, 1, 2, 3, 4, 5, 6, 7],  # p3
]
# Weekdays (Mon=0 .. Sun=6) that force a period regardless of hour:
# Saturday and Sunday are always p3.
TARIFF_BY_WEEKDAY = [[], [], [5, 6]]
_LOGGER = logging.getLogger(__name__)
def get_tariff(dt: datetime) -> int:
    """Return the tariff period (1, 2 or 3) applicable at *dt*.

    Spanish national holidays and weekends always bill at p3; otherwise
    the period is decided by the hour of day. Returns 0 only if no rule
    matched (should not happen, and is logged as an error).
    """
    # Holidays take precedence over every hour/weekday rule.
    if dt.date() in holidays.country_holidays("ES"):
        return 3
    weekday = dt.weekday()
    for period, days in enumerate(TARIFF_BY_WEEKDAY, start=1):
        if weekday in days:
            return period
    hour = dt.hour
    for period, hours in enumerate(TARIFF_BY_HOUR, start=1):
        if hour in hours:
            return period
    # we shouldn't get here
    _LOGGER.error("Cannot decide the tariff for %s", dt.isoformat())
    return 0
def get_contract_for_dt(contracts: list[Contract], date: datetime):
    """Return the first contract whose validity window contains *date*, or None."""
    candidates = (c for c in contracts if c.date_start <= date <= c.date_end)
    return next(candidates, None)
def get_month(dt: datetime) -> datetime:
    """Return *dt* truncated to the first instant of its month."""
    zeroed = {"day": 1, "hour": 0, "minute": 0, "second": 0, "microsecond": 0}
    return dt.replace(**zeroed)
def get_day(dt: datetime) -> datetime:
    """Return *dt* truncated to midnight of the same day."""
    midnight = {"hour": 0, "minute": 0, "second": 0, "microsecond": 0}
    return dt.replace(**midnight)
def redacted_cups(cups: str) -> str:
    """Return an anonymized version of the cups identifier.

    Only the trailing five characters are kept; shorter inputs are
    returned unchanged.
    """
    keep = slice(-5, None)
    return cups[keep]
def get_db_path(storage_dir: str) -> str:
    """Return the path of the 'edata.db' SQLite file under *storage_dir*."""
    root = Path(storage_dir).absolute()
    return str(root / "edata.db")
from edata.database.controller import EdataDB
import logging
import os
import typing
from datetime import datetime
from sqlalchemy import Select, insert
from sqlalchemy.exc import IntegrityError
from sqlalchemy.ext.asyncio import AsyncEngine, create_async_engine
from sqlmodel import SQLModel
from sqlmodel.ext.asyncio.session import AsyncSession
from sqlmodel.sql.expression import SelectOfScalar
import edata.database.queries as q
from edata.database.models import (
BillModel,
ContractModel,
EnergyModel,
PowerModel,
PVPCModel,
StatisticsModel,
SupplyModel,
)
from edata.models import Contract, Energy, Power, Statistics, Supply
from edata.models.bill import Bill, EnergyPrice
_LOGGER = logging.getLogger(__name__)
T = typing.TypeVar("T", bound=SQLModel)
class EdataDB:
    """Process-wide singleton giving async access to the edata SQLite database.

    The first instantiation fixes the database URL, creates the parent
    directory and the async engine; a later instantiation with a
    different path raises ValueError. Tables are created lazily the
    first time any query method runs.
    """

    _instance = None
    _engine: AsyncEngine | None = None
    _db_url: str | None = None

    def __new__(cls, sqlite_path: str):
        """Return the shared instance, initializing it on the first call.

        Raises:
            ValueError: if already initialized with a different db path.
        """
        # Standard SQLAlchemy form: "sqlite+aiosqlite:///" + absolute path
        # (the path already carries its leading separator on POSIX). The
        # previous four-slash prefix produced a double-slash database path.
        db_url = f"sqlite+aiosqlite:///{os.path.abspath(sqlite_path)}"
        if cls._instance is None:
            cls._instance = super().__new__(cls)
            cls._db_url = db_url
            # Ensure the parent directory exists before the engine first
            # touches the file.
            dir_path = os.path.dirname(os.path.abspath(sqlite_path))
            os.makedirs(dir_path, exist_ok=True)
            cls._engine = create_async_engine(db_url, future=True)
            cls._instance._tables_initialized = False
        elif db_url != cls._db_url:
            raise ValueError("EdataDB already initialized with a different db_url")
        return cls._instance

    @property
    def engine(self):
        """Return the async database engine."""
        return self._engine

    async def _ensure_tables(self):
        """Create tables if not already created (lazy init)."""
        if self._tables_initialized:
            return
        if self.engine:
            async with self.engine.begin() as conn:
                await conn.run_sync(SQLModel.metadata.create_all)
            self._tables_initialized = True

    async def _add_one(
        self,
        session: AsyncSession,
        record: T,
        commit: bool = True,
    ) -> T:
        """Add a single record into the database.

        With commit=False the record is only flushed so the caller can
        batch several writes into a single transaction.
        """
        session.add(record)
        if commit:
            await session.commit()
            await session.refresh(record)
        else:
            await session.flush()
        return record

    async def _update_one(
        self,
        session: AsyncSession,
        query: SelectOfScalar,
        data,
        commit: bool = True,
        overrides: dict[str, typing.Any] | None = None,
    ) -> T | None:  # type: ignore
        """Update the single record selected by *query*.

        Returns None when the query matches nothing, the unchanged record
        when *data* is already current and no *overrides* are given, and
        the updated (and optionally committed) record otherwise.
        """
        result = await session.exec(query)
        existing = result.first()
        if existing is None:
            # Previously this fell through to setattr(None, ...) and
            # raised AttributeError when no row matched.
            _LOGGER.warning("Cannot update: query matched no existing record")
            return None
        if getattr(existing, "data") == data and not overrides:
            return existing
        setattr(existing, "data", data)
        if overrides:
            for key, value in overrides.items():
                setattr(existing, key, value)
        if commit:
            await session.commit()
            await session.refresh(existing)
        else:
            await session.flush()
        return existing

    async def _add_or_update_one(
        self,
        session: AsyncSession,
        query: SelectOfScalar,
        record: T,
        commit: bool = True,
        override: list[str] | None = None,
    ) -> T | None:
        """Add a single record into the database and fallback to update safely.

        The insert runs inside a SAVEPOINT so an IntegrityError (duplicate
        key) rolls back only the insert; the existing row selected by
        *query* is then updated instead. *override* lists extra fields to
        copy onto the existing row besides "data".
        """
        try:
            async with session.begin_nested():
                session.add(record)
                await session.flush()
            if commit:
                await session.commit()
                await session.refresh(record)
            return record
        except IntegrityError:
            new_data = getattr(record, "data")
            override_dict = None
            if override:
                record_json = record.model_dump()
                override_dict = {
                    key: value for key, value in record_json.items() if key in override
                }
            return await self._update_one(
                session, query, new_data, commit=commit, overrides=override_dict
            )

    async def _add_or_update_many(
        self,
        session: AsyncSession,
        queries: list[SelectOfScalar],
        records: list[T],
        batch_size: int = 100,
        override: list[str] | None = None,
    ) -> list[T]:
        """Insert many records, retrying per-record upserts on conflict.

        *queries* must be parallel to *records* (queries[i] selects the
        row that conflicts with records[i]). Records are inserted in
        chunks of *batch_size*; a chunk that raises IntegrityError is
        replayed record by record through _add_or_update_one.
        """
        if not records:
            return []
        for i in range(0, len(records), batch_size):
            chunk_records = records[i : i + batch_size]
            chunk_queries = queries[i : i + batch_size]
            try:
                async with session.begin_nested():
                    session.add_all(chunk_records)
                    await session.flush()
            except IntegrityError:
                for j, record in enumerate(chunk_records):
                    await self._add_or_update_one(
                        session,
                        chunk_queries[j],
                        record,
                        commit=False,
                        override=override,
                    )
        await session.commit()
        return records

    async def get_supply(self, cups: str) -> SupplyModel | None:
        """Get a supply record by cups."""
        await self._ensure_tables()
        async with AsyncSession(self.engine) as session:
            result = await session.exec(q.get_supply(cups))
            return result.first()

    async def get_contract(
        self, cups: str, date_start: datetime | None = None
    ) -> ContractModel | None:
        """Get a contract record by cups (the most recent one when no date is given)."""
        await self._ensure_tables()
        async with AsyncSession(self.engine) as session:
            result = await session.exec(q.get_contract(cups, date_start))
            return result.first()

    async def get_last_energy(self, cups: str) -> EnergyModel | None:
        """Get the most recent Energy record by cups."""
        await self._ensure_tables()
        async with AsyncSession(self.engine) as session:
            result = await session.exec(q.get_last_energy(cups))
            return result.first()

    async def get_last_pvpc(self) -> PVPCModel | None:
        """Get the most recent pvpc."""
        await self._ensure_tables()
        async with AsyncSession(self.engine) as session:
            result = await session.exec(q.get_last_pvpc())
            return result.first()

    async def get_last_bill(self, cups: str) -> BillModel | None:
        """Get the most recent bill record by cups."""
        await self._ensure_tables()
        async with AsyncSession(self.engine) as session:
            result = await session.exec(q.get_last_bill(cups))
            return result.first()

    async def add_contract(self, cups: str, contract: Contract) -> ContractModel | None:
        """Add or update a contract record."""
        await self._ensure_tables()
        record = ContractModel(cups=cups, date_start=contract.date_start, data=contract)
        async with AsyncSession(self.engine) as session:
            return await self._add_or_update_one(
                session, q.get_contract(cups, contract.date_start), record
            )

    async def add_supply(self, supply: Supply) -> SupplyModel | None:
        """Add or update a supply record."""
        await self._ensure_tables()
        record = SupplyModel(cups=supply.cups, data=supply)
        async with AsyncSession(self.engine) as session:
            return await self._add_or_update_one(
                session, q.get_supply(supply.cups), record
            )

    async def add_energy(self, cups: str, energy: Energy) -> EnergyModel | None:
        """Add or update an energy record."""
        await self._ensure_tables()
        record = EnergyModel(
            cups=cups, delta_h=energy.delta_h, datetime=energy.datetime, data=energy
        )
        async with AsyncSession(self.engine) as session:
            return await self._add_or_update_one(
                session, q.get_energy(cups, energy.datetime), record
            )

    async def add_energy_list(self, cups: str, energy: list[Energy]):
        """Add or update a list of energy records."""
        await self._ensure_tables()
        async with AsyncSession(self.engine) as session:
            # Deduplicate by datetime (last occurrence wins) so the bulk
            # insert cannot conflict with itself.
            unique_map = {item.datetime: item for item in energy}
            unique = list(unique_map.values())
            queries = [q.get_energy(cups, x.datetime) for x in unique]
            items = [
                EnergyModel(cups=cups, delta_h=x.delta_h, datetime=x.datetime, data=x)
                for x in unique
            ]
            await self._add_or_update_many(session, queries, items)

    async def add_power(self, cups: str, power: Power) -> PowerModel | None:
        """Add or update a power record for a given CUPS and Power instance."""
        await self._ensure_tables()
        record = PowerModel(cups=cups, datetime=power.datetime, data=power)
        async with AsyncSession(self.engine) as session:
            return await self._add_or_update_one(
                session, q.get_power(cups, power.datetime), record
            )

    async def add_power_list(self, cups: str, power: list[Power]):
        """Add or update a list of power records."""
        await self._ensure_tables()
        async with AsyncSession(self.engine) as session:
            # Deduplicate by datetime (last occurrence wins).
            unique_map = {item.datetime: item for item in power}
            unique = list(unique_map.values())
            queries = [q.get_power(cups, x.datetime) for x in unique]
            items = [PowerModel(cups=cups, datetime=x.datetime, data=x) for x in unique]
            await self._add_or_update_many(session, queries, items)

    async def add_pvpc(self, pvpc: EnergyPrice) -> PVPCModel | None:
        """Add or update a pvpc record."""
        await self._ensure_tables()
        record = PVPCModel(datetime=pvpc.datetime, data=pvpc)
        async with AsyncSession(self.engine) as session:
            return await self._add_or_update_one(
                session, q.get_pvpc(pvpc.datetime), record
            )

    async def add_pvpc_list(self, pvpc: list[EnergyPrice]):
        """Add or update a list of pvpc records."""
        await self._ensure_tables()
        async with AsyncSession(self.engine) as session:
            # Deduplicate by datetime (last occurrence wins).
            unique_map = {item.datetime: item for item in pvpc}
            unique = list(unique_map.values())
            queries = [q.get_pvpc(x.datetime) for x in unique]
            items = [PVPCModel(datetime=x.datetime, data=x) for x in unique]
            await self._add_or_update_many(session, queries, items)

    async def add_statistics(
        self,
        cups: str,
        type_: typing.Literal["day", "month"],
        data: Statistics,
        complete: bool = False,
    ) -> StatisticsModel | None:
        """Add or update a statistics record.

        "complete" is overridden on update so re-runs can mark a period done.
        """
        await self._ensure_tables()
        record = StatisticsModel(
            cups=cups, datetime=data.datetime, type=type_, data=data, complete=complete
        )
        async with AsyncSession(self.engine) as session:
            return await self._add_or_update_one(
                session,
                q.get_statistics(cups, type_, data.datetime),
                record,
                override=["complete"],
            )

    async def add_bill(
        self,
        cups: str,
        type_: typing.Literal["hour", "day", "month"],
        data: Bill,
        confhash: str,
        complete: bool,
    ) -> BillModel | None:
        """Add or update a bill record.

        "complete" and "confhash" are overridden on update so bills
        recomputed with a new configuration replace stale metadata.
        """
        await self._ensure_tables()
        record = BillModel(
            cups=cups,
            datetime=data.datetime,
            type=type_,
            complete=complete,
            confhash=confhash,
            data=data,
        )
        async with AsyncSession(self.engine) as session:
            return await self._add_or_update_one(
                session,
                q.get_bill(cups, type_, data.datetime),
                record,
                override=["complete", "confhash"],
            )

    async def add_bill_list(
        self,
        cups: str,
        type_: typing.Literal["hour", "day", "month"],
        confhash: str,
        complete: bool,
        bill: list[Bill],
    ):
        """Add or update a list of bill records."""
        await self._ensure_tables()
        async with AsyncSession(self.engine) as session:
            # Deduplicate by datetime (last occurrence wins).
            unique_map = {item.datetime: item for item in bill}
            unique = list(unique_map.values())
            queries = [q.get_bill(cups, type_, x.datetime) for x in unique]
            items = [
                BillModel(
                    cups=cups,
                    datetime=x.datetime,
                    type=type_,
                    confhash=confhash,
                    complete=complete,
                    data=x,
                )
                for x in unique
            ]
            await self._add_or_update_many(
                session, queries, items, override=["complete", "confhash"]
            )

    async def list_supplies(self):
        """List all supply records."""
        await self._ensure_tables()
        async with AsyncSession(self.engine) as session:
            result = await session.exec(q.list_supply())
            return result.all()

    async def list_contracts(self, cups: str | None = None):
        """List all contract records."""
        await self._ensure_tables()
        async with AsyncSession(self.engine) as session:
            result = await session.exec(q.list_contract(cups))
            return result.all()

    async def list_energy(
        self,
        cups: str,
        date_from: datetime | None = None,
        date_to: datetime | None = None,
    ):
        """List energy records."""
        await self._ensure_tables()
        async with AsyncSession(self.engine) as session:
            result = await session.exec(q.list_energy(cups, date_from, date_to))
            return result.all()

    async def list_power(
        self,
        cups: str,
        date_from: datetime | None = None,
        date_to: datetime | None = None,
    ):
        """List power records."""
        await self._ensure_tables()
        async with AsyncSession(self.engine) as session:
            result = await session.exec(q.list_power(cups, date_from, date_to))
            return result.all()

    async def list_pvpc(
        self,
        date_from: datetime | None = None,
        date_to: datetime | None = None,
    ):
        """List pvpc records."""
        await self._ensure_tables()
        async with AsyncSession(self.engine) as session:
            result = await session.exec(q.list_pvpc(date_from, date_to))
            return result.all()

    async def list_statistics(
        self,
        cups: str,
        type_: typing.Literal["day", "month"],
        date_from: datetime | None = None,
        date_to: datetime | None = None,
        complete: bool | None = None,
    ):
        """List statistics records filtered by type ('day' or 'month') and date range."""
        await self._ensure_tables()
        async with AsyncSession(self.engine) as session:
            result = await session.exec(
                q.list_statistics(cups, type_, date_from, date_to, complete)
            )
            return result.all()

    async def list_bill(
        self,
        cups: str,
        type_: typing.Literal["hour", "day", "month"],
        date_from: datetime | None = None,
        date_to: datetime | None = None,
        complete: bool | None = None,
    ):
        """List bill records filtered by type ('hour', 'day' or 'month') and date range."""
        await self._ensure_tables()
        async with AsyncSession(self.engine) as session:
            result = await session.exec(
                q.list_bill(cups, type_, date_from, date_to, complete)
            )
            return result.all()
import typing
from datetime import datetime as dt
from sqlmodel import AutoString, Column, Field, SQLModel, UniqueConstraint
from edata.database.utils import PydanticJSON
from edata.models import Bill, Contract, Energy, EnergyPrice, Power, Statistics, Supply
class SupplyModel(SQLModel, table=True):
    """ORM row for a supply point, keyed by its CUPS identifier."""
    __tablename__ = "supply"  # type: ignore
    __table_args__ = {"extend_existing": True}
    # CUPS identifier serves as the natural primary key.
    cups: str = Field(default=None, primary_key=True)
    # Full Supply payload, serialized to JSON by PydanticJSON.
    data: Supply = Field(sa_column=Column(PydanticJSON(Supply)))
    version: int = Field(default=1)
    # NOTE(review): dt.now is naive local time — confirm timezone policy.
    created_at: dt = Field(default_factory=dt.now, nullable=False)
    # Refreshed automatically on UPDATE via the onupdate hook.
    updated_at: dt = Field(
        default_factory=dt.now, nullable=False, sa_column_kwargs={"onupdate": dt.now}
    )
class ContractModel(SQLModel, table=True):
    """ORM row for a contract belonging to a supply point."""
    __tablename__ = "contract"  # type: ignore
    __table_args__ = (
        # One contract per (cups, start date).
        UniqueConstraint("cups", "date_start", name="uq_contract_cups_start"),
        {"extend_existing": True},
    )
    id: int | None = Field(default=None, primary_key=True)
    cups: str = Field(foreign_key="supply.cups", index=True)
    date_start: dt = Field(index=True)
    # Full Contract payload, serialized to JSON by PydanticJSON.
    data: Contract = Field(sa_column=Column(PydanticJSON(Contract)))
    version: int = Field(default=1)
    created_at: dt = Field(default_factory=dt.now, nullable=False)
    # Refreshed automatically on UPDATE via the onupdate hook.
    updated_at: dt = Field(
        default_factory=dt.now, nullable=False, sa_column_kwargs={"onupdate": dt.now}
    )
class EnergyModel(SQLModel, table=True):
    """ORM row for one energy measurement of a supply point."""
    __tablename__ = "energy"  # type: ignore
    __table_args__ = (
        # One measurement per (cups, interval length, timestamp).
        UniqueConstraint(
            "cups", "delta_h", "datetime", name="uq_energy_cups_delta_datetime"
        ),
        {"extend_existing": True},
    )
    id: int | None = Field(default=None, primary_key=True)
    cups: str = Field(foreign_key="supply.cups", index=True)
    # Interval length in hours (part of the unique key).
    delta_h: float
    datetime: dt = Field(index=True)
    # Full Energy payload, serialized to JSON by PydanticJSON.
    data: Energy = Field(sa_column=Column(PydanticJSON(Energy)))
    version: int = Field(default=1)
    created_at: dt = Field(default_factory=dt.now, nullable=False)
    # Refreshed automatically on UPDATE via the onupdate hook.
    updated_at: dt = Field(
        default_factory=dt.now, nullable=False, sa_column_kwargs={"onupdate": dt.now}
    )
class PowerModel(SQLModel, table=True):
    """ORM row for one power measurement of a supply point."""
    __tablename__ = "power"  # type: ignore
    __table_args__ = (
        # One measurement per (cups, timestamp).
        UniqueConstraint("cups", "datetime", name="uq_power_cups_datetime"),
        {"extend_existing": True},
    )
    id: int | None = Field(default=None, primary_key=True)
    cups: str = Field(foreign_key="supply.cups", index=True)
    datetime: dt = Field(index=True)
    # Full Power payload, serialized to JSON by PydanticJSON.
    data: Power = Field(sa_column=Column(PydanticJSON(Power)))
    version: int = Field(default=1)
    created_at: dt = Field(default_factory=dt.now, nullable=False)
    # Refreshed automatically on UPDATE via the onupdate hook.
    updated_at: dt = Field(
        default_factory=dt.now, nullable=False, sa_column_kwargs={"onupdate": dt.now}
    )
class StatisticsModel(SQLModel, table=True):
    """ORM row for aggregated statistics of a supply point."""
    __tablename__ = "statistics"  # type: ignore
    __table_args__ = (
        # NOTE(review): the constraint includes "cups" although its name
        # omits it — consider renaming if the schema is ever migrated.
        UniqueConstraint(
            "cups",
            "datetime",
            "type",
            name="uq_statistics_datetime_type",
        ),
        {"extend_existing": True},
    )
    id: int | None = Field(default=None, primary_key=True)
    cups: str = Field(foreign_key="supply.cups", index=True)
    datetime: dt = Field(index=True)
    # Aggregation granularity; stored as a plain string column.
    type: typing.Literal["day", "month"] = Field(index=True, sa_type=AutoString)
    # Completeness flag provided by the caller (see EdataDB.add_statistics).
    complete: bool = Field(False)
    # Full Statistics payload, serialized to JSON by PydanticJSON.
    data: Statistics = Field(sa_column=Column(PydanticJSON(Statistics)))
    version: int = Field(default=1)
    created_at: dt = Field(default_factory=dt.now, nullable=False)
    # Refreshed automatically on UPDATE via the onupdate hook.
    updated_at: dt = Field(
        default_factory=dt.now, nullable=False, sa_column_kwargs={"onupdate": dt.now}
    )
class PVPCModel(SQLModel, table=True):
    """ORM row for one PVPC energy-price entry (one row per timestamp)."""
    __tablename__ = "pvpc"  # type: ignore
    # Consistency fix: every other table in this module declares
    # extend_existing so metadata tolerates re-declaration; this one
    # was the only exception.
    __table_args__ = {"extend_existing": True}
    id: int | None = Field(default=None, primary_key=True)
    datetime: dt = Field(index=True, unique=True)
    # Full EnergyPrice payload, serialized to JSON by PydanticJSON.
    data: EnergyPrice = Field(sa_column=Column(PydanticJSON(EnergyPrice)))
    version: int = Field(default=1)
    created_at: dt = Field(default_factory=dt.now, nullable=False)
    # Refreshed automatically on UPDATE via the onupdate hook.
    updated_at: dt = Field(
        default_factory=dt.now, nullable=False, sa_column_kwargs={"onupdate": dt.now}
    )
class BillModel(SQLModel, table=True):
    """ORM row storing a computed bill for one (cups, datetime, type) slot."""
    __tablename__ = "bill"  # type: ignore
    __table_args__ = (
        # NOTE(review): the constraint includes "cups" although its name
        # omits it — consider renaming if the schema is ever migrated.
        UniqueConstraint(
            "cups",
            "datetime",
            "type",
            name="uq_bill_datetime_type",
        ),
        {"extend_existing": True},
    )
    id: int | None = Field(default=None, primary_key=True)
    cups: str = Field(foreign_key="supply.cups", index=True)
    datetime: dt = Field(index=True)
    # Aggregation granularity; stored as a plain string column.
    type: typing.Literal["hour", "day", "month"] = Field(index=True, sa_type=AutoString)
    # Completeness flag provided by the caller (see EdataDB.add_bill).
    complete: bool = Field(False)
    # Billing-configuration hash provided by the caller (see EdataDB.add_bill).
    confhash: str
    # Full Bill payload, serialized to JSON by PydanticJSON.
    data: Bill = Field(sa_column=Column(PydanticJSON(Bill)))
    version: int = Field(default=1)
    created_at: dt = Field(default_factory=dt.now, nullable=False)
    # Refreshed automatically on UPDATE via the onupdate hook.
    updated_at: dt = Field(
        default_factory=dt.now, nullable=False, sa_column_kwargs={"onupdate": dt.now}
    )
import typing
from datetime import datetime
from sqlmodel import asc, desc, select
from sqlmodel.sql.expression import SelectOfScalar
from edata.database.models import (
BillModel,
ContractModel,
EnergyModel,
PowerModel,
PVPCModel,
StatisticsModel,
SupplyModel,
)
# Queries for "supply" table
def get_supply(cups: str) -> SelectOfScalar[SupplyModel]:
    """Build a query selecting the Supply row for *cups*."""
    stmt = select(SupplyModel)
    return stmt.where(SupplyModel.cups == cups)
def list_supply() -> SelectOfScalar[SupplyModel]:
    """Build a query returning every Supply row."""
    stmt = select(SupplyModel)
    return stmt
# Queries for "contract" table
def get_contract(
    cups: str, date_start: datetime | None = None
) -> SelectOfScalar[ContractModel]:
    """Build a query for one Contract of *cups*.

    When *date_start* is given the row with that exact start date is
    selected; otherwise rows come newest-start first.
    """
    stmt = select(ContractModel).where(ContractModel.cups == cups)
    if date_start:
        return stmt.where(ContractModel.date_start == date_start)
    return stmt.order_by(desc(ContractModel.date_start))
def list_contract(cups: str | None = None) -> SelectOfScalar[ContractModel]:
    """Build a query listing Contracts (optionally one cups), oldest start first."""
    stmt = select(ContractModel)
    if cups is not None:
        stmt = stmt.where(ContractModel.cups == cups)
    return stmt.order_by(asc(ContractModel.date_start))
# Queries for "energy" table
def get_energy(
    cups: str, datetime_: datetime | None = None
) -> SelectOfScalar[EnergyModel]:
    """Build a query for Energy rows of *cups*, optionally pinned to one timestamp."""
    stmt = select(EnergyModel).where(EnergyModel.cups == cups)
    if datetime_ is None:
        return stmt
    return stmt.where(EnergyModel.datetime == datetime_)
def list_energy(
    cups: str, date_from: datetime | None = None, date_to: datetime | None = None
) -> SelectOfScalar[EnergyModel]:
    """Build a query listing Energy rows of *cups* within the optional range, ascending."""
    stmt = select(EnergyModel).where(EnergyModel.cups == cups)
    if date_from:
        stmt = stmt.where(EnergyModel.datetime >= date_from)
    if date_to:
        stmt = stmt.where(EnergyModel.datetime <= date_to)
    return stmt.order_by(asc(EnergyModel.datetime))
def get_last_energy(
    cups: str,
) -> SelectOfScalar[EnergyModel]:
    """Build a query for the newest Energy row of *cups*."""
    stmt = (
        select(EnergyModel)
        .where(EnergyModel.cups == cups)
        .order_by(desc(EnergyModel.datetime))
        .limit(1)
    )
    return stmt
# Queries for "power" table
def get_power(
    cups: str, datetime_: datetime | None = None
) -> SelectOfScalar[PowerModel]:
    """Build a query for Power rows of *cups*, optionally pinned to one timestamp."""
    stmt = select(PowerModel).where(PowerModel.cups == cups)
    if datetime_ is None:
        return stmt
    return stmt.where(PowerModel.datetime == datetime_)
def list_power(
    cups: str, date_from: datetime | None = None, date_to: datetime | None = None
) -> SelectOfScalar[PowerModel]:
    """Build a query listing Power rows of *cups* within the optional range, ascending."""
    stmt = select(PowerModel).where(PowerModel.cups == cups)
    if date_from:
        stmt = stmt.where(PowerModel.datetime >= date_from)
    if date_to:
        stmt = stmt.where(PowerModel.datetime <= date_to)
    return stmt.order_by(asc(PowerModel.datetime))
# Queries for "statistics" table
def get_statistics(
    cups: str, type_: typing.Literal["day", "month"], datetime_: datetime | None = None
) -> SelectOfScalar[StatisticsModel]:
    """Build a query for Statistics rows of *cups* and *type_*, optionally one timestamp."""
    stmt = (
        select(StatisticsModel)
        .where(StatisticsModel.cups == cups)
        .where(StatisticsModel.type == type_)
    )
    if datetime_ is not None:
        stmt = stmt.where(StatisticsModel.datetime == datetime_)
    return stmt
def list_statistics(
    cups: str,
    type_: typing.Literal["day", "month"],
    date_from: datetime | None = None,
    date_to: datetime | None = None,
    complete: bool | None = None,
) -> SelectOfScalar[StatisticsModel]:
    """Build a query listing Statistics rows of *cups* and *type_*.

    The optional date range and *complete* flag narrow the result;
    rows are ordered by ascending datetime.
    """
    stmt = (
        select(StatisticsModel)
        .where(StatisticsModel.cups == cups)
        .where(StatisticsModel.type == type_)
    )
    if date_from:
        stmt = stmt.where(StatisticsModel.datetime >= date_from)
    if date_to:
        stmt = stmt.where(StatisticsModel.datetime <= date_to)
    if complete is not None:
        stmt = stmt.where(StatisticsModel.complete == complete)
    return stmt.order_by(asc(StatisticsModel.datetime))
def list_bill(
    cups: str,
    type_: typing.Literal["hour", "day", "month"],
    date_from: datetime | None = None,
    date_to: datetime | None = None,
    complete: bool | None = None,
) -> SelectOfScalar[BillModel]:
    """Build a query listing Bill rows of *cups* and *type_*.

    The optional date range and *complete* flag narrow the result;
    rows are ordered by ascending datetime.
    """
    stmt = (
        select(BillModel)
        .where(BillModel.cups == cups)
        .where(BillModel.type == type_)
    )
    if date_from:
        stmt = stmt.where(BillModel.datetime >= date_from)
    if date_to:
        stmt = stmt.where(BillModel.datetime <= date_to)
    if complete is not None:
        stmt = stmt.where(BillModel.complete == complete)
    return stmt.order_by(asc(BillModel.datetime))
def get_last_bill(
    cups: str,
) -> SelectOfScalar[BillModel]:
    """Build a query for the newest Bill row of *cups*."""
    stmt = (
        select(BillModel)
        .where(BillModel.cups == cups)
        .order_by(desc(BillModel.datetime))
        .limit(1)
    )
    return stmt
# Queries for "pvpc" table
def get_pvpc(datetime_: datetime) -> SelectOfScalar[PVPCModel]:
    """Build a query for the PVPC row at exactly *datetime_*."""
    stmt = select(PVPCModel)
    return stmt.where(PVPCModel.datetime == datetime_)
def get_last_pvpc() -> SelectOfScalar[PVPCModel]:
    """Build a query for the newest PVPC row."""
    stmt = select(PVPCModel).order_by(desc(PVPCModel.datetime)).limit(1)
    return stmt
def list_pvpc(
    date_from: datetime | None = None, date_to: datetime | None = None
) -> SelectOfScalar[PVPCModel]:
    """Query that lists PVPC records within the optional date range.

    Rows are ordered by ascending datetime, consistent with the other
    list_* helpers (SQLite returns rows in unspecified order otherwise).
    The previous docstring incorrectly claimed a single record.
    """
    query = select(PVPCModel)
    if date_from:
        query = query.where(PVPCModel.datetime >= date_from)
    if date_to:
        query = query.where(PVPCModel.datetime <= date_to)
    return query.order_by(asc(PVPCModel.datetime))
def get_bill(
    cups: str,
    type_: typing.Literal["hour", "day", "month"],
    datetime_: datetime | None = None,
) -> SelectOfScalar[BillModel]:
    """Query that selects a bill.

    Consistency fix: annotated and given a datetime default to mirror
    get_statistics; filters by cups and aggregation type, optionally
    pinned to one timestamp.
    """
    query = select(BillModel).where(BillModel.cups == cups)
    query = query.where(BillModel.type == type_)
    if datetime_ is not None:
        query = query.where(BillModel.datetime == datetime_)
    return query
from typing import Any, Type, TypeVar
from pydantic import BaseModel
from sqlalchemy.types import JSON, TypeDecorator
T = TypeVar("T", bound=BaseModel)
class PydanticJSON(TypeDecorator):
    """
    SQLAlchemy type that stores Pydantic models as JSON columns.
    Automatically serializes on write and deserializes on read.
    """
    impl = JSON
    cache_ok = True
    def __init__(self, pydantic_model: Type[T]):
        # The concrete Pydantic class used to rebuild values on read.
        super().__init__()
        self.pydantic_model = pydantic_model
    def process_bind_param(self, value: T | None, dialect: Any) -> Any:
        # Python -> database
        if value is None:
            return None
        # mode='json' converts non-JSON types (e.g. datetime) to ISO strings.
        return value.model_dump(mode="json")
    def process_result_value(self, value: Any, dialect: Any) -> T | None:
        # Database -> Python
        if value is None:
            return None
        # Rebuild the Pydantic object from the stored dict.
        return self.pydantic_model.model_validate(value)  # type: ignore
from edata.models.bill import Bill, EnergyPrice
from edata.models.data import Energy, Power, Statistics
from edata.models.supply import Contract, Supply
"""Models for billing-related data"""
from datetime import datetime
from pydantic import BaseModel, Field
class EnergyPrice(BaseModel):
    """Data structure to represent pricing data."""
    datetime: datetime  # timestamp this price applies to
    value_eur_kWh: float  # price in EUR per kWh
    delta_h: float  # covered interval length, in hours
class Bill(BaseModel):
    """Data structure to represent a bill during a period."""
    datetime: datetime  # start of the billed period
    delta_h: float  # period length, in hours
    value_eur: float = Field(default=0.0)  # total amount
    energy_term: float = Field(default=0.0)  # energy component (EUR)
    power_term: float = Field(default=0.0)  # contracted-power component (EUR)
    others_term: float = Field(default=0.0)  # other costs (EUR)
    surplus_term: float = Field(default=0.0)  # surplus compensation (EUR)
class BillingRules(BaseModel):
    """Data structure to represent a generic billing rule."""
    # Yearly power-term prices (EUR per kW per year) for periods p1/p2.
    p1_kw_year_eur: float
    p2_kw_year_eur: float
    # Energy prices (EUR per kWh) per tariff period.
    p1_kwh_eur: float
    p2_kwh_eur: float
    p3_kwh_eur: float
    # Surplus compensation prices (EUR per kWh) per period; None disables.
    surplus_p1_kwh_eur: float | None = Field(default=None)
    surplus_p2_kwh_eur: float | None = Field(default=None)
    surplus_p3_kwh_eur: float | None = Field(default=None)
    meter_month_eur: float = Field(default=0.81)  # meter rental per month
    market_kw_year_eur: float = Field(default=3.113)
    electricity_tax: float = Field(default=1.0511300560)  # multiplier
    iva_tax: float = Field(default=1.21)  # VAT multiplier
    # NOTE(review): the *_formula strings appear to be evaluated elsewhere
    # with the variables named in them — confirm the evaluator before editing.
    energy_formula: str = Field(default="electricity_tax * iva_tax * kwh_eur * kwh")
    power_formula: str = Field(
        default="electricity_tax * iva_tax * (p1_kw * (p1_kw_year_eur + market_kw_year_eur) + p2_kw * p2_kw_year_eur) / 365 / 24"
    )
    others_formula: str = Field(default="iva_tax * meter_month_eur / 30 / 24")
    surplus_formula: str | None = Field(default=None)
    cycle_start_day: int = Field(default=1)  # day of month the billing cycle starts
class PVPCBillingRules(BillingRules):
    """Data structure to represent a PVPC billing rule."""
    p1_kw_year_eur: float = Field(default=30.67266)
    p2_kw_year_eur: float = Field(default=1.4243591)
    # PVPC has no fixed per-period energy prices (prices are hourly),
    # so the per-kWh fields are forced to None.
    p1_kwh_eur: None = Field(default=None)
    p2_kwh_eur: None = Field(default=None)
    p3_kwh_eur: None = Field(default=None)
    surplus_p1_kwh_eur: None = Field(default=None)
    surplus_p2_kwh_eur: None = Field(default=None)
    surplus_p3_kwh_eur: None = Field(default=None)
    # NOTE(review): the remaining fields restate the parent defaults
    # verbatim; they could be dropped without changing behavior.
    meter_month_eur: float = Field(default=0.81)
    market_kw_year_eur: float = Field(default=3.113)
    electricity_tax: float = Field(default=1.0511300560)
    iva_tax: float = Field(default=1.21)
    energy_formula: str = Field(default="electricity_tax * iva_tax * kwh_eur * kwh")
    power_formula: str = Field(
        default="electricity_tax * iva_tax * (p1_kw * (p1_kw_year_eur + market_kw_year_eur) + p2_kw * p2_kw_year_eur) / 365 / 24"
    )
    others_formula: str = Field(default="iva_tax * meter_month_eur / 30 / 24")
    surplus_formula: str | None = Field(default=None)
    cycle_start_day: int = Field(default=1)
"""Models for telemetry data"""
from datetime import datetime
from pydantic import BaseModel, Field
class Energy(BaseModel):
    """Data structure to represent energy consumption and/or surplus measurements."""
    datetime: datetime  # start of the measured interval
    delta_h: float  # interval length, in hours
    value_kWh: float  # consumed energy
    surplus_kWh: float = Field(0)  # exported (surplus) energy
    real: bool  # True if measured, False if estimated — presumably; confirm against provider
class Power(BaseModel):
    """Data structure to represent power measurements."""
    datetime: datetime  # timestamp of the measurement
    value_kW: float  # measured power
class Statistics(BaseModel):
    """Data structure to represent aggregated energy/surplus data."""
    datetime: datetime  # start of the aggregated period
    delta_h: float = Field(0)  # total hours covered
    # Consumption totals: overall plus per-tariff-period breakdown.
    value_kWh: float = Field(0)
    value_p1_kWh: float = Field(0)
    value_p2_kWh: float = Field(0)
    value_p3_kWh: float = Field(0)
    # Surplus totals: overall plus per-tariff-period breakdown.
    surplus_kWh: float = Field(0)
    surplus_p1_kWh: float = Field(0)
    surplus_p2_kWh: float = Field(0)
    surplus_p3_kWh: float = Field(0)
"""Models for contractual data"""
from datetime import datetime
from pydantic import BaseModel
class Supply(BaseModel):
    """Data model of a Supply."""
    cups: str  # supply point identifier
    date_start: datetime
    date_end: datetime
    address: str | None
    postal_code: str | None
    province: str | None
    municipality: str | None
    distributor: str | None
    # camelCase fields presumably mirror the upstream Datadis API
    # responses (see GET_SUPPLIES_MANDATORY_FIELDS) — confirm mapping.
    pointType: int
    distributorCode: str
class Contract(BaseModel):
    """Data model of a Contract."""
    date_start: datetime
    date_end: datetime
    marketer: str  # marketer (retailer) name
    # camelCase field presumably mirrors the upstream Datadis API — confirm.
    distributorCode: str
    # Contracted power per period, in kW (None when not reported).
    power_p1: float | None
    power_p2: float | None
from edata.providers.datadis import DatadisConnector
from edata.providers.redata import REDataConnector
"""Datadis API connector.
To fetch data from datadis.es private API.
There a few issues that are workarounded:
- You have to wait 24h between two identical requests.
- Datadis server does not like ranges greater than 1 month.
"""
import asyncio
import contextlib
import hashlib
import logging
import os
import tempfile
import typing
from datetime import datetime, timedelta
import aiohttp
import diskcache
from edata.models import Contract, Energy, Power, Supply
_LOGGER = logging.getLogger(__name__)
# Token-related constants
URL_TOKEN = "https://datadis.es/nikola-auth/tokens/login"
TOKEN_USERNAME = "username"  # form field expected by the login endpoint
TOKEN_PASSWD = "password"  # form field expected by the login endpoint
# Supplies-related constants
URL_GET_SUPPLIES = "https://datadis.es/api-private/api/get-supplies"
# Fields a get-supplies response item must carry to be usable.
GET_SUPPLIES_MANDATORY_FIELDS = [
    "cups",
    "validDateFrom",
    "validDateTo",
    "pointType",
    "distributorCode",
]
# Contracts-related constants
URL_GET_CONTRACT_DETAIL = "https://datadis.es/api-private/api/get-contract-detail"
GET_CONTRACT_DETAIL_MANDATORY_FIELDS = [
    "startDate",
    "endDate",
    "marketer",
    "contractedPowerkW",
]
# Consumption-related constants
URL_GET_CONSUMPTION_DATA = "https://datadis.es/api-private/api/get-consumption-data"
GET_CONSUMPTION_DATA_MANDATORY_FIELDS = [
    "time",
    "date",
    "consumptionKWh",
    "obtainMethod",
]
# Maximeter-related constants
URL_GET_MAX_POWER = "https://datadis.es/api-private/api/get-max-power"
GET_MAX_POWER_MANDATORY_FIELDS = ["time", "date", "maxPower"]
# Timing constants
TIMEOUT = 3 * 60  # requests timeout, in seconds
QUERY_LIMIT = timedelta(hours=24)  # a datadis limitation, again...
# Cache-related constants
RECENT_CACHE_SUBDIR = "cache"
def migrate_storage(storage_dir):
    """Migrate storage from older versions.

    Removes the legacy JSON query-cache files. Each removal is wrapped
    in its own suppress block: previously a single block covered both
    calls, so a missing first file aborted the cleanup and left the
    second file behind.
    """
    legacy_files = (
        "edata_recent_queries.json",
        "edata_recent_queries_cache.json",
    )
    for name in legacy_files:
        with contextlib.suppress(FileNotFoundError):
            os.remove(os.path.join(storage_dir, name))
class DatadisConnector:
    """A Datadis private API connector."""

    def __init__(
        self,
        username: str,
        password: str,
        enable_smart_fetch: bool = True,
        storage_path: str | None = None,
    ) -> None:
        """Initialize the connector and its on-disk query cache.

        username/password: Datadis account credentials.
        enable_smart_fetch: stored in ``self._smart_fetch`` but not read in
            this class -- presumably consumed elsewhere; TODO confirm.
        storage_path: base directory for the disk cache; when ``None`` a
            directory under the system temp dir is used instead.
        """
        self._usr = username
        self._pwd = password
        # holds "encoded" (raw token string) and "headers" (Authorization)
        self._token = {}
        self._smart_fetch = enable_smart_fetch
        self._recent_queries = {}
        self._recent_cache = {}
        # queries whose repeated 5xx errors have already been warned about
        self._warned_queries = []
        if storage_path is not None:
            self._recent_cache_dir = os.path.join(storage_path, RECENT_CACHE_SUBDIR)
            migrate_storage(storage_path)
        else:
            self._recent_cache_dir = os.path.join(
                tempfile.gettempdir(), RECENT_CACHE_SUBDIR
            )
        os.makedirs(self._recent_cache_dir, exist_ok=True)
        self._cache = diskcache.Cache(self._recent_cache_dir)

    def _get_hash(self, item: str) -> str:
        """Return a hash."""
        # md5 is used only to derive cache keys, not for security
        return hashlib.md5(item.encode()).hexdigest()

    def _set_cache(self, key: str, data: dict | None = None) -> None:
        """Cache a successful query to avoid exceeding query limits (diskcache)."""
        hash_query = self._get_hash(key)
        try:
            # entries expire after QUERY_LIMIT (24h) to honor Datadis limits
            self._cache.set(hash_query, data, expire=QUERY_LIMIT.total_seconds())
            _LOGGER.debug("Updating cache item '%s'", hash_query)
        except Exception as e:
            # cache failures are non-fatal: worst case we re-query too soon
            _LOGGER.warning("Unknown error while updating cache: %s", e)

    def _is_cached(self, key: str) -> bool:
        """Check if a query has been done recently to avoid exceeding query limits (diskcache)."""
        hash_query = self._get_hash(key)
        return hash_query in self._cache

    def _get_cache(self, key: str):
        """Return cached response for a query (diskcache)."""
        hash_query = self._get_hash(key)
        try:
            return self._cache.get(hash_query, default=None)
        except Exception:
            # unreadable/corrupted entries are treated as a cache miss
            return None

    async def _async_get_token(self) -> bool:
        """Private async method that fetches a new token if needed.

        Returns True when a token was obtained and stored in ``self._token``.
        """
        _LOGGER.debug("No token found, fetching a new one")
        is_valid_token = False
        timeout = aiohttp.ClientTimeout(total=TIMEOUT)
        async with aiohttp.ClientSession(timeout=timeout) as session:
            try:
                async with session.post(
                    URL_TOKEN,
                    data={
                        TOKEN_USERNAME: self._usr,
                        TOKEN_PASSWD: self._pwd,
                    },
                ) as response:
                    text = await response.text()
                    if response.status == 200:
                        # the endpoint returns the raw token in the body
                        self._token["encoded"] = text
                        self._token["headers"] = {
                            "Authorization": "Bearer " + self._token["encoded"]
                        }
                        is_valid_token = True
                    else:
                        _LOGGER.error(
                            "Unknown error while retrieving token, got %s", text
                        )
            except Exception as e:
                _LOGGER.error("Exception while retrieving token: %s", e)
        return is_valid_token

    async def async_login(self):
        """Test to login with provided credentials (async)."""
        return await self._async_get_token()

    def login(self):
        """Test to login with provided credentials (sync wrapper)."""
        return asyncio.run(self.async_login())

    async def _async_get(
        self,
        url: str,
        request_data: dict | None = None,
        refresh_token: bool = False,
        is_retry: bool = False,
        ignore_cache: bool = False,
    ) -> list[dict[str, typing.Any]]:
        """Async get request for Datadis API.

        refresh_token: fetch a new token before issuing the request.
        is_retry: marks the second (and last) attempt after a failure.
        ignore_cache: bypass the 24h query cache entirely.
        """
        if request_data is None:
            data = {}
        else:
            data = request_data
        is_valid_token = False
        response = []
        if refresh_token:
            is_valid_token = await self._async_get_token()
        if is_valid_token or not refresh_token:
            # build the query string by hand from the request parameters
            params = "?" if len(data) > 0 else ""
            for param in data:
                key = param
                value = data[param]
                params = params + f"{key}={value}&"
            # build an anonymized copy for logging: cups and NIF are masked
            anonym_params = "?" if len(data) > 0 else ""
            for anonym_param in data:
                key = anonym_param
                if key == "cups":
                    value = "xxxx" + str(data[anonym_param])[-5:]
                elif key == "authorizedNif":
                    value = "xxxx"
                else:
                    value = data[anonym_param]
                anonym_params = anonym_params + f"{key}={value}&"
            # serve from the 24h cache when possible (Datadis rate limits)
            is_recent_query = await asyncio.to_thread(self._is_cached, url + params)
            if not ignore_cache and is_recent_query:
                _cache = await asyncio.to_thread(self._get_cache, url + params)
                if _cache is not None:
                    _LOGGER.info("CACHED %s", url + anonym_params)
                    return _cache  # type: ignore
                # a cached None means the query recently failed or was empty
                return []
            try:
                _LOGGER.info("GET %s", url + anonym_params)
                headers = {"Accept-Encoding": "identity"}
                if self._token.get("headers"):
                    headers.update(self._token["headers"])
                timeout = aiohttp.ClientTimeout(total=TIMEOUT)
                async with aiohttp.ClientSession(timeout=timeout) as session:
                    async with session.get(
                        url + params,
                        headers=headers,
                    ) as reply:
                        text = await reply.text()
                        if reply.status == 200:
                            try:
                                json_data = await reply.json(content_type=None)
                                if json_data:
                                    response = json_data
                                    if not ignore_cache:
                                        await asyncio.to_thread(
                                            self._set_cache,
                                            url + params,
                                            response,
                                        )
                                else:
                                    # empty payload is cached too, to back off
                                    _LOGGER.info("200 OK but empty response")
                                    if not ignore_cache:
                                        await asyncio.to_thread(
                                            self._set_cache, url + params
                                        )
                            except Exception as e:
                                # NOTE(review): 'e' is captured but never
                                # logged -- consider including it
                                _LOGGER.warning(
                                    "200 OK but failed to parse the response"
                                )
                        elif reply.status == 401 and not refresh_token:
                            # unauthorized: retry once with a fresh token
                            response = await self._async_get(
                                url,
                                request_data=data,
                                refresh_token=True,
                                ignore_cache=ignore_cache,
                            )
                        elif reply.status == 429:
                            # rate limited: record the attempt to back off 24h
                            _LOGGER.warning(
                                "%s with message '%s'",
                                reply.status,
                                text,
                            )
                            if not ignore_cache:
                                await asyncio.to_thread(self._set_cache, url + params)
                        elif is_retry:
                            # second failure in a row: warn once per query,
                            # then silence it until restart
                            if (url + params) not in self._warned_queries:
                                _LOGGER.warning(
                                    "%s with message '%s'. %s. %s",
                                    reply.status,
                                    text,
                                    "Query temporary disabled",
                                    "Future 500 code errors for this query will be silenced until restart",
                                )
                                if not ignore_cache:
                                    await asyncio.to_thread(self._set_cache, url + params)
                                self._warned_queries.append(url + params)
                        else:
                            # first failure with any other status: retry once
                            response = await self._async_get(
                                url,
                                request_data,
                                is_retry=True,
                                ignore_cache=ignore_cache,
                            )
            except asyncio.TimeoutError:
                _LOGGER.warning("Timeout at %s", url + anonym_params)
                return []
            except Exception as e:
                _LOGGER.warning("Exception at %s: %s", url + anonym_params, e)
                return []
        return response

    async def async_get_supplies(
        self, authorized_nif: str | None = None
    ) -> list[Supply]:
        """Datadis 'get_supplies' query (async).

        Items missing any mandatory field are skipped with a warning. Empty
        validity dates default to 1970/01/01 (start) or tomorrow (end).
        """
        data = {}
        if authorized_nif is not None:
            data["authorizedNif"] = authorized_nif
        # supplies are always fetched fresh (ignore_cache=True)
        response = await self._async_get(
            URL_GET_SUPPLIES, request_data=data, ignore_cache=True
        )
        supplies = []
        tomorrow_str = (datetime.today() + timedelta(days=1)).strftime("%Y/%m/%d")
        for i in response:
            if all(k in i for k in GET_SUPPLIES_MANDATORY_FIELDS):
                supplies.append(
                    Supply(
                        cups=i["cups"],
                        date_start=datetime.strptime(
                            (
                                i["validDateFrom"]
                                if i["validDateFrom"] != ""
                                else "1970/01/01"
                            ),
                            "%Y/%m/%d",
                        ),
                        date_end=datetime.strptime(
                            (
                                i["validDateTo"]
                                if i["validDateTo"] != ""
                                else tomorrow_str
                            ),
                            "%Y/%m/%d",
                        ),
                        address=i.get("address", None),
                        postal_code=i.get("postalCode", None),
                        province=i.get("province", None),
                        municipality=i.get("municipality", None),
                        distributor=i.get("distributor", None),
                        pointType=i["pointType"],
                        distributorCode=i["distributorCode"],
                    )
                )
            else:
                # NOTE(review): logs the whole response for every bad item
                _LOGGER.warning(
                    "Weird data structure while fetching supplies data, got %s",
                    response,
                )
        return supplies

    def get_supplies(self, authorized_nif: str | None = None):
        """Datadis 'get_supplies' query (sync wrapper)."""
        return asyncio.run(self.async_get_supplies(authorized_nif=authorized_nif))

    async def async_get_contract_detail(
        self, cups: str, distributor_code: str, authorized_nif: str | None = None
    ) -> list[Contract]:
        """Datadis 'get_contract_detail' query (async).

        Items missing any mandatory field are skipped with a warning. Empty
        contract dates default to 1970/01/01 (start) or tomorrow (end).
        """
        data = {"cups": cups, "distributorCode": distributor_code}
        if authorized_nif is not None:
            data["authorizedNif"] = authorized_nif
        # contracts are always fetched fresh (ignore_cache=True)
        response = await self._async_get(
            URL_GET_CONTRACT_DETAIL, request_data=data, ignore_cache=True
        )
        contracts = []
        tomorrow_str = (datetime.today() + timedelta(days=1)).strftime("%Y/%m/%d")
        for i in response:
            if all(k in i for k in GET_CONTRACT_DETAIL_MANDATORY_FIELDS):
                contracts.append(
                    Contract(
                        date_start=datetime.strptime(
                            i["startDate"] if i["startDate"] != "" else "1970/01/01",
                            "%Y/%m/%d",
                        ),
                        date_end=datetime.strptime(
                            i["endDate"] if i["endDate"] != "" else tomorrow_str,
                            "%Y/%m/%d",
                        ),
                        marketer=i["marketer"],
                        distributorCode=distributor_code,
                        power_p1=(
                            i["contractedPowerkW"][0]
                            if isinstance(i["contractedPowerkW"], list)
                            else None
                        ),
                        # NOTE(review): unlike power_p1 there is no
                        # isinstance(list) guard here; len() would raise on
                        # an unsized value -- confirm upstream schema
                        power_p2=(
                            i["contractedPowerkW"][1]
                            if (len(i["contractedPowerkW"]) > 1)
                            else None
                        ),
                    )
                )
            else:
                _LOGGER.warning(
                    "Weird data structure while fetching contracts data, got %s",
                    response,
                )
        return contracts

    def get_contract_detail(
        self, cups: str, distributor_code: str, authorized_nif: str | None = None
    ):
        """Datadis get_contract_detail query (sync wrapper)."""
        return asyncio.run(
            self.async_get_contract_detail(cups, distributor_code, authorized_nif)
        )

    async def async_get_consumption_data(
        self,
        cups: str,
        distributor_code: str,
        start_date: datetime,
        end_date: datetime,
        measurement_type: str,
        point_type: int,
        authorized_nif: str | None = None,
    ) -> list[Energy]:
        """Datadis 'get_consumption_data' query (async).

        Dates are sent with month granularity; results outside the exact
        [start_date, end_date] range are filtered out afterwards.
        """
        data = {
            "cups": cups,
            "distributorCode": distributor_code,
            # dates are sent with month granularity ("%Y/%m")
            "startDate": datetime.strftime(start_date, "%Y/%m"),
            "endDate": datetime.strftime(end_date, "%Y/%m"),
            "measurementType": measurement_type,
            "pointType": point_type,
        }
        if authorized_nif is not None:
            data["authorizedNif"] = authorized_nif
        response = await self._async_get(URL_GET_CONSUMPTION_DATA, request_data=data)
        consumptions = []
        for i in response:
            if "consumptionKWh" in i:
                if all(k in i for k in GET_CONSUMPTION_DATA_MANDATORY_FIELDS):
                    # shift the reported time by -1h (hour-ending to
                    # hour-beginning, presumably 1..24 -> 0..23; TODO confirm)
                    hour = str(int(i["time"].split(":")[0]) - 1)
                    date_as_dt = datetime.strptime(
                        f"{i['date']} {hour.zfill(2)}:00", "%Y/%m/%d %H:%M"
                    )
                    if not (start_date <= date_as_dt <= end_date):
                        continue  # skip element if dt is out of range
                    # surplus may be absent or explicitly null
                    _surplus = i.get("surplusEnergyKWh", 0)
                    if _surplus is None:
                        _surplus = 0
                    consumptions.append(
                        Energy(
                            datetime=date_as_dt,
                            delta_h=1,
                            value_kWh=i["consumptionKWh"],
                            surplus_kWh=_surplus,
                            real=i["obtainMethod"] == "Real",
                        )
                    )
                else:
                    _LOGGER.warning(
                        "Weird data structure while fetching consumption data, got %s",
                        response,
                    )
        return consumptions

    def get_consumption_data(
        self,
        cups: str,
        distributor_code: str,
        start_date: datetime,
        end_date: datetime,
        measurement_type: str,
        point_type: int,
        authorized_nif: str | None = None,
    ):
        """Datadis get_consumption_data query (sync wrapper)."""
        return asyncio.run(
            self.async_get_consumption_data(
                cups,
                distributor_code,
                start_date,
                end_date,
                measurement_type,
                point_type,
                authorized_nif,
            )
        )

    async def async_get_max_power(
        self,
        cups: str,
        distributor_code: str,
        start_date: datetime,
        end_date: datetime,
        authorized_nif: str | None = None,
    ) -> list[Power]:
        """Datadis 'get_max_power' query (async).

        Returns maximeter (peak power) readings; items missing mandatory
        fields are skipped with a warning.
        """
        data = {
            "cups": cups,
            "distributorCode": distributor_code,
            # dates are sent with month granularity ("%Y/%m")
            "startDate": datetime.strftime(start_date, "%Y/%m"),
            "endDate": datetime.strftime(end_date, "%Y/%m"),
        }
        if authorized_nif is not None:
            data["authorizedNif"] = authorized_nif
        response = await self._async_get(URL_GET_MAX_POWER, request_data=data)
        maxpower_values = []
        for i in response:
            if all(k in i for k in GET_MAX_POWER_MANDATORY_FIELDS):
                maxpower_values.append(
                    Power(
                        datetime=datetime.strptime(
                            f"{i['date']} {i['time']}", "%Y/%m/%d %H:%M"
                        ),
                        value_kW=i["maxPower"],
                    )
                )
            else:
                _LOGGER.warning(
                    "Weird data structure while fetching maximeter data, got %s",
                    response,
                )
        return maxpower_values

    def get_max_power(
        self,
        cups: str,
        distributor_code: str,
        start_date: datetime,
        end_date: datetime,
        authorized_nif: str | None = None,
    ):
        """Datadis get_max_power query (sync wrapper)."""
        return asyncio.run(
            self.async_get_max_power(
                cups,
                distributor_code,
                start_date,
                end_date,
                authorized_nif,
            )
        )
"""A REData API connector"""
import asyncio
import datetime as dt
import logging
import aiohttp
from dateutil import parser
from edata.models import EnergyPrice
_LOGGER = logging.getLogger(__name__)
# requests timeout in seconds (passed to aiohttp ClientTimeout)
REQUESTS_TIMEOUT = 15
# REData realtime prices endpoint template; placeholders:
#   geo_id -> region identifier, start/end -> datetime range
URL_REALTIME_PRICES = (
    "https://apidatos.ree.es/es/datos/mercados/precios-mercados-tiempo-real"
    "?time_trunc=hour"
    "&geo_ids={geo_id}"
    "&start_date={start:%Y-%m-%dT%H:%M}&end_date={end:%Y-%m-%dT%H:%M}"
)
class REDataConnector:
    """Connector for the public REData API (realtime PVPC prices)."""

    def __init__(
        self,
    ) -> None:
        """Initialize the connector (stateless: nothing to configure)."""

    async def async_get_realtime_prices(
        self, dt_from: dt.datetime, dt_to: dt.datetime, is_ceuta_melilla: bool = False
    ) -> list[EnergyPrice]:
        """GET query to fetch realtime pvpc prices, historical data is limited to current month (async)"""
        # pick the geographical zone id (8744 when Ceuta/Melilla is requested)
        geo_zone = 8744 if is_ceuta_melilla else 8741
        url = URL_REALTIME_PRICES.format(geo_id=geo_zone, start=dt_from, end=dt_to)
        prices: list[EnergyPrice] = []
        _LOGGER.info("GET %s", url)
        timeout = aiohttp.ClientTimeout(total=REQUESTS_TIMEOUT)
        async with aiohttp.ClientSession(timeout=timeout) as session:
            try:
                async with session.get(url) as res:
                    text = await res.text()
                    if res.status != 200:
                        # non-200: log and fall through to return the (empty) list
                        _LOGGER.error(
                            "%s returned %s with code %s",
                            url,
                            text,
                            res.status,
                        )
                        return prices
                    try:
                        payload = await res.json()
                        entries = payload["included"][0]["attributes"]["values"]
                    except (IndexError, KeyError):
                        _LOGGER.error(
                            "%s returned a malformed response: %s ",
                            url,
                            text,
                        )
                        return prices
                    for entry in entries:
                        # drop the timezone info; prices are stored naive
                        naive_dt = parser.parse(entry["datetime"]).replace(tzinfo=None)
                        prices.append(
                            EnergyPrice(
                                datetime=naive_dt,
                                value_eur_kWh=entry["value"] / 1000,
                                delta_h=1,
                            )
                        )
            except Exception as e:
                # network/JSON-level failures end up here
                _LOGGER.error("Exception fetching realtime prices: %s", e)
        return prices

    def get_realtime_prices(
        self, dt_from: dt.datetime, dt_to: dt.datetime, is_ceuta_melilla: bool = False
    ) -> list:
        """GET query to fetch realtime pvpc prices, historical data is limited to current month (sync wrapper)"""
        coro = self.async_get_realtime_prices(dt_from, dt_to, is_ceuta_melilla)
        return asyncio.run(coro)
import asyncio
import calendar
import logging
import os
import typing
from datetime import datetime, timedelta
from pathlib import Path
from tempfile import gettempdir
from dateutil import relativedelta
from jinja2 import Environment
from edata.core.utils import (
get_db_path,
get_contract_for_dt,
get_day,
get_month,
get_tariff,
redacted_cups,
)
from edata.database.controller import EdataDB
from edata.models import Contract, Energy, EnergyPrice
from edata.models.bill import Bill, BillingRules, PVPCBillingRules
_LOGGER = logging.getLogger(__name__)
class BillService:
    """Definition of a bill service for energy supplies."""

    def __init__(self, cups: str, storage_path: str) -> None:
        """Initialize the service for a cups backed by the edata database."""
        self._cups = cups
        # anonymized cups identifier, used only for logging
        self._scups = redacted_cups(cups)
        self.db = EdataDB(get_db_path(storage_path))

    async def get_bills(
        self,
        start: datetime | None = None,
        end: datetime | None = None,
        type_: typing.Literal["hour", "day", "month"] = "hour",
    ) -> list[Bill]:
        """Return the list of bills."""
        res = await self.db.list_bill(self._cups, type_, start, end)
        return [x.data for x in res]

    async def update(
        self,
        start: datetime | None = None,
        end: datetime | None = None,
        billing_rules: BillingRules | PVPCBillingRules | None = None,
        is_pvpc: bool = True,
    ) -> None:
        """Update all missing billing data within optional date ranges.

        When no billing rules are provided, default PVPC rules are assumed.
        Hourly bills are recompiled and pushed, then daily/monthly statistics
        are refreshed.
        """
        if not billing_rules:
            # assuming pvpc
            _LOGGER.info("%s non explicit billing rules, assuming PVPC", self._scups)
            billing_rules = PVPCBillingRules()
            is_pvpc = True
        # fetch cups
        supply = await self.db.get_supply(self._cups)
        if not supply:
            _LOGGER.warning(
                "%s the selected cups does not exist, please fetch data first",
                self._scups,
            )
            return
        _LOGGER.info(
            "%s the selected supply is available from %s to %s",
            self._scups,
            supply.data.date_start,
            supply.data.date_end,
        )
        if not start:
            _LOGGER.debug(
                "%s automatically setting start date as last hourly bill", self._scups
            )
            start = await self._get_last_bill_dt()
            if not start:
                _LOGGER.debug(
                    "%s there are no bills for this cups, building since the start of the supply",
                    self._scups,
                )
                start = supply.data.date_start
        if not end:
            _LOGGER.debug(
                "%s automatically setting end date as supply date end", self._scups
            )
            end = supply.data.date_end
        _LOGGER.info(
            "%s data will be updated from %s to %s",
            self._scups,
            start,
            end,
        )
        # fetch contracts
        contracts = await self._get_contracts()
        # fetch and filter energy items
        energy = await self._get_energy()
        _LOGGER.debug("%s compiling missing hourly bills", self._scups)
        # NOTE(review): built-in hash() of a str is salted per process
        # (PYTHONHASHSEED), so confighash is NOT stable across restarts --
        # consider hashlib if downstream relies on stability.
        if is_pvpc:
            pvpc = await self._get_pvpc()
            billing_rules = PVPCBillingRules(**billing_rules.model_dump())
            bills = await asyncio.to_thread(
                self._compile_pvpc, contracts, energy, pvpc, billing_rules
            )
            confighash = f"pvpc-{hash(billing_rules.model_dump_json())}"
        else:
            bills = await asyncio.to_thread(
                self._compile_j2, contracts, energy, billing_rules
            )
            confighash = f"custom-{hash(billing_rules.model_dump_json())}"
        _LOGGER.debug("%s pushing hourly bills", self._scups)
        await self.db.add_bill_list(
            cups=self._cups,
            type_="hour",
            confhash=confighash,
            complete=True,
            bill=bills,
        )
        _LOGGER.debug("%s updating daily and monthly bills", self._scups)
        await self.update_statistics(start, end)
        await self.fix_missing_statistics()

    async def update_statistics(self, start: datetime, end: datetime):
        """Update the statistics during a period."""
        await self._update_daily_statistics(start, end)
        await self._update_monthly_statistics(start, end)

    async def _update_daily_statistics(self, start: datetime, end: datetime):
        """Update daily statistics within a date range."""
        day_start = get_day(start)
        day_end = end
        daily = await self.db.list_bill(self._cups, "day", day_start, day_end)
        # days already stored as complete are skipped when recompiling
        complete = [stat.datetime for stat in daily if stat.complete]
        data = await self.get_bills(day_start, day_end)
        stats = await asyncio.to_thread(
            self._compile_statistics, data, get_day, skip=complete
        )
        for stat in stats:
            # a day is complete when all 24 hourly bills were aggregated
            is_complete = stat.delta_h == 24
            await self.db.add_bill(self._cups, "day", stat, "mix", is_complete)
            if not is_complete:
                _LOGGER.info(
                    "%s daily statistics for %s are incomplete",
                    self._scups,
                    stat.datetime.date(),
                )

    async def _update_monthly_statistics(self, start: datetime, end: datetime):
        """Update monthly statistics within a date range."""
        month_start = get_month(start)
        month_end = end
        monthly = await self.db.list_bill(self._cups, "month", month_start, month_end)
        # months already stored as complete are skipped when recompiling
        complete = [stat.datetime for stat in monthly if stat.complete]
        data = await self.get_bills(month_start, month_end)
        stats = await asyncio.to_thread(
            self._compile_statistics, data, get_month, skip=complete
        )
        for stat in stats:
            # a month is complete when every hour of the month is covered
            target_hours = (
                calendar.monthrange(stat.datetime.year, stat.datetime.month)[1] * 24
            )
            is_complete = stat.delta_h == target_hours
            await self.db.add_bill(self._cups, "month", stat, "mix", is_complete)
            if not is_complete:
                _LOGGER.info(
                    "%s monthly statistics for %s are incomplete",
                    self._scups,
                    stat.datetime.date(),
                )

    async def _find_missing_stats(self):
        """Return the list of days that are missing billing data."""
        stats = await self.db.list_bill(self._cups, "day", complete=False)
        return [x.datetime for x in stats]

    def _compile_statistics(
        self,
        data: list[Bill],
        agg: typing.Callable[[datetime], datetime],
        wanted: list[datetime] | None = None,
        skip: list[datetime] | None = None,
    ) -> list[Bill]:
        """Return the aggregated bill data.

        agg: maps each record datetime to its aggregation bucket (day/month).
        wanted: buckets to force even if listed in ``skip``.
        skip: buckets to leave untouched (typically already complete).
        """
        if not wanted:
            wanted = []
        if not skip:
            skip = []
        agg_data = {}
        for item in data:
            agg_dt = agg(item.datetime)
            is_wanted = agg_dt in wanted or agg_dt not in skip
            if not is_wanted:
                continue
            if agg_dt not in agg_data:
                agg_data[agg_dt] = Bill(
                    datetime=agg_dt,
                    delta_h=0,
                )
            ref = agg_data[agg_dt]
            # accumulate each billing component into the bucket
            ref.delta_h += item.delta_h
            ref.value_eur += item.value_eur
            ref.energy_term += item.energy_term
            ref.power_term += item.power_term
            ref.others_term += item.others_term
            ref.surplus_term += item.surplus_term
        return list(agg_data.values())

    async def fix_missing_statistics(self):
        """Recompile statistics to fix missing data."""
        missing = await self._find_missing_stats()
        for day in missing:
            _LOGGER.debug("%s updating daily statistics for date %s", self._scups, day)
            # BUGFIX: was relativedelta(day=1), which *sets* the day of month
            # to 1 instead of advancing one day, yielding an empty range for
            # every day except the first of a month.
            end = day + timedelta(days=1) - timedelta(minutes=1)
            await self._update_daily_statistics(day, end)
        missing_months = list(set([get_month(x) for x in missing]))
        for month in missing_months:
            _LOGGER.debug(
                "%s updating monthly statistics for date %s", self._scups, month
            )
            end = month + relativedelta.relativedelta(months=1) - timedelta(minutes=1)
            await self._update_monthly_statistics(month, end)

    def _compile_pvpc(
        self,
        contracts: list[Contract],
        energy: list[Energy],
        pvpc: list[EnergyPrice],
        rules: PVPCBillingRules,
    ) -> list[Bill]:
        """Compile bills assuming PVPC billing.

        Only datetimes covered by some contract AND present in both the
        energy and pvpc series are billed.
        """
        # sets instead of lists: membership tests dominate this method
        energy_dt = {x.datetime for x in energy}
        pvpc_valid_dt = set()
        for contract in contracts:
            for price_dt in (x.datetime for x in pvpc):
                if contract.date_start <= price_dt <= contract.date_end:
                    pvpc_valid_dt.add(price_dt)
        # reduce computation range to valid periods
        pvpc_valid_dt &= energy_dt
        e = {x.datetime: x for x in energy if x.datetime in pvpc_valid_dt}
        p = {x.datetime: x for x in pvpc if x.datetime in pvpc_valid_dt}
        b: dict[datetime, Bill] = {}
        for dt in e.keys():
            c = get_contract_for_dt(contracts, dt)
            if not c:
                continue
            p1_kw = c.power_p1
            p2_kw = c.power_p2
            # skip hours whose contracted powers are unknown
            if not p1_kw or not p2_kw:
                continue
            bill = Bill(datetime=dt, delta_h=1)
            # energy term: taxed price times consumption
            bill.energy_term = (
                rules.electricity_tax
                * rules.iva_tax
                * p[dt].value_eur_kWh
                * e[dt].value_kWh
            )
            # power term: yearly contracted-power cost prorated per hour
            bill.power_term = (
                rules.electricity_tax
                * rules.iva_tax
                * (
                    p1_kw * (rules.p1_kw_year_eur + rules.market_kw_year_eur)
                    + p2_kw * rules.p2_kw_year_eur
                )
                / 365
                / 24
            )
            # other charges: monthly meter rental prorated per hour
            bill.others_term = rules.iva_tax * rules.meter_month_eur / 30 / 24
            bill.value_eur = bill.energy_term + bill.power_term + bill.others_term
            b[dt] = bill
        return list(b.values())

    def _compile_j2(
        self,
        contracts: list[Contract],
        energy: list[Energy],
        rules: BillingRules,
    ):
        """Compile bills from custom rules.

        The energy/power/others formulas are evaluated as Jinja2 expressions
        fed with the rules fields plus per-hour parameters.
        """
        e = {x.datetime: x for x in energy}
        b: dict[datetime, Bill] = {}
        env = Environment()
        # compile the formulas once; each yields a float (or undefined)
        energy_expr = env.compile_expression(f"({rules.energy_formula})|float")
        power_expr = env.compile_expression(f"({rules.power_formula})|float")
        others_expr = env.compile_expression(f"({rules.others_formula})|float")
        for dt in e.keys():
            c = get_contract_for_dt(contracts, dt)
            if not c:
                continue
            p1_kw = c.power_p1
            p2_kw = c.power_p2
            # skip hours whose contracted powers are unknown
            if not p1_kw or not p2_kw:
                continue
            bill = Bill(datetime=dt, delta_h=1)
            params = rules.model_dump()
            params["p1_kw"] = p1_kw
            params["p2_kw"] = p2_kw
            params["kwh"] = e[dt].value_kWh
            tariff = get_tariff(dt)
            # NOTE(review): when get_tariff returns 0 (undecidable), kwh_eur
            # is left unset and the formulas see it as undefined.
            if tariff == 1:
                params["kwh_eur"] = rules.p1_kwh_eur
            elif tariff == 2:
                params["kwh_eur"] = rules.p2_kwh_eur
            elif tariff == 3:
                params["kwh_eur"] = rules.p3_kwh_eur
            energy_term = energy_expr(**params)
            power_term = power_expr(**params)
            others_term = others_expr(**params)
            if energy_term:
                bill.energy_term = round(energy_term, 6)
            if power_term:
                bill.power_term = round(power_term, 6)
            if others_term:
                bill.others_term = round(others_term, 6)
            bill.value_eur = bill.energy_term + bill.power_term + bill.others_term
            b[dt] = bill
        return list(b.values())

    async def _get_contracts(self) -> list[Contract]:
        """Return the stored contracts for the selected cups."""
        res = await self.db.list_contracts(self._cups)
        return [x.data for x in res]

    async def _get_energy(
        self, start: datetime | None = None, end: datetime | None = None
    ) -> list[Energy]:
        """Return the stored energy records within the optional range."""
        res = await self.db.list_energy(self._cups, start, end)
        return [x.data for x in res]

    async def _get_pvpc(
        self, start: datetime | None = None, end: datetime | None = None
    ) -> list[EnergyPrice]:
        """Return the stored pvpc prices within the optional range."""
        res = await self.db.list_pvpc(start, end)
        return [x.data for x in res]

    async def _get_last_bill_dt(self) -> datetime | None:
        """Return the timestamp of the latest bill record."""
        last_record = await self.db.get_last_bill(self._cups)
        if last_record:
            return last_record.datetime
        return None
"""Definition of a service for telemetry data handling."""
import asyncio
import calendar
import logging
import os
import typing
from datetime import datetime, timedelta
from pathlib import Path
from tempfile import gettempdir
from dateutil import relativedelta
from edata.core.utils import get_db_path, get_day, get_month, get_tariff, redacted_cups
from edata.database.controller import EdataDB
from edata.models import Contract, Energy, Power, Statistics, Supply
from edata.models.bill import EnergyPrice
from edata.providers import DatadisConnector, REDataConnector
_LOGGER = logging.getLogger(__name__)
class DataService:
"Definition of an energy and power data service based on Datadis."
    def __init__(
        self,
        cups: str,
        datadis_user: str,
        datadis_pwd: str,
        storage_path: str,
        datadis_authorized_nif: str | None = None,
    ) -> None:
        """Initialize connectors, parameters and database access.

        cups: supply point identifier to operate on.
        datadis_user/datadis_pwd: Datadis credentials.
        storage_path: base directory for cache and database files.
        datadis_authorized_nif: NIF authorized to query third-party supplies;
            ignored when it matches the username.
        """
        self.datadis = DatadisConnector(
            datadis_user, datadis_pwd, storage_path=storage_path
        )
        self.redata = REDataConnector()
        # params
        self._cups = cups
        self._scups = redacted_cups(cups)  # anonymized cups, used for logging
        self._authorized_nif = datadis_authorized_nif
        # measurementType query parameter -- "0" presumably selects hourly
        # consumption; TODO confirm against the Datadis API docs
        self._measurement_type = "0"
        if self._authorized_nif and self._authorized_nif == datadis_user:
            _LOGGER.warning(
                "Ignoring authorized NIF parameter since it matches the username"
            )
            self._authorized_nif = None
        self.db = EdataDB(get_db_path(storage_path))
        # data (in-memory cache)
        self._supplies: list[Supply] = []
        self._contracts: list[Contract] = []
async def get_supplies(self) -> list[Supply]:
"""Return the list of supplies."""
res = await self.db.list_supplies()
return [x.data for x in res]
async def get_supply(self) -> Supply | None:
res = await self.db.get_supply(self._cups)
if res:
return res.data
return None
async def get_contracts(self) -> list[Contract]:
"""Return a list of contracts for the selected cups."""
res = await self.db.list_contracts(self._cups)
return [x.data for x in res]
async def get_energy(
self, start: datetime | None = None, end: datetime | None = None
) -> list[Energy]:
"""Return a list of energy records for the selected cups."""
res = await self.db.list_energy(self._cups, start, end)
return [x.data for x in res]
async def get_power(
self, start: datetime | None = None, end: datetime | None = None
) -> list[Power]:
"""Return a list of power records for the selected cups."""
res = await self.db.list_power(self._cups, start, end)
return [x.data for x in res]
async def get_pvpc(
self, start: datetime | None = None, end: datetime | None = None
) -> list[EnergyPrice]:
"""Return a list of pvpc records (energy prices) for the selected cups."""
res = await self.db.list_pvpc(start, end)
return [x.data for x in res]
async def get_statistics(
self,
type_: typing.Literal["day", "month"],
start: datetime | None = None,
end: datetime | None = None,
complete: bool | None = None,
) -> list[Statistics]:
"""Return a list of statistics records for the selected cups."""
data = await self.db.list_statistics(self._cups, type_, start, end, complete)
return [x.data for x in data]
async def fix_missing_statistics(self):
"""Recompile statistics to fix missing data."""
missing = await self._find_missing_stats()
for day in missing:
_LOGGER.info("%s updating daily statistics for date %s", self._scups, day)
end = day + relativedelta.relativedelta(day=1) - timedelta(hours=1)
await self._update_daily_statistics(day, end)
missing_months = await self._find_missing_stats("month")
for month in missing_months:
_LOGGER.info(
"%s updating monthly statistics for date %s", self._scups, month
)
end = month + relativedelta.relativedelta(month=1) - timedelta(hours=1)
await self._update_monthly_statistics(month, end)
    async def update(
        self,
        start_date: datetime | None = None,
        end_date: datetime | None = None,
    ) -> bool:
        """Update all missing data within optional date ranges.

        Returns False when supplies cannot be fetched or when the configured
        cups is not present in the account; True otherwise.
        """
        # NOTE(review): _sync() and _get_last_energy_dt() are defined outside
        # this excerpt; verify their contracts before relying on them here.
        await self._sync()
        cups = self._cups
        scups = self._scups
        # update supplies
        await self.update_supplies()
        if not self._supplies:
            _LOGGER.error(
                "%s unable to retrieve any supplies, this is likely due to credential errors (check previous logs) or temporary Datadis unavailability",
                scups,
            )
            return False
        # find requested cups in supplies
        supply = self._find_supply_for_cups(cups)
        if not supply:
            _LOGGER.error(
                (
                    "%s the selected supply is not found in the provided account, got: %s."
                    "This may be expected if you have just registered this cups since it takes a while to appear,"
                    "otherwise check Datadis website and copypaste the CUPS to avoid type errors"
                ),
                scups,
                [redacted_cups(x.cups) for x in self._supplies],
            )
            return False
        _LOGGER.info(
            "%s the selected supply is available from %s to %s",
            scups,
            supply.date_start,
            supply.date_end,
        )
        # default the range to the whole lifetime of the supply
        if not start_date:
            start_date = supply.date_start
            _LOGGER.debug(
                "%s automatically setting start date as supply date start", self._scups
            )
        if not end_date:
            end_date = datetime.today()
            _LOGGER.debug("%s automatically setting end date as today", self._scups)
        _LOGGER.info(
            "%s data will be updated from %s to %s",
            scups,
            start_date,
            end_date,
        )
        # update contracts to get valid periods
        await self.update_contracts()
        if not self._contracts:
            _LOGGER.warning(
                "%s unable to update contracts, edata will assume that the selected supply has no contractual issues",
                scups,
            )
        # update energy records
        last_energy_dt = await self._get_last_energy_dt()
        if last_energy_dt:
            _LOGGER.info(
                "%s the latest known energy timestamp is %s",
                self._scups,
                last_energy_dt,
            )
            # look for missing dates
            missing_days = await self._find_missing_stats()
            missing_days = [
                x for x in missing_days if x >= start_date and x <= end_date
            ]
            _LOGGER.info(
                "%s the following days are missing energy data %s",
                self._scups,
                missing_days,
            )
            # backfill each missing day individually
            for day in missing_days:
                _LOGGER.info("%s trying to update energy for %s", scups, day)
                await self.update_energy(day, day + timedelta(days=1))
            # and recreate statistics
            await self.fix_missing_statistics()
            # fetch upstream records
            await self.update_energy(last_energy_dt + timedelta(hours=1), end_date)
        else:
            # we have no data yet, fetch from start
            await self.update_energy(start_date, end_date)
        # update power records
        await self.update_power(start_date, end_date)
        # fetch pvpc data
        await self.update_pvpc(start_date, end_date)
        # update new statistics
        await self.update_statistics(start_date, end_date)
        return True
async def login(self):
"""Test login at Datadis."""
return await self.datadis.async_login()
async def update_supplies(self):
"""Update the list of supplies for the configured user."""
self._supplies = await self.datadis.async_get_supplies(self._authorized_nif)
for s in self._supplies:
await self.db.add_supply(s)
async def update_contracts(self) -> bool:
"""Update the list of contracts for the selected cups."""
cups = self._cups
supply = self._find_supply_for_cups(cups)
if supply:
self._contracts = await self.datadis.async_get_contract_detail(
cups, supply.distributorCode, self._authorized_nif
)
for c in self._contracts:
await self.db.add_contract(self._cups, c)
return True
_LOGGER.warning("Unable to fetch contract details for %s", self._scups)
return False
async def update_energy(self, start: datetime, end: datetime):
"""Update the list of energy consumptions for the selected cups."""
cups = self._cups
supply = self._find_supply_for_cups(cups)
if supply:
data = await self.datadis.async_get_consumption_data(
cups,
supply.distributorCode,
start,
end,
self._measurement_type,
supply.pointType,
self._authorized_nif,
)
await self.db.add_energy_list(self._cups, data)
return True
_LOGGER.warning("Unable to fetch energy data for %s", self._scups)
return False
async def update_power(self, start: datetime, end: datetime):
"""Update the list of power peaks for the selected cups."""
cups = self._cups
supply = self._find_supply_for_cups(cups)
if supply:
data = await self.datadis.async_get_max_power(
cups,
supply.distributorCode,
start,
end,
self._authorized_nif,
)
await self.db.add_power_list(self._cups, data)
return True
_LOGGER.warning("Unable to fetch power data for %s", self._scups)
return False
async def update_pvpc(self, start: datetime, end: datetime):
"""Update recent pvpc prices."""
cups = self._cups
min_date = get_day(datetime.now()) - timedelta(days=28)
if start < min_date:
start = min_date
if end < min_date:
# end date out of bounds
return False
if _pvpc_dt := await self._get_last_pvpc_dt():
start = _pvpc_dt + timedelta(hours=1)
if start >= end:
_LOGGER.info("%s pvpc prices are already synced", self._scups)
# data is already synced
return True
end = get_day(end) + timedelta(hours=23, minutes=59)
prices = await self.redata.async_get_realtime_prices(start, end)
if prices:
await self.db.add_pvpc_list(prices)
return True
_LOGGER.warning("%s unable to fetch pvpc prices", self._scups)
return False
async def update_statistics(self, start: datetime, end: datetime):
"""Update the statistics during a period."""
await self._update_daily_statistics(start, end)
await self._update_monthly_statistics(start, end)
    async def _update_daily_statistics(self, start: datetime, end: datetime):
        """Update daily statistics within a date range."""
        day_start = get_day(start)
        day_end = end
        daily = await self.db.list_statistics(self._cups, "day", day_start, day_end)
        # collect days already stored as complete so they are not recompiled
        complete = []
        for stat in daily:
            if stat.complete:
                complete.append(stat.datetime)
                continue  # NOTE(review): redundant (last statement in loop)
        data = await self.get_energy(day_start, day_end)
        stats = await asyncio.to_thread(
            self._compile_statistics, data, get_day, skip=complete
        )
        for stat in stats:
            # a day is complete when all 24 hourly records are present
            is_complete = stat.delta_h == 24
            await self.db.add_statistics(self._cups, "day", stat, is_complete)
            if not is_complete:
                _LOGGER.info(
                    "%s daily statistics for %s are incomplete",
                    self._scups,
                    stat.datetime.date(),
                )
async def _update_monthly_statistics(self, start: datetime, end: datetime):
    """Recompute monthly statistics within a date range.

    Months whose statistics are already marked complete are skipped;
    incomplete ones are recompiled from the stored energy records.
    """
    range_start = get_month(start)
    range_end = end
    existing = await self.db.list_statistics(
        self._cups, "month", range_start, range_end
    )
    already_complete = [s.datetime for s in existing if s.complete]
    energy = await self.get_energy(range_start, range_end)
    stats = await asyncio.to_thread(
        self._compile_statistics, energy, get_month, skip=already_complete
    )
    for stat in stats:
        # A month is complete when every hour of every day was aggregated.
        days_in_month = calendar.monthrange(
            stat.datetime.year, stat.datetime.month
        )[1]
        is_complete = stat.delta_h == days_in_month * 24
        await self.db.add_statistics(self._cups, "month", stat, is_complete)
        if not is_complete:
            _LOGGER.info(
                "%s monthly statistics for %s are incomplete",
                self._scups,
                stat.datetime.date(),
            )
def _compile_statistics(
    self,
    data: list[Energy],
    agg: typing.Callable[[datetime], datetime],
    wanted: list[datetime] | None = None,
    skip: list[datetime] | None = None,
) -> list[Statistics]:
    """Aggregate hourly energy records into statistics buckets.

    Args:
        data: hourly energy records to aggregate.
        agg: maps a record datetime to its bucket key (e.g. ``get_day``
            or ``get_month``).
        wanted: bucket keys always included, even when listed in skip.
        skip: bucket keys to ignore (e.g. already-complete periods).

    Returns:
        One Statistics object per bucket, in first-seen order.
    """
    wanted = wanted or []
    skip = skip or []
    buckets: dict[datetime, Statistics] = {}
    for item in data:
        key = agg(item.datetime)
        # Skip buckets that are already complete, unless explicitly wanted.
        if key not in wanted and key in skip:
            continue
        tariff = get_tariff(item.datetime)
        if key not in buckets:
            buckets[key] = Statistics(
                datetime=key,
                delta_h=0,
                value_kWh=0,
                value_p1_kWh=0,
                value_p2_kWh=0,
                value_p3_kWh=0,
                surplus_kWh=0,
                surplus_p1_kWh=0,
                surplus_p2_kWh=0,
                surplus_p3_kWh=0,
            )
        ref = buckets[key]
        ref.delta_h += item.delta_h
        ref.value_kWh += item.value_kWh
        ref.surplus_kWh += item.surplus_kWh
        # Attribute consumption/surplus to the record's tariff period.
        if tariff == 1:
            ref.value_p1_kWh += item.value_kWh
            ref.surplus_p1_kWh += item.surplus_kWh
        elif tariff == 2:
            ref.value_p2_kWh += item.value_kWh
            ref.surplus_p2_kWh += item.surplus_kWh
        elif tariff == 3:
            ref.value_p3_kWh += item.value_kWh
            ref.surplus_p3_kWh += item.surplus_kWh
    return list(buckets.values())
async def _find_missing_stats(self, agg: typing.Literal["day", "month"] = "day"):
"""Return the list of days that are missing energy data."""
stats = await self.get_statistics(agg, complete=False)
return [x.datetime for x in stats]
def _find_supply_for_cups(self, cups: str) -> Supply | None:
    """Return the cached supply matching the provided CUPS.

    Returns None explicitly when no supply matches (the original body
    relied on an implicit fall-through None).
    """
    return next(
        (supply for supply in self._supplies if supply.cups == cups), None
    )
async def _get_last_energy_dt(self) -> datetime | None:
"""Return the timestamp of the latest energy record."""
last_record = await self.db.get_last_energy(self._cups)
if last_record:
return last_record.datetime
async def _get_last_pvpc_dt(self) -> datetime | None:
"""Return the timestamp of the latest pvpc record."""
last_record = await self.db.get_last_pvpc()
if last_record:
return last_record.datetime
async def _sync(self):
    """Load state."""
    # Refresh the in-memory supplies and contracts caches so other
    # methods can reference them without re-querying.
    # NOTE(review): presumably these getters read from the local DB —
    # confirm against the service's get_supplies/get_contracts.
    self._supplies = await self.get_supplies()
    self._contracts = await self.get_contracts()
import json
import os
from tempfile import gettempdir
from unittest.mock import AsyncMock, patch
import pytest
import pytest_asyncio
from syrupy.assertion import SnapshotAssertion
from edata.models.bill import BillingRules
from edata.models.data import Energy, Power
from edata.models.supply import Contract, Supply
from edata.services.bill_service import BillService
from edata.services.data_service import DataService
# Directory holding the JSON fixtures used by these tests.
ASSETS = os.path.join(os.path.dirname(__file__), "assets")


def load_json(filename):
    """Parse and return a JSON fixture from the assets directory."""
    path = os.path.join(ASSETS, filename)
    with open(path, encoding="utf-8") as handle:
        return json.load(handle)
def load_models(filename, model):
    """Instantiate one *model* per JSON object found in the asset file."""
    return [model(**payload) for payload in load_json(filename)]
@pytest.fixture(scope="module")
def storage_dir():
    """Temporary directory used as database storage for the test module."""
    return gettempdir()
@pytest.fixture(scope="module")
def supplies():
    """Supply models loaded from the supplies.json asset."""
    return load_models("supplies.json", Supply)
@pytest.fixture(scope="module")
def contracts():
    """Contract models loaded from the contracts.json asset."""
    return load_models("contracts.json", Contract)
@pytest.fixture(scope="module")
def energy():
    """Energy records loaded from the energy.json asset."""
    return load_models("energy.json", Energy)
@pytest.fixture(scope="module")
def power():
    """Power records loaded from the power.json asset."""
    return load_models("power.json", Power)
@pytest.fixture(scope="module")
def mock_connector(supplies, contracts, energy, power):
    """Build an AsyncMock standing in for the Datadis connector."""
    connector = AsyncMock()
    connector.async_login.return_value = True
    connector.async_get_supplies.return_value = supplies
    connector.async_get_contract_detail.return_value = contracts
    # The real connector yields consumption sorted chronologically.
    connector.async_get_consumption_data.return_value = sorted(
        energy, key=lambda item: item.datetime
    )
    connector.async_get_max_power.return_value = power
    return connector
@pytest_asyncio.fixture(scope="module", loop_scope="module")
async def populated_data_service(mock_connector, energy, power, storage_dir):
    """DataService wired to the mocked connector and fully populated."""
    # Patch the connector class so the service talks to the AsyncMock
    # instead of the network.
    with patch("edata.services.data_service.DatadisConnector") as mock_connector_class:
        mock_connector_class.side_effect = lambda *a, **k: mock_connector
        ds = DataService(
            "ESXXXXXXXXXXXXXXXXTEST", "user", "pwd", storage_path=storage_dir
        )
        await ds.login()
        await ds.update_supplies()
        await ds.update_contracts()
        # Sync exactly the datetime span covered by the energy asset.
        all_energy = sorted(energy, key=lambda x: x.datetime)
        start_date = all_energy[0].datetime
        end_date = all_energy[-1].datetime
        await ds.update_energy(start_date, end_date)
        await ds.update_power(start_date, end_date)
        await ds._update_daily_statistics(start_date, end_date)
        await ds._update_monthly_statistics(start_date, end_date)
        return ds
@pytest.mark.asyncio
async def test_data_fetch(
    populated_data_service, snapshot: SnapshotAssertion, energy, power
):
    """Fetched telemetry should be complete and match the snapshot."""
    ds = populated_data_service
    fetched_energy = await ds.get_energy()
    fetched_power = await ds.get_power()
    assert len(fetched_energy) == len(energy)
    assert len(fetched_power) == len(power)
    result_all = {
        "supplies": ds._supplies,
        "contracts": ds._contracts,
        "energy": await ds.get_energy(),
        "power": await ds.get_power(),
    }
    assert result_all == snapshot
@pytest.mark.asyncio
async def test_get_daily_energy(populated_data_service, snapshot: SnapshotAssertion):
    """Daily aggregated statistics should match the stored snapshot."""
    stats = await populated_data_service.get_statistics("day")
    assert stats == snapshot
@pytest.mark.asyncio
async def test_get_monthly_energy(populated_data_service, snapshot: SnapshotAssertion):
    """Monthly aggregated statistics should match the stored snapshot."""
    stats = await populated_data_service.get_statistics("month")
    assert stats == snapshot
@pytest.mark.asyncio
async def test_bill_service(
    populated_data_service, snapshot: SnapshotAssertion, energy, storage_dir
):
    """BillService should compute bills over the populated data set."""
    # Ensure data service is populated
    ds = populated_data_service
    # Initialize BillService with same configuration
    bs = BillService("ESXXXXXXXXXXXXXXXXTEST", storage_path=storage_dir)
    all_energy = sorted(energy, key=lambda x: x.datetime)
    start_date = all_energy[0].datetime
    end_date = all_energy[-1].datetime
    # Flat pricing: same €/kWh for every tariff period.
    rules = BillingRules(
        p1_kwh_eur=0.20,
        p2_kwh_eur=0.20,
        p3_kwh_eur=0.20,
        p1_kw_year_eur=20,
        p2_kw_year_eur=10,
    )
    # Run update
    await bs.update(start_date, end_date, rules, is_pvpc=False)
    # Verify bills were created
    bills = await bs.get_bills(start_date, end_date)
    assert len(bills) > 0
    # Verify statistics were updated
    daily_stats = await bs.get_bills(start_date, end_date, "day")
    assert len(daily_stats) > 0
    monthly_stats = await bs.get_bills(start_date, end_date, "month")
    assert len(monthly_stats) > 0
    # Snapshot only the monthly aggregation; it summarizes the rest.
    assert monthly_stats == snapshot
+120
-76
Metadata-Version: 2.4
Name: e-data
Version: 1.3.3
Version: 2.0.0.dev112
Summary: Python library for managing spanish energy data from various web providers

@@ -684,19 +684,21 @@ Author-email: VMG <vmayorg@outlook.es>

Classifier: Programming Language :: Python
Classifier: Programming Language :: Python :: 3.11
Classifier: Programming Language :: Python :: 3.12
Classifier: Programming Language :: Python :: Implementation :: CPython
Classifier: Programming Language :: Python :: Implementation :: PyPy
Requires-Python: >=3.11.0
Requires-Python: >=3.12.0
Description-Content-Type: text/markdown
License-File: LICENSE
Requires-Dist: dateparser>=1.1.2
Requires-Dist: freezegun>=1.2.1
Requires-Dist: holidays>=0.14.2
Requires-Dist: pytest>=7.1.2
Requires-Dist: python_dateutil>=2.8.2
Requires-Dist: requests>=2.28.1
Requires-Dist: voluptuous>=0.13.1
Requires-Dist: Jinja2>=3.1.2
Requires-Dist: diskcache>=5.6.3
Requires-Dist: aiohttp>=3.12.15
Requires-Dist: aiohttp<4,>=3.12
Requires-Dist: dateparser<2,>=1.2
Requires-Dist: diskcache<6,>=5.6
Requires-Dist: freezegun<2,>=1.5
Requires-Dist: holidays<1,>=0.50
Requires-Dist: Jinja2<4,>=3.1
Requires-Dist: pydantic<3,>=2.10
Requires-Dist: python_dateutil<3,>=2.8
Requires-Dist: Requests<3,>=2.31
Requires-Dist: SQLAlchemy<3,>=2.0
Requires-Dist: sqlmodel<0.1,>=0.0.22
Requires-Dist: typer<1,>=0.12
Requires-Dist: aiosqlite<1,>=0.21
Dynamic: license-file

@@ -710,3 +712,3 @@

Este paquete proporciona herramientas para la descarga de tus datos de consumo eléctrico (desde Datadis.es) y su posterior procesado. La motivación principal es que conocer el consumo puede ayudarnos a reducirlo, e incluso a elegir una tarifa que mejor se adapte a nuestras necesidades. A día de hoy sus capacidades de facturación (€) son limitadas, soporta PVPC (según disponibilidad de datos de REData) y tarificación fija por tramos. Es el corazón de la integración [homeassistant-edata](https://github.com/uvejota/homeassistant-edata).
Este paquete proporciona herramientas para la descarga de tus datos de consumo eléctrico (desde Datadis.es) y su posterior procesado. La motivación principal es que conocer el consumo puede ayudarnos a reducirlo, e incluso a elegir una tarifa que mejor se adapte a nuestras necesidades. Soporta facturación con PVPC (según disponibilidad de datos de REData) y tarificación fija personalizable mediante fórmulas. Todos los datos se almacenan localmente en una base de datos SQLite. Es el corazón de la integración [homeassistant-edata](https://github.com/uvejota/homeassistant-edata).

@@ -731,81 +733,123 @@ _**Esta herramienta no mantiene ningún tipo de vinculación con los proveedores de datos anteriormente mencionados, simplemente consulta la información disponible y facilita su posterior análisis.**_

El paquete consta de tres módulos diferenciados:
El paquete consta de varios módulos:
* **Conectores** (módulo `connectors`), para definir los métodos de consulta a los diferentes proveedores: Datadis y REData.
* **Procesadores** (módulo `processors`), para procesar datos de consumo, maxímetro, o coste (tarificación). Ahora mismo consta de tres procesadores: `billing`, `consumption` y `maximeter`, además de algunas utilidades ubicadas en `utils`. Los procesadores deben heredar de la clase Processor definida en `base.py`
* **Ayudantes** (módulo `helpers`), para ayudar en el uso y gestión de los anteriores, presentando de momento un único ayudante llamado `EdataHelper` que te permite recopilar `X` días de datos (por defecto 365) y automáticamente procesarlos. Los datos son almacenados en la variable `data`, mientras que los atributos autocalculados son almacenados en la variable `attributes`. Por lo general, primero utilizan los conectores y luego procesan los datos, gestionando varias tareas de recuperación (principalmente para Datadis).
* **Proveedores** (`providers`): Conectores para consultar diferentes fuentes de datos.
- `datadis.py`: Conector para la API privada de Datadis (datos de consumo, potencia, contratos y suministros).
- `redata.py`: Conector para la API pública de REData (precios PVPC en tiempo real).
Estos módulos corresponden a la siguiente estructura del paquete:
* **Servicios** (`services`): Capa de lógica de negocio que gestiona la sincronización y procesamiento de datos.
- `data_service.py`: Servicio para gestionar datos de telemetría (consumo, potencia, estadísticas).
- `bill_service.py`: Servicio para gestionar la facturación y cálculo de costes.
```
edata/
· __init__.py
· connectors/
· __init__.py
· datadis.py
· redata.py
· processors/
· __init__.py
· base.py
· billing.py
· consumption.py
· maximeter.py
· utils.py
· helpers.py
```
* **Modelos** (`models`): Definiciones de estructuras de datos usando Pydantic.
- `supply.py`: Modelos de suministros (`Supply`) y contratos (`Contract`).
- `data.py`: Modelos de datos de telemetría (`Energy`, `Power`, `Statistics`).
- `bill.py`: Modelos de facturación (`Bill`, `EnergyPrice`, `BillingRules`, `PVPCBillingRules`).
* **Base de datos** (`database`): Gestión de persistencia local con SQLite.
- `controller.py`: Controlador principal de la base de datos (`EdataDB`).
- `models.py`: Modelos SQLModel para las tablas de la base de datos.
- `queries.py`: Consultas SQL predefinidas.
* **Utilidades** (`core`): Funciones auxiliares para cálculos de tarifas, fechas, etc.
* **CLI** (`cli.py`): Interfaz de línea de comandos para operaciones básicas.
## Ejemplo de uso
Partimos de que tenemos credenciales en Datadis.es. Algunas aclaraciones:
### Usando los servicios (recomendado)
Partimos de que tenemos credenciales en [Datadis.es](https://datadis.es). Algunas aclaraciones:
* No es necesario solicitar API pública en el registro (se utilizará la API privada habilitada por defecto)
* El username suele ser el NIF del titular
* Copie el CUPS de la web de Datadis, algunas comercializadoras adhieren caracteres adicionales en el CUPS mostrado en su factura.
* La herramienta acepta el uso de NIF autorizado para consultar el suministro de otro titular.
* Copie el CUPS de la web de Datadis, algunas comercializadoras adhieren caracteres adicionales en el CUPS mostrado en su factura
* La herramienta acepta el uso de NIF autorizado para consultar el suministro de otro titular
``` python
import asyncio
from datetime import datetime
import json
from edata.services.data_service import DataService
from edata.services.bill_service import BillService
from edata.models.bill import PVPCBillingRules
# importamos definiciones de datos que nos interesen
from edata.definitions import PricingRules
# importamos el ayudante
from edata.helpers import EdataHelper
# importamos el procesador de utilidades
from edata.processors import utils
async def main():
# Crear el servicio de datos
data_service = DataService(
    cups="ES0000000000000000XX",  # Tu CUPS
    datadis_user="12345678A",  # Tu NIF/usuario
    datadis_pwd="tu_password",
    storage_path="./my_data",  # Directorio para la BD
    datadis_authorized_nif=None,  # NIF autorizado (opcional)
)
# Actualizar todos los datos disponibles
await data_service.update()
# Obtener suministros y contratos
supplies = await data_service.get_supplies()
contracts = await data_service.get_contracts()
# Obtener datos de consumo y potencia
energy = await data_service.get_energy()
power = await data_service.get_power()
# Obtener estadísticas agregadas (diarias o mensuales)
daily_stats = await data_service.get_statistics("day")
monthly_stats = await data_service.get_statistics("month")
# Crear el servicio de facturación
bill_service = BillService(
cups="ES0000000000000000XX",
storage_path="./my_data" # Mismo directorio que data_service
)
# Definir reglas de facturación PVPC (valores por defecto españoles)
pvpc_rules = PVPCBillingRules(
p1_kw_year_eur=30.67266, # Término potencia P1 (€/kW/año)
p2_kw_year_eur=1.4243591, # Término potencia P2 (€/kW/año)
meter_month_eur=0.81, # Alquiler contador (€/mes)
market_kw_year_eur=3.113, # Otros cargos (€/kW/año)
electricity_tax=1.0511300560, # Impuesto eléctrico
iva_tax=1.21 # IVA (21%)
)
# Actualizar facturación con PVPC
start_date = datetime(2024, 1, 1)
end_date = datetime(2024, 12, 31)
await bill_service.update(start_date, end_date, billing_rules=pvpc_rules, is_pvpc=True)
# Obtener facturas calculadas
bills = await bill_service.get_bills(start_date, end_date, type_="hour")
daily_bills = await bill_service.get_bills(start_date, end_date, type_="day")
monthly_bills = await bill_service.get_bills(start_date, end_date, type_="month")
# Mostrar resumen
total_cost = sum(b.value_eur for b in bills)
print(f"Coste total: {total_cost:.2f} €")
print(f"Consumo total: {sum(e.value_kWh for e in energy):.2f} kWh")
# Preparar reglas de tarificación (si se quiere)
PRICING_RULES_PVPC = PricingRules(
p1_kw_year_eur=30.67266,
p2_kw_year_eur=1.4243591,
meter_month_eur=0.81,
market_kw_year_eur=3.113,
electricity_tax=1.0511300560,
iva_tax=1.05,
# podemos rellenar los siguientes campos si quisiéramos precio fijo (y no pvpc)
p1_kwh_eur=None,
p2_kwh_eur=None,
p3_kwh_eur=None,
)
# Ejecutar
asyncio.run(main())
```
# Instanciar el helper
# 'authorized_nif' permite indicar el NIF de la persona que nos autoriza a consultar su CUPS.
# 'data' permite "cargar" al helper datos anteriores (resultado edata.data de una ejecución anterior), para evitar volver a consultar los mismos.
edata = EdataHelper(
"datadis_user",
"datadis_password",
"cups",
datadis_authorized_nif=None,
pricing_rules=PRICING_RULES_PVPC, # si se le pasa None, no aplica tarificación
data=None, # aquí podríamos cargar datos anteriores
)
### Usando la CLI
# Solicitar actualización de todo el histórico (se almacena en edata.data)
edata.update(date_from=datetime(1970, 1, 1), date_to=datetime.today())
El paquete incluye una interfaz de línea de comandos para operaciones básicas:
# volcamos todo lo obtenido a un fichero
with open("backup.json", "w") as file:
json.dump(utils.serialize_dict(edata.data), file) # se puede utilizar deserialize_dict para la posterior lectura del backup
``` bash
# Ver suministros disponibles
python -m edata.cli show-supplies <username>
# Imprimir atributos
print(edata.attributes)
# Descargar todos los datos de un CUPS
python -m edata.cli download-all --cups ES0000000000000000XX <username>
# Actualizar facturación con tarifa fija
python -m edata.cli update-custom-bill \
--cups ES0000000000000000XX \
--p1-kw-year-eur 30.67 \
--p2-kw-year-eur 1.42 \
--p1-kwh-eur 0.15 \
--p2-kwh-eur 0.10 \
--p3-kwh-eur 0.08 \
--meter-month-eur 0.81
```

@@ -1,10 +0,13 @@

dateparser>=1.1.2
freezegun>=1.2.1
holidays>=0.14.2
pytest>=7.1.2
python_dateutil>=2.8.2
requests>=2.28.1
voluptuous>=0.13.1
Jinja2>=3.1.2
diskcache>=5.6.3
aiohttp>=3.12.15
aiohttp<4,>=3.12
dateparser<2,>=1.2
diskcache<6,>=5.6
freezegun<2,>=1.5
holidays<1,>=0.50
Jinja2<4,>=3.1
pydantic<3,>=2.10
python_dateutil<3,>=2.8
Requests<3,>=2.31
SQLAlchemy<3,>=2.0
sqlmodel<0.1,>=0.0.22
typer<1,>=0.12
aiosqlite<1,>=0.21

@@ -12,19 +12,22 @@ LICENSE

edata/cli.py
edata/const.py
edata/definitions.py
edata/helpers.py
edata/storage.py
edata/connectors/__init__.py
edata/connectors/datadis.py
edata/connectors/redata.py
edata/processors/__init__.py
edata/processors/base.py
edata/processors/billing.py
edata/processors/consumption.py
edata/processors/maximeter.py
edata/processors/utils.py
edata/core/__init__.py
edata/core/const.py
edata/core/utils.py
edata/database/__init__.py
edata/database/controller.py
edata/database/models.py
edata/database/queries.py
edata/database/utils.py
edata/models/__init__.py
edata/models/bill.py
edata/models/data.py
edata/models/supply.py
edata/providers/__init__.py
edata/providers/datadis.py
edata/providers/redata.py
edata/services/bill_service.py
edata/services/data_service.py
edata/tests/__init__.py
edata/tests/test_datadis_connector.py
edata/tests/test_helpers.py
edata/tests/test_processors.py
edata/tests/test_redata_connector.py
edata/tests/test_redata_connector.py
edata/tests/test_services.py

@@ -1,24 +0,145 @@

import typer
import asyncio
import logging
from getpass import getpass
from edata.connectors.datadis import DatadisConnector
import json
from typing import Annotated
def main():
"""CLI básica para mostrar supplies y contracts de Datadis."""
username = typer.prompt("Usuario (NIF)")
password = getpass("Contraseña: ")
import typer
from edata.models.bill import BillingRules
from edata.providers.datadis import DatadisConnector
from edata.services.bill_service import BillService
from edata.services.data_service import DataService
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
app = typer.Typer()
@app.command()
def show_supplies(username: str):
"""Show supplies and contracts for a given datadis user."""
password = getpass()
connector = DatadisConnector(username, password)
supplies = connector.get_supplies()
typer.echo("\nSupplies:")
typer.echo(json.dumps(supplies, indent=2, default=str))
for supply in supplies:
typer.echo(supply.model_dump_json())
if supplies:
cups = supplies[0]["cups"]
distributor = supplies[0]["distributorCode"]
cups = supplies[0].cups
distributor = supplies[0].distributorCode
contracts = connector.get_contract_detail(cups, distributor)
typer.echo("\nContracts:")
typer.echo(json.dumps(contracts, indent=2, default=str))
for contract in contracts:
typer.echo(contract.model_dump_json())
else:
typer.echo("No se encontraron supplies.")
typer.echo("We found no supplies.")
async def _download_all(
nif: str,
cups: str,
authorized_nif: str | None = None,
):
"""Download all data for a given datadis account and CUPS."""
password = getpass()
service = DataService(
cups,
nif,
password,
datadis_authorized_nif=authorized_nif,
storage_path="./edata_cli",
)
await service.update_supplies()
supply = await service.get_supply()
if supply:
await service.update()
@app.command()
def download_all(
nif: str,
cups: Annotated[str, typer.Option(help="The identifier of the Supply")],
authorized_nif: str | None = None,
):
"""Download all data for a given datadis account and CUPS."""
asyncio.run(_download_all(nif, cups, authorized_nif))
async def _update_custom_bill(
cups: str,
p1_kw_year_eur: float,
p2_kw_year_eur: float,
p1_kwh_eur: float,
p2_kwh_eur: float,
p3_kwh_eur: float,
meter_month_eur: float,
):
"""Download all data for a given datadis account and CUPS."""
bs = BillService(cups, storage_path="./edata_cli")
rules = BillingRules(
p1_kw_year_eur=p1_kw_year_eur,
p2_kw_year_eur=p2_kw_year_eur,
p1_kwh_eur=p1_kwh_eur,
p2_kwh_eur=p2_kwh_eur,
p3_kwh_eur=p3_kwh_eur,
meter_month_eur=meter_month_eur,
)
await bs.update(billing_rules=rules, is_pvpc=False)
@app.command()
def update_custom_bill(
cups: Annotated[str, typer.Option(help="The identifier of the Supply")],
p1_kw_year_eur: Annotated[
float, typer.Option(help="Price per kW at P1 tariff (by year)")
],
p2_kw_year_eur: Annotated[
float, typer.Option(help="Price per kW at P2 tariff (by year)")
],
p1_kwh_eur: Annotated[float, typer.Option(help="Price per kWh at P1 tariff")],
p2_kwh_eur: Annotated[float, typer.Option(help="Price per kWh at P2 tariff")],
p3_kwh_eur: Annotated[float, typer.Option(help="Price per kWh at P3 tariff")],
meter_month_eur: Annotated[float, typer.Option(help="Monthly cost of the meter")],
):
"""Download all data for a given datadis account and CUPS."""
asyncio.run(
_update_custom_bill(
cups,
p1_kw_year_eur,
p2_kw_year_eur,
p1_kwh_eur,
p2_kwh_eur,
p3_kwh_eur,
meter_month_eur,
)
)
async def _update_pvpc_bill(cups: str):
"""Download all data for a given datadis account and CUPS."""
bs = BillService(cups, storage_path="./edata_cli")
await bs.update(is_pvpc=True)
@app.command()
def update_pvpc_bill(
cups: Annotated[str, typer.Option(help="The identifier of the Supply")],
):
"""Download all data for a given datadis account and CUPS."""
asyncio.run(
_update_pvpc_bill(
cups,
)
)
if __name__ == "__main__":
typer.run(main)
app()
"""Tests for DatadisConnector (offline)."""
import datetime
import datetime
from unittest.mock import patch, AsyncMock, MagicMock
from ..connectors.datadis import DatadisConnector
from unittest.mock import AsyncMock, MagicMock, patch
from ..connectors.datadis import DatadisConnector
from edata.providers.datadis import DatadisConnector

@@ -73,6 +71,9 @@ MOCK_USERNAME = "USERNAME"

@patch("aiohttp.ClientSession.get")
@patch.object(DatadisConnector, "_async_get_token", new_callable=AsyncMock, return_value=True)
@patch.object(
DatadisConnector, "_async_get_token", new_callable=AsyncMock, return_value=True
)
def test_get_supplies(mock_token, mock_get, snapshot):
"""Test a successful 'get_supplies' query (syrupy snapshot)."""
"""Test a successful 'get_supplies' query."""
mock_response = MagicMock()

@@ -87,7 +88,8 @@ mock_response.status = 200

@patch("aiohttp.ClientSession.get")
@patch.object(DatadisConnector, "_async_get_token", new_callable=AsyncMock, return_value=True)
@patch.object(
DatadisConnector, "_async_get_token", new_callable=AsyncMock, return_value=True
)
def test_get_contract_detail(mock_token, mock_get, snapshot):
"""Test a successful 'get_contract_detail' query (syrupy snapshot)."""
"""Test a successful 'get_contract_detail' query."""
mock_response = MagicMock()

@@ -102,7 +104,8 @@ mock_response.status = 200

@patch("aiohttp.ClientSession.get")
@patch.object(DatadisConnector, "_async_get_token", new_callable=AsyncMock, return_value=True)
@patch.object(
DatadisConnector, "_async_get_token", new_callable=AsyncMock, return_value=True
)
def test_get_consumption_data(mock_token, mock_get, snapshot):
"""Test a successful 'get_consumption_data' query (syrupy snapshot)."""
"""Test a successful 'get_consumption_data' query."""
mock_response = MagicMock()

@@ -114,17 +117,21 @@ mock_response.status = 200

connector = DatadisConnector(MOCK_USERNAME, MOCK_PASSWORD)
assert connector.get_consumption_data(
"ESXXXXXXXXXXXXXXXXTEST",
"2",
datetime.datetime(2022, 10, 22, 0, 0, 0),
datetime.datetime(2022, 10, 22, 2, 0, 0),
"0",
5,
) == snapshot
assert (
connector.get_consumption_data(
"ESXXXXXXXXXXXXXXXXTEST",
"2",
datetime.datetime(2022, 10, 22, 0, 0, 0),
datetime.datetime(2022, 10, 22, 2, 0, 0),
"0",
5,
)
== snapshot
)
@patch("aiohttp.ClientSession.get")
@patch.object(DatadisConnector, "_async_get_token", new_callable=AsyncMock, return_value=True)
@patch.object(
DatadisConnector, "_async_get_token", new_callable=AsyncMock, return_value=True
)
def test_get_max_power(mock_token, mock_get, snapshot):
"""Test a successful 'get_max_power' query (syrupy snapshot)."""
"""Test a successful 'get_max_power' query."""
mock_response = MagicMock()

@@ -136,15 +143,20 @@ mock_response.status = 200

connector = DatadisConnector(MOCK_USERNAME, MOCK_PASSWORD)
assert connector.get_max_power(
"ESXXXXXXXXXXXXXXXXTEST",
"2",
datetime.datetime(2022, 3, 1, 0, 0, 0),
datetime.datetime(2022, 4, 1, 0, 0, 0),
None,
) == snapshot
assert (
connector.get_max_power(
"ESXXXXXXXXXXXXXXXXTEST",
"2",
datetime.datetime(2022, 3, 1, 0, 0, 0),
datetime.datetime(2022, 4, 1, 0, 0, 0),
None,
)
== snapshot
)
@patch("aiohttp.ClientSession.get")
@patch.object(DatadisConnector, "_async_get_token", new_callable=AsyncMock, return_value=True)
@patch.object(
DatadisConnector, "_async_get_token", new_callable=AsyncMock, return_value=True
)
def test_get_supplies_empty_response(mock_token, mock_get, snapshot):
"""Test get_supplies with empty response (syrupy snapshot)."""
"""Test get_supplies with empty response."""
mock_response = MagicMock()

@@ -160,3 +172,5 @@ mock_response.status = 200

@patch("aiohttp.ClientSession.get")
@patch.object(DatadisConnector, "_async_get_token", new_callable=AsyncMock, return_value=True)
@patch.object(
DatadisConnector, "_async_get_token", new_callable=AsyncMock, return_value=True
)
def test_get_supplies_malformed_response(mock_token, mock_get, snapshot):

@@ -175,5 +189,7 @@ """Test get_supplies with malformed response (missing required fields, syrupy snapshot)."""

@patch("aiohttp.ClientSession.get")
@patch.object(DatadisConnector, "_async_get_token", new_callable=AsyncMock, return_value=True)
@patch.object(
DatadisConnector, "_async_get_token", new_callable=AsyncMock, return_value=True
)
def test_get_supplies_partial_response(mock_token, mock_get, snapshot):
"""Test get_supplies with partial valid/invalid response (syrupy snapshot)."""
"""Test get_supplies with partial valid/invalid response."""
partial = [

@@ -192,5 +208,6 @@ SUPPLIES_RESPONSE[0],

@patch("aiohttp.ClientSession.get")
@patch.object(DatadisConnector, "_async_get_token", new_callable=AsyncMock, return_value=True)
@patch.object(
DatadisConnector, "_async_get_token", new_callable=AsyncMock, return_value=True
)
def test_get_consumption_data_cache(mock_token, mock_get, snapshot):

@@ -205,20 +222,26 @@ """Test get_consumption_data uses cache on second call (should not call HTTP again, syrupy snapshot)."""

# First call populates cache
assert connector.get_consumption_data(
"ESXXXXXXXXXXXXXXXXTEST",
"2",
datetime.datetime(2022, 10, 22, 0, 0, 0),
datetime.datetime(2022, 10, 22, 2, 0, 0),
"0",
5,
) == snapshot
assert (
connector.get_consumption_data(
"ESXXXXXXXXXXXXXXXXTEST",
"2",
datetime.datetime(2022, 10, 22, 0, 0, 0),
datetime.datetime(2022, 10, 22, 2, 0, 0),
"0",
5,
)
== snapshot
)
# Second call should use cache, not call HTTP again
mock_get.reset_mock()
assert connector.get_consumption_data(
"ESXXXXXXXXXXXXXXXXTEST",
"2",
datetime.datetime(2022, 10, 22, 0, 0, 0),
datetime.datetime(2022, 10, 22, 2, 0, 0),
"0",
5,
) == snapshot
assert (
connector.get_consumption_data(
"ESXXXXXXXXXXXXXXXXTEST",
"2",
datetime.datetime(2022, 10, 22, 0, 0, 0),
datetime.datetime(2022, 10, 22, 2, 0, 0),
"0",
5,
)
== snapshot
)
mock_get.assert_not_called()

@@ -228,5 +251,7 @@

@patch("aiohttp.ClientSession.get")
@patch.object(DatadisConnector, "_async_get_token", new_callable=AsyncMock, return_value=True)
@patch.object(
DatadisConnector, "_async_get_token", new_callable=AsyncMock, return_value=True
)
def test_get_supplies_optional_fields_none(mock_token, mock_get, snapshot):
"""Test get_supplies with optional fields as None (syrupy snapshot)."""
"""Test get_supplies with optional fields as None."""
response = [

@@ -252,2 +277,2 @@ {

connector = DatadisConnector(MOCK_USERNAME, MOCK_PASSWORD)
assert connector.get_supplies() == snapshot
assert connector.get_supplies() == snapshot

@@ -1,7 +0,8 @@

"""Tests for REData (online)"""
"""Tests for REDataConnector (online)"""
from datetime import datetime, timedelta
from ..connectors.redata import REDataConnector
from edata.providers.redata import REDataConnector
def test_get_realtime_prices():

@@ -8,0 +9,0 @@ """Test a successful 'get_realtime_prices' query"""

+120
-76
Metadata-Version: 2.4
Name: e-data
Version: 1.3.3
Version: 2.0.0.dev112
Summary: Python library for managing spanish energy data from various web providers

@@ -684,19 +684,21 @@ Author-email: VMG <vmayorg@outlook.es>

Classifier: Programming Language :: Python
Classifier: Programming Language :: Python :: 3.11
Classifier: Programming Language :: Python :: 3.12
Classifier: Programming Language :: Python :: Implementation :: CPython
Classifier: Programming Language :: Python :: Implementation :: PyPy
Requires-Python: >=3.11.0
Requires-Python: >=3.12.0
Description-Content-Type: text/markdown
License-File: LICENSE
Requires-Dist: dateparser>=1.1.2
Requires-Dist: freezegun>=1.2.1
Requires-Dist: holidays>=0.14.2
Requires-Dist: pytest>=7.1.2
Requires-Dist: python_dateutil>=2.8.2
Requires-Dist: requests>=2.28.1
Requires-Dist: voluptuous>=0.13.1
Requires-Dist: Jinja2>=3.1.2
Requires-Dist: diskcache>=5.6.3
Requires-Dist: aiohttp>=3.12.15
Requires-Dist: aiohttp<4,>=3.12
Requires-Dist: dateparser<2,>=1.2
Requires-Dist: diskcache<6,>=5.6
Requires-Dist: freezegun<2,>=1.5
Requires-Dist: holidays<1,>=0.50
Requires-Dist: Jinja2<4,>=3.1
Requires-Dist: pydantic<3,>=2.10
Requires-Dist: python_dateutil<3,>=2.8
Requires-Dist: Requests<3,>=2.31
Requires-Dist: SQLAlchemy<3,>=2.0
Requires-Dist: sqlmodel<0.1,>=0.0.22
Requires-Dist: typer<1,>=0.12
Requires-Dist: aiosqlite<1,>=0.21
Dynamic: license-file

@@ -710,3 +712,3 @@

Este paquete proporciona herramientas para la descarga de tus datos de consumo eléctrico (desde Datadis.es) y su posterior procesado. La motivación principal es que conocer el consumo puede ayudarnos a reducirlo, e incluso a elegir una tarifa que mejor se adapte a nuestras necesidades. A día de hoy sus capacidades de facturación (€) son limitadas, soporta PVPC (según disponibilidad de datos de REData) y tarificación fija por tramos. Es el corazón de la integración [homeassistant-edata](https://github.com/uvejota/homeassistant-edata).
Este paquete proporciona herramientas para la descarga de tus datos de consumo eléctrico (desde Datadis.es) y su posterior procesado. La motivación principal es que conocer el consumo puede ayudarnos a reducirlo, e incluso a elegir una tarifa que mejor se adapte a nuestras necesidades. Soporta facturación con PVPC (según disponibilidad de datos de REData) y tarificación fija personalizable mediante fórmulas. Todos los datos se almacenan localmente en una base de datos SQLite. Es el corazón de la integración [homeassistant-edata](https://github.com/uvejota/homeassistant-edata).

@@ -731,81 +733,123 @@ _**Esta herramienta no mantiene ningún tipo de vinculación con los proveedores de datos anteriormente mencionados, simplemente consulta la información disponible y facilita su posterior análisis.**_

El paquete consta de tres módulos diferenciados:
El paquete consta de varios módulos:
* **Conectores** (módulo `connectors`), para definir los métodos de consulta a los diferentes proveedores: Datadis y REData.
* **Procesadores** (módulo `processors`), para procesar datos de consumo, maxímetro, o coste (tarificación). Ahora mismo consta de tres procesadores: `billing`, `consumption` y `maximeter`, además de algunas utilidades ubicadas en `utils`. Los procesadores deben heredar de la clase Processor definida en `base.py`
* **Ayudantes** (módulo `helpers`), para ayudar en el uso y gestión de los anteriores, presentando de momento un único ayudante llamado `EdataHelper` que te permite recopilar `X` días de datos (por defecto 365) y automáticamente procesarlos. Los datos son almacenados en la variable `data`, mientras que los atributos autocalculados son almacenados en la variable `attributes`. Por lo general, primero utilizan los conectores y luego procesan los datos, gestionando varias tareas de recuperación (principalmente para Datadis).
* **Proveedores** (`providers`): Conectores para consultar diferentes fuentes de datos.
- `datadis.py`: Conector para la API privada de Datadis (datos de consumo, potencia, contratos y suministros).
- `redata.py`: Conector para la API pública de REData (precios PVPC en tiempo real).
Estos módulos corresponden a la siguiente estructura del paquete:
* **Servicios** (`services`): Capa de lógica de negocio que gestiona la sincronización y procesamiento de datos.
- `data_service.py`: Servicio para gestionar datos de telemetría (consumo, potencia, estadísticas).
- `bill_service.py`: Servicio para gestionar la facturación y cálculo de costes.
```
edata/
· __init__.py
· connectors/
· __init__.py
· datadis.py
· redata.py
· processors/
· __init__.py
· base.py
· billing.py
· consumption.py
· maximeter.py
· utils.py
· helpers.py
```
* **Modelos** (`models`): Definiciones de estructuras de datos usando Pydantic.
- `supply.py`: Modelos de suministros (`Supply`) y contratos (`Contract`).
- `data.py`: Modelos de datos de telemetría (`Energy`, `Power`, `Statistics`).
- `bill.py`: Modelos de facturación (`Bill`, `EnergyPrice`, `BillingRules`, `PVPCBillingRules`).
* **Base de datos** (`database`): Gestión de persistencia local con SQLite.
- `controller.py`: Controlador principal de la base de datos (`EdataDB`).
- `models.py`: Modelos SQLModel para las tablas de la base de datos.
- `queries.py`: Consultas SQL predefinidas.
* **Utilidades** (`core`): Funciones auxiliares para cálculos de tarifas, fechas, etc.
* **CLI** (`cli.py`): Interfaz de línea de comandos para operaciones básicas.
## Ejemplo de uso
Partimos de que tenemos credenciales en Datadis.es. Algunas aclaraciones:
### Usando los servicios (recomendado)
Partimos de que tenemos credenciales en [Datadis.es](https://datadis.es). Algunas aclaraciones:
* No es necesario solicitar API pública en el registro (se utilizará la API privada habilitada por defecto)
* El username suele ser el NIF del titular
* Copie el CUPS de la web de Datadis, algunas comercializadoras adhieren caracteres adicionales en el CUPS mostrado en su factura.
* La herramienta acepta el uso de NIF autorizado para consultar el suministro de otro titular.
* Copie el CUPS de la web de Datadis, algunas comercializadoras adhieren caracteres adicionales en el CUPS mostrado en su factura
* La herramienta acepta el uso de NIF autorizado para consultar el suministro de otro titular
``` python
import asyncio
from datetime import datetime
import json
from edata.services.data_service import DataService
from edata.services.bill_service import BillService
from edata.models.bill import PVPCBillingRules
# importamos definiciones de datos que nos interesen
from edata.definitions import PricingRules
# importamos el ayudante
from edata.helpers import EdataHelper
# importamos el procesador de utilidades
from edata.processors import utils
async def main():
# Crear el servicio de datos
data_service = DataService(
cups="ES0000000000000000XX", # Tu CUPS
datadis_user="12345678A", # Tu NIF/usuario
datadis_pwd="tu_password",
storage_path="./my_data" # Directorio para la BD
datadis_authorized_nif=None, # NIF autorizado (opcional)
)
# Actualizar todos los datos disponibles
await data_service.update()
# Obtener suministros y contratos
supplies = await data_service.get_supplies()
contracts = await data_service.get_contracts()
# Obtener datos de consumo y potencia
energy = await data_service.get_energy()
power = await data_service.get_power()
# Obtener estadísticas agregadas (diarias o mensuales)
daily_stats = await data_service.get_statistics("day")
monthly_stats = await data_service.get_statistics("month")
# Crear el servicio de facturación
bill_service = BillService(
cups="ES0000000000000000XX",
storage_path="./my_data" # Mismo directorio que data_service
)
# Definir reglas de facturación PVPC (valores por defecto españoles)
pvpc_rules = PVPCBillingRules(
p1_kw_year_eur=30.67266, # Término potencia P1 (€/kW/año)
p2_kw_year_eur=1.4243591, # Término potencia P2 (€/kW/año)
meter_month_eur=0.81, # Alquiler contador (€/mes)
market_kw_year_eur=3.113, # Otros cargos (€/kW/año)
electricity_tax=1.0511300560, # Impuesto eléctrico
iva_tax=1.21 # IVA (21%)
)
# Actualizar facturación con PVPC
start_date = datetime(2024, 1, 1)
end_date = datetime(2024, 12, 31)
await bill_service.update(start_date, end_date, billing_rules=pvpc_rules, is_pvpc=True)
# Obtener facturas calculadas
bills = await bill_service.get_bills(start_date, end_date, type_="hour")
daily_bills = await bill_service.get_bills(start_date, end_date, type_="day")
monthly_bills = await bill_service.get_bills(start_date, end_date, type_="month")
# Mostrar resumen
total_cost = sum(b.value_eur for b in bills)
print(f"Coste total: {total_cost:.2f} €")
print(f"Consumo total: {sum(e.value_kWh for e in energy):.2f} kWh")
# Preparar reglas de tarificación (si se quiere)
PRICING_RULES_PVPC = PricingRules(
p1_kw_year_eur=30.67266,
p2_kw_year_eur=1.4243591,
meter_month_eur=0.81,
market_kw_year_eur=3.113,
electricity_tax=1.0511300560,
iva_tax=1.05,
# podemos rellenar los siguientes campos si quisiéramos precio fijo (y no pvpc)
p1_kwh_eur=None,
p2_kwh_eur=None,
p3_kwh_eur=None,
)
# Ejecutar
asyncio.run(main())
```
# Instanciar el helper
# 'authorized_nif' permite indicar el NIF de la persona que nos autoriza a consultar su CUPS.
# 'data' permite "cargar" al helper datos anteriores (resultado edata.data de una ejecución anterior), para evitar volver a consultar los mismos.
edata = EdataHelper(
"datadis_user",
"datadis_password",
"cups",
datadis_authorized_nif=None,
pricing_rules=PRICING_RULES_PVPC, # si se le pasa None, no aplica tarificación
data=None, # aquí podríamos cargar datos anteriores
)
### Usando la CLI
# Solicitar actualización de todo el histórico (se almacena en edata.data)
edata.update(date_from=datetime(1970, 1, 1), date_to=datetime.today())
El paquete incluye una interfaz de línea de comandos para operaciones básicas:
# volcamos todo lo obtenido a un fichero
with open("backup.json", "w") as file:
json.dump(utils.serialize_dict(edata.data), file) # se puede utilizar deserialize_dict para la posterior lectura del backup
``` bash
# Ver suministros disponibles
python -m edata.cli show-supplies <username>
# Imprimir atributos
print(edata.attributes)
# Descargar todos los datos de un CUPS
python -m edata.cli download-all --cups ES0000000000000000XX <username>
# Actualizar facturación con tarifa fija
python -m edata.cli update-bill \
--cups ES0000000000000000XX \
--p1-kw-year-eur 30.67 \
--p2-kw-year-eur 1.42 \
--p1-kwh-eur 0.15 \
--p2-kwh-eur 0.10 \
--p3-kwh-eur 0.08 \
--meter-month-eur 0.81
```

@@ -7,3 +7,3 @@ [build-system]

name = "e-data"
version = "1.3.3"
version = "2.0.0.dev112"
description = "Python library for managing spanish energy data from various web providers"

@@ -15,7 +15,6 @@ readme = "README.md"

license = { file = "LICENSE" }
requires-python = ">=3.11.0"
requires-python = ">=3.12.0"
classifiers = [
"License :: OSI Approved :: GNU General Public License v3 (GPLv3)",
"Programming Language :: Python",
"Programming Language :: Python :: 3.11",
"Programming Language :: Python :: 3.12",

@@ -26,12 +25,15 @@ "Programming Language :: Python :: Implementation :: CPython",

dependencies = [
"dateparser>=1.1.2",
"freezegun>=1.2.1",
"holidays>=0.14.2",
"pytest>=7.1.2",
"python_dateutil>=2.8.2",
"requests>=2.28.1",
"voluptuous>=0.13.1",
"Jinja2>=3.1.2",
"diskcache>=5.6.3",
"aiohttp>=3.12.15"
"aiohttp>=3.12,<4",
"dateparser>=1.2,<2",
"diskcache>=5.6,<6",
"freezegun>=1.5,<2",
"holidays>=0.50,<1",
"Jinja2>=3.1,<4",
"pydantic>=2.10,<3",
"python_dateutil>=2.8,<3",
"Requests>=2.31,<3",
"SQLAlchemy>=2.0,<3",
"sqlmodel>=0.0.22,<0.1",
"typer>=0.12,<1",
"aiosqlite>=0.21,<1",
]

@@ -38,0 +40,0 @@

+105
-63

@@ -7,3 +7,3 @@ [![Downloads](https://pepy.tech/badge/e-data)](https://pepy.tech/project/e-data)

Este paquete proporciona herramientas para la descarga de tus datos de consumo eléctrico (desde Datadis.es) y su posterior procesado. La motivación principal es que conocer el consumo puede ayudarnos a reducirlo, e incluso a elegir una tarifa que mejor se adapte a nuestras necesidades. A día de hoy sus capacidades de facturación (€) son limitadas, soporta PVPC (según disponibilidad de datos de REData) y tarificación fija por tramos. Es el corazón de la integración [homeassistant-edata](https://github.com/uvejota/homeassistant-edata).
Este paquete proporciona herramientas para la descarga de tus datos de consumo eléctrico (desde Datadis.es) y su posterior procesado. La motivación principal es que conocer el consumo puede ayudarnos a reducirlo, e incluso a elegir una tarifa que mejor se adapte a nuestras necesidades. Soporta facturación con PVPC (según disponibilidad de datos de REData) y tarificación fija personalizable mediante fórmulas. Todos los datos se almacenan localmente en una base de datos SQLite. Es el corazón de la integración [homeassistant-edata](https://github.com/uvejota/homeassistant-edata).

@@ -28,81 +28,123 @@ _**Esta herramienta no mantiene ningún tipo de vinculación con los proveedores de datos anteriormente mencionados, simplemente consulta la información disponible y facilita su posterior análisis.**_

El paquete consta de tres módulos diferenciados:
El paquete consta de varios módulos:
* **Conectores** (módulo `connectors`), para definir los métodos de consulta a los diferentes proveedores: Datadis y REData.
* **Procesadores** (módulo `processors`), para procesar datos de consumo, maxímetro, o coste (tarificación). Ahora mismo consta de tres procesadores: `billing`, `consumption` y `maximeter`, además de algunas utilidades ubicadas en `utils`. Los procesadores deben heredar de la clase Processor definida en `base.py`
* **Ayudantes** (módulo `helpers`), para ayudar en el uso y gestión de los anteriores, presentando de momento un único ayudante llamado `EdataHelper` que te permite recopilar `X` días de datos (por defecto 365) y automáticamente procesarlos. Los datos son almacenados en la variable `data`, mientras que los atributos autocalculados son almacenados en la variable `attributes`. Por lo general, primero utilizan los conectores y luego procesan los datos, gestionando varias tareas de recuperación (principalmente para Datadis).
* **Proveedores** (`providers`): Conectores para consultar diferentes fuentes de datos.
- `datadis.py`: Conector para la API privada de Datadis (datos de consumo, potencia, contratos y suministros).
- `redata.py`: Conector para la API pública de REData (precios PVPC en tiempo real).
Estos módulos corresponden a la siguiente estructura del paquete:
* **Servicios** (`services`): Capa de lógica de negocio que gestiona la sincronización y procesamiento de datos.
- `data_service.py`: Servicio para gestionar datos de telemetría (consumo, potencia, estadísticas).
- `bill_service.py`: Servicio para gestionar la facturación y cálculo de costes.
```
edata/
· __init__.py
· connectors/
· __init__.py
· datadis.py
· redata.py
· processors/
· __init__.py
· base.py
· billing.py
· consumption.py
· maximeter.py
· utils.py
· helpers.py
```
* **Modelos** (`models`): Definiciones de estructuras de datos usando Pydantic.
- `supply.py`: Modelos de suministros (`Supply`) y contratos (`Contract`).
- `data.py`: Modelos de datos de telemetría (`Energy`, `Power`, `Statistics`).
- `bill.py`: Modelos de facturación (`Bill`, `EnergyPrice`, `BillingRules`, `PVPCBillingRules`).
* **Base de datos** (`database`): Gestión de persistencia local con SQLite.
- `controller.py`: Controlador principal de la base de datos (`EdataDB`).
- `models.py`: Modelos SQLModel para las tablas de la base de datos.
- `queries.py`: Consultas SQL predefinidas.
* **Utilidades** (`core`): Funciones auxiliares para cálculos de tarifas, fechas, etc.
* **CLI** (`cli.py`): Interfaz de línea de comandos para operaciones básicas.
## Ejemplo de uso
Partimos de que tenemos credenciales en Datadis.es. Algunas aclaraciones:
### Usando los servicios (recomendado)
Partimos de que tenemos credenciales en [Datadis.es](https://datadis.es). Algunas aclaraciones:
* No es necesario solicitar API pública en el registro (se utilizará la API privada habilitada por defecto)
* El username suele ser el NIF del titular
* Copie el CUPS de la web de Datadis, algunas comercializadoras adhieren caracteres adicionales en el CUPS mostrado en su factura.
* La herramienta acepta el uso de NIF autorizado para consultar el suministro de otro titular.
* Copie el CUPS de la web de Datadis, algunas comercializadoras adhieren caracteres adicionales en el CUPS mostrado en su factura
* La herramienta acepta el uso de NIF autorizado para consultar el suministro de otro titular
``` python
import asyncio
from datetime import datetime
import json
from edata.services.data_service import DataService
from edata.services.bill_service import BillService
from edata.models.bill import PVPCBillingRules
# importamos definiciones de datos que nos interesen
from edata.definitions import PricingRules
# importamos el ayudante
from edata.helpers import EdataHelper
# importamos el procesador de utilidades
from edata.processors import utils
async def main():
# Crear el servicio de datos
data_service = DataService(
cups="ES0000000000000000XX", # Tu CUPS
datadis_user="12345678A", # Tu NIF/usuario
datadis_pwd="tu_password",
storage_path="./my_data" # Directorio para la BD
datadis_authorized_nif=None, # NIF autorizado (opcional)
)
# Actualizar todos los datos disponibles
await data_service.update()
# Obtener suministros y contratos
supplies = await data_service.get_supplies()
contracts = await data_service.get_contracts()
# Obtener datos de consumo y potencia
energy = await data_service.get_energy()
power = await data_service.get_power()
# Obtener estadísticas agregadas (diarias o mensuales)
daily_stats = await data_service.get_statistics("day")
monthly_stats = await data_service.get_statistics("month")
# Crear el servicio de facturación
bill_service = BillService(
cups="ES0000000000000000XX",
storage_path="./my_data" # Mismo directorio que data_service
)
# Definir reglas de facturación PVPC (valores por defecto españoles)
pvpc_rules = PVPCBillingRules(
p1_kw_year_eur=30.67266, # Término potencia P1 (€/kW/año)
p2_kw_year_eur=1.4243591, # Término potencia P2 (€/kW/año)
meter_month_eur=0.81, # Alquiler contador (€/mes)
market_kw_year_eur=3.113, # Otros cargos (€/kW/año)
electricity_tax=1.0511300560, # Impuesto eléctrico
iva_tax=1.21 # IVA (21%)
)
# Actualizar facturación con PVPC
start_date = datetime(2024, 1, 1)
end_date = datetime(2024, 12, 31)
await bill_service.update(start_date, end_date, billing_rules=pvpc_rules, is_pvpc=True)
# Obtener facturas calculadas
bills = await bill_service.get_bills(start_date, end_date, type_="hour")
daily_bills = await bill_service.get_bills(start_date, end_date, type_="day")
monthly_bills = await bill_service.get_bills(start_date, end_date, type_="month")
# Mostrar resumen
total_cost = sum(b.value_eur for b in bills)
print(f"Coste total: {total_cost:.2f} €")
print(f"Consumo total: {sum(e.value_kWh for e in energy):.2f} kWh")
# Preparar reglas de tarificación (si se quiere)
PRICING_RULES_PVPC = PricingRules(
p1_kw_year_eur=30.67266,
p2_kw_year_eur=1.4243591,
meter_month_eur=0.81,
market_kw_year_eur=3.113,
electricity_tax=1.0511300560,
iva_tax=1.05,
# podemos rellenar los siguientes campos si quisiéramos precio fijo (y no pvpc)
p1_kwh_eur=None,
p2_kwh_eur=None,
p3_kwh_eur=None,
)
# Ejecutar
asyncio.run(main())
```
# Instanciar el helper
# 'authorized_nif' permite indicar el NIF de la persona que nos autoriza a consultar su CUPS.
# 'data' permite "cargar" al helper datos anteriores (resultado edata.data de una ejecución anterior), para evitar volver a consultar los mismos.
edata = EdataHelper(
"datadis_user",
"datadis_password",
"cups",
datadis_authorized_nif=None,
pricing_rules=PRICING_RULES_PVPC, # si se le pasa None, no aplica tarificación
data=None, # aquí podríamos cargar datos anteriores
)
### Usando la CLI
# Solicitar actualización de todo el histórico (se almacena en edata.data)
edata.update(date_from=datetime(1970, 1, 1), date_to=datetime.today())
El paquete incluye una interfaz de línea de comandos para operaciones básicas:
# volcamos todo lo obtenido a un fichero
with open("backup.json", "w") as file:
json.dump(utils.serialize_dict(edata.data), file) # se puede utilizar deserialize_dict para la posterior lectura del backup
``` bash
# Ver suministros disponibles
python -m edata.cli show-supplies <username>
# Imprimir atributos
print(edata.attributes)
# Descargar todos los datos de un CUPS
python -m edata.cli download-all --cups ES0000000000000000XX <username>
# Actualizar facturación con tarifa fija
python -m edata.cli update-bill \
--cups ES0000000000000000XX \
--p1-kw-year-eur 30.67 \
--p2-kw-year-eur 1.42 \
--p1-kwh-eur 0.15 \
--p2-kwh-eur 0.10 \
--p3-kwh-eur 0.08 \
--meter-month-eur 0.81
```
"""Datadis API connector.
To fetch data from datadis.es private API.
There are a few issues that are worked around:
- You have to wait 24h between two identical requests.
- Datadis server does not like ranges greater than 1 month.
"""
import contextlib
from datetime import datetime, timedelta
import hashlib
import logging
import os
import tempfile
import diskcache
from dateutil.relativedelta import relativedelta
import aiohttp
import asyncio
from ..definitions import ConsumptionData, ContractData, MaxPowerData, SupplyData
from ..processors import utils
_LOGGER = logging.getLogger(__name__)
# Token-related constants
URL_TOKEN = "https://datadis.es/nikola-auth/tokens/login"
TOKEN_USERNAME = "username"
TOKEN_PASSWD = "password"
# Supplies-related constants
URL_GET_SUPPLIES = "https://datadis.es/api-private/api/get-supplies"
GET_SUPPLIES_MANDATORY_FIELDS = [
"cups",
"validDateFrom",
"validDateTo",
"pointType",
"distributorCode",
]
# Contracts-related constants
URL_GET_CONTRACT_DETAIL = "https://datadis.es/api-private/api/get-contract-detail"
GET_CONTRACT_DETAIL_MANDATORY_FIELDS = [
"startDate",
"endDate",
"marketer",
"contractedPowerkW",
]
# Consumption-related constants
URL_GET_CONSUMPTION_DATA = "https://datadis.es/api-private/api/get-consumption-data"
GET_CONSUMPTION_DATA_MANDATORY_FIELDS = [
"time",
"date",
"consumptionKWh",
"obtainMethod",
]
MAX_CONSUMPTIONS_MONTHS = (
1 # max consumptions in a single request (fixed to 1 due to datadis limitations)
)
# Maximeter-related constants
URL_GET_MAX_POWER = "https://datadis.es/api-private/api/get-max-power"
GET_MAX_POWER_MANDATORY_FIELDS = ["time", "date", "maxPower"]
# Timing constants
TIMEOUT = 3 * 60 # requests timeout
QUERY_LIMIT = timedelta(hours=24) # a datadis limitation, again...
# Cache-related constants
RECENT_CACHE_SUBDIR = "cache"
def migrate_storage(storage_dir):
    """Remove stale cache files left behind by older edata versions.

    Args:
        storage_dir: Directory where previous versions stored their
            JSON-based query caches.

    Each legacy file is removed under its own ``suppress`` block: the
    original wrapped both ``os.remove`` calls in a single
    ``contextlib.suppress(FileNotFoundError)``, so a missing first file
    aborted the block and the second file was never removed.
    """
    legacy_files = (
        "edata_recent_queries.json",
        "edata_recent_queries_cache.json",
    )
    for name in legacy_files:
        with contextlib.suppress(FileNotFoundError):
            os.remove(os.path.join(storage_dir, name))
class DatadisConnector:
    """A Datadis private API connector."""

    def __init__(
        self,
        username: str,
        password: str,
        enable_smart_fetch: bool = True,
        storage_path: str | None = None,
    ) -> None:
        """Initialize the connector.

        Args:
            username: Datadis username (usually the owner's NIF).
            password: Datadis password.
            enable_smart_fetch: When True, wide date ranges are split into
                monthly sub-requests (Datadis rejects larger ranges).
            storage_path: Directory for the persistent query cache; when
                None, a subdirectory of the system temp dir is used.
        """
        self._usr = username
        self._pwd = password
        # Auth token cache: holds the 'encoded' token and the ready-to-use
        # 'Authorization' headers once a login succeeds.
        self._token = {}
        self._smart_fetch = enable_smart_fetch
        self._recent_queries = {}
        self._recent_cache = {}
        # Queries that already produced a server-error warning; further
        # identical failures are silenced until restart.
        self._warned_queries = []
        if storage_path is not None:
            self._recent_cache_dir = os.path.join(storage_path, RECENT_CACHE_SUBDIR)
            # Drop cache files written by older versions of the package.
            migrate_storage(storage_path)
        else:
            # No persistent storage requested: fall back to the temp dir.
            self._recent_cache_dir = os.path.join(
                tempfile.gettempdir(), RECENT_CACHE_SUBDIR
            )
        os.makedirs(self._recent_cache_dir, exist_ok=True)
        # Disk-backed cache used to throttle repeated identical queries.
        self._cache = diskcache.Cache(self._recent_cache_dir)
def _update_recent_queries(self, query: str, data: dict | None = None) -> None:
"""Cache a successful query to avoid exceeding query limits (diskcache)."""
hash_query = hashlib.md5(query.encode()).hexdigest()
try:
self._cache.set(hash_query, data, expire=QUERY_LIMIT.total_seconds())
_LOGGER.info("Updating cache item '%s'", hash_query)
except Exception as e:
_LOGGER.warning("Unknown error while updating cache: %s", e)
def _is_recent_query(self, query: str) -> bool:
"""Check if a query has been done recently to avoid exceeding query limits (diskcache)."""
hash_query = hashlib.md5(query.encode()).hexdigest()
return hash_query in self._cache
def _get_cache_for_query(self, query: str) -> dict | None:
"""Return cached response for a query (diskcache)."""
hash_query = hashlib.md5(query.encode()).hexdigest()
try:
return self._cache.get(hash_query, default=None)
except Exception:
return None
async def _async_get_token(self):
"""Private async method that fetches a new token if needed."""
_LOGGER.info("No token found, fetching a new one")
is_valid_token = False
timeout = aiohttp.ClientTimeout(total=TIMEOUT)
async with aiohttp.ClientSession(timeout=timeout) as session:
try:
async with session.post(
URL_TOKEN,
data={
TOKEN_USERNAME: self._usr,
TOKEN_PASSWD: self._pwd,
},
) as response:
text = await response.text()
if response.status == 200:
self._token["encoded"] = text
self._token["headers"] = {"Authorization": "Bearer " + self._token["encoded"]}
is_valid_token = True
else:
_LOGGER.error("Unknown error while retrieving token, got %s", text)
except Exception as e:
_LOGGER.error("Exception while retrieving token: %s", e)
return is_valid_token
def login(self):
"""Test to login with provided credentials (sync wrapper)."""
return asyncio.run(self._async_get_token())
    async def _async_get(
        self,
        url: str,
        request_data: dict | None = None,
        refresh_token: bool = False,
        is_retry: bool = False,
        ignore_recent_queries: bool = False,
    ):
        """Async get request for Datadis API.

        Args:
            url: Endpoint to query.
            request_data: Query parameters, appended to the URL.
            refresh_token: When True, fetch a new token before the request.
            is_retry: Internal flag marking the second attempt after a
                server-side error.
            ignore_recent_queries: Skip the 24h throttling cache entirely.

        Returns:
            The parsed JSON payload, a cached copy of it, or [] on failure.
        """
        if request_data is None:
            data = {}
        else:
            data = request_data
        is_valid_token = False
        response = []
        if refresh_token:
            is_valid_token = await self._async_get_token()
        if is_valid_token or not refresh_token:
            # Build the query string (note: a trailing '&' is kept; the full
            # url+params string also serves as the throttling-cache key).
            params = "?" if len(data) > 0 else ""
            for param in data:
                key = param
                value = data[param]
                params = params + f"{key}={value}&"
            # Anonymized variant used only for logging: mask the CUPS
            # (keep last 5 chars) and the authorized NIF entirely.
            anonym_params = "?" if len(data) > 0 else ""
            for anonym_param in data:
                key = anonym_param
                if key == "cups":
                    value = "xxxx" + str(data[anonym_param])[-5:]
                elif key == "authorizedNif":
                    value = "xxxx"
                else:
                    value = data[anonym_param]
                anonym_params = anonym_params + f"{key}={value}&"
            # Throttle: if this exact query succeeded recently, serve the
            # cached payload (or [] when only the bare query was cached).
            if not ignore_recent_queries and self._is_recent_query(url + params):
                _cache = self._get_cache_for_query(url + params)
                if _cache is not None:
                    _LOGGER.info(
                        "Returning cached response for '%s'", url + anonym_params
                    )
                    return _cache
                return []
            try:
                _LOGGER.info("GET %s", url + anonym_params)
                # Ask the server not to compress the payload.
                headers = {"Accept-Encoding": "identity"}
                if self._token.get("headers"):
                    headers.update(self._token["headers"])
                timeout = aiohttp.ClientTimeout(total=TIMEOUT)
                async with aiohttp.ClientSession(timeout=timeout) as session:
                    async with session.get(
                        url + params,
                        headers=headers,
                    ) as reply:
                        text = await reply.text()
                        if reply.status == 200:
                            _LOGGER.info("Got 200 OK")
                            try:
                                json_data = await reply.json(content_type=None)
                                if json_data:
                                    response = json_data
                                    if not ignore_recent_queries:
                                        self._update_recent_queries(
                                            url + params, response
                                        )
                                else:
                                    # Empty payloads are cached too, so the
                                    # query is not repeated within the limit.
                                    _LOGGER.info("Got an empty response")
                                    if not ignore_recent_queries:
                                        self._update_recent_queries(url + params)
                            except Exception as e:
                                # NOTE(review): 'e' is captured but not
                                # included in the message — consider logging it.
                                _LOGGER.warning("Failed to parse JSON response")
                        elif reply.status == 401 and not refresh_token:
                            # Unauthorized: refresh the token once and retry.
                            response = await self._async_get(
                                url,
                                request_data=data,
                                refresh_token=True,
                                ignore_recent_queries=ignore_recent_queries,
                            )
                        elif reply.status == 429:
                            # Too many requests: remember the query so it is
                            # not retried before the cache entry expires.
                            _LOGGER.warning(
                                "Got status code '%s' with message '%s'",
                                reply.status,
                                text,
                            )
                            if not ignore_recent_queries:
                                self._update_recent_queries(url + params)
                        elif is_retry:
                            # Second failed attempt: warn once per query and
                            # silence further identical failures until restart.
                            if (url + params) not in self._warned_queries:
                                _LOGGER.warning(
                                    "Got status code '%s' with message '%s'. %s. %s",
                                    reply.status,
                                    text,
                                    "Query temporary disabled",
                                    "Future 500 code errors for this query will be silenced until restart",
                                )
                                if not ignore_recent_queries:
                                    self._update_recent_queries(url + params)
                                self._warned_queries.append(url + params)
                        else:
                            # First server-side failure: retry once.
                            response = await self._async_get(
                                url,
                                request_data,
                                is_retry=True,
                                ignore_recent_queries=ignore_recent_queries,
                            )
            except asyncio.TimeoutError:
                _LOGGER.warning("Timeout at %s", url + anonym_params)
                return []
            except Exception as e:
                _LOGGER.warning("Exception at %s: %s", url + anonym_params, e)
                return []
        return response
    async def async_get_supplies(self, authorized_nif: str | None = None):
        """Datadis 'get-supplies' query (async).

        Args:
            authorized_nif: NIF of the person authorizing the query, when
                fetching someone else's supplies.

        Returns:
            A list of SupplyData items; malformed entries are skipped with
            a warning.
        """
        data = {}
        if authorized_nif is not None:
            data["authorizedNif"] = authorized_nif
        # Supplies are always fetched fresh (the 24h throttle is bypassed).
        response = await self._async_get(
            URL_GET_SUPPLIES, request_data=data, ignore_recent_queries=True
        )
        supplies = []
        # Open-ended supplies come with an empty validDateTo: treat them as
        # valid until tomorrow.
        tomorrow_str = (datetime.today() + timedelta(days=1)).strftime("%Y/%m/%d")
        for i in response:
            if all(k in i for k in GET_SUPPLIES_MANDATORY_FIELDS):
                supplies.append(
                    SupplyData(
                        cups=i["cups"],
                        # Empty validDateFrom falls back to the epoch.
                        date_start=datetime.strptime(
                            (
                                i["validDateFrom"]
                                if i["validDateFrom"] != ""
                                else "1970/01/01"
                            ),
                            "%Y/%m/%d",
                        ),
                        date_end=datetime.strptime(
                            (
                                i["validDateTo"]
                                if i["validDateTo"] != ""
                                else tomorrow_str
                            ),
                            "%Y/%m/%d",
                        ),
                        address=i.get("address", None),
                        postal_code=i.get("postalCode", None),
                        province=i.get("province", None),
                        municipality=i.get("municipality", None),
                        distributor=i.get("distributor", None),
                        pointType=i["pointType"],
                        distributorCode=i["distributorCode"],
                    )
                )
            else:
                _LOGGER.warning(
                    "Weird data structure while fetching supplies data, got %s",
                    response,
                )
        return supplies
def get_supplies(self, authorized_nif: str | None = None):
"""Datadis 'get_supplies' query (sync wrapper)."""
return asyncio.run(self.async_get_supplies(authorized_nif=authorized_nif))
async def async_get_contract_detail(
self, cups: str, distributor_code: str, authorized_nif: str | None = None
):
data = {"cups": cups, "distributorCode": distributor_code}
if authorized_nif is not None:
data["authorizedNif"] = authorized_nif
response = await self._async_get(
URL_GET_CONTRACT_DETAIL, request_data=data, ignore_recent_queries=True
)
contracts = []
tomorrow_str = (datetime.today() + timedelta(days=1)).strftime("%Y/%m/%d")
for i in response:
if all(k in i for k in GET_CONTRACT_DETAIL_MANDATORY_FIELDS):
contracts.append(
ContractData(
date_start=datetime.strptime(
i["startDate"] if i["startDate"] != "" else "1970/01/01",
"%Y/%m/%d",
),
date_end=datetime.strptime(
i["endDate"] if i["endDate"] != "" else tomorrow_str,
"%Y/%m/%d",
),
marketer=i["marketer"],
distributorCode=distributor_code,
power_p1=(
i["contractedPowerkW"][0]
if isinstance(i["contractedPowerkW"], list)
else None
),
power_p2=(
i["contractedPowerkW"][1]
if (len(i["contractedPowerkW"]) > 1)
else None
),
)
)
else:
_LOGGER.warning(
"Weird data structure while fetching contracts data, got %s",
response,
)
return contracts
def get_contract_detail(
self, cups: str, distributor_code: str, authorized_nif: str | None = None
):
"""Datadis get_contract_detail query (sync wrapper)."""
return asyncio.run(self.async_get_contract_detail(cups, distributor_code, authorized_nif))
    async def async_get_consumption_data(
        self,
        cups: str,
        distributor_code: str,
        start_date: datetime,
        end_date: datetime,
        measurement_type: str,
        point_type: int,
        authorized_nif: str | None = None,
        is_smart_fetch: bool = False,
    ):
        """Datadis 'get-consumption-data' query (async).

        When smart fetch is enabled, the requested range is split into
        MAX_CONSUMPTIONS_MONTHS-sized sub-ranges (Datadis rejects larger
        ranges) and the partial results are merged by datetime.

        Args:
            cups: Supply point identifier.
            distributor_code: Code of the distributor serving the CUPS.
            start_date: Start of the requested range (inclusive).
            end_date: End of the requested range (inclusive).
            measurement_type: Datadis 'measurementType' parameter.
            point_type: Datadis 'pointType' parameter.
            authorized_nif: NIF authorizing the query (optional).
            is_smart_fetch: Internal flag marking a sub-request of the
                smart-fetch loop (prevents infinite recursion).

        Returns:
            A list of ConsumptionData items inside the requested range.
        """
        if self._smart_fetch and not is_smart_fetch:
            # Split the range into monthly chunks and recurse once per chunk.
            _start = start_date
            consumptions = []
            while _start < end_date:
                _end = min(
                    _start + relativedelta(months=MAX_CONSUMPTIONS_MONTHS), end_date
                )
                sub_consumptions = await self.async_get_consumption_data(
                    cups,
                    distributor_code,
                    _start,
                    _end,
                    measurement_type,
                    point_type,
                    authorized_nif,
                    is_smart_fetch=True,
                )
                # Merge keyed by 'datetime' so overlapping chunk edges do
                # not produce duplicates.
                consumptions = utils.extend_by_key(
                    consumptions,
                    sub_consumptions,
                    "datetime",
                )
                _start = _end
            return consumptions
        # Datadis expects month resolution ("%Y/%m") for this endpoint.
        data = {
            "cups": cups,
            "distributorCode": distributor_code,
            "startDate": datetime.strftime(start_date, "%Y/%m"),
            "endDate": datetime.strftime(end_date, "%Y/%m"),
            "measurementType": measurement_type,
            "pointType": point_type,
        }
        if authorized_nif is not None:
            data["authorizedNif"] = authorized_nif
        response = await self._async_get(URL_GET_CONSUMPTION_DATA, request_data=data)
        consumptions = []
        for i in response:
            # Items without consumptionKWh are silently skipped.
            if "consumptionKWh" in i:
                if all(k in i for k in GET_CONSUMPTION_DATA_MANDATORY_FIELDS):
                    # Shift the reported hour back by one — presumably the
                    # API reports end-of-interval hours (1..24); verify
                    # against Datadis docs.
                    hour = str(int(i["time"].split(":")[0]) - 1)
                    date_as_dt = datetime.strptime(
                        f"{i['date']} {hour.zfill(2)}:00", "%Y/%m/%d %H:%M"
                    )
                    if not (start_date <= date_as_dt <= end_date):
                        continue  # skip element if dt is out of range
                    # Missing or null surplus (solar export) defaults to 0.
                    _surplus = i.get("surplusEnergyKWh", 0)
                    if _surplus is None:
                        _surplus = 0
                    consumptions.append(
                        ConsumptionData(
                            datetime=date_as_dt,
                            delta_h=1,
                            value_kWh=i["consumptionKWh"],
                            surplus_kWh=_surplus,
                            real=i["obtainMethod"] == "Real",
                        )
                    )
                else:
                    _LOGGER.warning(
                        "Weird data structure while fetching consumption data, got %s",
                        response,
                    )
        return consumptions
def get_consumption_data(
self,
cups: str,
distributor_code: str,
start_date: datetime,
end_date: datetime,
measurement_type: str,
point_type: int,
authorized_nif: str | None = None,
is_smart_fetch: bool = False,
):
"""Datadis get_consumption_data query (sync wrapper)."""
return asyncio.run(self.async_get_consumption_data(
cups,
distributor_code,
start_date,
end_date,
measurement_type,
point_type,
authorized_nif,
is_smart_fetch,
))
    async def async_get_max_power(
        self,
        cups: str,
        distributor_code: str,
        start_date: datetime,
        end_date: datetime,
        authorized_nif: str | None = None,
    ):
        """Datadis 'get-max-power' (maximeter) query (async).

        Args:
            cups: Supply point identifier.
            distributor_code: Code of the distributor serving the CUPS.
            start_date: Start of the requested range.
            end_date: End of the requested range.
            authorized_nif: NIF authorizing the query (optional).

        Returns:
            A list of MaxPowerData items; malformed entries are skipped
            with a warning.
        """
        # Datadis expects month resolution ("%Y/%m") for this endpoint.
        data = {
            "cups": cups,
            "distributorCode": distributor_code,
            "startDate": datetime.strftime(start_date, "%Y/%m"),
            "endDate": datetime.strftime(end_date, "%Y/%m"),
        }
        if authorized_nif is not None:
            data["authorizedNif"] = authorized_nif
        response = await self._async_get(URL_GET_MAX_POWER, request_data=data)
        maxpower_values = []
        for i in response:
            if all(k in i for k in GET_MAX_POWER_MANDATORY_FIELDS):
                maxpower_values.append(
                    MaxPowerData(
                        datetime=datetime.strptime(
                            f"{i['date']} {i['time']}", "%Y/%m/%d %H:%M"
                        ),
                        value_kW=i["maxPower"],
                    )
                )
            else:
                _LOGGER.warning(
                    "Weird data structure while fetching maximeter data, got %s",
                    response,
                )
        return maxpower_values
def get_max_power(
self,
cups: str,
distributor_code: str,
start_date: datetime,
end_date: datetime,
authorized_nif: str | None = None,
):
"""Datadis get_max_power query (sync wrapper)."""
return asyncio.run(self.async_get_max_power(
cups,
distributor_code,
start_date,
end_date,
authorized_nif,
))
"""A REData API connector"""
import datetime as dt
import logging
import aiohttp
import asyncio
from dateutil import parser
from ..definitions import PricingData
_LOGGER = logging.getLogger(__name__)

# Timeout (seconds) for REData HTTP requests.
REQUESTS_TIMEOUT = 15

# Realtime PVPC prices endpoint. 'geo_id' selects the pricing zone and
# 'start'/'end' are formatted as %Y-%m-%dT%H:%M datetimes.
URL_REALTIME_PRICES = (
    "https://apidatos.ree.es/es/datos/mercados/precios-mercados-tiempo-real"
    "?time_trunc=hour"
    "&geo_ids={geo_id}"
    "&start_date={start:%Y-%m-%dT%H:%M}&end_date={end:%Y-%m-%dT%H:%M}"
)
class REDataConnector:
    """Main class for REData connector"""

    def __init__(
        self,
    ) -> None:
        """Init method for REDataConnector"""

    async def async_get_realtime_prices(
        self, dt_from: dt.datetime, dt_to: dt.datetime, is_ceuta_melilla: bool = False
    ) -> list:
        """GET query to fetch realtime pvpc prices, historical data is limited to current month (async)"""
        # geo_id 8744 selects Ceuta/Melilla prices, 8741 the peninsular ones.
        url = URL_REALTIME_PRICES.format(
            geo_id=8744 if is_ceuta_melilla else 8741,
            start=dt_from,
            end=dt_to,
        )
        data = []
        timeout = aiohttp.ClientTimeout(total=REQUESTS_TIMEOUT)
        async with aiohttp.ClientSession(timeout=timeout) as session:
            try:
                async with session.get(url) as res:
                    text = await res.text()
                    if res.status == 200:
                        try:
                            res_json = await res.json()
                            # Hourly values live at included[0].attributes.values.
                            res_list = res_json["included"][0]["attributes"]["values"]
                        except (IndexError, KeyError):
                            _LOGGER.error(
                                "%s returned a malformed response: %s ",
                                url,
                                text,
                            )
                            return data
                        for element in res_list:
                            data.append(
                                PricingData(
                                    # Timestamps are parsed and made naive by
                                    # dropping tzinfo.
                                    datetime=parser.parse(
                                        element["datetime"]
                                    ).replace(tzinfo=None),
                                    # Divide by 1000 — presumably REData
                                    # returns EUR/MWh; verify against the
                                    # REData API docs.
                                    value_eur_kWh=element["value"] / 1000,
                                    delta_h=1,
                                )
                            )
                    else:
                        _LOGGER.error(
                            "%s returned %s with code %s",
                            url,
                            text,
                            res.status,
                        )
            except Exception as e:
                _LOGGER.error("Exception fetching realtime prices: %s", e)
        return data

    def get_realtime_prices(
        self, dt_from: dt.datetime, dt_to: dt.datetime, is_ceuta_melilla: bool = False
    ) -> list:
        """GET query to fetch realtime pvpc prices, historical data is limited to current month (sync wrapper)"""
        return asyncio.run(
            self.async_get_realtime_prices(dt_from, dt_to, is_ceuta_melilla)
        )
"""Constants file."""
PROG_NAME = "edata"
"""Definitions for data structures."""
import voluptuous as vol
import datetime as dt
from typing import TypedDict
# Exported attribute names mapped to their unit of measure
# (None = unitless / textual value). EdataHelper.process_data rounds only
# the attributes whose unit here is not None.
ATTRIBUTES = {
    "cups": None,
    # contracted power per period
    "contract_p1_kW": "kW",
    "contract_p2_kW": "kW",
    # yesterday's consumption summary
    "yesterday_kWh": "kWh",
    "yesterday_hours": "h",
    "yesterday_p1_kWh": "kWh",
    "yesterday_p2_kWh": "kWh",
    "yesterday_p3_kWh": "kWh",
    "yesterday_surplus_kWh": "kWh",
    "yesterday_surplus_p1_kWh": "kWh",
    "yesterday_surplus_p2_kWh": "kWh",
    "yesterday_surplus_p3_kWh": "kWh",
    # most recent registered day
    "last_registered_date": None,
    "last_registered_day_kWh": "kWh",
    "last_registered_day_hours": "h",
    "last_registered_day_p1_kWh": "kWh",
    "last_registered_day_p2_kWh": "kWh",
    "last_registered_day_p3_kWh": "kWh",
    "last_registered_day_surplus_kWh": "kWh",
    "last_registered_day_surplus_p1_kWh": "kWh",
    "last_registered_day_surplus_p2_kWh": "kWh",
    "last_registered_day_surplus_p3_kWh": "kWh",
    # current month aggregates
    "month_kWh": "kWh",
    "month_daily_kWh": "kWh",
    "month_days": "d",
    "month_p1_kWh": "kWh",
    "month_p2_kWh": "kWh",
    "month_p3_kWh": "kWh",
    "month_surplus_kWh": "kWh",
    "month_surplus_p1_kWh": "kWh",
    "month_surplus_p2_kWh": "kWh",
    "month_surplus_p3_kWh": "kWh",
    "month_€": "€",
    # previous month aggregates
    "last_month_kWh": "kWh",
    "last_month_daily_kWh": "kWh",
    "last_month_days": "d",
    "last_month_p1_kWh": "kWh",
    "last_month_p2_kWh": "kWh",
    "last_month_p3_kWh": "kWh",
    "last_month_surplus_kWh": "kWh",
    "last_month_surplus_p1_kWh": "kWh",
    "last_month_surplus_p2_kWh": "kWh",
    "last_month_surplus_p3_kWh": "kWh",
    "last_month_€": "€",
    # maximeter statistics
    "max_power_kW": "kW",
    "max_power_date": None,
    "max_power_mean_kW": "kW",
    "max_power_90perc_kW": "kW",
}
# Default Jinja expressions used by the billing processor. Variable names
# (kwh, kwh_eur, p1_kw, iva_tax, ...) come from the joined per-hour data and
# the PricingRules fields.
# Energy term with taxes
DEFAULT_BILLING_ENERGY_FORMULA = "electricity_tax * iva_tax * kwh_eur * kwh"
# Power term with taxes
DEFAULT_BILLING_POWER_FORMULA = "electricity_tax * iva_tax * (p1_kw * (p1_kw_year_eur + market_kw_year_eur) + p2_kw * p2_kw_year_eur) / 365 / 24"
# Others term with taxes
DEFAULT_BILLING_OTHERS_FORMULA = "iva_tax * meter_month_eur / 30 / 24"
# Surplus term with taxes
DEFAULT_BILLING_SURPLUS_FORMULA = (
    "electricity_tax * iva_tax * surplus_kwh * surplus_kwh_eur"
)
# Sum energy and power terms, and substract surplus until 0.
# An alternative would be "[(energy_term + power_term - surplus_term), 0]|max + others_term"
DEFAULT_BILLING_MAIN_FORMULA = "energy_term + power_term + others_term"
class SupplyData(TypedDict):
    """Data structure to represent a supply."""

    cups: str  # supply point identifier
    date_start: dt.datetime
    date_end: dt.datetime
    address: str | None
    postal_code: str | None
    province: str | None
    municipality: str | None
    distributor: str | None
    # camelCase keys mirror the Datadis API payload
    pointType: int
    distributorCode: str


# Validation schema matching SupplyData field by field.
SupplySchema = vol.Schema(
    {
        vol.Required("cups"): str,
        vol.Required("date_start"): dt.datetime,
        vol.Required("date_end"): dt.datetime,
        vol.Required("address"): vol.Union(str, None),
        vol.Required("postal_code"): vol.Union(str, None),
        vol.Required("province"): vol.Union(str, None),
        vol.Required("municipality"): vol.Union(str, None),
        vol.Required("distributor"): vol.Union(str, None),
        vol.Required("pointType"): int,
        vol.Required("distributorCode"): str,
    }
)
class ContractData(TypedDict):
    """Data structure to represent a contract."""

    date_start: dt.datetime
    date_end: dt.datetime
    marketer: str
    distributorCode: str
    # contracted power per tariff period, in kW
    power_p1: float | None
    power_p2: float | None


# Validation schema matching ContractData field by field.
ContractSchema = vol.Schema(
    {
        vol.Required("date_start"): dt.datetime,
        vol.Required("date_end"): dt.datetime,
        vol.Required("marketer"): str,
        vol.Required("distributorCode"): str,
        vol.Required("power_p1"): vol.Union(vol.Coerce(float), None),
        vol.Required("power_p2"): vol.Union(vol.Coerce(float), None),
    }
)
class ConsumptionData(TypedDict):
    """Data structure to represent a consumption."""

    datetime: dt.datetime
    delta_h: float  # duration covered by this sample, in hours
    value_kWh: float
    surplus_kWh: float  # energy fed back to the grid
    real: bool  # real measurement vs estimation


# Validation schema for ConsumptionData; surplus defaults to 0 when absent.
ConsumptionSchema = vol.Schema(
    {
        vol.Required("datetime"): dt.datetime,
        vol.Required("delta_h"): vol.Coerce(float),
        vol.Required("value_kWh"): vol.Coerce(float),
        vol.Optional("surplus_kWh", default=0): vol.Coerce(float),
        vol.Required("real"): bool,
    }
)
class MaxPowerData(TypedDict):
    """Data structure to represent a MaxPower."""

    datetime: dt.datetime  # instant of the power peak
    value_kW: float  # peak power registered by the maximeter


# Validation schema matching MaxPowerData.
MaxPowerSchema = vol.Schema(
    {
        vol.Required("datetime"): dt.datetime,
        vol.Required("value_kW"): vol.Coerce(float),
    }
)
class PricingData(TypedDict):
    """Data structure to represent pricing data."""

    datetime: dt.datetime
    value_eur_kWh: float  # energy price in €/kWh
    delta_h: float  # duration covered by this price, in hours


# Validation schema matching PricingData.
PricingSchema = vol.Schema(
    {
        vol.Required("datetime"): dt.datetime,
        vol.Required("value_eur_kWh"): vol.Coerce(float),
        vol.Required("delta_h"): vol.Coerce(float),
    }
)
class PricingRules(TypedDict):
    """Data structure to represent custom pricing rules."""

    # yearly power-term prices (€/kW/year)
    p1_kw_year_eur: float
    p2_kw_year_eur: float
    # fixed energy prices per period (€/kWh); None means PVPC prices are used
    p1_kwh_eur: float | None
    p2_kwh_eur: float | None
    p3_kwh_eur: float | None
    surplus_p1_kwh_eur: float | None
    surplus_p2_kwh_eur: float | None
    surplus_p3_kwh_eur: float | None
    meter_month_eur: float  # meter rental (€/month)
    market_kw_year_eur: float
    # tax multipliers (e.g. 1.05 for a 5% tax)
    electricity_tax: float
    iva_tax: float
    # custom Jinja billing expressions; None falls back to the defaults above
    energy_formula: str | None
    power_formula: str | None
    others_formula: str | None
    surplus_formula: str | None
    cycle_start_day: int | None
    # NOTE(review): the schema below also accepts a "main_formula" key which is
    # not declared here — confirm whether it should be added to this TypedDict.


# Validation schema for PricingRules; fills the default billing formulas.
PricingRulesSchema = vol.Schema(
    {
        vol.Required("p1_kw_year_eur"): vol.Coerce(float),
        vol.Required("p2_kw_year_eur"): vol.Coerce(float),
        vol.Optional("p1_kwh_eur", default=None): vol.Union(vol.Coerce(float), None),
        vol.Optional("p2_kwh_eur", default=None): vol.Union(vol.Coerce(float), None),
        vol.Optional("p3_kwh_eur", default=None): vol.Union(vol.Coerce(float), None),
        vol.Optional("surplus_p1_kwh_eur", default=None): vol.Union(
            vol.Coerce(float), None
        ),
        vol.Optional("surplus_p2_kwh_eur", default=None): vol.Union(
            vol.Coerce(float), None
        ),
        vol.Optional("surplus_p3_kwh_eur", default=None): vol.Union(
            vol.Coerce(float), None
        ),
        vol.Required("meter_month_eur"): vol.Coerce(float),
        vol.Required("market_kw_year_eur"): vol.Coerce(float),
        vol.Required("electricity_tax"): vol.Coerce(float),
        vol.Required("iva_tax"): vol.Coerce(float),
        vol.Optional("energy_formula", default=DEFAULT_BILLING_ENERGY_FORMULA): str,
        vol.Optional("power_formula", default=DEFAULT_BILLING_POWER_FORMULA): str,
        vol.Optional("others_formula", default=DEFAULT_BILLING_OTHERS_FORMULA): str,
        vol.Optional("surplus_formula", default=DEFAULT_BILLING_SURPLUS_FORMULA): str,
        vol.Optional("main_formula", default=DEFAULT_BILLING_MAIN_FORMULA): str,
        # NOTE(review): Range(1, 30) excludes day 31 — confirm this is intended.
        vol.Optional("cycle_start_day", default=1): vol.Range(1, 30),
    }
)
# Default rules for a PVPC (regulated-price) contract. Only power/fixed/tax
# fields are provided; the per-period energy prices are deliberately left
# unset, which makes EdataHelper treat the rules as PVPC (prices fetched from
# REData). TypedDict calls don't validate at runtime, so missing keys are fine.
DEFAULT_PVPC_RULES = PricingRules(
    p1_kw_year_eur=30.67266,
    p2_kw_year_eur=1.4243591,
    meter_month_eur=0.81,
    market_kw_year_eur=3.113,
    electricity_tax=1.0511300560,
    iva_tax=1.05,
)
class ConsumptionAggData(TypedDict):
    """A dict holding a Consumption item."""

    datetime: dt.datetime  # start of the aggregation window (day or month)
    value_kWh: float  # total consumption in the window
    # consumption split by tariff period
    value_p1_kWh: float
    value_p2_kWh: float
    value_p3_kWh: float
    surplus_kWh: float  # total surplus in the window
    surplus_p1_kWh: float
    surplus_p2_kWh: float
    surplus_p3_kWh: float
    delta_h: float  # hours covered by the window


# Validation schema for ConsumptionAggData; surplus fields default to 0.
ConsumptionAggSchema = vol.Schema(
    {
        vol.Required("datetime"): dt.datetime,
        vol.Required("value_kWh"): vol.Coerce(float),
        vol.Required("value_p1_kWh"): vol.Coerce(float),
        vol.Required("value_p2_kWh"): vol.Coerce(float),
        vol.Required("value_p3_kWh"): vol.Coerce(float),
        vol.Optional("surplus_kWh", default=0): vol.Coerce(float),
        vol.Optional("surplus_p1_kWh", default=0): vol.Coerce(float),
        vol.Optional("surplus_p2_kWh", default=0): vol.Coerce(float),
        vol.Optional("surplus_p3_kWh", default=0): vol.Coerce(float),
        vol.Required("delta_h"): vol.Coerce(float),
    }
)
class PricingAggData(TypedDict):
    """A dict holding a Billing item."""

    datetime: dt.datetime  # start of the aggregation window
    value_eur: float  # total cost in the window
    # cost breakdown by billing term
    energy_term: float
    power_term: float
    others_term: float
    surplus_term: float
    delta_h: float  # hours covered by the window


# Validation schema for PricingAggData; surplus defaults to 0, delta_h to 1h.
PricingAggSchema = vol.Schema(
    {
        vol.Required("datetime"): dt.datetime,
        vol.Required("value_eur"): vol.Coerce(float),
        vol.Required("energy_term"): vol.Coerce(float),
        vol.Required("power_term"): vol.Coerce(float),
        vol.Required("others_term"): vol.Coerce(float),
        vol.Optional("surplus_term", default=0): vol.Coerce(float),
        vol.Optional("delta_h", default=1): vol.Coerce(float),
    },
)
class EdataData(TypedDict):
    """A Typed Dict to handle Edata Aggregated Data."""

    # raw fetched series
    supplies: list[SupplyData]
    contracts: list[ContractData]
    consumptions: list[ConsumptionData]
    maximeter: list[MaxPowerData]
    pvpc: list[PricingData]
    # derived aggregates
    consumptions_daily_sum: list[ConsumptionAggData]
    consumptions_monthly_sum: list[ConsumptionAggData]
    cost_hourly_sum: list[PricingAggData]
    cost_daily_sum: list[PricingAggData]
    cost_monthly_sum: list[PricingAggData]


# Validation schema for EdataData: raw series are required, aggregates are
# optional and default to empty lists.
EdataSchema = vol.Schema(
    {
        vol.Required("supplies"): [SupplySchema],
        vol.Required("contracts"): [ContractSchema],
        vol.Required("consumptions"): [ConsumptionSchema],
        vol.Required("maximeter"): [MaxPowerSchema],
        vol.Optional("pvpc", default=[]): [PricingSchema],
        # `default` must be passed by keyword: vol.Optional's second positional
        # argument is the error message, so the previous `vol.Optional(key, [])`
        # form set no default at all for these two keys.
        vol.Optional("consumptions_daily_sum", default=[]): [ConsumptionAggSchema],
        vol.Optional("consumptions_monthly_sum", default=[]): [ConsumptionAggSchema],
        vol.Optional("cost_hourly_sum", default=[]): [PricingAggSchema],
        vol.Optional("cost_daily_sum", default=[]): [PricingAggSchema],
        vol.Optional("cost_monthly_sum", default=[]): [PricingAggSchema],
    }
)
"""A module for edata helpers."""
import asyncio
import contextlib
from datetime import datetime, timedelta
import logging
import os
from dateutil.relativedelta import relativedelta
import requests
from . import const
from .connectors.datadis import DatadisConnector
from .connectors.redata import REDataConnector
from .definitions import ATTRIBUTES, EdataData, PricingRules
from .processors import utils
from .processors.billing import BillingInput, BillingProcessor
from .processors.consumption import ConsumptionProcessor
from .processors.maximeter import MaximeterProcessor
from .storage import check_storage_integrity, dump_storage, load_storage
_LOGGER = logging.getLogger(__name__)
def acups(cups: str) -> str:
    """Return an abbreviated, anonymized form of a CUPS identifier.

    Only the last 5 characters are kept, which is enough to tell supplies
    apart in logs without exposing the full identifier. (The original
    docstring said "Print", but this function returns the value and prints
    nothing.)
    """
    return cups[-5:]
class EdataHelper:
    """Main EdataHelper class."""

    # Minimum elapsed time before re-fetching the same resource
    # (consumptions / maximeter).
    UPDATE_INTERVAL = timedelta(hours=1)
    def __init__(
        self,
        datadis_username: str,
        datadis_password: str,
        cups: str,
        datadis_authorized_nif: str | None = None,
        pricing_rules: PricingRules | None = None,
        storage_dir_path: str | None = None,
        data: EdataData | None = None,
    ) -> None:
        """Initialize the helper.

        Args:
            datadis_username/datadis_password: Datadis credentials.
            cups: supply point identifier to work with.
            datadis_authorized_nif: NIF authorized to query the supply, if any.
            pricing_rules: custom billing rules; enables cost processing.
            storage_dir_path: directory for the local cache; None disables
                a custom location.
            data: pre-loaded EdataData; when provided it bypasses storage.
        """
        # Start from an empty dataset; may be replaced below by `data` or by
        # the on-disk cache.
        self.data = EdataData(
            supplies=[],
            contracts=[],
            consumptions=[],
            maximeter=[],
            pvpc=[],
            consumptions_daily_sum=[],
            consumptions_monthly_sum=[],
            cost_hourly_sum=[],
            cost_daily_sum=[],
            cost_monthly_sum=[],
        )
        self.attributes = {}
        self._storage_dir = storage_dir_path
        self._cups = cups
        # short, anonymized cups used as log prefix
        self._scups = acups(cups)
        self._authorized_nif = datadis_authorized_nif
        # epoch timestamps mean "never updated" for every resource
        self.last_update = {x: datetime(1970, 1, 1) for x in self.data}
        self._date_from = datetime(1970, 1, 1)
        self._date_to = datetime.today()
        self._must_dump = True
        self._incremental_update = True
        if data is not None:
            data = check_storage_integrity(data)
            self.data = data
        else:
            # Best-effort load of the local cache; any failure keeps the
            # empty dataset.
            with contextlib.suppress(Exception):
                self.data = load_storage(self._cups, self._storage_dir)
        for attr in ATTRIBUTES:
            self.attributes[attr] = None
        self.datadis_api = DatadisConnector(
            datadis_username,
            datadis_password,
            storage_path=(
                os.path.join(storage_dir_path, const.PROG_NAME)
                if storage_dir_path is not None
                else None
            ),
        )
        self.redata_api = REDataConnector()
        self.pricing_rules = pricing_rules
        if self.pricing_rules is not None:
            self.enable_billing = True
            # PVPC mode is assumed when any per-period energy price is missing,
            # so prices must be fetched from REData.
            if not all(
                x in self.pricing_rules and self.pricing_rules[x] is not None
                for x in ("p1_kwh_eur", "p2_kwh_eur", "p3_kwh_eur")
            ):
                self.is_pvpc = True
            else:
                self.is_pvpc = False
        else:
            self.enable_billing = False
            self.is_pvpc = False
async def async_update(
self,
date_from: datetime = datetime(1970, 1, 1),
date_to: datetime = datetime.today(),
incremental_update: bool = True,
):
"""Async call of update method."""
_LOGGER.info(
"%s: update triggered",
self._scups,
)
self._date_from = date_from
self._date_to = date_to
# update datadis resources
await self.update_datadis(self._cups, date_from, date_to)
# update redata resources if pvpc is requested
if self.is_pvpc:
try:
await asyncio.to_thread(self.update_redata, date_from, date_to)
except requests.exceptions.Timeout:
_LOGGER.error("Timeout exception while updating from REData")
await asyncio.to_thread(self.process_data, incremental_update=incremental_update)
if self._must_dump:
await asyncio.to_thread(dump_storage, self._cups, self.data, self._storage_dir)
def update(
self,
date_from: datetime = datetime(1970, 1, 1),
date_to: datetime = datetime.today(),
incremental_update: bool = True,
):
"""Update synchronously."""
asyncio.run(self.async_update(date_from, date_to, incremental_update))
async def update_supplies(self):
"""Update supplies."""
_LOGGER.debug("%s: supplies update triggered", self._scups)
if datetime.today().date() != self.last_update["supplies"].date():
# if supplies haven't been updated today
supplies = await self.datadis_api.async_get_supplies(
authorized_nif=self._authorized_nif
) # fetch supplies
if len(supplies) > 0:
self.data["supplies"] = supplies
# if we got something, update last_update flag
self.last_update["supplies"] = datetime.now()
_LOGGER.info("%s: supplies update succeeded", self._scups)
else:
_LOGGER.info("%s: supplies are already updated (skipping)", self._scups)
async def update_contracts(self, cups: str, distributor_code: str):
"""Update contracts."""
_LOGGER.debug("%s: contracts update triggered", self._scups)
if datetime.today().date() != self.last_update["contracts"].date():
# if contracts haven't been updated today
contracts = await self.datadis_api.async_get_contract_detail(
cups, distributor_code, authorized_nif=self._authorized_nif
)
if len(contracts) > 0:
self.data["contracts"] = utils.extend_by_key(
self.data["contracts"], contracts, "date_start"
) # extend contracts data with new ones
# if we got something, update last_update flag
self.last_update["contracts"] = datetime.now()
_LOGGER.info("%s: contracts update succeeded", self._scups)
else:
_LOGGER.info("%s: contracts are already updated (skipping)", self._scups)
async def update_consumptions(
self,
cups: str,
distributor_code: str,
start_date: datetime,
end_date: datetime,
measurement_type: str,
point_type: int,
):
"""Update consumptions."""
_LOGGER.debug("%s: consumptions update triggered", self._scups)
if (datetime.now() - self.last_update["consumptions"]) > self.UPDATE_INTERVAL:
consumptions = await self.datadis_api.async_get_consumption_data(
cups,
distributor_code,
start_date,
end_date,
measurement_type,
point_type,
authorized_nif=self._authorized_nif,
)
if len(consumptions) > 0:
_LOGGER.info(
"%s: got consumptions from %s to %s",
self._scups,
consumptions[0]["datetime"].isoformat(),
consumptions[-1]["datetime"].isoformat(),
)
self.data["consumptions"] = utils.extend_by_key(
self.data["consumptions"], consumptions, "datetime"
)
self.last_update["consumptions"] = datetime.now()
else:
_LOGGER.info("%s: consumptions are up to date", self._scups)
else:
_LOGGER.info("%s: consumptions are already updated (skipping)", self._scups)
async def update_maximeter(self, cups, distributor_code, start_date, end_date):
"""Update maximeter."""
_LOGGER.debug("%s: maximeter update triggered", self._scups)
if (datetime.now() - self.last_update["maximeter"]) > self.UPDATE_INTERVAL:
maximeter = await self.datadis_api.async_get_max_power(
cups,
distributor_code,
start_date,
end_date,
authorized_nif=self._authorized_nif,
)
if len(maximeter) > 0:
_LOGGER.info(
"%s: maximeter update succeeded",
self._scups,
)
self.data["maximeter"] = utils.extend_by_key(
self.data["maximeter"], maximeter, "datetime"
)
self.last_update["maximeter"] = datetime.now()
else:
_LOGGER.info("%s: maximeter is up to date", self._scups)
else:
_LOGGER.info("%s: maximeter is already updated (skipping)", self._scups)
async def update_datadis(
self,
cups: str,
date_from: datetime = datetime(1970, 1, 1),
date_to: datetime = datetime.today(),
):
"""Update all data from Datadis."""
_LOGGER.info(
"%s: datadis update triggered (from %s to %s)",
self._scups,
date_from.isoformat(),
date_to.isoformat(),
)
# update supplies and get distributorCode
await self.update_supplies()
if len(self.data["supplies"]) == 0:
# return if no supplies were discovered
_LOGGER.warning(
"%s: supplies update failed or no supplies found in the provided account",
self._scups,
)
return False
# find requested cups in supplies
supply = utils.get_by_key(self.data["supplies"], "cups", cups)
if supply is None:
# return if specified cups seems not valid
_LOGGER.error(
"%s: CUPS not found. Got: %s",
self._scups,
[acups(x["cups"]) for x in self.data["supplies"]],
)
return False
_LOGGER.info("%s: CUPS found in account", self._scups)
# get some supply-related data
supply_date_start = supply["date_start"]
distributor_code = supply["distributorCode"]
point_type = supply["pointType"]
_LOGGER.info(
"%s: CUPS start date is %s", self._scups, supply_date_start.isoformat()
)
_LOGGER.info(
"%s: CUPS end date is %s", self._scups, supply["date_end"].isoformat()
)
# update contracts to get valid periods
await self.update_contracts(cups, distributor_code)
if len(self.data["contracts"]) == 0:
_LOGGER.warning(
"%s: contracts update failed or no contracts found in the provided account",
self._scups,
)
# return False
# filter consumptions and maximeter, and log gaps
def sort_and_filter(dt_from, dt_to):
self.data["consumptions"], miss_cons = utils.extract_dt_ranges(
self.data["consumptions"],
dt_from,
dt_to,
gap_interval=timedelta(hours=6),
)
self.data["maximeter"], miss_maxim = utils.extract_dt_ranges(
self.data["maximeter"],
dt_from,
dt_to,
gap_interval=timedelta(days=60),
)
return miss_cons, miss_maxim
miss_cons, miss_maxim = sort_and_filter(date_from, date_to)
# update consumptions
_LOGGER.info(
"%s: missing consumptions: %s",
self._scups,
", ".join(
[
"from "
+ (x["from"] + timedelta(hours=1)).isoformat()
+ " to "
+ x["to"].isoformat()
for x in miss_cons
]
),
)
for gap in miss_cons:
if not (
gap["to"] < supply["date_start"] or gap["from"] > supply["date_end"]
):
# fetch consumptions for each consumptions gap in valid periods
start = max([gap["from"] + timedelta(hours=1), supply["date_start"]])
end = min([gap["to"], supply["date_end"]])
_LOGGER.info(
"%s: requesting consumptions from %s to %s",
self._scups,
start.isoformat(),
end.isoformat(),
)
await self.update_consumptions(
cups,
distributor_code,
start,
end,
"0",
point_type,
)
# update maximeter
_LOGGER.info(
"%s: missing maximeter: %s",
self._scups,
", ".join(
[
"from " + x["from"].isoformat() + " to " + x["to"].isoformat()
for x in miss_maxim
]
),
)
for gap in miss_maxim:
if not (date_to < supply["date_start"] or date_from > supply["date_end"]):
# fetch maximeter for each maximeter gap in valid periods
start = max(
[gap["from"], supply["date_start"] + relativedelta(months=1)]
)
end = min([gap["to"], supply["date_end"]])
start = min([start, end])
_LOGGER.info(
"%s: requesting maximeter from %s to %s",
self._scups,
start.isoformat(),
end.isoformat(),
)
await self.update_maximeter(cups, distributor_code, start, end)
miss_cons, miss_maxim = sort_and_filter(date_from, date_to)
return True
def update_redata(
self,
date_from: datetime = (datetime.today() - timedelta(days=30)).replace(
hour=0, minute=0
),
date_to: datetime = (datetime.today() + timedelta(days=2)).replace(
hour=0, minute=0
),
):
"""Fetch PVPC prices using REData API."""
_LOGGER.info(
"%s: updating PVPC prices",
self._scups,
)
self.data["pvpc"], missing = utils.extract_dt_ranges(
self.data["pvpc"],
date_from,
date_to,
gap_interval=timedelta(hours=1),
)
for gap in missing:
prices = []
gap["from"] = max(
(datetime.today() - timedelta(days=30)).replace(hour=0, minute=0),
gap["from"],
)
while len(prices) == 0 and gap["from"] < gap["to"]:
prices = self.redata_api.get_realtime_prices(gap["from"], gap["to"])
gap["from"] = gap["from"] + timedelta(days=1)
self.data["pvpc"] = utils.extend_by_key(
self.data["pvpc"], prices, "datetime"
)
return True
def process_data(self, incremental_update: bool = True):
"""Process all raw data."""
self._incremental_update = incremental_update
for process_method in [
self.process_supplies,
self.process_contracts,
self.process_consumptions,
self.process_maximeter,
self.process_cost,
]:
try:
process_method()
except Exception as ex: # pylint: disable=broad-except
_LOGGER.error("Unhandled exception while updating attributes")
_LOGGER.exception(ex)
for attribute in self.attributes:
if attribute in ATTRIBUTES and ATTRIBUTES[attribute] is not None:
self.attributes[attribute] = (
round(self.attributes[attribute], 2)
if self.attributes[attribute] is not None
else None
)
if not incremental_update:
dump_storage(self._cups, self.data, self._storage_dir)
def process_supplies(self):
"""Process supplies data."""
for i in self.data["supplies"]:
if i["cups"] == self._cups:
self.attributes["cups"] = self._cups
break
def process_contracts(self):
"""Process contracts data."""
most_recent_date = datetime(1970, 1, 1)
for i in self.data["contracts"]:
if i["date_end"] > most_recent_date:
most_recent_date = i["date_end"]
self.attributes["contract_p1_kW"] = i.get("power_p1", None)
self.attributes["contract_p2_kW"] = i.get("power_p2", None)
break
    def process_consumptions(self):
        """Process consumptions data.

        Rebuilds the daily/monthly aggregates (incrementally when enabled)
        and refreshes the yesterday / current-month / last-month /
        last-registered-day attributes.
        """
        if len(self.data["consumptions"]) > 0:
            new_data_from = self._date_from
            if self._incremental_update:
                # only reprocess from the last monthly aggregate onwards;
                # any failure (e.g. empty aggregate list) falls back to
                # the full window
                with contextlib.suppress(Exception):
                    new_data_from = self.data["consumptions_monthly_sum"][-1][
                        "datetime"
                    ]
            proc = ConsumptionProcessor(
                {
                    "consumptions": [
                        x
                        for x in self.data["consumptions"]
                        if x["datetime"] >= new_data_from
                    ],
                    "cycle_start_day": 1,
                }
            )
            today_starts = datetime(
                datetime.today().year,
                datetime.today().month,
                datetime.today().day,
                0,
                0,
                0,
            )
            month_starts = datetime(
                datetime.today().year, datetime.today().month, 1, 0, 0, 0
            )
            # append new data
            self.data["consumptions_daily_sum"] = utils.extend_and_filter(
                self.data["consumptions_daily_sum"],
                proc.output["daily"],
                "datetime",
                self._date_from,
                self._date_to,
            )
            self.data["consumptions_monthly_sum"] = utils.extend_and_filter(
                self.data["consumptions_monthly_sum"],
                proc.output["monthly"],
                "datetime",
                self._date_from,
                self._date_to,
            )
            # yesterday's daily aggregate (None when not registered yet)
            yday = utils.get_by_key(
                self.data["consumptions_daily_sum"],
                "datetime",
                today_starts - timedelta(days=1),
            )
            self.attributes["yesterday_kWh"] = (
                yday.get("value_kWh", None) if yday is not None else None
            )
            for tariff in (1, 2, 3):
                self.attributes[f"yesterday_p{tariff}_kWh"] = (
                    yday.get(f"value_p{tariff}_kWh", None) if yday is not None else None
                )
            self.attributes["yesterday_surplus_kWh"] = (
                yday.get("surplus_kWh", None) if yday is not None else None
            )
            for tariff in (1, 2, 3):
                self.attributes[f"yesterday_surplus_p{tariff}_kWh"] = (
                    yday.get(f"surplus_p{tariff}_kWh", None)
                    if yday is not None
                    else None
                )
            self.attributes["yesterday_hours"] = (
                yday.get("delta_h", None) if yday is not None else None
            )
            # current month aggregate
            month = utils.get_by_key(
                self.data["consumptions_monthly_sum"], "datetime", month_starts
            )
            self.attributes["month_kWh"] = (
                month.get("value_kWh", None) if month is not None else None
            )
            self.attributes["month_surplus_kWh"] = (
                month.get("surplus_kWh", None) if month is not None else None
            )
            self.attributes["month_days"] = (
                month.get("delta_h", 0) / 24 if month is not None else None
            )
            self.attributes["month_daily_kWh"] = (
                (
                    (self.attributes["month_kWh"] / self.attributes["month_days"])
                    if self.attributes["month_days"] > 0
                    else 0
                )
                if month is not None
                else None
            )
            for tariff in (1, 2, 3):
                self.attributes[f"month_p{tariff}_kWh"] = (
                    month.get(f"value_p{tariff}_kWh", None)
                    if month is not None
                    else None
                )
                self.attributes[f"month_surplus_p{tariff}_kWh"] = (
                    month.get(f"surplus_p{tariff}_kWh", None)
                    if month is not None
                    else None
                )
            # previous month aggregate
            last_month = utils.get_by_key(
                self.data["consumptions_monthly_sum"],
                "datetime",
                (month_starts - relativedelta(months=1)),
            )
            self.attributes["last_month_kWh"] = (
                last_month.get("value_kWh", None) if last_month is not None else None
            )
            self.attributes["last_month_surplus_kWh"] = (
                last_month.get("surplus_kWh", None) if last_month is not None else None
            )
            self.attributes["last_month_days"] = (
                last_month.get("delta_h", 0) / 24 if last_month is not None else None
            )
            self.attributes["last_month_daily_kWh"] = (
                (
                    (
                        self.attributes["last_month_kWh"]
                        / self.attributes["last_month_days"]
                    )
                    if self.attributes["last_month_days"] > 0
                    else 0
                )
                if last_month is not None
                else None
            )
            for tariff in (1, 2, 3):
                self.attributes[f"last_month_p{tariff}_kWh"] = (
                    last_month.get(f"value_p{tariff}_kWh", None)
                    if last_month is not None
                    else None
                )
                self.attributes[f"last_month_surplus_p{tariff}_kWh"] = (
                    last_month.get(f"surplus_p{tariff}_kWh", None)
                    if last_month is not None
                    else None
                )
        if len(self.data["consumptions"]) > 0:
            # consumptions are stored sorted, so the last item is the newest
            self.attributes["last_registered_date"] = self.data["consumptions"][-1][
                "datetime"
            ]
        if len(self.data["consumptions_daily_sum"]) > 0:
            # latest complete(d) day found in the daily aggregates
            last_day = self.data["consumptions_daily_sum"][-1]
            self.attributes["last_registered_day_kWh"] = (
                last_day.get("value_kWh", None)
                if last_day is not None
                else None
            )
            self.attributes["last_registered_day_surplus_kWh"] = (
                last_day.get("surplus_kWh", None)
                if last_day is not None
                else None
            )
            for tariff in (1, 2, 3):
                self.attributes[f"last_registered_day_p{tariff}_kWh"] = (
                    last_day.get(f"value_p{tariff}_kWh", None)
                    if last_day is not None
                    else None
                )
                self.attributes[
                    f"last_registered_day_surplus_p{tariff}_kWh"
                ] = (
                    last_day.get(f"surplus_p{tariff}_kWh", None)
                    if last_day is not None
                    else None
                )
            self.attributes["last_registered_day_hours"] = (
                last_day.get("delta_h", None) if last_day is not None else None
            )
def process_maximeter(self):
"""Process maximeter data."""
if len(self.data["maximeter"]) > 0:
processor = MaximeterProcessor(self.data["maximeter"])
last_relative_year = processor.output["stats"]
self.attributes["max_power_kW"] = last_relative_year.get(
"value_max_kW", None
)
self.attributes["max_power_date"] = last_relative_year.get("date_max", None)
self.attributes["max_power_mean_kW"] = last_relative_year.get(
"value_mean_kW", None
)
self.attributes["max_power_90perc_kW"] = last_relative_year.get(
"value_tile90_kW", None
)
    def process_cost(self):
        """Process costs.

        Rebuilds the hourly/daily/monthly cost aggregates (incrementally when
        enabled) and refreshes the month/last-month cost attributes. No-op
        when billing is disabled.
        """
        if self.enable_billing:
            new_data_from = self._date_from
            if self._incremental_update:
                # only re-bill from the last monthly aggregate onwards; any
                # failure falls back to the full window
                with contextlib.suppress(Exception):
                    new_data_from = self.data["cost_monthly_sum"][-1]["datetime"]
            proc = BillingProcessor(
                BillingInput(
                    contracts=self.data["contracts"],
                    consumptions=[
                        x
                        for x in self.data["consumptions"]
                        if x["datetime"] >= new_data_from
                    ],
                    # PVPC contracts price by the fetched hourly prices;
                    # otherwise the fixed prices in the rules apply
                    prices=(
                        [x for x in self.data["pvpc"] if x["datetime"] >= new_data_from]
                        if self.is_pvpc
                        else None
                    ),
                    rules=self.pricing_rules,
                )
            )
            month_starts = datetime(
                datetime.today().year, datetime.today().month, 1, 0, 0, 0
            )
            # append new data
            hourly = proc.output["hourly"]
            self.data["cost_hourly_sum"] = utils.extend_and_filter(
                self.data["cost_hourly_sum"],
                hourly,
                "datetime",
                self._date_from,
                self._date_to,
            )
            daily = proc.output["daily"]
            self.data["cost_daily_sum"] = utils.extend_and_filter(
                self.data["cost_daily_sum"],
                daily,
                "datetime",
                self._date_from,
                self._date_to,
            )
            monthly = proc.output["monthly"]
            self.data["cost_monthly_sum"] = utils.extend_and_filter(
                self.data["cost_monthly_sum"],
                monthly,
                "datetime",
                self._date_from,
                self._date_to,
            )
            this_month = utils.get_by_key(
                self.data["cost_monthly_sum"],
                "datetime",
                month_starts,
            )
            last_month = utils.get_by_key(
                self.data["cost_monthly_sum"],
                "datetime",
                (month_starts - relativedelta(months=1)),
            )
            if this_month is not None:
                self.attributes["month_€"] = this_month.get("value_eur", None)
            if last_month is not None:
                self.attributes["last_month_€"] = last_month.get("value_eur", None)
def reset(self):
"""Reset in-mem objects."""
self.data = EdataData(
supplies=[],
contracts=[],
consumptions=[],
maximeter=[],
pvpc=[],
consumptions_daily_sum=[],
consumptions_monthly_sum=[],
cost_hourly_sum=[],
cost_daily_sum=[],
cost_monthly_sum=[],
)
for attr in ATTRIBUTES:
self.attributes[attr] = None
self.last_update = {x: datetime(1970, 1, 1) for x in self.data}
"""Base definitions for processors."""
from abc import ABC, abstractmethod
from copy import deepcopy
from typing import Any
class Processor(ABC):
    """Abstract base class every edata data processor derives from.

    Subclasses implement `do_process`, reading `self._input` and storing
    their result in `self._output`.
    """

    _LABEL = "Processor"

    def __init__(self, input_data: Any, auto: bool = True) -> None:
        """Keep a private deep copy of `input_data`; process it when `auto`."""
        # deep copy so later caller-side mutations cannot leak in
        self._input = deepcopy(input_data)
        self._output = None
        if auto:
            self.do_process()

    @abstractmethod
    def do_process(self):
        """Compute the processor output from `self._input`."""

    @property
    def output(self):
        """A deep copy of the processed output (safe to mutate)."""
        return deepcopy(self._output)
"""Billing data processors."""
import contextlib
from datetime import datetime, timedelta
import logging
from typing import Optional, TypedDict
from jinja2 import Environment
import voluptuous
from ..definitions import (
ConsumptionData,
ConsumptionSchema,
ContractData,
ContractSchema,
PricingAggData,
PricingData,
PricingRules,
PricingRulesSchema,
PricingSchema,
)
from ..processors import utils
from ..processors.base import Processor
_LOGGER = logging.getLogger(__name__)
class BillingOutput(TypedDict):
    """A dict holding BillingProcessor output property."""

    # cost series aggregated at hourly / daily / monthly resolution
    hourly: list[PricingAggData]
    daily: list[PricingAggData]
    monthly: list[PricingAggData]


class BillingInput(TypedDict):
    """A dict holding BillingProcessor input data."""

    contracts: list[ContractData]
    consumptions: list[ConsumptionData]
    prices: Optional[list[PricingData]]  # None when fixed per-period prices apply
    rules: PricingRules
class BillingProcessor(Processor):
    """A billing processor for edata.

    Takes a BillingInput-shaped dict (contracts, consumptions, optional
    hourly prices and pricing rules) and produces a BillingOutput with
    hourly, daily and monthly cost aggregates.
    """

    def do_process(self):
        """Process billing and get hourly/daily/monthly metrics."""
        self._output = BillingOutput(hourly=[], daily=[], monthly=[])
        # Validate and normalize the input against the declared schemas.
        _schema = voluptuous.Schema(
            {
                voluptuous.Required("contracts"): [ContractSchema],
                voluptuous.Required("consumptions"): [ConsumptionSchema],
                voluptuous.Optional("prices", default=None): voluptuous.Union(
                    [voluptuous.Union(PricingSchema)], None
                ),
                voluptuous.Required("rules"): PricingRulesSchema,
            }
        )
        self._input = _schema(self._input)
        # Days to shift when bucketing hours into billing-cycle "months".
        self._cycle_offset = self._input["rules"]["cycle_start_day"] - 1
        # joint data by datetime: one working row per consumption hour
        _data = {
            x["datetime"]: {
                "datetime": x["datetime"],
                "kwh": x["value_kWh"],
                # a missing surplus reading counts as zero surplus
                "surplus_kwh": x["surplus_kWh"] if x["surplus_kWh"] is not None else 0,
            }
            for x in self._input["consumptions"]
        }
        # Stamp the contracted powers on every hour in [date_start, date_end).
        for contract in self._input["contracts"]:
            start = contract["date_start"]
            end = contract["date_end"]
            finish = False
            while not finish:
                if start in _data:
                    _data[start]["p1_kw"] = contract["power_p1"]
                    _data[start]["p2_kw"] = contract["power_p2"]
                start = start + timedelta(hours=1)
                finish = not (end > start)
        # Attach the hourly energy price when explicit prices were provided.
        if self._input["prices"]:
            for x in self._input["prices"]:
                start = x["datetime"]
                if start in _data:
                    _data[start]["kwh_eur"] = x["value_eur_kWh"]
        # Compile each pricing formula once; the `|float` filter coerces the
        # jinja expression result to a float.
        env = Environment()
        energy_expr = env.compile_expression(
            f'({self._input["rules"]["energy_formula"]})|float'
        )
        power_expr = env.compile_expression(
            f'({self._input["rules"]["power_formula"]})|float'
        )
        others_expr = env.compile_expression(
            f'({self._input["rules"]["others_formula"]})|float'
        )
        surplus_expr = env.compile_expression(
            f'({self._input["rules"]["surplus_formula"]})|float'
        )
        main_expr = env.compile_expression(
            f'({self._input["rules"]["main_formula"]})|float'
        )
        # Chronological order is required by the day/month grouping below.
        _data = sorted([_data[x] for x in _data], key=lambda x: x["datetime"])
        hourly = []
        for x in _data:
            # Expose the rule constants to the formulas for this row.
            x.update(self._input["rules"])
            tariff = utils.get_pvpc_tariff(x["datetime"])
            # No explicit hourly price: fall back to the flat per-tariff price.
            if "kwh_eur" not in x:
                if tariff == "p1":
                    x["kwh_eur"] = x["p1_kwh_eur"]
                elif tariff == "p2":
                    x["kwh_eur"] = x["p2_kwh_eur"]
                elif tariff == "p3":
                    x["kwh_eur"] = x["p3_kwh_eur"]
                if x["kwh_eur"] is None:
                    # No price available for this hour: skip it entirely.
                    continue
            # Surplus compensation price always comes from the rules.
            if tariff == "p1":
                x["surplus_kwh_eur"] = x["surplus_p1_kwh_eur"]
            elif tariff == "p2":
                x["surplus_kwh_eur"] = x["surplus_p2_kwh_eur"]
            elif tariff == "p3":
                x["surplus_kwh_eur"] = x["surplus_p3_kwh_eur"]
            _energy_term = 0
            _power_term = 0
            _others_term = 0
            _surplus_term = 0
            # Best-effort evaluation: if any formula fails, that term and all
            # later terms in this block stay at 0 for this hour.
            with contextlib.suppress(Exception):
                _energy_term = round(energy_expr(**x), 6)
                _power_term = round(power_expr(**x), 6)
                _others_term = round(others_expr(**x), 6)
                _surplus_term = round(surplus_expr(**x), 6)
            new_item = PricingAggData(
                datetime=x["datetime"],
                energy_term=_energy_term,
                power_term=_power_term,
                others_term=_others_term,
                surplus_term=_surplus_term,
                # value_eur is computed at the end with main_formula.
                value_eur=0,
                delta_h=1,
            )
            hourly.append(new_item)
        self._output["hourly"] = hourly
        # Aggregate hours into days and cycle-aligned months. Rows arrive in
        # chronological order, so only the most recent bucket is ever updated.
        last_day_dt = None
        last_month_dt = None
        for hour in hourly:
            curr_hour_dt: datetime = hour["datetime"]
            curr_day_dt = curr_hour_dt.replace(hour=0, minute=0, second=0)
            # Shift by the cycle offset so a cycle starting on day D maps
            # to the month where that cycle began.
            curr_month_dt = (curr_day_dt - timedelta(days=self._cycle_offset)).replace(
                day=1
            )
            if last_day_dt is None or curr_day_dt != last_day_dt:
                # Open a new daily bucket seeded with this hour's terms.
                self._output["daily"].append(
                    PricingAggData(
                        datetime=curr_day_dt,
                        energy_term=hour["energy_term"],
                        power_term=hour["power_term"],
                        others_term=hour["others_term"],
                        surplus_term=hour["surplus_term"],
                        value_eur=hour["value_eur"],
                        delta_h=hour["delta_h"],
                    )
                )
            else:
                # Accumulate into the current daily bucket.
                self._output["daily"][-1]["energy_term"] += hour["energy_term"]
                self._output["daily"][-1]["power_term"] += hour["power_term"]
                self._output["daily"][-1]["others_term"] += hour["others_term"]
                self._output["daily"][-1]["surplus_term"] += hour["surplus_term"]
                self._output["daily"][-1]["delta_h"] += hour["delta_h"]
                self._output["daily"][-1]["value_eur"] += hour["value_eur"]
            if last_month_dt is None or curr_month_dt != last_month_dt:
                # Open a new monthly bucket seeded with this hour's terms.
                self._output["monthly"].append(
                    PricingAggData(
                        datetime=curr_month_dt,
                        energy_term=hour["energy_term"],
                        power_term=hour["power_term"],
                        others_term=hour["others_term"],
                        surplus_term=hour["surplus_term"],
                        value_eur=hour["value_eur"],
                        delta_h=hour["delta_h"],
                    )
                )
            else:
                # Accumulate into the current monthly bucket.
                self._output["monthly"][-1]["energy_term"] += hour["energy_term"]
                self._output["monthly"][-1]["power_term"] += hour["power_term"]
                self._output["monthly"][-1]["others_term"] += hour["others_term"]
                self._output["monthly"][-1]["surplus_term"] += hour["surplus_term"]
                self._output["monthly"][-1]["value_eur"] += hour["value_eur"]
                self._output["monthly"][-1]["delta_h"] += hour["delta_h"]
            last_day_dt = curr_day_dt
            last_month_dt = curr_month_dt
        # Evaluate the main formula over every aggregate and round the terms.
        for item in self._output:
            for cost in self._output[item]:
                cost["value_eur"] = round(main_expr(**cost, **self._input["rules"]), 6)
                cost["energy_term"] = round(cost["energy_term"], 6)
                cost["power_term"] = round(cost["power_term"], 6)
                cost["others_term"] = round(cost["others_term"], 6)
                cost["surplus_term"] = round(cost["surplus_term"], 6)
"""Consumption data processors."""
import logging
from collections.abc import Iterable
from typing import TypedDict
from datetime import datetime, timedelta
import voluptuous
from ..definitions import ConsumptionAggData, ConsumptionSchema
from . import utils
from .base import Processor
_LOGGER = logging.getLogger(__name__)
class ConsumptionOutput(TypedDict):
    """A dict holding ConsumptionProcessor output property."""

    # Aggregated consumptions by natural day and by (cycle-shifted) month.
    daily: Iterable[ConsumptionAggData]
    monthly: Iterable[ConsumptionAggData]
class ConsumptionProcessor(Processor):
    """A consumptions processor.

    Aggregates hourly consumption records into daily and monthly buckets,
    splitting energy (and surplus) by PVPC tariff period (p1/p2/p3).
    """

    def do_process(self):
        """Calculate daily and monthly consumption stats."""
        self._output = ConsumptionOutput(daily=[], monthly=[])
        last_day_dt = None
        last_month_dt = None
        # Validate input; cycle_start_day lets billing cycles start mid-month.
        _schema = voluptuous.Schema(
            {
                voluptuous.Required("consumptions"): [ConsumptionSchema],
                voluptuous.Optional("cycle_start_day", default=1): voluptuous.Range(
                    1, 30
                ),
            }
        )
        self._input = _schema(self._input)
        self._cycle_offset = self._input["cycle_start_day"] - 1
        # NOTE(review): grouping only compares each row against the previous
        # bucket, so the input is assumed to be sorted by datetime — an
        # out-of-order row would open a duplicate bucket.
        for consumption in self._input["consumptions"]:
            curr_hour_dt: datetime = consumption["datetime"]
            curr_day_dt = curr_hour_dt.replace(hour=0, minute=0, second=0)
            # Shift by the cycle offset so monthly buckets follow the cycle.
            curr_month_dt = (curr_day_dt - timedelta(days=self._cycle_offset)).replace(
                day=1
            )
            tariff = utils.get_pvpc_tariff(curr_hour_dt)
            kwh = consumption["value_kWh"]
            # NOTE(review): surplus_kWh is used as-is; unlike BillingProcessor
            # a None value is not coerced to 0 here — confirm the schema
            # guarantees a number.
            surplus_kwh = consumption["surplus_kWh"]
            delta_h = consumption["delta_h"]
            # Route this hour's energy into its tariff bucket (p1/p2/p3).
            kwh_by_tariff = [0, 0, 0]
            surplus_kwh_by_tariff = [0, 0, 0]
            match tariff:
                case "p1":
                    kwh_by_tariff[0] = kwh
                    surplus_kwh_by_tariff[0] = surplus_kwh
                case "p2":
                    kwh_by_tariff[1] = kwh
                    surplus_kwh_by_tariff[1] = surplus_kwh
                case "p3":
                    kwh_by_tariff[2] = kwh
                    surplus_kwh_by_tariff[2] = surplus_kwh
            if last_day_dt is None or curr_day_dt != last_day_dt:
                # Open a new daily bucket seeded with this hour.
                self._output["daily"].append(
                    ConsumptionAggData(
                        datetime=curr_day_dt,
                        value_kWh=kwh,
                        delta_h=delta_h,
                        value_p1_kWh=kwh_by_tariff[0],
                        value_p2_kWh=kwh_by_tariff[1],
                        value_p3_kWh=kwh_by_tariff[2],
                        surplus_kWh=surplus_kwh,
                        surplus_p1_kWh=surplus_kwh_by_tariff[0],
                        surplus_p2_kWh=surplus_kwh_by_tariff[1],
                        surplus_p3_kWh=surplus_kwh_by_tariff[2],
                    )
                )
            else:
                # Accumulate into the current daily bucket.
                self._output["daily"][-1]["value_kWh"] += kwh
                self._output["daily"][-1]["value_p1_kWh"] += kwh_by_tariff[0]
                self._output["daily"][-1]["value_p2_kWh"] += kwh_by_tariff[1]
                self._output["daily"][-1]["value_p3_kWh"] += kwh_by_tariff[2]
                self._output["daily"][-1]["surplus_kWh"] += surplus_kwh
                self._output["daily"][-1]["surplus_p1_kWh"] += surplus_kwh_by_tariff[0]
                self._output["daily"][-1]["surplus_p2_kWh"] += surplus_kwh_by_tariff[1]
                self._output["daily"][-1]["surplus_p3_kWh"] += surplus_kwh_by_tariff[2]
                self._output["daily"][-1]["delta_h"] += delta_h
            if last_month_dt is None or curr_month_dt != last_month_dt:
                # Open a new monthly (billing-cycle) bucket seeded with this hour.
                self._output["monthly"].append(
                    ConsumptionAggData(
                        datetime=curr_month_dt,
                        value_kWh=kwh,
                        delta_h=delta_h,
                        value_p1_kWh=kwh_by_tariff[0],
                        value_p2_kWh=kwh_by_tariff[1],
                        value_p3_kWh=kwh_by_tariff[2],
                        surplus_kWh=surplus_kwh,
                        surplus_p1_kWh=surplus_kwh_by_tariff[0],
                        surplus_p2_kWh=surplus_kwh_by_tariff[1],
                        surplus_p3_kWh=surplus_kwh_by_tariff[2],
                    )
                )
            else:
                # Accumulate into the current monthly bucket.
                self._output["monthly"][-1]["value_kWh"] += kwh
                self._output["monthly"][-1]["value_p1_kWh"] += kwh_by_tariff[0]
                self._output["monthly"][-1]["value_p2_kWh"] += kwh_by_tariff[1]
                self._output["monthly"][-1]["value_p3_kWh"] += kwh_by_tariff[2]
                self._output["monthly"][-1]["surplus_kWh"] += surplus_kwh
                self._output["monthly"][-1]["surplus_p1_kWh"] += surplus_kwh_by_tariff[
                    0
                ]
                self._output["monthly"][-1]["surplus_p2_kWh"] += surplus_kwh_by_tariff[
                    1
                ]
                self._output["monthly"][-1]["surplus_p3_kWh"] += surplus_kwh_by_tariff[
                    2
                ]
                self._output["monthly"][-1]["delta_h"] += delta_h
            last_day_dt = curr_day_dt
            last_month_dt = curr_month_dt
        # Round to two decimals (every float field of every aggregate).
        for item in self._output:
            for cons in self._output[item]:
                for key in cons:
                    if isinstance(cons[key], float):
                        cons[key] = round(cons[key], 2)
"""Maximeter data processors."""
import logging
from datetime import datetime
from typing import TypedDict
from dateparser import parse
import voluptuous
from edata.definitions import MaxPowerSchema
from edata.processors import utils
from .base import Processor
_LOGGER = logging.getLogger(__name__)
class MaximeterStats(TypedDict):
    """A dict holding MaximeterProcessor stats."""

    value_max_kW: float  # highest registered power reading
    date_max: datetime  # when the maximum was registered
    value_mean_kW: float  # mean of all readings
    value_tile90_kW: float  # 90th percentile of the readings
class MaximeterOutput(TypedDict):
    """A dict holding MaximeterProcessor output property."""

    # Summary statistics computed over the maximeter readings.
    stats: MaximeterStats
class MaximeterProcessor(Processor):
    """A processor for Maximeter data.

    Computes summary stats over a list of maximeter readings: the maximum
    power, the date it was registered, the mean, and the 90th percentile.
    """

    def do_process(self):
        """Calculate maximeter stats."""
        self._output = MaximeterOutput(stats={})
        _schema = voluptuous.Schema([MaxPowerSchema])
        self._input = _schema(self._input)
        _values = [x["value_kW"] for x in self._input]
        if not _values:
            # No readings: keep empty stats instead of crashing on max([]).
            return
        _max_kW = max(_values)
        _dt_max_kW = parse(str(self._input[_values.index(_max_kW)]["datetime"]))
        _mean_kW = sum(_values) / len(_values)
        # percentile() interpolates by index, so it needs a SORTED sequence
        # (the previous code passed the readings in input order).
        _tile90_kW = utils.percentile(sorted(_values), 0.9)
        # The stats row is a MaximeterStats (the previous code mistakenly
        # built a MaximeterOutput here, which has a different schema).
        self._output["stats"] = MaximeterStats(
            value_max_kW=round(_max_kW, 2),
            date_max=_dt_max_kW,
            value_mean_kW=round(_mean_kW, 2),
            value_tile90_kW=round(_tile90_kW, 2),
        )
"""Generic utilities for processing data."""
import json
import logging
from copy import deepcopy
from datetime import date, datetime, timedelta
from json import JSONEncoder
import holidays
import math
import functools
import contextlib
_LOGGER = logging.getLogger(__name__)
# NOTE(review): calling basicConfig from a library module configures the
# embedding application's root logger — confirm this side effect is wanted.
logging.basicConfig(level=logging.INFO)
# PVPC tariff-period tables: peak (p1) and flat (p2) hours of the day, and
# weekend weekdays (Saturday=5, Sunday=6) which are always valley (p3).
HOURS_P1 = [10, 11, 12, 13, 18, 19, 20, 21]
HOURS_P2 = [8, 9, 14, 15, 16, 17, 22, 23]
WEEKDAYS_P3 = [5, 6]
def is_empty(lst):
    """Check if a list is empty."""
    return not len(lst)
def extract_dt_ranges(lst, dt_from, dt_to, gap_interval=timedelta(hours=1)):
    """Filter a list of dicts between two datetimes.

    Returns a tuple (filtered, missing): `filtered` holds the items whose
    "datetime" falls within [dt_from, dt_to] with consecutive duplicates
    dropped; `missing` lists {"from": ..., "to": ...} ranges where gaps
    larger than `gap_interval` were detected.
    """
    new_lst = []
    missing = []
    oldest_dt = None
    newest_dt = None
    last_dt = None
    if len(lst) > 0:
        sorted_lst = sorted(lst, key=lambda i: i["datetime"])
        last_dt = dt_from
        for i in sorted_lst:
            if dt_from <= i["datetime"] <= dt_to:
                # Register a gap when two consecutive items are too far apart.
                if (i["datetime"] - last_dt) > gap_interval:
                    missing.append({"from": last_dt, "to": i["datetime"]})
                # Track the observed data span, ignoring zero-consumption
                # rows (items without "value_kWh" always count).
                if i.get("value_kWh", 1) > 0:
                    if oldest_dt is None or i["datetime"] < oldest_dt:
                        oldest_dt = i["datetime"]
                    if newest_dt is None or i["datetime"] > newest_dt:
                        newest_dt = i["datetime"]
                if i["datetime"] != last_dt:  # remove duplicates
                    new_lst.append(i)
                last_dt = i["datetime"]
        if dt_to > last_dt:
            # Everything after the last item up to dt_to is missing too.
            missing.append({"from": last_dt, "to": dt_to})
        _LOGGER.debug("found data from %s to %s", oldest_dt, newest_dt)
    else:
        # No data at all: the whole requested range is missing.
        missing.append({"from": dt_from, "to": dt_to})
    return new_lst, missing
def extend_by_key(old_lst, new_lst, key):
    """Extend a list of dicts by key.

    Items of `new_lst` whose `key` matches an existing item overwrite its
    fields in place; the rest are appended. `old_lst` is left untouched.
    """
    merged = deepcopy(old_lst)
    unmatched = []
    for incoming in new_lst:
        target = next((item for item in merged if item[key] == incoming[key]), None)
        if target is None:
            unmatched.append(incoming)
            continue
        for field in target:
            target[field] = incoming[field]
    merged.extend(unmatched)
    return merged
def extend_and_filter(old_lst, new_lst, key, dt_from, dt_to):
    """Merge `new_lst` into `old_lst` by `key`, keeping only [dt_from, dt_to]."""
    merged = extend_by_key(old_lst, new_lst, key)
    # A huge gap_interval effectively disables gap reporting — we only
    # care about the datetime filtering here.
    filtered, _ = extract_dt_ranges(
        merged, dt_from, dt_to, gap_interval=timedelta(days=365)
    )
    return filtered
def get_by_key(lst, key, value):
    """Obtain the first element of a list of dicts where key == value, else None."""
    return next((item for item in lst if item[key] == value), None)
@functools.lru_cache(maxsize=1)
def _spain_holidays():
    """Build the Spanish holiday calendar once and reuse it on every call."""
    return holidays.country_holidays("ES")


def get_pvpc_tariff(a_datetime):
    """Evals the PVPC tariff for a given datetime.

    Weekends and Spanish holidays are "p3"; any other day is split into
    peak ("p1"), flat ("p2") and valley ("p3") hours.
    """
    # The holiday calendar is memoized: the previous implementation rebuilt
    # it on every call, which is wasteful in the hourly processing loops.
    if a_datetime.weekday() in WEEKDAYS_P3 or a_datetime.date() in _spain_holidays():
        return "p3"
    hour = a_datetime.hour
    if hour in HOURS_P1:
        return "p1"
    if hour in HOURS_P2:
        return "p2"
    return "p3"
def serialize_dict(data: dict) -> dict:
    """Serialize dicts as json (dates/datetimes become ISO strings)."""

    def _encode(obj):
        # date/datetime values are replaced by their ISO representation;
        # any other unserializable object becomes null (mirrors the
        # original encoder, whose default() returned None implicitly).
        if isinstance(obj, (date, datetime)):
            return obj.isoformat()
        return None

    return json.loads(json.dumps(data, default=_encode))
def deserialize_dict(serialized_dict: dict) -> dict:
    """Deserializes a json replacing ISOTIME strings into datetime.

    Only keys whose name contains "date" are candidates; values that do
    not parse are left untouched.
    """

    def _revive(obj: dict) -> dict:
        """Object hook: convert ISO strings under date-like keys in place."""
        for field in obj:
            if "date" not in field:
                continue
            with contextlib.suppress(Exception):
                obj[field] = datetime.fromisoformat(obj[field])
        return obj

    return json.loads(json.dumps(serialized_dict), object_hook=_revive)
def percentile(N, percent, key=lambda x: x):
    """Find the percentile of a list of values.

    `N` must already be sorted; the result linearly interpolates between
    the two neighbouring elements. Returns None for an empty list.
    """
    if not N:
        return None
    rank = (len(N) - 1) * percent
    lower = math.floor(rank)
    upper = math.ceil(rank)
    if lower == upper:
        # Exact index: no interpolation needed.
        return key(N[int(rank)])
    below = key(N[int(lower)]) * (upper - rank)
    above = key(N[int(upper)]) * (rank - lower)
    return below + above
import json
import logging
import os
from .processors import utils
from . import const as const
from . import definitions as defs
_LOGGER = logging.getLogger(__name__)
# User's home directory. expanduser also resolves when $HOME is unset
# (os.getenv("HOME") would yield None and break os.path.join later).
DEFAULT_STORAGE_DIR = os.path.expanduser("~")
# Cache filename template, keyed by the storage id.
RECENT_CACHE_FILENAME = "edata_{id}.json"


def compile_storage_id(cups: str) -> str:
    """Return the storage identifier for a CUPS code (lowercased CUPS).

    Replaces the former lambda assignment (PEP 8 E731) with a proper def.
    """
    return cups.lower()
def check_storage_integrity(data: defs.EdataData):
    """Check if an EdataData object follows a schema.

    Returns the value produced by calling EdataSchema on `data`; invalid
    input is expected to raise from the schema call.
    """
    return defs.EdataSchema(data)
def load_storage(cups: str, storage_dir: str | None = None):
    """Load EdataData storage from its config dir.

    Reads the JSON cache for `cups`, revives datetimes and validates the
    result against the storage schema.
    """
    base_dir = storage_dir if storage_dir is not None else DEFAULT_STORAGE_DIR
    cache_dir = os.path.join(base_dir, const.PROG_NAME)
    cache_file = os.path.join(
        cache_dir, RECENT_CACHE_FILENAME.format(id=compile_storage_id(cups))
    )
    # Make sure the cache directory exists before touching the file.
    os.makedirs(cache_dir, exist_ok=True)
    with open(cache_file, encoding="utf-8") as handle:
        raw = json.load(handle)
    return check_storage_integrity(utils.deserialize_dict(raw))
def dump_storage(cups: str, storage: defs.EdataData, storage_dir: str | None = None):
    """Update EdataData storage.

    Validates `storage`, serializes datetimes to ISO strings and writes
    the JSON cache for `cups`.
    """
    base_dir = storage_dir if storage_dir is not None else DEFAULT_STORAGE_DIR
    cache_dir = os.path.join(base_dir, const.PROG_NAME)
    cache_file = os.path.join(
        cache_dir, RECENT_CACHE_FILENAME.format(id=compile_storage_id(cups))
    )
    # Make sure the cache directory exists before writing.
    os.makedirs(cache_dir, exist_ok=True)
    payload = utils.serialize_dict(check_storage_integrity(storage))
    with open(cache_file, "w", encoding="utf-8") as handle:
        json.dump(payload, handle)
"""A collection of tests for e-data processors"""
import json
import pathlib
from freezegun import freeze_time
from ..definitions import PricingRules
from ..helpers import EdataHelper
from ..processors import utils
# Frozen "now" used by freeze_time in the offline helper test.
AT_TIME = "2022-10-22"
TESTS_DIR = str(pathlib.Path(__file__).parent.resolve())
# Stored EdataData asset used as offline input.
TEST_GOOD_INPUT = TESTS_DIR + "/assets/helpers/edata.storage_TEST"
TEST_EXPECTATIONS_DATA = TESTS_DIR + "/assets/helpers/data.out"
TEST_EXPECTATIONS_ATTRIBUTES = (
    TESTS_DIR + f"/assets/helpers/attributes_at_{AT_TIME}.out"
)
# PVPC-like pricing rules: no flat kWh prices, so hourly PVPC prices apply.
PRICING_RULES_PVPC = PricingRules(
    p1_kw_year_eur=30.67266,
    p2_kw_year_eur=1.4243591,
    meter_month_eur=0.81,
    market_kw_year_eur=3.113,
    electricity_tax=1.0511300560,
    iva_tax=1.05,
    p1_kwh_eur=None,
    p2_kwh_eur=None,
    p3_kwh_eur=None,
)
@freeze_time(AT_TIME)
def test_helper_offline(snapshot) -> None:
    """Tests EdataHelper (syrupy snapshot)"""
    with open(TEST_GOOD_INPUT, "r", encoding="utf-8") as original_file:
        stored = utils.deserialize_dict(json.load(original_file))
    helper = EdataHelper(
        "USER",
        "PASS",
        "CUPS",
        datadis_authorized_nif=None,
        pricing_rules=PRICING_RULES_PVPC,
        data=stored,
    )
    helper.process_data()
    # Compare both outputs against the stored snapshot.
    snapshot_payload = {
        "data": utils.serialize_dict(helper.data),
        "attributes": utils.serialize_dict(helper.attributes),
    }
    assert snapshot_payload == snapshot
"""A collection of tests for e-data processors"""
import datetime as dt
import json
import pathlib
import typing
from collections.abc import Iterable
import pytest
from ..definitions import PricingData, PricingRules
from ..processors import utils
from ..processors.base import Processor
from ..processors.billing import BillingProcessor
from ..processors.consumption import ConsumptionProcessor
from ..processors.maximeter import MaximeterProcessor
TESTS_DIR = str(pathlib.Path(__file__).parent.resolve())
TEST_GOOD_INPUT = TESTS_DIR + "/assets/processors/edata.storage_TEST"
def _compare_processor_output(
    source_filepath: str,
    processor_class: Processor,
    key: str,
    snapshot,
):
    """Run `processor_class` on the `key` section of a stored asset and snapshot it."""
    with open(source_filepath, encoding="utf-8") as original_file:
        data = utils.deserialize_dict(json.load(original_file))
    # ConsumptionProcessor expects a wrapping dict; others take the list.
    payload = {"consumptions": data[key]} if key == "consumptions" else data[key]
    processor = processor_class(payload)
    assert utils.serialize_dict(processor.output) == snapshot
@pytest.mark.parametrize(
    "processor, key",
    [(ConsumptionProcessor, "consumptions"), (MaximeterProcessor, "maximeter")],
)
def test_processor(processor: Processor, key: str, snapshot) -> None:
    """Tests all processors but billing (syrupy snapshot)"""
    _compare_processor_output(TEST_GOOD_INPUT, processor, key, snapshot)
@pytest.mark.parametrize(
    "_id, rules, prices",
    [
        (
            # Explicit hourly prices: flat kWh prices left as None so the
            # provided PricingData list drives the energy term.
            "custom_prices",
            PricingRules(
                p1_kw_year_eur=30.67266,
                p2_kw_year_eur=1.4243591,
                meter_month_eur=0.81,
                market_kw_year_eur=3.113,
                electricity_tax=1.0511300560,
                iva_tax=1.1,
                p1_kwh_eur=None,
                p2_kwh_eur=None,
                p3_kwh_eur=None,
            ),
            [
                PricingData(
                    datetime=dt.datetime(2022, 10, 22, x, 0, 0),
                    value_eur_kWh=1,
                    delta_h=1,
                )
                for x in range(0, 24)
            ],
        ),
        (
            # Flat per-tariff prices: no hourly price list at all.
            "constant_prices",
            PricingRules(
                p1_kw_year_eur=30.67266,
                p2_kw_year_eur=1.4243591,
                meter_month_eur=0.81,
                market_kw_year_eur=3.113,
                electricity_tax=1.0511300560,
                iva_tax=1.1,
                p1_kwh_eur=1,
                p2_kwh_eur=1,
                p3_kwh_eur=1,
            ),
            None,
        ),
    ],
)
def test_processor_billing(
    _id: str, rules: PricingRules, prices: typing.Optional[Iterable[PricingData]], snapshot
):
    """Tests billing processor (syrupy snapshot)"""
    with open(TEST_GOOD_INPUT, "r", encoding="utf-8") as original_file:
        data = utils.deserialize_dict(json.load(original_file))
    processor = BillingProcessor(
        {
            "consumptions": data["consumptions"],
            "contracts": data["contracts"],
            "prices": prices,
            "rules": rules,
        }
    )
    assert utils.serialize_dict(processor.output) == snapshot