Feature( connector-sdk ) : Example for fetching data from IBM DB2 database #121

Open
wants to merge 4 commits into
base: main
2 changes: 2 additions & 0 deletions README.md
@@ -153,6 +153,8 @@ There are several examples available under `/examples`:
  - This example shows how to sync data from Google Cloud Pub/Sub using the Connector SDK.
- **[hubspot](https://github.com/fivetran/fivetran_connector_sdk/tree/main/examples/source_examples/hubspot)**
  - This example shows how to connect and sync specific event type Event data from Hubspot using Connector SDK
- **[ibm_db2](https://github.com/fivetran/fivetran_connector_sdk/tree/main/examples/source_examples/ibm_db2)**
  - This example shows how to connect and sync data from IBM DB2 using Connector SDK. It uses the `ibm_db` library to connect to the database and fetch data.
- **[newsapi](https://github.com/fivetran/fivetran_connector_sdk/tree/main/examples/source_examples/newsapi)**
  - This is a simple example of how to sync data from NewsAPI using Connector SDK.
- **[oauth2_and_accelo_api_connector_multithreading_enabled](https://github.com/fivetran/fivetran_connector_sdk/tree/main/examples/source_examples/oauth2_and_accelo_api_connector_multithreading_enabled)**
42 changes: 42 additions & 0 deletions examples/source_examples/ibm_db2/README.md
@@ -0,0 +1,42 @@
# IBM DB2 Connector

This connector allows you to sync data from IBM DB2 to a destination using the Fivetran Connector SDK.

The IBM DB2 connector establishes a connection to your DB2 database, reads data from tables, and incrementally syncs changes using timestamp-based tracking. This example connector demonstrates extracting product data but can be modified to work with any DB2 tables.
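
The timestamp-based tracking described above can be sketched in plain Python, independent of any database driver. This is a minimal illustration rather than the connector's actual code: the helper name `advance_cursor` and the in-memory rows are hypothetical, and it assumes each row carries a `LAST_UPDATED` value as a `datetime`.

```python
from datetime import datetime


def advance_cursor(rows, last_updated):
    """Return the rows newer than the cursor, plus the new cursor value.

    `advance_cursor` is a hypothetical helper used only for illustration.
    The cursor is kept as an ISO 8601 string so it can be stored in the sync state.
    """
    cursor = datetime.fromisoformat(last_updated)
    # Keep only rows modified after the previously saved timestamp.
    newer = [row for row in rows if row["LAST_UPDATED"] > cursor]
    if newer:
        # Move the cursor forward to the most recent timestamp seen.
        cursor = max(row["LAST_UPDATED"] for row in newer)
    return newer, cursor.isoformat()


# Made-up rows standing in for a query result.
rows = [
    {"PRODUCT_ID": 1, "LAST_UPDATED": datetime(2025, 4, 1, 10, 30, 0)},
    {"PRODUCT_ID": 2, "LAST_UPDATED": datetime(2025, 3, 15, 14, 45, 22)},
]
state = {}
to_sync, state["last_updated"] = advance_cursor(
    rows, state.get("last_updated", "1990-01-01T00:00:00")
)
```

On the first call both rows are returned and the saved cursor moves to the newest timestamp. Calling the helper again with that saved cursor returns no rows until the source data changes.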

### Configuration

Update `configuration.json` with your IBM DB2 details:

```json
{
  "hostname": "<YOUR_DB2_HOSTNAME>",
  "port": "<YOUR_DB2_PORT>",
  "database": "<YOUR_DB2_DATABASE_NAME>",
  "user_id": "<YOUR_DB2_USER_ID>",
  "password": "<YOUR_DB2_PASSWORD>",
  "protocol": "<YOUR_DB2_PROTOCOL>"
}
```
The configuration parameters are:
- `hostname`: The hostname of your IBM DB2 server
- `port`: The port number for the DB2 connection
- `database`: The name of the DB2 database
- `user_id`: The username to authenticate with DB2
- `password`: The password to authenticate with DB2
- `protocol`: The protocol to use for the connection (default is `TCPIP`)
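
As a rough sketch of how these parameters fit together, the snippet below assembles them into a Db2 connection string, mirroring the `get_connection_string` function in `connector.py`. The function name `build_connection_string` and all example values are placeholders chosen for illustration, not real credentials or recommended settings.

```python
def build_connection_string(configuration: dict) -> str:
    """Assemble a Db2 connection string from the configuration keys.

    Hypothetical helper for illustration; it mirrors get_connection_string
    in connector.py.
    """
    # `protocol` is optional and falls back to TCPIP when it is not provided.
    protocol = configuration.get("protocol", "TCPIP")
    return (
        f"DATABASE={configuration['database']};"
        f"HOSTNAME={configuration['hostname']};"
        f"PORT={configuration['port']};"
        f"PROTOCOL={protocol};"
        f"UID={configuration['user_id']};"
        f"PWD={configuration['password']};"
    )


# Placeholder values only; `protocol` is omitted to show the default.
example_configuration = {
    "hostname": "localhost",
    "port": "50000",
    "database": "SAMPLE",
    "user_id": "db2inst1",
    "password": "secret",
}
connection_string = build_connection_string(example_configuration)
```

The resulting string is what the connector passes as the first argument to `ibm_db.connect`.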

### Customizing for Your Use Case

To adapt this connector for your needs:

1. **Database Connection**: Replace the placeholders in `configuration.json` with your actual DB2 connection details.
2. **Table Selection**: Update the `schema()` function to define your tables. The example uses a `products` table, but you can modify it to include your own tables.
3. **Upsert Logic**: Modify the `update()` function to match your upsert logic. The example uses a simple fetch-and-upsert approach, which you can customize to fit your needs.
4. **Additional Tables**: You can add more tables to the `schema()` function and implement the corresponding upsert logic in the `update()` function.
5. **Sample Data Helpers**: The connector includes functions that create a sample table and insert test data into it, but these are for testing purposes only. In a real-world scenario, the tables would typically already exist in your source DB2 database. These functions are:
   - `sample_data_to_insert()`: Creates the sample data to be inserted into the table.
   - `create_sample_table_into_db()`: Creates a sample table in the database.
   - `insert_sample_data_into_table()`: Inserts the sample data into the table.
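
One possible way to approach steps 2 to 4 is to keep a small table registry and derive both the schema and the incremental queries from it. The sketch below is an assumption-laden illustration, not part of this example: the `orders` table, its key, the `cursor_column` names, and the helper `build_incremental_query` are all hypothetical.

```python
# Hypothetical table registry: the `orders` table, its key, and the cursor
# column names are assumptions made for illustration.
TABLES = {
    "products": {"primary_key": ["product_id"], "cursor_column": "last_updated"},
    "orders": {"primary_key": ["order_id"], "cursor_column": "updated_at"},
}


def schema(configuration: dict):
    # Emit one schema entry per registered table.
    return [
        {"table": name, "primary_key": spec["primary_key"]}
        for name, spec in TABLES.items()
    ]


def build_incremental_query(table: str) -> str:
    # Table and column names come from the fixed registry above, never from
    # user input; the cursor value itself stays a `?` parameter marker.
    column = TABLES[table]["cursor_column"]
    return f"SELECT * FROM {table} WHERE {column} > ?"
```

With this layout, `update()` could loop over `TABLES`, run each query with that table's saved timestamp bound to the parameter marker, and keep one cursor per table in the state dictionary.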

> IMPORTANT: This example code has not been fully tested in all environments. If you encounter any issues implementing this connector, please use Fivetran's [Save Me Time](https://support.fivetran.com/hc/en-us/requests/new?isSdkIssue=true) service. Our support team will be happy to help troubleshoot and finalize any issues with this example.
Collaborator

Is this needed? We should at least check our examples thoroughly.

Collaborator Author

Testing was done against a local DB2 server (with the host set to localhost), and it works fine.
I could not test it with `fivetran deploy` because I do not have the credentials.
Discussed with Alison: https://fivetran.height.app/T-930742#activityId=526703f3-cd44-40a3-b9a0-cf826debb036
I have requested free-tier IBM Cloud credentials. Once they are available, we can test it with the deploy command.

8 changes: 8 additions & 0 deletions examples/source_examples/ibm_db2/configuration.json
@@ -0,0 +1,8 @@
{
  "hostname": "<YOUR_DB2_HOSTNAME>",
  "port": "<YOUR_DB2_PORT>",
  "database": "<YOUR_DB2_DATABASE_NAME>",
  "user_id": "<YOUR_DB2_USER_ID>",
  "password": "<YOUR_DB2_PASSWORD>",
  "protocol": "<YOUR_DB2_PROTOCOL>"
}
291 changes: 291 additions & 0 deletions examples/source_examples/ibm_db2/connector.py
@@ -0,0 +1,291 @@
# This is a simple example of how to work with the fivetran_connector_sdk module.
# It defines an `update` method, which upserts data from an IBM DB2 database.
# See the Technical Reference documentation (https://fivetran.com/docs/connectors/connector-sdk/technical-reference#update)
# and the Best Practices documentation (https://fivetran.com/docs/connectors/connector-sdk/best-practices) for details

# Import required classes from fivetran_connector_sdk
from fivetran_connector_sdk import Connector  # For supporting Connector operations like update() and schema()
from fivetran_connector_sdk import Logging as log  # For enabling logs in your connector code
from fivetran_connector_sdk import Operations as op  # For supporting data operations like upsert(), update(), delete() and checkpoint()

# Import the ibm_db module for connecting to IBM DB2
# The ibm_db package contains:
#   ibm_db driver: Python driver for IBM Db2 for LUW and IBM Db2 for z/OS databases.
#     Uses the IBM Data Server Driver for ODBC and CLI APIs to connect to IBM Db2 for LUW.
#   ibm_db_dbi: Python driver for IBM Db2 for LUW that complies with the DB-API 2.0 specification.
import ibm_db
import json


# Define the schema function which lets you configure the schema your connector delivers.
# See the technical reference documentation for more details on the schema function:
# https://fivetran.com/docs/connectors/connector-sdk/technical-reference#schema
# The schema function takes one parameter:
# - configuration: a dictionary that holds the configuration settings for the connector.
def schema(configuration: dict):
    # Check if the configuration dictionary has all the required keys
    required_keys = ["hostname", "port", "database", "user_id", "password", "protocol"]
    for key in required_keys:
        if key not in configuration:
            if key == "protocol":
                log.warning("Missing protocol configuration key, defaulting to TCPIP")
            else:
                raise ValueError(f"Missing required configuration key: {key}")

    return [
        {
            "table": "products",
            "primary_key": ["product_id"]
        }
    ]


# This method is used to create a connection string for the IBM DB2 database.
# This takes the configuration dictionary as an argument and extracts the necessary parameters to create the connection string.
def get_connection_string(configuration: dict):
    # Extract the necessary parameters from the configuration dictionary
    hostname = configuration.get("hostname")
    port = configuration.get("port")
    database = configuration.get("database")
    user_id = configuration.get("user_id")
    password = configuration.get("password")
    protocol = configuration.get("protocol", "TCPIP")

    # Return the connection string
    return (
        f"DATABASE={database};"
        f"HOSTNAME={hostname};"
        f"PORT={port};"
        f"PROTOCOL={protocol};"
        f"UID={user_id};"
        f"PWD={password};"
    )


# This method is used to establish a connection to the IBM DB2 database.
# It takes the configuration dictionary as an argument and uses the get_connection_string method to create the connection string.
# It then attempts to connect to the database using the ibm_db module.
# If the connection is successful, it returns the connection object.
# If the connection fails, it raises a RuntimeError with the error message.
def connect_to_db(configuration: dict):
    # Get the connection string
    conn_str = get_connection_string(configuration)

    # Connect to the database
    try:
        conn = ibm_db.connect(conn_str, "", "")
        log.info("Connected to database successfully!")
        return conn
    except Exception as e:
        log.severe(f"Connection failed: {e}")
        raise RuntimeError("Connection failed") from e


# This method is used to create a sample data set to be inserted into the database.
# It returns a list of dictionaries, where each dictionary represents a product.
# This is test data that allows the connector to run successfully.
# This method should be removed in production code.
def sample_data_to_insert():
    products = [
        {
            "id": 1,
            "name": "Smartphone XS",
            "desc": "Latest smartphone with advanced camera and long battery life. Features include 5G connectivity, water resistance, and a powerful processor.",
            "price": 799.99,
            "stock": 120,
            "release": '2024-09-15',
            "updated": '2025-04-01 10:30:00',
            "featured": True
        },
        {
            "id": 2,
            "name": "Laptop Pro",
            "desc": "High-performance laptop for professionals and gamers.",
            "price": 1299.99,
            "stock": 45,
            "release": '2024-07-22',
            "updated": '2025-03-15 14:45:22',
            "featured": True
        },
        {
            "id": 3,
            "name": "Wireless Earbuds",
            "desc": "Noise-cancelling earbuds with crystal clear sound quality.",
            "price": 149.50,
            "stock": 200,
            "release": '2024-12-01',
            "updated": '2025-04-10 09:15:30',
            "featured": False
        },
        {
            "id": 4,
            "name": "Smart Watch",
            "desc": "Health monitoring smartwatch with GPS and fitness tracking capabilities.",
            "price": 249.99,
            "stock": 75,
            "release": '2025-01-10',
            "updated": '2025-04-05 16:20:15',
            "featured": True
        }
    ]

    return products


# This method is used to create a sample table in the database.
# It checks if the table already exists in the database.
# If it does, it skips the creation process.
# If it doesn't, it creates a new table named "products" with various data types.
# This is a test method to successfully run the connector.
# This method should be removed in production code.
# This is because, in production code, the connector should be able to work with any existing table in the database.
def create_sample_table_into_db(conn):
    # Check if the table already exists
    check_table_sql = """
        SELECT 1 FROM SYSCAT.TABLES
        WHERE TABSCHEMA = CURRENT_SCHEMA
        AND TABNAME = 'PRODUCTS'
    """

    stmt = ibm_db.exec_immediate(conn, check_table_sql)
    result = ibm_db.fetch_tuple(stmt)

    if result:
        log.info("Table 'products' already exists, skipping creation")
    else:
        # Create a table with multiple data types if it doesn't exist
        create_table_sql = """
            CREATE TABLE products (
                product_id INTEGER NOT NULL PRIMARY KEY,
                product_name VARCHAR(100) NOT NULL,
                description CLOB,
                price DECIMAL(10,2) NOT NULL,
                in_stock SMALLINT NOT NULL,
                release_date DATE,
                last_updated TIMESTAMP,
                is_featured BOOLEAN
            )
        """

        # Create the table and commit the changes
        ibm_db.exec_immediate(conn, create_table_sql)
        ibm_db.commit(conn)
        log.info("Table 'products' created successfully")


# This method is used to insert sample data into the "products" table in the database.
# It first creates the sample table using the create_sample_table_into_db method.
# If the table already contains rows, it skips the insert so that repeated syncs do not
# fail with a duplicate primary key error.
# Otherwise it prepares an SQL statement, binds the parameters for each sample product,
# executes the statement, and commits the changes to the database.
# This is a test method that allows the connector to run successfully.
# This method should be removed in production code.
# This is because, in production code, the connector should be able to work with any existing table in the database.
def insert_sample_data_into_table(conn):
    create_sample_table_into_db(conn)

    # Skip the insert when the sample rows are already present
    count_stmt = ibm_db.exec_immediate(conn, "SELECT COUNT(*) FROM products")
    row_count = ibm_db.fetch_tuple(count_stmt)[0]
    if int(row_count) > 0:
        log.info("Table 'products' already contains data, skipping sample insert")
        return

    insert_sql = """
        INSERT INTO products (product_id, product_name, description, price, in_stock,
                              release_date, last_updated, is_featured)
        VALUES (?, ?, ?, ?, ?, ?, ?, ?)
    """

    stmt = ibm_db.prepare(conn, insert_sql)
    products = sample_data_to_insert()
    for product in products:
        ibm_db.bind_param(stmt, 1, product["id"])
        ibm_db.bind_param(stmt, 2, product["name"])
        ibm_db.bind_param(stmt, 3, product["desc"])
        ibm_db.bind_param(stmt, 4, product["price"])
        ibm_db.bind_param(stmt, 5, product["stock"])
        ibm_db.bind_param(stmt, 6, product["release"])
        ibm_db.bind_param(stmt, 7, product["updated"])
        ibm_db.bind_param(stmt, 8, product["featured"])
        ibm_db.execute(stmt)

    ibm_db.commit(conn)
    log.info(f"Inserted {len(products)} rows into 'products' table successfully")


# Define the update function, which is a required function, and is called by Fivetran during each sync.
# See the technical reference documentation for more details on the update function
# https://fivetran.com/docs/connectors/connector-sdk/technical-reference#update
# The function takes two parameters:
# - configuration: dictionary contains any secrets or payloads you configure when deploying the connector
# - state: a dictionary contains whatever state you have chosen to checkpoint during the prior sync
# The state dictionary is empty for the first sync or for any full re-sync
def update(configuration: dict, state: dict):
    log.warning("Example: Source Examples - IBM DB2")

    # Connect to the IBM DB2 database
    conn = connect_to_db(configuration)

    # Insert sample data into the products table
    # This is a test method to successfully run the connector.
    # This method creates a sample table and inserts sample data into it.
    # This data will be fetched from the table and upserted using the fivetran_connector_sdk module.
    # This method should be removed in production code.
    insert_sample_data_into_table(conn)
Collaborator

Not sure if we should have this function and data insertion logic in this file.
We can move it to a separate file and mention these comments there. @varundhall please suggest.


    # Load the last synced timestamp from the state dictionary.
    # The state is empty on the first sync, so fall back to an early default.
    last_updated = state.get("last_updated", "1990-01-01T00:00:00")

    # Select all records from the products table changed after the last updated timestamp.
    # The timestamp is bound as a parameter instead of being formatted into the SQL text.
    # The ISO "T" separator is replaced with a space, the form Db2 documents for timestamp strings.
    sql = "SELECT * FROM products WHERE last_updated > ?"
    stmt = ibm_db.prepare(conn, sql)
    ibm_db.bind_param(stmt, 1, last_updated.replace("T", " "))
    ibm_db.execute(stmt)
    # Fetch the first record from the result set
    # The ibm_db.fetch_assoc method returns the next row as a dictionary, or False when no rows remain
    dictionary = ibm_db.fetch_assoc(stmt)

    # Iterate over the result set and upsert each record until there are no more records
    while dictionary:
        # The yield statement makes update() a generator.
        # Each yielded value is an upsert operation handed to the Fivetran connector.
        # The op.upsert method is called with two arguments:
        # - The first argument is the name of the table to upsert the data into, in this case, "products".
        # - The second argument is a dictionary containing the data to be upserted.
        yield op.upsert(table="products", data=dictionary)

        # Track the most recent timestamp seen so far.
        # This ensures that the next sync can resume from the correct position.
        # The timestamp from the row is converted to an ISO string and compared with the current last_updated value.
        last_modified_from_data = dictionary.get("LAST_UPDATED").isoformat()
        if last_modified_from_data > last_updated:
            last_updated = last_modified_from_data

        # Fetch the next record from the result set
        dictionary = ibm_db.fetch_assoc(stmt)

    # Commit to end the current transaction
    ibm_db.commit(conn)
    log.info("Upserted all records from the products table")

    # Close the database connection after the operation is complete
    ibm_db.close(conn)
    log.info("Connection closed")

    # Update the state with the last updated timestamp
    # This state is used to keep track of the last updated timestamp for the next sync
    # This state will be checkpointed using the op.checkpoint() method
    state["last_updated"] = last_updated

    # Save the progress by checkpointing the state. This is important for ensuring that the sync process can resume
    # from the correct position in case of next sync or interruptions.
    # Learn more about how and where to checkpoint by reading our best practices documentation
    # (https://fivetran.com/docs/connectors/connector-sdk/best-practices#largedatasetrecommendation).
    yield op.checkpoint(state)


# This creates the connector object that will use the update function defined in this connector.py file.
connector = Connector(update=update, schema=schema)

# Check if the script is being run as the main module.
# This is Python's standard entry method allowing your script to be run directly from the command line or IDE 'run' button.
# This is useful for debugging while you write your code. Note this method is not called by Fivetran when executing your connector in production.
# Please test using the Fivetran debug command prior to finalizing and deploying your connector.
if __name__ == "__main__":
    # Open the configuration.json file and load its contents into a dictionary.
    with open("configuration.json", 'r') as f:
        configuration = json.load(f)
    # Adding this code to your `connector.py` allows you to test your connector by running your file directly from your IDE:
    connector.debug(configuration=configuration)
1 change: 1 addition & 0 deletions examples/source_examples/ibm_db2/requirements.txt
@@ -0,0 +1 @@
ibm_db==3.2.6