import pandas as pd
from dataclasses import dataclass
from typing import Literal, Tuple
from champpy.core.mobility.mobility_data import MobProfiles
import logging
# Module-level logger named after this module; cleaning summaries are emitted through it.
logger: logging.Logger = logging.getLogger(__name__)
@dataclass(frozen=True)
class LimitConfig:
    """Configuration for a single limit parameter in data cleaning.

    Raises
    ------
    ValueError
        If ``min_method`` or ``max_method`` is not a supported method, or if
        ``min_value`` is greater than ``max_value``.
    """

    min_value: float = 0
    """Minimum value threshold (default: ``0``)."""
    min_method: Literal["delete"] = "delete"
    """Method to handle values below minimum (default: ``"delete"``)."""
    max_value: float = float("inf")
    """Maximum value threshold (default: ``inf``)."""
    max_method: Literal["delete", "cap"] = "cap"
    """Method to handle values above maximum: ``"delete"`` or ``"cap"`` (default: ``"cap"``)."""

    def __post_init__(self) -> None:
        # ``Literal`` annotations are not enforced at runtime, so validate here to fail
        # fast at construction instead of producing a configuration that cleans nothing.
        if self.min_method not in ("delete",):
            raise ValueError(f"min_method must be 'delete', got {self.min_method!r}")
        if self.max_method not in ("delete", "cap"):
            raise ValueError(f"max_method must be 'delete' or 'cap', got {self.max_method!r}")
        # Overlapping limits would classify a value as both too small and too large.
        if self.min_value > self.max_value:
            raise ValueError(
                f"min_value ({self.min_value!r}) must not be greater than max_value ({self.max_value!r})"
            )
@dataclass(frozen=True)
class UserParamsCleaning:
    """
    User parameters for cleaning MobProfiles.

    Configuration for data quality checks including speed, duration, distance limits
    and temporal resolution settings.

    Raises
    ------
    ValueError
        If ``temp_res`` is not a positive number of hours.
    """

    speed: LimitConfig = LimitConfig(min_value=0.01, min_method="delete", max_value=120.0, max_method="cap")
    """Speed limits configuration in km/h.

    Default: ``LimitConfig(min_value=0.01, min_method="delete", max_value=120.0, max_method="cap")``
    """
    duration: LimitConfig = LimitConfig(min_value=0.25, min_method="delete", max_value=8.0, max_method="cap")
    """Duration limits configuration in hours.

    Default: ``LimitConfig(min_value=0.25, min_method="delete", max_value=8.0, max_method="cap")``
    """
    distance: LimitConfig = LimitConfig(min_value=0.5, min_method="delete", max_value=500.0, max_method="cap")
    """
    Distance limits configuration in kilometers.

    Default: ``LimitConfig(min_value=0.5, min_method="delete", max_value=500.0, max_method="cap")``
    """
    temp_res: float = 0.25
    """
    Temporal resolution in hours for resampling during cleaning.

    Default: ``0.25`` (15-minute resolution)
    """
    print_summary: bool = True
    """
    Whether to print cleaning summary to logger.

    Default: ``True``
    """

    def __post_init__(self) -> None:
        # A zero, negative or NaN resolution cannot describe a time grid.
        # ``not x > 0`` (rather than ``x <= 0``) also rejects NaN.
        if not self.temp_res > 0:
            raise ValueError(f"temp_res must be a positive number of hours, got {self.temp_res!r}")
class MobProfilesCleaner:
    """
    Cleaner for MobProfiles with configurable limits.

    This class provides configurable data cleaning for mobility profiles, including
    removal or capping of outliers in speed, duration, and distance. It also validates
    and corrects first/last journey locations.

    Parameters
    ----------
    user_params : UserParamsCleaning, optional
        Cleaning limits. If None, default limits from :class:`UserParamsCleaning` are used.

    Attributes
    ----------
    modified_id_journeys : dict
        Dictionary tracking modified journeys by type of modification (distance, speed,
        duration, location). Entries accumulate across calls to :meth:`clean`.
    deleted_id_journeys : dict
        Dictionary tracking deleted journeys by type of modification (distance, speed,
        duration). Entries accumulate across calls to :meth:`clean`.
    params : UserParamsCleaning
        User parameters for cleaning.

    Notes
    -----
    Journey ids are recorded before the final ``reindexing("all")`` performed by
    :meth:`clean`. If that call renumbers journeys, the recorded ids may not match the
    ids in the returned profiles.

    Examples
    --------
    Create a cleaner with default limits:

    >>> from champpy import MobProfilesCleaner, UserParamsCleaning
    >>> cleaner = MobProfilesCleaner()
    >>> cleaned_profiles = cleaner.clean(mob_profiles)

    Create a cleaner with custom limits:

    >>> from champpy import LimitConfig, UserParamsCleaning
    >>> custom_params = UserParamsCleaning(
    ...     speed=LimitConfig(min_value=0.5, max_value=100.0, max_method="cap"),
    ...     duration=LimitConfig(min_value=0.1, max_value=12.0, max_method="cap"),
    ...     distance=LimitConfig(min_value=0.1, max_value=600.0, max_method="cap"),
    ...     temp_res=0.5,
    ...     print_summary=True
    ... )
    >>> cleaner = MobProfilesCleaner(custom_params)
    >>> cleaned_profiles = cleaner.clean(mob_profiles)
    """

    def __init__(self, user_params: UserParamsCleaning = None):
        """
        Initialize MobProfilesCleaner with limits. See Class docstring for parameters.
        """
        self.params = user_params if user_params is not None else UserParamsCleaning()
        self.modified_id_journeys = {
            "distance": [],
            "speed": [],
            "duration": [],
            "location": [],
        }
        self.deleted_id_journeys = {"distance": [], "speed": [], "duration": []}

    def clean(self, mob_profiles: MobProfiles) -> MobProfiles:
        """
        Clean the input MobProfiles based on configured limits.

        This method applies the following cleaning steps:

        - Resample to specified temporal resolution
        - Reindex IDs for consistency
        - Clean first/last journey locations
        - Apply limits to duration, speed, and distance
        - Log cleaning summary

        The input instance is modified in place and also returned.

        Parameters
        ----------
        mob_profiles : MobProfiles
            MobProfiles instance to clean.

        Returns
        -------
        MobProfiles
            Cleaned MobProfiles instance with ``_is_cleaned`` flag set to ``True``.
            An empty input is returned unchanged (the flag is not set).
        """
        # Do nothing if the input is empty
        if mob_profiles.logbooks.df is None or mob_profiles.logbooks.df.empty:
            return mob_profiles
        # Resample to temporal resolution
        mob_profiles.logbooks.temp_res = self.params.temp_res
        # Reindex id_journey, id_vehicle, id_cluster
        mob_profiles.reindexing("all")
        # Ensure first/last journeys start/end at plausible locations
        self._clean_first_last_locations(mob_profiles)
        # Duration first: capping it moves arr_dt, which the later speed check depends on.
        self._clean_column(mob_profiles, "duration")
        self._clean_column(mob_profiles, "speed")
        self._clean_column(mob_profiles, "distance")
        # Reindex id_journey, id_vehicle, id_cluster
        mob_profiles.reindexing("all")
        # Print summary of cleaning
        self._log_summary()
        # Mark as cleaned
        mob_profiles._is_cleaned = True
        return mob_profiles

    def _clean_column(self, mob_profiles: MobProfiles, column: str, update_column: str = None) -> MobProfiles:
        """
        Generic method to clean a specific column based on limits.

        Removes journeys with values below the minimum threshold and applies capping or
        deletion to values above the maximum threshold, based on the configured method.

        Capping strategy per column:

        - ``"speed"``: ``distance`` is set to ``max_value * duration``.
        - ``"duration"``: ``arr_dt`` is set to ``dep_dt + max_value`` hours.
        - any other column: ``update_column`` is set to ``max_value``.

        Parameters
        ----------
        mob_profiles : MobProfiles
            MobProfiles instance to clean.
        column : str
            Column name to check against limits (e.g., ``"distance"``, ``"speed"``, ``"duration"``).
        update_column : str, optional
            Column to update when capping columns without a dedicated strategy
            (default: same as ``column``).

        Returns
        -------
        MobProfiles
            Cleaned MobProfiles instance.

        Raises
        ------
        ValueError
            If the configured ``max_method`` is neither ``"cap"`` nor ``"delete"``.
        """
        lb_df = mob_profiles.logbooks.df
        if lb_df is None or lb_df.empty:
            return mob_profiles
        limit_config = getattr(self.params, column)
        # Reject unsupported methods before touching any data so cleaning is not half-applied.
        if limit_config.max_method not in ("cap", "delete"):
            message = f"max_method for '{column}' must be either 'cap' or 'delete', got {limit_config.max_method!r}"
            logger.error(message)
            raise ValueError(message)
        # Without this default, capping a plain column would write into a column named None.
        update_column = update_column or column

        # Identify rows below and above the limits. A row can only be in both when
        # min_value > max_value; deletion wins in that case.
        mask_below = lb_df[column] < limit_config.min_value
        mask_above = (lb_df[column] > limit_config.max_value) & ~mask_below
        # Snapshot everything needed later *before* deleting, so the result does not
        # depend on whether delete_journeys mutates the frame in place or replaces it.
        ids_below = lb_df.loc[mask_below, "id_journey"].tolist()
        rows_above = lb_df.loc[mask_above].copy()

        # Handle values below min_value
        if ids_below:
            mob_profiles.logbooks.delete_journeys(ids_below)
            self.deleted_id_journeys[column].extend(ids_below)

        # Handle values above max_value
        if rows_above.empty:
            return mob_profiles
        ids_above = rows_above["id_journey"].tolist()
        if limit_config.max_method == "cap":
            if column == "speed":
                rows_above["distance"] = limit_config.max_value * rows_above["duration"]
            elif column == "duration":
                rows_above["arr_dt"] = rows_above["dep_dt"] + pd.to_timedelta(limit_config.max_value, unit="h")
            else:
                rows_above[update_column] = limit_config.max_value
            mob_profiles.logbooks.update_journeys(rows_above)
            self.modified_id_journeys[column].extend(ids_above)
        else:  # "delete"
            mob_profiles.logbooks.delete_journeys(ids_above)
            self.deleted_id_journeys[column].extend(ids_above)
        return mob_profiles

    def _get_first_last_locations_of_day(self, mob_profiles: MobProfiles) -> Tuple[pd.DataFrame, list, list]:
        """
        Analyze first/last journey locations per weekday.

        For each weekday and location combination, count how many first trips of the day
        start at that location and how many last trips of the day end at that location.
        Days are defined by the departure date of a journey, for both first and last trips.

        Parameters
        ----------
        mob_profiles : MobProfiles
            MobProfiles instance with logbooks data.

        Returns
        -------
        Tuple[pd.DataFrame, list, list]
            A tuple containing:

            - DataFrame with columns ``weekday``, ``dep_loc``, ``count_first``, ``arr_loc``,
              ``count_last`` and ``location``. ``dep_loc``, ``arr_loc`` and ``location`` all
              hold the same location value, and the counts are integers (0 when absent).
            - List of ``id_journey`` for trips that are first trips of the day.
            - List of ``id_journey`` for trips that are last trips of the day.
        """
        lb_df = mob_profiles.logbooks.df
        if lb_df is None or lb_df.empty:
            return pd.DataFrame(), [], []

        # Work on a copy sorted by vehicle and departure time
        df = lb_df.sort_values(["id_vehicle", "dep_dt"], kind="stable").reset_index(drop=True)
        df["day"] = df["dep_dt"].dt.floor("D")
        df["weekday"] = df["dep_dt"].dt.dayofweek  # Monday=0, Sunday=6

        # drop_duplicates keeps whole rows. GroupBy.first()/last() would instead take the
        # first/last *non-null* value of each column independently and could mix rows.
        first_of_day = df.drop_duplicates(["id_vehicle", "day"], keep="first")
        last_of_day = df.drop_duplicates(["id_vehicle", "day"], keep="last")

        first_counts = (
            first_of_day.groupby(["weekday", "dep_loc"]).size().reset_index(name="count_first")
            .rename(columns={"dep_loc": "location"})
        )
        last_counts = (
            last_of_day.groupby(["weekday", "arr_loc"]).size().reset_index(name="count_last")
            .rename(columns={"arr_loc": "location"})
        )
        # Merging on a shared key keeps the location column free of NaN, so it keeps its
        # dtype. Only the count columns need filling.
        merged_counts = first_counts.merge(last_counts, on=["weekday", "location"], how="outer")
        count_cols = ["count_first", "count_last"]
        merged_counts[count_cols] = merged_counts[count_cols].fillna(0).astype(int)
        # Keep the column names callers use for first (dep_loc) and last (arr_loc) trips.
        merged_counts["dep_loc"] = merged_counts["location"]
        merged_counts["arr_loc"] = merged_counts["location"]
        merged_counts = merged_counts[["weekday", "dep_loc", "count_first", "arr_loc", "count_last", "location"]]

        id_journeys_first = first_of_day["id_journey"].tolist()
        id_journeys_last = last_of_day["id_journey"].tolist()
        return merged_counts, id_journeys_first, id_journeys_last

    def _clean_first_last_locations(self, mob_profiles: MobProfiles) -> MobProfiles:
        """
        Analyze and correct first/last journey locations per weekday.

        For each weekday and location combination, count how many first trips of the day
        start at that location and how many last trips of the day end at that location.
        A location used only by first trips (never by last trips) of a weekday, or vice
        versa, is considered implausible. It is replaced with the most frequent first
        (resp. last) location for that weekday.

        Parameters
        ----------
        mob_profiles : MobProfiles
            MobProfiles instance with logbooks data.

        Returns
        -------
        MobProfiles
            Updated MobProfiles instance with corrected locations.
        """
        lb_df = mob_profiles.logbooks.df
        if lb_df is None or lb_df.empty:
            return mob_profiles

        merged_counts, id_journeys_first, id_journeys_last = self._get_first_last_locations_of_day(mob_profiles)

        # Locations seen only as a first (resp. only as a last) location of a weekday
        only_first = merged_counts.loc[merged_counts["count_last"] == 0, ["weekday", "dep_loc"]]
        only_last = merged_counts.loc[merged_counts["count_first"] == 0, ["weekday", "arr_loc"]]
        # Replacement candidates: most frequent first/last location per weekday
        most_frequent_first = merged_counts.loc[
            merged_counts.groupby("weekday")["count_first"].idxmax(), ["weekday", "dep_loc"]
        ]
        most_frequent_last = merged_counts.loc[
            merged_counts.groupby("weekday")["count_last"].idxmax(), ["weekday", "arr_loc"]
        ]

        if not only_first.empty:
            self._replace_first_locations(mob_profiles, id_journeys_first, only_first, most_frequent_first)
        # The helper re-reads the logbook, so this pass sees changes made by the first pass
        # (e.g. a single-journey day whose journey is both first and last of the day).
        if not only_last.empty:
            self._replace_last_locations(mob_profiles, id_journeys_last, only_last, most_frequent_last)
        return mob_profiles

    def _journeys_to_relocate(
        self,
        mob_profiles: MobProfiles,
        id_journeys: list,
        implausible: pd.DataFrame,
        most_frequent: pd.DataFrame,
        loc_column: str,
    ) -> pd.DataFrame:
        """
        Return journeys whose ``loc_column`` is implausible, with the replacement location.

        Parameters
        ----------
        mob_profiles : MobProfiles
            MobProfiles instance; its current logbook is read.
        id_journeys : list
            Candidate journeys (first or last trips of the day).
        implausible : pd.DataFrame
            Columns ``weekday`` and ``loc_column``: combinations to replace.
        most_frequent : pd.DataFrame
            Columns ``weekday`` and ``loc_column``: one replacement location per weekday.
        loc_column : str
            ``"dep_loc"`` or ``"arr_loc"``.

        Returns
        -------
        pd.DataFrame
            Logbook rows to update, with ``loc_column`` replaced. No helper columns are included.
        """
        lb_df = mob_profiles.logbooks.df
        candidates = lb_df.loc[lb_df["id_journey"].isin(id_journeys)].copy()
        # Departure-day weekday, matching how _get_first_last_locations_of_day counts.
        # A private column name avoids clobbering any existing logbook column.
        candidates["_weekday"] = candidates["dep_dt"].dt.dayofweek
        keys = implausible.rename(columns={"weekday": "_weekday"})
        replacements = most_frequent.rename(columns={"weekday": "_weekday"})
        to_update = candidates.merge(keys, on=["_weekday", loc_column], how="inner")
        to_update = to_update.drop(columns=[loc_column]).merge(replacements, on="_weekday", how="left")
        return to_update.drop(columns=["_weekday"])

    def _replace_first_locations(
        self,
        mob_profiles: MobProfiles,
        id_journeys_first: list,
        only_first: pd.DataFrame,
        most_frequent_first: pd.DataFrame,
    ) -> None:
        """Replace implausible ``dep_loc`` of first trips of the day and record the changes."""
        to_update = self._journeys_to_relocate(
            mob_profiles, id_journeys_first, only_first, most_frequent_first, "dep_loc"
        )
        if to_update.empty:
            return
        mob_profiles.logbooks.update_journeys(to_update)
        # Restore location continuity after updates
        mob_profiles.logbooks.restore_location_continuity(target="dep")
        self._update_vehicle_first_locations(mob_profiles, to_update)
        # Log the real journey ids; the merged frame's index is just 0..k-1.
        self.modified_id_journeys["location"].extend(to_update["id_journey"].tolist())

    def _replace_last_locations(
        self,
        mob_profiles: MobProfiles,
        id_journeys_last: list,
        only_last: pd.DataFrame,
        most_frequent_last: pd.DataFrame,
    ) -> None:
        """Replace implausible ``arr_loc`` of last trips of the day and record the changes."""
        to_update = self._journeys_to_relocate(
            mob_profiles, id_journeys_last, only_last, most_frequent_last, "arr_loc"
        )
        if to_update.empty:
            return
        mob_profiles.logbooks.update_journeys(to_update)
        # Restore location continuity after updates
        mob_profiles.logbooks.restore_location_continuity(target="arr")
        # Log the real journey ids; the merged frame's index is just 0..k-1.
        self.modified_id_journeys["location"].extend(to_update["id_journey"].tolist())

    def _update_vehicle_first_locations(self, mob_profiles: MobProfiles, relocated: pd.DataFrame) -> None:
        """
        Sync ``first_loc`` of vehicles whose earliest journey received a new ``dep_loc``.

        Only the earliest journey per vehicle is used, so each vehicle gets at most one new
        ``first_loc``. Using every relocated first-of-day journey could produce several
        (possibly different) rows per vehicle.

        NOTE(review): assumes ``first_loc`` mirrors the ``dep_loc`` of the vehicle's
        earliest journey, and that later journeys are realigned by
        ``restore_location_continuity`` — confirm against ``MobProfiles``.
        """
        lb_df = mob_profiles.logbooks.df
        earliest_ids = (
            lb_df.sort_values(["id_vehicle", "dep_dt"], kind="stable")
            .drop_duplicates("id_vehicle", keep="first")["id_journey"]
        )
        new_first_locs = (
            relocated.loc[relocated["id_journey"].isin(earliest_ids), ["id_vehicle", "dep_loc"]]
            .drop_duplicates("id_vehicle")
            .rename(columns={"dep_loc": "first_loc"})
        )
        if new_first_locs.empty:
            return
        veh_df = mob_profiles.vehicles.df
        vehicle_updates = (
            veh_df.loc[veh_df["id_vehicle"].isin(new_first_locs["id_vehicle"])]
            .drop(columns=["first_loc"])
            .set_index("id_vehicle")
            .join(new_first_locs.set_index("id_vehicle"))
            .reset_index()
        )
        mob_profiles.vehicles.update_vehicles(vehicle_updates)

    def _clean_transition_between_days(self, mob_profiles: MobProfiles) -> MobProfiles:
        """
        Clean logbooks to achieve consistent transition of first/last journey locations.

        The number of last journeys of a weekday arriving at a location must
        match the number of first journeys of the next weekday departing from that location.

        .. note::
            This method is not yet implemented.

        Parameters
        ----------
        mob_profiles : MobProfiles
            MobProfiles instance with logbooks data.

        Returns
        -------
        MobProfiles
            Updated MobProfiles instance.

        Raises
        ------
        NotImplementedError
            This method is not yet implemented.
        """
        # TODO: Implement alignment logic, e.g. starting from
        # self._get_first_last_locations_of_day(mob_profiles).
        raise NotImplementedError("Method _clean_transition_between_days is not implemented yet.")

    def _log_summary(self) -> None:
        """Print a summary of cleaning actions taken to the logger (if enabled)."""
        # Abort if printing is disabled
        if not self.params.print_summary:
            return
        logger.info("MobProfiles Cleaning Summary:")
        logger.info(f"Data has been converted to temporal resolution of {self.params.temp_res:.2f} hours.")
        self._log_summary_method("delete")
        self._log_summary_method("modify")
        logger.info("Check deleted_id_journeys and modified_id_journeys attribute for full list.")

    def _log_summary_method(self, method: str) -> None:
        """
        Log a summary of cleaning actions taken for a specific method.

        At most five ids per category are listed; ``...`` marks truncation.

        Parameters
        ----------
        method : str
            Type of action: ``"delete"`` or ``"modify"``.

        Raises
        ------
        ValueError
            If method is not ``"delete"`` or ``"modify"``.
        """
        if method == "delete":
            id_journeys = self.deleted_id_journeys
            name_action = "Deleted"
        elif method == "modify":
            id_journeys = self.modified_id_journeys
            name_action = "Modified"
        else:
            message = "method must be either 'delete' or 'modify'"
            logger.error(message)
            raise ValueError(message)
        summary_lines = [f"{name_action} journeys:"]
        for key, val in id_journeys.items():
            display_list = val[:5] + (["..."] if len(val) > 5 else [])
            ids_str = "[" + ", ".join(map(str, display_list)) + "]"
            summary_lines.append(f"  - Due to {key} issues: {len(val)} journeys: id_journeys = {ids_str}")
        # Log the complete summary as one message
        logger.info("\n".join(summary_lines))