Skip to content

Commit 5b59ac7

Browse files
authored
Payroll and Payroll ACC (#26)
1 parent 4e66950 commit 5b59ac7

6 files changed

Lines changed: 219 additions & 2 deletions

File tree

meltano.yml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,11 @@ plugins:
4747
kind: date_iso8601
4848
label: Start Date
4949
description: Initial date to start extracting data from
50+
config:
51+
start_date: 2025-06-01
52+
select:
53+
- payroll_output.*
54+
- payroll_output_acc.*
5055

5156
loaders:
5257
- name: target-jsonl

tap_adp/client.py

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,17 +4,19 @@
44

55
import decimal
66
import typing as t
7+
from functools import cached_property
78
from importlib import resources
89

910
from singer_sdk.authenticators import OAuthAuthenticator
11+
from singer_sdk.helpers._typing import TypeConformanceLevel
1012
from singer_sdk.helpers.jsonpath import extract_jsonpath
11-
from singer_sdk.pagination import BaseAPIPaginator
13+
from singer_sdk.pagination import BaseAPIPaginator
1214
from singer_sdk.streams import RESTStream
1315
from tap_adp.authenticator import ADPAuthenticator
14-
from functools import cached_property
1516

1617
if t.TYPE_CHECKING:
1718
import requests
19+
1820
from singer_sdk.helpers.types import Context
1921

2022

@@ -28,6 +30,7 @@ class ADPStream(RESTStream):
2830
next_page_token_jsonpath = None
2931
replication_key = None
3032
_LOG_REQUEST_METRIC_URLS: bool = True
33+
TYPE_CONFORMANCE_LEVEL = TypeConformanceLevel.ROOT_ONLY
3134

3235
@property
3336
def url_base(self) -> str:
Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
{
2+
"type": ["object", "null"],
3+
"properties": {
4+
"itemID": {
5+
"type": ["string", "null"]
6+
},
7+
"payrollProcessingJobID": {
8+
"type": ["string", "null"]
9+
},
10+
"alternateJobIDs": {
11+
"type": ["array", "null"],
12+
"items": {
13+
"type": ["object", "null"],
14+
"additionalProperties": true,
15+
"properties": {}
16+
}
17+
},
18+
"payrollRegionCode": {
19+
"type": ["object", "null"],
20+
"additionalProperties": true,
21+
"properties": {}
22+
},
23+
"payrollGroupCode": {
24+
"type": ["object", "null"],
25+
"additionalProperties": true,
26+
"properties": {}
27+
},
28+
"payrollScheduleReference": {
29+
"type": ["object", "null"],
30+
"additionalProperties": true,
31+
"properties": {}
32+
},
33+
"payrollProcessingJobStatusCode": {
34+
"type": ["object", "null"],
35+
"additionalProperties": true,
36+
"properties": {}
37+
},
38+
"_sdc_payroll_item_id": {
39+
"type": ["string", "null"]
40+
},
41+
"_sdc_modified_schedule_entry_id": {
42+
"type": ["string", "null"],
43+
"format": "date-time"
44+
}
45+
},
46+
"additionalProperties": true
47+
}
Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
{
2+
"type": ["object", "null"],
3+
"properties": {
4+
"itemID": {
5+
"type": ["string", "null"]
6+
},
7+
"payrollProcessingJobID": {
8+
"type": ["string", "null"]
9+
},
10+
"alternateJobIDs": {
11+
"type": ["array", "null"],
12+
"items": {
13+
"type": ["object", "null"],
14+
"additionalProperties": true,
15+
"properties": {}
16+
}
17+
},
18+
"payrollRegionCode": {
19+
"type": ["object", "null"],
20+
"additionalProperties": true,
21+
"properties": {}
22+
},
23+
"payrollGroupCode": {
24+
"type": ["object", "null"],
25+
"additionalProperties": true,
26+
"properties": {}
27+
},
28+
"payrollScheduleReference": {
29+
"type": ["object", "null"],
30+
"additionalProperties": true,
31+
"properties": {}
32+
},
33+
"payrollProcessingJobStatusCode": {
34+
"type": ["object", "null"],
35+
"additionalProperties": true,
36+
"properties": {}
37+
},
38+
"associatePayments": {
39+
"type": ["array", "null"],
40+
"items": {
41+
"type": ["object", "null"],
42+
"additionalProperties": true,
43+
"properties": {}
44+
}
45+
},
46+
"_sdc_payroll_item_id": {
47+
"type": ["string", "null"]
48+
}
49+
},
50+
"additionalProperties": true
51+
}

tap_adp/streams.py

Lines changed: 104 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,10 @@
33
from __future__ import annotations
44

55
import typing as t
6+
from datetime import datetime, timedelta, timezone
67
from http import HTTPStatus
78
from importlib import resources
9+
from urllib.parse import quote
810

911
import requests
1012

@@ -13,6 +15,9 @@
1315

1416
SCHEMAS_DIR = resources.files(__package__) / "schemas"
1517

18+
# Made for the Payroll ACC Class to skip the error when mass processing is disabled
19+
class SkippableAPIError(Exception):
20+
pass
1621

1722
class WorkersStream(PaginatedADPStream):
1823
"""Define custom stream."""
@@ -195,3 +200,102 @@ def post_process(
195200
"""
196201
row["_sdc_namecode_code"] = row["nameCode"]["code"]
197202
return row
203+
204+
class PayDataInputStream(ADPStream):
205+
name = "pay_data_input"
206+
path = "/payroll/v1/pay-data-input"
207+
primary_keys = []
208+
records_jsonpath = "$.payDataInput[*]"
209+
schema_filepath = SCHEMAS_DIR / "pay_data_input.json"
210+
211+
class PayrollOutputStream(ADPStream):
212+
name = "payroll_output"
213+
path = "/payroll/v2/payroll-output"
214+
primary_keys = ["itemID"]
215+
replication_key = "_sdc_modified_schedule_entry_id"
216+
records_jsonpath = "$.payrollOutputs[*]" #There's a root level processMessages key that has metaData about the corresponding payroll(s) might be useful, ignoring for now to move forward quickly
217+
schema_filepath = SCHEMAS_DIR / "payroll_output.json"
218+
219+
def get_child_context(self, record, context):
220+
return {
221+
"_sdc_payroll_item_id": record["itemID"]
222+
}
223+
224+
def get_url_params( # noqa: PLR6301
225+
self,
226+
context,
227+
next_page_token
228+
) -> dict[str, t.Any] | str:
229+
# Date 30 days ago
230+
date = self.get_starting_timestamp(context)
231+
date_str = date.strftime("%Y%m%d")
232+
self.logger.info(f"Payroll is using 'payPeriodEndDate ge {date_str}'")
233+
return {
234+
"$filter": f"payPeriodEndDate ge {date_str}"
235+
}
236+
237+
def post_process(self, record, context):
238+
# We subtract 30 days as recent payrolls are not available to pull
239+
# There could be a case where a payroll completes that's more recent than payrolls that havne't been completed yet so we want to play it safe and try to get them all
240+
# This gives us a good chance of pulling all the most recent payrolls
241+
record["_sdc_modified_schedule_entry_id"] = datetime.strptime(record["payrollScheduleReference"]["scheduleEntryID"][:8], "%Y%m%d")-timedelta(days=30)
242+
return record
243+
244+
class PayrollOutputAccStream(ADPStream):
245+
name = "payroll_output_acc"
246+
path = "/payroll/v2/payroll-output"
247+
primary_keys = ["itemID"]
248+
records_jsonpath = "$.payrollOutputs[*]"
249+
schema_filepath = SCHEMAS_DIR / "payroll_output_acc.json"
250+
parent_stream_type=PayrollOutputStream
251+
252+
def get_url_params( # noqa: PLR6301
253+
self,
254+
context,
255+
next_page_token
256+
) -> dict[str, t.Any] | str:
257+
# Today's date
258+
return {
259+
"level": "acc-all",
260+
"$filter": f"itemID eq {context['_sdc_payroll_item_id']}"
261+
}
262+
263+
def validate_response(self, response: requests.Response) -> None:
264+
if response.status_code == 400 and response.json().get("confirmMessage", {}).get("processMessages"):
265+
process_messages = response.json().get("confirmMessage", {}).get("processMessages")
266+
for process_message in process_messages:
267+
dev_message = process_message["developerMessage"]["messageTxt"]
268+
codeValue = process_message["developerMessage"]["codeValue"]
269+
if dev_message == "Mass Processing is currently Disabled.":
270+
exception_message = "Mass Processing is currently Disabled."
271+
self.logger.warning(exception_message)
272+
raise SkippableAPIError(exception_message)
273+
if codeValue == "PAYGEN00030": #The payroll job id provided was in an invalid state (EDL, DAT, PVE, NER, EER, etc).
274+
exception_message = f"The payroll job id provided was in an invalid state ({dev_message})."
275+
self.logger.warning(exception_message)
276+
raise SkippableAPIError(exception_message)
277+
# Default handling if this isn't hit
278+
super().validate_response(response)
279+
280+
def get_records(self, context: Context | None) -> t.Iterable[dict[str, t.Any]]:
281+
"""Return a generator of record-type dictionary objects.
282+
283+
Each record emitted should be a dictionary of property names to their values.
284+
285+
Args:
286+
context: Stream partition or context dictionary.
287+
288+
Yields:
289+
One item per (possibly processed) record in the API.
290+
"""
291+
try:
292+
for record in self.request_records(context):
293+
transformed_record = self.post_process(record, context)
294+
if transformed_record is None:
295+
# Record filtered out during post_process()
296+
continue
297+
yield transformed_record
298+
# Works because this is a child stream of PayrollOutputStream and only has one record
299+
except SkippableAPIError:
300+
self.logger.warning("Mass Processing is currently Disabled.")
301+
return

tap_adp/tap.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,11 @@ class TapADP(Tap):
4646
"'<tap_name>/<tap_version>'"
4747
),
4848
),
49+
th.Property(
50+
"start_date",
51+
th.DateTimeType,
52+
description="The start date to sync from",
53+
),
4954
).to_dict()
5055

5156
def discover_streams(self) -> list[streams.ADPStream]:
@@ -65,6 +70,8 @@ def discover_streams(self) -> list[streams.ADPStream]:
6570
streams.JobApplicationStream(self),
6671
streams.QuestionnaireStream(self),
6772
streams.DepartmentValidationStream(self),
73+
streams.PayrollOutputStream(self),
74+
streams.PayrollOutputAccStream(self),
6875
]
6976

7077

0 commit comments

Comments
 (0)