diff --git a/CLAUDE.md b/CLAUDE.md
index 40ddad6..9e673be 100644
--- a/CLAUDE.md
+++ b/CLAUDE.md
@@ -96,10 +96,12 @@ pytest tests/test_rust_backend.py -v
   - `bacon_decompose()` - Convenience function for quick decomposition
   - Integrated with `TwoWayFixedEffects.decompose()` method
-- **`diff_diff/linalg.py`** - Unified linear algebra backend (v1.4.0):
+- **`diff_diff/linalg.py`** - Unified linear algebra backend (v1.4.0+):
   - `solve_ols()` - OLS solver using scipy's gelsy LAPACK driver (QR-based, faster than SVD)
   - `compute_robust_vcov()` - Vectorized HC1 and cluster-robust variance-covariance estimation
   - `compute_r_squared()` - R-squared and adjusted R-squared computation
+  - `LinearRegression` - High-level OLS helper class with unified coefficient extraction and inference
+  - `InferenceResult` - Dataclass container for coefficient-level inference (SE, t-stat, p-value, CI)
   - Single optimization point for all estimators (reduces code duplication)
   - Cluster-robust SEs use pandas groupby instead of O(n × clusters) loop
@@ -270,7 +272,7 @@ Tests mirror the source modules:
 - `tests/test_sun_abraham.py` - Tests for SunAbraham interaction-weighted estimator
 - `tests/test_triple_diff.py` - Tests for Triple Difference (DDD) estimator
 - `tests/test_bacon.py` - Tests for Goodman-Bacon decomposition
-- `tests/test_linalg.py` - Tests for unified OLS backend and robust variance estimation
+- `tests/test_linalg.py` - Tests for unified OLS backend, robust variance estimation, LinearRegression helper, and InferenceResult
 - `tests/test_utils.py` - Tests for parallel trends, robust SE, synthetic weights
 - `tests/test_diagnostics.py` - Tests for placebo tests
 - `tests/test_wild_bootstrap.py` - Tests for wild cluster bootstrap
diff --git a/ROADMAP.md b/ROADMAP.md
index 954b4ea..1466787 100644
--- a/ROADMAP.md
+++ b/ROADMAP.md
@@ -224,7 +224,7 @@ Ongoing maintenance and developer experience.
 ### Code Quality
 - Extract shared within-transformation logic to utils
-- Consolidate linear regression helpers
+- ~~Consolidate linear regression helpers~~ ✓ Done (v2.1): Added `LinearRegression` helper class and `InferenceResult` dataclass in `linalg.py`. All major estimators (DifferenceInDifferences, TwoWayFixedEffects, SunAbraham, TripleDifference) now use the unified helper for coefficient extraction and inference.
- Consider splitting `staggered.py` (1800+ lines) ### Documentation diff --git a/diff_diff/__init__.py b/diff_diff/__init__.py index 920891d..834077f 100644 --- a/diff_diff/__init__.py +++ b/diff_diff/__init__.py @@ -30,6 +30,10 @@ run_all_placebo_tests, run_placebo_test, ) +from diff_diff.linalg import ( + InferenceResult, + LinearRegression, +) from diff_diff.estimators import ( DifferenceInDifferences, MultiPeriodDiD, @@ -199,4 +203,7 @@ "plot_pretrends_power", # Rust backend "HAS_RUST_BACKEND", + # Linear algebra helpers + "LinearRegression", + "InferenceResult", ] diff --git a/diff_diff/estimators.py b/diff_diff/estimators.py index 40fd220..7fa4c80 100644 --- a/diff_diff/estimators.py +++ b/diff_diff/estimators.py @@ -17,7 +17,12 @@ import numpy as np import pandas as pd -from diff_diff.linalg import compute_r_squared, compute_robust_vcov, solve_ols +from diff_diff.linalg import ( + LinearRegression, + compute_r_squared, + compute_robust_vcov, + solve_ols, +) from diff_diff.results import DiDResults, MultiPeriodDiDResults, PeriodEffect from diff_diff.utils import ( WildBootstrapResults, @@ -262,56 +267,45 @@ def fit( X = np.column_stack([X, dummies[col].values.astype(float)]) var_names.append(col) - # Fit OLS using unified backend - coefficients, residuals, fitted, vcov = solve_ols( - X, y, return_fitted=True, return_vcov=False - ) - r_squared = compute_r_squared(y, residuals) - - # Extract ATT (coefficient on interaction term) + # Extract ATT index (coefficient on interaction term) att_idx = 3 # Index of interaction term att_var_name = f"{treatment}:{time}" assert var_names[att_idx] == att_var_name, ( f"ATT index mismatch: expected '{att_var_name}' at index {att_idx}, " f"but found '{var_names[att_idx]}'" ) - att = coefficients[att_idx] - # Compute degrees of freedom (used for analytical inference) - df = len(y) - X.shape[1] - n_absorbed_effects + # Always use LinearRegression for initial fit (unified code path) + # For wild bootstrap, we don't need cluster SEs from the initial fit + cluster_ids = data[self.cluster].values if self.cluster is not None else None + reg = LinearRegression( + include_intercept=False, # Intercept already in X + robust=self.robust, + cluster_ids=cluster_ids if self.inference != "wild_bootstrap" else None, + alpha=self.alpha, + ).fit(X, y, df_adjustment=n_absorbed_effects) + + coefficients = reg.coefficients_ + residuals = reg.residuals_ + fitted = reg.fitted_values_ + att = coefficients[att_idx] - # Compute standard errors and inference + # Get inference - either from bootstrap or analytical if self.inference == "wild_bootstrap" and self.cluster is not None: - # Wild cluster bootstrap for few-cluster inference - cluster_ids = data[self.cluster].values + # Override with wild cluster bootstrap inference se, p_value, conf_int, t_stat, vcov, _ = self._run_wild_bootstrap_inference( X, y, residuals, cluster_ids, att_idx ) - elif self.cluster is not None: - cluster_ids = data[self.cluster].values - vcov = compute_robust_vcov(X, residuals, cluster_ids) - se = np.sqrt(vcov[att_idx, att_idx]) - t_stat = att / se - p_value = compute_p_value(t_stat, df=df) - conf_int = compute_confidence_interval(att, se, self.alpha, df=df) - elif self.robust: - vcov = compute_robust_vcov(X, residuals) - se = np.sqrt(vcov[att_idx, att_idx]) - t_stat = att / se - p_value = compute_p_value(t_stat, df=df) - conf_int = compute_confidence_interval(att, se, self.alpha, df=df) else: - # Classical OLS standard errors - n = len(y) - k = X.shape[1] - mse = np.sum(residuals**2) / (n - k) - # Use 
solve() instead of inv() for numerical stability - # solve(A, B) computes X where AX=B, so this yields (X'X)^{-1} * mse - vcov = np.linalg.solve(X.T @ X, mse * np.eye(k)) - se = np.sqrt(vcov[att_idx, att_idx]) - t_stat = att / se - p_value = compute_p_value(t_stat, df=df) - conf_int = compute_confidence_interval(att, se, self.alpha, df=df) + # Use analytical inference from LinearRegression + vcov = reg.vcov_ + inference = reg.get_inference(att_idx) + se = inference.se + t_stat = inference.t_stat + p_value = inference.p_value + conf_int = inference.conf_int + + r_squared = compute_r_squared(y, residuals) # Count observations n_treated = int(np.sum(d)) diff --git a/diff_diff/linalg.py b/diff_diff/linalg.py index 51fa232..0caf419 100644 --- a/diff_diff/linalg.py +++ b/diff_diff/linalg.py @@ -14,10 +14,12 @@ fallback to NumPy/SciPy implementations. """ -from typing import Optional, Tuple, Union +from dataclasses import dataclass +from typing import Dict, List, Optional, Tuple, Union import numpy as np import pandas as pd +from scipy import stats from scipy.linalg import lstsq as scipy_lstsq # Import Rust backend if available (from _backend to avoid circular imports) @@ -183,7 +185,10 @@ def _solve_ols_numpy( NumPy/SciPy fallback implementation of solve_ols. Uses scipy.linalg.lstsq with 'gelsy' driver (QR with column pivoting) - for fast and stable least squares solving. + for numerically stable least squares solving. QR decomposition is preferred + over normal equations because it doesn't square the condition number of X, + making it more robust for ill-conditioned matrices common in DiD designs + (e.g., many unit/time fixed effects). Parameters ---------- @@ -209,9 +214,10 @@ def _solve_ols_numpy( vcov : np.ndarray, optional Variance-covariance matrix if return_vcov=True. """ - # Solve OLS using scipy's optimized solver - # 'gelsy' uses QR with column pivoting, faster than default 'gelsd' (SVD) - # Note: gelsy doesn't reliably report rank, so we don't check for deficiency + # Solve OLS using QR decomposition via scipy's optimized LAPACK routines + # 'gelsy' uses QR with column pivoting, which is numerically stable even + # for ill-conditioned matrices (doesn't square the condition number like + # normal equations would) coefficients = scipy_lstsq(X, y, lapack_driver="gelsy", check_finite=False)[0] # Compute residuals and fitted values @@ -416,3 +422,559 @@ def compute_r_squared( r_squared = 1 - (1 - r_squared) * (n - 1) / (n - n_params) return r_squared + + +# ============================================================================= +# LinearRegression Helper Class +# ============================================================================= + + +@dataclass +class InferenceResult: + """ + Container for inference results on a single coefficient. + + This dataclass provides a unified way to access coefficient estimates + and their associated inference statistics. + + Attributes + ---------- + coefficient : float + The point estimate of the coefficient. + se : float + Standard error of the coefficient. + t_stat : float + T-statistic (coefficient / se). + p_value : float + Two-sided p-value for the t-statistic. + conf_int : tuple of (float, float) + Confidence interval (lower, upper). + df : int or None + Degrees of freedom used for inference. None if using normal distribution. + alpha : float + Significance level used for confidence interval. + + Examples + -------- + >>> result = InferenceResult( + ... coefficient=2.5, se=0.5, t_stat=5.0, p_value=0.001, + ... 
conf_int=(1.52, 3.48), df=100, alpha=0.05
+    ... )
+    >>> result.is_significant()
+    True
+    >>> result.significance_stars()
+    '**'
+    """
+
+    coefficient: float
+    se: float
+    t_stat: float
+    p_value: float
+    conf_int: Tuple[float, float]
+    df: Optional[int] = None
+    alpha: float = 0.05
+
+    def is_significant(self, alpha: Optional[float] = None) -> bool:
+        """Check if the coefficient is statistically significant."""
+        threshold = alpha if alpha is not None else self.alpha
+        return self.p_value < threshold
+
+    def significance_stars(self) -> str:
+        """Return significance stars based on p-value."""
+        if self.p_value < 0.001:
+            return "***"
+        elif self.p_value < 0.01:
+            return "**"
+        elif self.p_value < 0.05:
+            return "*"
+        elif self.p_value < 0.1:
+            return "."
+        return ""
+
+    def to_dict(self) -> Dict[str, Union[float, Tuple[float, float], int, None]]:
+        """Convert to dictionary representation."""
+        return {
+            "coefficient": self.coefficient,
+            "se": self.se,
+            "t_stat": self.t_stat,
+            "p_value": self.p_value,
+            "conf_int": self.conf_int,
+            "df": self.df,
+            "alpha": self.alpha,
+        }
+
+
+class LinearRegression:
+    """
+    OLS regression helper with unified coefficient extraction and inference.
+
+    This class wraps the low-level `solve_ols` function and provides a clean
+    interface for fitting regressions and extracting coefficient-level inference.
+    It eliminates code duplication across estimators by centralizing the common
+    pattern of: fit OLS -> extract coefficient -> compute SE -> compute t-stat
+    -> compute p-value -> compute CI.
+
+    Parameters
+    ----------
+    include_intercept : bool, default True
+        Whether to automatically add an intercept column to the design matrix.
+    robust : bool, default True
+        Whether to use heteroskedasticity-robust (HC1) standard errors.
+        If False and cluster_ids is None, uses classical OLS standard errors.
+    cluster_ids : array-like, optional
+        Cluster identifiers for cluster-robust standard errors.
+        Overrides the `robust` parameter if provided.
+    alpha : float, default 0.05
+        Significance level for confidence intervals.
+
+    Attributes
+    ----------
+    coefficients_ : ndarray
+        Fitted coefficient values (available after fit).
+    vcov_ : ndarray
+        Variance-covariance matrix (available after fit).
+    residuals_ : ndarray
+        Residuals from the fit (available after fit).
+    fitted_values_ : ndarray
+        Fitted values from the fit (available after fit).
+    n_obs_ : int
+        Number of observations (available after fit).
+    n_params_ : int
+        Number of parameters including intercept (available after fit).
+    df_ : int
+        Degrees of freedom (n - k - df_adjustment) (available after fit).
+
+    Examples
+    --------
+    Basic usage with automatic intercept:
+
+    >>> import numpy as np
+    >>> from diff_diff.linalg import LinearRegression
+    >>> X = np.random.randn(100, 2)
+    >>> y = 1 + 2 * X[:, 0] + 3 * X[:, 1] + np.random.randn(100)
+    >>> reg = LinearRegression().fit(X, y)
+    >>> print(f"Intercept: {reg.coefficients_[0]:.2f}")
+    >>> inference = reg.get_inference(1)  # inference for first predictor
+    >>> print(f"Coef: {inference.coefficient:.2f}, SE: {inference.se:.2f}")
+
+    Using with cluster-robust standard errors:
+
+    >>> cluster_ids = np.repeat(np.arange(20), 5)  # 20 clusters of 5
+    >>> reg = LinearRegression(cluster_ids=cluster_ids).fit(X, y)
+    >>> inference = reg.get_inference(1)
+    >>> print(f"Cluster-robust SE: {inference.se:.2f}")
+
+    Extracting multiple coefficients at once:
+
+    >>> results = reg.get_inference_batch([1, 2])
+    >>> for idx, inf in results.items():
+    ...
print(f"Coef {idx}: {inf.coefficient:.2f} ({inf.significance_stars()})") + """ + + def __init__( + self, + include_intercept: bool = True, + robust: bool = True, + cluster_ids: Optional[np.ndarray] = None, + alpha: float = 0.05, + ): + self.include_intercept = include_intercept + self.robust = robust + self.cluster_ids = cluster_ids + self.alpha = alpha + + # Fitted attributes (set by fit()) + self.coefficients_: Optional[np.ndarray] = None + self.vcov_: Optional[np.ndarray] = None + self.residuals_: Optional[np.ndarray] = None + self.fitted_values_: Optional[np.ndarray] = None + self._y: Optional[np.ndarray] = None + self._X: Optional[np.ndarray] = None + self.n_obs_: Optional[int] = None + self.n_params_: Optional[int] = None + self.df_: Optional[int] = None + + def fit( + self, + X: np.ndarray, + y: np.ndarray, + *, + cluster_ids: Optional[np.ndarray] = None, + df_adjustment: int = 0, + ) -> "LinearRegression": + """ + Fit OLS regression. + + Parameters + ---------- + X : ndarray of shape (n, k) + Design matrix. An intercept column will be added if include_intercept=True. + y : ndarray of shape (n,) + Response vector. + cluster_ids : ndarray, optional + Cluster identifiers for this fit. Overrides the instance-level + cluster_ids if provided. + df_adjustment : int, default 0 + Additional degrees of freedom adjustment (e.g., for absorbed fixed effects). + The effective df will be n - k - df_adjustment. + + Returns + ------- + self : LinearRegression + Fitted estimator. + """ + X = np.asarray(X, dtype=np.float64) + y = np.asarray(y, dtype=np.float64) + + # Add intercept if requested + if self.include_intercept: + X = np.column_stack([np.ones(X.shape[0]), X]) + + # Use provided cluster_ids or fall back to instance-level + effective_cluster_ids = cluster_ids if cluster_ids is not None else self.cluster_ids + + # Determine if we need robust/cluster vcov + compute_vcov = True + + if self.robust or effective_cluster_ids is not None: + # Use solve_ols with robust/cluster SEs + coefficients, residuals, fitted, vcov = solve_ols( + X, y, + cluster_ids=effective_cluster_ids, + return_fitted=True, + return_vcov=compute_vcov, + ) + else: + # Classical OLS - compute vcov separately + coefficients, residuals, fitted, _ = solve_ols( + X, y, + return_fitted=True, + return_vcov=False, + ) + # Compute classical OLS variance-covariance matrix + n, k = X.shape + mse = np.sum(residuals**2) / (n - k) + try: + vcov = np.linalg.solve(X.T @ X, mse * np.eye(k)) + except np.linalg.LinAlgError: + # Fall back to pseudo-inverse for singular matrices + vcov = np.linalg.pinv(X.T @ X) * mse + + # Store fitted attributes + self.coefficients_ = coefficients + self.vcov_ = vcov + self.residuals_ = residuals + self.fitted_values_ = fitted + self._y = y + self._X = X + self.n_obs_ = X.shape[0] + self.n_params_ = X.shape[1] + self.df_ = self.n_obs_ - self.n_params_ - df_adjustment + + return self + + def _check_fitted(self) -> None: + """Raise error if model has not been fitted.""" + if self.coefficients_ is None: + raise ValueError("Model has not been fitted. Call fit() first.") + + def get_coefficient(self, index: int) -> float: + """ + Get the coefficient value at a specific index. + + Parameters + ---------- + index : int + Index of the coefficient in the coefficient array. + + Returns + ------- + float + Coefficient value. + """ + self._check_fitted() + return float(self.coefficients_[index]) + + def get_se(self, index: int) -> float: + """ + Get the standard error for a coefficient. 
+ + Parameters + ---------- + index : int + Index of the coefficient. + + Returns + ------- + float + Standard error. + """ + self._check_fitted() + return float(np.sqrt(self.vcov_[index, index])) + + def get_inference( + self, + index: int, + alpha: Optional[float] = None, + df: Optional[int] = None, + ) -> InferenceResult: + """ + Get full inference results for a coefficient. + + This is the primary method for extracting coefficient-level inference, + returning all statistics in a single call. + + Parameters + ---------- + index : int + Index of the coefficient in the coefficient array. + alpha : float, optional + Significance level for CI. Defaults to instance-level alpha. + df : int, optional + Degrees of freedom. Defaults to fitted df (n - k - df_adjustment). + Set to None explicitly to use normal distribution instead of t. + + Returns + ------- + InferenceResult + Dataclass containing coefficient, se, t_stat, p_value, conf_int. + + Examples + -------- + >>> reg = LinearRegression().fit(X, y) + >>> result = reg.get_inference(1) + >>> print(f"Effect: {result.coefficient:.3f} (SE: {result.se:.3f})") + >>> print(f"95% CI: [{result.conf_int[0]:.3f}, {result.conf_int[1]:.3f}]") + >>> if result.is_significant(): + ... print("Statistically significant!") + """ + self._check_fitted() + + coef = float(self.coefficients_[index]) + se = float(np.sqrt(self.vcov_[index, index])) + + # Handle zero or negative SE (indicates perfect fit or numerical issues) + if se <= 0: + import warnings + warnings.warn( + f"Standard error is zero or negative (se={se}) for coefficient at index {index}. " + "This may indicate perfect multicollinearity or numerical issues.", + UserWarning, + ) + # Use inf for t-stat when SE is zero (perfect fit scenario) + if coef > 0: + t_stat = np.inf + elif coef < 0: + t_stat = -np.inf + else: + t_stat = 0.0 + else: + t_stat = coef / se + + # Use instance alpha if not provided + effective_alpha = alpha if alpha is not None else self.alpha + + # Use fitted df if not explicitly provided + # Note: df=None means use normal distribution + effective_df = df if df is not None else self.df_ + + # Warn if df is non-positive and fall back to normal distribution + if effective_df is not None and effective_df <= 0: + import warnings + warnings.warn( + f"Degrees of freedom is non-positive (df={effective_df}). " + "Using normal distribution instead of t-distribution for inference.", + UserWarning, + ) + effective_df = None + + # Compute p-value + p_value = _compute_p_value(t_stat, df=effective_df) + + # Compute confidence interval + conf_int = _compute_confidence_interval(coef, se, effective_alpha, df=effective_df) + + return InferenceResult( + coefficient=coef, + se=se, + t_stat=t_stat, + p_value=p_value, + conf_int=conf_int, + df=effective_df, + alpha=effective_alpha, + ) + + def get_inference_batch( + self, + indices: List[int], + alpha: Optional[float] = None, + df: Optional[int] = None, + ) -> Dict[int, InferenceResult]: + """ + Get inference results for multiple coefficients. + + Parameters + ---------- + indices : list of int + Indices of coefficients to extract. + alpha : float, optional + Significance level for CIs. Defaults to instance-level alpha. + df : int, optional + Degrees of freedom. Defaults to fitted df. + + Returns + ------- + dict + Dictionary mapping index -> InferenceResult. + + Examples + -------- + >>> reg = LinearRegression().fit(X, y) + >>> results = reg.get_inference_batch([1, 2, 3]) + >>> for idx, inf in results.items(): + ... 
print(f"Coef {idx}: {inf.coefficient:.3f} {inf.significance_stars()}") + """ + self._check_fitted() + return {idx: self.get_inference(idx, alpha=alpha, df=df) for idx in indices} + + def get_all_inference( + self, + alpha: Optional[float] = None, + df: Optional[int] = None, + ) -> List[InferenceResult]: + """ + Get inference results for all coefficients. + + Parameters + ---------- + alpha : float, optional + Significance level for CIs. Defaults to instance-level alpha. + df : int, optional + Degrees of freedom. Defaults to fitted df. + + Returns + ------- + list of InferenceResult + Inference results for each coefficient in order. + """ + self._check_fitted() + return [ + self.get_inference(i, alpha=alpha, df=df) + for i in range(len(self.coefficients_)) + ] + + def r_squared(self, adjusted: bool = False) -> float: + """ + Compute R-squared or adjusted R-squared. + + Parameters + ---------- + adjusted : bool, default False + If True, return adjusted R-squared. + + Returns + ------- + float + R-squared value. + """ + self._check_fitted() + return compute_r_squared( + self._y, self.residuals_, adjusted=adjusted, n_params=self.n_params_ + ) + + def predict(self, X: np.ndarray) -> np.ndarray: + """ + Predict using the fitted model. + + Parameters + ---------- + X : ndarray of shape (n, k) + Design matrix for prediction. Should have same number of columns + as the original X (excluding intercept if include_intercept=True). + + Returns + ------- + ndarray + Predicted values. + """ + self._check_fitted() + X = np.asarray(X, dtype=np.float64) + + if self.include_intercept: + X = np.column_stack([np.ones(X.shape[0]), X]) + + return X @ self.coefficients_ + + +# ============================================================================= +# Internal helpers for inference (used by LinearRegression) +# ============================================================================= + + +def _compute_p_value( + t_stat: float, + df: Optional[int] = None, + two_sided: bool = True, +) -> float: + """ + Compute p-value for a t-statistic. + + Parameters + ---------- + t_stat : float + T-statistic. + df : int, optional + Degrees of freedom. If None, uses normal distribution. + two_sided : bool, default True + Whether to compute two-sided p-value. + + Returns + ------- + float + P-value. + """ + if df is not None and df > 0: + p_value = stats.t.sf(np.abs(t_stat), df) + else: + p_value = stats.norm.sf(np.abs(t_stat)) + + if two_sided: + p_value *= 2 + + return float(p_value) + + +def _compute_confidence_interval( + estimate: float, + se: float, + alpha: float = 0.05, + df: Optional[int] = None, +) -> Tuple[float, float]: + """ + Compute confidence interval for an estimate. + + Parameters + ---------- + estimate : float + Point estimate. + se : float + Standard error. + alpha : float, default 0.05 + Significance level (0.05 for 95% CI). + df : int, optional + Degrees of freedom. If None, uses normal distribution. + + Returns + ------- + tuple of (float, float) + (lower_bound, upper_bound) of confidence interval. 
+ """ + if df is not None and df > 0: + critical_value = stats.t.ppf(1 - alpha / 2, df) + else: + critical_value = stats.norm.ppf(1 - alpha / 2) + + lower = estimate - critical_value * se + upper = estimate + critical_value * se + + return (lower, upper) diff --git a/diff_diff/sun_abraham.py b/diff_diff/sun_abraham.py index 914b6ff..018b618 100644 --- a/diff_diff/sun_abraham.py +++ b/diff_diff/sun_abraham.py @@ -16,7 +16,7 @@ import numpy as np import pandas as pd -from diff_diff.linalg import compute_robust_vcov +from diff_diff.linalg import LinearRegression, compute_robust_vcov from diff_diff.results import _get_significance_stars from diff_diff.utils import ( compute_confidence_interval, @@ -750,28 +750,26 @@ def _fit_saturated_regression( X = df_demeaned[X_cols].values y = df_demeaned[f"{outcome}_dm"].values - # Fit OLS - try: - XtX_inv = np.linalg.inv(X.T @ X) - except np.linalg.LinAlgError: - # Use pseudo-inverse for singular matrices - XtX_inv = np.linalg.pinv(X.T @ X) - - coefficients = XtX_inv @ (X.T @ y) - residuals = y - X @ coefficients - - # Compute cluster-robust standard errors + # Fit OLS using LinearRegression helper (more stable than manual X'X inverse) cluster_ids = df_demeaned[cluster_var].values - vcov = compute_robust_vcov(X, residuals, cluster_ids) + reg = LinearRegression( + include_intercept=False, # Already demeaned, no intercept needed + robust=True, + cluster_ids=cluster_ids, + ).fit(X, y) + + coefficients = reg.coefficients_ + vcov = reg.vcov_ - # Extract cohort effects and standard errors + # Extract cohort effects and standard errors using get_inference cohort_effects: Dict[Tuple[Any, int], float] = {} cohort_ses: Dict[Tuple[Any, int], float] = {} n_interactions = len(interaction_cols) for (g, e), coef_idx in coef_index_map.items(): - cohort_effects[(g, e)] = float(coefficients[coef_idx]) - cohort_ses[(g, e)] = float(np.sqrt(vcov[coef_idx, coef_idx])) + inference = reg.get_inference(coef_idx) + cohort_effects[(g, e)] = inference.coefficient + cohort_ses[(g, e)] = inference.se # Extract just the vcov for cohort effects (excluding covariates) vcov_cohort = vcov[:n_interactions, :n_interactions] diff --git a/diff_diff/triple_diff.py b/diff_diff/triple_diff.py index c3a4aa6..1c82fc0 100644 --- a/diff_diff/triple_diff.py +++ b/diff_diff/triple_diff.py @@ -37,7 +37,7 @@ import pandas as pd from scipy import optimize -from diff_diff.linalg import compute_robust_vcov, solve_ols +from diff_diff.linalg import LinearRegression, compute_robust_vcov, solve_ols from diff_diff.results import _get_significance_stars from diff_diff.utils import ( compute_confidence_interval, @@ -739,21 +739,18 @@ def _regression_adjustment( design_matrix = np.column_stack(design_cols) - # Fit OLS using unified backend - coefficients, residuals, fitted, _ = solve_ols( - design_matrix, y, return_fitted=True, return_vcov=False - ) - - # R-squared - ss_res = np.sum(residuals**2) - ss_tot = np.sum((y - np.mean(y)) ** 2) - r_squared = 1 - (ss_res / ss_tot) if ss_tot > 0 else 0.0 + # Fit OLS using LinearRegression helper + reg = LinearRegression( + include_intercept=False, # Intercept already in design_matrix + robust=self.robust, + alpha=self.alpha, + ).fit(design_matrix, y) # ATT is the coefficient on G*P*T (index 7) - att = coefficients[7] - - # Compute standard error - se = self._compute_se(design_matrix, residuals, 7) + inference = reg.get_inference(7) + att = inference.coefficient + se = inference.se + r_squared = reg.r_squared() return att, se, r_squared, None diff --git a/diff_diff/twfe.py 
b/diff_diff/twfe.py index 54d9dce..f3167ec 100644 --- a/diff_diff/twfe.py +++ b/diff_diff/twfe.py @@ -12,7 +12,7 @@ from diff_diff.bacon import BaconDecompositionResults from diff_diff.estimators import DifferenceInDifferences -from diff_diff.linalg import compute_robust_vcov +from diff_diff.linalg import LinearRegression from diff_diff.results import DiDResults from diff_diff.utils import ( compute_confidence_interval, @@ -116,32 +116,44 @@ def fit( # type: ignore[override] X = np.column_stack([np.ones(len(y))] + X_list) - # Fit OLS on demeaned data - coefficients, residuals, fitted, r_squared = self._fit_ols(X, y) - # ATT is the coefficient on treatment_post (index 1) att_idx = 1 - att = coefficients[att_idx] # Degrees of freedom adjustment for fixed effects n_units = data[unit].nunique() n_times = data[time].nunique() - df = len(y) - X.shape[1] - n_units - n_times + 2 + df_adjustment = n_units + n_times - 2 - # Compute standard errors and inference + # Always use LinearRegression for initial fit (unified code path) + # For wild bootstrap, we don't need cluster SEs from the initial fit cluster_ids = data[cluster_var].values + reg = LinearRegression( + include_intercept=False, # Intercept already in X + robust=True, # TWFE always uses robust/cluster SEs + cluster_ids=cluster_ids if self.inference != "wild_bootstrap" else None, + alpha=self.alpha, + ).fit(X, y, df_adjustment=df_adjustment) + + coefficients = reg.coefficients_ + residuals = reg.residuals_ + fitted = reg.fitted_values_ + r_squared = reg.r_squared() + att = coefficients[att_idx] + + # Get inference - either from bootstrap or analytical if self.inference == "wild_bootstrap": - # Wild cluster bootstrap for few-cluster inference + # Override with wild cluster bootstrap inference se, p_value, conf_int, t_stat, vcov, _ = self._run_wild_bootstrap_inference( X, y, residuals, cluster_ids, att_idx ) else: - # Standard cluster-robust SE - vcov = compute_robust_vcov(X, residuals, cluster_ids) - se = np.sqrt(vcov[att_idx, att_idx]) - t_stat = att / se - p_value = compute_p_value(t_stat, df=df) - conf_int = compute_confidence_interval(att, se, self.alpha, df=df) + # Use analytical inference from LinearRegression + vcov = reg.vcov_ + inference = reg.get_inference(att_idx) + se = inference.se + t_stat = inference.t_stat + p_value = inference.p_value + conf_int = inference.conf_int # Count observations treated_units = data[data[treatment] == 1][unit].unique() diff --git a/tests/test_linalg.py b/tests/test_linalg.py index f502daa..7fab517 100644 --- a/tests/test_linalg.py +++ b/tests/test_linalg.py @@ -4,7 +4,13 @@ import pandas as pd import pytest -from diff_diff.linalg import compute_r_squared, compute_robust_vcov, solve_ols +from diff_diff.linalg import ( + InferenceResult, + LinearRegression, + compute_r_squared, + compute_robust_vcov, + solve_ols, +) class TestSolveOLS: @@ -420,3 +426,574 @@ def test_many_clusters_completes(self): coef, resid, vcov = solve_ols(X, y, cluster_ids=cluster_ids) assert vcov.shape == (k, k) + + +class TestInferenceResult: + """Tests for the InferenceResult dataclass.""" + + def test_basic_creation(self): + """Test basic InferenceResult creation.""" + result = InferenceResult( + coefficient=2.5, + se=0.5, + t_stat=5.0, + p_value=0.001, + conf_int=(1.52, 3.48), + df=100, + alpha=0.05, + ) + + assert result.coefficient == 2.5 + assert result.se == 0.5 + assert result.t_stat == 5.0 + assert result.p_value == 0.001 + assert result.conf_int == (1.52, 3.48) + assert result.df == 100 + assert result.alpha == 0.05 + + 
def test_is_significant_default_alpha(self): + """Test is_significant with default alpha.""" + # Significant at 0.05 + result = InferenceResult( + coefficient=2.0, se=0.5, t_stat=4.0, p_value=0.001, + conf_int=(1.0, 3.0), alpha=0.05 + ) + assert result.is_significant() is True + + # Not significant at 0.05 + result2 = InferenceResult( + coefficient=0.5, se=0.5, t_stat=1.0, p_value=0.3, + conf_int=(-0.5, 1.5), alpha=0.05 + ) + assert result2.is_significant() is False + + def test_is_significant_custom_alpha(self): + """Test is_significant with custom alpha override.""" + result = InferenceResult( + coefficient=2.0, se=0.5, t_stat=4.0, p_value=0.02, + conf_int=(1.0, 3.0), alpha=0.05 + ) + + # Significant at 0.05 (default) + assert result.is_significant() is True + + # Not significant at 0.01 + assert result.is_significant(alpha=0.01) is False + + def test_significance_stars(self): + """Test significance_stars returns correct stars.""" + # p < 0.001 -> *** + result = InferenceResult( + coefficient=1.0, se=0.1, t_stat=10.0, p_value=0.0001, + conf_int=(0.8, 1.2) + ) + assert result.significance_stars() == "***" + + # p < 0.01 -> ** + result2 = InferenceResult( + coefficient=1.0, se=0.2, t_stat=5.0, p_value=0.005, + conf_int=(0.6, 1.4) + ) + assert result2.significance_stars() == "**" + + # p < 0.05 -> * + result3 = InferenceResult( + coefficient=1.0, se=0.3, t_stat=3.0, p_value=0.03, + conf_int=(0.4, 1.6) + ) + assert result3.significance_stars() == "*" + + # p < 0.1 -> . + result4 = InferenceResult( + coefficient=1.0, se=0.4, t_stat=2.5, p_value=0.08, + conf_int=(0.2, 1.8) + ) + assert result4.significance_stars() == "." + + # p >= 0.1 -> "" + result5 = InferenceResult( + coefficient=1.0, se=0.5, t_stat=2.0, p_value=0.15, + conf_int=(0.0, 2.0) + ) + assert result5.significance_stars() == "" + + def test_to_dict(self): + """Test to_dict returns all fields.""" + result = InferenceResult( + coefficient=2.5, se=0.5, t_stat=5.0, p_value=0.001, + conf_int=(1.52, 3.48), df=100, alpha=0.05 + ) + d = result.to_dict() + + assert d["coefficient"] == 2.5 + assert d["se"] == 0.5 + assert d["t_stat"] == 5.0 + assert d["p_value"] == 0.001 + assert d["conf_int"] == (1.52, 3.48) + assert d["df"] == 100 + assert d["alpha"] == 0.05 + + +class TestLinearRegression: + """Tests for the LinearRegression helper class.""" + + @pytest.fixture + def simple_data(self): + """Create simple regression data with known coefficients.""" + np.random.seed(42) + n = 200 + # X without intercept (LinearRegression adds it by default) + X = np.random.randn(n, 2) + beta_true = np.array([5.0, 2.0, -1.0]) # intercept, x1, x2 + X_with_intercept = np.column_stack([np.ones(n), X]) + y = X_with_intercept @ beta_true + np.random.randn(n) * 0.5 + return X, y, beta_true + + @pytest.fixture + def clustered_data(self): + """Create clustered regression data.""" + np.random.seed(42) + n_clusters = 20 + obs_per_cluster = 10 + n = n_clusters * obs_per_cluster + + cluster_ids = np.repeat(np.arange(n_clusters), obs_per_cluster) + cluster_effects = np.random.randn(n_clusters) + + X = np.random.randn(n, 1) + beta_true = np.array([3.0, 1.5]) # intercept, x1 + X_with_intercept = np.column_stack([np.ones(n), X]) + errors = cluster_effects[cluster_ids] + np.random.randn(n) * 0.3 + y = X_with_intercept @ beta_true + errors + + return X, y, cluster_ids, beta_true + + def test_basic_fit(self, simple_data): + """Test basic LinearRegression fit.""" + X, y, beta_true = simple_data + reg = LinearRegression().fit(X, y) + + # Check coefficients are close to true 
values + np.testing.assert_allclose(reg.coefficients_, beta_true, atol=0.3) + + # Check fitted attributes exist + assert reg.coefficients_ is not None + assert reg.vcov_ is not None + assert reg.residuals_ is not None + assert reg.fitted_values_ is not None + assert reg.n_obs_ == X.shape[0] + assert reg.n_params_ == X.shape[1] + 1 # +1 for intercept + assert reg.df_ == reg.n_obs_ - reg.n_params_ + + def test_fit_without_intercept(self, simple_data): + """Test fit without automatic intercept.""" + X, y, _ = simple_data + n = X.shape[0] + + # Add intercept manually + X_full = np.column_stack([np.ones(n), X]) + + reg = LinearRegression(include_intercept=False).fit(X_full, y) + + # Should have same number of params as columns in X_full + assert reg.n_params_ == X_full.shape[1] + + def test_fit_not_called_error(self): + """Test that methods raise error if fit() not called.""" + reg = LinearRegression() + + with pytest.raises(ValueError, match="not been fitted"): + reg.get_coefficient(0) + + with pytest.raises(ValueError, match="not been fitted"): + reg.get_se(0) + + with pytest.raises(ValueError, match="not been fitted"): + reg.get_inference(0) + + def test_get_coefficient(self, simple_data): + """Test get_coefficient returns correct value.""" + X, y, beta_true = simple_data + reg = LinearRegression().fit(X, y) + + for i, expected in enumerate(beta_true): + actual = reg.get_coefficient(i) + np.testing.assert_allclose(actual, expected, atol=0.3) + + def test_get_se(self, simple_data): + """Test get_se returns positive standard errors.""" + X, y, _ = simple_data + reg = LinearRegression().fit(X, y) + + for i in range(reg.n_params_): + se = reg.get_se(i) + assert se > 0 + + def test_get_inference(self, simple_data): + """Test get_inference returns InferenceResult with correct values.""" + X, y, beta_true = simple_data + reg = LinearRegression().fit(X, y) + + result = reg.get_inference(1) # First predictor (index 1 after intercept) + + # Check it's an InferenceResult + assert isinstance(result, InferenceResult) + + # Check coefficient is close to true value + np.testing.assert_allclose(result.coefficient, beta_true[1], atol=0.3) + + # Check SE is positive + assert result.se > 0 + + # Check t-stat computation + np.testing.assert_allclose(result.t_stat, result.coefficient / result.se) + + # Check p-value is in valid range + assert 0 <= result.p_value <= 1 + + # Check confidence interval contains point estimate + assert result.conf_int[0] < result.coefficient < result.conf_int[1] + + # Check df is set + assert result.df == reg.df_ + + def test_get_inference_significant_coefficient(self, simple_data): + """Test inference for a truly significant coefficient.""" + X, y, _ = simple_data + reg = LinearRegression().fit(X, y) + + # First predictor should be significant (true coef = 2.0) + result = reg.get_inference(1) + + # With true effect of 2.0 and n=200, should be highly significant + assert result.p_value < 0.001 + assert result.is_significant() + assert result.significance_stars() == "***" + + def test_get_inference_batch(self, simple_data): + """Test get_inference_batch returns dict of results.""" + X, y, _ = simple_data + reg = LinearRegression().fit(X, y) + + results = reg.get_inference_batch([0, 1, 2]) + + assert isinstance(results, dict) + assert len(results) == 3 + assert all(isinstance(v, InferenceResult) for v in results.values()) + assert all(idx in results for idx in [0, 1, 2]) + + def test_get_all_inference(self, simple_data): + """Test get_all_inference returns results for all 
coefficients.""" + X, y, _ = simple_data + reg = LinearRegression().fit(X, y) + + results = reg.get_all_inference() + + assert isinstance(results, list) + assert len(results) == reg.n_params_ + assert all(isinstance(r, InferenceResult) for r in results) + + def test_custom_alpha(self, simple_data): + """Test that custom alpha affects confidence intervals.""" + X, y, _ = simple_data + reg = LinearRegression(alpha=0.10).fit(X, y) + + result = reg.get_inference(1) + assert result.alpha == 0.10 + + # 90% CI should be narrower than 95% CI + result_99 = reg.get_inference(1, alpha=0.01) + ci_width_90 = result.conf_int[1] - result.conf_int[0] + ci_width_99 = result_99.conf_int[1] - result_99.conf_int[0] + assert ci_width_90 < ci_width_99 + + def test_r_squared(self, simple_data): + """Test R-squared computation.""" + X, y, _ = simple_data + reg = LinearRegression().fit(X, y) + + r2 = reg.r_squared() + r2_adj = reg.r_squared(adjusted=True) + + # Should be high for well-specified model + assert 0.8 < r2 <= 1.0 + + # Adjusted should be smaller + assert r2_adj < r2 + + def test_predict(self, simple_data): + """Test prediction on new data.""" + X, y, beta_true = simple_data + reg = LinearRegression().fit(X, y) + + # Predict on same data + y_pred = reg.predict(X) + + # Should match fitted values + np.testing.assert_allclose(y_pred, reg.fitted_values_, rtol=1e-10) + + # Predict on new data + X_new = np.random.randn(10, 2) + y_pred_new = reg.predict(X_new) + assert y_pred_new.shape == (10,) + + def test_robust_standard_errors(self, simple_data): + """Test that robust=True computes HC1 standard errors.""" + X, y, _ = simple_data + reg_robust = LinearRegression(robust=True).fit(X, y) + reg_classical = LinearRegression(robust=False).fit(X, y) + + # SEs should differ + se_robust = reg_robust.get_se(1) + se_classical = reg_classical.get_se(1) + + assert se_robust != se_classical + + def test_cluster_standard_errors(self, clustered_data): + """Test cluster-robust standard errors.""" + X, y, cluster_ids, _ = clustered_data + + reg_hc1 = LinearRegression(robust=True).fit(X, y) + reg_cluster = LinearRegression(cluster_ids=cluster_ids).fit(X, y) + + # Cluster SE should typically be larger with correlated errors + se_hc1 = reg_hc1.get_se(1) + se_cluster = reg_cluster.get_se(1) + + # They should differ (cluster SE usually larger with cluster correlation) + assert se_hc1 != se_cluster + + def test_cluster_ids_in_fit(self, clustered_data): + """Test passing cluster_ids to fit() method.""" + X, y, cluster_ids, _ = clustered_data + + # Pass cluster_ids in constructor + reg1 = LinearRegression(cluster_ids=cluster_ids).fit(X, y) + + # Pass cluster_ids in fit() + reg2 = LinearRegression().fit(X, y, cluster_ids=cluster_ids) + + # Should give same results + np.testing.assert_allclose(reg1.get_se(1), reg2.get_se(1), rtol=1e-10) + + def test_df_adjustment(self, simple_data): + """Test degrees of freedom adjustment parameter.""" + X, y, _ = simple_data + reg = LinearRegression().fit(X, y) + reg_adj = LinearRegression().fit(X, y, df_adjustment=10) + + # Adjusted df should be 10 less + assert reg_adj.df_ == reg.df_ - 10 + + # This affects inference + result = reg.get_inference(1) + result_adj = reg_adj.get_inference(1) + + # Same coefficient and SE + assert result.coefficient == result_adj.coefficient + assert result.se == result_adj.se + + # Different df affects p-value and CI (though often slightly) + assert result.df != result_adj.df + + def test_returns_self(self, simple_data): + """Test that fit() returns self for 
chaining.""" + X, y, _ = simple_data + reg = LinearRegression() + result = reg.fit(X, y) + + assert result is reg + + def test_matches_solve_ols(self, simple_data): + """Test that LinearRegression matches low-level solve_ols.""" + X, y, _ = simple_data + n = X.shape[0] + X_with_intercept = np.column_stack([np.ones(n), X]) + + # Use low-level function + coef, resid, fitted, vcov = solve_ols( + X_with_intercept, y, return_fitted=True, return_vcov=True + ) + + # Use LinearRegression + reg = LinearRegression(robust=True).fit(X, y) + + # Should match + np.testing.assert_allclose(reg.coefficients_, coef, rtol=1e-10) + np.testing.assert_allclose(reg.residuals_, resid, rtol=1e-10) + np.testing.assert_allclose(reg.fitted_values_, fitted, rtol=1e-10) + np.testing.assert_allclose(reg.vcov_, vcov, rtol=1e-10) + + +class TestNumericalStability: + """Tests for numerical stability with ill-conditioned matrices.""" + + def test_near_singular_matrix_stability(self): + """Test that near-singular matrices are handled correctly.""" + np.random.seed(42) + n = 100 + + # Create near-collinear design (high condition number) + X = np.random.randn(n, 3) + X[:, 2] = X[:, 0] + X[:, 1] + np.random.randn(n) * 1e-8 # Near-perfect collinearity + + y = X[:, 0] + np.random.randn(n) * 0.1 + + reg = LinearRegression(include_intercept=True).fit(X, y) + + # Should still produce finite coefficients + assert np.all(np.isfinite(reg.coefficients_)) + + # Compare with numpy's lstsq (gold standard for stability) + X_full = np.column_stack([np.ones(n), X]) + expected, _, _, _ = np.linalg.lstsq(X_full, y, rcond=None) + + # Should be close (within reasonable tolerance for ill-conditioned problem) + np.testing.assert_allclose(reg.coefficients_, expected, rtol=1e-6) + + def test_high_condition_number_matrix(self): + """Test that high condition number matrices don't lose precision.""" + np.random.seed(42) + n = 100 + k = 5 + + # Create matrix with controlled condition number + X = np.random.randn(n, k) + # Make last column nearly dependent on first + X[:, -1] = X[:, 0] * 0.9999 + np.random.randn(n) * 1e-6 + + y = X[:, 0] + 2 * X[:, 1] + np.random.randn(n) * 0.1 + + # Should complete without error + reg = LinearRegression().fit(X, y) + assert np.all(np.isfinite(reg.coefficients_)) + assert np.all(np.isfinite(reg.vcov_)) + + def test_zero_se_warning(self): + """Test that zero SE triggers a warning.""" + np.random.seed(42) + n = 50 + + # Create perfect fit scenario + X = np.random.randn(n, 2) + y = 1 + 2 * X[:, 0] + 3 * X[:, 1] # No noise + + reg = LinearRegression().fit(X, y) + + # Residuals should be near-zero (perfect fit) + assert np.allclose(reg.residuals_, 0, atol=1e-10) + + # SE should be very small, which may trigger the warning + # The important thing is it doesn't crash + for i in range(reg.n_params_): + inf = reg.get_inference(i) + assert np.isfinite(inf.coefficient) + + +class TestEstimatorIntegration: + """Integration tests verifying estimators produce correct results.""" + + def test_did_estimator_produces_valid_results(self): + """Verify DifferenceInDifferences produces valid inference.""" + from diff_diff import DifferenceInDifferences + + # Create reproducible test data + np.random.seed(42) + n = 200 + data = pd.DataFrame({ + "unit": np.repeat(range(20), 10), + "time": np.tile(range(10), 20), + "treated": np.repeat([0] * 10 + [1] * 10, 10), + "post": np.tile([0] * 5 + [1] * 5, 20), + }) + # True ATT = 2.0 + data["outcome"] = ( + np.random.randn(n) + + 2.0 * data["treated"] * data["post"] + ) + + # Fit estimator + did = 
DifferenceInDifferences(robust=True) + result = did.fit(data, outcome="outcome", treatment="treated", time="post") + + # Coefficient should be close to true effect (within sampling variation) + assert abs(result.att - 2.0) < 1.0 + + # SE, p-value, CI should all be valid + assert result.se > 0 + assert 0 <= result.p_value <= 1 + assert result.conf_int[0] < result.att < result.conf_int[1] + + def test_twfe_estimator_produces_valid_results(self): + """Verify TwoWayFixedEffects produces valid inference.""" + from diff_diff import TwoWayFixedEffects + + np.random.seed(42) + n_units = 30 + n_times = 6 + n = n_units * n_times + + data = pd.DataFrame({ + "unit": np.repeat(np.arange(n_units), n_times), + "time": np.tile(np.arange(n_times), n_units), + "treated": np.repeat(np.random.binomial(1, 0.5, n_units), n_times), + }) + data["post"] = (data["time"] >= 3).astype(int) + + # Add unit and time effects with true ATT = 1.5 + unit_effects = np.random.randn(n_units) + time_effects = np.random.randn(n_times) + data["y"] = ( + unit_effects[data["unit"]] + + time_effects[data["time"]] + + data["treated"] * data["post"] * 1.5 + + np.random.randn(n) * 0.5 + ) + + twfe = TwoWayFixedEffects() + result = twfe.fit( + data, outcome="y", treatment="treated", time="post", unit="unit" + ) + + # Should produce valid results + assert result.se > 0 + assert 0 <= result.p_value <= 1 + assert np.isfinite(result.att) + + def test_sun_abraham_estimator_produces_valid_results(self): + """Verify SunAbraham produces valid inference.""" + from diff_diff import SunAbraham + + np.random.seed(42) + n_units = 60 + n_times = 10 + n = n_units * n_times + + data = pd.DataFrame({ + "unit": np.repeat(np.arange(n_units), n_times), + "time": np.tile(np.arange(n_times), n_units), + }) + + # Staggered treatment timing + first_treat_map = {} + for i in range(n_units): + if i < 20: + first_treat_map[i] = np.inf # Never treated + elif i < 40: + first_treat_map[i] = 5 + else: + first_treat_map[i] = 7 + + data["first_treat"] = data["unit"].map(first_treat_map) + data["treated"] = (data["time"] >= data["first_treat"]).astype(int) + data["y"] = np.random.randn(n) + data["treated"] * 2.0 + + sa = SunAbraham(n_bootstrap=0) + result = sa.fit( + data, outcome="y", unit="unit", time="time", first_treat="first_treat" + ) + + # Should produce valid results + assert result.overall_se > 0 + assert np.isfinite(result.overall_att) + assert len(result.event_study_effects) > 0
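
Reviewer note: the snippet below is a minimal sketch of how the `LinearRegression` / `InferenceResult` API introduced in this diff is intended to be consumed, mirroring the pattern the refactored estimators now use (fit with cluster IDs and a fixed-effects df adjustment, then pull coefficient-level inference). The data and variable names are synthetic and purely illustrative, not from the library.

```python
# Illustrative use of the new unified OLS helper from diff_diff/linalg.py.
import numpy as np

from diff_diff import InferenceResult, LinearRegression

rng = np.random.default_rng(0)
n_clusters, obs_per_cluster = 20, 10
n = n_clusters * obs_per_cluster

cluster_ids = np.repeat(np.arange(n_clusters), obs_per_cluster)
X = rng.standard_normal((n, 2))  # two regressors; intercept is added by the helper
y = 1.0 + 2.0 * X[:, 0] - 0.5 * X[:, 1] + rng.standard_normal(n)

# Cluster-robust fit with an extra df adjustment, as the estimators do for
# absorbed fixed effects.
reg = LinearRegression(cluster_ids=cluster_ids, alpha=0.05).fit(X, y, df_adjustment=2)

inference = reg.get_inference(1)  # index 1 = first regressor (index 0 is the intercept)
assert isinstance(inference, InferenceResult)

print(f"coef={inference.coefficient:.3f}  se={inference.se:.3f}  "
      f"t={inference.t_stat:.2f}  p={inference.p_value:.4f}")
print(f"{(1 - inference.alpha) * 100:.0f}% CI: {inference.conf_int}")
print("stars:", inference.significance_stars(), "| R^2:", round(reg.r_squared(), 3))
```

This single code path replaces the per-estimator blocks that previously computed SEs, t-stats, p-values, and CIs by hand, which is the duplication the ROADMAP item targeted.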