Source code for champpy.core.mobility.parameterization

import logging
import pandas as pd
import numpy as np
import os
import pandera.pandas as pa

from importlib.resources import files
from dataclasses import dataclass, field
from rich.progress import track
from pandera.typing import Series
from itertools import product
from scipy.stats import beta

from champpy.utils.time_utils import get_day_index, TypeDays
from champpy.core.mobility.mobility_data import MobProfiles, MobProfilesExtended
from champpy.core.mobility.mobility_validation import MobCharacteristics

# Paths to the packaged data files, resolved through importlib.resources.
DATA_DIR = files("champpy").joinpath("data")
# NOTE: despite the ``_DIR`` suffix, the two names below each point at a single parquet file.
PARAMS_DIR = DATA_DIR / "params.parquet"  # parameter rows (read via pd.read_parquet in this module)
PARAMS_INFO_DIR = DATA_DIR / "params_info.parquet"  # metadata rows (one per parameter set)

# Module-level logger named after this module.
logger = logging.getLogger(__name__)


@dataclass(frozen=True)
class UserParamsParameterizer:
    """
    User parameters for the parameterization of the mobility model.

    This data class encapsulates all user-configurable parameters required by the
    :class:`Parameterizer` to calculate mobility model parameters from reference data.
    All values are validated in ``__post_init__``.

    Raises
    ------
    ValueError
        If ``temp_res`` is not positive, if ``speed_dist_edges_duration`` is empty,
        contains negative values or is not strictly increasing, or if ``typeday``
        is not a :class:`~champpy.utils.time_utils.TypeDays` instance.

    Examples
    --------
    .. code-block:: python

        user_params = UserParamsParameterizer(
            description="Passenger cars weekday/weekend",
            vehicle_type="passenger car",
            temp_res=0.25,
            typeday=TypeDays(groups=[[0, 1, 2, 3, 4], [5, 6]])
        )
    """

    description: str
    """Description of the parameter set.

    Example: ``"Parameters for passenger cars based on example1 data"``
    """

    vehicle_type: str
    """Type of vehicle the parameters apply to.

    Example: ``"passenger car"``, ``"light commercial vehicle"``
    """

    temp_res: float = 0.25
    """Temporal resolution in hours.

    Default: ``0.25`` (15-minute resolution)
    """

    typeday: TypeDays = field(default_factory=lambda: TypeDays(groups=[[0], [1], [2], [3], [4], [5], [6]]))
    """Group weekdays using types of days (:class:`~champpy.utils.time_utils.TypeDays`).

    Default: ``TypeDays(groups=[[0], [1], [2], [3], [4], [5], [6]])`` (each day of the week separate)

    Example: ``TypeDays(groups=[[0, 1, 2, 3, 4], [5, 6]])`` (for weekdays/weekends)
    """

    speed_dist_edges_duration: list = field(default_factory=lambda: [0, 0.5, 1, 10])
    """Speed distribution bin edges by trip duration in hours.

    Default: ``[0, 0.5, 1, 10]`` (bins: 0-30min, 30min-1h, 1h-10h)
    """

    def __post_init__(self):
        """Validate the user parameters; see the class docstring for the raised errors."""
        # Ensure temp_res is positive.
        # The message must be a single string: a "%s", value pair would create a tuple
        # that is then logged and raised verbatim instead of a formatted message.
        if self.temp_res <= 0:
            mssg = f"temp_res must be positive. Got: {self.temp_res}"
            logger.error(mssg)
            raise ValueError(mssg)

        # Ensure speed_dist_edges_duration is non-empty, non-negative and strictly increasing.
        # The emptiness check comes first so that ``edges[0]`` below cannot raise IndexError.
        edges = self.speed_dist_edges_duration
        if len(edges) == 0 or any(d < 0 for d in edges) or any(b <= a for a, b in zip(edges, edges[1:])):
            mssg = f"speed_dist_edges_duration must be a sorted list of positive values. Got: {edges}"
            logger.error(mssg)
            raise ValueError(mssg)

        # Only warn (do not fail) if the first edge is not 0: short trips would fall outside all bins.
        if edges[0] != 0:
            mssg = (
                f"speed_dist_edges_duration should start with 0 to include also trips with short duration. Got: {edges}"
            )
            logger.warning(mssg)

        # Ensure typeday is an instance of TypeDays.
        if not isinstance(self.typeday, TypeDays):
            mssg = f"typeday must be an instance of TypeDays class. Got: {type(self.typeday)}"
            logger.error(mssg)
            raise ValueError(mssg)
class ParamsSchema(pa.DataFrameModel):
    """Pandera schema for the DataFrame holding the calculated mobility-model parameters."""

    id_params: int = pa.Field(ge=0, coerce=True)  # Unique identifier for the parameter set.
    id_cluster: int = pa.Field(ge=1, coerce=True, default=0)
    percentage: float = pa.Field(ge=0.0, le=100.0, coerce=True)
    speed_max: float = pa.Field(ge=0.0, coerce=True)
    weekdays: Series[object]  # List of weekday integers (0-6)
    transition_matrix: Series[object]  # 3D numpy array: (timesteps, locations, locations)
    speed_dist_param1: Series[object]  # List of speed distribution parameters (e.g. alpha)
    speed_dist_param2: Series[object]  # List of speed distribution parameters (e.g. beta)
    speed_dist_edges_duration: Series[object]  # List of speed distribution edges in hours

    class Config:
        strict = "filter"  # remove extra columns
        coerce = True  # enforce dtypes
        ordered = False  # don't enforce column order

    @pa.dataframe_check
    def check_transition_matrix(cls, df: pd.DataFrame) -> Series[bool]:
        """Require every transition_matrix cell to be a 3D numpy array with entries in [0, 1]."""

        def _is_probability_tensor(cell):
            # Guard clauses mirror the short-circuiting of a chained ``and``.
            if not isinstance(cell, np.ndarray):
                return False
            if cell.ndim != 3:
                return False
            return np.all((cell >= 0) & (cell <= 1))

        return df["transition_matrix"].apply(_is_probability_tensor)

    @pa.dataframe_check
    def check_weekdays(cls, df: pd.DataFrame) -> Series[bool]:
        """Require every weekdays cell to be a list of integers between 0 and 6."""

        def _is_weekday_list(cell):
            if not isinstance(cell, list):
                return False
            return all(isinstance(day, int) and 0 <= day <= 6 for day in cell)

        return df["weekdays"].apply(_is_weekday_list)

    @pa.dataframe_check
    def check_speed_dist_params(cls, df: pd.DataFrame) -> Series[bool]:
        """Require the speed-distribution parameter and edge cells to be lists of floats."""

        def _is_float_list(cell):
            if not isinstance(cell, list):
                return False
            return all(isinstance(value, (float, np.floating)) for value in cell)

        # One boolean Series per checked column; a row passes only if all three pass.
        ok_param2 = df["speed_dist_param2"].apply(_is_float_list)
        ok_param1 = df["speed_dist_param1"].apply(_is_float_list)
        ok_edges = df["speed_dist_edges_duration"].apply(_is_float_list)
        return ok_param1 & ok_param2 & ok_edges
def _default_created_user() -> str:
    """Return the current user name from the environment, or ``"unknown"`` if none is set."""
    return os.environ.get("USERNAME") or os.environ.get("USER") or "unknown"


@dataclass
class ParamsInfo:
    """
    Descriptive metadata belonging to one set of mobility model parameters.

    Holds information such as the temporal resolution, the vehicle type and the
    clustering details of a calculated parameter set, together with who created
    it and when.
    """

    id_params: int
    """Unique identifier of the parameter set."""

    description: str
    """Free-text description of the parameter set."""

    vehicle_type: str
    """Vehicle type the parameters apply to (e.g., ``"passenger car"``)."""

    temp_res: float
    """Temporal resolution of the mobility data in hours."""

    annual_km: float
    """Annual kilometers driven, used as a reference value."""

    locations: list[int]
    """Location IDs that occur in the mobility data."""

    share_of_time_at_locations: list[float]
    """Share of time (0-1) the vehicles spend at each location."""

    number_typedays: int
    """Number of typedays used in the parameterization (1-7)."""

    number_clusters: int
    """Number of vehicle clusters in the parameterization."""

    labels_locations: list[str]
    """Human-readable label for each location."""

    labels_clusters: list[str]
    """Human-readable label for each cluster."""

    created_user: str = field(default_factory=_default_created_user)
    """Name of the user who created the parameter set (default: current user)."""

    created_dt: pd.Timestamp = field(default_factory=pd.Timestamp.now)
    """Timestamp at which the parameter set was created (default: now)."""
@dataclass(frozen=True)
class ModelParams:
    """
    Container for the calculated parameters of the mobility model.

    Bundles the parameter DataFrame with the metadata describing the parameter set.
    Instances are produced by :class:`Parameterizer` and can be loaded with
    :class:`ParamsLoader`.
    """

    df: pd.DataFrame
    """DataFrame with the calculated parameters.

    The DataFrame follows the pandera schema ``ParamsSchema`` and contains the following columns:

    .. list-table::
        :header-rows: 1
        :widths: 25 15 60

        * - Column
          - Type
          - Description
        * - id_params
          - :class:`int`
          - Unique identifier for the parameter set
        * - id_cluster
          - :class:`int`
          - Cluster identifier (≥1)
        * - percentage
          - :class:`float`
          - Percentage of this cluster (0-100)
        * - speed_max
          - :class:`float`
          - Maximum speed for normalization (≥0)
        * - weekdays
          - :class:`list`
          - List of weekday integers (0=Monday, 6=Sunday)
        * - transition_matrix
          - :class:`~numpy.ndarray`
          - 3D array with transition probabilities (timesteps, locations, locations)
        * - speed_dist_param1
          - :class:`list`
          - Beta distribution alpha parameters by duration bin
        * - speed_dist_param2
          - :class:`list`
          - Beta distribution beta parameters by duration bin
        * - speed_dist_edges_duration
          - :class:`list`
          - Duration bin edges in hours
    """

    info: ParamsInfo
    """Metadata describing the parameter set."""
class Parameterizer:
    """
    Class to determine the parameters for the mobility model.

    The Parameterizer is a factory class to calculate the parameters for the mobility model
    based on cleaned mobility data and user-defined parameters. The main method is
    :meth:`calc_params`, which takes cleaned mobility data as input and returns the
    calculated parameters as :class:`ModelParams`.

    Parameters
    ----------
    user_params: :class:`UserParamsParameterizer`
        User parameters for the parameterization
    """

    def __init__(self, user_params: UserParamsParameterizer):
        """Initialize Parameterizer with user parameters.

        See :class:`UserParamsParameterizer` for details on the user parameters.
        """
        # Store user parameters
        self.user_params = user_params

        # Internal placeholder for calculation of parameters DataFrame
        self._params_df: pd.DataFrame = pd.DataFrame()

        # Sorted original location ids; filled by _reindex_locations
        self._unique_locations: list[int] = []

    def calc_params(self, ref_profiles: MobProfiles) -> ModelParams:
        """
        Calculate parameters for the mobility model.

        Main method to calculate parameters for the mobility model based on cleaned mobility
        data and user-defined parameters. The function performs the following steps for each
        cluster and weekday combination:

        - Calculate percentage of clusters based on number of days per cluster
        - Extend reference data to include weekday and day index
        - Reindex locations to consecutive integers starting from 1 (keep 0 as is)
        - Calculate transition matrices for each cluster and weekday combination
        - Fit Beta distributions for speed parameters binned by trip duration

        Parameters
        ----------
        ref_profiles : MobProfiles
            Cleaned mobility data to be used as reference for the parameterization.
            Must be cleaned using :class:`MobProfilesCleaner` before input.

        Returns
        -------
        ModelParams
            Calculated parameters and metadata stored in :class:`ModelParams` dataclass.

        Raises
        ------
        ValueError
            If ``ref_profiles`` is not flagged as cleaned, or if a cluster/typeday
            combination has no usable data for the transition matrix.
        """
        # Abort if mob_profiles is not cleaned
        if not ref_profiles._is_cleaned:
            mssg = "ref_profiles must be cleaned mobility data. Please clean mobility data using MobProfilesCleaner before parameterization."
            logger.error(mssg)
            raise ValueError(mssg)

        logger.info("Starting parameterization of the mobility model.")

        # Create parameter-set metadata
        params_info = self._create_info(ref_profiles)

        # Initialize params DataFrame: one row per (cluster, typeday) combination
        number_cluster = ref_profiles.vehicles.df["id_cluster"].nunique()
        number_typeday = len(self.user_params.typeday.groups)
        number_rows = number_cluster * number_typeday
        clusters = ref_profiles.vehicles.df["id_cluster"].unique()

        # Cycle through the typeday groups once per cluster (kept as lists); this lines up
        # with np.repeat(clusters, number_typeday) below.
        weekdays_repeated = self.user_params.typeday.groups * number_cluster

        # Duration edges as a list of floats (the schema requires float entries)
        edges_float = [float(e) for e in self.user_params.speed_dist_edges_duration]

        self._params_df = pd.DataFrame(
            {
                "id_params": [params_info.id_params] * number_rows,
                "id_cluster": np.repeat(clusters, number_typeday),
                "weekdays": weekdays_repeated,
                "percentage": np.zeros(number_rows),
                "speed_max": np.zeros(number_rows),
                "transition_matrix": [None] * number_rows,  # Will be filled with 3D arrays
                "speed_dist_param1": [None] * number_rows,
                "speed_dist_param2": [None] * number_rows,
                "speed_dist_edges_duration": [edges_float] * number_rows,
            }
        )

        # Calculate parameters
        self._calc_all_parameters(ref_profiles)

        # Validate params DataFrame
        validated_params_df = ParamsSchema.validate(self._params_df)

        # Return result as ModelParams dataclass
        return ModelParams(df=validated_params_df, info=params_info)

    def _create_info(self, ref_profiles: MobProfiles) -> ParamsInfo:
        """Build the :class:`ParamsInfo` metadata for the parameter set."""
        # Characteristics over a single typeday covering all seven weekdays
        mob_char = MobCharacteristics(ref_profiles, typedays=TypeDays(groups=[[0, 1, 2, 3, 4, 5, 6]]), method="mean")

        params_info = ParamsInfo(
            id_params=0,  # TODO Placeholder, should be unique identifier
            description=self.user_params.description,
            vehicle_type=self.user_params.vehicle_type,
            temp_res=self.user_params.temp_res,
            annual_km=(mob_char.df.loc[0, "daily_kilometrage"] * 365).round(3),
            locations=mob_char.df.loc[0, "locations"],
            share_of_time_at_locations=mob_char.df.loc[0, "share_of_time_at_locations"].round(3),
            number_typedays=len(self.user_params.typeday.groups),
            number_clusters=ref_profiles.vehicles.df["id_cluster"].nunique(),
            labels_locations=ref_profiles.locations.df["label"].tolist(),
            labels_clusters=ref_profiles.clusters.df["label"].tolist(),
        )
        return params_info

    def _calc_all_parameters(self, ref_profiles: MobProfiles):
        """Fill every row of the params DataFrame with its calculated parameters."""
        # Determine percentage of clusters based on number of days per cluster in ref_profiles
        self._calc_percentage_clusters(ref_profiles)

        # Extend reference data
        ref_profiles_df_ext = MobProfilesExtended(ref_profiles).df

        # Add weekday and day-index columns.
        # NOTE(review): the day indices use the logbooks' temp_res, while the transition matrix
        # size uses user_params.temp_res -- presumably both are expected to agree; confirm.
        temp_res = ref_profiles.logbooks.temp_res
        ref_profiles_df_ext["weekday"] = ref_profiles_df_ext["start_dt"].dt.dayofweek  # Monday=0, Sunday=6
        ref_profiles_df_ext["start_index"] = get_day_index(ref_profiles_df_ext["start_dt"], temp_res)
        ref_profiles_df_ext["end_index"] = get_day_index(ref_profiles_df_ext["end_dt"], temp_res)

        # Reindex locations
        ref_profiles_df_ext = self._reindex_locations(ref_profiles_df_ext)

        # Loop over each row in params DataFrame
        number_rows = self._params_df.shape[0]
        for idx in track(range(number_rows), description="Parameterization:"):
            cluster = self._params_df.at[idx, "id_cluster"]
            weekdays = self._params_df.at[idx, "weekdays"]
            logger.debug("Calculating parameters for cluster %s, weekdays %s", cluster, weekdays)

            # Filter reference data for current cluster and weekdays
            mask_cluster = ref_profiles_df_ext["id_cluster"] == cluster
            mask_weekdays = ref_profiles_df_ext["weekday"].isin(weekdays)
            ref_profiles_df_ext_filtered = ref_profiles_df_ext[mask_cluster & mask_weekdays]

            # Calculate parameters for this cluster and weekdays
            self._calc_parameters_for_idx(ref_profiles_ext=ref_profiles_df_ext_filtered, idx=idx)

    def _calc_percentage_clusters(self, ref_profiles: MobProfiles):
        """Calculate the percentage of recorded days per cluster.

        The day counts are kept in a local Series so the vehicles DataFrame of
        ``ref_profiles`` is not modified.
        """
        vehicles_df = ref_profiles.vehicles.df
        # Inclusive day span per vehicle (first_day and last_day both count)
        number_days = vehicles_df["last_day"] - vehicles_df["first_day"] + pd.Timedelta(days=1)
        number_days_cluster = number_days.groupby(vehicles_df["id_cluster"]).sum()
        percentage_cluster = number_days_cluster / number_days.sum() * 100
        self._params_df["percentage"] = self._params_df["id_cluster"].map(percentage_cluster).values

    def _calc_parameters_for_idx(
        self,
        ref_profiles_ext: pd.DataFrame,
        idx: int,
    ):
        """Calculate transition matrix and speed distribution for params row ``idx``."""
        self._calc_transition_matrix(ref_profiles_ext, idx)
        self._calc_speed_distribution(ref_profiles_ext, idx)

    def _reindex_locations(self, ref_profiles_ext: pd.DataFrame) -> pd.DataFrame:
        """Reindex locations to consecutive integers starting from 1, keeping 0 as is.

        The new indices follow the ascending order of the original location ids, so the
        mapping is deterministic and agrees with the sorted list stored in
        ``self._unique_locations`` (ids that are already consecutive stay unchanged).
        The ``location`` column of ``ref_profiles_ext`` is overwritten in place and the
        same DataFrame is returned.
        """
        locations_sorted = sorted(ref_profiles_ext["location"].unique().tolist())
        self._unique_locations = locations_sorted

        # Map the sorted non-zero ids to 1..k; location 0 is handled by fillna(0) below
        locations_nozero = [loc for loc in locations_sorted if loc != 0]
        location_mapping = {old_id: new_id for new_id, old_id in enumerate(locations_nozero, start=1)}
        ref_profiles_ext.loc[:, "location"] = ref_profiles_ext["location"].map(location_mapping).fillna(0).astype(int)
        return ref_profiles_ext

    def _calc_transition_matrix(self, ref_profiles_ext: pd.DataFrame, idx: int):
        """Calculate the transition matrix for params row ``idx``.

        Methodological notes
        --------------------
        The matrix is estimated from true discrete step transitions:
        state(t-1) -> state(t) for each vehicle.

        For each valid pair we count transitions by:
        - day_index (time-of-day step)
        - start_loc (location at t-1)
        - end_loc (location at t)

        Then we normalize row-wise over end_loc to get probabilities for each
        (day_index, start_loc) row. The result, an array of shape
        (steps per day, locations, locations), is stored in the params DataFrame.

        Raises
        ------
        ValueError
            If ``ref_profiles_ext`` is empty or contains no interval of positive length.
        """
        # Throw error if ref_profiles_ext is empty
        if ref_profiles_ext.empty:
            mssg = f"There is no data for cluster {self._params_df.at[idx, 'id_cluster']} and weekdays {self._params_df.at[idx, 'weekdays']}. Cannot calculate transition matrix."
            logger.error(mssg)
            raise ValueError(mssg)

        # Predefine required variables
        unique_location = np.arange(len(self._unique_locations))
        unique_index_day = np.arange(int(24 / self.user_params.temp_res))
        n_steps_per_day = len(unique_index_day)
        n_locations = len(unique_location)
        weekdays = self._params_df.at[idx, "weekdays"]

        # Build contiguous state occupancy from interval rows.
        # end_index - 1 ensures half-open intervals [start, end) on the day grid.
        starts = ref_profiles_ext["start_index"].to_numpy(copy=True)
        ends = ref_profiles_ext["end_index"].to_numpy(copy=True) - 1
        # An end index of 0 (-> -1 here) is treated as the last step of the day.
        ends[ends < 0] = n_steps_per_day - 1
        lengths = ends - starts + 1
        mask = lengths > 0

        if not np.any(mask):
            mssg = f"There are no valid timesteps for cluster {self._params_df.at[idx, 'id_cluster']} and weekdays {weekdays}. Cannot calculate transition matrix."
            logger.error(mssg)
            raise ValueError(mssg)

        # Expand each interval row to one row per discrete time step.
        # Result: state_df contains the location state at each (vehicle, date, day_index).
        all_day_indices = np.concatenate([np.arange(s, e + 1) for s, e in zip(starts[mask], ends[mask])])
        state_df = pd.DataFrame(
            {
                "id_vehicle": np.repeat(ref_profiles_ext.loc[mask, "id_vehicle"].to_numpy(), lengths[mask]),
                "date": np.repeat(ref_profiles_ext.loc[mask, "start_dt"].dt.normalize().to_numpy(), lengths[mask]),
                "weekday": np.repeat(ref_profiles_ext.loc[mask, "weekday"].to_numpy(), lengths[mask]),
                "day_index": all_day_indices,
                "location": np.repeat(ref_profiles_ext.loc[mask, "location"].to_numpy(), lengths[mask]),
            }
        )
        state_df.sort_values(["id_vehicle", "date", "day_index"], inplace=True)

        # Previous-step columns define candidate Markov transition pairs.
        state_df["start_loc"] = state_df.groupby("id_vehicle")["location"].shift(1)
        state_df["prev_day_index"] = state_df.groupby("id_vehicle")["day_index"].shift(1)
        state_df["prev_date"] = state_df.groupby("id_vehicle")["date"].shift(1)

        # Accept only consecutive time-step pairs:
        # - regular in-day step: k-1 -> k
        # - midnight boundary: last step of previous day -> first step of next day
        is_same_day_step = (state_df["prev_date"] == state_df["date"]) & (
            state_df["day_index"] == state_df["prev_day_index"] + 1
        )
        is_midnight_step = (
            (state_df["day_index"] == 0)
            & (state_df["prev_day_index"] == n_steps_per_day - 1)
            & (pd.to_datetime(state_df["prev_date"]) + pd.Timedelta(days=1) == pd.to_datetime(state_df["date"]))
        )

        # Keep only transitions that belong to the current weekday group (typeday row).
        mask_valid_transition = (is_same_day_step | is_midnight_step) & state_df["weekday"].isin(weekdays)

        trans_counts = (
            state_df.loc[mask_valid_transition, ["day_index", "start_loc", "location"]]
            .rename(columns={"location": "end_loc"})
            .astype({"day_index": int, "start_loc": int, "end_loc": int})
            .groupby(["day_index", "start_loc", "end_loc"])
            .size()
            .reset_index(name="count")
        )

        # Build dense transition table with all combinations so output shape is fixed.
        # Missing combinations receive count=0 and later probability=0.
        combinations = list(product(unique_index_day, unique_location, unique_location))
        transition_df = pd.DataFrame(combinations, columns=["day_index", "start_loc", "end_loc"])
        transition_df = transition_df.merge(trans_counts, on=["day_index", "start_loc", "end_loc"], how="left")
        transition_df["count"] = transition_df["count"].fillna(0)

        total_counts = transition_df.groupby(["day_index", "start_loc"])["count"].sum()
        transition_df = transition_df.merge(
            total_counts.rename("total_count"), on=["day_index", "start_loc"], how="left"
        )

        # Row-wise normalization: P(end_loc | day_index, start_loc).
        # For rows without observations (total_count==0), use identity fallback (stay prob = 1).
        # This keeps the matrix numerically valid and avoids undefined transitions.
        transition_df["probability"] = 0.0
        mask_observed = transition_df["total_count"] > 0
        transition_df.loc[mask_observed, "probability"] = (
            transition_df.loc[mask_observed, "count"] / transition_df.loc[mask_observed, "total_count"]
        )
        mask_unobserved = ~mask_observed & (transition_df["start_loc"] == transition_df["end_loc"])
        transition_df.loc[mask_unobserved, "probability"] = 1.0

        # Reshape to 3D numpy array with one vectorized assignment; every
        # (day_index, start_loc, end_loc) triple occurs exactly once in transition_df.
        tm = np.zeros((n_steps_per_day, n_locations, n_locations))
        tm[
            transition_df["day_index"].to_numpy(dtype=int),
            transition_df["start_loc"].to_numpy(dtype=int),
            transition_df["end_loc"].to_numpy(dtype=int),
        ] = transition_df["probability"].to_numpy()

        self._params_df.at[idx, "transition_matrix"] = tm

    def _calc_speed_distribution(self, ref_profiles_ext: pd.DataFrame, idx: int):
        """Calculate speed distribution parameters using a Beta distribution.

        Rows with location 0 are binned by duration using the row's
        ``speed_dist_edges_duration``; per bin a Beta distribution is fitted to the speeds
        normalized by ``speed_max``. Bins with fewer than two samples get NaN parameters.
        """
        # Rows with location 0 carry the speeds to be fitted
        lb_speed_df = ref_profiles_ext[ref_profiles_ext["location"] == 0][["speed", "duration"]]
        edges_duration = self._params_df.at[idx, "speed_dist_edges_duration"]

        # Collect the speeds of each half-open duration bin [lower_edge, upper_edge)
        speeds_binned = []
        for lower_edge, upper_edge in zip(edges_duration, edges_duration[1:]):
            mask = (lb_speed_df["duration"] >= lower_edge) & (lb_speed_df["duration"] < upper_edge)
            speeds_binned.append(lb_speed_df.loc[mask, "speed"].values)

        # Normalize speeds to [0, 1] for Beta distribution fitting
        max_speed = lb_speed_df["speed"].max() * 1.01  # add 1% margin
        self._params_df.at[idx, "speed_max"] = max_speed
        speeds_binned_normalized = [speeds / max_speed for speeds in speeds_binned]

        # Fit Beta distribution to each bin
        param1_list = []
        param2_list = []
        for speeds_binned_i in speeds_binned_normalized:
            if len(speeds_binned_i) < 2:
                # Not enough data to fit distribution
                param1_list.append(np.nan)
                param2_list.append(np.nan)
            else:
                # Location and scale are fixed so only the two shape parameters are estimated
                params = beta.fit(speeds_binned_i, floc=0, fscale=1)
                param1_list.append(params[0])
                param2_list.append(params[1])

        self._params_df.at[idx, "speed_dist_param1"] = [float(x) for x in param1_list]
        self._params_df.at[idx, "speed_dist_param2"] = [float(x) for x in param2_list]
class ParamsLoader:
    """
    Class for loading pre-calculated parameters for the mobility model.

    The ParamsLoader is a factory class to load pre-calculated parameters for the mobility
    model. The parameters are loaded from parquet files stored in the repository.

    Basic workflow to load existing parameters:

    1. Create an instance of the ParamsLoader class
    2. Call :meth:`load_info` to check what parameters are available
    3. Select parameters by choosing the corresponding ``id_params``
    4. Call :meth:`load_params` with the selected ``id_params``

    Examples
    --------
    >>> loader = ParamsLoader()
    >>> info_df = loader.load_info()
    >>> params = loader.load_params(id_params=1)
    """

    def __init__(self, user_name: str = None):
        """Initialize ParamsLoader.

        Parameters
        ----------
        user_name : str, optional
            Name stored as ``self.user_name``. If omitted, the ``USERNAME`` or ``USER``
            environment variable is used, falling back to ``"unknown"``.
        """
        # Always set the attribute; dataclasses.field() has no meaning outside a dataclass body.
        if user_name is None:
            user_name = os.environ.get("USERNAME") or os.environ.get("USER") or "unknown"
        self.user_name = user_name

    def load_info(self) -> pd.DataFrame:
        """
        Load info DataFrame from all available parameter sets.

        Returns
        -------
        pd.DataFrame
            DataFrame containing metadata for all available parameter sets;
            empty if the info file does not exist.
        """
        if not PARAMS_INFO_DIR.exists():
            return pd.DataFrame()
        return pd.read_parquet(PARAMS_INFO_DIR)

    def load_params(self, id_params: int = None) -> ModelParams:
        """
        Load existing ModelParams.

        Parameters
        ----------
        id_params: int
            Unique identifier for the parameter set to be loaded.
            Must correspond to an existing entry in the info DataFrame.

        Returns
        -------
        ModelParams
            Loaded model parameters stored in :class:`ModelParams`

        Raises
        ------
        ValueError
            If no parameters exist, if ``id_params`` is not given, or if ``id_params``
            is not found in the info DataFrame.
        """
        logger.info("Load parameters with id_params=%s.", id_params)

        # Load info for existing params
        info_df = self.load_info()
        if info_df.empty:
            mssg = "There are no existing parameters."
            logger.error(mssg)
            raise ValueError(mssg)

        # Without an id there is no info row to select; fail with a clear message
        # instead of an IndexError from the row lookup below.
        if id_params is None:
            mssg = "id_params must be given. \nCheck ParamsLoader.load_info for available parameters."
            logger.error(mssg)
            raise ValueError(mssg)

        if id_params not in info_df["id_params"].values:
            mssg = f"No parameters found for id_params = {id_params}. \nCheck ParamsLoader.load_info for available parameters."
            logger.error(mssg)
            raise ValueError(mssg)

        # Load params DataFrame
        params_df = self._load_only_params(id_params)

        # Convert the stored nested lists back to numpy transition matrices
        params_df = self._convert_params_df_list2tm(params_df)

        # Create ParamsInfo
        info_row = info_df[info_df["id_params"] == id_params].iloc[0]
        params_info = ParamsInfo(
            id_params=info_row["id_params"],
            description=info_row["description"],
            vehicle_type=info_row["vehicle_type"],
            temp_res=info_row["temp_res"],
            annual_km=info_row["annual_km"],
            locations=info_row["locations"],
            share_of_time_at_locations=info_row["share_of_time_at_locations"],
            number_typedays=info_row["number_typedays"],
            number_clusters=info_row["number_clusters"],
            labels_locations=info_row["labels_locations"],
            labels_clusters=info_row["labels_clusters"],
            created_user=info_row["created_user"],
            created_dt=info_row["created_dt"],
        )

        return ModelParams(df=params_df, info=params_info)

    def _load_only_params(self, id_params: int = None) -> pd.DataFrame:
        """Load only the params DataFrame; all rows if ``id_params`` is None."""
        if id_params is not None:
            params_df = pd.read_parquet(PARAMS_DIR, filters=[("id_params", "==", id_params)])
        else:
            params_df = pd.read_parquet(PARAMS_DIR)
        return params_df

    def _save_params(self, params: ModelParams) -> int:
        """Save calculated parameters and return the assigned ``id_params``. Only for internal use.

        A new id is assigned and written into ``params.info`` and ``params.df``.

        Raises
        ------
        ValueError
            If a parameter set with the same description, or with the same
            temp_res/annual_km/number_typedays/number_clusters, already exists.
        """
        # Load info for existing params
        info_df = self.load_info()

        if info_df.empty:
            # First parameter set
            new_id = 1
            params.info.id_params = new_id
            # create info_df from ParamsInfo
            info_df = pd.DataFrame([vars(params.info)])
        else:
            # check if params with same description already exist
            mask_existing = info_df["description"] == params.info.description
            if mask_existing.any():
                mssg = (
                    f"There are already Parameters with description '{params.info.description}'. "
                    f"Please define a unique description for the new parameters or use id_params = {info_df[mask_existing]['id_params'].values[0]} instead."
                )
                logger.error(mssg)
                raise ValueError(mssg)

            # round annual_km to avoid floating point issues
            info_df["annual_km"] = info_df["annual_km"].round(3)

            # check for an entry with the same key properties (description, created_user
            # and created_dt are not part of the comparison)
            cols_to_check = [
                col for col in info_df.columns if col in ["temp_res", "annual_km", "number_typedays", "number_clusters"]
            ]
            mask_existing_info = (info_df[cols_to_check] == pd.Series(vars(params.info))[cols_to_check]).all(axis=1)
            if mask_existing_info.any():
                id_val = info_df[mask_existing_info]["id_params"].values[0]
                mssg = (
                    f"The are already Parameters with the same properties. "
                    f"Please use id_params = {id_val} instead of creating new entries. "
                    f"Check: \n{info_df[mask_existing_info]}"
                )
                logger.error(mssg)
                raise ValueError(mssg)

            # assign new id_params (plain int, matching the annotated return type)
            new_id = int(info_df["id_params"].max() + 1)
            params.info.id_params = new_id
            # append new info
            info_df = pd.concat([info_df, pd.DataFrame([vars(params.info)])], ignore_index=True)

        # Save info DataFrame
        info_df.to_parquet(PARAMS_INFO_DIR, index=False)

        # Add id_params to params DataFrame
        params.df["id_params"] = new_id

        # Convert transition matrix np arrays to lists for saving (on a copy)
        params_df = params.df.copy()
        params_df = self._convert_params_df_tm2list(params_df)

        # Append to the existing params, if any
        if PARAMS_DIR.exists():
            params_existing_df = self._load_only_params()
            params_df = pd.concat([params_existing_df, params_df], ignore_index=True)

        # Save params DataFrame
        params_df.to_parquet(PARAMS_DIR, index=False)

        return new_id

    def _delete_params(self, id_params: int):
        """Delete parameters with given id_params. Only for internal use.

        Raises
        ------
        ValueError
            If no parameter set with ``id_params`` exists.
        """
        # Load info DataFrame
        info_df = self.load_info()
        if info_df.empty or id_params not in info_df["id_params"].values:
            mssg = f"No parameters found for id_params = {id_params}."
            logger.error(mssg)
            raise ValueError(mssg)

        # Delete from info DataFrame
        info_df = info_df[info_df["id_params"] != id_params]
        info_df.to_parquet(PARAMS_INFO_DIR, index=False)

        # Delete from params DataFrame
        params_df = self._load_only_params()
        params_df = params_df[params_df["id_params"] != id_params]
        params_df.to_parquet(PARAMS_DIR, index=False)

    @classmethod
    def deep_to_numpy(cls, arr):
        """Recursively convert nested lists or object arrays to float numpy arrays.

        Anything that is neither a list nor an object-dtype array is returned unchanged.
        """
        is_object_array = isinstance(arr, np.ndarray) and arr.dtype == object
        if is_object_array or isinstance(arr, list):
            # Convert each element first, then stack the results into a float array
            return np.array([cls.deep_to_numpy(x) for x in arr], dtype=float)
        return arr

    @classmethod
    def _convert_params_df_list2tm(cls, df: pd.DataFrame) -> pd.DataFrame:
        """Convert transition_matrix column from list/nested array to np.ndarray (modifies ``df``)."""
        df["transition_matrix"] = df["transition_matrix"].apply(cls.deep_to_numpy)
        return df

    @staticmethod
    def _convert_params_df_tm2list(df: pd.DataFrame) -> pd.DataFrame:
        """Convert transition_matrix column from np.ndarray to list (modifies ``df``)."""
        df["transition_matrix"] = df["transition_matrix"].apply(
            lambda x: x.tolist() if isinstance(x, np.ndarray) else x
        )
        return df