Skip to content

AReS

Anchor/Rule-based Explanations

AReS generates rule-based global explanations.

Overview

AReS identifies interpretable rules that describe when and how predictions can be changed.

Usage

from counterfactuals.cf_methods.global_methods import AReS

method = AReS(
    gen_model=gen_model,
    disc_model=classifier,
    disc_model_criterion=criterion,
    device="cuda"
)

result = method.explain(
    X=X_test,
    y_origin=y_test,
    y_target=target_class,
    X_train=X_train,
    y_train=y_train
)

API Reference

AReS

AReS(predict_fn, dataset, X, dropped_features=[], n_bins=10, ordinal_features=[], normalise=False, constraints=[20, 7, 10], correctness=False)

(required arguments) model : Any black box model with a predict() method that returns a binary prediction x_aff : Pandas DataFrame. Full training data of interest (positive and negative predictions) dataset : Our custom dataset_loader object including the data (there is no direct need to pass X as an argument) and information on categorical/continuous features

(optional arguments) dropped_features : List of dropped features in the form of just the feature e.g. 'Foreign-Worker' add_redundant : If True, evaluate each candidate rule and reject those which don't provide any recourse for the affected inputs (speeds up optimisation) apriori_threshold : The support threshold used by the apriori algorithm (probability of an itemset, lower values thus return more possible rules) constraints : As defined by the paper e1 = total number of rules e2 = maximum rule width (number of conditions) e3 = total number of unique subgroup descriptors (outer if-then clauses) lams : hyperparameters for objective function (list of size 4 for AReS, size 2 for our objective) feature_costs : optional vector for defined feature costs (otherwise, we use l1 norm) ordinal_features : List of categorical features that require ordinal costs when moving between categories (typically continuous features which have been one-hot encoded before model training) original_objective : If True, use the original AReS objective function (otherwise just optimise correctness and cost) n_bins : number of (equal) bins for continuous variables normalise : If True, normalise the inputs prior to the self.model.predict() call then_generation : Apriori threshold value. In progress. If not None, then generate the "then" condition using apriori on a set filtered according to each if-if condition (search for candidate rules in SDxRL). May find more relevant rules. If None, search for candidate rules in SDxRLxRL. There's also the possibility to set "then" to RL, and divide SD appropriately to match RL. As well as (alternatively) the possibility to allow "then" to not match the "inner-if" entirely.

Source code in counterfactuals/cf_methods/global_methods/ares/ares.py
def __init__(
    self,
    predict_fn,
    dataset,
    X,
    dropped_features=[],
    n_bins=10,
    ordinal_features=[],
    normalise=False,
    constraints=[20, 7, 10],
    correctness=False,
):
    """
    Normalise is implemented differently to GLOBE_CE
    AReS Implementation:

    (required arguments)
    model            : Any black box model with a predict() method that
                       returns a binary prediction
    x_aff            : Pandas DataFrame. Full training data of interest
                       (positive and negative predictions)
    dataset          : Our custom dataset_loader object including the data
                       (there is no direct need to pass X as an argument)
                       and information on categorical/continuous features

    (optional arguments)
    dropped_features    : List of dropped features in the form of just the
                          feature e.g. 'Foreign-Worker'
    add_redundant       : If True, evaluate each candidate rule and reject
                          those which don't provide any recourse for the
                          affected inputs (speeds up optimisation)
    apriori_threshold   : The support threshold used by the apriori
                          algorithm (probability of an itemset, lower
                          values thus return more possible rules)
    constraints         : As defined by the paper
                          e1 = total number of rules
                          e2 = maximum rule width (number of conditions)
                          e3 = total number of unique subgroup descriptors
                               (outer if-then clauses)
    lams                : hyperparameters for objective function (list of
                          size 4 for AReS, size 2 for our objective)
    feature_costs       : optional vector for defined feature costs
                          (otherwise, we use l1 norm)
    ordinal_features    : List of categorical features that require ordinal
                          costs when moving between categories (typically
                          continuous features which have been one-hot
                          encoded before model training)
    original_objective  : If True, use the original AReS objective function
                          (otherwise just optimise correctness and cost)
    n_bins              : number of (equal) bins for continuous variables
    normalise           : If True, normalise the inputs prior to the
                          self.model.predict() call
    then_generation     : Apriori threshold value. In progress. If not
                          None, then generate the "then" condition using
                          apriori on a set filtered according to each if-if
                          condition (search for candidate rules in SDxRL).
                          May find more relevant rules. If None, search for
                          candidate rules in SDxRLxRL. There's also the
                          possibility to set "then" to RL, and divide SD
                          appropriately to match RL. As well as
                          (alternatively) the possibility to allow "then"
                          to not match the "inner-if" entirely.
    """

    # Set Input Parameters
    self.predict_fn = predict_fn
    self.normalise = normalise
    if self.normalise:
        self.means = X.values.mean(axis=0)
        self.stds = X.values.std(axis=0)
        self.preds = self.predict_fn((X.values - self.means) / self.stds)
    else:
        self.preds = self.predict_fn(X)  # to determine affected inputs
    self.X_original = X  # store original inputs
    # copy is needed since continuous features are binned for apriori
    self.X = self.X_original.copy()
    self.dataset = copy.deepcopy(dataset)
    self.dropped_features = dropped_features
    self.ordinal_features = ordinal_features
    self.n_bins = n_bins
    self.correctness = correctness
    self.e1, self.e2, self.e3 = constraints

    # Store Features. Generate l1 feature costs (need to differentiate between categorical/continuous)
    # Continuous/categorical/non-dropped features are all computed/stored
    self.features_tree = (
        self.dataset.features_tree
    )  # dictionary of form 'feature: [feature values]'
    self.features_tree_dropped = copy.deepcopy(self.features_tree)
    for feature in self.dropped_features:
        del self.features_tree_dropped[feature]
    self.features = self.dataset.features  # list of feature values (includes class label)
    # list of categorical features (not values)
    # if a continuous feature was binned before model training, then it's treated as categorical (though ordinal)
    self.categorical_features = self.dataset.categorical_features

    # Bin continuous features and store resulting data (dimensionality of input data increases)
    self.X, self.binned_features, self.binned_features_continuous = (
        self.bin_continuous_features(self.X)
    )
    self.continuous_features = []  # list of continuous features
    self.feature_costs_vector = np.zeros(len(self.features) - 1)
    self.non_ordinal_categories_idx = np.ones(len(self.features) - 1, dtype=bool)
    i = 0
    for feature in self.features_tree:
        if feature not in self.categorical_features:
            self.continuous_features.append(feature)
            self.feature_costs_vector[i] = (
                1 / self.bin_widths[feature]
            )  # includes dropped features
            self.non_ordinal_categories_idx[i] = True
            i += 1
        else:
            n = len(self.features_tree[feature])
            if feature in self.ordinal_features:
                self.feature_costs_vector[i : i + n] = range(
                    n
                )  # if ordinal, default to unit change between bins
                self.non_ordinal_categories_idx[i : i + n] = False
            else:
                self.feature_costs_vector[i : i + n] = (
                    0.5  # categorical features have cost 1 (2 changes of 0.5)
                )
                self.non_ordinal_categories_idx[i : i + n] = True
            i += n
    # either we bin continuous features before model training (ordinal categories),
    # or we don't (non-ordinal categories)
    # non-continuous features are also included in non-ordinal categories
    # (see self.objective_terms r_costs computation)
    self.ordinal_categories_idx = ~self.non_ordinal_categories_idx
    self.any_non_ordinal = self.non_ordinal_categories_idx.any()
    self.any_ordinal = self.ordinal_categories_idx.any()

    # Drop features
    self.X_drop = self.X.copy()  # self.X_drop is used just for apriori itemset generation
    for feature in self.dropped_features:
        print("Dropping Feature:", feature)
        for feature_value in self.features_tree[feature]:
            self.X_drop = self.X_drop.drop(feature_value, axis=1)

    # Compute affected features
    self.X_aff_original = (
        self.X_original.iloc[self.preds == 0].copy().reset_index(drop=True)
    )  # original data
    self.X_aff = (
        self.X.iloc[self.preds == 0].copy().reset_index(drop=True)
    )  # data with continuous variables binned
    self.U = self.n_bins * self.e2  # custom objective function
    self.U1 = self.X_aff.shape[0] * self.e1  # incorrectrecourse
    self.U3 = 0  # featurecost, not implemented
    self.U4 = self.n_bins * self.e1 * self.e2  # featurechange

    # Assign features to feature values, used when computing if rules are valid
    self.feature_values_tree = create_feature_values_tree(self.features_tree, use_values=False)

    # The following are updated using
    # self.compute_SD_RL and self.compute_V
    self.SD, self.RL, self.RL2 = None, None, None
    self.SD_copy, self.RL_copy = None, None
    self.V, self.V_copy = None, None
    self.f, self.V_opt = None, None
    self.R = None

bin_continuous_features

bin_continuous_features(data)

Method for binning continuous features. Also computes self.bin_mids and self.bin_mids_tree (dictionary and dictionary of dictionaries respectively) which store mid point values for each bin range

Input: original data Outputs: data with continuous features binned (default is 10 equally sized bins) list of all feature values, with binned feature values included list of only binned feature values

Source code in counterfactuals/cf_methods/global_methods/ares/ares.py
def bin_continuous_features(self, data):
    """
    Method for binning continuous features. Also computes self.bin_mids and self.bin_mids_tree (dictionary
    and dictionary of dictionaries respectively) which store mid point values for each bin range

    Input: original data
    Outputs: data with continuous features binned (default is 10 equally sized bins)
             list of all feature values, with binned feature values included
             list of only binned feature values
    """
    self.data_binned = data.copy()
    data_oh, features, continuous_features = [], [], set()
    self.bins = {}
    self.bin_mids = {}
    self.bin_mids_tree = {}
    self.bin_widths = {}
    for x in data.columns:
        if x.split()[0] in self.categorical_features:
            data_oh.append(pd.DataFrame(data[x]))
            features.append(x)
        else:
            self.data_binned[x], self.bins[x] = pd.cut(
                self.data_binned[x].apply(lambda x: float(x)),
                bins=self.n_bins,
                retbins=True,
            )
            one_hot = pd.get_dummies(self.data_binned[x])
            one_hot.columns = pd.Index(list(one_hot.columns))  # necessary?
            data_oh.append(one_hot)
            cols = self.data_binned[x].cat.categories
            self.bin_mids_tree[x] = {}
            width = cols.length[-1]
            self.bin_widths[x] = width
            for i, col in enumerate(cols):
                feature_value = x + " = " + str(col)
                features.append(feature_value)
                continuous_features.add(feature_value)
                self.features_tree[x].append(feature_value)
                mid = (
                    cols.mid[1] - width if i == 0 else col.mid
                )  # adjust for pd.cut extending the first bin
                self.bin_mids[feature_value] = mid
                self.bin_mids_tree[x][feature_value] = mid
    data_oh = pd.concat(data_oh, axis=1)
    data_oh.columns = features
    return data_oh, features, continuous_features

generate_itemsets

generate_itemsets(apriori_threshold, max_width=None, affected_subgroup=None, save_copy=False)
The feature value of the subgroup of interest

e.g. 'Foreign-Worker = A201' (see dataset_loader naming) If None, SD and RL are set to the same set generated by apriori

Source code in counterfactuals/cf_methods/global_methods/ares/ares.py
def generate_itemsets(
    self, apriori_threshold, max_width=None, affected_subgroup=None, save_copy=False
):
    """
    affected_subgroup   : The feature value of the subgroup of interest
                          e.g. 'Foreign-Worker = A201' (see dataset_loader naming)
                          If None, SD and RL are set to the same set
                          generated by apriori
    """
    # Max width
    if max_width is None:
        max_width = self.e2 - 1
    # Compute SD and RL
    print("Computing Candidate Sets of Conjunctions of Predicates SD and RL")
    self.SD = Apriori(
        x=self.X_drop,
        apriori_threshold=apriori_threshold,
        affected_subgroup=affected_subgroup,
        max_width=max_width,
        feature_values_tree=self.feature_values_tree,
    )
    if affected_subgroup is None:
        self.RL = copy.deepcopy(self.SD)
    else:
        self.RL = Apriori(
            x=self.X_drop,
            apriori_threshold=apriori_threshold,
            affected_subgroup=None,
            max_width=max_width,
            feature_values_tree=self.feature_values_tree,
        )

    # Update affected inputs
    self.X_aff_original = (
        self.X_original.iloc[(self.preds == 0) & self.SD.sub_idx].copy().reset_index(drop=True)
    )  # original data
    self.X_aff = (
        self.X.iloc[(self.preds == 0) & self.SD.sub_idx].copy().reset_index(drop=True)
    )  # data with continuous variables binned

    print("SD and RL Computed with Lengths {} and {}".format(self.SD.length, self.RL.length))

    if save_copy:
        print("Saving Copies of SD and RL as SD_copy and RL_copy")
        self.SD_copy, self.RL_copy = copy.deepcopy(self.SD), copy.deepcopy(self.RL)

generate_groundset

generate_groundset(max_width=None, RL_reduction=False, then_generation=None, save_copy=False)

Compute candidate set of rules for self.optimise(). Determines if rules are valid and also applies maxwidth constraint. User sets self.add_redundant to False (init method) if we ignore any rules that do not provide any successful recourse (slower, but completely irrelevant rules are not added). Size of candidate rules, V, seems to be the bottleneck in the submodular maximisation.

SD and RL: outer and inner if conditions (as per paper)

SD_lengths and RL_lengths: widths of each SD/RL element feature_values_tree: as described in self.encode_feature_values then_gen UPDATE

Output: candidate set of rules after applying constraints

Source code in counterfactuals/cf_methods/global_methods/ares/ares.py
def generate_groundset(
    self, max_width=None, RL_reduction=False, then_generation=None, save_copy=False
):
    """
    Compute candidate set of rules for self.optimise(). Determines if rules are valid and also applies
    maxwidth constraint. User sets self.add_redundant to False (__init__ method) if we ignore any rules
    that do not provide any successful recourse (slower, but completely irrelevant rules are not added).
    Size of candidate rules, V, seems to be the bottleneck in the submodular maximisation.

    Inputs: SD and RL: outer and inner if conditions (as per paper)
            SD_lengths and RL_lengths: widths of each SD/RL element
            feature_values_tree: as described in self.encode_feature_values
            then_gen UPDATE
    Output: candidate set of rules after applying constraints
    """
    # Max width
    if max_width is None:
        max_width = self.e2

    self.V = TwoLevelRecourseSet()

    self.V.generate_triples(
        self.SD,
        self.RL,
        max_width=max_width,
        RL_reduction=RL_reduction,
        then_generation=then_generation,
    )

    print("Ground Set Computed with Length", self.V.length)
    if save_copy:
        print("Saving Copy of Ground Set as V_copy")
        self.V_copy = copy.deepcopy(self.V)

optimise_groundset

optimise_groundset(lams, factor=1, print_updates=False, print_terms=False, save_copy=False, plot_accuracy=True)

Submodular maximisation. We make 2 major modifications: 1. Don't repeat procedure k times, where k is the number of constraints. This rarely increased performance yet increases computation time k-fold (mostly pointless despite formal guarantees) 2. Don't permit up to k elements to be exchanged (computationally infeasible- to this day I am clueless regarding how this is done efficiently). In this case, you might have 20 choose 2 = 190 options for elements to exchange (instead of just 20) which is just not a worthwhile trade-off.

Output: Final two level recourse set, S

Source code in counterfactuals/cf_methods/global_methods/ares/ares.py
def optimise_groundset(
    self,
    lams,
    factor=1,
    print_updates=False,
    print_terms=False,
    save_copy=False,
    plot_accuracy=True,
):
    """
    Submodular maximisation. We make 2 major modifications:
        1. Don't repeat procedure k times, where k is the number of constraints. This rarely increased
           performance yet increases computation time k-fold (mostly pointless despite formal guarantees)
        2. Don't permit up to k elements to be exchanged (computationally infeasible- to this day I am clueless
           regarding how this is done efficiently). In this case, you might have 20 choose 2 = 190 options for
           elements to exchange (instead of just 20) which is just not a worthwhile trade-off.

    Output: Final two level recourse set, S
    """
    print("Initialising Copy of Ground Set")
    if save_copy:
        self.V_opt = None
        # ensures python doesn't use an already
        # stored version during the deepcopy process
        self.V_opt = copy.deepcopy(self.V)
        print("Ground Set Copied")
    else:
        self.V_opt = self.V
    N = self.V_opt.length
    selected_idx = np.argsort(self.V_opt.objectives)[: -(N + 1) : -1]
    self.V_opt.objectives = self.V_opt.objectives[selected_idx]
    self.V_opt.triples_array = self.V_opt.triples_array[selected_idx]
    self.V_opt.index_terms(selected_idx)
    self.V_opt.cost_matrix[np.isnan(self.V_opt.cost_matrix)] = 0
    self.f = AReS.f_custom if len(lams) == 2 else AReS.f_ares

    R_idx = np.zeros(N, dtype=bool)
    f_argmax = np.argmax(self.V_opt.objectives)
    f_max = self.V_opt.objectives[f_argmax]
    R_idx[f_argmax] = True
    f_thresh = factor * f_max

    # Compute objectives then select triples
    if len(lams) == 2:
        bounds = self.U
        self.f = AReS.f_custom
    else:
        bounds = [self.U1, self.U3, self.U4]
        self.f = AReS.f_ares

    # While there exists a delete/update operation do:
    print("While there exists a delete/update operation, loop:")
    delete, add, exchange = True, True, True
    while True:
        # Delete check
        print("Checking Delete")
        delete = False
        for idx in np.arange(N)[R_idx]:
            R_idx[idx] = False
            f_delete = self.f(self.V_opt, lams, bounds, idx=R_idx)
            if f_delete > f_thresh:
                if print_updates:
                    print("Deleting Element ({} >= {})".format(f_delete, f_thresh))
                # self.min_costs.append(self.f_custom(Si_delete, return_costs=True))
                if print_terms:
                    self.f(self.V_opt, lams, bounds, idx=R_idx, print_terms=True)
                f_thresh = factor * f_delete
                delete = True
                break
            R_idx[idx] = True
        if (not delete) and print_updates:
            print("No Delete Operation Found")
        if not (delete or add or exchange):
            break

        # Actual flow should be: always add elements until
        # constraint is reached or no element to add
        # Then exchange up to k... ?

        # Add check
        print("Checking Add")
        add = False
        if R_idx.sum() < self.e1:
            for idx in tqdm(np.arange(N)[~R_idx]):
                R_idx[idx] = True
                if self.constraints(self.V_opt.triples_array[R_idx]):
                    f_add = self.f(self.V_opt, lams, bounds, idx=R_idx)
                    if f_add > f_thresh:
                        if print_updates:
                            print("Adding Element ({} >= {})".format(f_add, f_thresh))
                        if print_terms:
                            self.f(
                                self.V_opt,
                                lams,
                                bounds,
                                idx=R_idx,
                                print_terms=True,
                            )
                        # self.min_costs.append(self.f_custom(Si_add, return_costs=True))
                        f_thresh = factor * f_add
                        add = True
                        continue
                R_idx[idx] = False
        if (not add) and print_updates:
            print("No Add Operation Found")
        if not (delete or add or exchange):
            break

        # Exchange check
        print("Checking Exchange")
        exchange = False
        for add_idx in tqdm(np.arange(N)[~R_idx]):
            # Permit only 1 removal (not k, as in algorithm)
            for delete_idx in np.arange(N)[R_idx]:
                R_idx[add_idx] = True
                R_idx[delete_idx] = False
                if self.constraints(self.V_opt.triples_array[R_idx]):
                    f_exchange = self.f(self.V_opt, lams, bounds, idx=R_idx)
                    if f_exchange > f_thresh:
                        if print_updates:
                            print("Exchanging Element ({} >= {})".format(f_exchange, f_thresh))
                        if print_terms:
                            self.f(
                                self.V_opt,
                                lams,
                                bounds,
                                idx=R_idx,
                                print_terms=True,
                            )
                        # self.min_costs.append(self.f(Si_exchange, print_terms=True, return_costs=True))
                        # self.min_costs.append(self.f_custom(Si_exchange, return_costs=True))
                        f_thresh = factor * f_exchange
                        exchange = True
                        break
                R_idx[delete_idx] = True
                R_idx[add_idx] = False
        if (not exchange) and print_updates:
            print("No Exchange Operation Found")
        if not (delete or add or exchange):
            break
        # break  # fix this to loop multiple times but only if necessary. also skip "add" if size constraints met
    # self.min_costs = np.array(self.min_costs)
    # self.V -= Si

    self.R = TwoLevelRecourseSet()
    self.R.triples = set(self.V_opt.triples_array[R_idx])
    self.R.length = len(self.R.triples)
    self.R.evaluate_triples(self, plot_accuracy=plot_accuracy)

constraints

constraints(triples)

Computes if constraints (e1: total number of rules, e3: total number of unique sub-descriptors) are violated

Input: Two Level Recourse Set, Si Output: boolean (True if constraints are not violated)

Source code in counterfactuals/cf_methods/global_methods/ares/ares.py
def constraints(self, triples):
    """
    Computes if constraints (e1: total number of rules, e3: total number of unique sub-descriptors) are violated

    Input: Two Level Recourse Set, Si
    Output: boolean (True if constraints are not violated)
    """
    if len(triples) > self.e1:
        return False
    subgroups = set()
    for triple in triples:
        subgroups.add(triple[0])  # only adds if the sub-descriptor is unseen
    if len(subgroups) > self.e3:
        return False
    return True

bin_X_test

bin_X_test(data)

Combine with first class method?

Source code in counterfactuals/cf_methods/global_methods/ares/ares.py
def bin_X_test(self, data):
    """
    Combine with first class method?
    """
    label_encoder = preprocessing.LabelEncoder()
    data_encode = data.copy()
    self.data_binned_te = data.copy()
    data_oh = []
    for x in data.columns:
        if x.split()[0] in self.categorical_features:
            data_oh.append(pd.DataFrame(data[x]))
        else:
            self.data_binned_te[x] = pd.cut(
                self.data_binned_te[x].apply(lambda x: float(x)), bins=self.bins[x]
            )
            one_hot = pd.get_dummies(self.data_binned_te[x])
            one_hot.columns = pd.Index(list(one_hot.columns))  # necessary?
            data_oh.append(one_hot)
    data_oh = pd.concat(data_oh, axis=1)
    data_oh.columns = self.binned_features
    return data_oh