11import json
22import logging
3+ import re
34from dataclasses import dataclass
5+ from datetime import datetime , timezone
46from functools import partial
5- from typing import Any , Dict , Iterable , List , Optional , Set , Tuple
7+ from typing import Any , Dict , Iterable , Iterator , List , Optional , Set , Tuple
68
7- import pyodata
8- import pyodata .v2 .model
9- import pyodata .v2 .service
109from authlib .integrations .requests_client import OAuth2Session
1110from pydantic import Field , SecretStr , field_validator
1211from requests .adapters import HTTPAdapter
8685
8786logger = logging .getLogger (__name__ )
8887
88+ # SAP Analytics Cloud serializes dates in OData verbose-JSON as "/Date(<ms-since-epoch>[±<offset>])/".
89+ _SAC_JSON_DATE_PATTERN = re .compile (r"^/Date\((?P<ms>-?\d+)(?P<offset>[+-]\d+)?\)/$" )
90+
8991
9092class ConnectionMappingConfig (EnvConfigMixin ):
9193 platform : Optional [str ] = Field (
@@ -192,17 +194,19 @@ class SACSource(StatefulIngestionSourceBase, TestableSource):
192194 platform = "sac"
193195
194196 session : OAuth2Session
195- client : pyodata .Client
196197
197- ingested_dataset_entities : Set [str ] = set ()
198- ingested_upstream_dataset_keys : Set [str ] = set ()
198+ ingested_dataset_entities : Set [str ]
199+ ingested_upstream_dataset_keys : Set [str ]
199200
200201 def __init__ (self , config : SACSourceConfig , ctx : PipelineContext ):
201202 super ().__init__ (config , ctx )
202203 self .config = config
203204 self .report = SACSourceReport ()
204205
205- self .session , self .client = SACSource .get_sac_connection (self .config )
206+ self .ingested_dataset_entities = set ()
207+ self .ingested_upstream_dataset_keys = set ()
208+
209+ self .session = SACSource .get_sac_connection (self .config )
206210
207211 def close (self ) -> None :
208212 self .session .close ()
@@ -220,10 +224,16 @@ def test_connection(config_dict: dict) -> TestConnectionReport:
220224 try :
221225 config = SACSourceConfig .model_validate (config_dict )
222226
223- # when creating the pyodata.Client, the metadata is automatically parsed and validated
224- session , _ = SACSource .get_sac_connection (config )
227+ session = SACSource .get_sac_connection (config )
228+
229+ # test the Resources API and the Data Import Service separately here, because the Data
230+ # Import Service requires specific properties when configuring the OAuth client
231+ response = session .get (
232+ url = f"{ config .tenant_url } /api/v1/Resources" ,
233+ params = {"$format" : "json" , "$top" : "1" },
234+ )
235+ response .raise_for_status ()
225236
226- # test the Data Import Service separately here, because it requires specific properties when configuring the OAuth client
227237 response = session .get (url = f"{ config .tenant_url } /api/v1/dataimport/models" )
228238 response .raise_for_status ()
229239
@@ -549,7 +559,7 @@ def get_model_workunits(
549559 @staticmethod
550560 def get_sac_connection (
551561 config : SACSourceConfig ,
552- ) -> Tuple [ OAuth2Session , pyodata . Client ] :
562+ ) -> OAuth2Session :
553563 session = OAuth2Session (
554564 client_id = config .client_id ,
555565 client_secret = config .client_secret .get_secret_value (),
@@ -586,13 +596,39 @@ def get_sac_connection(
586596 )
587597 session .fetch_token ()
588598
589- client = pyodata .Client (
590- url = f"{ config .tenant_url } /api/v1" ,
591- connection = session ,
592- config = pyodata .v2 .model .Config (retain_null = True ),
593- )
599+ return session
600+
601+ def _query_odata_entities (
602+ self , path : str , select : str , filter : Optional [str ] = None
603+ ) -> Iterator [Dict [str , Any ]]:
604+ # We query the OData endpoints directly instead of going through a metadata-driven OData
605+ # client. The "Resources" data endpoints are stable across SAC tenant generations, whereas
606+ # the $metadata document is not: newer (CAP-based) tenants no longer advertise the
607+ # "Resources" EntitySet there (it is replaced by a non-queryable "*_INDEX" catalog), which
608+ # would break any client that resolves endpoints from $metadata. See ING-2650.
609+ query : Dict [str , str ] = {"$format" : "json" , "$select" : select }
610+ if filter is not None :
611+ query ["$filter" ] = filter
612+
613+ url : Optional [str ] = f"{ self .config .tenant_url } /api/v1/{ path } "
614+ params : Optional [Dict [str , str ]] = query
615+
616+ while url is not None :
617+ response = self .session .get (url = url , params = params )
618+ response .raise_for_status ()
594619
595- return session , client
620+ # OData verbose JSON always wraps the payload in a top-level "d"; a missing key means an
621+ # unexpected response, which we want to surface rather than silently ingest nothing.
622+ payload = response .json ()["d" ]
623+ if isinstance (payload , dict ):
624+ yield from payload .get ("results" , [])
625+ # follow server-driven paging; "__next" is an absolute URL with the query baked in
626+ url = payload .get ("__next" )
627+ else :
628+ yield from payload
629+ url = None
630+
631+ params = None
596632
597633 def get_resources (self ) -> Iterable [Resource ]:
598634 import_data_model_ids = self .get_import_data_model_ids ()
@@ -609,19 +645,12 @@ def get_resources(self) -> Iterable[Resource]:
609645
610646 select = "resourceId,resourceType,resourceSubtype,storyId,name,description,createdTime,createdBy,modifiedBy,modifiedTime,openURL,ancestorPath,isMobile"
611647
612- entities : pyodata .v2 .service .ListWithTotalCount = (
613- self .client .entity_sets .Resources .get_entities ()
614- .custom ("$format" , "json" )
615- .filter (filter )
616- .select (select )
617- .execute ()
618- )
619- entity : pyodata .v2 .service .EntityProxy
620- for entity in entities :
621- resource_id : str = entity .resourceId
622- name : str = (
623- entity .name .strip () if entity .name is not None else entity .resourceId
624- )
648+ for entity in self ._query_odata_entities (
649+ "Resources" , select = select , filter = filter
650+ ):
651+ resource_id : str = entity ["resourceId" ]
652+ entity_name = entity .get ("name" )
653+ name : str = entity_name .strip () if entity_name is not None else resource_id
625654
626655 if not self .config .resource_id_pattern .allowed (
627656 resource_id
@@ -630,78 +659,84 @@ def get_resources(self) -> Iterable[Resource]:
630659
631660 ancestor_path : Optional [str ] = None
632661
633- try :
634- ancestors = json .loads (entity .ancestorPath )
635- ancestor_path = "/" .join (
636- ancestor .replace ("/" , "%2F" ) for ancestor in ancestors
637- )
638- except json .JSONDecodeError :
639- pass
662+ ancestor_path_raw = entity .get ("ancestorPath" )
663+ if ancestor_path_raw :
664+ try :
665+ ancestors = json .loads (ancestor_path_raw )
666+ ancestor_path = "/" .join (
667+ ancestor .replace ("/" , "%2F" ) for ancestor in ancestors
668+ )
669+ except json .JSONDecodeError :
670+ pass
640671
641672 if ancestor_path and not self .config .folder_pattern .allowed (ancestor_path ):
642673 continue
643674
644675 resource_models : Set [ResourceModel ] = set ()
645676
646- select = "modelId,name,description,externalId,connectionId,systemType"
647-
648- nav_entities : pyodata .v2 .service .EntitySetProxy = (
649- entity .nav ("resourceModels" )
650- .get_entities ()
651- .custom ("$format" , "json" )
652- .select (select )
653- .execute ()
677+ models_select = (
678+ "modelId,name,description,externalId,connectionId,systemType"
654679 )
655- nav_entity : pyodata .v2 .service .EntityProxy
656- for nav_entity in nav_entities :
680+
681+ # OData string keys escape a single quote by doubling it
682+ escaped_resource_id = resource_id .replace ("'" , "''" )
683+ for nav_entity in self ._query_odata_entities (
684+ f"Resources('{ escaped_resource_id } ')/resourceModels" ,
685+ select = models_select ,
686+ ):
657687 # the model id can have a different structure, commonly all model ids have a namespace (the part before the colon) and the model id itself
658688 # t.4.sap.fpa.services.userFriendlyPerfLog:ACTIVITY_LOG is a builtin model without a possiblity to get more metadata about the model
659689 # t.4.YV67EM4QBRU035A7TVKERZ786N:YV67EM4QBRU035A7TVKERZ786N is a model id where the model id itself also appears as part of the namespace
660690 # t.4:C76tt2j402o1e69wnvrwfcl79c is a model id without the model id itself as part of the namespace
661- model_id : str = nav_entity . modelId
691+ model_id : str = nav_entity [ " modelId" ]
662692 namespace , _ , model_id = model_id .partition (":" )
663693
694+ nav_name = nav_entity .get ("name" )
695+ nav_description = nav_entity .get ("description" )
696+
664697 resource_models .add (
665698 ResourceModel (
666699 namespace = namespace ,
667700 model_id = model_id ,
668- name = nav_entity . name .strip ()
669- if nav_entity . name is not None
701+ name = nav_name .strip ()
702+ if nav_name is not None
670703 else f"{ namespace } :{ model_id } " ,
671- description = nav_entity . description .strip ()
672- if nav_entity . description is not None
704+ description = nav_description .strip ()
705+ if nav_description is not None
673706 else None ,
674- system_type = nav_entity .systemType , # BW or HANA
675- connection_id = nav_entity .connectionId ,
676- external_id = nav_entity .externalId , # query:[][][query] or view:[schema][schema.namespace][view]
707+ system_type = nav_entity .get ("systemType" ), # BW or HANA
708+ connection_id = nav_entity .get ("connectionId" ),
709+ external_id = nav_entity .get (
710+ "externalId"
711+ ), # query:[][][query] or view:[schema][schema.namespace][view]
677712 is_import = model_id in import_data_model_ids ,
678713 )
679714 )
680715
681- created_by : Optional [str ] = entity .createdBy
716+ created_by : Optional [str ] = entity .get ( " createdBy" )
682717 if created_by in ("SYSTEM" , "$DELETED_USER$" ):
683718 created_by = None
684719
685- modified_by : Optional [str ] = entity .modifiedBy
720+ modified_by : Optional [str ] = entity .get ( " modifiedBy" )
686721 if modified_by in ("SYSTEM" , "$DELETED_USER$" ):
687722 modified_by = None
688723
724+ description = entity .get ("description" )
725+
689726 yield Resource (
690727 resource_id = resource_id ,
691- resource_type = entity . resourceType ,
692- resource_subtype = entity . resourceSubtype ,
693- story_id = entity . storyId ,
728+ resource_type = entity [ " resourceType" ] ,
729+ resource_subtype = entity [ " resourceSubtype" ] ,
730+ story_id = entity [ " storyId" ] ,
694731 name = name ,
695- description = entity .description .strip ()
696- if entity .description is not None
697- else None ,
698- created_time = entity .createdTime ,
732+ description = description .strip () if description is not None else None ,
733+ created_time = _parse_sac_datetime (entity ["createdTime" ]),
699734 created_by = created_by ,
700- modified_time = entity . modifiedTime ,
735+ modified_time = _parse_sac_datetime ( entity [ " modifiedTime" ]) ,
701736 modified_by = modified_by ,
702- open_url = entity . openURL ,
737+ open_url = entity [ " openURL" ] ,
703738 ancestor_path = ancestor_path ,
704- is_mobile = entity . isMobile ,
739+ is_mobile = entity [ " isMobile" ] ,
705740 resource_models = frozenset (resource_models ),
706741 )
707742
@@ -797,3 +832,13 @@ def _add_sap_sac_custom_auth_header(
797832) -> Tuple [str , Dict [str , str ], Any ]:
798833 headers ["x-sap-sac-custom-auth" ] = "true"
799834 return url , headers , body
835+
836+
837+ def _parse_sac_datetime (value : str ) -> datetime :
838+ match = _SAC_JSON_DATE_PATTERN .match (value )
839+ if match is None :
840+ raise ValueError (f"Unexpected SAP Analytics Cloud date format: { value !r} " )
841+
842+ # The millisecond value is an absolute instant (epoch-relative); an optional ±offset only
843+ # affects the displayed wall-clock time, not the instant, so it does not change the UTC value.
844+ return datetime .fromtimestamp (int (match .group ("ms" )) / 1000 , tz = timezone .utc )
0 commit comments