Skip to content

Feature/sac 27535 frontapp #29

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 14 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion .circleci/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@ version: 2
jobs:
build:
docker:
- image: 218546966473.dkr.ecr.us-east-1.amazonaws.com/circle-ci:tap-tester-v4
- image: 218546966473.dkr.ecr.us-east-1.amazonaws.com/circle-ci:stitch-tap-tester

steps:
- checkout
- run:
Expand Down
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -102,3 +102,6 @@ tags
singer-check-tap-data
state.json
catalog.json
output.json
convert_json_csv.py
notes_frontapp.txt
4 changes: 0 additions & 4 deletions example.config.json

This file was deleted.

12 changes: 6 additions & 6 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,11 @@
url="http://singer.io",
classifiers=["Programming Language :: Python :: 3 :: Only"],
install_requires=[
"singer-python==5.4.0",
"pendulum",
"ratelimit",
"backoff==1.3.2",
"requests==2.31.0",
"singer-python==6.1.1",
"pendulum==3.1.0",
"ratelimit==2.2.1",
"backoff==2.2.1",
"requests==2.32.3",
],
entry_points="""
[console_scripts]
Expand All @@ -25,4 +25,4 @@
"schemas": ["tap_frontapp/schemas/*.json"]
},
include_package_data=True
)
)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

EOF

66 changes: 11 additions & 55 deletions tap_frontapp/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,88 +6,44 @@

import singer
from singer import utils
from singer.catalog import Catalog, CatalogEntry, Schema
from . import streams
from singer.catalog import Catalog
from .context import Context
from .discover import discover
from .sync import sync
from . import schemas

REQUIRED_CONFIG_KEYS = ["token"]

LOGGER = singer.get_logger()

#def check_authorization(atx):
# atx.client.get('/settings')


# Some taps do discovery dynamically where the catalog is read in from a
# call to the api but with the odd frontapp structure, we won't do that
# here we never use atx in here since the schema is from file but we
# would use it if we pulled schema from the API def discover(atx):
def discover():
catalog = Catalog([])
for tap_stream_id in schemas.STATIC_SCHEMA_STREAM_IDS:
#print("tap stream id=",tap_stream_id)
schema = Schema.from_dict(schemas.load_schema(tap_stream_id))
metadata = []
for field_name in schema.properties.keys():
#print("field name=",field_name)
if field_name in schemas.PK_FIELDS[tap_stream_id]:
inclusion = 'automatic'
else:
inclusion = 'available'
metadata.append({
'metadata': {
'inclusion': inclusion
},
'breadcrumb': ['properties', field_name]
})
catalog.streams.append(CatalogEntry(
stream=tap_stream_id,
tap_stream_id=tap_stream_id,
key_properties=schemas.PK_FIELDS[tap_stream_id],
schema=schema,
metadata=metadata
))
return catalog


def get_abs_path(path):
    """Returns absolute path for a given relative path.

    The path is resolved relative to this module's directory, so schema
    files are found regardless of the process's working directory.
    """
    return os.path.join(os.path.dirname(os.path.realpath(__file__)), path)


def load_schema(tap_stream_id):
    """Loads schema from JSON file, resolving dependencies.

    Any stream ids listed under the "tap_schema_dependencies" key are
    loaded recursively and spliced in via singer's reference resolver.

    NOTE(review): schemas.load_schema duplicates this without dependency
    resolution; this variant is kept for the sync path -- confirm both
    are still needed.
    """
    path = f"schemas/{tap_stream_id}.json"
    schema = utils.load_json(get_abs_path(path))
    # The dependency list is popped so it never leaks into the emitted schema.
    dependencies = schema.pop("tap_schema_dependencies", [])
    refs = {sub_stream_id: load_schema(sub_stream_id) for sub_stream_id in dependencies}
    if refs:
        singer.resolve_schema_references(schema, refs)
    return schema


def sync(atx):
for tap_stream_id in schemas.STATIC_SCHEMA_STREAM_IDS:
schemas.load_and_write_schema(tap_stream_id)

streams.sync_selected_streams(atx)


@utils.handle_top_exception(LOGGER)
def main():
    """Tap entry point: parse args, then run discovery or sync.

    In discovery mode the catalog is dumped to stdout as JSON; otherwise
    the provided properties (or a fresh discovery) seed the sync context.
    """
    args = utils.parse_args(REQUIRED_CONFIG_KEYS)
    atx = Context(args.config, args.state)

    if args.discover:
        # The schema is static from file, so discovery needs no API connection.
        catalog = discover()
        json.dump(catalog.to_dict(), sys.stdout)
    else:
        atx.catalog = Catalog.from_dict(args.properties) if args.properties else discover()
        sync(atx)


if __name__ == "__main__":
    main()
38 changes: 38 additions & 0 deletions tap_frontapp/discover.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
import singer
from singer import metadata
from singer.catalog import Catalog, CatalogEntry, Schema
from .schemas import get_schemas

LOGGER = singer.get_logger()


def discover():
    """Run the discovery mode, prepare the catalog file and return the catalog.

    Loads every static schema plus its metadata map from get_schemas()
    and converts each into a CatalogEntry.
    """
    schemas, field_metadata = get_schemas()
    # NOTE: never print() here -- a Singer tap's stdout is reserved for JSON
    # messages, so diagnostics must go through the logger (stderr).
    LOGGER.info("Schemas loaded: %s", list(schemas))

    catalog = Catalog([])

    for stream_name, schema_dict in schemas.items():
        try:
            schema = Schema.from_dict(schema_dict)
            mdata = field_metadata[stream_name]
        except Exception as err:
            LOGGER.error("Failed to build catalog entry for stream %s "
                         "(schema_dict type: %s): %s",
                         stream_name, type(schema_dict), err)
            raise  # bare raise preserves the original traceback

        # Key properties live at the empty breadcrumb (stream-level metadata).
        key_properties = mdata.get((), {}).get("table-key-properties", [])

        catalog.streams.append(
            CatalogEntry(
                stream=stream_name,
                tap_stream_id=stream_name,
                key_properties=key_properties,
                schema=schema,
                metadata=metadata.to_list(mdata),
            )
        )

    return catalog
82 changes: 51 additions & 31 deletions tap_frontapp/schemas.py
Original file line number Diff line number Diff line change
@@ -1,18 +1,19 @@
"""Schema definitions and metadata handling for Frontapp streams."""
import os
import re

import singer
from singer import utils

from singer import metadata, utils

class IDS(object): # pylint: disable=too-few-public-methods
ACCOUNTS_TABLE = 'accounts_table'
CHANNELS_TABLE = 'channels_table'
INBOXES_TABLE = 'inboxes_table'
TAGS_TABLE = 'tags_table'
TEAMMATES_TABLE = 'teammates_table'
TEAMS_TABLE = 'teams_table'
LOGGER = singer.get_logger()

class IDS:  # pylint: disable=too-few-public-methods
    """Stream identifier constants.

    One constant per FrontApp analytics table; the values double as the
    tap_stream_id and the schema file's base name.
    """
    ACCOUNTS_TABLE = "accounts_table"
    CHANNELS_TABLE = "channels_table"
    INBOXES_TABLE = "inboxes_table"
    TAGS_TABLE = "tags_table"
    TEAMMATES_TABLE = "teammates_table"
    TEAMS_TABLE = "teams_table"

STATIC_SCHEMA_STREAM_IDS = [
IDS.ACCOUNTS_TABLE,
Expand All @@ -24,38 +25,57 @@ class IDS(object): # pylint: disable=too-few-public-methods
]

# Primary-key fields per stream. Every analytics table is keyed by the same
# composite key: the report date/range plus the report and metric ids.
PK_FIELDS = {
    IDS.ACCOUNTS_TABLE: ["analytics_date", "analytics_range", "report_id", "metric_id"],
    IDS.CHANNELS_TABLE: ["analytics_date", "analytics_range", "report_id", "metric_id"],
    IDS.INBOXES_TABLE: ["analytics_date", "analytics_range", "report_id", "metric_id"],
    IDS.TAGS_TABLE: ["analytics_date", "analytics_range", "report_id", "metric_id"],
    IDS.TEAMMATES_TABLE: ["analytics_date", "analytics_range", "report_id", "metric_id"],
    IDS.TEAMS_TABLE: ["analytics_date", "analytics_range", "report_id", "metric_id"],
}


def normalize_fieldname(fieldname):
    """Normalize field names to snake_case.

    Lowercases the name, turns whitespace and hyphens into underscores,
    then strips any remaining character outside [a-z0-9_].
    """
    fieldname = fieldname.lower()
    fieldname = re.sub(r"[\s\-]", "_", fieldname)
    return re.sub(r"[^a-z0-9_]", "", fieldname)

def get_abs_path(path):
    """Resolve *path* relative to this module's directory.

    Keeps schema lookups independent of the process's current working
    directory.
    """
    module_dir = os.path.dirname(os.path.realpath(__file__))
    return os.path.join(module_dir, path)


def load_schema(tap_stream_id):
    """Load schema file for specified stream.

    Reads schemas/<tap_stream_id>.json from this package and returns the
    parsed dict. Unlike the __init__ variant, no dependency resolution
    is performed here.
    """
    path = f"schemas/{tap_stream_id}.json"
    return utils.load_json(get_abs_path(path))


def load_and_write_schema(tap_stream_id):
    """Load the stream's schema and emit it as a Singer SCHEMA message."""
    singer.write_schema(
        tap_stream_id,
        load_schema(tap_stream_id),
        PK_FIELDS[tap_stream_id],
    )

def get_schemas():
    """Load every static schema and build its Singer metadata map.

    Returns a pair of dicts keyed by stream id: the raw schema dicts and
    the corresponding metadata maps (breadcrumb -> metadata dict).
    """
    schemas = {}
    metadata_map = {}

    for stream_id in STATIC_SCHEMA_STREAM_IDS:
        schema = load_schema(stream_id)
        pk_fields = PK_FIELDS[stream_id]
        mdata = metadata.new()

        # Stream-level metadata lives at the empty breadcrumb.
        stream_entries = (
            ("inclusion", "available"),
            ("selected-by-default", True),
            ("inclusion-reason", "automatic"),
            ("table-key-properties", pk_fields),
        )
        for key, value in stream_entries:
            mdata = metadata.write(mdata, (), key, value)

        # Field-level metadata: primary-key fields are always included.
        for field_name in schema["properties"]:
            breadcrumb = ("properties", field_name)
            inclusion = "automatic" if field_name in pk_fields else "available"
            mdata = metadata.write(mdata, breadcrumb, "inclusion", inclusion)
            mdata = metadata.write(mdata, breadcrumb, "selected-by-default", True)
            mdata = metadata.write(mdata, breadcrumb, "inclusion-reason", "manual")

        schemas[stream_id] = schema
        metadata_map[stream_id] = mdata

    return schemas, metadata_map
35 changes: 35 additions & 0 deletions tap_frontapp/sync.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@

import singer
from tap_frontapp.streams import sync_selected_streams
from tap_frontapp.schemas import load_and_write_schema, STATIC_SCHEMA_STREAM_IDS

LOGGER = singer.get_logger()


def update_currently_syncing(state, stream_name):
    """Record *stream_name* as the in-progress stream and emit STATE.

    Passing a falsy stream_name clears any existing marker instead of
    setting a new one.
    """
    clearing = not stream_name and singer.get_currently_syncing(state)
    if clearing:
        del state["currently_syncing"]
    else:
        singer.set_currently_syncing(state, stream_name)
    singer.write_state(state)


def sync(atx):
    """Main sync method: emit all schemas, then sync the selected streams."""

    state = atx.state

    selected = [stream.tap_stream_id
                for stream in atx.catalog.get_selected_streams(state)]
    LOGGER.info("Selected streams: %s", selected)
    LOGGER.info("Currently syncing: %s", singer.get_currently_syncing(state))

    # SCHEMA messages go out for every known stream before any records.
    for stream_id in STATIC_SCHEMA_STREAM_IDS:
        load_and_write_schema(stream_id)

    LOGGER.info("Starting sync of selected streams.")
    sync_selected_streams(atx)
    LOGGER.info("All selected streams synced successfully.")
Loading