Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions integrations/Gemfile
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ source "https://rubygems.org"
# Specify your gem's dependencies in multiwoven-integrations.gemspec
gemspec

gem "addressable", ">= 2.8.10"

gem "rake", "~> 13.0"

gem "rspec", "~> 3.0"
Expand Down
17 changes: 17 additions & 0 deletions integrations/Gemfile.lock
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,11 @@ GIT
PATH
remote: .
specs:
<<<<<<< HEAD
multiwoven-integrations (0.35.0)
=======
multiwoven-integrations (0.35.3)
>>>>>>> 9712b5cc1 (fix(CE): added a logic to fetch primary key dynamically (#1842))
MailchimpMarketing
activesupport
async-websocket
Expand Down Expand Up @@ -58,10 +62,18 @@ GEM
drb
i18n (>= 1.6, < 2)
minitest (>= 5.1)
<<<<<<< HEAD
mutex_m
tzinfo (~> 2.0)
addressable (2.8.6)
public_suffix (>= 2.0.2, < 6.0)
=======
securerandom (>= 0.3)
tzinfo (~> 2.0, >= 2.0.5)
uri (>= 0.13.1)
addressable (2.9.0)
public_suffix (>= 2.0.2, < 8.0)
>>>>>>> 9712b5cc1 (fix(CE): added a logic to fetch primary key dynamically (#1842))
afm (1.0.0)
ast (2.4.2)
async (2.11.0)
Expand Down Expand Up @@ -482,7 +494,12 @@ PLATFORMS

DEPENDENCIES
MailchimpMarketing
<<<<<<< HEAD
activesupport
=======
activesupport (>= 7.2.3.1)
addressable (>= 2.8.10)
>>>>>>> 9712b5cc1 (fix(CE): added a logic to fetch primary key dynamically (#1842))
async-websocket (~> 0.8.0)
aws-sdk-athena
aws-sdk-bedrockruntime
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,8 @@ def write(sync_config, records, action = "destination_insert")
connection_config = sync_config.destination.connection_specification.with_indifferent_access
raw_table = sync_config.stream.name
table_name = qualify_table(connection_config[:schema], raw_table)
primary_key = sync_config.model.primary_key
db = create_connection(connection_config)
primary_key = fetch_primary_key(db, connection_config[:schema], raw_table)

write_success = 0
write_failure = 0
Expand Down Expand Up @@ -177,6 +177,23 @@ def group_by_table(records)
end
end

def fetch_primary_key(db, schema, table)
schema = schema.presence || "public"
quoted = "\"#{schema}\".\"#{table}\""
result = db.exec(<<~SQL)
SELECT a.attname AS column_name
FROM pg_index i
JOIN pg_attribute a ON a.attrelid = i.indrelid AND a.attnum = ANY(i.indkey)
WHERE i.indrelid = '#{quoted}'::regclass
AND i.indisprimary
LIMIT 1
SQL
result.first&.dig("column_name")
rescue StandardError => e
logger.warn("POSTGRESQL:FETCH_PRIMARY_KEY:EXCEPTION #{e.message}")
nil
end

def qualify_table(schema, table)
return table if schema.blank? || schema == "public"

Expand Down
4 changes: 4 additions & 0 deletions integrations/lib/multiwoven/integrations/rollout.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,11 @@

module Multiwoven
module Integrations
<<<<<<< HEAD
VERSION = "0.35.0"
=======
VERSION = "0.35.3"
>>>>>>> 9712b5cc1 (fix(CE): added a logic to fetch primary key dynamically (#1842))

ENABLED_SOURCES = %w[
Snowflake
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,7 @@
allow(PG).to receive(:connect).and_return(pg_connection)
allow(pg_connection).to receive(:close)
allow(pg_connection).to receive(:escape_string) { |str| str }
allow_any_instance_of(described_class).to receive(:fetch_primary_key).and_return(nil)
end

let(:s_config) do
Expand All @@ -197,10 +198,39 @@
expect(tracking.success).to eql(2)
expect(tracking.failed).to eql(0)
end
<<<<<<< HEAD
=======

it "generates ON CONFLICT DO NOTHING for destination_insert when primary key is present" do
allow_any_instance_of(described_class).to receive(:fetch_primary_key).and_return("user_id")
expect(pg_connection).to receive(:exec).with(
a_string_matching(/ON CONFLICT.*DO NOTHING/)
).once.and_return(true)

tracking = subject.write(s_config, batch_records, "destination_insert").tracking
expect(tracking.success).to eql(2)
expect(tracking.failed).to eql(0)
end

it "generates plain INSERT without ON CONFLICT when primary key is absent" do
config_without_pk = sync_config.deep_merge(model: { primary_key: "" })
s_config_no_pk = Multiwoven::Integrations::Protocol::SyncConfig.from_json(config_without_pk.to_json)
s_config_no_pk.sync_run_id = "50"

expect(pg_connection).to receive(:exec).with(
satisfy { |sql| sql.include?("INSERT INTO") && !sql.include?("ON CONFLICT") }
).once.and_return(true)

tracking = subject.write(s_config_no_pk, batch_records, "destination_insert").tracking
expect(tracking.success).to eql(2)
expect(tracking.failed).to eql(0)
end
>>>>>>> 9712b5cc1 (fix(CE): added a logic to fetch primary key dynamically (#1842))
end

context "bulk upsert" do
it "generates ON CONFLICT clause for destination_update" do
allow_any_instance_of(described_class).to receive(:fetch_primary_key).and_return("user_id")
expect(pg_connection).to receive(:exec).with(
a_string_matching(/ON CONFLICT.*DO UPDATE SET/)
).once.and_return(true)
Expand All @@ -211,6 +241,7 @@
end

it "generates ON CONFLICT DO NOTHING when update_cols is empty (only primary key)" do
allow_any_instance_of(described_class).to receive(:fetch_primary_key).and_return("id")
records_only_pk = [
{ "id" => "1" },
{ "id" => "2" }
Expand Down Expand Up @@ -360,6 +391,35 @@
end
end

describe "#fetch_primary_key" do
before do
allow(PG).to receive(:connect).and_return(pg_connection)
end

it "returns the primary key column name from pg_catalog" do
allow(pg_connection).to receive(:exec).and_return([{ "column_name" => "customer_id" }])
result = client.send(:fetch_primary_key, pg_connection, "destination", "scraper")
expect(result).to eq("customer_id")
end

it "returns nil when table has no primary key" do
allow(pg_connection).to receive(:exec).and_return([])
result = client.send(:fetch_primary_key, pg_connection, "destination", "scraper")
expect(result).to be_nil
end

it "defaults schema to public when blank" do
expect(pg_connection).to receive(:exec).with(a_string_matching(/"public"\."scraper"'::regclass/)).and_return([])
client.send(:fetch_primary_key, pg_connection, nil, "scraper")
end

it "returns nil on exec error" do
allow(pg_connection).to receive(:exec).and_raise(PG::Error.new("connection error"))
result = client.send(:fetch_primary_key, pg_connection, "destination", "scraper")
expect(result).to be_nil
end
end

describe "#meta_data" do
it "client class_name and meta name is same" do
meta_name = client.class.to_s.split("::")[-2]
Expand Down
Loading