Module ethik.query
Expand source code
import hashlib
import itertools
import warnings
import pandas as pd
from .utils import decimal_range
from .warnings import ConstantWarning

class Query:
    @classmethod
    def create_gid(cls, features, targets):
        k = ";".join(f"{f}={t}" for f, t in sorted(zip(features, targets)))
        h = hashlib.sha256()
        h.update(k.encode("utf-8"))
        return h.hexdigest()

    @classmethod
    def taus(cls, n):
        tau_precision = 2 / (n - 1)
        return list(decimal_range(-1, 1, tau_precision))

    @classmethod
    def target_from_tau(cls, q_min, q_max, mean, tau):
        return mean + tau * (max(mean - q_min, 0) if tau < 0 else max(q_max - mean, 0))

    @classmethod
    def _unidim_from_taus(
        cls, free_features, labels, taus, q_mins, q_maxs, means, constraints=None
    ):
        groups = []
        for feature in free_features:
            for tau in taus:
                targets = [
                    cls.target_from_tau(
                        q_min=q_mins[feature],
                        q_max=q_maxs[feature],
                        mean=means[feature],
                        tau=tau,
                    ),
                    *constraints.values(),
                ]
                gid = cls.create_gid([feature, *constraints], targets)
                for label in labels:
                    groups.append(
                        {
                            "group": gid,
                            "tau": tau,
                            "target": targets[0],
                            "feature": feature,
                            "label": label,
                        }
                    )
                    for constraint, target in constraints.items():
                        groups.append(
                            {
                                "group": gid,
                                "tau": None,
                                "target": target,
                                "feature": constraint,
                                "label": label,
                            }
                        )
        query = pd.DataFrame(groups)
        return query

    @classmethod
    def _multidim_from_taus(
        cls, free_features, labels, taus, q_mins, q_maxs, means, constraints=None
    ):
        targets_product = [
            [
                (
                    feature,
                    tau,
                    cls.target_from_tau(
                        q_min=q_mins[feature],
                        q_max=q_maxs[feature],
                        mean=means[feature],
                        tau=tau,
                    ),
                )
                for tau in taus
            ]
            for feature in free_features
        ]
        # If `constraints` is empty, appending it would make `itertools.product`
        # return an empty generator, hence the guard below
        if constraints:
            targets_product.append(
                [(feature, None, target) for feature, target in constraints.items()]
            )
        targets = itertools.product(*targets_product)
        # Each element of `targets` is a tuple of length `n_features + n_constraints`
        # The `n_features` first elements are the tuples `(feature, tau, target)` for
        # every free feature
        # The `n_constraints` last elements are the tuples `(feature, None, target)`
        # for every constraint
        groups = []
        for i, group in enumerate(targets):
            gid = cls.create_gid(
                [*free_features, *constraints], [target for _, _, target in group]
            )
            for label in labels:
                for feature, tau, target in group:
                    groups.append(
                        {
                            "group": gid,
                            "tau": tau,
                            "target": target,
                            "feature": feature,
                            "label": label,
                        }
                    )
        # Without constraints, there should be n_taus^n_features * n_features * n_labels rows
        return pd.DataFrame(groups)

    @classmethod
    def from_taus(
        cls, X_test, labels, n_taus, q, link_variables=False, constraints=None
    ):
        if constraints is None:
            constraints = {}

        diff_constraints = set(constraints) - set(X_test.columns)
        if diff_constraints:
            raise ValueError(
                f"Unknown features in constraints: {', '.join(diff_constraints)}"
            )

        quantiles = X_test.quantile(q=q)

        # Issue a warning if a feature doesn't have distinct quantiles
        for feature, n_unique in quantiles.nunique().to_dict().items():
            if n_unique == 1:
                warnings.warn(
                    message=f"all the values of feature {feature} are identical",
                    category=ConstantWarning,
                )

        q_mins = quantiles.loc[q[0]].to_dict()
        q_maxs = quantiles.loc[q[1]].to_dict()
        # Cannot use `means = X_test.mean().to_dict()` as it may give a different
        # result depending on whether the feature is alone in the dataset or not
        # Instead, we compute the means for each pandas series independently
        means = {feature: X_test[feature].mean() for feature in X_test.columns}
        taus = cls.taus(n_taus)
        # Need to keep the order of X_test columns
        free_features = [f for f in X_test.columns if f not in constraints]

        if link_variables:
            return cls._multidim_from_taus(
                free_features=free_features,
                labels=labels,
                taus=taus,
                q_mins=q_mins,
                q_maxs=q_maxs,
                means=means,
                constraints=constraints,
            )

        return cls._unidim_from_taus(
            free_features=free_features,
            labels=labels,
            taus=taus,
            q_mins=q_mins,
            q_maxs=q_maxs,
            means=means,
            constraints=constraints,
        )
Classes
class Query (*args, **kwargs)
Helper class that builds query DataFrames: each row associates a feature, a tau value, and a label with the target mean to reach for that feature, plus a group identifier tying together the rows produced from the same combination of targets.
Static methods
def create_gid(features, targets)
Returns a SHA-256 hexadecimal digest identifying a group, computed from the sorted `feature=target` pairs.
Expand source code
@classmethod
def create_gid(cls, features, targets):
    k = ";".join(f"{f}={t}" for f, t in sorted(zip(features, targets)))
    h = hashlib.sha256()
    h.update(k.encode("utf-8"))
    return h.hexdigest()
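As a minimal usage sketch (the feature names and values below are made up for illustration), the identifier is a 64-character SHA-256 hex digest and does not depend on the order in which the pairs are passed, since the pairs are sorted before hashing:

from ethik.query import Query

# Same feature/target pairs in a different order: the pairs are sorted
# before hashing, so both calls produce the same group id.
gid_1 = Query.create_gid(["age", "income"], [30.0, 45000.0])
gid_2 = Query.create_gid(["income", "age"], [45000.0, 30.0])
assert gid_1 == gid_2
print(len(gid_1))  # 64 hexadecimal characters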
def from_taus(X_test, labels, n_taus, q, link_variables=False, constraints=None)
Builds the query DataFrame for `X_test`. `q` is a pair of quantiles giving the lower and upper bounds for the targets, `n_taus` controls how many evenly spaced tau values are used, and `constraints` maps feature names to fixed target means. With `link_variables=True`, the free features are varied jointly (Cartesian product of taus) instead of one at a time.
Expand source code
@classmethod
def from_taus(
    cls, X_test, labels, n_taus, q, link_variables=False, constraints=None
):
    if constraints is None:
        constraints = {}

    diff_constraints = set(constraints) - set(X_test.columns)
    if diff_constraints:
        raise ValueError(
            f"Unknown features in constraints: {', '.join(diff_constraints)}"
        )

    quantiles = X_test.quantile(q=q)

    # Issue a warning if a feature doesn't have distinct quantiles
    for feature, n_unique in quantiles.nunique().to_dict().items():
        if n_unique == 1:
            warnings.warn(
                message=f"all the values of feature {feature} are identical",
                category=ConstantWarning,
            )

    q_mins = quantiles.loc[q[0]].to_dict()
    q_maxs = quantiles.loc[q[1]].to_dict()
    # Cannot use `means = X_test.mean().to_dict()` as it may give a different
    # result depending on whether the feature is alone in the dataset or not
    # Instead, we compute the means for each pandas series independently
    means = {feature: X_test[feature].mean() for feature in X_test.columns}
    taus = cls.taus(n_taus)
    # Need to keep the order of X_test columns
    free_features = [f for f in X_test.columns if f not in constraints]

    if link_variables:
        return cls._multidim_from_taus(
            free_features=free_features,
            labels=labels,
            taus=taus,
            q_mins=q_mins,
            q_maxs=q_maxs,
            means=means,
            constraints=constraints,
        )

    return cls._unidim_from_taus(
        free_features=free_features,
        labels=labels,
        taus=taus,
        q_mins=q_mins,
        q_maxs=q_maxs,
        means=means,
        constraints=constraints,
    )
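A small usage sketch, assuming the module is importable as `ethik.query` (the DataFrame, the feature names `x0`/`x1`, and the label name are invented for illustration):

import pandas as pd
from ethik.query import Query

X_test = pd.DataFrame({
    "x0": [1.0, 2.0, 3.0, 4.0],
    "x1": [10.0, 20.0, 30.0, 40.0],
})

query = Query.from_taus(
    X_test=X_test,
    labels=["is_positive"],
    n_taus=5,         # 5 evenly spaced taus between -1 and 1
    q=[0.05, 0.95],   # quantiles used as lower/upper bounds for the targets
)

# One row per (feature, tau, label) combination; the "target" column holds
# the mean that the feature should be shifted towards for that tau.
print(query.columns.tolist())  # ['group', 'tau', 'target', 'feature', 'label']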
def target_from_tau(q_min, q_max, mean, tau)
Converts a tau value into a target mean: tau = -1 maps to `q_min`, tau = 0 to the observed `mean`, and tau = 1 to `q_max`, interpolating linearly in between.
Expand source code
@classmethod
def target_from_tau(cls, q_min, q_max, mean, tau):
    return mean + tau * (max(mean - q_min, 0) if tau < 0 else max(q_max - mean, 0))
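A quick numerical check of the interpolation (the numbers are arbitrary): tau = -1 lands on `q_min`, tau = 0 on the mean, and tau = 1 on `q_max`.

from ethik.query import Query

# q_min=0.0, mean=5.0, q_max=20.0
Query.target_from_tau(q_min=0.0, q_max=20.0, mean=5.0, tau=-1)   # 0.0  -> q_min
Query.target_from_tau(q_min=0.0, q_max=20.0, mean=5.0, tau=0)    # 5.0  -> the mean
Query.target_from_tau(q_min=0.0, q_max=20.0, mean=5.0, tau=0.5)  # 12.5 -> halfway to q_max
Query.target_from_tau(q_min=0.0, q_max=20.0, mean=5.0, tau=1)    # 20.0 -> q_max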
def taus(n)
Returns `n` evenly spaced tau values between -1 and 1 (inclusive); requires `n >= 2`.
Expand source code
@classmethod
def taus(cls, n):
    tau_precision = 2 / (n - 1)
    return list(decimal_range(-1, 1, tau_precision))
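For example, with `n = 5` the step is 2 / 4 = 0.5, so the taus run from -1 to 1 in steps of 0.5 (the exact numeric type of the returned values depends on `decimal_range`, which is assumed here to include both endpoints):

from ethik.query import Query

print(Query.taus(5))  # 5 evenly spaced values: -1, -0.5, 0, 0.5, 1
print(Query.taus(3))  # -1, 0, 1
# Note: n must be at least 2, otherwise the step 2 / (n - 1) divides by zero.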