20 changes: 18 additions & 2 deletions README.md
@@ -6,9 +6,24 @@ This tap:

- Pulls raw data from FrontApp's [API](https://dev.frontapp.com/)
- Extracts the following resources from FrontApp
<<<<<<< HEAD
<<<<<<< HEAD
<<<<<<< HEAD
- [Analytics](https://dev.frontapp.com/#analytics)
=======
- [Analytics](https://dev.frontapp.com#analytics)
>>>>>>> cleaned up http.py and added handling for when no data returned.
=======
- [Analytics](https://dev.frontapp.com#analytics)
>>>>>>> cleaned up http.py and added handling for when no data returned.
=======
- [Analytics](https://dev.frontapp.com#analytics)
>>>>>>> 1f95e607f623bdeea55b53cc16f1f3f007dda690
Contributor: Please remove/resolve these merge conflict markers. It looks like the unchanged line should just be kept and the added lines removed?

- Hourly/Daily analytics of metrics
- team_table
- [Conversations](https://dev.frontapp.com/#list-conversations)
- List of all conversations by day
- conversations (enriches the data with recipient contact email)
- Outputs the schema for each resource

## Setup
@@ -21,7 +36,7 @@ python3 ./setup.py install

## Configuration

This tap requires a `config.json` which specifies details regarding [API authentication](https://dev.frontapp.com/#authentication), a cutoff date for syncing historical data, and a time period range [daily,hourly] to control what incremental extract date ranges are. See [config.sample.json](config.sample.json) for an example.
This tap requires a `config.json` which specifies details regarding [API authentication](https://dev.frontapp.com/#authentication) via a token, a cutoff date for syncing historical data (formatted `YYYY-MM-DDTHH:MM:SSZ`), and an incremental range (`daily` or `hourly`) that controls the extract date windows. See [example.config.json](example.config.json) for an example.
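For reference, a minimal `config.json` might look like the fragment below. The field names are inferred from the description above, not confirmed against the tap's code; treat example.config.json as the authoritative shape.

```json
{
  "token": "<frontapp-api-token>",
  "start_date": "2018-08-01T00:00:00Z",
  "incremental_range": "daily"
}
```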

Create the catalog:

@@ -32,13 +47,14 @@ Create the catalog:
Then to run the extract:

```bash
› tap-frontapp --config config.json --catalog catalog.json --state state.json
```

Note that a typical state file looks like this:

```json
{"bookmarks": {"team_table": {"date_to_resume": "2018-08-01 00:00:00"}}}
{"bookmarks": {"conversations": {"date_to_resume": "2018-08-01 00:00:00"}}}
```

---
57 changes: 57 additions & 0 deletions stitch_setup_documentation.md
@@ -0,0 +1,57 @@
# FrontApp
Contributor: This is duplicate documentation and will likely go out of date. The source of truth for setting up a Front connection in Stitch is here https://www.stitchdata.com/docs/integrations/saas/front

Please remove this file.


This tap pulls [Analytics](https://dev.frontapp.com/#analytics) data from the FrontApp API. Its current scope is limited to the teams table, but it can easily be extended to the other Analytics data sets.

## Connecting FrontApp

### FrontApp Setup Requirements

To set up FrontApp in Stitch, you need your JSON web token from Front (in Front, go to Plugins & API > API).

### Setup FrontApp as a Stitch source

1. [Sign into your Stitch account](https://app.stitchdata.com/)

2. On the Stitch Dashboard page, click the **Add Integration** button.

3. Click the **FrontApp** icon.

4. Enter a name for the integration. This is the name that will display on the Stitch Dashboard for the integration; it’ll also be used to create the schema in your destination. For example, the name "Stitch FrontApp" would create a schema called `stitch_frontapp` in the destination. **Note**: Schema names cannot be changed after you save the integration.

5. In the **Token** field, enter your FrontApp web token.

6. In the **Metric** field, enter the Analytics metric needed. The only schema supported in this tap right now is the team_table metric.

7. In the **Incremental Range** field, enter the desired aggregation frame (daily or hourly).

8. In the **Start Date** field, enter the earliest date from which to sync FrontApp Analytics (e.g. 2017-01-01).

---

## FrontApp Replication

With each run of the integration, the following data set is extracted and replicated to the data warehouse:

- **Team Table**: Daily or hourly aggregated team member statistics since the last_update (last completed run of the integration) through the most recent day or hour respectively. On the first run, ALL increments since the **Start Date** will be replicated.
- **Conversations**: Conversation data enriched with recipient email handle from [Contacts API](https://dev.frontapp.com/#contacts) since the last_update (last completed run of the integration) through the most recent day or hour respectively. On the first run, ALL increments since the **Start Date** will be replicated.

---

## FrontApp Table Schemas

### team_table

- Table name: team_table
- Description: A list of team members and their event statistics during the course of the day/hour starting from the analytics_date.
- Primary key: analytics_date, analytics_range, teammate_id
- Replicated incrementally
- Bookmark column: analytics_date (written as resume_date in the state records)
- API endpoint documentation: [Analytics](https://dev.frontapp.com/#analytics)

---

## Troubleshooting / Other Important Info

- **Team_table Data**: The first record is for the teammate = "ALL" and so is an aggregated record across all team members. Also, the API supports pulling specific teams by using a slightly different endpoint, but we have set it up to pull members from all teams.

- **Timestamps**: All timestamp columns and resume_date state parameter are Unix timestamps.
3 changes: 3 additions & 0 deletions tap_frontapp/context.py
@@ -34,6 +34,9 @@ def catalog(self, catalog):
for stream in catalog.streams:
mdata = metadata.to_map(stream.metadata)
root_metadata = mdata.get(())

self.selected_stream_ids.add(stream.tap_stream_id)

Contributor: This looks like test code to always select all streams. It should be removed.
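With the unconditional add removed, the loop might look like the sketch below. This is illustrative only: `metadata.to_map` output is modeled here as a plain dict keyed by breadcrumb tuples, so the helper takes `(tap_stream_id, metadata_map)` pairs rather than real singer stream objects.

```python
# Hedged sketch: select only streams whose root metadata marks selected=True,
# instead of unconditionally adding every tap_stream_id.
def selected_stream_ids(streams):
    """streams: iterable of (tap_stream_id, metadata_map) pairs,
    where metadata_map maps breadcrumb tuples to metadata dicts."""
    selected = set()
    for stream_id, mdata in streams:
        root_metadata = mdata.get(())
        if root_metadata and root_metadata.get('selected') is True:
            selected.add(stream_id)
    return selected
```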

if root_metadata and root_metadata.get('selected') is True:
self.selected_stream_ids.add(stream.tap_stream_id)

23 changes: 15 additions & 8 deletions tap_frontapp/http.py
@@ -20,7 +20,6 @@ class Client(object):
def __init__(self, config):
self.token = 'Bearer ' + config.get('token')
self.session = requests.Session()

self.calls_remaining = None
self.limit_reset = None

@@ -44,7 +43,6 @@ def request(self, method, path, **kwargs):
kwargs['headers']['Authorization'] = self.token

kwargs['headers']['Content-Type'] = 'application/json'

if 'endpoint' in kwargs:
endpoint = kwargs['endpoint']
del kwargs['endpoint']
@@ -56,14 +54,20 @@
#so here we just run the request again
time.sleep(2)
response = requests.request(method, self.url(path), **kwargs)

elif path == '/conversations' or '/contacts/' in path:
Contributor: It looks like the rate limiting should already be handled based on this code. Could you just ensure that conversations and contacts requests are not caught by the if and else clauses here?

It would also be helpful if you could supply (redacted) logs showing these requests completing successfully over a minute without a 429 error.

#conversations and contacts don't need to produce a report, sleep for shorter
response = requests.request(method, self.url(path), **kwargs)
time.sleep(.65)
else:
response = requests.request(method, self.url(path), **kwargs)
time.sleep(2)
response = requests.request(method, self.url(path), **kwargs)

#print('final3 url=',response.url)
self.calls_remaining = int(response.headers['X-Ratelimit-Remaining'])
# print('final3 url=',response.url,flush=True)
try:
self.calls_remaining = int(response.headers['X-Ratelimit-Remaining'])
except:
time.sleep(2)
Contributor: What is the reasoning for this? It could use a comment.

- Does the API sometimes return non-int values for this header? (Resulting in a conversion error to int)
  - If so, they should be handled explicitly or documented in a comment.
- Does the API sometimes not return this header? (Resulting in a KeyError on `[]`)
  - In that case, it's less clear what action to take. A sleep may be appropriate, but it all depends on the situation.
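One explicit alternative to the bare `except` is sketched below. The header names match the ones used in the diff; the `(None, None)` fallback for a missing or non-numeric header is an assumption about desired behavior, not documented Front semantics.

```python
# Hedged sketch: parse rate-limit headers defensively instead of a bare except.
def parse_rate_limit(headers):
    """Return (calls_remaining, limit_reset) from response headers,
    or (None, None) when a header is missing or non-numeric."""
    try:
        remaining = int(headers['X-Ratelimit-Remaining'])
        reset = int(float(headers['X-Ratelimit-Reset']))
    except (KeyError, ValueError):
        return None, None
    return remaining, reset
```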

self.limit_reset = int(float(response.headers['X-Ratelimit-Reset']))

if response.status_code in [429, 503]:
@@ -76,10 +80,13 @@ def request(self, method, path, **kwargs):
LOGGER.error('{} - {}'.format(response.status_code, response.text))
raise

if len(response.json()['metrics']) > 0:
return response.json()['metrics'][0]['rows']
if path == '/conversations' or '/contacts/' in path:
return response.json()
else:
return {}
if len(response.json()['metrics']) > 0:
Contributor: This could be an elif

return response.json()['metrics'][0]['rows']
else:
return {}

def get(self, path, **kwargs):
return self.request('get', path, **kwargs)
7 changes: 5 additions & 2 deletions tap_frontapp/schemas.py
@@ -6,13 +6,16 @@

class IDS(object): # pylint: disable=too-few-public-methods
TEAM_TABLE = 'team_table'
CONVERSATIONS = 'conversations'

STATIC_SCHEMA_STREAM_IDS = [
IDS.TEAM_TABLE
IDS.TEAM_TABLE,
IDS.CONVERSATIONS
]

PK_FIELDS = {
IDS.TEAM_TABLE: ['analytics_date', 'analytics_range', 'teammate_v']
IDS.TEAM_TABLE: ['analytics_date', 'analytics_range', 'teammate_v'],
IDS.CONVERSATIONS: 'id'
Contributor: Does this need to be a list? The Singer spec requires primary keys to be a list in Schema messages.

}
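If the reviewer's reading of the spec is right, the entry would presumably become a list, e.g. this sketch:

```python
# Sketch of the suggested fix: every key_properties value is a list,
# per the Singer spec for Schema messages.
PK_FIELDS = {
    'team_table': ['analytics_date', 'analytics_range', 'teammate_v'],
    'conversations': ['id'],
}
```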

def normalize_fieldname(fieldname):
54 changes: 54 additions & 0 deletions tap_frontapp/schemas/conversations.json
@@ -0,0 +1,54 @@
{
"type": ["null", "object"],
"selected": true,
Contributor: All instances of selected should be removed from schemas, it's not a valid JSON Schema key. Stream and field selection is now performed via metadata in the catalog.

"additionalProperties": false,
"properties": {
"_links.self": {
"type": ["null", "object"],
"selected": true
},
"id": {
"type": ["null", "string"],
"selected": true
},
"subject": {
"type": ["null", "string"],
"selected": true
},
"status": {
"type": ["null", "string"],
"selected": true
},
"assignee": {
"type": ["null", "object"],
"selected": true
},
"last_message": {
"type": ["null", "string"],
"selected": true
},
"created_at": {
"type": ["null", "string"],
"format": "date",
"selected": true
},
"is_private": {
"type": ["null", "boolean"],
"selected": true
},
"recipient": {
"type": ["null", "object"],
"selected": true
},
"tags": {
"type": ["null", "string"],
"selected": true
},
"contact_email": {
"type": ["null", "string"],
"selected": true
}
},
"key_properties": ["id"],
"bookmark_properties": ["created_at"]
}
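For context on the reviewer's note about `selected`: with catalog-based selection, the flag would live in the stream's metadata rather than in the schema. An illustrative catalog fragment (structure assumed from Singer catalog conventions, not taken from this repo) might look like:

```json
{
  "tap_stream_id": "conversations",
  "metadata": [
    {"breadcrumb": [], "metadata": {"selected": true}}
  ]
}
```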
90 changes: 72 additions & 18 deletions tap_frontapp/streams.py
Expand Up @@ -16,6 +16,7 @@
LOGGER = singer.get_logger()

MAX_METRIC_JOB_TIME = 1800
MAX_CONVERSATIONS_JOB_TIME = 7200
METRIC_JOB_POLL_SLEEP = 1

def count(tap_stream_id, records):
@@ -65,30 +66,55 @@ def select_fields(mdata, obj):
@on_exception(constant, MetricsRateLimitException, max_tries=5, interval=60)
@on_exception(expo, RateLimitException, max_tries=5)
@sleep_and_retry
@limits(calls=1, period=61) # 60 seconds needed to be padded by 1 second to work
def get_metric(atx, metric, start_date, end_date):
LOGGER.info('Metrics query - metric: {} start_date: {} end_date: {} '.format(
metric,
start_date,
end_date))
return atx.client.get('/analytics', params={'start': start_date, \
'end': end_date, 'metrics[]':metric}, endpoint='analytics')
@limits(calls=100, period=61) # 60 seconds needed to be padded by 1 second to work
def get_metric(atx, metric, start_date, end_date, page_token = ''):
if metric == 'conversations':
LOGGER.info('Conversations query - metric: {} start_date: {} end_date: {} '.format(
metric,
start_date,
end_date))
return atx.client.get('/conversations', \
params={'q[before]':end_date,'q[after]':start_date,'page_token':page_token,'limit':100}, \
endpoint='conversations')
else:
LOGGER.info('Metrics query - metric: {} start_date: {} end_date: {} '.format(
metric,
start_date,
end_date))
return atx.client.get('/analytics', params={'start': start_date, \
'end': end_date, 'metrics[]':metric}, endpoint='analytics')

def sync_metric(atx, metric, incremental_range, start_date, end_date):
with singer.metrics.job_timer('daily_aggregated_metric'):
start = time.monotonic()
start_date_formatted = datetime.datetime.utcfromtimestamp(start_date).strftime('%Y-%m-%d')
# we've really moved this functionality to the request in the http script
#so we don't expect that this will actually have to run mult times
while True:
if (time.monotonic() - start) >= MAX_METRIC_JOB_TIME:
raise Exception('Metric job timeout ({} secs)'.format(
MAX_METRIC_JOB_TIME))
data = get_metric(atx, metric, start_date, end_date)
if data != '':
break
else:
time.sleep(METRIC_JOB_POLL_SLEEP)

if metric == 'team_table':
while True:
if (time.monotonic() - start) >= MAX_METRIC_JOB_TIME:
raise Exception('Metric job timeout ({} secs)'.format(
MAX_METRIC_JOB_TIME))
data = get_metric(atx, metric, start_date, end_date)
if data != '':
break
else:
time.sleep(METRIC_JOB_POLL_SLEEP)
if metric == 'conversations':
Contributor: I'm seeing a pattern of "if team_table... if conversations..." It would be safer if the existing sync_metric (and related functions) was renamed to sync_team_table and the conversations metric just had its own sync functions. That way they can share state logic and the sync code can check at the spot linked below to dispatch between the metrics (and potential future metrics):

    sync_metric(atx, metric, incremental_range, ut_current_date, ut_next_date)

It also has the effect of reducing branching, which makes this code in its current state very hard to follow.
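A rough shape of that suggestion is sketched below. All names are hypothetical, and the real per-metric bodies are stubbed out with marker return values just to show the dispatch.

```python
# Hedged sketch of the reviewer's suggestion: one sync function per metric,
# dispatched by stream id rather than branching inside sync_metric.
def sync_team_table(atx, start_date, end_date):
    # existing team_table polling/transform logic would live here
    return 'team_table'

def sync_conversations(atx, start_date, end_date):
    # conversations paging + contact enrichment would live here
    return 'conversations'

SYNC_FUNCTIONS = {
    'team_table': sync_team_table,
    'conversations': sync_conversations,
}

def sync_metric(atx, metric, start_date, end_date):
    # dispatch between metrics; future metrics just register above
    return SYNC_FUNCTIONS[metric](atx, start_date, end_date)
```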

data = []
pagination_string = ''
while True:
if (time.monotonic() - start) >= MAX_CONVERSATIONS_JOB_TIME:
raise Exception('Metric job timeout ({} secs)'.format(
MAX_METRIC_JOB_TIME))
response = get_metric(atx, metric, start_date, end_date, pagination_string)
if len(response['_results']) > 0:
pagination_string = response['_pagination']['next'].split('page_token=')[1]
for i,entry in enumerate(response['_results']):
data.append(response['_results'][i])
else:
break
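The loop above follows Front's cursor-paging pattern; a stripped-down, self-contained sketch is below. Note one assumption: unlike the loop above, which relies on an empty final page, this sketch also stops when no `next` link is present, so it would not raise on a last page that omits pagination.

```python
# Hedged sketch of cursor pagination: follow _pagination.next until
# the results run out or no next link is returned.
def fetch_all(fetch_page):
    """fetch_page(page_token) -> response dict with _results and _pagination."""
    data, token = [], ''
    while True:
        response = fetch_page(token)
        results = response.get('_results', [])
        if not results:
            break
        data.extend(results)
        next_url = (response.get('_pagination') or {}).get('next')
        if not next_url:
            break
        token = next_url.split('page_token=')[1]
    return data
```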

data_rows = []
# transform the team_table data
@@ -148,7 +174,33 @@ def sync_metric(atx, metric, incremental_range, start_date, end_date):
"num_composed_v": row[8]['v'],
"num_composed_p": row[8]['p']
})

elif metric == 'conversations':
for ind,row in enumerate(data):
if ind % 100 == 0:
LOGGER.info('Conversations progress: {} of {}'.format(ind,len(data)))
try:
contact_id = row['recipient']['_links']['related']['contact'].split('contacts/')[1]
path = '/'.join(['/contacts',contact_id])
contact = atx.client.get(path)
contact_email = None
for handles_row in contact['handles']:
if handles_row['source'] == 'email':
contact_email = handles_row['handle']
except:
pass
if contact['custom_fields']:
custom_field_name = atx.config['custom_field']
data_rows.append({
"created_at":datetime.datetime.utcfromtimestamp(row['created_at']).strftime('%Y-%m-%d %H:%M:%S'),
"is_private":row['is_private'],
"id":row['id'],
"subject":row['subject'],
"status":row['status'],
"assignee":row['assignee'],
"recipient":row['recipient'],
"contact_email": contact_email,
"tags": ','.join([tag['name'] for tag in row['tags']])
})
write_records(metric, data_rows)

def write_metrics_state(atx, metric, date_to_resume):
@@ -222,5 +274,7 @@ def sync_selected_streams(atx):

if IDS.TEAM_TABLE in selected_streams:
sync_metrics(atx, 'team_table')
if IDS.CONVERSATIONS in selected_streams:
sync_metrics(atx, 'conversations')

# add additional analytics here