Fn Runner Watermark issue #34484

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
72 changes: 72 additions & 0 deletions sdks/python/apache_beam/runners/direct/direct_runner_test.py
@@ -235,5 +235,77 @@ def process_element(self, element):
}})


class DirectRunnerWatermarkTests(unittest.TestCase):
  # Since Beam 2.39 these tests have been failing with
  # `AssertionError: A total of 2 watermark-pending bundles did not execute.`
  # Reported in https://github.com/apache/beam/issues/26190
  # Andrzej note: the issue is due to Flatten not executing.

  def test_flatten_two(self):
    label = "test_flatten_two"
    global double_check_test_flatten_two
    double_check_test_flatten_two = False

    with test_pipeline.TestPipeline() as pipeline:
      pc_first = (pipeline | f"{label}/Create" >> beam.Create(["input"]))

      pc_a = (pc_first | f"{label}/MapA" >> beam.Map(lambda x: ("a", 1)))
      pv_a = beam.pvalue.AsDict(pc_a)

      pc_b = (
          pc_first
          | f"{label}/MapB" >> beam.Map(lambda x, y: ("b", 2), y=pv_a)
          # | f"{label}/Reshuffle" >> beam.Reshuffle()
          # Beam 2.38 works without the Reshuffle here.
      )

      pc_c = ((pc_a, pc_b) | f"{label}/Flatten" >> beam.Flatten())
      pv_c = beam.pvalue.AsDict(pc_c)

      def my_function(x, y):
        global double_check_test_flatten_two
        double_check_test_flatten_two = True
        return (x, y["a"] + y["b"])

      pc_d = (pc_first | f"{label}/MapD" >> beam.Map(my_function, y=pv_c))

      assert_that(pc_d, equal_to([("input", 3)]))

    self.assertTrue(double_check_test_flatten_two)

  def test_flatten_three(self):
    label = "test_flatten_three"
    global double_check_test_flatten_three
    double_check_test_flatten_three = False

    with test_pipeline.TestPipeline() as pipeline:
      pc_first = (pipeline | f"{label}/Create" >> beam.Create(["input"]))

      pc_a = (pc_first | f"{label}/MapA" >> beam.Map(lambda x: ("a", 1)))
      pv_a = beam.pvalue.AsDict(pc_a)

      pc_b = (
          pc_first
          | f"{label}/MapB" >> beam.Map(lambda x, y: ("b", 2), y=pv_a))
      pv_b = beam.pvalue.AsDict(pc_b)

      pc_c = (
          pc_a | f"{label}/MapC" >> beam.Map(lambda x, y: ("c", 5), y=pv_b))

      pc_d = ((pc_a, pc_b, pc_c) | f"{label}/Flatten" >> beam.Flatten())
      pv_d = beam.pvalue.AsDict(pc_d)

      def my_function(x, y):
        global double_check_test_flatten_three
        double_check_test_flatten_three = True
        return (x, y["a"] + y["b"] + y["c"])

      pc_e = (pc_first | f"{label}/MapE" >> beam.Map(my_function, y=pv_d))

      assert_that(pc_e, equal_to([("input", 8)]))

    self.assertTrue(double_check_test_flatten_three)


if __name__ == '__main__':
  unittest.main()
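To run only the new tests locally (assuming a Beam development checkout): `pytest sdks/python/apache_beam/runners/direct/direct_runner_test.py -k DirectRunnerWatermarkTests`. On the affected versions these pipelines fail with the watermark AssertionError quoted in the class comment; per the inline note, restoring the commented-out Reshuffle works around it on Beam 2.39+.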
@@ -388,7 +388,8 @@ def create_stages(
translations.lift_combiners,
translations.expand_sdf,
translations.expand_gbk,
translations.sink_flattens,
translations.fix_flatten_coders,
Collaborator: Why was this added?

Author: fix_flatten_coders was added to fix the YAML unit tests. It also mimics the translations.standard_optimize_phases() list used by the portable runner.

Without it, YamlMappingTest::test_basic fails with:

apache_beam.testing.util.BeamAssertException: Failed assert: [
Row(label='11a', isogeny='a'), Row(label='37a', isogeny='a'), Row(label='389a', isogeny='a')] == [BeamSchema_ccf257cb_1966_410e_8157_00cd826e7392(label='11a', isogeny='a'), BeamSchema_ccf257cb_1966_410e_8157_00cd826e7392(label='37a', isogeny='a'), BeamSchema_ccf257cb_1966_410e_8157_00cd826e7392(label='389a', isogeny='a')], 
unexpected elements [BeamSchema_ccf257cb_1966_410e_8157_00cd826e7392(label='11a', isogeny='a'), BeamSchema_ccf257cb_1966_410e_8157_00cd826e7392(label='37a', isogeny='a'), BeamSchema_ccf257cb_1966_410e_8157_00cd826e7392(label='389a', isogeny='a')], 
missing elements [Row(label='11a', isogeny='a'), Row(label='37a', isogeny='a'), Row(label='389a', isogeny='a')
]
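For intuition on this failure, here is a minimal standalone sketch (not Beam runner code; `BeamSchemaLike` is a hypothetical stand-in for the generated `BeamSchema_...` type): two values with identical fields but different schema types compare unequal, which is the shape of mismatch a missing coder-fixing phase can produce across a Flatten.

```python
import typing

import apache_beam as beam

# Hypothetical stand-in for the generated BeamSchema_... NamedTuple above.
BeamSchemaLike = typing.NamedTuple(
    'BeamSchemaLike', [('label', str), ('isogeny', str)])

expected = beam.Row(label='11a', isogeny='a')
actual = BeamSchemaLike(label='11a', isogeny='a')

# Same field values, different types: equal_to() treats these as different
# elements, producing the "unexpected"/"missing" lists in the error above.
print(expected == actual)  # expected: False
```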

Collaborator: It looks like the YAML PreCommit is still failing; can you please take a look?

Contributor: I think this may not work because fix_flatten_coders assumes that the Flattens will eventually be dealt with by sink_flattens. That may be what is causing the failures.
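One quick way to sanity-check how the portable runner sequences these phases (a sketch assuming a Beam development install; the exact phase list varies by version):

```python
# Print the optimization phases the portable runner applies, to compare
# against the hand-maintained list in create_stages. Some entries may be
# wrapped (e.g. functools.partial), hence the getattr fallback.
from apache_beam.runners.portability.fn_api_runner import translations

for phase in translations.standard_optimize_phases():
  print(getattr(phase, '__name__', phase))
```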

# translations.sink_flattens,
Collaborator: To be clear, this PR isn't fixing the underlying issue; rather, it is disabling the optimization?

Can we add a comment explaining why it is disabled, with a reference to the bug?

Also, I am not sure the tradeoff of disabling this optimization is worth fixing this specific edge case; maybe @damccorm has thoughts?

Contributor: I think it would be better to try to fix the underlying issue. This change has the potential to introduce new encoding/decoding problems that it may otherwise be possible to avoid.

translations.greedily_fuse,
translations.read_to_impulse,
translations.impulse_to_input,
@@ -1446,7 +1446,7 @@ def test_group_by_key_with_empty_pcoll_elements(self):
# the sampling counter.
class FnApiRunnerMetricsTest(unittest.TestCase):
def assert_has_counter(
self, mon_infos, urn, labels, value=None, ge_value=None):
self, mon_infos, urn, labels, value=None, ge_value=None, total_stages=1):
# TODO(ajamato): Consider adding a matcher framework
found = 0
matches = []
@@ -1464,7 +1464,7 @@ def assert_has_counter(
ge_value_str = {'ge_value': ge_value} if ge_value else ''
value_str = {'value': value} if value else ''
self.assertEqual(
1,
total_stages,
found,
"Found (%s, %s) Expected only 1 monitoring_info for %s." % (
found,
@@ -1473,7 +1473,15 @@ def assert_has_distribution(
))

def assert_has_distribution(
self, mon_infos, urn, labels, sum=None, count=None, min=None, max=None):
self,
mon_infos,
urn,
labels,
sum=None,
count=None,
min=None,
max=None,
total_stages=1):
# TODO(ajamato): Consider adding a matcher framework
sum = _matcher_or_equal_to(sum)
count = _matcher_or_equal_to(count)
@@ -1508,7 +1516,7 @@ def assert_has_distribution(
increment = 0
found += increment
self.assertEqual(
1,
total_stages,
found,
"Found (%s) Expected only 1 monitoring_info for %s." % (
found,
@@ -1597,15 +1605,17 @@ def process(self, element):
counters,
monitoring_infos.ELEMENT_COUNT_URN,
labels,
num_source_elems)
num_source_elems,
total_stages=2)
self.assert_has_distribution(
counters,
monitoring_infos.SAMPLED_BYTE_SIZE_URN,
labels,
min=hamcrest.greater_than(0),
max=hamcrest.greater_than(0),
sum=hamcrest.greater_than(0),
count=hamcrest.greater_than(0))
count=hamcrest.greater_than(0),
total_stages=2)

# GenerateTwoOutputs, "SecondOutput" output.
labels = {
@@ -1615,15 +1625,17 @@ def process(self, element):
counters,
monitoring_infos.ELEMENT_COUNT_URN,
labels,
2 * num_source_elems)
2 * num_source_elems,
total_stages=2)
self.assert_has_distribution(
counters,
monitoring_infos.SAMPLED_BYTE_SIZE_URN,
labels,
min=hamcrest.greater_than(0),
max=hamcrest.greater_than(0),
sum=hamcrest.greater_than(0),
count=hamcrest.greater_than(0))
count=hamcrest.greater_than(0),
total_stages=2)

# GenerateTwoOutputs, "ThirdOutput" output.
labels = {
@@ -1633,15 +1645,17 @@ def process(self, element):
counters,
monitoring_infos.ELEMENT_COUNT_URN,
labels,
num_source_elems)
num_source_elems,
total_stages=2)
self.assert_has_distribution(
counters,
monitoring_infos.SAMPLED_BYTE_SIZE_URN,
labels,
min=hamcrest.greater_than(0),
max=hamcrest.greater_than(0),
sum=hamcrest.greater_than(0),
count=hamcrest.greater_than(0))
count=hamcrest.greater_than(0),
total_stages=2)

# Skipping other pcollections due to non-deterministic naming for multiple
# outputs.
@@ -1653,15 +1667,17 @@ def process(self, element):
counters,
monitoring_infos.ELEMENT_COUNT_URN,
labels,
4 * num_source_elems)
4 * num_source_elems,
total_stages=2)
self.assert_has_distribution(
counters,
monitoring_infos.SAMPLED_BYTE_SIZE_URN,
labels,
min=hamcrest.greater_than(0),
max=hamcrest.greater_than(0),
sum=hamcrest.greater_than(0),
count=hamcrest.greater_than(0))
count=hamcrest.greater_than(0),
total_stages=2)

# PassThrough, main output
labels = {
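For context on the metrics-test changes above: the assertions now take an expected stage count (`total_stages`) instead of hard-coding 1, presumably because keeping the Flatten as a real stage means the same monitoring info can be reported by two fused stages instead of one. A minimal standalone sketch of that counting pattern (names here are illustrative, not the test's helpers):

```python
# Count monitoring infos matching a predicate and compare against the number
# of stages expected to report them; the updated tests pass total_stages=2.
def count_matching(mon_infos, predicate):
  return sum(1 for mi in mon_infos if predicate(mi))

infos = ['elem_count/stage1', 'elem_count/stage2', 'byte_size/stage1']
assert count_matching(infos, lambda mi: mi.startswith('elem_count')) == 2
```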