11"""notion target sink class, which handles writing streams."""
22
33from __future__ import annotations
4+
45import os
6+
57from caseconverter import snakecase
68from notion_client import Client
79from notion_client .errors import HTTPResponseError
@@ -18,18 +20,20 @@ def __init__(self, **kwargs) -> None: # noqa: ANN003
1820 """Initialize the sink."""
1921 super ().__init__ (** kwargs )
2022 self .client = Client (auth = self .config ["api_key" ])
21- database_mapping = {
22- mapping ["extractor_namespace" ]: {mapping ["stream_name" ]: mapping ["database_id " ]}
23+ data_source_mapping = {
24+ mapping ["extractor_namespace" ]: {mapping ["stream_name" ]: mapping ["data_source_id " ]}
2325 for mapping in self .config ["streams" ]
2426 }
25- self .database_id = database_mapping .get (os .environ .get ("MELTANO_EXTRACTOR_NAMESPACE" ), {}).get (self .stream_name )
26- if not self .database_id :
27- msg = f"Database ID not found for stream { self .stream_name } ."
27+ self .data_source_id = data_source_mapping .get (os .environ .get ("MELTANO_EXTRACTOR_NAMESPACE" ), {}).get (
28+ self .stream_name
29+ )
30+ if not self .data_source_id :
31+ msg = f"Data source ID not found for stream { self .stream_name } ."
2832 raise ValueError (msg )
29- self .database_schema = self .get_database_schema ()
33+ self .data_source_schema = self .get_data_source_schema ()
3034 self .key_property = self .key_properties [0 ]
3135 self .snake_key_property = snakecase (self .key_property )
32- self .database_key_property = self .database_schema [self .snake_key_property ]["name" ]
36+ self .data_source_key_property = self .data_source_schema [self .snake_key_property ]["name" ]
3337
3438 def process_batch (self , context : dict ) -> None :
3539 """Process a batch with the given batch context.
@@ -54,26 +58,26 @@ def process_batch(self, context: dict) -> None:
5458 self .create_page (record )
5559
5660 def get_existing_pages (self , records : list [dict ]) -> list :
57- """Get existing pages in the database ."""
61+ """Get existing pages in the data_source ."""
5862 _filter = {
5963 "or" : [
60- {"property" : self .database_key_property , "title" : {"equals" : record [self .snake_key_property ]}}
64+ {"property" : self .data_source_key_property , "title" : {"equals" : record [self .snake_key_property ]}}
6165 for record in records
6266 ]
6367 }
6468 has_more = True
6569 start_cursor = None
6670 existing_pages = {}
6771 while has_more :
68- pages = self .client .databases .query (
69- database_id = self .database_id ,
72+ pages = self .client .data_sources .query (
73+ data_source_id = self .data_source_id ,
7074 start_cursor = start_cursor ,
7175 filter_properties = [],
7276 filter = _filter ,
7377 )
7478 existing_pages .update (
7579 {
76- page ["properties" ][self .database_key_property ]["rich_text" ][0 ]["text" ]["content" ]: page ["id" ]
80+ page ["properties" ][self .data_source_key_property ]["rich_text" ][0 ]["text" ]["content" ]: page ["id" ]
7781 for page in pages ["results" ]
7882 }
7983 )
@@ -90,19 +94,17 @@ def create_page(self, record: dict) -> None:
9094 context: Stream partition or context dictionary.
9195 """
9296 self .client .pages .create (
93- parent = {"database_id " : self .database_id },
97+ parent = {"data_source_id " : self .data_source_id },
9498 properties = self .create_page_properties (record ),
9599 )
96100
97- def get_database_schema (self ) -> dict :
98- """Get the database schema.
101+ def get_data_source_schema (self ) -> dict :
102+ """Get the data_source schema.
99103
100104 Returns:
101- dict: The database schema.
105+ dict: The data_source schema.
102106 """
103- db = self .client .databases .retrieve (self .database_id )
104- data_source_id = db ["data_sources" ][0 ]["id" ]
105- data_source = self .client .data_sources .retrieve (data_source_id )
107+ data_source = self .client .data_sources .retrieve (self .data_source_id )
106108 return {
107109 snakecase (name ): {"name" : name , "type" : _property ["type" ]}
108110 for name , _property in data_source ["properties" ].items ()
@@ -118,11 +120,11 @@ def create_page_properties(self, record: dict) -> dict:
118120 dict: The page properties.
119121 """
120122 return {
121- self .database_schema .get (key , {})["name" ]: self .create_page_property (
122- self .database_schema .get (key , {})["type" ], value
123+ self .data_source_schema .get (key , {})["name" ]: self .create_page_property (
124+ self .data_source_schema .get (key , {})["type" ], value
123125 )
124126 for key , value in record .items ()
125- if key in self .database_schema and value
127+ if key in self .data_source_schema and value
126128 }
127129
128130 def create_page_property (self , _type : str , value ) -> dict :
0 commit comments