3 changes: 2 additions & 1 deletion .circleci/config.yml
@@ -2,7 +2,8 @@ version: 2
jobs:
build:
docker:
- image: 218546966473.dkr.ecr.us-east-1.amazonaws.com/circle-ci:tap-tester-v4
- image: 218546966473.dkr.ecr.us-east-1.amazonaws.com/circle-ci:stitch-tap-tester

steps:
- checkout
- run:
3 changes: 3 additions & 0 deletions .gitignore
@@ -102,3 +102,6 @@ tags
singer-check-tap-data
state.json
catalog.json
output.json
convert_json_csv.py
notes_frontapp.txt
4 changes: 0 additions & 4 deletions example.config.json

This file was deleted.

10 changes: 5 additions & 5 deletions setup.py
@@ -10,11 +10,11 @@
url="http://singer.io",
classifiers=["Programming Language :: Python :: 3 :: Only"],
install_requires=[
"singer-python==5.4.0",
"pendulum",
"ratelimit",
"backoff==1.3.2",
"requests==2.31.0",
"singer-python==6.1.1",
"pendulum==3.1.0",
"ratelimit==2.2.1",
"backoff==2.2.1",
"requests==2.32.3",
],
entry_points="""
[console_scripts]
81 changes: 27 additions & 54 deletions tap_frontapp/__init__.py
@@ -6,88 +6,61 @@

import singer
from singer import utils
from singer.catalog import Catalog, CatalogEntry, Schema
from . import streams
from singer.catalog import Catalog
from .context import Context
from .discover import discover
from .sync import sync
from . import schemas

REQUIRED_CONFIG_KEYS = ["token"]

LOGGER = singer.get_logger()

#def check_authorization(atx):
# atx.client.get('/settings')


# Some taps do discovery dynamically where the catalog is read in from a
# call to the api but with the odd frontapp structure, we won't do that
# here we never use atx in here since the schema is from file but we
# would use it if we pulled schema from the API def discover(atx):
def discover():
catalog = Catalog([])
for tap_stream_id in schemas.STATIC_SCHEMA_STREAM_IDS:
#print("tap stream id=",tap_stream_id)
schema = Schema.from_dict(schemas.load_schema(tap_stream_id))
metadata = []
for field_name in schema.properties.keys():
#print("field name=",field_name)
if field_name in schemas.PK_FIELDS[tap_stream_id]:
inclusion = 'automatic'
else:
inclusion = 'available'
metadata.append({
'metadata': {
'inclusion': inclusion
},
'breadcrumb': ['properties', field_name]
})
catalog.streams.append(CatalogEntry(
stream=tap_stream_id,
tap_stream_id=tap_stream_id,
key_properties=schemas.PK_FIELDS[tap_stream_id],
schema=schema,
metadata=metadata
))
return catalog


def get_abs_path(path: str):
"""Returns absolute path for URL."""
def get_abs_path(path):
"""Returns absolute path for a given relative path."""
return os.path.join(os.path.dirname(os.path.realpath(__file__)), path)


# this is already defined in schemas.py though w/o dependencies. do we keep this for the sync?
def load_schema(tap_stream_id):
path = "schemas/{}.json".format(tap_stream_id)
"""Loads schema from JSON file, resolving dependencies."""
path = f"schemas/{tap_stream_id}.json"
schema = utils.load_json(get_abs_path(path))
dependencies = schema.pop("tap_schema_dependencies", [])
refs = {}
for sub_stream_id in dependencies:
refs[sub_stream_id] = load_schema(sub_stream_id)
refs = {sub_stream_id: load_schema(sub_stream_id) for sub_stream_id in dependencies}
if refs:
singer.resolve_schema_references(schema, refs)
return schema
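
Editor's illustration, not part of this diff: "tap_schema_dependencies" lets one schema file pull in another. Assuming a hypothetical schemas/accounts_table.json such as

    {"type": "object",
     "tap_schema_dependencies": ["metric"],
     "properties": {"metric": {"$ref": "metric"}}}

load_schema("accounts_table") would recursively load schemas/metric.json, and singer.resolve_schema_references() would replace the {"$ref": "metric"} pointer with the loaded sub-schema.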


def sync(atx):
for tap_stream_id in schemas.STATIC_SCHEMA_STREAM_IDS:
schemas.load_and_write_schema(tap_stream_id)

streams.sync_selected_streams(atx)
def validate_credentials(token):
"""Validates the FrontApp token using a simple API call"""
headers = {"Authorization": f"Bearer {token}"}
try:
response = requests.get("https://api2.frontapp.com/me", headers=headers, timeout=10)
if response.status_code == 200:
LOGGER.info("Frontapp credentials validated successfully.")
else:
LOGGER.critical("Invalid Frontapp credentials. Status code: %s", response.status_code)
sys.exit(1)
except requests.exceptions.RequestException as err:
LOGGER.critical("Credential validation failed: %s", str(err))
sys.exit(1)
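
Editor's note: validate_credentials relies on requests and sys being imported in the collapsed region at the top of this file, which the diff view hides.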

Review comment: Move this to discovery.py

Contributor Author: This is done. Kindly review.



@utils.handle_top_exception(LOGGER)
def main():
args = utils.parse_args(REQUIRED_CONFIG_KEYS)
# Validate credentials early
validate_credentials(args.config["token"])

atx = Context(args.config, args.state)

if args.discover:
# the schema is static from file so we don't need to pass in atx for connection info.
catalog = discover()
json.dump(catalog.to_dict(), sys.stdout)
else:
atx.catalog = Catalog.from_dict(args.properties) \
if args.properties else discover()
atx.catalog = Catalog.from_dict(args.properties) if args.properties else discover()
sync(atx)


if __name__ == "__main__":
main()
38 changes: 38 additions & 0 deletions tap_frontapp/discover.py
@@ -0,0 +1,38 @@
import singer
from singer import metadata
from singer.catalog import Catalog, CatalogEntry, Schema
from .schemas import get_schemas

LOGGER = singer.get_logger()


def discover():
"""Run the discovery mode, prepare the catalog file and return the catalog."""
schemas, field_metadata = get_schemas()
LOGGER.info("Schemas loaded: %s", list(schemas.keys()))

catalog = Catalog([])

for stream_name, schema_dict in schemas.items():
try:
schema = Schema.from_dict(schema_dict)
mdata = field_metadata[stream_name]
except Exception as err:
LOGGER.error(err)
LOGGER.error(f"stream_name: {stream_name}")
LOGGER.error(f"type schema_dict: {type(schema_dict)}")
raise err

key_properties = mdata.get((), {}).get("table-key-properties", [])

catalog.streams.append(
CatalogEntry(
stream=stream_name,
tap_stream_id=stream_name,
key_properties=key_properties,
schema=schema,
metadata=metadata.to_list(mdata),
)
)

return catalog
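
Editor's sketch, not part of this diff: assuming the console script declared in setup.py is named tap-frontapp, discovery would typically be invoked as

    tap-frontapp --config config.json --discover > catalog.json

which prints the catalog built above, one CatalogEntry per stream in STATIC_SCHEMA_STREAM_IDS, to stdout.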
82 changes: 51 additions & 31 deletions tap_frontapp/schemas.py
@@ -1,18 +1,19 @@
"""Schema definitions and metadata handling for Frontapp streams."""
import os
import re

import singer
from singer import utils

from singer import metadata, utils

class IDS(object): # pylint: disable=too-few-public-methods
ACCOUNTS_TABLE = 'accounts_table'
CHANNELS_TABLE = 'channels_table'
INBOXES_TABLE = 'inboxes_table'
TAGS_TABLE = 'tags_table'
TEAMMATES_TABLE = 'teammates_table'
TEAMS_TABLE = 'teams_table'
LOGGER = singer.get_logger()

class IDS: # pylint: disable=too-few-public-methods
"""Stream identifier constants."""
ACCOUNTS_TABLE = "accounts_table"
CHANNELS_TABLE = "channels_table"
INBOXES_TABLE = "inboxes_table"
TAGS_TABLE = "tags_table"
TEAMMATES_TABLE = "teammates_table"
TEAMS_TABLE = "teams_table"

STATIC_SCHEMA_STREAM_IDS = [
IDS.ACCOUNTS_TABLE,
@@ -24,38 +25,57 @@ class IDS(object): # pylint: disable=too-few-public-methods
]

PK_FIELDS = {
IDS.ACCOUNTS_TABLE: ['analytics_date', 'analytics_range', 'report_id', 'metric_id'],
IDS.CHANNELS_TABLE: ['analytics_date', 'analytics_range', 'report_id', 'metric_id'],
IDS.INBOXES_TABLE: ['analytics_date', 'analytics_range', 'report_id', 'metric_id'],
IDS.TAGS_TABLE: ['analytics_date', 'analytics_range', 'report_id', 'metric_id'],
IDS.TEAMMATES_TABLE: ['analytics_date', 'analytics_range', 'report_id', 'metric_id'],
IDS.TEAMS_TABLE: ['analytics_date', 'analytics_range', 'report_id', 'metric_id'],
IDS.ACCOUNTS_TABLE: ["analytics_date", "analytics_range", "report_id", "metric_id"],
IDS.CHANNELS_TABLE: ["analytics_date", "analytics_range", "report_id", "metric_id"],
IDS.INBOXES_TABLE: ["analytics_date", "analytics_range", "report_id", "metric_id"],
IDS.TAGS_TABLE: ["analytics_date", "analytics_range", "report_id", "metric_id"],
IDS.TEAMMATES_TABLE: ["analytics_date", "analytics_range", "report_id", "metric_id"],
IDS.TEAMS_TABLE: ["analytics_date", "analytics_range", "report_id", "metric_id"],
}


def normalize_fieldname(fieldname):
"""Normalize field names to snake_case."""
fieldname = fieldname.lower()
fieldname = re.sub(r'[\s\-]', '_', fieldname)
return re.sub(r'[^a-z0-9_]', '', fieldname)


# the problem with the schema we see coming from team_table is that it's a little inconsistent:
# {"t":"str","v":"All","p":"All"},{"t":"num","v":306,"p":465},{"t":"num","v":2.65,"p":2.39},...
# ,[{"t":"teammate","v":"Andrew","url":"/api/1/companies/theguild_co/team/andrew","id":253419},...
# so we see that the type = teammate is different when it covers All team members
# also we see the schema where type = num or dur is actually a triplet of type, value, and previous
# so it looks like we need to hardcode those anomalies into this file
fieldname = re.sub(r"[\s\-]", "_", fieldname)
return re.sub(r"[^a-z0-9_]", "", fieldname)

def get_abs_path(path):
"""Get absolute path for schema files."""
return os.path.join(os.path.dirname(os.path.realpath(__file__)), path)


def load_schema(tap_stream_id):
path = 'schemas/{}.json'.format(tap_stream_id)
# print("schema path=",path)
"""Load schema file for specified stream."""
path = f"schemas/{tap_stream_id}.json"
return utils.load_json(get_abs_path(path))


def load_and_write_schema(tap_stream_id):
"""Write schema to singer catalog."""
schema = load_schema(tap_stream_id)
singer.write_schema(tap_stream_id, schema, PK_FIELDS[tap_stream_id])

def get_schemas():
"""Load all schemas and construct metadata using Singer standards."""
schemas = {}
metadata_map = {}

for stream_id in STATIC_SCHEMA_STREAM_IDS:
schema = load_schema(stream_id)
mdata = metadata.new()

# Stream-level metadata
mdata = metadata.write(mdata, (), "inclusion", "available")
mdata = metadata.write(mdata, (), "selected-by-default", True)
mdata = metadata.write(mdata, (), "inclusion-reason", "automatic")
mdata = metadata.write(mdata, (), "table-key-properties", PK_FIELDS[stream_id])

# Field-level metadata
for field_name in schema["properties"]:
inclusion = "automatic" if field_name in PK_FIELDS[stream_id] else "available"
mdata = metadata.write(mdata, ("properties", field_name), "inclusion", inclusion)
mdata = metadata.write(mdata, ("properties", field_name), "selected-by-default", True)
mdata = metadata.write(mdata, ("properties", field_name), "inclusion-reason", "manual")

schemas[stream_id] = schema
metadata_map[stream_id] = mdata

return schemas, metadata_map
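
Editor's illustration, not part of this diff: for a stream like accounts_table, metadata.to_list(mdata) (as called in discover.py) flattens the map built here into serialized entries such as

    {"breadcrumb": [], "metadata": {"inclusion": "available", "selected-by-default": true, "inclusion-reason": "automatic", "table-key-properties": ["analytics_date", "analytics_range", "report_id", "metric_id"]}}
    {"breadcrumb": ["properties", "analytics_date"], "metadata": {"inclusion": "automatic", "selected-by-default": true, "inclusion-reason": "manual"}}

which is the list form the catalog writes out.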
35 changes: 35 additions & 0 deletions tap_frontapp/sync.py
@@ -0,0 +1,35 @@

import singer
from tap_frontapp.streams import sync_selected_streams
from tap_frontapp.schemas import load_and_write_schema, STATIC_SCHEMA_STREAM_IDS

LOGGER = singer.get_logger()


def update_currently_syncing(state, stream_name):
"""Update the currently syncing stream in the Singer state."""
if not stream_name and singer.get_currently_syncing(state):
del state["currently_syncing"]
else:
singer.set_currently_syncing(state, stream_name)
singer.write_state(state)


def sync(atx):
"""Main sync method to process selected streams from FrontApp."""

catalog = atx.catalog
state = atx.state

streams_to_sync = [s.tap_stream_id for s in catalog.get_selected_streams(state)]
LOGGER.info("Selected streams: %s", streams_to_sync)

last_stream = singer.get_currently_syncing(state)
LOGGER.info("Currently syncing: %s", last_stream)

for stream_name in STATIC_SCHEMA_STREAM_IDS:
load_and_write_schema(stream_name)

LOGGER.info("Starting sync of selected streams.")
sync_selected_streams(atx)
LOGGER.info("All selected streams synced successfully.")