Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
# Changelog

## 1.2.0
* Exclude unauthorized streams from the catalog during discovery. [#34](https://github.com/singer-io/tap-outbrain/pull/34)

## 1.1.0
* Upgrade Python to 3.12 in CircleCI [#30](https://github.com/singer-io/tap-outbrain/pull/30)
* Upgrade `singer-python`, `requests` and `python-dateutil` to latest version
Expand Down
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
from setuptools import setup, find_packages

setup(name="tap-outbrain",
version="1.1.0",
version="1.2.0",
description="Singer.io tap for extracting data from the Outbrain API",
author="Fishtown Analytics",
url="http://singer.io",
Expand Down
17 changes: 14 additions & 3 deletions tap_outbrain/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -294,12 +294,14 @@ def sync_campaigns(state, access_token, account_id, selected_streams):

LOGGER.info('Done!')

def do_discover():

def do_discover(client):
LOGGER.info("Starting discovery")
catalog = discover()
catalog = discover(client)
json.dump(catalog.to_dict(), sys.stdout, indent=2)
LOGGER.info("Finished discover")


def do_sync(catalog: singer.Catalog, config: Dict, state):
#pylint: disable=global-statement
global DEFAULT_START_DATE
Expand Down Expand Up @@ -355,7 +357,16 @@ def main_impl():
'start_date'])

if args.discover:
do_discover()
config = args.config
access_token = config.get('access_token') or generate_token(
config.get('username'), config.get('password')
)
if access_token is None:
LOGGER.fatal("Failed to generate a new access token for discover mode.")
raise RuntimeError

client = OutbrainClient(config={**config, 'access_token': access_token})
do_discover(client)
elif args.catalog:
state = args.state or DEFAULT_STATE
do_sync(args.catalog, args.config, state)
Expand Down
13 changes: 12 additions & 1 deletion tap_outbrain/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,22 @@
LOGGER = singer.get_logger()
SESSION = requests.Session()

OUTBRAIN_API_BASE = 'https://api.outbrain.com/amplify/v0.1'


class Server429Error(Exception):
pass


class OutbrainForbiddenError(Exception):
pass


class OutbrainClient:

def __init__(self):
def __init__(self, config=None):
self._retry_after = RETRY_RATE_LIMIT_MS / 1000.0 # Conversion to seconds
self.config = config or {}

def _rate_limit_backoff(self):
"""
Expand Down Expand Up @@ -71,6 +78,10 @@ def _call():
self._retry_after = RETRY_RATE_LIMIT_MS
self._retry_after /= 1000.0 # For miliseconds conversion to seconds
raise Server429Error("Rate limit exceeded")
elif resp.status_code == 403:
raise OutbrainForbiddenError(
f"HTTP-error-code: 403, Error: {resp.content!r}"
)
Comment thread
satyendra101 marked this conversation as resolved.
elif resp.status_code >= 400:
LOGGER.error(
f"{method} {req.url} [{resp.status_code} – {resp.content!r}]"
Expand Down
71 changes: 68 additions & 3 deletions tap_outbrain/discover.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,80 @@
from singer import metadata
from singer.catalog import Catalog, CatalogEntry, Schema

from tap_outbrain.client import OutbrainForbiddenError
from tap_outbrain.schema import get_schemas
from tap_outbrain.streams import STREAMS

LOGGER = singer.get_logger()


def discover() -> Catalog:
"""Run the discovery mode, prepare the catalog file and return the
catalog."""
def _apply_access_checks(client, schemas: dict, field_metadata: dict) -> None:
"""
Probe each stream for read access and remove inaccessible streams
(and their children) from *schemas* and *field_metadata* in place.

Note: ``check_access()`` always returns ``True`` for child streams, so
this loop effectively identifies only inaccessible parent streams by
design. Child stream removal is handled separately by
``_prune_inaccessible_children()``.

Raises ``OutbrainForbiddenError`` if no parent streams are accessible.
"""
inaccessible_streams = [
stream_name
for stream_name, stream_cls in STREAMS.items()
if stream_name in schemas
and not stream_cls(client=client).check_access()
]

for stream_name in inaccessible_streams:
schemas.pop(stream_name, None)
field_metadata.pop(stream_name, None)

_prune_inaccessible_children(schemas, field_metadata)

if not schemas:
raise OutbrainForbiddenError(
"HTTP-error-code: 403, Error: The credentials do not have"
" 'read' access to any supported streams. Please re-check configuration."
)
elif inaccessible_streams:
LOGGER.warning(
"No 'read' access to stream(s): %s. Excluded from catalog.",
", ".join(inaccessible_streams),
)


def _prune_inaccessible_children(schemas: dict, field_metadata: dict) -> None:
"""
Remove child streams from the catalog whose parent stream was excluded.
Mutates *schemas* and *field_metadata* in place.
"""
for name, stream_cls in list(STREAMS.items()):
if name in schemas and stream_cls.parent and stream_cls.parent not in schemas:
LOGGER.warning(
"Stream '%s' excluded from catalog because its parent"
" stream '%s' is not accessible.",
name,
stream_cls.parent,
)
schemas.pop(name, None)
field_metadata.pop(name, None)


def discover(client=None) -> Catalog:
"""
Run the discovery mode, prepare the catalog file and return the catalog.

When *client* is provided, access to each stream is verified using the
supplied ``OutbrainClient`` and streams the credentials cannot read are
excluded from the returned catalog.
"""
schemas, field_metadata = get_schemas()

if client is not None: # Checking for client as integration tests are written without client in discovery
_apply_access_checks(client, schemas, field_metadata)

catalog = Catalog([])
for stream_name, schema_dict in schemas.items():
try:
Expand Down
61 changes: 59 additions & 2 deletions tap_outbrain/streams.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,74 @@
class Campaign:
import singer

from tap_outbrain.client import OUTBRAIN_API_BASE, OutbrainForbiddenError

LOGGER = singer.get_logger()


class BaseStream:
"""
Abstract base class for all tap-outbrain streams.

Subclasses set class-level attributes describing the stream and may
override ``check_access()`` to probe the appropriate API endpoint.
Child streams (those with ``parent`` set) always return ``True`` from
the default ``check_access()`` implementation; their accessibility is
governed by their parent stream's check.
"""

name = None
key_properties = []
replication_keys = None
replication_method = None
parent = None

def __init__(self, client=None):
self.client = client

def check_access(self) -> bool:
"""
Verify that the API credentials have read access to this stream.
Returns True if accessible, False if a 403 Forbidden error is raised.
Child streams always return True (access is governed by the parent check).
"""
return True


class Campaign(BaseStream):
name = "campaign"
key_properties = ["id"]
replication_keys = None
replication_method = "FULL_TABLE"

def check_access(self) -> bool:
"""
Probe the campaigns endpoint to verify read access.
Returns False when the API responds with HTTP 403 Forbidden.
"""
account_id = self.client.config.get("account_id")
access_token = self.client.config.get("access_token")
headers = {"OB-TOKEN-V1": access_token}
url = f"{OUTBRAIN_API_BASE}/marketers/{account_id}/campaigns"
try:
self.client.make_request("GET", url, headers=headers, params={"limit": 1})
return True
except OutbrainForbiddenError as exc:
LOGGER.warning(
"Permission Error: Stream '%s' - %s",
self.__class__.__name__,
exc,
)
Comment thread
satyendra101 marked this conversation as resolved.
return False


class CampaignPerformance:
class CampaignPerformance(BaseStream):
name = "campaign_performance"
key_properties = ["campaignId", "fromDate"]
bookmark_properties = ["fromDate"]
replication_keys = "fromDate"
replication_method = "INCREMENTAL"
parent = "campaign"
# check_access() inherited from BaseStream always returns True for child streams


STREAMS = {
Expand Down
6 changes: 3 additions & 3 deletions tests/unittests/test_catalog.py
Original file line number Diff line number Diff line change
Expand Up @@ -124,11 +124,11 @@ def mock_file_content(filename, mode):

def test_streams_configuration(self):
"""Test that STREAMS configuration has correct parent attribute."""
# Verify Campaign class does not have parent attribute
self.assertFalse(hasattr(STREAMS['campaign'], 'parent'))
# Campaign is a root stream — parent is None (inherited from BaseStream)
self.assertIsNone(STREAMS['campaign'].parent)

# Verify CampaignPerformance class has parent attribute set correctly
self.assertTrue(hasattr(STREAMS['campaign_performance'], 'parent'))
self.assertIsNotNone(STREAMS['campaign_performance'].parent)
self.assertEqual(STREAMS['campaign_performance'].parent, 'campaign')

@patch('builtins.open', new_callable=mock_open)
Expand Down
Loading