diff --git a/integrations/Gemfile b/integrations/Gemfile index 7b1a3365b..506f7487d 100644 --- a/integrations/Gemfile +++ b/integrations/Gemfile @@ -5,6 +5,8 @@ source "https://rubygems.org" # Specify your gem's dependencies in multiwoven-integrations.gemspec gemspec +gem "addressable", ">= 2.8.10" + gem "rake", "~> 13.0" gem "rspec", "~> 3.0" diff --git a/integrations/Gemfile.lock b/integrations/Gemfile.lock index ba46adbee..e4bbec19b 100644 --- a/integrations/Gemfile.lock +++ b/integrations/Gemfile.lock @@ -7,7 +7,11 @@ GIT PATH remote: . specs: +<<<<<<< HEAD multiwoven-integrations (0.35.0) +======= + multiwoven-integrations (0.35.3) +>>>>>>> 9712b5cc1 (fix(CE): added a logic to fetch primary key dynamically (#1842)) MailchimpMarketing activesupport async-websocket @@ -58,10 +62,18 @@ GEM drb i18n (>= 1.6, < 2) minitest (>= 5.1) +<<<<<<< HEAD mutex_m tzinfo (~> 2.0) addressable (2.8.6) public_suffix (>= 2.0.2, < 6.0) +======= + securerandom (>= 0.3) + tzinfo (~> 2.0, >= 2.0.5) + uri (>= 0.13.1) + addressable (2.9.0) + public_suffix (>= 2.0.2, < 8.0) +>>>>>>> 9712b5cc1 (fix(CE): added a logic to fetch primary key dynamically (#1842)) afm (1.0.0) ast (2.4.2) async (2.11.0) @@ -482,7 +494,12 @@ PLATFORMS DEPENDENCIES MailchimpMarketing +<<<<<<< HEAD activesupport +======= + activesupport (>= 7.2.3.1) + addressable (>= 2.8.10) +>>>>>>> 9712b5cc1 (fix(CE): added a logic to fetch primary key dynamically (#1842)) async-websocket (~> 0.8.0) aws-sdk-athena aws-sdk-bedrockruntime diff --git a/integrations/lib/multiwoven/integrations/destination/postgresql/client.rb b/integrations/lib/multiwoven/integrations/destination/postgresql/client.rb index cfed42c05..54efb8318 100644 --- a/integrations/lib/multiwoven/integrations/destination/postgresql/client.rb +++ b/integrations/lib/multiwoven/integrations/destination/postgresql/client.rb @@ -51,8 +51,8 @@ def write(sync_config, records, action = "destination_insert") connection_config = 
# Looks up a primary-key column of a table in the PostgreSQL system catalog.
#
# The lookup is best-effort: any error raised while querying is logged as a
# warning and turned into +nil+ rather than propagated.
#
# @param db [#exec] open connection; only +exec(sql)+ is called on it
# @param schema [String, nil] schema name; a blank value falls back to "public"
# @param table [String] unqualified table name
# @return [String, nil] a primary-key column name, or +nil+ when the query
#   returns no rows or raises
def fetch_primary_key(db, schema, table)
  schema = schema.presence || "public"

  # Double embedded double quotes so each name stays a single quoted
  # identifier instead of terminating early.
  qualified = [schema, table].map { |name| %("#{name.to_s.gsub('"', '""')}") }.join(".")
  # The qualified name is embedded in a single-quoted literal below, so single
  # quotes are doubled as well.
  # NOTE(review): assumes standard_conforming_strings is on, so backslashes
  # need no escaping - confirm for the target servers.
  regclass_literal = qualified.gsub("'", "''")

  # ORDER BY makes the column picked by LIMIT 1 deterministic. Only one column
  # is returned, so a composite key yields just its lowest-numbered column.
  result = db.exec(<<~SQL)
    SELECT a.attname AS column_name
    FROM pg_index i
    JOIN pg_attribute a ON a.attrelid = i.indrelid AND a.attnum = ANY(i.indkey)
    WHERE i.indrelid = '#{regclass_literal}'::regclass
      AND i.indisprimary
    ORDER BY a.attnum
    LIMIT 1
  SQL
  result.first&.dig("column_name")
rescue StandardError => e
  logger.warn("POSTGRESQL:FETCH_PRIMARY_KEY:EXCEPTION #{e.message}")
  nil
end
allow(pg_connection).to receive(:close) allow(pg_connection).to receive(:escape_string) { |str| str } + allow_any_instance_of(described_class).to receive(:fetch_primary_key).and_return(nil) end let(:s_config) do @@ -197,10 +198,39 @@ expect(tracking.success).to eql(2) expect(tracking.failed).to eql(0) end +<<<<<<< HEAD +======= + + it "generates ON CONFLICT DO NOTHING for destination_insert when primary key is present" do + allow_any_instance_of(described_class).to receive(:fetch_primary_key).and_return("user_id") + expect(pg_connection).to receive(:exec).with( + a_string_matching(/ON CONFLICT.*DO NOTHING/) + ).once.and_return(true) + + tracking = subject.write(s_config, batch_records, "destination_insert").tracking + expect(tracking.success).to eql(2) + expect(tracking.failed).to eql(0) + end + + it "generates plain INSERT without ON CONFLICT when primary key is absent" do + config_without_pk = sync_config.deep_merge(model: { primary_key: "" }) + s_config_no_pk = Multiwoven::Integrations::Protocol::SyncConfig.from_json(config_without_pk.to_json) + s_config_no_pk.sync_run_id = "50" + + expect(pg_connection).to receive(:exec).with( + satisfy { |sql| sql.include?("INSERT INTO") && !sql.include?("ON CONFLICT") } + ).once.and_return(true) + + tracking = subject.write(s_config_no_pk, batch_records, "destination_insert").tracking + expect(tracking.success).to eql(2) + expect(tracking.failed).to eql(0) + end +>>>>>>> 9712b5cc1 (fix(CE): added a logic to fetch primary key dynamically (#1842)) end context "bulk upsert" do it "generates ON CONFLICT clause for destination_update" do + allow_any_instance_of(described_class).to receive(:fetch_primary_key).and_return("user_id") expect(pg_connection).to receive(:exec).with( a_string_matching(/ON CONFLICT.*DO UPDATE SET/) ).once.and_return(true) @@ -211,6 +241,7 @@ end it "generates ON CONFLICT DO NOTHING when update_cols is empty (only primary key)" do + allow_any_instance_of(described_class).to 
receive(:fetch_primary_key).and_return("id") records_only_pk = [ { "id" => "1" }, { "id" => "2" } @@ -360,6 +391,35 @@ end end + describe "#fetch_primary_key" do + before do + allow(PG).to receive(:connect).and_return(pg_connection) + end + + it "returns the primary key column name from pg_catalog" do + allow(pg_connection).to receive(:exec).and_return([{ "column_name" => "customer_id" }]) + result = client.send(:fetch_primary_key, pg_connection, "destination", "scraper") + expect(result).to eq("customer_id") + end + + it "returns nil when table has no primary key" do + allow(pg_connection).to receive(:exec).and_return([]) + result = client.send(:fetch_primary_key, pg_connection, "destination", "scraper") + expect(result).to be_nil + end + + it "defaults schema to public when blank" do + expect(pg_connection).to receive(:exec).with(a_string_matching(/"public"\."scraper"'::regclass/)).and_return([]) + client.send(:fetch_primary_key, pg_connection, nil, "scraper") + end + + it "returns nil on exec error" do + allow(pg_connection).to receive(:exec).and_raise(PG::Error.new("connection error")) + result = client.send(:fetch_primary_key, pg_connection, "destination", "scraper") + expect(result).to be_nil + end + end + describe "#meta_data" do it "client class_name and meta name is same" do meta_name = client.class.to_s.split("::")[-2]