Source code for votekit.cleaning.rank_profiles_cleaning

from functools import partial
from typing import Callable, Union
from votekit.pref_profile import (
    RankProfile,
    CleanedRankProfile,
    ProfileError,
)
import pandas as pd
import numpy as np


def _iterate_and_clean_ranking_tuples(
    profile: RankProfile,
    clean_ranking_func: Callable[[tuple], tuple],
) -> tuple[pd.DataFrame, set[int], set[int], set[int], set[int]]:
    """
    Clean the ranking tuples of the df according to the cleaning rule.

    Args:
        profile (RankProfile): The original profile of ballots.
        clean_ranking_func (Callable[[tuple], tuple]): Function that
            takes the ranking portion of a row of the profile df and returns an altered ranking.

    Returns:
        tuple[pd.DataFrame, set[int], set[int], set[int], set[int]]: cleaned_df, unaltr_idxs,
            nonempty_altr_idxs, no_wt_altr_idxs, no_rank_altr_idxs
    """
    cleaned_df = profile.df.copy()
    assert profile.max_ranking_length is not None
    ranking_cols = [f"Ranking_{i}" for i in range(1, profile.max_ranking_length + 1)]

    orig_rows = [
        tuple(vals)
        for vals in cleaned_df[ranking_cols].itertuples(index=False, name=None)
    ]
    cleaned_rows = [clean_ranking_func(row) for row in orig_rows]

    cleaned_df[ranking_cols] = pd.DataFrame(cleaned_rows, index=cleaned_df.index)

    tilde = frozenset({"~"})
    idxs = cleaned_df.index

    unaltr_idxs = {
        idx for idx, (o, c) in zip(idxs, zip(orig_rows, cleaned_rows)) if o == c
    }
    no_rank_altr_idxs = {
        idx for idx, c in zip(idxs, cleaned_rows) if all(x == tilde for x in c)
    }
    nonempty_altr_idxs = set(idxs) - unaltr_idxs - no_rank_altr_idxs
    no_wt_altr_idxs: set[int] = set()

    return (
        cleaned_df,
        unaltr_idxs,
        nonempty_altr_idxs,
        no_wt_altr_idxs,
        no_rank_altr_idxs,
    )


[docs] def clean_rank_profile( profile: RankProfile, clean_ranking_func: Callable[[tuple], tuple], remove_empty_ballots: bool = True, remove_zero_weight_ballots: bool = True, retain_original_candidate_list: bool = True, ) -> CleanedRankProfile: """ Allows user-defined cleaning rules for RankProfile. Input function that applies modification to ranking portion of the df, not weight or voterset. Args: profile (RankProfile): A RankProfile to clean. clean_ranking_func (Callable[[tuple], tuple]): Function that takes the ranking portion of a row of the profile df and returns an altered ranking. remove_empty_ballots (bool, optional): Whether or not to remove ballots that have no ranking and no scores as a result of the cleaning. Defaults to True. remove_zero_weight_ballots (bool, optional): Whether or not to remove ballots that have no weight as a result of the cleaning. Defaults to True. retain_original_candidate_list (bool, optional): Whether or not to use the candidate list from the original profile in the new profile. If False, uses only candidates who receive votes. Defaults to True. Returns: CleanedRankProfile: A cleaned ``RankProfile``. Raises: ProfileError: Profile must only contain ranked ballots. """ if not isinstance(profile, RankProfile): raise ProfileError("Profile must be a RankProfile.") ( cleaned_df, unaltr_idxs, nonempty_altr_idxs, no_wt_altr_idxs, no_rank_altr_idxs, ) = _iterate_and_clean_ranking_tuples(profile, clean_ranking_func) if remove_empty_ballots: assert profile.max_ranking_length is not None ranking_cols = [ f"Ranking_{i}" for i in range(1, profile.max_ranking_length + 1) ] mask = ( cleaned_df[ranking_cols] .map(lambda x: x == frozenset({"~"})) # type: ignore[operator] .all(axis=1) ) cleaned_df = cleaned_df[~mask] if remove_zero_weight_ballots: cleaned_df = cleaned_df[cleaned_df["Weight"] > 0] return CleanedRankProfile( df=cleaned_df, candidates=(profile.candidates if retain_original_candidate_list else tuple()), max_ranking_length=profile.max_ranking_length, no_wt_altr_idxs=no_wt_altr_idxs, no_rank_altr_idxs=no_rank_altr_idxs, nonempty_altr_idxs=nonempty_altr_idxs, unaltr_idxs=unaltr_idxs, parent_profile=profile, df_index_column=list(cleaned_df.index), )
def remove_repeat_cands_from_ranking_row( ranking_tup: tuple, ) -> tuple: """ Given a ranking tuple, if a candidate appears multiple times, keep the first instance, and remove any further instances. Does not condense the ranking. Args: ranking_tup (tuple): Ranking to remove repeated candidates from. Returns: tuple: Ranking with duplicate candidate(s) removed. """ dedup_ranking: list[float | frozenset] = [] seen_cands = [] for cand_set in ranking_tup: if cand_set == frozenset({"~"}): dedup_ranking.append(frozenset({"~"})) continue if not isinstance(cand_set, frozenset): dedup_ranking.append(np.nan) continue new_position = [] for cand in cand_set: if cand not in seen_cands: new_position.append(cand) seen_cands.append(cand) dedup_ranking.append(frozenset(new_position)) return tuple(dedup_ranking)
[docs] def remove_repeat_cands_rank_profile( profile: RankProfile, remove_empty_ballots: bool = True, remove_zero_weight_ballots: bool = True, retain_original_candidate_list: bool = True, ) -> CleanedRankProfile: """ Given a profile, if a candidate appears multiple times on a ballot, keep the first instance and remove any further instances. Does not condense any empty rankings as as result. Only works on ranking ballots, not score ballots. Wrapper for clean_rank_profile. Args: profile (RankProfile): Profile to remove repeated candidates from. remove_empty_ballots (bool, optional): Whether or not to remove ballots that have no ranking or scores as a result of cleaning. Defaults to True. remove_zero_weight_ballots (bool, optional): Whether or not to remove ballots that have no weight as a result of cleaning. Defaults to True. retain_original_candidate_list (bool, optional): Whether or not to use the candidate list from the original profile in the new profile. If False, uses only candidates who receive votes. Defaults to True. Returns: CleanedRankProfile: A cleaned ``RankProfile``. Raises: ProfileError: Profile must only contain ranked ballots. """ return clean_rank_profile( profile, remove_repeat_cands_from_ranking_row, remove_empty_ballots, remove_zero_weight_ballots, retain_original_candidate_list, )
def remove_cand_from_ranking_row( removed: Union[str, list], ranking_tup: tuple[frozenset, ...], ) -> tuple[frozenset, ...]: """ Removes specified candidate(s) from ranking. Does not condense the resulting ranking. Args: removed (Union[str, list]): Candidate or list of candidates to be removed. ranking_tup (tuple): Ranking to remove candidates from. Returns: tuple: Ranking with candidate(s) removed. """ if isinstance(removed, str): removed = [removed] removed_set = set(removed) out = [] for s in ranking_tup: if s.isdisjoint(removed_set): out.append(s) else: out.append(frozenset(s - removed_set)) return tuple(out)
[docs] def remove_cand_rank_profile( removed: Union[str, list], profile: RankProfile, remove_empty_ballots: bool = True, remove_zero_weight_ballots: bool = True, retain_original_candidate_list: bool = False, ) -> CleanedRankProfile: """ Given a ranked profile, remove the given candidate(s) from the ballots. Does not condense the resulting ballots. Wrapper for clean_rank_profile that does some extra processing to ensure the candidate list is handled correctly. Args: removed (Union[str, list]): Candidate or list of candidates to be removed. profile (RankProfile): Profile to remove candidates from. remove_empty_ballots (bool, optional): Whether or not to remove ballots that have no ranking or scores as a result of cleaning. Defaults to True. remove_zero_weight_ballots (bool, optional): Whether or not to remove ballots that have no weight as a result of cleaning. Defaults to True. retain_original_candidate_list (bool, optional): Whether or not to use the candidate list from the orginal profile in the new profile. If False, takes the original candidate list and removes the candidate(s) given in ``removed``, but preserves all others. Defaults to False. Returns: CleanedRankProfile: A cleaned ``RankProfile``. Raises: ProfileError: Profile must only contain ranked ballots. """ if isinstance(removed, str): removed = [removed] cleaned_profile = clean_rank_profile( profile, partial(remove_cand_from_ranking_row, removed), remove_empty_ballots, remove_zero_weight_ballots, retain_original_candidate_list=True, ) new_candidates = ( profile.candidates if retain_original_candidate_list else tuple(set(profile.candidates) - set(removed)) ) return CleanedRankProfile( df=cleaned_profile.df, candidates=new_candidates, max_ranking_length=cleaned_profile.max_ranking_length, parent_profile=cleaned_profile.parent_profile, df_index_column=cleaned_profile.df_index_column, no_wt_altr_idxs=cleaned_profile.no_wt_altr_idxs, no_rank_altr_idxs=cleaned_profile.no_rank_altr_idxs, nonempty_altr_idxs=cleaned_profile.nonempty_altr_idxs, unaltr_idxs=cleaned_profile.unaltr_idxs, )
def condense_ranking_row( ranking_tup: tuple, ) -> tuple: """ Given a ranking, removes any empty ranking positions and moves up any lower ranked candidates. Args: ranking_tup (tuple): Ranking to condense. Returns: tuple: Condensed tanking. """ max_ranking_length = len(ranking_tup) condensed_ranking = [ cand_set for cand_set in ranking_tup if cand_set != frozenset() ] if len(condensed_ranking) < max_ranking_length: condensed_ranking += [frozenset("~")] * ( max_ranking_length - len(condensed_ranking) ) return tuple(condensed_ranking) def _is_equiv_to_condensed(ranking: pd.Series) -> bool: """ Returns True if the given ranking is equivalent to its condensed form. It is equivalent if the rankings are identical, or if the original ranking only has trailing empty frozensets in its ranking after some listed candidate. Args: ranking (pd.Series): Ranking to check. Returns: bool: True if the given ranking is equivalent to its condensed form. """ if all(cs == frozenset() for cs in ranking): return False for i, cand_set in enumerate(ranking): if cand_set != frozenset(): continue if all(cs in [frozenset(), frozenset("~")] for cs in ranking[i:]): return True return False return True
[docs] def condense_rank_profile( profile: RankProfile, remove_empty_ballots: bool = True, remove_zero_weight_ballots: bool = True, retain_original_candidate_list: bool = True, ) -> CleanedRankProfile: """ Given a ranked profile, removes any empty frozensets from the rankings and condenses the resulting ranking. If a ranking only has trailing empty positions, the condensed ranking is considered equivalent. For example, (A,B,{},{}) is mapped to (A,B) but considered unaltered since the ranking did not change. Wrapper for clean_rank_profile that does some extra processing to ensure condensed ranking equivalence is handled correctly. Args: profile (RankProfile): Profile to remove repeated candidates from. remove_empty_ballots (bool, optional): Whether or not to remove ballots that have no ranking or scores as a result of cleaning. Defaults to True. remove_zero_weight_ballots (bool, optional): Whether or not to remove ballots that have no weight as a result of cleaning. Defaults to True. retain_original_candidate_list (bool, optional): Whether or not to use the candidate list from the original profile in the new profile. If False, uses only candidates who receive votes. Defaults to True. Returns: CleanedRankProfile: A cleaned ``RankProfile``. """ condensed_profile = clean_rank_profile( profile, condense_ranking_row, remove_empty_ballots, remove_zero_weight_ballots, retain_original_candidate_list, ) assert profile.max_ranking_length is not None ranking_cols = [f"Ranking_{i}" for i in range(1, profile.max_ranking_length + 1)] ranking_df = profile.df[ranking_cols] additional_unaltr_idxs = set( [ i for i in condensed_profile.nonempty_altr_idxs if _is_equiv_to_condensed(ranking_df.loc[i]) # type: ignore[arg-type] ] ) new_unaltr_idxs = condensed_profile.unaltr_idxs | additional_unaltr_idxs new_nonempty_altr_idxs = condensed_profile.nonempty_altr_idxs.difference( additional_unaltr_idxs ) return CleanedRankProfile( df=condensed_profile.df, candidates=condensed_profile.candidates, max_ranking_length=condensed_profile.max_ranking_length, parent_profile=profile, df_index_column=condensed_profile.df_index_column, no_wt_altr_idxs=condensed_profile.no_wt_altr_idxs, no_rank_altr_idxs=condensed_profile.no_rank_altr_idxs, nonempty_altr_idxs=new_nonempty_altr_idxs, unaltr_idxs=new_unaltr_idxs, )
def _is_equiv_for_remove_and_condense(removed: list[str], ranking: pd.Series) -> bool: """ Returns True if the given ranking is equivalent to its removed and condensed form. It is equivalent if the ranking has no candidate in the removed list and either no empty frozensets or only trailing ones. If its has internal empty frozensets or any candidate in the removed list, it is not equivalent. Args: removed (list[str]): Candidates to be removed. ranking (pd.Series): Ranking to check. Returns: bool: True if the given ranking is equivalent to its remove and condensed form. """ if any( c_remove == cand for c_remove in removed for c_set in ranking if isinstance(c_set, frozenset) for cand in c_set ): return False if all(c_set != frozenset() for c_set in ranking): return True for i, cand_set in enumerate(ranking): if cand_set != frozenset(): continue if all(cs == frozenset() for cs in ranking[i:]): return True return False return True
[docs] def remove_and_condense_rank_profile( removed: Union[str, list], profile: RankProfile, remove_empty_ballots: bool = True, remove_zero_weight_ballots: bool = True, retain_original_candidate_list: bool = False, ) -> CleanedRankProfile: """ Given a ranked profile, remove the given candidate(s) and condense the resulting rankings. If a ranking only has trailing empty positions, the condensed ranking is considered equivalent. For example, (A,B,{},{}) is mapped to (A,B) but considered unaltered since the ranking did not change. This function is intended to save computational time in election methods, where removing and condensing happen frequently. Researches interested in the difference between removing and condensing should use ``remove_cand`` and ``condense_profile`` in series. Wrapper for clean_rank_profile that does some extra processing to ensure the candidate list is handled correctly, and that ballot equivalence is checked. Args: removed (Union[str, list]): Candidate or list of candidates to be removed. profile (RankProfile): Profile to remove repeated candidates from. remove_empty_ballots (bool, optional): Whether or not to remove ballots that have no ranking or scores as a result of cleaning. Defaults to True. remove_zero_weight_ballots (bool, optional): Whether or not to remove ballots that have no weight as a result of cleaning. Defaults to True. retain_original_candidate_list (bool, optional): Whether or not to use the candidate list from the orginal profile in the new profile. If False, takes the original candidate list and removes the candidate(s) given in ``removed``, but preserves all others. Defaults to False. Returns: CleanedRankProfile: A cleaned ``RankProfile``. """ if isinstance(removed, str): removed = [removed] cleaned_profile = clean_rank_profile( profile, lambda b: condense_ranking_row(remove_cand_from_ranking_row(removed, b)), remove_empty_ballots, remove_zero_weight_ballots, retain_original_candidate_list=True, ) new_candidates = ( profile.candidates if retain_original_candidate_list else tuple(set(profile.candidates) - set(removed)) ) assert profile.max_ranking_length is not None ranking_cols = [f"Ranking_{i}" for i in range(1, profile.max_ranking_length + 1)] ranking_df = profile.df[ranking_cols] additional_unaltr_idxs = set( [ i for i in cleaned_profile.nonempty_altr_idxs if _is_equiv_for_remove_and_condense( removed, ranking_df.loc[i], # type: ignore[arg-type] ) ] ) new_unaltr_idxs = cleaned_profile.unaltr_idxs | additional_unaltr_idxs new_nonempty_altr_idxs = cleaned_profile.nonempty_altr_idxs.difference( additional_unaltr_idxs ) return CleanedRankProfile( df=cleaned_profile.df, candidates=new_candidates, max_ranking_length=cleaned_profile.max_ranking_length, parent_profile=cleaned_profile.parent_profile, df_index_column=cleaned_profile.df_index_column, no_wt_altr_idxs=cleaned_profile.no_wt_altr_idxs, no_rank_altr_idxs=cleaned_profile.no_rank_altr_idxs, nonempty_altr_idxs=new_nonempty_altr_idxs, unaltr_idxs=new_unaltr_idxs, )