Skip to content

Commit 8c216b2

Browse files
committed
Accomodate addtional stage reporting monitoring data
1 parent e6f8714 commit 8c216b2

File tree

2 files changed

+30
-13
lines changed

2 files changed

+30
-13
lines changed

sdks/python/apache_beam/runners/direct/direct_runner_test.py

+2-1
Original file line numberDiff line numberDiff line change
@@ -255,7 +255,8 @@ def test_flatten_two(self):
255255
pb_b = (
256256
pc_first
257257
| f"{label}/MapB" >> beam.Map(lambda x, y: ("b", 2), y=pv_a)
258-
#| f"{label}/Reshuffle" >> beam.Reshuffle() # beam 2.38 works without Reshuffle here
258+
#| f"{label}/Reshuffle" >> beam.Reshuffle()
259+
# # beam 2.38 works without Reshuffle here
259260
)
260261

261262
pc_c = ((pc_a, pb_b) | f"{label}/Flatten" >> beam.Flatten())

sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner_test.py

+28-12
Original file line numberDiff line numberDiff line change
@@ -1446,7 +1446,7 @@ def test_group_by_key_with_empty_pcoll_elements(self):
14461446
# the sampling counter.
14471447
class FnApiRunnerMetricsTest(unittest.TestCase):
14481448
def assert_has_counter(
1449-
self, mon_infos, urn, labels, value=None, ge_value=None):
1449+
self, mon_infos, urn, labels, value=None, ge_value=None, total_stages=1):
14501450
# TODO(ajamato): Consider adding a matcher framework
14511451
found = 0
14521452
matches = []
@@ -1464,7 +1464,7 @@ def assert_has_counter(
14641464
ge_value_str = {'ge_value': ge_value} if ge_value else ''
14651465
value_str = {'value': value} if value else ''
14661466
self.assertEqual(
1467-
1,
1467+
total_stages,
14681468
found,
14691469
"Found (%s, %s) Expected only 1 monitoring_info for %s." % (
14701470
found,
@@ -1473,7 +1473,15 @@ def assert_has_counter(
14731473
))
14741474

14751475
def assert_has_distribution(
1476-
self, mon_infos, urn, labels, sum=None, count=None, min=None, max=None):
1476+
self,
1477+
mon_infos,
1478+
urn,
1479+
labels,
1480+
sum=None,
1481+
count=None,
1482+
min=None,
1483+
max=None,
1484+
total_stages=1):
14771485
# TODO(ajamato): Consider adding a matcher framework
14781486
sum = _matcher_or_equal_to(sum)
14791487
count = _matcher_or_equal_to(count)
@@ -1508,7 +1516,7 @@ def assert_has_distribution(
15081516
increment = 0
15091517
found += increment
15101518
self.assertEqual(
1511-
1,
1519+
total_stages,
15121520
found,
15131521
"Found (%s) Expected only 1 monitoring_info for %s." % (
15141522
found,
@@ -1597,15 +1605,17 @@ def process(self, element):
15971605
counters,
15981606
monitoring_infos.ELEMENT_COUNT_URN,
15991607
labels,
1600-
num_source_elems)
1608+
num_source_elems,
1609+
total_stages=2)
16011610
self.assert_has_distribution(
16021611
counters,
16031612
monitoring_infos.SAMPLED_BYTE_SIZE_URN,
16041613
labels,
16051614
min=hamcrest.greater_than(0),
16061615
max=hamcrest.greater_than(0),
16071616
sum=hamcrest.greater_than(0),
1608-
count=hamcrest.greater_than(0))
1617+
count=hamcrest.greater_than(0),
1618+
total_stages=2)
16091619

16101620
# GenerateTwoOutputs, "SecondOutput" output.
16111621
labels = {
@@ -1615,15 +1625,17 @@ def process(self, element):
16151625
counters,
16161626
monitoring_infos.ELEMENT_COUNT_URN,
16171627
labels,
1618-
2 * num_source_elems)
1628+
2 * num_source_elems,
1629+
total_stages=2)
16191630
self.assert_has_distribution(
16201631
counters,
16211632
monitoring_infos.SAMPLED_BYTE_SIZE_URN,
16221633
labels,
16231634
min=hamcrest.greater_than(0),
16241635
max=hamcrest.greater_than(0),
16251636
sum=hamcrest.greater_than(0),
1626-
count=hamcrest.greater_than(0))
1637+
count=hamcrest.greater_than(0),
1638+
total_stages=2)
16271639

16281640
# GenerateTwoOutputs, "ThirdOutput" output.
16291641
labels = {
@@ -1633,15 +1645,17 @@ def process(self, element):
16331645
counters,
16341646
monitoring_infos.ELEMENT_COUNT_URN,
16351647
labels,
1636-
num_source_elems)
1648+
num_source_elems,
1649+
total_stages=2)
16371650
self.assert_has_distribution(
16381651
counters,
16391652
monitoring_infos.SAMPLED_BYTE_SIZE_URN,
16401653
labels,
16411654
min=hamcrest.greater_than(0),
16421655
max=hamcrest.greater_than(0),
16431656
sum=hamcrest.greater_than(0),
1644-
count=hamcrest.greater_than(0))
1657+
count=hamcrest.greater_than(0),
1658+
total_stages=2)
16451659

16461660
# Skipping other pcollections due to non-deterministic naming for multiple
16471661
# outputs.
@@ -1653,15 +1667,17 @@ def process(self, element):
16531667
counters,
16541668
monitoring_infos.ELEMENT_COUNT_URN,
16551669
labels,
1656-
4 * num_source_elems)
1670+
4 * num_source_elems,
1671+
total_stages=2)
16571672
self.assert_has_distribution(
16581673
counters,
16591674
monitoring_infos.SAMPLED_BYTE_SIZE_URN,
16601675
labels,
16611676
min=hamcrest.greater_than(0),
16621677
max=hamcrest.greater_than(0),
16631678
sum=hamcrest.greater_than(0),
1664-
count=hamcrest.greater_than(0))
1679+
count=hamcrest.greater_than(0),
1680+
total_stages=2)
16651681

16661682
# PassThrough, main output
16671683
labels = {

0 commit comments

Comments
 (0)