Source code for votekit.pref_profile.utils

from __future__ import annotations
from typing import TYPE_CHECKING

if TYPE_CHECKING:
    from .pref_profile import PreferenceProfile, RankProfile, ScoreProfile

from votekit.ballot import Ballot, RankBallot, ScoreBallot
from typing import Optional, Sequence
import pandas as pd
from functools import partial
import numpy as np


def _convert_ranking_cols_to_ranking(
    row: pd.Series, max_ranking_length: int
) -> Optional[tuple[frozenset, ...]]:
    """
    Convert the ranking cols to a ranking tuple in profile.df.

    The columns ``Ranking_1`` through ``Ranking_{max_ranking_length}`` are read
    in order. The placeholder ``frozenset({"~"})`` marks an unused position and
    is dropped from the result.

    Args:
        row (pd.Series): Row of a profile.df.
        max_ranking_length (int): The maximum length of a ranking.

    Returns:
        Optional[tuple[frozenset, ...]]: Ranking of ballot, or None when no
            position holds anything other than the '~' placeholder.

    Raises:
        ValueError: Row is missing one of the expected ranking columns.
        ValueError: '~' values can only trail on a ranking.

    """
    ranking_cols_idxs = [f"Ranking_{i+1}" for i in range(max_ranking_length)]

    if any(idx not in row.index for idx in ranking_cols_idxs):
        raise ValueError(f"Row has improper ranking columns: {row.index}.")

    placeholder = frozenset({"~"})
    ranking = []
    placeholder_seen = False

    # Single pass: once a '~' has been seen, every later position must also be
    # '~'. This replaces re-scanning the whole suffix for each position.
    for col_idx in ranking_cols_idxs:
        entry = row[col_idx]
        if entry == placeholder:
            placeholder_seen = True
        elif placeholder_seen:
            raise ValueError(
                f"Row {row} has '~' between valid ranking positions. "
                "'~' values can only trail on a ranking."
            )
        else:
            ranking.append(entry)

    return tuple(ranking) if ranking else None


def convert_row_to_rank_ballot(
    row: pd.Series, max_ranking_length: int = 0
) -> RankBallot:
    """
    Build a RankBallot from one row of a properly formatted profile.df.

    Args:
        row (pd.Series): Row of a profile.df. Must contain the "Voter Set" and
            "Weight" entries.
        max_ranking_length (int, optional): The maximum length of a ranking.
            Defaults to 0, which is used for ballots with no ranking.

    Returns:
        RankBallot: Ballot corresponding to the row of the df.
    """
    # With a non-positive length the ranking columns are not read at all.
    ranking = (
        _convert_ranking_cols_to_ranking(row, max_ranking_length)
        if max_ranking_length > 0
        else None
    )

    return RankBallot(
        ranking=ranking,
        voter_set=row["Voter Set"],
        weight=row["Weight"],
    )
def convert_row_to_score_ballot(
    row: pd.Series, candidates: tuple[str, ...]
) -> ScoreBallot:
    """
    Build a ScoreBallot from one row of a properly formatted profile.df.

    Args:
        row (pd.Series): Row of a profile.df. Must contain the "Voter Set" and
            "Weight" entries.
        candidates (tuple[str,...]): The name of the candidates.

    Returns:
        ScoreBallot: Ballot corresponding to the row of the df. Its scores are
            None when no candidate has a non-missing score in the row.
    """
    scores = {}
    for cand in candidates:
        # Candidates absent from the row, or with a missing value, are skipped.
        if cand not in row:
            continue
        value = row[cand]
        if pd.isna(value):
            continue
        scores[cand] = value

    return ScoreBallot(
        scores=scores or None,
        voter_set=row["Voter Set"],
        weight=row["Weight"],
    )
def _df_to_rank_ballot_tuple(
    df: pd.DataFrame, candidates: tuple[str, ...], max_ranking_length: int = 0
) -> tuple[RankBallot, ...]:
    """
    Convert a properly formatted profile.df into a tuple of ballots.

    Args:
        df (pd.DataFrame): A profile.df.
        candidates (tuple[str,...]): The candidates. Not read by this function.
        max_ranking_length (int, optional): The maximum length of a ranking.
            Defaults to 0, which is used for ballots with no ranking.

    Returns:
        tuple[RankBallot]: The tuple of ballots, one per row of ``df``; empty
            when ``df.empty`` is true.
    """
    if df.empty:
        return ()

    # Fix the ranking length so the converter can be applied row by row.
    row_to_ballot = partial(
        convert_row_to_rank_ballot,
        max_ranking_length=max_ranking_length,
    )
    ballots = df.apply(row_to_ballot, axis="columns")  # type: ignore[call-overload]
    return tuple(ballots)
def rank_profile_to_ballot_dict(
    rank_profile: RankProfile, standardize: bool = False
) -> dict[RankBallot, float]:
    """
    Aggregate a rank profile into a mapping from ballot to total weight.

    Ballots that share the same ranking and voter set are merged and their
    weights summed.

    Args:
        rank_profile (RankProfile): Profile to convert.
        standardize (bool, optional): If True, divides the weight of each ballot
            by the total weight. Defaults to False.

    Returns:
        dict[Ballot, float]: A dictionary with ballots (keys) and corresponding
            total weights (values).
    """
    total = rank_profile.total_ballot_wt
    weight_by_ballot: dict = {}

    for ballot in rank_profile.ballots:
        # The key is built without a weight so equal ballots collapse together.
        key = Ballot(
            ranking=ballot.ranking,
            voter_set=ballot.voter_set,
        )
        share = ballot.weight / total if standardize else ballot.weight

        if key in weight_by_ballot:
            weight_by_ballot[key] += share
        else:
            weight_by_ballot[key] = share

    return weight_by_ballot
def score_profile_to_ballot_dict(
    score_profile: ScoreProfile, standardize: bool = False
) -> dict[ScoreBallot, float]:
    """
    Aggregate a score profile into a mapping from ballot to total weight.

    Ballots that share the same scores and voter set are merged and their
    weights summed.

    Args:
        score_profile (ScoreProfile): Profile to convert.
        standardize (bool, optional): If True, divides the weight of each ballot
            by the total weight. Defaults to False.

    Returns:
        dict[Ballot, float]: A dictionary with ballots (keys) and corresponding
            total weights (values).
    """
    total = score_profile.total_ballot_wt
    weight_by_ballot: dict = {}

    for ballot in score_profile.ballots:
        # The key is built without a weight so equal ballots collapse together.
        key = Ballot(
            scores=ballot.scores,
            voter_set=ballot.voter_set,
        )
        share = ballot.weight / total if standardize else ballot.weight

        if key in weight_by_ballot:
            weight_by_ballot[key] += share
        else:
            weight_by_ballot[key] = share

    return weight_by_ballot
def rank_profile_to_ranking_dict(
    rank_profile: RankProfile, standardize: bool = False
) -> dict[tuple[frozenset[str], ...], float]:
    """
    Aggregate a rank profile into a mapping from ranking to total weight.

    Unlike the ballot-keyed variant, only the ranking itself is used as the
    key, so ballots with equal rankings are merged regardless of voter set.

    Args:
        rank_profile (RankProfile): Profile to convert.
        standardize (bool, optional): If True, divides the weight of each ballot
            by the total weight. Defaults to False.

    Returns:
        dict[tuple[frozenset[str],...], float]: A dictionary with rankings
            (keys) and corresponding total weights (values).

    Raises:
        TypeError: Profile must be a RankProfile.
    """
    # Imported at call time; at module level the name is only available to
    # type checkers.
    from .pref_profile import RankProfile

    if not isinstance(rank_profile, RankProfile):
        raise TypeError(("Profile must be a RankProfile."))

    total = rank_profile.total_ballot_wt
    weight_by_ranking: dict = {}

    for ballot in rank_profile.ballots:
        share = ballot.weight / total if standardize else ballot.weight
        key = ballot.ranking
        weight_by_ranking[key] = weight_by_ranking.get(key, 0) + share

    return weight_by_ranking
def score_profile_to_scores_dict(
    score_profile: ScoreProfile, standardize: bool = False
) -> dict[tuple[str, float], float]:
    """
    Aggregate a score profile into a mapping from scores to total weight.

    Each key is the tuple of ``(candidate, score)`` pairs produced by
    ``ballot.scores.items()``, in the order that mapping yields them, or None
    for a ballot whose scores are empty or missing.

    Args:
        score_profile (ScoreProfile): Profile to convert.
        standardize (bool, optional): If True, divides the weight of each ballot
            by the total weight. Defaults to False.

    Returns:
        dict[tuple[str, float], float]: A dictionary with scores (keys) and
            corresponding total weights (values).

    Raises:
        TypeError: Profile must be a ScoreProfile.
    """
    # Imported at call time; at module level the name is only available to
    # type checkers.
    from .pref_profile import ScoreProfile

    if not isinstance(score_profile, ScoreProfile):
        raise TypeError(("Profile must be a ScoreProfile."))

    total = score_profile.total_ballot_wt
    weight_by_scores: dict = {}

    for ballot in score_profile.ballots:
        if ballot.scores:
            key = tuple(ballot.scores.items())
        else:
            key = None
        share = ballot.weight / total if standardize else ballot.weight
        weight_by_scores[key] = weight_by_scores.get(key, 0) + share

    return weight_by_scores
def profile_df_head(
    profile: PreferenceProfile,
    n: int,
    sort_by_weight: Optional[bool] = True,
    percents: Optional[bool] = False,
    totals: Optional[bool] = False,
    n_decimals: int = 1,
) -> pd.DataFrame:
    """
    Returns a pd.DataFrame with the top-n ballots in profile.

    The result is a copy; ``profile.df`` itself is not modified.

    Args:
        profile (PreferenceProfile): Profile whose ``df`` is displayed.
        n (int): Number of ballots to view.
        sort_by_weight (bool, optional): If True, rows are sorted by "Weight"
            in descending order before the first n are taken. The original
            index labels are kept. Defaults to True.
        percents (bool, optional): If True, adds a "Percent" column holding each
            ballot's share of the profile's total weight, formatted as a
            percentage string. Defaults to False.
        totals (bool, optional): If True, appends a row labelled "Total" with
            the sums of Weight and (if shown) Percent over the displayed rows.
            Defaults to False.
        n_decimals (int, optional): Number of decimals used when formatting
            percentages. Defaults to 1.

    Returns:
        pandas.DataFrame: A dataframe with top-n ballots.

    Raises:
        ZeroDivisionError: Profile has 0 total ballot weight; cannot show
            percentages.
    """
    source = profile.df
    if sort_by_weight:
        source = source.sort_values(by="Weight", ascending=False)
    view = source.head(n).copy()

    # Column count before "Percent" is added; used to lay out the total row.
    n_base_cols = len(view.columns)

    if percents:
        if profile.total_ballot_wt == 0:
            raise ZeroDivisionError(
                "Profile has 0 total ballot weight; cannot show percentages."
            )
        view["Percent"] = view["Weight"] / float(profile.total_ballot_wt)

    if totals:
        # Blank every base column except the last one, which receives the
        # weight sum (this assumes "Weight" is the last base column).
        summary = [""] * (n_base_cols - 1)
        summary.append(view["Weight"].sum())
        if percents:
            summary.append(view["Percent"].sum())
        view.loc["Total"] = summary

    if percents:

        def _as_percent(value):
            return f"{float(value):.{n_decimals}%}"

        view["Percent"] = view["Percent"].apply(_as_percent)

    return view
def profile_df_tail(
    profile: PreferenceProfile,
    n: int,
    sort_by_weight: Optional[bool] = True,
    percents: Optional[bool] = False,
    totals: Optional[bool] = False,
    n_decimals: int = 1,
) -> pd.DataFrame:
    """
    Returns a pd.DataFrame with the bottom-n ballots in profile.

    The result is a copy; ``profile.df`` itself is not modified.

    Args:
        profile (PreferenceProfile): Profile whose ``df`` is displayed.
        n (int): Number of ballots to view.
        sort_by_weight (bool, optional): If True, rows are sorted by "Weight"
            in descending order and the last n of them are taken, so the
            lightest ballots are shown with the lightest one last. Defaults
            to True.
        percents (bool, optional): If True, adds a "Percent" column holding each
            ballot's share of the profile's total weight, formatted as a
            percentage string. Defaults to False.
        totals (bool, optional): If True, appends a row labelled "Total" with
            the sums of Weight and (if shown) Percent over the displayed rows.
            Defaults to False.
        n_decimals (int, optional): Number of decimals used when formatting
            percentages. Defaults to 1.

    Returns:
        pandas.DataFrame: A data frame with bottom-n ballots.

    Raises:
        ZeroDivisionError: Profile has 0 total ballot weight; cannot show
            percentages.
    """
    source = profile.df
    if sort_by_weight:
        source = source.sort_values(by="Weight", ascending=False)
    view = source.tail(n).copy()

    # Column count before "Percent" is added; used to lay out the total row.
    n_base_cols = len(view.columns)

    if percents:
        if profile.total_ballot_wt == 0:
            raise ZeroDivisionError(
                "Profile has 0 total ballot weight; cannot show percentages."
            )
        view["Percent"] = view["Weight"] / float(profile.total_ballot_wt)

    if totals:
        # Blank every base column except the last one, which receives the
        # weight sum (this assumes "Weight" is the last base column).
        summary = [""] * (n_base_cols - 1)
        summary.append(view["Weight"].sum())
        if percents:
            summary.append(view["Percent"].sum())
        view.loc["Total"] = summary

    if percents:

        def _as_percent(value):
            return f"{float(value):.{n_decimals}%}"

        view["Percent"] = view["Percent"].apply(_as_percent)

    return view
def convert_rank_profile_to_score_profile_via_score_vector(
    rank_profile: RankProfile,
    score_vector: Sequence[float],
) -> ScoreProfile:
    """
    Convert a rank profile to a score profile using a score vector.

    The candidate in ranking position ``i`` of a ballot receives
    ``score_vector[i]``. A score vector shorter than the profile's maximum
    ranking length is padded with zeros. Scores that are not strictly positive
    are stored as missing (NaN), as are candidates a ballot does not rank.

    Ballots must not contain ties. Score vector should be non-increasing and
    non-negative.

    Ballots are processed by row position, so the result does not depend on
    the labels of ``rank_profile.df``'s index; the returned profile's df is
    indexed 0..n-1 in the same row order.

    Args:
        rank_profile (RankProfile): Rank profile to convert.
        score_vector (Sequence[float]): Score vector to use. It is copied
            before padding, so the caller's sequence is not modified.

    Returns:
        ScoreProfile: Score profile.

    Raises:
        ValueError: Ballots must not contain ties.
        ValueError: Score vector must be non-increasing and non-negative.
    """
    # here to prevent circular import
    from votekit.utils import validate_score_vector
    from votekit.pref_profile import ScoreProfile

    validate_score_vector(score_vector)

    assert rank_profile.max_ranking_length is not None
    max_len = rank_profile.max_ranking_length

    padded_scores = list(score_vector)
    if len(padded_scores) < max_len:
        padded_scores += [0] * (max_len - len(padded_scores))

    ranking_cols = [f"Ranking_{i}" for i in range(1, max_len + 1)]
    rankings_df = rank_profile.df[ranking_cols]

    rankings_arr = rankings_df.to_numpy(dtype=object).ravel(order="K")
    if any(len(x) > 1 for x in rankings_arr):
        raise ValueError("Ballots must not contain ties.")

    n_ballots = len(rank_profile.df)
    cand_to_score_list = {c: [np.nan] * n_ballots for c in rank_profile.candidates}

    placeholder = frozenset({"~"})
    # Enumerate rows by position instead of using the index label as a list
    # offset, so a df whose index is not 0..n-1 is handled correctly.
    for row_pos, ranking in enumerate(
        rankings_df.itertuples(index=False, name=None)
    ):
        for ranking_pos, cand_set in enumerate(ranking):
            if cand_set == placeholder:
                continue
            cand = next(iter(cand_set))  # no ties so this is unique
            score = padded_scores[ranking_pos]
            cand_to_score_list[cand][row_pos] = score if score > 0 else np.nan

    new_df = pd.DataFrame(cand_to_score_list)
    # Drop the source index before attaching the metadata columns; otherwise
    # pandas aligns on index labels and non-matching labels become NaN.
    new_df["Voter Set"] = rank_profile.df["Voter Set"].reset_index(drop=True)
    new_df["Weight"] = rank_profile.df["Weight"].reset_index(drop=True)
    new_df.index.name = "Ballot Index"

    return ScoreProfile(
        df=new_df,
        candidates=rank_profile.candidates,
    )