Skip to content

Commit 22400c7

Browse files
authored
Merge pull request #1 from yugabyte/sgarg/add-ybdb-connector-to-unstructured
[GROWENG-124][GROWENG-126] Adding yugabytedb as connector in unstructured.io
2 parents dbc85eb + 1896c7e commit 22400c7

File tree

4 files changed

+182
-0
lines changed

4 files changed

+182
-0
lines changed

pyproject.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,7 @@ vastdb = ["requirements/connectors/vastdb.txt"]
7575
vectara = ["requirements/connectors/vectara.txt"]
7676
weaviate = ["requirements/connectors/weaviate.txt"]
7777
wikipedia = ["requirements/connectors/wikipedia.txt"]
78+
yugabytedb = ["requirements/connectors/yugabytedb.txt"]
7879
zendesk = ["requirements/connectors/zendesk.txt"]
7980

8081
# Embedders
Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
pandas
2+
psycopg2-yugabytedb

unstructured_ingest/processes/connectors/sql/__init__.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,12 +17,15 @@
1717
from .sqlite import sqlite_destination_entry, sqlite_source_entry
1818
from .vastdb import CONNECTOR_TYPE as VASTDB_CONNECTOR_TYPE
1919
from .vastdb import vastdb_destination_entry, vastdb_source_entry
20+
from .yugabytedb import CONNECTOR_TYPE as YUGABYTE_DB_CONNECTOR_TYPE
21+
from .yugabytedb import yugabytedb_destination_entry, yugabytedb_source_entry
2022

2123
# Register every SQL-family source connector under its connector-type key.
add_source_entry(source_type=SQLITE_CONNECTOR_TYPE, entry=sqlite_source_entry)
add_source_entry(source_type=POSTGRES_CONNECTOR_TYPE, entry=postgres_source_entry)
add_source_entry(source_type=SNOWFLAKE_CONNECTOR_TYPE, entry=snowflake_source_entry)
add_source_entry(source_type=SINGLESTORE_CONNECTOR_TYPE, entry=singlestore_source_entry)
add_source_entry(source_type=VASTDB_CONNECTOR_TYPE, entry=vastdb_source_entry)
add_source_entry(source_type=YUGABYTE_DB_CONNECTOR_TYPE, entry=yugabytedb_source_entry)
2629

2730
add_destination_entry(destination_type=SQLITE_CONNECTOR_TYPE, entry=sqlite_destination_entry)
2831
add_destination_entry(destination_type=POSTGRES_CONNECTOR_TYPE, entry=postgres_destination_entry)
@@ -35,3 +38,4 @@
3538
entry=databricks_delta_tables_destination_entry,
3639
)
3740
add_destination_entry(destination_type=VASTDB_CONNECTOR_TYPE, entry=vastdb_destination_entry)
41+
add_destination_entry(destination_type=YUGABYTE_DB_CONNECTOR_TYPE, entry=yugabytedb_destination_entry)
Lines changed: 175 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,175 @@
1+
from contextlib import contextmanager
2+
from dataclasses import dataclass, field
3+
from pathlib import Path
4+
from typing import TYPE_CHECKING, Any, Generator, Optional
5+
6+
from pydantic import Field, Secret
7+
8+
from unstructured_ingest.data_types.file_data import FileData
9+
from unstructured_ingest.logger import logger
10+
from unstructured_ingest.processes.connector_registry import (
11+
DestinationRegistryEntry,
12+
SourceRegistryEntry,
13+
)
14+
from unstructured_ingest.processes.connectors.sql.sql import (
15+
SQLAccessConfig,
16+
SqlBatchFileData,
17+
SQLConnectionConfig,
18+
SQLDownloader,
19+
SQLDownloaderConfig,
20+
SQLIndexer,
21+
SQLIndexerConfig,
22+
SQLUploader,
23+
SQLUploaderConfig,
24+
SQLUploadStager,
25+
SQLUploadStagerConfig,
26+
)
27+
from unstructured_ingest.utils.dep_check import requires_dependencies
28+
29+
if TYPE_CHECKING:
30+
from psycopg2.extensions import connection as YugabyteDbConnection
31+
from psycopg2.extensions import cursor as YugabyteDbCursor
32+
33+
# Connector-type key used as the `connector_type` default by the classes in this module.
CONNECTOR_TYPE = "yugabytedb"
34+
35+
36+
class YugabyteDbAccessConfig(SQLAccessConfig):
    """Sensitive credentials used when connecting to YugabyteDB."""

    password: Optional[str] = Field(default=None, description="DB password")
38+
39+
40+
class YugabyteDbConnectionConfig(SQLConnectionConfig):
    """Connection settings for YugabyteDB, reached through ``psycopg2.connect``.

    In addition to database/user/host/port, the model carries the
    ``load_balance``, ``topology_keys`` and ``yb_servers_refresh_interval``
    options, which are forwarded unchanged as keyword arguments to ``connect``.
    """

    access_config: Secret[YugabyteDbAccessConfig] = Field(
        default=YugabyteDbAccessConfig(), validate_default=True
    )
    database: Optional[str] = Field(
        default=None,
        description="Database name.",
    )
    username: Optional[str] = Field(default=None, description="DB username")
    host: Optional[str] = Field(default=None, description="DB host")
    # NOTE(review): YugabyteDB's YSQL API listens on 5433 by default, while 5432 is
    # the PostgreSQL default. Kept as-is for backward compatibility; confirm intent.
    port: Optional[int] = Field(default=5432, description="DB host connection port")
    load_balance: Optional[str] = Field(default="False", description="Load balancing strategy")
    topology_keys: Optional[str] = Field(default="", description="Topology keys")
    yb_servers_refresh_interval: Optional[int] = Field(
        default=300, description="YB servers refresh interval"
    )
    connector_type: str = Field(default=CONNECTOR_TYPE, init=False)

    @contextmanager
    @requires_dependencies(["psycopg2"], extras="yugabytedb")
    def get_connection(self) -> Generator["YugabyteDbConnection", None, None]:
        """Open a connection, yield it, then commit on success and always close.

        Yields:
            The open psycopg2 connection built from this config's fields.

        If the ``with`` body raises, the commit is skipped and the connection is
        only closed, so work from a failed block is not committed.
        """
        from psycopg2 import connect

        access_config = self.access_config.get_secret_value()
        connection = connect(
            user=self.username,
            password=access_config.password,
            dbname=self.database,
            host=self.host,
            port=self.port,
            load_balance=self.load_balance,
            topology_keys=self.topology_keys,
            yb_servers_refresh_interval=self.yb_servers_refresh_interval,
        )
        try:
            yield connection
            # Only reached when the caller's block finished without raising.
            connection.commit()
        finally:
            # Runs on every path, including when commit() itself fails.
            connection.close()

    @contextmanager
    def get_cursor(self) -> Generator["YugabyteDbCursor", None, None]:
        """Yield a cursor on a fresh connection; close the cursor, then the connection."""
        with self.get_connection() as connection:
            cursor = connection.cursor()
            try:
                yield cursor
            finally:
                cursor.close()
87+
88+
89+
class YugabyteDbIndexerConfig(SQLIndexerConfig):
    """Indexer settings for YugabyteDB; adds nothing to ``SQLIndexerConfig``."""
91+
92+
93+
@dataclass
class YugabyteDbIndexer(SQLIndexer):
    """``SQLIndexer`` bound to the YugabyteDB connection and indexer configs."""

    connection_config: YugabyteDbConnectionConfig
    index_config: YugabyteDbIndexerConfig
    connector_type: str = CONNECTOR_TYPE
98+
99+
100+
class YugabyteDbDownloaderConfig(SQLDownloaderConfig):
    """Downloader settings for YugabyteDB; adds nothing to ``SQLDownloaderConfig``."""
102+
103+
104+
@dataclass
class YugabyteDbDownloader(SQLDownloader):
    """``SQLDownloader`` that fetches batches of rows from YugabyteDB by id."""

    connection_config: YugabyteDbConnectionConfig
    download_config: YugabyteDbDownloaderConfig
    connector_type: str = CONNECTOR_TYPE

    @requires_dependencies(["psycopg2"], extras="yugabytedb")
    def query_db(self, file_data: SqlBatchFileData) -> tuple[list[tuple], list[str]]:
        """Select the rows whose id column matches the identifiers in the batch.

        Args:
            file_data: Batch descriptor; its ``additional_metadata`` supplies the
                table name and id column, and its ``batch_items`` the identifiers.

        Returns:
            ``(rows, columns)``: the fetched rows and the column names taken from
            the cursor description. Both are empty when the batch has no items.
        """
        from psycopg2 import sql

        table_name = file_data.additional_metadata.table_name
        id_column = file_data.additional_metadata.id_column
        ids = tuple(item.identifier for item in file_data.batch_items)
        if not ids:
            # An empty tuple would render as `IN ()`, which is not valid SQL.
            return [], []

        selected = self.download_config.fields
        fields = (
            sql.SQL(",").join(sql.Identifier(name) for name in selected)
            if selected
            else sql.SQL("*")
        )
        # Identifiers are composed via psycopg2.sql; the id values stay a bound parameter.
        query = sql.SQL("SELECT {fields} FROM {table_name} WHERE {id_column} IN %s").format(
            fields=fields,
            table_name=sql.Identifier(table_name),
            id_column=sql.Identifier(id_column),
        )

        with self.connection_config.get_cursor() as cursor:
            logger.debug(f"running query: {cursor.mogrify(query, (ids,))}")
            cursor.execute(query, (ids,))
            rows = cursor.fetchall()
            columns = [col[0] for col in cursor.description]
            return rows, columns
135+
136+
137+
class YugabyteDbUploadStagerConfig(SQLUploadStagerConfig):
    """Upload-stager settings for YugabyteDB; adds nothing to ``SQLUploadStagerConfig``."""
139+
140+
141+
class YugabyteDbUploadStager(SQLUploadStager):
    """``SQLUploadStager`` typed to the YugabyteDB upload-stager config."""

    upload_stager_config: YugabyteDbUploadStagerConfig
143+
144+
145+
class YugabyteDbUploaderConfig(SQLUploaderConfig):
    """Uploader settings for YugabyteDB; adds nothing to ``SQLUploaderConfig``."""
147+
148+
149+
@dataclass
class YugabyteDbUploader(SQLUploader):
    """``SQLUploader`` for YugabyteDB, using ``%s`` as the value placeholder."""

    upload_config: YugabyteDbUploaderConfig = field(default_factory=YugabyteDbUploaderConfig)
    connection_config: YugabyteDbConnectionConfig
    connector_type: str = CONNECTOR_TYPE
    values_delimiter: str = "%s"

    @requires_dependencies(["pandas"], extras="yugabytedb")
    def run(self, path: Path, file_data: FileData, **kwargs: Any) -> None:
        """Delegate to ``SQLUploader.run`` after the pandas dependency check."""
        super().run(path=path, file_data=file_data, **kwargs)
159+
160+
161+
# Source-side registry entry: pairs the indexer and downloader with their configs.
yugabytedb_source_entry = SourceRegistryEntry(
    connection_config=YugabyteDbConnectionConfig,
    indexer=YugabyteDbIndexer,
    indexer_config=YugabyteDbIndexerConfig,
    downloader=YugabyteDbDownloader,
    downloader_config=YugabyteDbDownloaderConfig,
)
168+
169+
# Destination-side registry entry: pairs the upload stager and uploader with their configs.
yugabytedb_destination_entry = DestinationRegistryEntry(
    connection_config=YugabyteDbConnectionConfig,
    upload_stager=YugabyteDbUploadStager,
    upload_stager_config=YugabyteDbUploadStagerConfig,
    uploader=YugabyteDbUploader,
    uploader_config=YugabyteDbUploaderConfig,
)

0 commit comments

Comments
 (0)