# Source code for champpy.core.mobility.mobility_model

import logging
import pandas as pd
import numpy as np

from dataclasses import dataclass
from rich.progress import track
from scipy.stats import beta, mode

from champpy.core.mobility.mobility_data import MobProfiles
from champpy.core.mobility.parameterization import ModelParams
from champpy.utils.time_utils import get_day_index, get_datetime_array, TypeDays

logger = logging.getLogger(__name__)


@dataclass
class UserParamsMobModel:
    """
    User parameters for configuring the mobility profile generation.

    This dataclass contains all user parameters for generating synthetic mobility profiles with the
    :class:`MobModel`. It defines the simulation period, number of vehicles, random seed, and other
    settings that control the profile generation process.

    Raises
    ------
    ValueError
        If number_vehicles is less than 1.
    ValueError
        If start_date is not at least one day before end_date.
    """

    #: Number of vehicles to generate mobility profiles for. Must be at least 1. Default is 50.
    number_vehicles: int = 50
    #: Start date for the mobility profile generation period. Default is "2025-01-01".
    start_date: pd.Timestamp = pd.Timestamp("2025-01-01")
    #: End date for the mobility profile generation period. Must be at least one day after start_date.
    #: Default is "2025-12-31".
    end_date: pd.Timestamp = pd.Timestamp("2025-12-31")
    #: Random seed for reproducibility of the generated profiles. Default is 1.
    random_seed: int = 1
    #: Number of buffer days before and after the simulation period to avoid edge effects. Default is 1.
    days_buffer: int = 1
    #: Initial location ID for all vehicles at the start of the simulation. Typically 1 represents "Home".
    #: Default is 1.
    first_loc: int = 1

    def __post_init__(self) -> None:
        """Validate the user parameters right after dataclass initialization."""
        # Errors go through the module-level ``logger`` rather than ``logging.error``:
        # the root-logger helper implicitly calls ``basicConfig()`` when no handler is
        # configured, which library code should never do on behalf of the application.

        # The fleet must contain at least one vehicle
        if self.number_vehicles < 1:
            message = "Number of vehicles must be at least 1."
            logger.error(message)
            raise ValueError(message)

        # Only calendar dates are compared: the start must lie at least one day before the end
        if self.start_date.date() >= self.end_date.date():
            message = "Start date must be at least one day before end date."
            logger.error(message)
            raise ValueError(message)
class MobModel:
    """
    Mobility model for generating synthetic vehicle mobility profiles.

    The MobModel class uses a Markov chain approach to generate realistic mobility profiles
    (:class:`MobProfiles`) for a fleet of vehicles. The model simulates vehicle locations over time,
    journey starts and ends, speeds, and distances based on statistical parameters defined in
    :class:`ModelParams`.

    Parameters
    ----------
    model_params : :class:`ModelParams`
        Dataclass containing calibrated mobility model parameters including transition matrices,
        speed distributions, and other statistical parameters for different vehicle clusters.

    Attributes
    ----------
    model_params : :class:`ModelParams`
        Stored model parameters used for profile generation.

    Examples
    --------
    .. code-block:: python

        import pandas as pd
        import champpy

        # Load model parameters
        params_loader = champpy.ParamsLoader()
        model_params = params_loader.load_params(id_params=1)

        # Initialize the mobility model with the model parameters
        mob_model = champpy.MobModel(model_params=model_params)

        # Define user parameters for generation
        user_params = UserParamsMobModel(
            number_vehicles=10,
            start_date=pd.Timestamp("2025-01-01"),
            end_date=pd.Timestamp("2025-12-31"),
            random_seed=42
        )

        # Generate mobility profiles
        mob_profiles = mob_model.generate_mob_profiles(user_params)
    """

    def __init__(self, model_params: ModelParams):
        """
        Initialize a MobModel instance.

        The parameters are described in the class docstring.
        """
        self.model_params = model_params

    def generate_mob_profiles(self, user_params: UserParamsMobModel) -> MobProfiles:
        """
        Generate synthetic mobility profiles for a fleet of vehicles.

        This method creates mobility profiles by simulating vehicle movements using a Markov chain
        approach. For each time step, the model determines vehicle locations based on transition
        probabilities, identifies journey starts and ends, and calculates speeds and distances.

        Parameters
        ----------
        user_params : :class:`UserParamsMobModel`
            User-defined parameters specifying the number of vehicles, simulation period, and other
            configuration settings.

        Returns
        -------
        :class:`MobProfiles`
            Generated mobility profiles containing logbooks, vehicles, clusters, and locations data.
        """
        logger.info(
            "Start generating mobility profiles for %d vehicles from %s to %s",
            user_params.number_vehicles,
            user_params.start_date,
            user_params.end_date,
        )

        # Seed NumPy's global RNG; all random draws below (locations and speeds) come from it
        np.random.seed(user_params.random_seed)

        # Predefine variables
        self._predefine_vars(user_params=user_params)

        # Time step at which the most recent journey of each vehicle started
        previous_start = np.zeros((self._number_vehicles,), dtype=int)

        # Walk through the time steps (with a rich progress bar); step 0 holds the initial location
        for t in track(range(1, self._number_steps), description="Generating mobility profiles:"):
            # Determine new location based on transition matrix
            self._generate_location(t)
            # Identify start and end of journeys for all vehicles at once
            self._update_journeys(t, previous_start)

        # Calculate speed and distance arrays based on start_journey_array and duration_array
        self._generate_speed_and_distance()

        # Convert arrays to MobProfiles instance
        return self._convert_arrays2mob_profiles()

    def _update_journeys(self, t: int, previous_start: np.ndarray) -> None:
        """
        Register journey starts and ends of all vehicles for one time step.

        Location index 0 is the driving state. A journey starts when a vehicle switches from a
        parked location to driving, and ends when it switches back. Every journey still running
        at the final time step is closed there.

        Parameters
        ----------
        t : int
            Time step; the locations of step ``t`` must already be generated.
        previous_start : np.ndarray
            Per-vehicle time step of the most recent journey start. Updated in place.
        """
        is_last_step = t == self._number_steps - 1
        location_t = self._location_array[t, :]
        was_driving = self._location_array[t - 1, :] == 0

        if is_last_step:
            # No journey may start at the last step, and every running journey is ended
            ended = np.flatnonzero(was_driving)
        else:
            started = (location_t == 0) & ~was_driving
            self._start_journey_array[t, started] = True
            previous_start[started] = t
            ended = np.flatnonzero((location_t > 0) & was_driving)

        # The duration (in hours) is stored sparsely at the step where the journey started
        start_steps = previous_start[ended]
        self._duration_array[start_steps, ended] = (t - start_steps) * self.model_params.info.temp_res

        if is_last_step:
            # Intervene if a journey ends at the last time step: place the vehicle at its most
            # frequent location. The driving state (0) is excluded so that a journey can never
            # "arrive" at driving; ties resolve to the smallest location index.
            for v in ended:
                visited = self._location_array[:, v]
                parked = visited[visited > 0]
                if parked.size:
                    self._location_array[t, v] = np.bincount(parked).argmax()

    def _predefine_vars(self, user_params: UserParamsMobModel) -> None:
        """
        Predefine variables for the mobility model.

        Parameters
        ----------
        user_params : :class:`UserParamsMobModel`
            Dataclass containing user parameters.
        """
        self._number_vehicles = user_params.number_vehicles

        # Datetime array including the buffer days, built by the utility function
        self._dt_array, self._mask_buffer = get_datetime_array(
            start_date=user_params.start_date,
            end_date=user_params.end_date,
            temp_res=self.model_params.info.temp_res,
            number_days_buffer=user_params.days_buffer,
        )
        self._number_steps = len(self._dt_array)
        self._index_day_array = get_day_index(self._dt_array, self.model_params.info.temp_res)
        weekday_array = self._dt_array.weekday

        # The type-day definition is read from the parameter rows of cluster 1
        mask_cluster1 = self.model_params.df["id_cluster"] == 1
        typedays = [[int(i) for i in x] for x in self.model_params.df.loc[mask_cluster1, "weekdays"]]
        typedays_array = TypeDays(typedays).weekday2typeday(weekday_array)
        first_weekday = self.model_params.df["weekdays"].apply(lambda x: x[0])
        # NOTE: this adds/overwrites the "typeday" column on the shared parameter DataFrame
        self.model_params.df["typeday"] = TypeDays(typedays).weekday2typeday(first_weekday)

        # Store transition matrices in one array for faster access
        self._tm_array = np.stack(self.model_params.df["transition_matrix"].to_numpy())

        # Generate random numbers to determine the new locations
        self._rand1_array = np.random.rand(self._number_steps, self._number_vehicles)

        # Split vehicles into clusters
        cluster_array = self._split_vehicles_per_cluster(self._number_vehicles)

        # Determine index of parameters based on cluster_array and typedays_array
        self._index_params_array = (
            self.model_params.df.reset_index()
            .pivot(index="typeday", columns="id_cluster", values="index")
            .loc[typedays_array, cluster_array]
            .to_numpy()
        )

        # Initialize arrays for location, speed, distance, duration (sparse for speed, distance, duration)
        array_shape = (self._number_steps, self._number_vehicles)
        self._location_array = np.zeros(array_shape, dtype=int)  # location of vehicles
        self._speed_array = np.zeros(array_shape, dtype=float)  # speed of journeys (sparse)
        self._distance_array = np.zeros(array_shape, dtype=float)  # distance of journeys (sparse)
        self._duration_array = np.zeros(array_shape, dtype=float)  # duration of journeys (sparse)
        self._start_journey_array = np.zeros(array_shape, dtype=bool)  # start of journeys

        # Set first location for all vehicles
        self._location_array[0, :] = user_params.first_loc

    def _split_vehicles_per_cluster(self, number_vehicles: int) -> np.ndarray:
        """
        Split the total number of vehicles into clusters based on the model parameters.

        Parameters
        ----------
        number_vehicles : int
            Total number of vehicles to split.

        Returns
        -------
        np.ndarray
            Integer array of length ``number_vehicles`` holding the cluster ID of each vehicle,
            grouped by cluster in ascending cluster-ID order.
        """
        percentages_per_cluster = self.model_params.df.groupby("id_cluster")["percentage"].first()
        vehicles_per_cluster = (percentages_per_cluster / 100 * number_vehicles).round().astype(int)

        remainder = number_vehicles - int(vehicles_per_cluster.sum())
        if remainder > 0:
            # Assign remaining vehicles to the largest cluster
            vehicles_per_cluster.loc[vehicles_per_cluster.idxmax()] += remainder
        elif remainder < 0:
            # Rounding handed out too many vehicles (e.g. 1.5 + 1.5 -> 2 + 2): take the surplus
            # back one by one from whichever cluster is currently the largest, so that no
            # cluster is silently truncated and no count can become negative.
            for _ in range(-remainder):
                vehicles_per_cluster.loc[vehicles_per_cluster.idxmax()] -= 1

        # Create array with cluster IDs for each vehicle
        return np.repeat(vehicles_per_cluster.index.to_numpy(), vehicles_per_cluster.to_numpy()).astype(int)

    def _generate_location(self, t: int) -> None:
        """
        Generate the locations for all vehicles for one time step based on the transition matrix.

        Parameters
        ----------
        t : int
            Time step.
        """
        # Parameter index, previous location and day index for all vehicles
        params_idx = self._index_params_array[t, :]
        loc_tminus1 = self._location_array[t - 1, :]
        day_idx = self._index_day_array[t]

        # Transition vectors and cumulative transition vectors for all vehicles
        trans_vecs = self._tm_array[params_idx, day_idx, loc_tminus1, :]
        cum_trans_vecs = np.cumsum(trans_vecs, axis=1)
        # Force the last cumulative value to exactly 1 so that the drawn index stays in range
        cum_trans_vecs[:, -1] = 1.0

        # Random numbers for all vehicles
        rand_t = self._rand1_array[t, :]

        # New location = number of cumulative probabilities that the random number exceeds
        self._location_array[t, :] = np.sum(rand_t[:, None] > cum_trans_vecs, axis=1)

    def _generate_speed_and_distance(self) -> None:
        """
        Generate speed and distance arrays based on start_journey_array and duration_array.

        Speeds are drawn from a beta distribution whose parameters depend on the parameter set and
        the duration bin of each journey, scaled by the maximum speed of the parameter set. The
        results are written to the entries of the speed and distance arrays where journeys start.
        """
        # Only use the array entries where journeys start: journeys array (jarray)
        mask_start = self._start_journey_array
        index_params_jarray = self._index_params_array[mask_start]
        duration_jarray = self._duration_array[mask_start]
        number_journeys = duration_jarray.shape[0]

        # Identify the duration bin of all journeys (bin edges are taken from parameter row 0)
        edges_duration = self.model_params.df.loc[0, "speed_dist_edges_duration"]
        idx_duration_jarray = np.searchsorted(edges_duration, duration_jarray, side="right") - 1
        max_index_duration = len(edges_duration) - 2
        # Clamp to the valid bins: durations beyond the last edge use the last bin, durations
        # below the first edge use the first bin (an index of -1 would wrap around to the last bin)
        idx_duration_jarray = np.clip(idx_duration_jarray, 0, max_index_duration)

        # Get speed distribution parameters for all journeys (vectorized, no loop)
        speed_param1_full = np.array(self.model_params.df["speed_dist_param1"].to_list())  # (n_paramsets, n_bins)
        speed_param1_jarray = speed_param1_full[index_params_jarray, idx_duration_jarray]
        speed_param2_full = np.array(self.model_params.df["speed_dist_param2"].to_list())  # (n_paramsets, n_bins)
        speed_param2_jarray = speed_param2_full[index_params_jarray, idx_duration_jarray]
        speed_max_array = self.model_params.df["speed_max"].to_numpy()[index_params_jarray]

        # Generate one random number per journey
        rand2_array = np.random.rand(number_journeys)

        # Generate speed for all journeys (vectorized, no loop)
        speed_jarray = beta.ppf(rand2_array, speed_param1_jarray, speed_param2_jarray) * speed_max_array

        # Distance follows from speed and journey duration
        distance_jarray = speed_jarray * duration_jarray

        # Set speed and distance values back to full arrays
        self._speed_array[mask_start] = speed_jarray
        self._distance_array[mask_start] = distance_jarray

    def _convert_arrays2mob_profiles(self) -> MobProfiles:
        """
        Convert the generated arrays into a :class:`MobProfiles` instance.

        Builds the logbook and vehicle DataFrames, removes the buffer period from the logbook and
        from all internal arrays, and applies the location and cluster labels of the model
        parameters.

        Returns
        -------
        :class:`MobProfiles`
            Mobility profiles built from the generated logbook and vehicle data.
        """
        # Extract first and last datetime without buffer
        dt_no_buffer = self._dt_array[~self._mask_buffer]
        first_step_no_buffer = dt_no_buffer[0]
        last_step_no_buffer = dt_no_buffer[-1]

        # Convert location idx into location IDs
        self._location_array = self.model_params.info.locations[self._location_array]

        # Get rows and cols of journeys, sorted by vehicle and time
        rows, cols = np.nonzero(self._start_journey_array)
        sort_idx = np.lexsort((rows, cols))
        rows_sorted = rows[sort_idx]
        cols_sorted = cols[sort_idx]
        durations_h = self._duration_array[rows_sorted, cols_sorted]

        # Build the logbook DataFrame
        logbook_df = pd.DataFrame()
        logbook_df["id_vehicle"] = cols_sorted + 1  # vehicle IDs start at 1
        logbook_df["dep_dt"] = self._dt_array[rows_sorted]
        logbook_df["arr_dt"] = self._dt_array[rows_sorted] + pd.to_timedelta(durations_h, unit="h")
        # The departure location is the location one step before the journey starts
        logbook_df["dep_loc"] = self._location_array[rows_sorted - 1, cols_sorted]
        step_end_journey = rows_sorted + (durations_h / self.model_params.info.temp_res).round().astype(int)
        logbook_df["arr_loc"] = self._location_array[step_end_journey, cols_sorted]
        logbook_df["distance"] = self._distance_array[rows_sorted, cols_sorted]

        # Remove buffer from logbook_df:
        # Delete rows with arr_dt before first_step_no_buffer or dep_dt after last_step_no_buffer
        mask_buffer_logbook = (logbook_df["arr_dt"] < first_step_no_buffer) | (
            logbook_df["dep_dt"] > last_step_no_buffer
        )
        logbook_df = logbook_df[~mask_buffer_logbook].reset_index(drop=True)

        # Remove buffer from all arrays
        keep = ~self._mask_buffer
        self._dt_array = self._dt_array[keep]
        self._start_journey_array = self._start_journey_array[keep, :]
        self._duration_array = self._duration_array[keep, :]
        self._distance_array = self._distance_array[keep, :]
        self._location_array = self._location_array[keep, :]
        self._speed_array = self._speed_array[keep, :]

        # Create vehicle DataFrame
        first_day = first_step_no_buffer.floor("D")
        last_day = last_step_no_buffer.floor("D")
        id_cluster = self.model_params.df.id_cluster[self._index_params_array[1, :]]
        vehicle_df = pd.DataFrame(
            {
                "id_vehicle": range(1, self._number_vehicles + 1),
                "first_day": [first_day] * self._number_vehicles,
                "last_day": [last_day] * self._number_vehicles,
                "id_cluster": id_cluster.tolist(),
                "first_loc": self._location_array[0, :],
            }
        )

        mob_profiles = MobProfiles(input_logbooks_df=logbook_df, input_vehicles_df=vehicle_df)

        # Update location labels
        locations_df = mob_profiles.locations.df
        locations_df["label"] = self.model_params.info.labels_locations
        mob_profiles.locations.update_locations(locations_df)

        # Update cluster labels
        clusters_df = mob_profiles.clusters.df
        clusters_df["label"] = self.model_params.info.labels_clusters
        mob_profiles.clusters.update_clusters(clusters_df)

        return mob_profiles