Skip to content

Commit 3fae964

Browse files
authored
Start using the validate_schemas hook from core (#95)
This just clean up `compute_input_schema` a bit.
1 parent 29f93f0 commit 3fae964

File tree

2 files changed

+30
-23
lines changed

2 files changed

+30
-23
lines changed

merlin/systems/dag/ops/faiss.py

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -202,11 +202,6 @@ def compute_input_schema(
202202
input_schema = super().compute_input_schema(
203203
root_schema, parents_schema, deps_schema, selector
204204
)
205-
if len(input_schema.column_schemas) > 1:
206-
raise ValueError(
207-
"More than one input has been detected for this node,"
208-
/ f"inputs received: {input_schema.column_names}"
209-
)
210205
return input_schema
211206

212207
def compute_output_schema(
@@ -237,6 +232,15 @@ def compute_output_schema(
237232
]
238233
)
239234

235+
def validate_schemas(
236+
self, parents_schema, deps_schema, input_schema, output_schema, strict_dtypes=False
237+
):
238+
if len(input_schema.column_schemas) > 1:
239+
raise ValueError(
240+
"More than one input has been detected for this node,"
241+
/ f"inputs received: {input_schema.column_names}"
242+
)
243+
240244

241245
def setup_faiss(item_vector, output_path: str):
242246
"""

merlin/systems/dag/ops/session_filter.py

Lines changed: 21 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -110,24 +110,6 @@ def compute_input_schema(
110110
root_schema, parents_schema, deps_schema, selector
111111
)
112112

113-
if len(parents_schema.column_schemas) > 1:
114-
raise ValueError(
115-
"More than one input has been detected for this node,"
116-
/ f"inputs received: {input_schema.column_names}"
117-
)
118-
if len(deps_schema.column_schemas) > 1:
119-
raise ValueError(
120-
"More than one dependency input has been detected"
121-
/ f"for this node, inputs received: {input_schema.column_names}"
122-
)
123-
124-
# 1 for deps and 1 for parents
125-
if len(input_schema.column_schemas) > 2:
126-
raise ValueError(
127-
"More than one input has been detected for this node,"
128-
/ f"inputs received: {input_schema.column_names}"
129-
)
130-
131113
self._input_col = parents_schema.column_names[0]
132114
self._filter_out_col = deps_schema.column_names[0]
133115

@@ -157,6 +139,27 @@ def compute_output_schema(
157139
"""
158140
return Schema([ColumnSchema("filtered_ids", dtype=np.int32, is_list=False)])
159141

142+
def validate_schemas(
143+
self, parents_schema, deps_schema, input_schema, output_schema, strict_dtypes=False
144+
):
145+
if len(parents_schema.column_schemas) > 1:
146+
raise ValueError(
147+
"More than one input has been detected for this node,"
148+
/ f"inputs received: {input_schema.column_names}"
149+
)
150+
if len(deps_schema.column_schemas) > 1:
151+
raise ValueError(
152+
"More than one dependency input has been detected"
153+
/ f"for this node, inputs received: {input_schema.column_names}"
154+
)
155+
156+
# 1 for deps and 1 for parents
157+
if len(input_schema.column_schemas) > 2:
158+
raise ValueError(
159+
"More than one input has been detected for this node,"
160+
/ f"inputs received: {input_schema.column_names}"
161+
)
162+
160163
def transform(self, df: InferenceDataFrame):
161164
"""
162165
Transform input dataframe to output dataframe using function logic.

0 commit comments

Comments
 (0)