import logging
import pandas as pd
import numpy as np
from dataclasses import dataclass
from rich.progress import track
from scipy.stats import beta, mode
from champpy.core.mobility.mobility_data import MobProfiles
from champpy.core.mobility.parameterization import ModelParams
from champpy.utils.time_utils import get_day_index, get_datetime_array, TypeDays
logger = logging.getLogger(__name__)
[docs]
@dataclass
class UserParamsMobModel:
    """
    User parameters for configuring the mobility profile generation.

    This dataclass contains all user parameters for generating synthetic mobility profiles
    with the :class:`MobModel`. It defines the simulation period, number of vehicles, random seed,
    and other settings that control the profile generation process.

    The values are validated in ``__post_init__``; every validation failure is logged
    through the module-level ``logger`` before the exception is raised.

    Raises
    ------
    ValueError
        If number_vehicles is less than 1.
    ValueError
        If start_date is not at least one day before end_date.
    """

    #: Number of vehicles to generate mobility profiles for. Must be at least 1. Default is 50.
    number_vehicles: int = 50
    #: Start date for the mobility profile generation period. Default is "2025-01-01".
    start_date: pd.Timestamp = pd.Timestamp("2025-01-01")
    #: End date for the mobility profile generation period. Must be at least one day after
    #: start_date. Default is "2025-12-31".
    end_date: pd.Timestamp = pd.Timestamp("2025-12-31")
    #: Random seed for reproducibility of the generated profiles. Default is 1.
    random_seed: int = 1
    #: Number of buffer days before and after the simulation period to avoid edge effects.
    #: Default is 1.
    days_buffer: int = 1
    #: Initial location ID for all vehicles at the start of the simulation.
    #: Typically 1 represents "Home". Default is 1.
    first_loc: int = 1

    def __post_init__(self):
        """Validate the user parameters right after dataclass initialization."""
        # Validate number of vehicles: positive integer
        if self.number_vehicles < 1:
            message = "Number of vehicles must be at least 1."
            # Use the module logger (not the root logger) so the record carries this
            # module's name and honours the package's logging configuration.
            logger.error(message)
            raise ValueError(message)
        # Only the calendar dates are compared: start must fall on an earlier day than end,
        # regardless of the time-of-day component of the timestamps.
        if self.start_date.date() >= self.end_date.date():
            message = "Start date must be at least one day before end date."
            logger.error(message)
            raise ValueError(message)
[docs]
class MobModel:
    """
    Mobility model for generating synthetic vehicle mobility profiles.

    The MobModel class uses a Markov chain approach to generate
    realistic mobility profiles (:class:`MobProfiles`) for a fleet of vehicles. The model simulates
    vehicle locations over time, journey starts and ends, speeds, and distances based on
    statistical parameters defined in :class:`ModelParams`.

    Parameters
    ----------
    model_params : :class:`ModelParams`
        Dataclass containing calibrated mobility model parameters including transition matrices,
        speed distributions, and other statistical parameters for different vehicle clusters.

    Attributes
    ----------
    model_params : :class:`ModelParams`
        Stored model parameters used for profile generation.

    Examples
    --------
    .. code-block:: python

        import pandas as pd
        import champpy

        # Load model parameters
        params_loader = champpy.ParamsLoader()
        model_params = params_loader.load_params(id_params=1)

        # Initialize the mobility model with the model parameters
        mob_model = champpy.MobModel(model_params=model_params)

        # Define user parameters for generation
        user_params = UserParamsMobModel(
            number_vehicles=10,
            start_date=pd.Timestamp("2025-01-01"),
            end_date=pd.Timestamp("2025-12-31"),
            random_seed=42
        )

        # Generate mobility profiles
        mob_profiles = mob_model.generate_mob_profiles(user_params)
    """

    def __init__(self, model_params: ModelParams):
        """
        Initialize a MobModel instance.

        The parameters are described in the class docstring.
        """
        self.model_params = model_params

    def generate_mob_profiles(self, user_params: UserParamsMobModel) -> MobProfiles:
        """
        Generate synthetic mobility profiles for a fleet of vehicles.

        This method creates mobility profiles by simulating vehicle movements using a Markov chain
        approach. For each time step, the model determines vehicle locations based on transition
        probabilities, identifies journey starts and ends, and calculates speeds and distances.

        Parameters
        ----------
        user_params : :class:`UserParamsMobModel`
            User-defined parameters specifying the number of vehicles, simulation period,
            and other configuration settings.

        Returns
        -------
        :class:`MobProfiles`
            Generated mobility profiles containing logbooks, vehicles, clusters, and locations data.
        """
        logger.info(
            "Start generating mobility profiles for %d vehicles from %s to %s",
            user_params.number_vehicles,
            user_params.start_date,
            user_params.end_date,
        )
        # Seed NumPy's global random state; all random draws below come from np.random.
        np.random.seed(user_params.random_seed)

        # Allocate all working arrays and draw the location random numbers
        self._predefine_vars(user_params=user_params)

        # Step index at which the currently open (or last) journey of each vehicle started
        previous_start = np.zeros((self._number_vehicles,), dtype=int)

        # Step 0 holds the initial location, so the simulation starts at step 1.
        for t in track(range(1, self._number_steps), description="Generating mobility profiles:"):
            # Determine new location based on transition matrix
            self._generate_location(t)
            # Identify start and end of journeys for all vehicles at once
            self._update_journeys(t, previous_start)

        # Calculate speed and distance arrays based on start_journey_array and duration_array
        self._generate_speed_and_distance()

        # Convert arrays to MobProfiles instance
        return self._convert_arrays2mob_profiles()

    def _update_journeys(self, t: int, previous_start: np.ndarray) -> None:
        """
        Register journey starts and ends of all vehicles for one time step.

        Location index 0 is treated as "driving". A journey starts when a vehicle switches
        from a non-zero location to 0 and ends when it switches from 0 back to a location
        greater than 0. At the last time step no new journey is started and every journey
        that is still open is closed.

        Args:
            t: Time step (>= 1); the locations of step ``t`` must already be generated.
            previous_start: Array with one entry per vehicle holding the step of the most
                recent journey start. Updated in place.
        """
        is_last_step = t == self._number_steps - 1
        location_t = self._location_array[t, :]
        was_driving = self._location_array[t - 1, :] == 0

        if is_last_step:
            # No journey may start at the last step; every open journey is forced to end.
            ending = was_driving
        else:
            starting = (location_t == 0) & ~was_driving
            self._start_journey_array[t, starting] = True
            previous_start[starting] = t
            ending = (location_t > 0) & was_driving

        ended = np.flatnonzero(ending)
        if ended.size == 0:
            return

        # The duration (steps * temporal resolution) is stored at the step where the journey started.
        starts = previous_start[ended]
        self._duration_array[starts, ended] = (t - starts) * self.model_params.info.temp_res

        if is_last_step:
            # Intervene if a journey ends at the last time step:
            # set the location to the vehicle's most frequent location.
            for v in ended:
                self._location_array[t, v] = mode(self._location_array[:, v])[0]

    def _predefine_vars(self, user_params: UserParamsMobModel) -> None:
        """
        Predefine variables for the mobility model.

        Builds the datetime axis (including buffer days), the per-step/per-vehicle parameter
        index, the random numbers for the location draws and the zero-initialized result arrays.

        NOTE: this method adds/overwrites the column ``typeday`` in ``self.model_params.df``.

        Args:
            user_params: UserParamsMobModel dataclass containing user parameters.
        """
        self._number_vehicles = user_params.number_vehicles

        # Datetime array including buffer days, built by the utility function.
        # Judging by its use below, the second return value marks the buffer steps.
        self._dt_array, self._mask_buffer = get_datetime_array(
            start_date=user_params.start_date,
            end_date=user_params.end_date,
            temp_res=self.model_params.info.temp_res,
            number_days_buffer=user_params.days_buffer,
        )
        self._number_steps = len(self._dt_array)
        self._index_day_array = get_day_index(self._dt_array, self.model_params.info.temp_res)
        weekday_array = self._dt_array.weekday

        # The type-day definition is read from the rows of cluster 1 only
        # (presumably all clusters share the same weekday grouping - TODO confirm).
        mask_cluster1 = self.model_params.df["id_cluster"] == 1
        typedays = [[int(i) for i in list(x)] for x in self.model_params.df.loc[mask_cluster1, "weekdays"]]
        typedays_array = TypeDays(typedays).weekday2typeday(weekday_array)
        first_weekday = self.model_params.df["weekdays"].apply(lambda x: x[0])
        self.model_params.df["typeday"] = TypeDays(typedays).weekday2typeday(first_weekday)

        # Store transition matrices in one array for faster access
        self._tm_array = np.stack(self.model_params.df["transition_matrix"].to_numpy())

        # Generate random numbers to determine new locations
        self._rand1_array = np.random.rand(self._number_steps, self._number_vehicles)

        # Split vehicles into clusters
        cluster_array = self._split_vehicles_per_cluster(self._number_vehicles)

        # Determine index of parameters based on cluster_array and typedays_array.
        # Result shape: (number_steps, number_vehicles); the values are the index labels of
        # model_params.df (assumed to coincide with row positions - TODO confirm).
        self._index_params_array = (
            self.model_params.df.reset_index()
            .pivot(index="typeday", columns="id_cluster", values="index")
            .loc[typedays_array, cluster_array]
            .to_numpy()
        )

        # Initialize arrays for location, speed, distance, duration (sparse for speed, distance, duration)
        shape = (self._number_steps, self._number_vehicles)
        self._location_array = np.zeros(shape, dtype=int)  # location of vehicles
        self._speed_array = np.zeros(shape, dtype=float)  # speed of journeys (sparse)
        self._distance_array = np.zeros(shape, dtype=float)  # distance of journeys (sparse)
        self._duration_array = np.zeros(shape, dtype=float)  # duration of journeys (sparse)
        self._start_journey_array = np.zeros(shape, dtype=bool)  # start of journeys

        # Set first location for all vehicles
        self._location_array[0, :] = user_params.first_loc

    def _split_vehicles_per_cluster(self, number_vehicles: int) -> np.ndarray:
        """
        Split the total number of vehicles into clusters based on the model parameters.

        The share of each cluster is taken from the first ``percentage`` value of that
        cluster in ``model_params.df`` and rounded to whole vehicles.

        Args:
            number_vehicles: Total number of vehicles to split.

        Returns:
            np.ndarray: Integer array of length ``number_vehicles`` holding the cluster ID of
            each vehicle, ordered by cluster ID.
        """
        percentages_per_cluster = self.model_params.df.groupby("id_cluster")["percentage"].first()
        vehicles_per_cluster = (percentages_per_cluster / 100 * number_vehicles).round().astype(int)

        rest = number_vehicles - vehicles_per_cluster.sum()
        if rest > 0:
            # Assign remaining vehicles to the largest cluster
            largest_cluster = vehicles_per_cluster.idxmax()
            vehicles_per_cluster[largest_cluster] += rest

        # Repeat each cluster ID by its vehicle count. If rounding produced more vehicles
        # than requested, the surplus is cut off at the end (i.e. from the last clusters).
        cluster_array = np.repeat(vehicles_per_cluster.index.to_numpy(), vehicles_per_cluster.to_numpy())
        return cluster_array[:number_vehicles].astype(int)

    def _generate_location(self, t: int) -> None:
        """
        Generate the locations for all vehicles for one time step based on the transition matrix.

        Args:
            t: Time step (>= 1); the locations of step ``t - 1`` must already be set.
        """
        # Parameter index, previous location and day index for all vehicles
        params_idx = self._index_params_array[t, :]
        loc_tminus1 = self._location_array[t - 1, :]
        day_idx = self._index_day_array[t]

        # Transition vectors and cumulative transition vectors for all vehicles
        trans_vecs = self._tm_array[params_idx, day_idx, loc_tminus1, :]
        cum_trans_vecs = np.cumsum(trans_vecs, axis=1)
        # Force the last cumulative value to 1 so that a draw can never exceed it
        # (guards against rows that do not sum exactly to 1).
        cum_trans_vecs[:, -1] = 1.0

        # Random numbers for all vehicles
        rand_t = self._rand1_array[t, :]

        # New location = number of cumulative thresholds lying below the random number
        location_t = np.sum(rand_t[:, None] > cum_trans_vecs, axis=1)
        self._location_array[t, :] = location_t

    def _generate_speed_and_distance(self) -> None:
        """
        Generate speed and distance arrays based on start_journey_array and duration_array.

        For every journey start a speed is drawn from a beta distribution whose parameters
        depend on the parameter set and on the duration bin of the journey; the draw is scaled
        with ``speed_max``. The distance is ``speed * duration``. Results are written to
        ``_speed_array`` and ``_distance_array`` at the journey-start positions.
        """
        # Only use the array entries where journeys start: journeys array (jarray)
        mask_start = self._start_journey_array
        index_params_jarray = self._index_params_array[mask_start]
        duration_jarray = self._duration_array[mask_start]
        number_journeys = duration_jarray.shape[0]

        # The duration bin edges are read from the row labelled 0 and used for all parameter sets.
        edges_duration = self.model_params.df.loc[0, "speed_dist_edges_duration"]

        # Identify the duration bin for all journeys. Clip on both sides: durations beyond the
        # last edge use the last bin, durations below the first edge use the first bin
        # (an unclipped -1 would silently select the last bin via negative indexing).
        idx_duration_jarray = np.searchsorted(edges_duration, duration_jarray, side="right") - 1
        max_index_duration = len(edges_duration) - 2
        idx_duration_jarray = np.clip(idx_duration_jarray, 0, max_index_duration)

        # Get speed distribution parameters for all journeys (vectorized, no loop)
        speed_param1_full = np.array(
            self.model_params.df["speed_dist_param1"].to_list()
        )  # shape: (n_paramsets, n_bins)
        speed_param1_jarray = speed_param1_full[index_params_jarray, idx_duration_jarray]
        speed_param2_full = np.array(
            self.model_params.df["speed_dist_param2"].to_list()
        )  # shape: (n_paramsets, n_bins)
        speed_param2_jarray = speed_param2_full[index_params_jarray, idx_duration_jarray]
        speed_max_array = self.model_params.df["speed_max"].to_numpy()[index_params_jarray]

        # One uniform random number per journey, mapped through the inverse beta CDF
        rand2_array = np.random.rand(number_journeys)
        speed_jarray = beta.ppf(rand2_array, speed_param1_jarray, speed_param2_jarray) * speed_max_array

        # Generate distance array
        distance_jarray = speed_jarray * duration_jarray

        # Set speed and distance values back to full arrays
        self._speed_array[mask_start] = speed_jarray
        self._distance_array[mask_start] = distance_jarray

    def _convert_arrays2mob_profiles(self) -> MobProfiles:
        """
        Convert the generated arrays to a MobProfiles instance.

        Builds the logbook (one row per journey) and the vehicle table from the simulation
        arrays, strips the buffer steps from all arrays stored on the instance and applies the
        location and cluster labels of the model parameters.

        Returns:
            MobProfiles: Mobility profiles built from the logbook and vehicle DataFrames.
        """
        # Extract first and last datetime without buffer
        dt_no_buffer = self._dt_array[~self._mask_buffer]
        first_step_no_buffer = dt_no_buffer[0]
        last_step_no_buffer = dt_no_buffer[-1]

        # Convert location indices into location IDs
        self._location_array = self.model_params.info.locations[self._location_array]

        # Get rows (time steps) and cols (vehicles) of journey starts, sorted by vehicle and time
        rows, cols = np.nonzero(self._start_journey_array)
        sort_idx = np.lexsort((rows, cols))
        rows_sorted = rows[sort_idx]
        cols_sorted = cols[sort_idx]
        duration_sorted = self._duration_array[rows_sorted, cols_sorted]

        # Build the logbook DataFrame column by column
        logbook_df = pd.DataFrame()
        logbook_df["id_vehicle"] = cols_sorted + 1  # vehicle IDs start at 1
        logbook_df["dep_dt"] = self._dt_array[rows_sorted]
        logbook_df["arr_dt"] = self._dt_array[rows_sorted] + pd.to_timedelta(duration_sorted, unit="h")
        # Departure location is the location one step before the journey started
        logbook_df["dep_loc"] = self._location_array[rows_sorted - 1, cols_sorted]
        # Arrival location is the location at the step where the journey ended
        step_end_journey = rows_sorted + (duration_sorted / self.model_params.info.temp_res).round().astype(int)
        logbook_df["arr_loc"] = self._location_array[step_end_journey, cols_sorted]
        logbook_df["distance"] = self._distance_array[rows_sorted, cols_sorted]

        # Remove buffer from logbook_df:
        # delete rows with arr_dt before first_step_no_buffer or dep_dt after last_step_no_buffer
        mask_buffer_logbook = (logbook_df["arr_dt"] < first_step_no_buffer) | (
            logbook_df["dep_dt"] > last_step_no_buffer
        )
        logbook_df = logbook_df[~mask_buffer_logbook].reset_index(drop=True)

        # Remove buffer from all arrays
        keep = ~self._mask_buffer
        self._dt_array = self._dt_array[keep]
        self._start_journey_array = self._start_journey_array[keep, :]
        self._duration_array = self._duration_array[keep, :]
        self._distance_array = self._distance_array[keep, :]
        self._location_array = self._location_array[keep, :]
        self._speed_array = self._speed_array[keep, :]

        # Create vehicle DataFrame
        first_day = first_step_no_buffer.floor("D")
        last_day = last_step_no_buffer.floor("D")
        # The cluster of each vehicle is looked up via its parameter index at step 1.
        id_cluster = self.model_params.df.id_cluster[self._index_params_array[1, :]]
        vehicle_df = pd.DataFrame(
            {
                "id_vehicle": range(1, self._number_vehicles + 1),
                "first_day": [first_day] * self._number_vehicles,
                "last_day": [last_day] * self._number_vehicles,
                "id_cluster": id_cluster.tolist(),
                # Location at the first step after the buffer has been removed
                "first_loc": self._location_array[0, :],
            }
        )
        mob_profiles = MobProfiles(input_logbooks_df=logbook_df, input_vehicles_df=vehicle_df)

        # Update location labels
        locations_df = mob_profiles.locations.df
        locations_df["label"] = self.model_params.info.labels_locations
        mob_profiles.locations.update_locations(locations_df)

        # Update cluster labels
        clusters_df = mob_profiles.clusters.df
        clusters_df["label"] = self.model_params.info.labels_clusters
        mob_profiles.clusters.update_clusters(clusters_df)
        return mob_profiles