Source code for champpy.core.mobility.mobility_data

import pandas as pd
import numpy as np
from typing import Literal
import logging

from pydantic import validate_call
from champpy.core.mobility.mobility_components import (
    Logbooks,
    Vehicles,
    Clusters,
    Locations,
)
from champpy.utils.time_utils import get_datetime_array

logger = logging.getLogger(__name__)


[docs] class MobProfiles: """ Wrapper class for mobility profiles in the champpy framework. It contains the logbooks, vehicles, clusters and locations as separate classes. Parameters ---------- input_logbooks_df : :class:`pandas.DataFrame` Input DataFrame for the logbooks. Expected columns and dtypes: .. list-table:: :header-rows: 1 * - Column - Type - Description * - id_vehicle - :class:`int` - One-based index for vehicles, connected to id_vehicle in input_vehicles_df. * - dep_dt - :class:`pandas.Timestamp` - Departure datetime of each journey. * - arr_dt - :class:`pandas.Timestamp` - Arrival datetime of each journey. * - dep_loc - :class:`int` - Departure location of each journey as integer above 0. You can for example define 1 for home, 2 for work, etc. The location = 0 is reserved for driving and not allowed in this dataframe. * - arr_loc - :class:`int` - Arrival location of each journey as integer above 0. You can for example define 1 for home, 2 for work, etc. The location = 0 is reserved for driving and not allowed in this dataframe. * - distance - :class:`float` - Distance of each journey in km. input_vehicles_df : :class:`pandas.DataFrame`, optional Input DataFrame for the vehicles. If not provided, the vehicles will be generated from the logbooks. Expected columns and dtypes: .. list-table:: :header-rows: 1 * - Column - Type - Description * - id_vehicle - :class:`int` - Vehicle identifier. * - first_day - :class:`pandas.Timestamp` - First recorded day of the vehicle. * - last_day - :class:`pandas.Timestamp` - Last recorded day of the vehicle. * - cluster - :class:`int` - Split the vehicles into clusters by assigning a cluster ID (one-based) to each vehicle. This is optional and can be used for example to distinguish between different user groups. If you don't want to use clusters, you can simply set the cluster column to 1 for all vehicles. * - first_loc - :class:`int` - First location (optional). Use the same location encoding as in dep_loc and arr_loc in input_logbooks_df. It is espacially relevant for non-driving vehicles, which do not have any journeys in the logbooks. frozen : :class:`bool`, optional If True, the MobProfiles instance is immutable after creation. Default is False. Attributes ---------- logbooks : :class:`Logbooks` Contains the journey data of the mobility profile with departure and arrival information. vehicles : :class:`Vehicles` Contains vehicle-specific data about eaach vehicle, such as its first and last day of activity, cluster assignment, and first location. It is connected to logbooks via id_vehicle. clusters : :class:`Clusters` Describes the clusters defined in vehicles. It is connected to vehicles via id_cluster. It provides a label for each cluster. locations : :class:`Locations` Describes the locations defined in logbooks and vehicles. The location is connected to logbooks via dep_loc and arr_loc and to vehicles via first_loc. It provides a label for each location. The location = 0 is reserved for driving and gets the label "Driving". Examples -------- Create a MobProfiles instance with minimal example data: .. code-block:: python import pandas as pd import champpy # Create example logbook data with synthetic journeys logbook_df = pd.DataFrame({ 'id_vehicle': [1, 1, 2], 'dep_dt': pd.to_datetime(['2024-01-01 08:00', '2024-01-01 18:00', '2024-01-01 09:30']), 'arr_dt': pd.to_datetime(['2024-01-01 12:00', '2024-01-01 22:00', '2024-01-01 17:30']), 'dep_loc': [1, 2, 1], 'arr_loc': [2, 1, 1], 'distance': [25.5, 30.2, 18.0] }) # Create example vehicle data vehicle_df = pd.DataFrame({ 'id_vehicle': [1, 2], 'first_day': pd.to_datetime(['2024-01-01', '2024-01-01']), 'last_day': pd.to_datetime(['2024-01-02', '2024-01-02']), 'id_cluster': [1, 1], 'first_loc': [1, 1] }) # Create mobility profiles mob_profiles = champpy.MobProfiles(input_logbooks_df=logbook_df, input_vehicles_df=vehicle_df) """ def __init__( self, input_logbooks_df: pd.DataFrame, input_vehicles_df: pd.DataFrame | None = None, frozen: bool = False, ): """ Initialize a MobProfiles object. The parameters are described in the class docstring. """ # Initialize logbooks and vehicles self.logbooks = Logbooks(input_df=input_logbooks_df, frozen=frozen) if input_vehicles_df is not None: self.vehicles = Vehicles(input_df=input_vehicles_df) if self.vehicles.df["first_loc"].isnull().all(): self.vehicles.set_first_loc_from_logbooks(self.logbooks) else: self.vehicles = Vehicles(frozen=False) self.vehicles.generate_vehicles_from_logbooks(self.logbooks) # Initialize clusters self.clusters = Clusters(self.vehicles, frozen=frozen) # Initialize locations self.locations = Locations(logbooks=self.logbooks, vehicles=self.vehicles, frozen=frozen) # set frozen after initialization self._frozen = frozen self._cleaned = False # Add observers to trigger functions in logbooks and clusters on vehicle changes self.vehicles._event_on_logbooks.add_observer(self.vehicles.delete_vehicles) self.vehicles._event_on_clusters.add_observer(self.clusters.update_clusters_from_vehicles) self.logbooks._event_on_locations.add_observer(self.locations.update_locations_from_logbooks_vehicles) def __copy__(self): """Create Copy of Instance that can be called by copy.copy(obj)""" mob_profiles_copy = MobProfiles(self.logbooks.df, self.vehicles.df) # Preserve user-defined cluster and location definitions/labels in the copy mob_profiles_copy.clusters.update_clusters(self.clusters.df) mob_profiles_copy.locations.update_locations(self.locations.df) return mob_profiles_copy
[docs] def copy(self): """Create Copy of Instance""" return self.__copy__()
[docs] def add_mob_profiles( self, input_mob_profiles: "MobProfiles", old_cluster_label: str = "Old", new_cluster_label: str = "New", ) -> None: """ Add mobility data from another MobProfiles instance. The vehicles of the existing MobProfiles instance gets id_cluster = 1. The vehicles of the added MobProfiles instance gets id_cluster = 2. You can set labels for existing data using old_cluster_label and for added data using new_cluster_label. Parameters ---------- input_mob_profiles : MobProfiles Another MobProfiles instance to add data from. old_cluster_label: str Label for existing data new_cluster_label: str Label for added data Examples -------- Assuming `mob_profiles` exists (see :class:`MobProfiles` examples): .. code-block:: python # Create second dataset other_logbook_df = pd.DataFrame({...}) other_mob_profiles = champpy.MobProfiles(other_logbook_df) # Add to existing mob_profiles mob_profiles.add_mob_profiles(input_mob_profiles=other_mob_profiles, old_cluster_label="Existing", new_cluster_label="Added") """ if not isinstance(input_mob_profiles, MobProfiles): message = "other must be an instance of MobProfiles." logger.error(message) raise TypeError(message) # extract dataframes new_logbooks_df = input_mob_profiles.logbooks.df new_vehicles_df = input_mob_profiles.vehicles.df # Make sure vehicle IDs and clusters are unique across both datasets max_id_vehicle = self.vehicles.df["id_vehicle"].max() if not self.vehicles.df.empty else 0 new_vehicles_df["id_vehicle"] += max_id_vehicle new_logbooks_df["id_vehicle"] += max_id_vehicle # Old data gets id_cluster = 1 old_vehicles_df = self.vehicles.df old_vehicles_df["id_cluster"] = 1 self.vehicles.df = old_vehicles_df # new data gets cluster = 2 new_vehicles_df["id_cluster"] = 2 # Add to logbooks and vehicles self.vehicles.add_vehicles(new_vehicles_df) self.logbooks.add_journeys(new_logbooks_df) # Set cluster labels clusters_df = self.clusters.df clusters_df.loc[clusters_df["id_cluster"] == 1, "label"] = old_cluster_label clusters_df.loc[clusters_df["id_cluster"] == 2, "label"] = new_cluster_label self.clusters.update_clusters(clusters_df) # Reindex IDs id_vehicles and id_journey after addition self.reindexing()
[docs] @validate_call def reindexing(self, type: Literal["all", "id_journey", "id_vehicle", "id_cluster"] = "all") -> None: """ Reindex of IDs in the MobProfiles instance (id_journey, id_vehicle, id_cluster). Parameters ---------- type : Literal["all", "id_journey", "id_vehicle", "id_cluster"], optional Specifies which IDs to reindex. Default is "all". - "all": Reindex all IDs (id_journey, id_vehicle, id_cluster) - "id_journey": Reindex only journey IDs - "id_vehicle": Reindex only vehicle IDs - "id_cluster": Reindex only cluster IDs """ if type in ["all", "id_vehicle"]: # Reindex vehicles based on logbooks starting from 1 unique_vehicles = self.vehicles.df["id_vehicle"].unique() reindex_map = {old_id: new_id for new_id, old_id in enumerate(sorted(unique_vehicles), start=1)} self.logbooks._df["id_vehicle"] = self.logbooks._df["id_vehicle"].map(reindex_map) self.vehicles._df["id_vehicle"] = self.vehicles._df["id_vehicle"].map(reindex_map) if type in ["all", "id_cluster"]: # Reindex cluster starting from 1 unique_clusters = self.vehicles.df["id_cluster"].unique() cluster_map = { old_cluster: new_cluster for new_cluster, old_cluster in enumerate(sorted(unique_clusters), start=1) } self.vehicles._df["id_cluster"] = self.vehicles._df["id_cluster"].map(cluster_map) self.clusters._df["id_cluster"] = self.clusters._df["id_cluster"].map(cluster_map) if type in ["all", "id_journey"]: # Reindex id_journey unique_journeys = self.logbooks.df["id_journey"].unique() journey_map = {old_id: new_id for new_id, old_id in enumerate(sorted(unique_journeys), start=1)} self.logbooks._df["id_journey"] = self.logbooks._df["id_journey"].map(journey_map)
class MobProfilesExtended: """ Extended MobProfiles with additional attributes for modeling. Parameters ---------- mob_profiles : MobProfiles Base MobProfiles instance. """ def __init__(self, mob_profiles: MobProfiles, splitdays: bool = True, clustering: bool = True): if not isinstance(mob_profiles, MobProfiles): message = "mob_profiles must be an instance of MobProfiles class." logger.error(message) raise TypeError(message) # Predefine empty DataFrame with required columns self._df = pd.DataFrame( { "id_vehicle": pd.Series(dtype="int64"), "start_dt": pd.Series(dtype="datetime64[ns]"), "end_dt": pd.Series(dtype="datetime64[ns]"), "location": pd.Series(dtype="int64"), "speed": pd.Series(dtype="float64"), } ) # Extend mob_profiles to include standing and non-driving vehicles self._extended_mob_profiles(mob_profiles) # Split multi-day rows if required self._split_multi_day_rows(splitdays=splitdays) # Join the 'id_cluster' column from t_vehicle into t_location if clustering: self._df = self._df.merge( mob_profiles.vehicles._df[["id_vehicle", "id_cluster"]], on="id_vehicle", how="left", ) self._df["id_cluster"] = self._df["id_cluster"].astype("int64") self.labels_clusters = mob_profiles.clusters.df["label"].tolist() self.clusters = mob_profiles.clusters.df["id_cluster"].unique().tolist() else: self._df["id_cluster"] = 1 self.labels_clusters = ["Total"] self.clusters = [1] self.labels_locations = mob_profiles.locations.df["label"].tolist() self.locations = mob_profiles.locations.df["location"].unique().tolist() @property def df(self) -> pd.DataFrame: """Get a copy of the extended MobProfiles DataFrame.""" # Calculate distance and duration duration = (self._df["end_dt"] - self._df["start_dt"]).dt.total_seconds() / 3600 # in hours distance = self._df["speed"] * duration # in km/h return self._df.copy().assign(duration=duration, distance=distance) def _extended_mob_profiles(self, mob_profiles: MobProfiles): """ Create extended DataFrame with additional attributes. Returns ------- pd.DataFrame Extended DataFrame. """ # Logging logger.info("Extending MobProfiles") # convert automatically to uniform temporal resolution if mob_profiles.logbooks.temp_res is None: # find the minimum temporal resolution in hours min_res = mob_profiles.logbooks.df.apply( lambda row: (row["arr_dt"] - row["dep_dt"]).total_seconds() / 3600, axis=1, ).min() mob_profiles.logbooks.temp_res = min_res lb_df = mob_profiles.logbooks._df vehicles_df = mob_profiles.vehicles._df # determine first_loc for vehicles if nan if any(vehicles_df["first_loc"].isna()): vehicles_df.set_first_loc_from_logbooks(mob_profiles.logbooks) # Identify non-drivers mask_nondriver_vehicle = ~vehicles_df["id_vehicle"].isin(lb_df["id_vehicle"]) n_nondriver_vehicle = mask_nondriver_vehicle.sum() # Create t_nondriver only if there are non-driver vehicles if n_nondriver_vehicle > 0: # Use first_loc if available, otherwise use default location 1 nondriver_locations = vehicles_df.loc[mask_nondriver_vehicle, "first_loc"].astype("int64") nondriver_df = pd.DataFrame( { "id_vehicle": vehicles_df.loc[mask_nondriver_vehicle, "id_vehicle"], "start_dt": vehicles_df.loc[mask_nondriver_vehicle, "first_day"], "end_dt": vehicles_df.loc[mask_nondriver_vehicle, "last_day"], "location": nondriver_locations, "speed": 0, } ) else: nondriver_df = pd.DataFrame() # return if all vehicles are non-drivers if n_nondriver_vehicle == len(vehicles_df): self._df = nondriver_df.sort_values(by=["id_vehicle", "start_dt"]).reset_index(drop=True) return # Filter vehicles with journeys vehicle_df_drivers = vehicles_df.loc[~mask_nondriver_vehicle] # Find first and last track of each vehicle group = lb_df.groupby("id_vehicle") first_id_track = group["id_journey"].min() last_id_track = group["id_journey"].max() # Define rows for locations before the first trip start_df = pd.DataFrame( { "id_vehicle": vehicle_df_drivers["id_vehicle"], "start_dt": vehicle_df_drivers["first_day"], "end_dt": lb_df.dep_dt[lb_df["id_journey"].isin(first_id_track)].values, "location": lb_df.dep_loc[lb_df["id_journey"].isin(first_id_track)].values, "speed": 0, } ) # Define rows for locations after the last trip end_df = pd.DataFrame( { "id_vehicle": vehicle_df_drivers["id_vehicle"], "start_dt": lb_df.arr_dt[lb_df["id_journey"].isin(last_id_track)].values, "end_dt": vehicle_df_drivers["last_day"] + pd.Timedelta(days=1), "location": lb_df.arr_loc[lb_df["id_journey"].isin(last_id_track)].values, "speed": 0, } ) # Define rows for locations between trips standing_df = pd.DataFrame( { "id_vehicle": lb_df.id_vehicle[~lb_df["id_journey"].isin(last_id_track)].values, "start_dt": lb_df.arr_dt[~lb_df["id_journey"].isin(last_id_track)].values, "end_dt": lb_df.dep_dt[~lb_df["id_journey"].isin(first_id_track)].values, "location": lb_df.arr_loc[~lb_df["id_journey"].isin(last_id_track)].values, "speed": 0, } ) # Define rows for location driving driving_df = pd.DataFrame( { "id_vehicle": lb_df["id_vehicle"], "start_dt": lb_df["dep_dt"], "end_dt": lb_df["arr_dt"], "location": 0, "speed": lb_df["distance"] / ((lb_df["arr_dt"] - lb_df["dep_dt"]).dt.total_seconds() / 3600), } ) # Merge dataframes self._df = pd.concat([nondriver_df, start_df, standing_df, driving_df, end_df]).sort_values( by=["id_vehicle", "start_dt"] ) self._df.reset_index(drop=True, inplace=True) def _split_multi_day_rows(self, splitdays: bool) -> pd.DataFrame: """ Split multi-day rows in t_location into single-day rows. """ if not splitdays: return # Split multi-day rows: vehicle is at one location over several days day_start = self._df["start_dt"].dt.floor("D") day_end = self._df["end_dt"].dt.floor("D") n_days = (day_end - day_start).dt.days + 1 row_end_at_midnight = (self._df["end_dt"].dt.time == pd.Timestamp("00:00:00").time()) & (n_days > 1) # determine days per vehicle group = self._df.groupby("id_vehicle") first_day = group["start_dt"].min() last_day = group["end_dt"].max() days_per_vehicle = (last_day - first_day).dt.days # Abort if no multi-day rows exist if all(n_days == 1) or all(days_per_vehicle == 1): return self._df # New row for the last day of a multi-day row log_add_row_end = ~row_end_at_midnight & (n_days > 1) split_end_df = pd.DataFrame( { "id_vehicle": self._df.loc[log_add_row_end, "id_vehicle"], "start_dt": self._df.loc[log_add_row_end, "end_dt"].dt.floor("D"), "end_dt": self._df.loc[log_add_row_end, "end_dt"], "location": self._df.loc[log_add_row_end, "location"], "speed": self._df.loc[log_add_row_end, "speed"], } ) # New rows for constant days in the middle: vehicle is at the same location over the whole day n_parking_days = n_days - 2 n_parking_days[n_parking_days < 0] = 0 parking_start_day = day_end[n_parking_days > 0] - pd.to_timedelta(n_parking_days[n_parking_days > 0], unit="D") parking_end_day = day_end[n_parking_days > 0] parking_days = [ pd.date_range(start, end, inclusive="left") for start, end in zip(parking_start_day, parking_end_day) ] split_mid_df = pd.DataFrame( { "id_vehicle": np.repeat( self._df.loc[n_parking_days > 0, "id_vehicle"].values, n_parking_days[n_parking_days > 0], ), "start_dt": np.concatenate(parking_days), "end_dt": np.concatenate(parking_days) + pd.Timedelta(days=1), "location": np.repeat( self._df.loc[n_parking_days > 0, "location"].values, n_parking_days[n_parking_days > 0], ), "speed": 0, } ) # Modify t_location for the first day of multi-day row self._df.loc[n_days > 1, "end_dt"] = day_start[n_days > 1] + pd.Timedelta(days=1) # Merge self._df = pd.concat([self._df, split_mid_df, split_end_df]).sort_values(by=["id_vehicle", "start_dt"]) # reset index self._df.reset_index(drop=True, inplace=True) class MobArray: """ Mobility data in array format for efficient modeling. Child of MobProfilesExtended. """ def __init__(self, mob_profiles: MobProfiles): # Logging logger.info("Creating MobArray from MobProfiles") # Check that all vehicles have same first_day and last_day n_first_days = mob_profiles.vehicles.df["first_day"].nunique() n_last_days = mob_profiles.vehicles.df["last_day"].nunique() if n_first_days != 1 or n_last_days != 1: message = "All vehicles in mob_profiles must have the same first_day and last_day to create MobArray." logger.error(message) raise ValueError(message) first_day = mob_profiles.vehicles.df["first_day"].iloc[0] last_day = mob_profiles.vehicles.df["last_day"].iloc[0] mob_profiles_ext_df = MobProfilesExtended(mob_profiles=mob_profiles, splitdays=True).df temp_res = mob_profiles.logbooks.temp_res dt_array, _ = get_datetime_array(start_date=first_day, end_date=last_day, temp_res=temp_res) # Get index in dt_array for start_dt and end_dt start_idx = pd.Series( np.searchsorted(dt_array, mob_profiles_ext_df["start_dt"].values), index=mob_profiles_ext_df.index, ) end_idx = pd.Series( np.searchsorted(dt_array, mob_profiles_ext_df["end_dt"].values), index=mob_profiles_ext_df.index, ) # Predefine arrays number_vehicles = mob_profiles.vehicles.number number_steps = len(dt_array) self.location = np.zeros((number_steps, number_vehicles), dtype=int) self.speed = np.zeros((number_steps, number_vehicles), dtype=float) self.distance = np.zeros((number_steps, number_vehicles), dtype=float) self.distance_distributed = np.zeros((number_steps, number_vehicles), dtype=float) self.speed_distributed = np.zeros((number_steps, number_vehicles), dtype=float) # Extract data into 1D arrays all_idx = np.concatenate([np.arange(s, e) for s, e in zip(start_idx, end_idx)]) all_id_vehicles = np.concatenate( [np.full(e - s, vid) for vid, s, e in zip(mob_profiles_ext_df["id_vehicle"], start_idx, end_idx)] ) all_locations = np.concatenate( [np.full(e - s, loc) for loc, s, e in zip(mob_profiles_ext_df["location"], start_idx, end_idx)] ) all_speeds = np.concatenate( [np.full(e - s, spd) for spd, s, e in zip(mob_profiles_ext_df["speed"], start_idx, end_idx)] ) all_distances = np.concatenate( [np.full(e - s, spd) for spd, s, e in zip(mob_profiles_ext_df["speed"], start_idx, end_idx)] ) all_distances_distributed = np.concatenate( [ np.full(e - s, dist / (e - s) if e > s else 0) for dist, s, e in zip(mob_profiles_ext_df["distance"], start_idx, end_idx) ] ) all_speeds_distributed = np.concatenate( [np.full(e - s, spd) for spd, s, e in zip(mob_profiles_ext_df["speed"], start_idx, end_idx)] ) # Convert into 2D arrays self.location[all_idx, all_id_vehicles - 1] = all_locations self.speed[all_idx, all_id_vehicles - 1] = all_speeds self.distance[all_idx, all_id_vehicles - 1] = all_distances self.distance_distributed[all_idx, all_id_vehicles - 1] = all_distances_distributed self.speed_distributed[all_idx, all_id_vehicles - 1] = all_speeds_distributed self.id_vehicle = np.arange(1, number_vehicles + 1) # Define departure array self.departure = np.zeros((number_steps, number_vehicles), dtype=bool) self.departure = self.speed > 0 # Save datetime array self.datetime = dt_array