Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
89 changes: 73 additions & 16 deletions tap_outbrain/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,11 @@
max_tries=5,
giveup=singer.requests.giveup_on_http_4xx_except_429,
interval=30)
def request(url, access_token, params):
def request(url, access_token, params=None):
# Optional query parameters
if params is None:
params = dict()

LOGGER.info("Making request: GET {} {}".format(url, params))
headers = {'OB-TOKEN-V1': access_token}
if 'user_agent' in CONFIG:
Expand Down Expand Up @@ -115,7 +119,7 @@ def get_date_ranges(start, end, interval_in_days):
'from_date': interval_start,
'to_date': min(end,
(interval_start + datetime.timedelta(
days=interval_in_days-1)))
days=interval_in_days - 1)))
})

interval_start = interval_start + datetime.timedelta(
Expand Down Expand Up @@ -161,7 +165,7 @@ def sync_performance(state, access_token, account_id, table_name, state_sub_id,
# sync 2 days before last saved date, or DEFAULT_START_DATE
from_date = datetime.datetime.strptime(
state.get(table_name, {})
.get(state_sub_id, DEFAULT_START_DATE),
.get(state_sub_id, DEFAULT_START_DATE),
'%Y-%m-%d').date() - datetime.timedelta(days=2)

to_date = datetime.date.today()
Expand All @@ -175,10 +179,10 @@ def sync_performance(state, access_token, account_id, table_name, state_sub_id,
for date_range in date_ranges:
LOGGER.info(
'Pulling {} for {} from {} to {}'
.format(table_name,
extra_persist_fields,
date_range.get('from_date'),
date_range.get('to_date')))
.format(table_name,
extra_persist_fields,
date_range.get('from_date'),
date_range.get('to_date')))

params = {
'from': date_range.get('from_date'),
Expand All @@ -196,11 +200,14 @@ def sync_performance(state, access_token, account_id, table_name, state_sub_id,
access_token,
params).json()
if REPORTS_MARKETERS_PERIODIC_MAX_LIMIT < response.get('totalResults'):
LOGGER.warn('More performance data (`{}`) than the tap can currently retrieve (`{}`)'.format(
response.get('totalResults'), REPORTS_MARKETERS_PERIODIC_MAX_LIMIT))
LOGGER.warn(
'More performance data (`{}`) than the tap can currently retrieve (`{}`)'.format(
response.get('totalResults'), REPORTS_MARKETERS_PERIODIC_MAX_LIMIT))
else:
LOGGER.info('Syncing `{}` rows of performance data for campaign `{}`. Requested `{}`.'.format(
response.get('totalResults'), state_sub_id, REPORTS_MARKETERS_PERIODIC_MAX_LIMIT))
LOGGER.info(
'Syncing `{}` rows of performance data for campaign `{}`. Requested `{}`.'.format(
response.get('totalResults'), state_sub_id,
REPORTS_MARKETERS_PERIODIC_MAX_LIMIT))
last_request_end = utils.now()

LOGGER.info('Done in {} sec'.format(
Expand All @@ -222,12 +229,12 @@ def sync_performance(state, access_token, account_id, table_name, state_sub_id,
from_date = new_from_date

if last_request_start is not None and \
(time.time() - last_request_end.timestamp()) < 30:
(time.time() - last_request_end.timestamp()) < 30:
to_sleep = 30 - (time.time() - last_request_end.timestamp())
LOGGER.info(
'Limiting to 2 requests per minute. Sleeping {} sec '
'before making the next reporting request.'
.format(to_sleep))
.format(to_sleep))
time.sleep(to_sleep)


Expand Down Expand Up @@ -296,8 +303,53 @@ def sync_campaigns(state, access_token, account_id):
LOGGER.info('Done!')


def parse_marketer(marketer):
return {
'id': str(marketer['id']),
'name': str(marketer['name']),
'enabled': bool(marketer['enabled']),
'currency': str(marketer['currency']),
'creationTime': parse_datetime(marketer['creationTime']),
'lastModified': parse_datetime(marketer['lastModified']),
'blockedSites': str(marketer['blockedSites']),
'useFirstPartyCookie': bool(marketer['useFirstPartyCookie']),
}


def get_marketers(account_id, access_token):
"""
Retrieve all Marketers associated with the current user
"""

url = '{}/marketers'.format(BASE_URL, account_id)

marketers = request(url, access_token).json()['marketers']

LOGGER.info('Retrieved %s marketers', len(marketers))

return marketers


def sync_marketers(access_token, account_id):
LOGGER.info('Syncing marketers.')

# Retrieve account data
marketers = get_marketers(account_id, access_token)

# Parse data types
marketers = map(parse_marketer, marketers)

# Emit rows
for marketer in marketers:
singer.write_record('marketers', marketer, time_extracted=utils.now())

LOGGER.info('Done!')

return marketers


def do_sync(args):
#pylint: disable=global-statement
# pylint: disable=global-statement
global DEFAULT_START_DATE
state = DEFAULT_STATE

Expand Down Expand Up @@ -343,7 +395,7 @@ def do_sync(args):
# NEVER RAISE THIS ABOVE DEBUG!
LOGGER.debug('Using access token `{}`'.format(access_token))


singer.write_schema('marketers', schemas.marketer, key_properties=['id'])
singer.write_schema('campaigns',
schemas.campaign,
key_properties=["id"])
Expand All @@ -352,7 +404,12 @@ def do_sync(args):
key_properties=["campaignId", "fromDate"],
bookmark_properties=["fromDate"])

sync_campaigns(state, access_token, account_id)
# Retrieve all accounts that the authenticated account has access to
marketers = sync_marketers(access_token, account_id)

# Iterate over all these customer accounts
for marketer in marketers:
sync_campaigns(state, access_token, marketer['id'])


def main_impl():
Expand Down
36 changes: 33 additions & 3 deletions tap_outbrain/schemas.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
#pylint: disable=invalid-name
# pylint: disable=invalid-name
link = {
'type': 'object',
'properties': {
Expand Down Expand Up @@ -82,7 +82,6 @@
}
}


campaign = {
'type': 'object',
'properties': {
Expand Down Expand Up @@ -214,7 +213,6 @@
}
}


campaign_performance = {
'type': 'object',
'properties': {
Expand Down Expand Up @@ -337,3 +335,35 @@
}
}
}

marketer = {
'type': 'object',
'properties': {
'id': {
'type': 'number'
},
'name': {
'type': 'string'
},
'enabled': {
'type': 'boolean'
},
'currency': {
'type': 'string'
},
'creationTime': {
'type': 'string',
'format': 'date-time',
},
'lastModified': {
'type': 'string',
'format': 'date-time',
},
'blockedSites': {
'type': 'string'
},
'useFirstPartyCookie': {
'type': 'boolean'
},
},
}