Skip to content

Introduce a helper to reduce logs in frequently invoked codepaths. #34977

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 7 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion sdks/python/apache_beam/io/gcp/gcsio.py
Original file line number Diff line number Diff line change
Expand Up @@ -642,7 +642,7 @@ def _updated_to_seconds(updated):

def is_soft_delete_enabled(self, gcs_path):
try:
bucket_name, _ = parse_gcs_path(gcs_path)
bucket_name, _ = parse_gcs_path(gcs_path, object_optional=True)
bucket = self.get_bucket(bucket_name)
if (bucket.soft_delete_policy is not None and
bucket.soft_delete_policy.retention_duration_seconds > 0):
Expand Down
33 changes: 20 additions & 13 deletions sdks/python/apache_beam/options/pipeline_options.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
from apache_beam.options.value_provider import StaticValueProvider
from apache_beam.options.value_provider import ValueProvider
from apache_beam.transforms.display import HasDisplayData
from apache_beam.utils import logs
from apache_beam.utils import proto_utils

__all__ = [
Expand Down Expand Up @@ -422,9 +423,10 @@ def get_all_options(
known_args, unknown_args = parser.parse_known_args(self._flags)
if retain_unknown_options:
if unknown_args:
_LOGGER.warning(
'Unknown pipeline options received: %s. Ignore if flags are '
'used for internal purposes.' % (','.join(unknown_args)))
if logs.allow_log_once():
_LOGGER.warning(
'Unknown pipeline options received: %s. Ignore if flags are '
'used for internal purposes.' % (','.join(unknown_args)))

seen = set()

Expand Down Expand Up @@ -466,7 +468,8 @@ def add_new_arg(arg, **kwargs):
parsed_args, _ = parser.parse_known_args(self._flags)
else:
if unknown_args:
_LOGGER.warning("Discarding unparseable args: %s", unknown_args)
if logs.allow_log_once():
_LOGGER.warning("Discarding unparseable args: %s", unknown_args)
parsed_args = known_args
result = vars(parsed_args)

Expand Down Expand Up @@ -1051,7 +1054,7 @@ def _create_default_gcs_bucket(self):
return None
bucket = gcsio.get_or_create_default_gcs_bucket(self)
if bucket:
return 'gs://%s' % bucket.id
return 'gs://%s/' % bucket.id
else:
return None

Expand All @@ -1064,17 +1067,21 @@ def _warn_if_soft_delete_policy_enabled(self, arg_name):
return

gcs_path = getattr(self, arg_name, None)
if not gcs_path or not gcs_path.startswith("gs://"):
return
try:
from apache_beam.io.gcp import gcsio
if gcsio.GcsIO().is_soft_delete_enabled(gcs_path):
_LOGGER.warning(
"Bucket specified in %s has soft-delete policy enabled."
" To avoid being billed for unnecessary storage costs, turn"
" off the soft delete feature on buckets that your Dataflow"
" jobs use for temporary and staging storage. For more"
" information, see"
" https://cloud.google.com/storage/docs/use-soft-delete"
"#remove-soft-delete-policy." % arg_name)
# Use bucket name in message_id to still emit logs for different buckets
if logs.allow_log_once("soft-delete warning" + str(gcs_path)):
_LOGGER.warning(
"Bucket specified in %s has soft-delete policy enabled."
" To avoid being billed for unnecessary storage costs, turn"
" off the soft delete feature on buckets that your Dataflow"
" jobs use for temporary and staging storage. For more"
" information, see"
" https://cloud.google.com/storage/docs/use-soft-delete"
"#remove-soft-delete-policy." % arg_name)
except ImportError:
_LOGGER.warning('Unable to check soft delete policy due to import error.')

Expand Down
134 changes: 134 additions & 0 deletions sdks/python/apache_beam/utils/logger.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

"""Helper functions for easier logging.

This module provides a few convenient logging methods, some of which
were adopted from
https://github.com/abseil/abseil-py/blob/master/absl/logging/__init__.py
in
https://github.com/facebookresearch/detectron2/blob/main/detectron2/utils/logger.py
"""
import logging
import os
import sys
import time
from collections import Counter

from typing import Optional, Union


def _find_caller() -> tuple[str, tuple]:
"""
Returns:
str: module name of the caller
tuple: a hashable key to be used to identify different callers
"""
frame = sys._getframe(2)
while frame:
code = frame.f_code
if os.path.join("utils", "logger.") not in code.co_filename:
mod_name = frame.f_globals["__name__"]
if mod_name == "__main__":
mod_name = "apache_beam"
return mod_name, (code.co_filename, frame.f_lineno, code.co_name)
frame = frame.f_back


_LOG_COUNTER = Counter()
_LOG_TIMER = {}


def log_first_n(
lvl: int,
msg: str,
*args,
n: int = 1,
name: Optional[str] = None,
key: Union[str, tuple[str]] = "caller") -> None:
"""
Log only for the first n times.

Args:
lvl (int): the logging level
msg (str):
n (int):
name (str): name of the logger to use. Will use the caller's module
by default.
key (str or tuple[str]): the string(s) can be one of "caller" or
"message", which defines how to identify duplicated logs.
For example, if called with `n=1, key="caller"`, this function
will only log the first call from the same caller, regardless of
the message content.
If called with `n=1, key="message"`, this function will log the
same content only once, even if they are called from different
places. If called with `n=1, key=("caller", "message")`, this
function will not log only if the same caller has logged the same
message before.
"""
if isinstance(key, str):
key = (key, )
assert len(key) > 0

caller_module, caller_key = _find_caller()
hash_key = ()
if "caller" in key:
hash_key = hash_key + caller_key
if "message" in key:
hash_key = hash_key + (msg, )

_LOG_COUNTER[hash_key] += 1
if _LOG_COUNTER[hash_key] <= n:
logging.getLogger(name or caller_module).log(lvl, msg, *args)


def log_every_n(
lvl: int, msg: str, *args, n: int = 1, name: Optional[str] = None) -> None:
"""
Log once per n times.

Args:
lvl (int): the logging level
msg (str):
n (int):
name (str): name of the logger to use. Will use the caller's module
by default.
"""
caller_module, key = _find_caller()
_LOG_COUNTER[key] += 1
if n == 1 or _LOG_COUNTER[key] % n == 1:
logging.getLogger(name or caller_module).log(lvl, msg, *args)


def log_every_n_seconds(
lvl: int, msg: str, *args, n: int = 1, name: Optional[str] = None) -> None:
"""
Log no more than once per n seconds.

Args:
lvl (int): the logging level
msg (str):
n (int):
name (str): name of the logger to use. Will use the caller's module
by default.
"""
caller_module, key = _find_caller()
last_logged = _LOG_TIMER.get(key, None)
current_time = time.time()
if last_logged is None or current_time - last_logged >= n:
logging.getLogger(name or caller_module).log(lvl, msg, *args)
_LOG_TIMER[key] = current_time
84 changes: 84 additions & 0 deletions sdks/python/apache_beam/utils/logger_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
import unittest
import logging
from unittest.mock import patch
from apache_beam.utils.logger import log_first_n, log_every_n, log_every_n_seconds, _LOG_COUNTER, _LOG_TIMER

import pytest


@pytest.mark.no_xdist
class TestLogFirstN(unittest.TestCase):
def setUp(self):
_LOG_COUNTER.clear()
_LOG_TIMER.clear()

@patch('apache_beam.utils.logger.logging.getLogger')
def test_log_first_n_once(self, mock_get_logger):
mock_logger = mock_get_logger.return_value
for _ in range(5):
log_first_n(logging.INFO, "Test message", n=1)
mock_logger.log.assert_called_once()

@patch('apache_beam.utils.logger.logging.getLogger')
def test_log_first_n_multiple(self, mock_get_logger):
mock_logger = mock_get_logger.return_value
for _ in range(5):
log_first_n(logging.INFO, "Test message", n=3)
self.assertEqual(mock_logger.log.call_count, 3)

@patch('apache_beam.utils.logger.logging.getLogger')
def test_log_first_n_with_different_callers(self, mock_get_logger):
mock_logger = mock_get_logger.return_value
for _ in range(5):
log_first_n(logging.INFO, "Test message", n=2)

# call from another "caller" (another line)
for _ in range(5):
log_first_n(logging.INFO, "Test message", n=2)

self.assertEqual(mock_logger.log.call_count, 4)

@patch('apache_beam.utils.logger.logging.getLogger')
def test_log_first_n_with_message_key(self, mock_get_logger):
mock_logger = mock_get_logger.return_value
log_first_n(logging.INFO, "Test message", n=1, key="message")
log_first_n(logging.INFO, "Test message", n=1, key="message")
self.assertEqual(mock_logger.log.call_count, 1)

@patch('apache_beam.utils.logger.logging.getLogger')
def test_log_first_n_with_caller_and_message_key(self, mock_get_logger):
mock_logger = mock_get_logger.return_value
for message in ["Test message", "Another message"]:
for _ in range(5):
log_first_n(logging.INFO, message, n=1, key=("caller", "message"))
self.assertEqual(mock_logger.log.call_count, 2)

@patch('apache_beam.utils.logger.logging.getLogger')
def test_log_every_n_multiple(self, mock_get_logger):
mock_logger = mock_get_logger.return_value
for _ in range(9):
log_every_n(logging.INFO, "Test message", n=2)

self.assertEqual(mock_logger.log.call_count, 5)

@patch('apache_beam.utils.logger.logging.getLogger')
@patch('apache_beam.utils.logger.time.time')
def test_log_every_n_seconds_always(self, mock_time, mock_get_logger):
mock_logger = mock_get_logger.return_value
for i in range(3):
mock_time.return_value = i
log_every_n_seconds(logging.INFO, "Test message", n=0)
self.assertEqual(mock_logger.log.call_count, 3)

@patch('apache_beam.utils.logger.logging.getLogger')
@patch('apache_beam.utils.logger.time.time')
def test_log_every_n_seconds_multiple(self, mock_time, mock_get_logger):
mock_logger = mock_get_logger.return_value
for i in range(4):
mock_time.return_value = i
log_every_n_seconds(logging.INFO, "Test message", n=2)
self.assertEqual(mock_logger.log.call_count, 2)


if __name__ == '__main__':
unittest.main()
95 changes: 95 additions & 0 deletions sdks/python/apache_beam/utils/logs.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import inspect
import time

_LAST_TIME_INVOCATION_ALLOWED_NS = {}


def _allow_infrequent_invocation(
min_interval_sec: int, periodic_action_id: str):

if periodic_action_id in _LAST_TIME_INVOCATION_ALLOWED_NS:
last_observed_ns = _LAST_TIME_INVOCATION_ALLOWED_NS[periodic_action_id]
if time.time_ns() < last_observed_ns + min_interval_sec * 10**9:
return False
_LAST_TIME_INVOCATION_ALLOWED_NS[periodic_action_id] = time.time_ns()
return True


def allow_infrequent_logging(
min_interval_sec: int = 5 * 60, message_id: str = None):
"""Checks whether to allow printing a log message every so often.

Sample usages:
```
if logs.allow_infrequent_logging():
_LOGGER.info("A message to log no more than once in 5 min per process")

if logs.allow_infrequent_logging(min_interval_sec=20*60,
message_id="Data plane debug logs"):
_LOGGER.info("Waiting to receive elements in input queue.")

if logs.allow_infrequent_logging(min_interval_sec=20*60,
message_id="Data plane debug logs"):
_LOGGER.info("Received elements in input queue.")

```

Args:
min_interval_sec: Minimal time interval to wait between logs, in seconds.
message_id: Optional identifier of a log message. If not provided, the
identifier is derived from the location of the caller.
Do not include the message being logged if it can be large.

Returns:
True, if a log message should be produced, False otherwise.
"""
if not message_id:
# Use a location of the code where the helper was invoked.
cf = inspect.currentframe()
message_id = f"{cf.f_back.f_code.co_filename}:{cf.f_back.f_lineno}"

return _allow_infrequent_invocation(min_interval_sec, message_id)


def allow_log_once(message_id: str = None):
"""Checks whether to allow logging a message only once per process.

Args:
message_id: An identifier of a log message. If not provided, the
identifier is derived from the location of the caller.

Returns:
True, if a log message should be produced, False otherwise.

Sample usage:
```
if logs.allow_log_once():
_LOGGER.info("Some message to log no more than once per process")
```

See also `allow_infrequent_logging` for logging a message every so often.
"""
if not message_id:
# Use a location of the code where the helper was invoked.
cf = inspect.currentframe()
message_id = f"{cf.f_back.f_code.co_filename}:{cf.f_back.f_lineno}"

return _allow_infrequent_invocation(
min_interval_sec=10**10, periodic_action_id=message_id)
Loading
Loading