import logging
import pandas as pd
import numpy as np
import os
import pandera.pandas as pa
from importlib.resources import files
from dataclasses import dataclass, field
from rich.progress import track
from pandera.typing import Series
from itertools import product
from scipy.stats import beta
from champpy.utils.time_utils import get_day_index, TypeDays
from champpy.core.mobility.mobility_data import MobProfiles, MobProfilesExtended
from champpy.core.mobility.mobility_validation import MobCharacteristics
# Module logger shared by all classes below
logger = logging.getLogger(__name__)
# Packaged data files, resolved relative to the installed champpy package via importlib.resources
DATA_DIR = files("champpy") / "data"
PARAMS_DIR = DATA_DIR / "params.parquet"
PARAMS_INFO_DIR = DATA_DIR.joinpath("params_info.parquet")
@dataclass(frozen=True)
class UserParamsParameterizer:
    """
    User parameters for the parameterization of the mobility model.

    This data class encapsulates all user-configurable parameters required by the
    :class:`Parameterizer` to calculate mobility model parameters from reference data.
    All values are validated on construction (see :meth:`__post_init__`).

    Examples
    --------
    .. code-block:: python

        user_params = UserParamsParameterizer(
            description="Passenger cars weekday/weekend",
            vehicle_type="passenger car",
            temp_res=0.25,
            typeday=TypeDays(groups=[[0, 1, 2, 3, 4], [5, 6]])
        )
    """

    description: str
    """Description of the parameter set.

    Example: ``"Parameters for passenger cars based on example1 data"``
    """
    vehicle_type: str
    """Type of vehicle the parameters apply to.

    Example: ``"passenger car"``, ``"light commercial vehicle"``
    """
    temp_res: float = 0.25
    """Temporal resolution in hours. Must be positive.

    Default: ``0.25`` (15-minute resolution)
    """
    typeday: TypeDays = field(default_factory=lambda: TypeDays(groups=[[0], [1], [2], [3], [4], [5], [6]]))
    """Group weekdays using types of days (:class:`~champpy.utils.time_utils.TypeDays`).

    Default: ``TypeDays(groups=[[0], [1], [2], [3], [4], [5], [6]])`` (each day of the week separate)
    Example: ``TypeDays(groups=[[0, 1, 2, 3, 4], [5, 6]])`` (for weekdays/weekends)
    """
    speed_dist_edges_duration: list = field(default_factory=lambda: [0, 0.5, 1, 10])
    """Speed distribution bin edges by trip duration in hours.

    Must contain at least two strictly increasing, non-negative values; each pair of
    consecutive edges defines one duration bin.
    Default: ``[0, 0.5, 1, 10]`` (bins: 0-30min, 30min-1h, 1h-10h)
    """

    def __post_init__(self):
        """Validate the user parameters.

        Raises
        ------
        ValueError
            If ``temp_res`` is not positive, if ``speed_dist_edges_duration`` has fewer than
            two values, contains negative values or is not strictly increasing, or if
            ``typeday`` is not a :class:`TypeDays` instance.
        """
        # Ensure temp_res is positive (f-string: the message must be a str, not a tuple)
        if self.temp_res <= 0:
            mssg = f"temp_res must be positive. Got: {self.temp_res}"
            logger.error(mssg)
            raise ValueError(mssg)
        edges = self.speed_dist_edges_duration
        # At least two edges are needed to define one bin; this also guards edges[0] below
        if len(edges) < 2:
            mssg = f"speed_dist_edges_duration must contain at least two values to define a duration bin. Got: {edges}"
            logger.error(mssg)
            raise ValueError(mssg)
        # Ensure edges are non-negative and strictly increasing
        if any(d < 0 for d in edges) or any(b <= a for a, b in zip(edges, edges[1:])):
            mssg = f"speed_dist_edges_duration must be a sorted list of positive values. Got: {edges}"
            logger.error(mssg)
            raise ValueError(mssg)
        # Warn if the first bin does not start at 0: shorter trips would fall outside every bin
        if edges[0] != 0:
            mssg = (
                f"speed_dist_edges_duration should start with 0 to include also trips with short duration. Got: {edges}"
            )
            logger.warning(mssg)
        # Ensure typeday is an instance of TypeDays
        if not isinstance(self.typeday, TypeDays):
            mssg = f"typeday must be an instance of TypeDays class. Got: {type(self.typeday)}"
            logger.error(mssg)
            raise ValueError(mssg)
class ParamsSchema(pa.DataFrameModel):
    """Schema for calculated parameters for the mobility model.

    Columns not declared here are dropped on validation (``strict = "filter"``).
    """

    id_params: int = pa.Field(ge=0, coerce=True)  # Unique identifier for the parameter set.
    # Cluster identifier (>= 1). No fill-in default: the previous ``default=0`` could never
    # satisfy ``ge=1``, so a missing cluster must surface as a validation error.
    id_cluster: int = pa.Field(ge=1, coerce=True)
    percentage: float = pa.Field(ge=0.0, le=100.0, coerce=True)
    speed_max: float = pa.Field(ge=0.0, coerce=True)
    weekdays: Series[object]  # List of weekday integers (0-6)
    transition_matrix: Series[object]  # 3D numpy array: (timesteps, locations, locations)
    speed_dist_param1: Series[object]  # List of speed distribution parameters (e.g. alpha)
    speed_dist_param2: Series[object]  # List of speed distribution parameters (e.g. beta)
    speed_dist_edges_duration: Series[object]  # List of speed distribution edges in hours

    class Config:
        strict = "filter"  # remove extra columns
        coerce = True  # enforce dtypes
        ordered = False  # don't enforce column order

    @pa.dataframe_check
    def check_transition_matrix(cls, df: pd.DataFrame) -> Series[bool]:
        """Ensure transition_matrix cells contain 3D numpy arrays with values between 0 and 1."""
        return df["transition_matrix"].apply(
            lambda x: isinstance(x, np.ndarray) and x.ndim == 3 and np.all((x >= 0) & (x <= 1))
        )

    @pa.dataframe_check
    def check_weekdays(cls, df: pd.DataFrame) -> Series[bool]:
        """Ensure weekdays cells contain lists of integers 0-6."""
        return df["weekdays"].apply(
            lambda x: isinstance(x, list) and all(isinstance(d, int) and 0 <= d <= 6 for d in x)
        )

    @pa.dataframe_check
    def check_speed_dist_params(cls, df: pd.DataFrame) -> Series[bool]:
        """Ensure both speed distribution parameter columns and the duration edges contain lists of floats."""

        def is_float_list(x) -> bool:
            # NaN entries are floats and therefore accepted (bins without enough data)
            return isinstance(x, list) and all(isinstance(d, (float, np.floating)) for d in x)

        result = pd.Series(True, index=df.index)
        for col in ("speed_dist_param1", "speed_dist_param2", "speed_dist_edges_duration"):
            result &= df[col].apply(is_float_list).astype(bool)
        return result
def _current_user() -> str:
    """Return the login name from ``USERNAME`` or ``USER`` (first non-empty), else ``"unknown"``."""
    for env_var in ("USERNAME", "USER"):
        name = os.environ.get(env_var)
        if name:
            return name
    return "unknown"


@dataclass
class ParamsInfo:
    """
    Metadata describing one mobility model parameter set.

    Holds descriptive information about a set of calculated mobility model
    parameters, e.g. its temporal resolution, the vehicle type it applies to
    and how the vehicles were clustered.
    """

    id_params: int
    """Identifier of the parameter set (unique per stored set)."""
    description: str
    """Free-text description of the parameter set."""
    vehicle_type: str
    """Vehicle type covered by the parameters (e.g. ``"passenger car"``)."""
    temp_res: float
    """Temporal resolution of the underlying mobility data, in hours."""
    annual_km: float
    """Reference annual mileage in kilometers."""
    locations: list[int]
    """IDs of all locations found in the mobility data."""
    share_of_time_at_locations: list[float]
    """Fraction of time (0-1) spent at each location."""
    number_typedays: int
    """How many typedays the parameterization uses (1-7)."""
    number_clusters: int
    """How many vehicle clusters the parameterization uses."""
    labels_locations: list[str]
    """Readable label for every location."""
    labels_clusters: list[str]
    """Readable label for every cluster."""
    created_user: str = field(default_factory=_current_user)
    """Creator of the parameter set; defaults to the current user."""
    created_dt: pd.Timestamp = field(default_factory=pd.Timestamp.now)
    """Creation time of the parameter set; defaults to now."""
@dataclass(frozen=True)
class ModelParams:
    """
    Calculated parameters used in the mobility model.

    This dataclass combines the calculated parameter DataFrame with metadata
    information about the parameter set. Instances are created by :class:`Parameterizer`
    and can be loaded with :class:`ParamsLoader`.

    Note that ``frozen=True`` only prevents re-assigning the fields; the contained
    DataFrame and :class:`ParamsInfo` object themselves remain mutable.
    """

    df: pd.DataFrame
    """DataFrame with calculated parameters.

    The DataFrame is constructed with the pandera schema ``ParamsSchema`` and contains the following columns:

    .. list-table::
       :header-rows: 1
       :widths: 25 15 60

       * - Column
         - Type
         - Description
       * - id_params
         - :class:`int`
         - Unique identifier for the parameter set
       * - id_cluster
         - :class:`int`
         - Cluster identifier (≥1)
       * - percentage
         - :class:`float`
         - Percentage of this cluster (0-100)
       * - speed_max
         - :class:`float`
         - Maximum speed for normalization (≥0)
       * - weekdays
         - :class:`list`
         - List of weekday integers (0=Monday, 6=Sunday)
       * - transition_matrix
         - :class:`~numpy.ndarray`
         - 3D array with transition probabilities (timesteps, locations, locations)
       * - speed_dist_param1
         - :class:`list`
         - Beta distribution alpha parameters by duration bin
       * - speed_dist_param2
         - :class:`list`
         - Beta distribution beta parameters by duration bin
       * - speed_dist_edges_duration
         - :class:`list`
         - Duration bin edges in hours
    """
    info: ParamsInfo
    """Information about the parameter set."""
class Parameterizer:
    """
    Class to determine the parameters for the mobility model.

    The Parameterizer is a factory class to calculate the parameters for the mobility model based on
    cleaned mobility data and user-defined parameters. The main method is :meth:`calc_params`, which
    takes cleaned mobility data as input and returns the calculated parameters as :class:`ModelParams`.

    Parameters
    ----------
    user_params: :class:`UserParamsParameterizer`
        User parameters for the parameterization
    """

    def __init__(self, user_params: UserParamsParameterizer):
        """Initialize Parameterizer with user parameters.

        See :class:`UserParamsParameterizer` for details on the user parameters.
        """
        # Store user parameters
        self.user_params = user_params
        # Working DataFrame filled by calc_params: one row per (cluster, typeday group) combination
        self._params_df: pd.DataFrame = pd.DataFrame()
        # Location IDs set by _reindex_locations: 0 first, then the sorted non-zero IDs.
        # Its length defines the location dimension of every transition matrix.
        self._unique_locations: list[int] = []

    def calc_params(self, ref_profiles: MobProfiles) -> ModelParams:
        """
        Calculate parameters for the mobility model.

        Main method to calculate parameters for the mobility model based on cleaned
        mobility data and user-defined parameters. The function performs the following
        steps for each cluster and weekday combination:

        - Calculate percentage of clusters based on number of days per cluster
        - Extend reference data to include weekday and day index
        - Reindex locations to consecutive integers starting from 1 (keep 0 as is)
        - Calculate transition matrices for each cluster and weekday combination
        - Fit Beta distributions for speed parameters binned by trip duration

        Parameters
        ----------
        ref_profiles : MobProfiles
            Cleaned mobility data to be used as reference for the parameterization.
            Must be cleaned using :class:`MobProfilesCleaner` before input.

        Returns
        -------
        ModelParams
            Calculated parameters and metadata stored in :class:`ModelParams` dataclass.

        Raises
        ------
        ValueError
            If ``ref_profiles`` is not cleaned, or if a cluster/typeday combination has no data.
        """
        # Abort if ref_profiles is not cleaned
        if not ref_profiles._is_cleaned:
            mssg = "ref_profiles must be cleaned mobility data. Please clean mobility data using MobProfilesCleaner before parameterization."
            logger.error(mssg)
            raise ValueError(mssg)
        logger.info("Starting parameterization of the mobility model.")
        # Create metadata for the parameter set
        params_info = self._create_info(ref_profiles)
        # Rows are cluster-major: row i belongs to clusters[i // number_typeday] and to typeday
        # group i % number_typeday. Count and list come from the same NaN-free array so the
        # column lengths below always agree.
        clusters = ref_profiles.vehicles.df["id_cluster"].dropna().unique()
        number_cluster = len(clusters)
        typeday_groups = self.user_params.typeday.groups
        number_typeday = len(typeday_groups)
        number_rows = number_cluster * number_typeday
        # Edges as a list of floats so they satisfy the ParamsSchema list-of-floats check
        edges_float = [float(e) for e in self.user_params.speed_dist_edges_duration]
        self._params_df = pd.DataFrame(
            {
                "id_params": [params_info.id_params] * number_rows,
                "id_cluster": np.repeat(clusters, number_typeday),
                "weekdays": typeday_groups * number_cluster,  # groups repeated per cluster, kept as lists
                "percentage": np.zeros(number_rows),
                "speed_max": np.zeros(number_rows),
                "transition_matrix": [None] * number_rows,  # filled with 3D arrays
                "speed_dist_param1": [None] * number_rows,
                "speed_dist_param2": [None] * number_rows,
                "speed_dist_edges_duration": [edges_float] * number_rows,
            }
        )
        # Calculate parameters
        self._calc_all_parameters(ref_profiles)
        # Validate params DataFrame
        validated_params_df = ParamsSchema.validate(self._params_df)
        # Return result as ModelParams dataclass
        return ModelParams(df=validated_params_df, info=params_info)

    def _create_info(self, ref_profiles: MobProfiles) -> ParamsInfo:
        """Build the :class:`ParamsInfo` metadata from the reference data (all weekdays pooled)."""
        mob_char = MobCharacteristics(ref_profiles, typedays=TypeDays(groups=[[0, 1, 2, 3, 4, 5, 6]]), method="mean")
        params_info = ParamsInfo(
            id_params=0,  # TODO Placeholder, should be unique identifier
            description=self.user_params.description,
            vehicle_type=self.user_params.vehicle_type,
            temp_res=self.user_params.temp_res,
            annual_km=(mob_char.df.loc[0, "daily_kilometrage"] * 365).round(3),
            locations=mob_char.df.loc[0, "locations"],
            share_of_time_at_locations=mob_char.df.loc[0, "share_of_time_at_locations"].round(3),
            number_typedays=len(self.user_params.typeday.groups),
            number_clusters=ref_profiles.vehicles.df["id_cluster"].nunique(),
            labels_locations=ref_profiles.locations.df["label"].tolist(),
            labels_clusters=ref_profiles.clusters.df["label"].tolist(),
        )
        return params_info

    def _calc_all_parameters(self, ref_profiles: MobProfiles):
        """Fill every row of ``self._params_df`` with percentage, transition matrix and speed parameters."""
        # Determine percentage of clusters based on number of days per cluster in ref_profiles
        self._calc_percentage_clusters(ref_profiles)
        # Extend reference data
        ref_profiles_df_ext = MobProfilesExtended(ref_profiles).df
        # Add weekday and day-index columns.
        # NOTE(review): indices use the logbook resolution, while _calc_transition_matrix sizes the
        # day grid with user_params.temp_res; both must agree — confirm cleaning guarantees this.
        temp_res = ref_profiles.logbooks.temp_res
        ref_profiles_df_ext["weekday"] = ref_profiles_df_ext["start_dt"].dt.dayofweek  # Monday=0, Sunday=6
        ref_profiles_df_ext["start_index"] = get_day_index(ref_profiles_df_ext["start_dt"], temp_res)
        ref_profiles_df_ext["end_index"] = get_day_index(ref_profiles_df_ext["end_dt"], temp_res)
        # Reindex locations
        ref_profiles_df_ext = self._reindex_locations(ref_profiles_df_ext)
        # Loop over each row in params DataFrame
        for idx in track(range(len(self._params_df)), description="Parameterization:"):
            cluster = self._params_df.at[idx, "id_cluster"]
            weekdays = self._params_df.at[idx, "weekdays"]
            logger.debug("Calculating parameters for cluster %s, weekdays %s", cluster, weekdays)
            # Restrict the reference data to the current cluster and weekday group
            mask = (ref_profiles_df_ext["id_cluster"] == cluster) & ref_profiles_df_ext["weekday"].isin(weekdays)
            self._calc_parameters_for_idx(ref_profiles_ext=ref_profiles_df_ext[mask], idx=idx)

    def _calc_percentage_clusters(self, ref_profiles: MobProfiles):
        """Set ``percentage`` per row to the cluster's share (0-100) of all vehicle-days.

        Works on a derived Series so the caller's ``ref_profiles.vehicles.df`` is not modified.
        """
        vehicles_df = ref_profiles.vehicles.df
        # Inclusive day span per vehicle
        number_days = vehicles_df["last_day"] - vehicles_df["first_day"] + pd.Timedelta(days=1)
        number_days_cluster = number_days.groupby(vehicles_df["id_cluster"]).sum()
        percentage_cluster = number_days_cluster / number_days.sum() * 100
        self._params_df["percentage"] = self._params_df["id_cluster"].map(percentage_cluster).values

    def _calc_parameters_for_idx(
        self,
        ref_profiles_ext: pd.DataFrame,
        idx: int,
    ):
        """Calculate transition matrix and speed distribution for row ``idx`` of ``self._params_df``."""
        self._calc_transition_matrix(ref_profiles_ext, idx)
        self._calc_speed_distribution(ref_profiles_ext, idx)

    def _reindex_locations(self, ref_profiles_ext: pd.DataFrame) -> pd.DataFrame:
        """Map location IDs to consecutive matrix indices.

        Location 0 keeps index 0; the remaining IDs are mapped in ascending order to 1..n, so that
        ``self._unique_locations[i]`` is the original ID behind matrix index ``i``. Location 0 is
        always reserved, even if absent, so the matrix dimension covers every assigned index.
        """
        unique_locations = ref_profiles_ext["location"].dropna().unique().tolist()
        locations_nozero = sorted(loc for loc in unique_locations if loc != 0)
        self._unique_locations = [0] + locations_nozero
        # Reindex locations to consecutive integers starting from 1; 0 (unmapped) stays 0 via fillna
        location_mapping = {old_id: new_id for new_id, old_id in enumerate(locations_nozero, start=1)}
        ref_profiles_ext.loc[:, "location"] = ref_profiles_ext["location"].map(location_mapping).fillna(0).astype(int)
        return ref_profiles_ext

    def _calc_transition_matrix(self, ref_profiles_ext: pd.DataFrame, idx: int):
        """Calculate the transition matrix for row ``idx`` of ``self._params_df``.

        Methodological notes
        --------------------
        The matrix is estimated from true discrete step transitions:
        state(t-1) -> state(t) for each vehicle.

        For each valid pair we count transitions by:

        - day_index (time-of-day step)
        - start_loc (location at t-1)
        - end_loc (location at t)

        Then we normalize row-wise over end_loc to get probabilities for each
        (day_index, start_loc) row. Rows without observations get an identity
        fallback (stay probability 1).

        Raises
        ------
        ValueError
            If there is no data or no valid time step for this cluster/weekday combination.
        """
        # Throw error if ref_profiles_ext is empty
        if ref_profiles_ext.empty:
            mssg = f"There is no data for cluster {self._params_df.at[idx, 'id_cluster']} and weekdays {self._params_df.at[idx, 'weekdays']}. Cannot calculate transition matrix."
            logger.error(mssg)
            raise ValueError(mssg)
        # Predefine required variables
        unique_location = np.arange(len(self._unique_locations))
        unique_index_day = np.arange(int(24 / self.user_params.temp_res))
        n_steps_per_day = len(unique_index_day)
        n_locations = len(unique_location)
        weekdays = self._params_df.at[idx, "weekdays"]
        # Build contiguous state occupancy from interval rows.
        # end_index - 1 gives half-open intervals [start, end) on the day grid; an end at
        # index 0 (midnight) is mapped to the last step of the day.
        starts = ref_profiles_ext["start_index"].to_numpy(copy=True)
        ends = ref_profiles_ext["end_index"].to_numpy(copy=True) - 1
        ends[ends < 0] = n_steps_per_day - 1
        lengths = ends - starts + 1
        mask = lengths > 0
        if not np.any(mask):
            mssg = f"There are no valid timesteps for cluster {self._params_df.at[idx, 'id_cluster']} and weekdays {weekdays}. Cannot calculate transition matrix."
            logger.error(mssg)
            raise ValueError(mssg)
        # Expand each interval row to one row per discrete time step.
        # Result: state_df contains the location state at each (vehicle, date, day_index).
        all_day_indices = np.concatenate([np.arange(s, e + 1) for s, e in zip(starts[mask], ends[mask])])
        state_df = pd.DataFrame(
            {
                "id_vehicle": np.repeat(ref_profiles_ext.loc[mask, "id_vehicle"].to_numpy(), lengths[mask]),
                "date": np.repeat(ref_profiles_ext.loc[mask, "start_dt"].dt.normalize().to_numpy(), lengths[mask]),
                "weekday": np.repeat(ref_profiles_ext.loc[mask, "weekday"].to_numpy(), lengths[mask]),
                "day_index": all_day_indices,
                "location": np.repeat(ref_profiles_ext.loc[mask, "location"].to_numpy(), lengths[mask]),
            }
        )
        state_df.sort_values(["id_vehicle", "date", "day_index"], inplace=True)
        # Previous-step columns (one groupby pass) define candidate Markov transition pairs.
        shifted = state_df.groupby("id_vehicle")[["location", "day_index", "date"]].shift(1)
        state_df["start_loc"] = shifted["location"]
        state_df["prev_day_index"] = shifted["day_index"]
        state_df["prev_date"] = shifted["date"]
        # Accept only consecutive time-step pairs:
        # - regular in-day step: k-1 -> k
        # - midnight boundary: last step of previous day -> first step of next day
        is_same_day_step = (state_df["prev_date"] == state_df["date"]) & (
            state_df["day_index"] == state_df["prev_day_index"] + 1
        )
        is_midnight_step = (
            (state_df["day_index"] == 0)
            & (state_df["prev_day_index"] == n_steps_per_day - 1)
            & (pd.to_datetime(state_df["prev_date"]) + pd.Timedelta(days=1) == pd.to_datetime(state_df["date"]))
        )
        # Keep only transitions that belong to the current weekday group (typeday row).
        mask_valid_transition = (is_same_day_step | is_midnight_step) & state_df["weekday"].isin(weekdays)
        trans_counts = (
            state_df.loc[mask_valid_transition, ["day_index", "start_loc", "location"]]
            .rename(columns={"location": "end_loc"})
            .astype({"day_index": int, "start_loc": int, "end_loc": int})
            .groupby(["day_index", "start_loc", "end_loc"])
            .size()
            .reset_index(name="count")
        )
        # Build dense transition table with all combinations so output shape is fixed.
        # Missing combinations receive count=0 and later probability=0.
        combinations = list(product(unique_index_day, unique_location, unique_location))
        transition_df = pd.DataFrame(combinations, columns=["day_index", "start_loc", "end_loc"])
        transition_df = transition_df.merge(trans_counts, on=["day_index", "start_loc", "end_loc"], how="left")
        transition_df["count"] = transition_df["count"].fillna(0)
        total_counts = transition_df.groupby(["day_index", "start_loc"])["count"].sum()
        transition_df = transition_df.merge(
            total_counts.rename("total_count"), on=["day_index", "start_loc"], how="left"
        )
        # Row-wise normalization: P(end_loc | day_index, start_loc).
        # For rows without observations (total_count==0), use identity fallback (stay prob = 1).
        # This keeps the matrix numerically valid and avoids undefined transitions.
        transition_df["probability"] = 0.0
        mask_observed = transition_df["total_count"] > 0
        transition_df.loc[mask_observed, "probability"] = (
            transition_df.loc[mask_observed, "count"] / transition_df.loc[mask_observed, "total_count"]
        )
        mask_unobserved = ~mask_observed & (transition_df["start_loc"] == transition_df["end_loc"])
        transition_df.loc[mask_unobserved, "probability"] = 1.0
        # Scatter into the 3D array in one vectorized assignment (every combination is unique)
        tm = np.zeros((n_steps_per_day, n_locations, n_locations))
        tm[
            transition_df["day_index"].to_numpy(dtype=int),
            transition_df["start_loc"].to_numpy(dtype=int),
            transition_df["end_loc"].to_numpy(dtype=int),
        ] = transition_df["probability"].to_numpy(dtype=float)
        self._params_df.at[idx, "transition_matrix"] = tm

    def _calc_speed_distribution(self, ref_profiles_ext: pd.DataFrame, idx: int):
        """Fit a Beta distribution to normalized trip speeds per duration bin for row ``idx``.

        Trip rows are those with ``location == 0``. Speeds are divided by ``speed_max`` (maximum
        speed plus a 1% margin) to lie in [0, 1). Bins with fewer than two trips, or a row without a
        finite, positive ``speed_max``, get NaN parameters instead of a fit.
        """
        lb_speed_df = ref_profiles_ext[ref_profiles_ext["location"] == 0][["speed", "duration"]]
        edges_duration = self._params_df.at[idx, "speed_dist_edges_duration"]
        n_bins = len(edges_duration) - 1
        # Normalization constant for Beta fitting (add 1% margin)
        max_speed = lb_speed_df["speed"].max() * 1.01
        self._params_df.at[idx, "speed_max"] = max_speed
        param1_list = [np.nan] * n_bins
        param2_list = [np.nan] * n_bins
        # Without a finite positive maximum there is nothing to normalize by (avoids 0/0 -> NaN data)
        if np.isfinite(max_speed) and max_speed > 0:
            for i, (lower_edge, upper_edge) in enumerate(zip(edges_duration[:-1], edges_duration[1:])):
                # Half-open duration bin [lower_edge, upper_edge)
                mask = (lb_speed_df["duration"] >= lower_edge) & (lb_speed_df["duration"] < upper_edge)
                speeds_normalized = lb_speed_df.loc[mask, "speed"].to_numpy() / max_speed
                if len(speeds_normalized) < 2:
                    # Not enough data to fit distribution
                    continue
                alpha, beta_param, _loc, _scale = beta.fit(speeds_normalized, floc=0, fscale=1)
                param1_list[i] = alpha
                param2_list[i] = beta_param
        self._params_df.at[idx, "speed_dist_param1"] = [float(x) for x in param1_list]
        self._params_df.at[idx, "speed_dist_param2"] = [float(x) for x in param2_list]
class ParamsLoader:
    """
    Class for loading pre-calculated parameters for the mobility model.

    The ParamsLoader is a factory class to load pre-calculated parameters for the mobility model.
    The parameters are loaded from parquet files stored in the repository.

    Basic workflow to load existing parameters:

    1. Create an instance of the ParamsLoader class
    2. Call :meth:`load_info` to check what parameters are available
    3. Select parameters by choosing the corresponding ``id_params``
    4. Call :meth:`load_params` with the selected ``id_params``

    Examples
    --------
    >>> loader = ParamsLoader()
    >>> info_df = loader.load_info()
    >>> params = loader.load_params(id_params=1)
    """

    def __init__(self, user_name: str = None):
        """Initialize ParamsLoader.

        Parameters
        ----------
        user_name: str, optional
            Name of the current user. If ``None``, it is taken from the ``USERNAME`` or ``USER``
            environment variable, falling back to ``"unknown"``.
        """
        if user_name is None:
            user_name = os.environ.get("USERNAME") or os.environ.get("USER") or "unknown"
        self.user_name = user_name

    def load_info(self) -> pd.DataFrame:
        """
        Load info DataFrame from all available parameter sets.

        Returns
        -------
        pd.DataFrame
            DataFrame containing metadata for all available parameter sets (empty if none exist)
        """
        # is_file() is part of the importlib.resources Traversable protocol; exists() is not
        if not PARAMS_INFO_DIR.is_file():
            return pd.DataFrame()
        return pd.read_parquet(PARAMS_INFO_DIR)

    def load_params(self, id_params: int = None) -> ModelParams:
        """
        Load existing ModelParams.

        Parameters
        ----------
        id_params: int
            Unique identifier for the parameter set to be loaded. Must correspond to an existing entry in the info DataFrame.

        Returns
        -------
        ModelParams
            Loaded model parameters and stored in :class:`ModelParams`

        Raises
        ------
        ValueError
            If no parameters exist, ``id_params`` is not given, or ``id_params`` is unknown.
        """
        logger.info("Load parameters with id_params=%s.", id_params)
        # Load info for existing params
        info_df = self.load_info()
        if info_df.empty:
            mssg = "There are no existing parameters."
            logger.error(mssg)
            raise ValueError(mssg)
        # Without an id there is no single info row to build ParamsInfo from
        if id_params is None:
            mssg = "id_params must be specified. \nCheck ParamsLoader.load_info for available parameters."
            logger.error(mssg)
            raise ValueError(mssg)
        if id_params not in info_df["id_params"].values:
            mssg = f"No parameters found for id_params = {id_params}. \nCheck ParamsLoader.load_info for available parameters."
            logger.error(mssg)
            raise ValueError(mssg)
        # Load params DataFrame and restore transition matrices as 3D arrays
        params_df = self._load_only_params(id_params)
        params_df = self._convert_params_df_list2tm(params_df)
        # Create ParamsInfo
        info_row = info_df[info_df["id_params"] == id_params].iloc[0]
        params_info = ParamsInfo(
            id_params=info_row["id_params"],
            description=info_row["description"],
            vehicle_type=info_row["vehicle_type"],
            temp_res=info_row["temp_res"],
            annual_km=info_row["annual_km"],
            locations=info_row["locations"],
            share_of_time_at_locations=info_row["share_of_time_at_locations"],
            number_typedays=info_row["number_typedays"],
            number_clusters=info_row["number_clusters"],
            labels_locations=info_row["labels_locations"],
            labels_clusters=info_row["labels_clusters"],
            created_user=info_row["created_user"],
            created_dt=info_row["created_dt"],
        )
        return ModelParams(df=params_df, info=params_info)

    def _load_only_params(self, id_params: int = None) -> pd.DataFrame:
        """Load the params DataFrame from params.parquet, optionally filtered to one ``id_params``."""
        if id_params is not None:
            return pd.read_parquet(PARAMS_DIR, filters=[("id_params", "==", id_params)])
        return pd.read_parquet(PARAMS_DIR)

    def _save_params(self, params: ModelParams) -> int:
        """Save calculated parameters and return the assigned ``id_params``. Only for internal use.

        Side effects: sets ``params.info.id_params`` and ``params.df["id_params"]`` to the new id
        and rewrites both parquet files.

        Raises
        ------
        ValueError
            If a parameter set with the same description or the same key properties already exists.
        """
        # Load info for existing params
        info_df = self.load_info()
        if info_df.empty:
            # First parameter set
            new_id = 1
            params.info.id_params = new_id
            info_df = pd.DataFrame([vars(params.info)])
        else:
            # check if params with same description already exist
            mask_existing = info_df["description"] == params.info.description
            if mask_existing.any():
                mssg = (
                    f"There are already Parameters with description '{params.info.description}'. "
                    f"Please define a unique description for the new parameters or use id_params = {info_df[mask_existing]['id_params'].values[0]} instead."
                )
                logger.error(mssg)
                raise ValueError(mssg)
            # round annual_km to avoid floating point issues
            info_df["annual_km"] = info_df["annual_km"].round(3)
            # check for the same info excluding description, created_user and created_dt
            cols_to_check = [
                col for col in info_df.columns if col in ["temp_res", "annual_km", "number_typedays", "number_clusters"]
            ]
            mask_existing_info = (info_df[cols_to_check] == pd.Series(vars(params.info))[cols_to_check]).all(axis=1)
            if mask_existing_info.any():
                id_val = info_df[mask_existing_info]["id_params"].values[0]
                mssg = (
                    f"There are already Parameters with the same properties. "
                    f"Please use id_params = {id_val} instead of creating new entries. "
                    f"Check: \n{info_df[mask_existing_info]}"
                )
                logger.error(mssg)
                raise ValueError(mssg)
            # assign new id_params (plain int rather than numpy scalar)
            new_id = int(info_df["id_params"].max()) + 1
            params.info.id_params = new_id
            # append new info
            info_df = pd.concat([info_df, pd.DataFrame([vars(params.info)])], ignore_index=True)
        # Save info DataFrame
        info_df.to_parquet(PARAMS_INFO_DIR, index=False)
        # Add id_params to params DataFrame
        params.df["id_params"] = new_id
        # Convert transition matrix np arrays to lists for saving
        params_df = self._convert_params_df_tm2list(params.df.copy())
        # Append to existing params, if any
        if PARAMS_DIR.is_file():
            params_df = pd.concat([self._load_only_params(), params_df], ignore_index=True)
        # Save params DataFrame
        params_df.to_parquet(PARAMS_DIR, index=False)
        return new_id

    def _delete_params(self, id_params: int):
        """Delete parameters with given id_params. Only for internal use.

        Raises
        ------
        ValueError
            If no parameter set with ``id_params`` exists.
        """
        info_df = self.load_info()
        if info_df.empty or id_params not in info_df["id_params"].values:
            mssg = f"No parameters found for id_params = {id_params}."
            logger.error(mssg)
            raise ValueError(mssg)
        # Delete from info DataFrame
        info_df = info_df[info_df["id_params"] != id_params]
        info_df.to_parquet(PARAMS_INFO_DIR, index=False)
        # Delete from params DataFrame (nothing to do if the params file does not exist)
        if PARAMS_DIR.is_file():
            params_df = self._load_only_params()
            params_df = params_df[params_df["id_params"] != id_params]
            params_df.to_parquet(PARAMS_DIR, index=False)

    @classmethod
    def deep_to_numpy(cls, arr):
        """Recursively convert nested lists or object arrays to float numpy arrays.

        Any other value (e.g. an already numeric array or a scalar) is returned unchanged.
        """
        # Object arrays (e.g. nested lists read back from parquet): convert each element recursively
        if isinstance(arr, np.ndarray) and arr.dtype == object:
            return np.array([cls.deep_to_numpy(x) for x in arr], dtype=float)
        # Plain lists: convert each element recursively
        if isinstance(arr, list):
            return np.array([cls.deep_to_numpy(x) for x in arr], dtype=float)
        # Already numeric array or scalar
        return arr

    @classmethod
    def _convert_params_df_list2tm(cls, df: pd.DataFrame) -> pd.DataFrame:
        """Convert transition_matrix column from list/nested array to 3D np.ndarray (in place; df is returned)."""
        df["transition_matrix"] = df["transition_matrix"].apply(cls.deep_to_numpy)
        return df

    @staticmethod
    def _convert_params_df_tm2list(df: pd.DataFrame) -> pd.DataFrame:
        """Convert transition_matrix column from np.ndarray to nested lists (in place; df is returned)."""
        df["transition_matrix"] = df["transition_matrix"].apply(
            lambda x: x.tolist() if isinstance(x, np.ndarray) else x
        )
        return df