#!/usr/bin/env python
"""Functions for streamlining analysis."""
# Imports
from collections import defaultdict, namedtuple
from collections.abc import Sequence  # moved out of `collections` (alias removed in Python 3.10)
import concurrent.futures as futures
import functools
import itertools

import numpy as np
import pandas as pd
import patsy
import statsmodels
import statsmodels.formula.api as smf

import munch

from crunchers import ipython_info
from crunchers import parallel as putils

if ipython_info():
    from tqdm import tqdm_notebook as tqdm
else:
    from tqdm import tqdm
# Metadata
__author__ = "Gus Dunn"
__email__ = "w.gus.dunn@gmail.com"
def tree():
    """Return an autovivifying nested mapping (arbitrary-depth defaultdict of itself)."""
    return defaultdict(tree)
def report_logitreg(formula, data, verbose=True, disp=1):
    """Fit logistic regression, print a report, and return the fit object.

    formula (str): patsy formula string.
    data (pd.DataFrame): source table for the design matrices.
    verbose (bool): when True, print the fit summary and marginal effects.
    disp (int): forwarded to ``Logit.fit`` to control solver chatter.
    """
    endog, exog = patsy.dmatrices(formula, data=data, return_type='dataframe')
    results = smf.Logit(endog=endog, exog=exog).fit(disp=disp)
    # Both summaries are built unconditionally (matches prior behavior even when quiet).
    fit_summary = results.summary()
    margeff_summary = results.get_margeff().summary()
    if verbose:
        print("""\n{summary}\n\n\n{margeff}\n""".format(summary=fit_summary,
                                                        margeff=margeff_summary))
    return results
def report_glm(formula, data, verbose=True, fit_kwargs=None, **kwargs):
    """Fit GLM, print a report, and return the fit object.

    formula (str): patsy formula string.
    data (pd.DataFrame): source table.
    verbose (bool): when True, print the fit summary.
    fit_kwargs (dict | None): options forwarded to ``.fit()`` (e.g. maxiter, scale).
    **kwargs: options forwarded to the model constructor (e.g. family).

    Bug fix: the original forwarded ``**kwargs`` to BOTH ``smf.glm(...)`` and
    ``.fit(...)``, so a fit-only keyword also reached the constructor (and a
    model keyword reached the fitter), which can raise ``TypeError``. Model and
    fit options are now routed separately.
    """
    if fit_kwargs is None:
        fit_kwargs = {}
    results = smf.glm(formula, data=data, **kwargs).fit(disp=False, **fit_kwargs)
    summary = results.summary()
    if verbose:
        report = """\n{summary}\n""".format(summary=summary)
        print(report)
    return results
def report_ols(formula, data, fit_regularized=False, L1_wt=1, refit=False, **kwargs):
    """Fit an OLS regression (optionally elastic-net regularized) and return the fit object.

    formula (str): patsy formula string.
    data (pd.DataFrame): source table for the design matrices.
    fit_regularized (bool): when True, fit via ``fit_regularized`` (elastic net).
    L1_wt (float): L1/L2 mix for the regularized fit (1 == pure lasso).
    refit (bool): refit on the selected variables (regularized path only).
    **kwargs: forwarded to whichever fit method is used.

    NOTE(review): unlike the sibling report_* helpers, this one does not print
    anything — it only returns the (wrapped) results object.
    """
    wrapper_cls = statsmodels.regression.linear_model.RegressionResultsWrapper
    # Parse the formula string into design matrices.
    endog, exog = patsy.dmatrices(formula, data=data, return_type='dataframe')
    model = smf.OLS(endog=endog, exog=exog,)
    if fit_regularized:
        # fit_regularized with elastic_net does not wrap its result automatically,
        # so wrap it manually to present the usual results interface.
        raw_fit = model.fit_regularized(start_params=None, L1_wt=L1_wt, refit=refit, **kwargs)
        return wrapper_cls(raw_fit)
    return model.fit(**kwargs)
def report_rlm(formula, data, verbose=True, fit_kwargs=None, **kwargs):
    """Fit RLM (robust linear model), print a report, and return the fit object.

    formula (str): patsy formula string.
    data (pd.DataFrame): source table.
    verbose (bool): when True, print the fit summary.
    fit_kwargs (dict | None): options forwarded to ``.fit()``.
    **kwargs: options forwarded to the model constructor (e.g. M=norms.HuberT()).

    Bug fix: the original forwarded ``**kwargs`` to BOTH ``smf.rlm(...)`` and
    ``.fit(...)``; ``RLM.fit`` rejects model-level keywords with ``TypeError``,
    so any constructor option made the call fail. Model and fit options are now
    routed separately.
    """
    if fit_kwargs is None:
        fit_kwargs = {}
    results = smf.rlm(formula, data=data, **kwargs).fit(**fit_kwargs)
    summary = results.summary()
    if verbose:
        report = """\n{summary}\n""".format(summary=summary)
        print(report)
    return results
def do_regression(data, y_var, X_ctrls=None, X_hyp=None, kind='OLS', **kwargs):
    """Provide a further abstracted way to build and run multiple types of regressions.

    data (pd.DataFrame): data table to use when retrieving the column headers
    y_var (str): column header of the outcome variable
    X_ctrls (str): formula specification of the "boring" variables "column_header_1 + column_header_2"...
    X_hyp (str): formula specification of the "interesting" variables "column_header_1 + column_header_2"...
    kind (str): the type of regression to run `kind in ['GLM','OLS','RLM','LOGIT'] == True`
    **kwargs: forwarded to the selected report_* helper.

    Bug fix: the original built ``"y ~  + hyp"`` (leading '+', malformed patsy
    formula) whenever X_ctrls was None but X_hyp was given; the right-hand side
    is now joined from the non-empty parts only.
    """
    assert (X_ctrls is not None) or (X_hyp is not None)
    rhs = ' + '.join(part for part in (X_ctrls, X_hyp) if part)
    formula = '{y_var} ~ {rhs}'.format(y_var=y_var, rhs=rhs)
    # Dispatch table: regression kind -> fitting helper.
    regs = munch.Munch()
    regs.GLM = report_glm
    regs.OLS = report_ols
    regs.RLM = report_rlm
    regs.LOGIT = report_logitreg
    return regs[kind](formula=formula,
                      data=data,
                      **kwargs)
def build_regression_models_grid(X_hyps_dicts, ctrl_coefs_dicts, outcomes_dicts):
    """Return a Munch mapping every (X_hyps, ctrl_coefs, outcomes) key combination
    to its GridValues, covering the full cartesian product of the three inputs."""
    grid = munch.Munch()
    key_combos = itertools.product(X_hyps_dicts.keys(),
                                   ctrl_coefs_dicts.keys(),
                                   outcomes_dicts.keys())
    for combo in tqdm(key_combos, desc="building regression model grid"):
        key = putils.GridKey(*combo)
        grid[key] = putils.GridValues(X_hyps=X_hyps_dicts[key.X_hyps],
                                      ctrl_coefs=ctrl_coefs_dicts[key.ctrl_coefs],
                                      outcomes=outcomes_dicts[key.outcomes])
    return grid
def regression_grid_single(grid_item, data, kind, **kwargs):
    """Run one regression from a model grid and return ``(grid_key, result)``.

    grid_item (tuple): a ``(GridKey, GridValues)`` pair from the model grid.
    data (pd.DataFrame): data table forwarded to ``do_regression``.
    kind (str): regression type ('GLM', 'OLS', 'RLM', 'LOGIT').
    **kwargs: forwarded to ``do_regression``.

    A singular design matrix yields ``(grid_key, 'LinAlgError')`` instead of
    killing the whole grid run.
    """
    grid_key, grid_value = grid_item
    y_var = grid_value.outcomes
    X_ctrls = ' + '.join(grid_value.ctrl_coefs)
    X_hyp = ' + '.join(grid_value.X_hyps)
    try:
        result = do_regression(data=data,
                               y_var=y_var,
                               X_ctrls=X_ctrls,
                               X_hyp=X_hyp,
                               kind=kind,
                               **kwargs
                               )
        return grid_key, result
    # Bug fix: ``np.linalg.linalg`` was a private alias removed in NumPy >= 1.25;
    # the public exception path is ``np.linalg.LinAlgError``.
    except np.linalg.LinAlgError:
        print('error with: {x}'.format(x=X_hyp))
        return grid_key, 'LinAlgError'
def run_regressions_grid(grid, data, kind, max_workers=None, **kwargs):
    """Run every model in ``grid`` across a process pool and collect the results.

    Returns a Munch nested as ``[outcomes][ctrl_coefs][X_hyps] -> fit result``
    (or the string 'LinAlgError' for singular designs).

    NOTE: the worker callable must be picklable for ProcessPoolExecutor, hence
    ``functools.partial`` rather than a closure or lambda.
    """
    regressions = tree()
    run_one = functools.partial(regression_grid_single,
                                data=data,
                                kind=kind,
                                **kwargs
                                )
    with futures.ProcessPoolExecutor(max_workers=max_workers) as pool:
        completed = pool.map(run_one, grid.items())
        for grid_key, reg_result in tqdm(completed, total=len(grid)):
            regressions[grid_key.outcomes][grid_key.ctrl_coefs][grid_key.X_hyps] = reg_result
    return munch.munchify(regressions)
def summarize_multi_LOGIT(results):
    """Return dataframe aggregating over-all stats from a dictionary-like object containing LOGIT result objects."""
    stat_cols = defaultdict(defaultdict)
    for reg_name, fit in results.items():
        retvals = fit.mle_retvals
        stat_cols['converged'][reg_name] = retvals['converged']
        stat_cols['iterations'][reg_name] = retvals['iterations']
        stat_cols['warnflag'][reg_name] = retvals['warnflag']
        stat_cols['pseudo_rsqrd'][reg_name] = fit.prsquared
        stat_cols['aic'][reg_name] = fit.aic
    return pd.DataFrame(stat_cols)
def summarize_single_OLS(regression, col_dict, name, is_regularized=False):
    """Return dataframe aggregating over-all stats from a dictionary-like object containing OLS result objects."""
    fit = regression
    # Regularized fits may lack these attributes; record 'NA' instead of failing.
    for attr in ('rsquared', 'rsquared_adj'):
        try:
            col_dict[attr][name] = getattr(fit, attr)
        except AttributeError:
            col_dict[attr][name] = 'NA'
    col_dict['f_pvalue'][name] = fit.f_pvalue
    col_dict['condition_number'][name] = fit.condition_number
    col_dict['regularized'][name] = is_regularized
    if is_regularized:
        # Outlier/influence machinery is unavailable on regularized fits.
        col_dict['n_outliers'][name] = "NA"
        col_dict['outliers'][name] = "NA"
    else:
        outlier_mask = fit.outlier_test(method='fdr_bh')['fdr_bh(p)'] <= 0.05
        col_dict['n_outliers'][name] = (outlier_mask).sum()
        col_dict['outliers'][name] = ','.join(outlier_mask.index[outlier_mask].values)
    col_dict['aic'][name] = fit.aic
    return col_dict
def summarize_multi_OLS(results):
    """Return dataframe aggregating over-all stats from a dictionary-like object containing OLS result objects."""
    col_dict = defaultdict(defaultdict)
    # Probe one result to decide whether the whole batch is regularized.
    probe = list(results.values())[0]
    is_regularized = getattr(probe, 'regularized', False)
    for reg_name, fit in results.items():
        col_dict = summarize_single_OLS(regression=fit,
                                        col_dict=col_dict,
                                        name=reg_name,
                                        is_regularized=is_regularized)
    summary_df = pd.DataFrame(col_dict)
    summary_df.index.name = 'outcome'
    return summary_df
def summarize_grid_OLS(regs, reg_grid):
    """Summarize every OLS fit in a regression grid and attach BH-FDR q-values.

    regs: nested mapping ``[outcomes][ctrl_coefs][X_hyps] -> fit result`` as
        produced by ``run_regressions_grid``.
    reg_grid: grid mapping whose GridKey keys locate each fit in ``regs``.

    Returns a DataFrame indexed by (outcomes, ctrl_coefs, X_hyps) with overall
    fit statistics plus ``qvalue_reg`` computed within each
    (outcomes, ctrl_coefs) group.

    Side effect: sets ``r.qvalue_reg`` on every regression result object.
    """
    summaries_overall = []
    grid_keys = list(reg_grid.keys())
    for gk in tqdm(grid_keys, desc="initial summary"):
        # col_dict = defaultdict(defaultdict)
        r = regs[gk.outcomes][gk.ctrl_coefs][gk.X_hyps]
        # One-row overall summary for this single regression.
        df_part = summarize_multi_OLS({gk.X_hyps: r})
        df_part.index.name = 'X_hyps'
        df_part = df_part.reset_index()
        df_part['ctrl_coefs'] = gk.ctrl_coefs
        df_part['outcomes'] = gk.outcomes
        summaries_overall.append(df_part)
    summaries_overall_df = pd.concat(summaries_overall)
    # calculate q-values and add to regression objects
    outcome_ctrl_grps = summaries_overall_df.groupby(["outcomes","ctrl_coefs"])
    qvals = []
    def assign_qvalue_reg(row, regs):
        # Attach the group-wise q-value to the fit object itself (pure side
        # effect; returns None, so the "reg_obj" column below is all None).
        qval = row['qvalue_reg']
        r = regs[row.outcomes][row.ctrl_coefs][row.X_hyps]
        r.qvalue_reg = qval
    for name,df in tqdm(outcome_ctrl_grps, desc="adding qvalues"):
        df = df.copy()
        df.index.name = 'dropme'
        # Benjamini-Hochberg correction of regression-level p-values within the group.
        df['qvalue_reg'] = statsmodels.stats.multitest.multipletests(pvals=df['f_pvalue'], alpha=0.05, method='fdr_bh')[1]
        # apply() is used only to trigger assign_qvalue_reg's side effect per row.
        df["reg_obj"] = df.apply(lambda row: assign_qvalue_reg(row, regs), axis=1)
        df = df.set_index(["outcomes", "ctrl_coefs","X_hyps"])
        qvals.append(df)
    summaries_overall_df = pd.concat(qvals)
    columns = ["aic","condition_number","f_pvalue","qvalue_reg","n_outliers","outliers","regularized","rsquared","rsquared_adj"]
    return summaries_overall_df[columns]
def get_diff(a, b):
    """Return the difference of absolute magnitudes: ``|a| - |b|``."""
    magnitude_a = abs(a)
    magnitude_b = abs(b)
    return magnitude_a - magnitude_b
def get_log2_fold(a, b):
    """Return the log2 fold-change of absolute magnitudes: ``log2(|a| / |b|)``."""
    ratio = abs(a) / abs(b)
    return np.log2(ratio)
def compare_coefs(row, value, results):
    """Return a "coef | diff | log2_fold" string comparing parameter ``value``
    against the row's X_hyp coefficient in the referenced regression.

    Returns the string "NA" when either parameter is absent from the fit.
    """
    fit = results[row['regression']]
    try:
        hyp_coef = fit.params[row['X_hyp']]
        other_coef = fit.params[value]
    except KeyError:
        return "NA"
    comparisons = "{value} | {diff} | {log2_fold}".format(value=round(other_coef, 4),
                                                          diff=round(get_diff(other_coef, hyp_coef), 4),
                                                          log2_fold=round(get_log2_fold(other_coef, hyp_coef), 4)
                                                          )
    return comparisons
def identify_full_ctrl_names(X_vars, orig_ctrl_names):
    """Return set of variable names actually used in regression, tolerating mangling of categoricals."""
    matched = set()
    for design_var in set(X_vars):
        for orig in orig_ctrl_names:
            if design_var == orig:
                matched.add(design_var)
            # patsy renders a categorical control written as "C(x)" into design
            # columns like "C(x)[T.level]"; keep those extensions as matches.
            elif design_var.startswith(orig) and design_var.startswith('C(') and design_var.endswith(']'):
                matched.add(design_var)
    return matched
def summarize_X_vars(results, sig_thresh=0.05, X_ctrls=None, X_ignore=None):
    """Collect per-parameter coefficient stats and keep the significant rows.

    results (dict): name -> fit result; each result must expose ``pvalues``,
        ``params``, ``f_pvalue`` and ``qvalue_reg`` (set by summarize_grid_OLS).
    sig_thresh (float | None): keep rows with pvalue_coef <= sig_thresh
        (None disables filtering by using 1).
    X_ctrls (list | None): original control names to exclude; categorical
        manglings are resolved via ``identify_full_ctrl_names``.
    X_ignore (list | None): extra parameter names to exclude (e.g. "Intercept").

    Bug fix: the original extended the caller-supplied ``X_ignore`` list in
    place, silently mutating the caller's argument across calls; a copy is now
    taken before extending.
    """
    if sig_thresh is None:
        sig_thresh = 1
    if X_ctrls is None:
        X_ctrls = []
    # Copy to avoid mutating the caller's list via .extend() below.
    X_ignore = [] if X_ignore is None else list(X_ignore)
    regs_dfs = []
    for name, reg in results.items():
        regs_dfs.append(pd.DataFrame(data={'pvalue_coef': reg.pvalues,
                                           'coef': reg.params,
                                           'regression': name,
                                           'pvalue_reg': reg.f_pvalue,
                                           'qvalue_reg': reg.qvalue_reg
                                           },
                                     columns=['regression', 'pvalue_reg', 'qvalue_reg', 'pvalue_coef', 'coef'],
                                     )
                        )
    pvals = pd.concat(regs_dfs)
    pvals['X_hyp'] = pvals.index.values
    pvals = pvals.reset_index(drop=True)
    full_ctrl_names = identify_full_ctrl_names(X_vars=pvals['X_hyp'].values, orig_ctrl_names=X_ctrls)
    X_ignore.extend(full_ctrl_names)
    ignore_these = pvals.X_hyp.isin(X_ignore)
    sig = pvals[~ignore_these].query(''' pvalue_coef <= {thresh} '''.format(thresh=sig_thresh))
    return sig
def summarize_grid_X_vars_OLS(regs, reg_grid, sig_thresh=0.05):
    """Summarize per-parameter (coefficient) statistics across a regression grid.

    Returns a DataFrame indexed by (outcomes, ctrl_coefs, regression, param_name)
    with pvalue_reg / qvalue_reg / pvalue_param_coef / param_coef columns,
    keeping only coefficients with p <= sig_thresh ("Intercept" is excluded).
    """
    rename_map = {"pvalue_coef": "pvalue_param_coef",
                  "coef": "param_coef",
                  "X_hyp": "param_name"}
    per_model_summaries = []
    for gk in tqdm(list(reg_grid.keys()), desc="initial summaries"):
        fit = regs[gk.outcomes][gk.ctrl_coefs][gk.X_hyps]
        part = summarize_X_vars(results={gk.X_hyps: fit},
                                sig_thresh=sig_thresh,
                                X_ctrls=None,
                                X_ignore=["Intercept"])
        part['ctrl_coefs'] = gk.ctrl_coefs
        part['qvalue_reg'] = fit.qvalue_reg
        part['outcomes'] = gk.outcomes
        per_model_summaries.append(part)
    keep_cols = ["pvalue_reg", "qvalue_reg", "pvalue_param_coef", "param_coef"]
    combined = pd.concat(per_model_summaries).rename(columns=rename_map)
    combined = combined.set_index(["outcomes", "ctrl_coefs", "regression", "param_name"], append=False)
    return combined.sort_index()[keep_cols]