Skip to content

Commit ee6ca7a

Browse files
committed
Cleaned up sklearn pipeline in run_pca with some modifications.
1 parent ad9d4e5 commit ee6ca7a

5 files changed

Lines changed: 124 additions & 34 deletions

File tree

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,8 @@
3939
- Add group statement stats to MultiIndex DataFrame.
4040
- Add `reddwarf.data_presenter.print_repress()` for printing representative statements.
4141
- Add support for `Loader()` importing data from alternative Polis instances via `polis_instance_url` arg.
42+
- Patch sklearn with a simple `PatchedPipeline`, to allow pipeline steps to access other steps.
43+
- Modify `SparsityAwareScaler` to be able to use captured output from `SparsityAwareCapturer`.
4244

4345
### Chores
4446
- Moved agora implementation from `reddwarf.agora` to `reddwarf.implementations.agora` (deprecation warning).

reddwarf/sklearn/pipeline.py

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
from sklearn.pipeline import Pipeline
2+
3+
4+
class PatchedPipeline(Pipeline):
    """
    A subclass of sklearn's Pipeline that injects a `_parent_pipeline` attribute into each step.

    This allows individual transformers in the pipeline to access their parent pipeline and,
    by extension, other steps within it. Useful for custom transformers that depend on
    intermediate results from earlier steps (e.g., SparsityAwareScaler using SparsityAwareCapturer output).

    Example:
        ```
        pipeline = PatchedPipeline([
            ("capture", SparsityAwareCapturer()),
            ("scale", SparsityAwareScaler(capture_step="capture")),
        ])

        # Inside SparsityAwareScaler.transform():
        # capture_step = self._parent_pipeline.named_steps["capture"]
        # X_sparse = capture_step.X_captured_
        ```

    Note:
        - Steps must support attribute assignment (`__dict__`) to receive the reference.
        - `_parent_pipeline` is injected once during initialization; steps that are
          assigned to the pipeline afterwards are not patched automatically.
    """
    def __init__(self, steps, **kwargs):
        """
        Build the pipeline and hand every step a reference back to it.

        Args:
            steps: List of `(name, estimator)` tuples, forwarded to `Pipeline`.
            **kwargs: Any further keyword arguments accepted by `Pipeline`
                (forwarded untouched).
        """
        super().__init__(steps, **kwargs)
        self._patch_steps()

    @classmethod
    def _get_param_names(cls):
        """
        Report the constructor parameters of `Pipeline` as this class's own.

        sklearn discovers an estimator's parameters by inspecting the
        `__init__` signature and ignores `**kwargs`. Without this override only
        `steps` would be visible, so `get_params()`, `set_params()` and
        `clone()` would silently drop every other `Pipeline` option.
        """
        own_names = super()._get_param_names()
        return sorted(set(own_names) | set(Pipeline._get_param_names()))

    def _patch_steps(self):
        """Attach `_parent_pipeline` to every step that can hold attributes."""
        for _name, estimator in self.steps:
            # Placeholders such as "passthrough" or None have no `__dict__`
            # and cannot carry the back-reference.
            if not hasattr(estimator, '__dict__'):
                continue
            estimator._parent_pipeline = self

reddwarf/sklearn/transformers.py

Lines changed: 56 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,11 @@
77
Array2D = NDArray[np.float64]
88

99
def calculate_scaling_factors(X_sparse: Array1D | Array2D) -> Array1D:
10+
"""
11+
Calculate row-based scaling factors from the sparse vote matrix.
12+
13+
(Outside estimator so available for re-use.)
14+
"""
1015
# This allows function to work for 2D (full vote_matrix) and 1D (participant_votes).
1116
# It essentially nests a 1D array inside a 2D one.
1217
X_sparse = np.atleast_2d(X_sparse)
@@ -29,14 +34,16 @@ def calculate_scaling_factors(X_sparse: Array1D | Array2D) -> Array1D:
2934

3035
class SparsityAwareScaler(BaseEstimator, TransformerMixin):
3136
"""
32-
Scale projected points (participant/statements) based on sparsity of vote
37+
Scale projected points (participants or statements) based on sparsity of vote
3338
matrix, to account for any small number of votes by a participant and
3439
prevent those participants from bunching up in the center.
3540
3641
Attributes:
42+
capture_step (str | int | None): Name or index of the capture step in the pipeline.
3743
X_sparse (np.ndarray | None): A sparse array with shape (n_features,)
3844
"""
39-
def __init__(self, X_sparse: Optional[Array1D | Array2D] = None):
45+
def __init__(self, capture_step: Optional[str | int] = None, X_sparse: Optional[Array1D | Array2D] = None):
46+
self.capture_step = capture_step
4047
self.X_sparse = X_sparse
4148

4249
# See: https://scikit-learn.org/stable/modules/generated/sklearn.utils.Tags.html#sklearn.utils.Tags
@@ -57,10 +64,53 @@ def inverse_transform(self, X):
5764
scaling_factors = self._calculate_scaling_factors()
5865
return X / scaling_factors[:, np.newaxis]
5966

60-
def _calculate_scaling_factors(self):
61-
if self.X_sparse is None:
67+
68+
def _get_pipeline_step(self, step):
69+
"""
70+
Fetch the parent pipeline when available via PatchedPipeline usage.
71+
"""
72+
parent = getattr(self, "_parent_pipeline", None)
73+
if parent is None:
74+
raise RuntimeError(
75+
f"{self.__class__.__name__} cannot resolve `capture_step={step}` "
76+
"because it is not being used inside a `PatchedPipeline`. "
77+
"Either use a `PatchedPipeline` or pass `X_sparse` directly."
78+
)
79+
if isinstance(step, str):
80+
return parent.named_steps[step]
81+
elif isinstance(step, int):
82+
return parent.steps[step][1]
83+
else:
84+
raise ValueError("`capture_step` must be a string (name) or int (index).")
85+
86+
def _resolve_X_sparse(self):
87+
"""
88+
Resolve X_sparse (a sparse vote matrix) from argument or prior capture step.
89+
"""
90+
if self.X_sparse is not None:
91+
return self.X_sparse
92+
93+
capture = self._get_pipeline_step(self.capture_step)
94+
if not hasattr(capture, "X_captured_"):
6295
raise AttributeError(
63-
"Missing `X_sparse`. Pass `X_sparse` when initializing SparsityAwareScaler."
96+
f"Step '{self.capture_step}' does not contain `.X_captured_`. "
97+
f"Did you run `fit/transform` on the pipeline?"
6498
)
99+
return capture.X_captured_
65100

66-
return calculate_scaling_factors(X_sparse=self.X_sparse)
101+
def _calculate_scaling_factors(self):
    """
    Compute scaling factors from whichever sparse matrix is currently resolvable.

    Resolves the matrix via `_resolve_X_sparse()` and passes it to the
    module-level `calculate_scaling_factors` helper.
    """
    return calculate_scaling_factors(X_sparse=self._resolve_X_sparse())
104+
105+
class SparsityAwareCapturer(BaseEstimator, TransformerMixin):
    """
    Passthrough transformer that remembers the input it was asked to transform.

    The most recent `X` handed to `transform()` is kept on the instance as
    `X_captured_`, so a later pipeline step can read this intermediate value.
    The data itself is returned unchanged.
    """
    def fit(self, X, y=None):
        """No-op fit; nothing is learned or stored here."""
        return self

    def transform(self, X):
        """Record `X` as `X_captured_` and return it untouched."""
        # Keep a reference to the exact object received (no copy is made).
        self.X_captured_ = X
        return X

reddwarf/utils/matrix.py

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -232,4 +232,21 @@ def filter_matrix(
232232
elif unvoted_filter_type == 'zero':
233233
vote_matrix[unvoted_statement_ids] = 0
234234

235-
return vote_matrix
235+
return vote_matrix
236+
237+
def generate_virtual_vote_matrix(n_statements: int):
    """
    Build a square matrix of virtual participants, one per statement.

    Each virtual participant votes "agree" on exactly one statement and has no
    other votes. The result is an identity-like matrix: agree values on the
    diagonal and NaN everywhere else.

    Args:
        n_statements (int): Number of statements (rows and columns of the result).

    Returns:
        A float array of shape (n_statements, n_statements).
    """
    # TODO: Why does Polis use -1 (disagree) here? is it the same? BUG?
    AGREE_VAL = 1
    MISSING_VAL = np.nan

    # True exactly where participant i meets statement i.
    on_diagonal = np.eye(n_statements, dtype=bool)

    return np.where(on_diagonal, AGREE_VAL, MISSING_VAL)

reddwarf/utils/pca.py

Lines changed: 13 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,14 @@
11
from numpy.typing import ArrayLike
22
import pandas as pd
33
import numpy as np
4-
from reddwarf.utils.matrix import VoteMatrix
5-
from reddwarf.sklearn.transformers import SparsityAwareScaler, calculate_scaling_factors
4+
from reddwarf.utils.matrix import VoteMatrix, generate_virtual_vote_matrix
5+
from reddwarf.sklearn.transformers import SparsityAwareCapturer, SparsityAwareScaler, calculate_scaling_factors
6+
from reddwarf.sklearn.pipeline import PatchedPipeline
67
from typing import Tuple
78

89
from sklearn.decomposition import PCA
910
from sklearn.impute import SimpleImputer
10-
from sklearn.pipeline import Pipeline
11+
1112

1213
def run_pca(
1314
vote_matrix: VoteMatrix,
@@ -30,39 +31,22 @@ def run_pca(
3031
- explained_variance_ (List[float]): Explained variance of each principal component.
3132
- mean_ (list[float]): Means/centers of each column/statements/features.
3233
"""
33-
pipeline = Pipeline([
34+
pipeline = PatchedPipeline([
35+
("capture", SparsityAwareCapturer()),
3436
("impute", SimpleImputer(missing_values=np.nan, strategy="mean")),
3537
("pca", PCA(n_components=n_components)),
36-
("scale", SparsityAwareScaler()),
38+
("scale", SparsityAwareScaler(capture_step="capture")),
3739
])
3840

3941
pipeline.fit(vote_matrix.values)
40-
pca = pipeline.named_steps["pca"]
41-
42-
def generate_projections(sparse_vote_matrix, fitted_pipeline):
43-
fitted_pipeline.named_steps["scale"].X_sparse = sparse_vote_matrix
44-
X_projected = fitted_pipeline.transform(sparse_vote_matrix)
45-
46-
return X_projected
47-
48-
# Create a matrix of virtual participants that each vote once on a single statement.
49-
def generate_virtual_vote_matrix(n_statements: int):
50-
# Build an basic identity matrix
51-
virtual_vote_matrix = np.eye(n_statements)
52-
53-
# Replace 1s with +1 and 0s with NaN
54-
# TODO: Why does Polis use -1 (disagree) here? is it the same? BUG?
55-
AGREE_VAL = 1
56-
MISSING_VAL = np.nan
57-
virtual_vote_matrix = np.where(virtual_vote_matrix == 1, AGREE_VAL, MISSING_VAL)
58-
59-
return virtual_vote_matrix
6042

61-
X_participants = generate_projections(sparse_vote_matrix=vote_matrix.values, fitted_pipeline=pipeline)
43+
# Generate projections of participants.
44+
X_participants = pipeline.transform(vote_matrix.values)
6245

46+
# Generate projections of statements via virtual vote matrix.
6347
n_statements = len(vote_matrix.columns)
6448
virtual_vote_matrix = generate_virtual_vote_matrix(n_statements)
65-
X_statements = generate_projections(sparse_vote_matrix=virtual_vote_matrix, fitted_pipeline=pipeline)
49+
X_statements = pipeline.transform(virtual_vote_matrix)
6650

6751
DEFAULT_DIMENSION_LABELS = ["x", "y", "z"]
6852
dimension_labels = DEFAULT_DIMENSION_LABELS[:n_components]
@@ -79,6 +63,8 @@ def generate_virtual_vote_matrix(n_statements: int):
7963
columns=np.asarray(dimension_labels),
8064
)
8165

66+
pca = pipeline.named_steps["pca"]
67+
8268
return projected_participants, projected_statements, pca
8369

8470
# TODO: Clean up variables and docs.

0 commit comments

Comments
 (0)