Skip to content

Commit b1f9f2e

Browse files
authored
[#34564] Fix Prism on handling empty and identity transforms (#34566)
* Fix an edge case in handling empty composite transforms from the Python SDK. * Add a more general fix that covers empty transforms with identical input and output PCollections. * Simplify the code with continue. * Add unit tests.
1 parent b1233b0 commit b1f9f2e

File tree

4 files changed

+132
-0
lines changed

4 files changed

+132
-0
lines changed

Diff for: sdks/go/pkg/beam/runners/prism/internal/jobservices/management.go

+17
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,13 @@ func (e *joinError) Error() string {
7272
return string(b)
7373
}
7474

75+
// getOnlyValue returns the single value stored in the map in.
//
// It panics unless in holds exactly one entry: an empty map has no value to
// return, and with several entries Go's unspecified map iteration order would
// make the returned value nondeterministic. Callers are expected to check
// len(in) == 1 first when the size is not already guaranteed.
func getOnlyValue[K comparable, V any](in map[K]V) V {
	if len(in) != 1 {
		panic("getOnlyValue: expected a map with exactly one entry")
	}
	for _, v := range in {
		return v
	}
	// The length check above guarantees the loop body runs once and returns.
	panic("unreachable")
}
81+
7582
func (s *Server) Prepare(ctx context.Context, req *jobpb.PrepareJobRequest) (_ *jobpb.PrepareJobResponse, err error) {
7683
s.mu.Lock()
7784
defer s.mu.Unlock()
@@ -248,6 +255,16 @@ func (s *Server) Prepare(ctx context.Context, req *jobpb.PrepareJobRequest) (_ *
248255
if len(t.GetSpec().GetPayload()) == 0 {
249256
continue
250257
}
258+
// Another type of "empty" composite transforms without subtransforms but with
259+
// a non-empty payload and identical input/output pcollections
260+
if len(t.GetInputs()) == 1 && len(t.GetOutputs()) == 1 {
261+
inputID := getOnlyValue(t.GetInputs())
262+
outputID := getOnlyValue(t.GetOutputs())
263+
if inputID == outputID {
264+
slog.Warn("empty transform, with payload and identical input and output pcollection", "urn", urn, "name", t.GetUniqueName(), "pcoll", inputID)
265+
continue
266+
}
267+
}
251268
// Otherwise fail.
252269
slog.Warn("unknown transform, with payload", "urn", urn, "name", t.GetUniqueName(), "payload", t.GetSpec().GetPayload())
253270
check("PTransform.Spec.Urn", urn+" "+t.GetUniqueName(), "<doesn't exist>")

Diff for: sdks/go/pkg/beam/runners/prism/internal/preprocess.go

+9
Original file line numberDiff line numberDiff line change
@@ -106,6 +106,15 @@ func (p *preprocessor) preProcessGraph(comps *pipepb.Components, j *jobservices.
106106

107107
// If there's an unknown urn, and it's not composite, simply add it to the leaves.
108108
if len(t.GetSubtransforms()) == 0 {
109+
// However, if it is an empty transform with identical input/output pcollections,
110+
// it will be discarded.
111+
if len(t.GetInputs()) == 1 && len(t.GetOutputs()) == 1 {
112+
inputID := getOnlyValue(t.GetInputs())
113+
outputID := getOnlyValue(t.GetOutputs())
114+
if inputID == outputID {
115+
continue
116+
}
117+
}
109118
leaves[tid] = struct{}{}
110119
}
111120
continue

Diff for: sdks/go/pkg/beam/runners/prism/internal/preprocess_test.go

+92
Original file line numberDiff line numberDiff line change
@@ -125,6 +125,98 @@ func Test_preprocessor_preProcessGraph(t *testing.T) {
125125
"env1": {Urn: "env1"},
126126
},
127127
},
128+
}, {
129+
name: "ignoreEmptyAndIdentityTransform",
130+
input: &pipepb.Components{
131+
Transforms: map[string]*pipepb.PTransform{
132+
// e1 is an empty transform with identical input/output pcollections
133+
"e1": {
134+
UniqueName: "e1",
135+
Spec: &pipepb.FunctionSpec{
136+
Urn: "defaultUrn",
137+
},
138+
Inputs: map[string]string{
139+
"in": "coll1",
140+
},
141+
Outputs: map[string]string{
142+
"out": "coll1",
143+
},
144+
},
145+
"e2": {
146+
UniqueName: "e2",
147+
Spec: &pipepb.FunctionSpec{
148+
Urn: "defaultUrn",
149+
},
150+
Inputs: map[string]string{
151+
"in": "coll1",
152+
},
153+
Outputs: map[string]string{
154+
"out": "coll2",
155+
},
156+
},
157+
"e3": {
158+
UniqueName: "e3",
159+
Spec: &pipepb.FunctionSpec{
160+
Urn: "defaultUrn",
161+
},
162+
Inputs: map[string]string{
163+
"in1": "coll1",
164+
"in2": "coll2",
165+
},
166+
Outputs: map[string]string{
167+
"out": "coll3",
168+
},
169+
},
170+
},
171+
},
172+
wantStages: []*stage{
173+
{
174+
transforms: []string{"e2", "e3"}, // e1 is ignored
175+
primaryInput: "coll1",
176+
internalCols: []string{"coll2", "coll3"},
177+
},
178+
},
179+
wantComponents: &pipepb.Components{
180+
Transforms: map[string]*pipepb.PTransform{
181+
"e1": {
182+
UniqueName: "e1",
183+
Spec: &pipepb.FunctionSpec{
184+
Urn: "defaultUrn",
185+
},
186+
Inputs: map[string]string{
187+
"in": "coll1",
188+
},
189+
Outputs: map[string]string{
190+
"out": "coll1",
191+
},
192+
},
193+
"e2": {
194+
UniqueName: "e2",
195+
Spec: &pipepb.FunctionSpec{
196+
Urn: "defaultUrn",
197+
},
198+
Inputs: map[string]string{
199+
"in": "coll1",
200+
},
201+
Outputs: map[string]string{
202+
"out": "coll2",
203+
},
204+
},
205+
"e3": {
206+
UniqueName: "e3",
207+
Spec: &pipepb.FunctionSpec{
208+
Urn: "defaultUrn",
209+
},
210+
Inputs: map[string]string{
211+
"in1": "coll1",
212+
"in2": "coll2",
213+
},
214+
Outputs: map[string]string{
215+
"out": "coll3",
216+
},
217+
},
218+
},
219+
},
128220
},
129221
}
130222
for _, test := range tests {

Diff for: sdks/python/apache_beam/runners/portability/prism_runner_test.py

+14
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
from apache_beam.options.pipeline_options import PortableOptions
3535
from apache_beam.runners.portability import portable_runner_test
3636
from apache_beam.testing.util import assert_that
37+
from apache_beam.testing.util import equal_to
3738

3839
# Run as
3940
#
@@ -178,6 +179,19 @@ def test_read(self):
178179
lines = p | beam.io.ReadFromText('/etc/profile')
179180
assert_that(lines, lambda lines: len(lines) > 0)
180181

182+
def test_create_transform(self):
  """Runs beam.Create with default, disabled and enabled reshuffle.

  Each variant is executed in its own pipeline and its output is compared
  against the elements that were passed to Create.
  """
  # (elements, extra keyword arguments for beam.Create), in execution order.
  cases = [
      ([1], {}),
      ([1, 2], {'reshuffle': False}),
      ([1, 2], {'reshuffle': True}),
  ]
  for elements, create_kwargs in cases:
    with self.create_pipeline() as p:
      pcoll = p | beam.Create(elements, **create_kwargs)
      # Compare against a separate list so the matcher does not share state
      # with the Create input.
      assert_that(pcoll, equal_to(list(elements)))
194+
181195
def test_external_transform(self):
182196
raise unittest.SkipTest("Requires an expansion service to execute.")
183197

0 commit comments

Comments
 (0)