Source code for votekit.graphs.pairwise_comparison_graph

from itertools import combinations
import matplotlib.pyplot as plt  # type: ignore
from matplotlib.axes import Axes
import matplotlib.patches as mpatches
import networkx as nx  # type: ignore
from functools import cache
from typing import Optional
from votekit.pref_profile import RankProfile
from numpy.typing import NDArray
import numpy as np  # type: ignore
from numba import njit, float64, int32  # type: ignore


def __rows_to_indices(
    profile: RankProfile, cand_name_to_idx: dict[str, int]
) -> NDArray:
    """
    Encode the ranking columns of a RankProfile as a matrix of integer codes.

    Every cell of the ``Ranking_1 .. Ranking_k`` columns (``k`` being
    ``profile.max_ranking_length``) is looked up as follows:

    - a singleton candidate set ``{c}`` becomes ``cand_name_to_idx[c]``;
    - the short ballot marker set ``{"~"}`` becomes ``-2``;
    - anything else becomes ``-1``.

    Args:
        profile (RankProfile): The preference profile containing rankings.
        cand_name_to_idx (dict[str, int]): A mapping from candidate names to their integer index
            representations.

    Returns:
        NDArray: An int32 array with one row per row of ``profile.df`` and one column per
        ranking position, holding the integer codes described above.
    """
    # Cells are looked up as frozensets, so key the table by frozenset.
    code_for_cell = {
        frozenset({name}): idx for name, idx in cand_name_to_idx.items()
    }
    code_for_cell[frozenset({"~"})] = -2  # short ballot marker

    assert profile.max_ranking_length is not None
    columns = [f"Ranking_{pos + 1}" for pos in range(profile.max_ranking_length)]
    cells = profile.df[columns].to_numpy(object)

    # Encode in row-major order, then restore the 2-D shape.
    codes = np.fromiter(
        (code_for_cell.get(cell, -1) for cell in cells.ravel()),
        dtype=np.int32,
        count=cells.size,
    )
    return codes.reshape(cells.shape).astype(np.int32)


# NOTE: There are a couple more optimizations that could be made here, such as avoiding the
# boolean indexing and using a visited epoch array, but this is already a significant
# speedup, and further optimization would just make the code harder to read.
@njit((float64[:, :], int32[:, :], float64[:], int32), cache=True, fastmath=True)
def __tally_and_mutate_head_to_head(
    mutated_head_to_head_matrix: NDArray,
    integer_rankings_mat: NDArray,
    wt_vec: NDArray,
    n_cands: int,
):
    """
    Tallies the head-to-head matrix and mutates it in place.

    For every row of ``integer_rankings_mat`` the entries are read left to right. Each time a
    candidate index ``i`` is read, ``wt_vec[row]`` is added to ``matrix[i, j]`` for every
    candidate ``j`` that has not yet been read in that row. Entries equal to -1 are skipped and
    an entry equal to -2 stops processing of the row.

    Args:
        mutated_head_to_head_matrix (NDArray): The head to head matrix to mutate. Declared as a
            2-D float64 array in the numba signature.
        integer_rankings_mat (NDArray): The integer rankings matrix. Declared as a 2-D int32
            array in the numba signature; one row per ballot row.
        wt_vec (NDArray): The weight vector. Declared as a 1-D float64 array in the numba
            signature; indexed by the row index of ``integer_rankings_mat``.
        n_cands (int): The number of candidates. Used as the length of the per-row "seen" mask.


    Returns:
        NDArray: The mutated head to head matrix.
    """
    for row_idx, row in enumerate(integer_rankings_mat):
        # Fresh mask per row: no candidate has been seen yet on this ballot.
        seen_vec = np.zeros(n_cands, dtype=np.bool_)
        for candset_idx in row:
            # -1 is for undefined entries
            if candset_idx == -1:
                continue
            # Special value for short ballots "~"
            if candset_idx == -2:
                break

            # Mark first so the candidate is excluded from its own "unseen" columns below.
            seen_vec[candset_idx] = True

            # Candidates always beat everyone that has not been seen yet in head to head
            mutated_head_to_head_matrix[candset_idx, ~seen_vec] += wt_vec[row_idx]

    return mutated_head_to_head_matrix


def pairwise_dict(
    profile: RankProfile, *, sort_candidate_pairs: bool = True
) -> dict[tuple[str, str], tuple[float, float]]:
    """
    Computes a dictionary whose keys are candidate pairs (A,B) and whose values are tuples
    (a,b) where 'a' denotes the number of times A beats B head to head, and 'b' is the reverse.

    Args:
        profile (RankProfile): Profile to compute dict on.
        sort_candidate_pairs (bool): If True, candidate pairs in the pairwise comparison
            dictionary will be sorted lexicographically. If False, pairs follow the order of
            ``profile.candidates``. Defaults to True.

    Returns:
        dict[tuple[str, str], tuple[float, float]]: Pairwise comparison dictionary.

    Raises:
        ValueError: ``profile`` is not a RankProfile.
        ValueError: Some ballot in the profile has no ranking.
    """
    if not isinstance(profile, RankProfile):
        raise ValueError("Profile must be of type RankProfile.")
    if any(b.ranking is None for b in profile.ballots):
        raise ValueError("All ballots must have rankings.")

    candidates_lst = list(profile.candidates)
    if sort_candidate_pairs:
        candidates_lst.sort()

    n_cands = len(candidates_lst)
    cand_to_idx = {c: i for i, c in enumerate(candidates_lst)}

    integer_rankings_mat = __rows_to_indices(profile, cand_to_idx)
    wt_vec = profile.df["Weight"].to_numpy().astype(np.float64)

    head_to_head_matrix = np.zeros((n_cands, n_cands), dtype=np.float64)
    head_to_head_matrix = __tally_and_mutate_head_to_head(
        head_to_head_matrix, integer_rankings_mat, wt_vec, n_cands
    )

    # Pair up candidates in the order of candidates_lst. That list is already sorted when
    # sort_candidate_pairs is True, so re-sorting here (as was done previously) only had the
    # effect of silently ignoring sort_candidate_pairs=False.
    # Values are converted to built-in floats to match the annotated return type.
    return {
        (a, b): (float(head_to_head_matrix[i, j]), float(head_to_head_matrix[j, i]))
        for (i, a), (j, b) in combinations(enumerate(candidates_lst), 2)
    }
def get_dominating_tiers_digraph(graph: nx.DiGraph) -> list[set[str]]:
    """
    Compute the dominating tiers of a pairwise comparison graph.

    A tier dominates every tier below it: each candidate in a tier has a directed path in the
    head-to-head graph to every candidate in each lower tier.

    Args:
        graph (nx.DiGraph): A directed graph representing pairwise comparisons.

    Returns:
        list[set[str]]: Dominating tiers, where the first entry of the list is the highest tier.
    """
    # Collapse each head-to-head cycle into a single node so that we work on a DAG.
    dag = nx.condensation(graph)
    tiers = list(nx.topological_generations(dag))
    reachable = {node: set(nx.descendants(dag, node)) for node in dag.nodes}

    # Topological generations can have "unequal legs": a later generation that is not reachable
    # from every node of an earlier one. Fold such generations into the earlier tier and repeat
    # until a full pass performs no merge.
    changed = True
    while changed:
        changed = False
        next_tiers = []
        absorbed: set[int] = set()

        for pos, tier in enumerate(tiers):
            # Already folded into an earlier tier during this pass.
            if absorbed.issuperset(tier):
                continue

            merged = tier.copy()

            # The tier's own nodes are part of the set to guarantee termination.
            dominated = set.intersection(*(reachable[n] for n in tier)) | set(merged)

            for later in tiers[pos + 1 :]:
                if dominated.issuperset(later):
                    continue

                changed = True
                merged.extend(later)
                dominated = dominated.intersection(
                    *(reachable[n] for n in later)
                ) | set(merged)

            next_tiers.append(merged)
            absorbed.update(merged)

        tiers = next_tiers

    # Expand the condensed nodes back into the candidates they stand for.
    return [
        set().union(*(dag.nodes[n]["members"] for n in tier)) for tier in tiers
    ]
[docs] def restrict_pairwise_dict_to_subset( cand_subset: list[str] | tuple[str] | set[str], pairwise_dict: dict[tuple[str, str], tuple[float, float]], ) -> dict[tuple[str, str], tuple[float, float]]: """ Restricts the full pairwise dictionary to a subset of candidates. The pairwise dictionary is a dictionary whose keys are candidate pairs (A,B) and whose values are lists [a,b] where 'a' denotes the number of times A beats B head to head, and 'b' is the reverse. Args: cands (list[str] | tuple[str] | set[str]): Candidate subset to restrict to. pairwise_dict (dict[tuple[str, str], tuple[float, float]): Full pairwise comparison dictionary. Returns: dict[tuple[str, str], tuple[float, float]]: Pairwise dict restricted to the provided candidates. Raises: ValueError: cand_subset must be at least length 2. ValueError: cand_subset must be a subset of the candidates in the dictionary. """ if len(cand_subset) < 2: raise ValueError( f"Must be at least two candidates in cand_subset: {cand_subset}" ) candidates = [c for s in pairwise_dict.keys() for c in s] extra_cands = set(cand_subset).difference(candidates) if extra_cands != set(): raise ValueError( ( f"{extra_cands} are found in cand_subset but " f"not in the list of candidates found in the dictionary: {candidates}" ) ) new_pairwise_dict = {} for tup in combinations(cand_subset, 2): if tup in pairwise_dict: new_pairwise_dict[tup] = pairwise_dict[tup] rev_tup = (tup[1], tup[0]) if rev_tup in pairwise_dict: new_pairwise_dict[rev_tup] = pairwise_dict[rev_tup] return new_pairwise_dict
class PairwiseComparisonGraph(nx.DiGraph):
    """
    Class to construct the pairwise comparison graph where nodes are candidates and edges are
    pairwise preferences.

    The comparison graph itself is stored in ``pairwise_graph``; the inherited ``nx.DiGraph``
    container is initialised but not populated.

    Args:
        profile (RankProfile): ``RankProfile`` to construct graph from.
        sort_candidate_pairs (bool): If True, candidate pairs in the pairwise comparison
            dictionary will be sorted lexicographically. Defaults to True.

    Attributes:
        profile (RankProfile): ``RankProfile`` to construct graph from.
        candidates (list): List of candidates, taken from ``profile.candidates``.
        pairwise_dict (dict[tuple[str, str], tuple[float, float]]): Dictionary constructed from
            ``pairwise_dict``. The pairwise dictionary is a dictionary whose keys are candidate
            pairs (A,B) and whose values are tuples (a,b) where 'a' denotes the number of times
            A beats B head to head, and 'b' is the number of times B beats A head to head.
        pairwise_graph (networkx.DiGraph): Underlying graph.
    """

    def __init__(self, profile: RankProfile, *, sort_candidate_pairs: bool = True):
        # Initialise the nx.DiGraph base so inherited methods (e.g. ``str()``) do not fail on
        # missing internal attributes.
        super().__init__()
        self.profile = profile
        self.pairwise_dict = pairwise_dict(
            profile, sort_candidate_pairs=sort_candidate_pairs
        )
        self.candidates = list(profile.candidates)
        self.pairwise_graph: nx.DiGraph = self.__build_graph()

        # Per-instance memoisation. A module-level ``functools.cache`` on the methods would key
        # on ``self`` and keep every instance alive for the lifetime of the process.
        self._dominating_tiers: Optional[list[set[str]]] = None
        self._condorcet_cycles: Optional[list[set[str]]] = None

    def __build_graph(self) -> nx.DiGraph:
        """
        Constructs the pairwise comparison graph from the pairwise comparison dictionary.

        An edge ``winner -> loser`` carries the margin of victory as ``weight``. A tie is
        stored as two opposite edges of weight 0.

        Returns:
            nx.DiGraph: The pairwise comparison graph.
        """
        G: nx.DiGraph = nx.DiGraph()
        G.add_nodes_from(c for pair in self.pairwise_dict for c in pair)

        for (c1, c2), (v1, v2) in self.pairwise_dict.items():
            if v1 > v2:
                if not G.has_edge(c1, c2):
                    G.add_edge(c1, c2, weight=v1 - v2)
            elif v2 > v1:
                if not G.has_edge(c2, c1):
                    G.add_edge(c2, c1, weight=v2 - v1)
            else:
                # Add in the tie edges so `ties_or_beats` works
                if not G.has_edge(c1, c2):
                    G.add_edge(c1, c2, weight=0)
                    G.add_edge(c2, c1, weight=0)

        return G

    def ties_or_beats(self, candidate: str) -> set[str]:
        """
        Returns the predecessors of the given candidate, i.e. all nodes m with a directed edge
        from m to the candidate. In the pairwise comparison graph, these are the candidates
        that tie or beat the given candidate.

        Args:
            candidate (str): Candidate.

        Returns:
            set[str]: Candidates that beat or tie given candidate head to head.
        """
        return set(self.pairwise_graph.predecessors(candidate))

    def get_dominating_tiers(self) -> list[set[str]]:
        """
        Compute the dominating tiers of the pairwise comparison graph. Candidates in a tier
        beat all other candidates in lower tiers in head to head comparisons.

        The result is computed once per instance and reused on later calls.

        Returns:
            list[set[str]]: Dominating tiers, where the first entry of the list is the highest
            tier.
        """
        if self._dominating_tiers is None:
            self._dominating_tiers = get_dominating_tiers_digraph(self.pairwise_graph)
        return self._dominating_tiers

    def has_condorcet_winner(self) -> bool:
        """
        Checks if graph has a condorcet winner.

        Returns:
            bool: True if condorcet winner exists, False otherwise. A graph without any
            candidates has no condorcet winner.
        """
        dominating_tiers = self.get_dominating_tiers()
        # Guard against an empty graph, for which there is no top tier to inspect.
        return bool(dominating_tiers) and len(dominating_tiers[0]) == 1

    def get_condorcet_winner(self) -> str:
        """
        Returns the condorcet winner. Raises a ValueError if no condorcet winner.

        Returns:
            str: The condorcet winner.

        Raises:
            ValueError: There is no condorcet winner.
        """
        if self.has_condorcet_winner():
            return next(iter(self.get_dominating_tiers()[0]))
        raise ValueError("There is no condorcet winner.")

    def get_condorcet_cycles(self) -> list[set[str]]:
        """
        Returns a list of condorcet cycles in the graph, which we define as any cycle of length
        greater than 2.

        The result is computed once per instance and reused on later calls.

        Returns:
            list[set[str]]: List of condorcet cycles sorted by length.
        """
        if self._condorcet_cycles is None:
            list_of_cycles = nx.recursive_simple_cycles(self.pairwise_graph)
            # A tie is stored as two opposite edges, which forms a cycle of length 2. Those are
            # not condorcet cycles under the definition above, so drop them.
            self._condorcet_cycles = [
                set(cycle)
                for cycle in sorted(list_of_cycles, key=len)
                if len(cycle) > 2
            ]
        return self._condorcet_cycles

    def has_condorcet_cycles(self) -> bool:
        """
        Checks if graph has any condorcet cycles, which we define as any cycle of length
        greater than 2 in the graph.

        Returns:
            bool: True if condorcet cycles exists, False otherwise.
        """
        return len(self.get_condorcet_cycles()) > 0

    def draw(
        self,
        ax: Optional[Axes] = None,
        candidate_list: Optional[list[str]] = None,
    ) -> Axes:
        """
        Draws pairwise comparison graph.

        Nodes are labelled with their position in ``candidate_list`` and a legend maps each
        number back to the candidate name. Edge weights are printed near both ends of an edge.

        Args:
            ax (Axes, optional): Matplotlib axes to plot on. Defaults to None, in which case
                an axes is generated.
            candidate_list (list[str], optional): List of candidates to plot. Defaults to None,
                in which case all candidates are used.

        Returns:
            Axes: Matplotlib axes of pairwise comparison graph.

        Raises:
            AssertionError: ``candidate_list`` contains a candidate that is not in the graph.
        """
        G: nx.DiGraph = self.pairwise_graph

        if candidate_list is None:
            candidate_list = list(G.nodes)

        # Raised explicitly rather than via ``assert`` so the check survives ``python -O``;
        # the exception type is kept for callers that already catch AssertionError.
        if not set(candidate_list).issubset(set(G.nodes)):
            raise AssertionError(
                f"Invalid candidates found: {set(candidate_list) - set(G.nodes)} does not appear as "
                f"a subset of known candidates {set(G.nodes)}"
            )

        # Copy the induced subgraph so drawing never touches the stored graph.
        G = nx.DiGraph(G.subgraph(candidate_list))

        ranking_labels = {c: i for i, c in enumerate(candidate_list)}

        if ax is None:
            _, ax = plt.subplots(figsize=(10, 10))

        pos = nx.circular_layout(G)
        nx.draw(G, pos, ax=ax, with_labels=False, node_color="lightblue", node_size=300)
        nx.draw_networkx_labels(
            G, pos, ax=ax, labels=ranking_labels, font_size=10, font_color="black"
        )

        # Print each edge weight twice, once near each end of the edge.
        edge_weights = nx.get_edge_attributes(G, "weight")
        nx.draw_networkx_edge_labels(
            G, pos, ax=ax, edge_labels=edge_weights, font_size=10, label_pos=0.15
        )
        nx.draw_networkx_edge_labels(
            G, pos, ax=ax, edge_labels=edge_weights, font_size=10, label_pos=0.85
        )

        # Text-only legend: the patches exist only to carry the labels and are hidden below.
        artists = []
        for label, number in ranking_labels.items():
            patch = mpatches.Patch(color="white", label=f"{number}: {label}")
            artists.append(patch)

        leg = ax.legend(
            handles=artists,
            labels=[f"{i}: {label}" for i, label in enumerate(candidate_list)],
            loc="center left",
            bbox_to_anchor=(1.03, 0.5),
            fontsize=10,
            frameon=True,
            borderaxespad=0.0,
            handlelength=0,
            handletextpad=0,
            fancybox=True,
        )
        for item in leg.legend_handles:
            if item:
                item.set_visible(False)

        return ax