Skip to content

Commit 39cfce3

Browse files
committed
Fix jdbc_fetch_size with postgresql
1 parent a4ac442 commit 39cfce3

File tree

3 files changed

+14
-9
lines changed

3 files changed

+14
-9
lines changed

lib/logstash/plugin_mixins/jdbc/jdbc.rb

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -144,6 +144,7 @@ def load_driver
144144
require "java"
145145
require "sequel"
146146
require "sequel/adapters/jdbc"
147+
require "sequel/adapters/jdbc/transactions"
147148

148149
load_driver_jars
149150
begin
@@ -193,6 +194,7 @@ def open_jdbc_connection
193194

194195
@database = jdbc_connect()
195196
@database.extension(:pagination)
197+
@database.extend(Sequel::JDBC::Transactions)
196198
if @jdbc_default_timezone
197199
@database.extension(:named_timezones)
198200
@database.timezone = @jdbc_default_timezone

lib/logstash/plugin_mixins/jdbc/statement_handler.rb

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -31,16 +31,20 @@ class NormalStatementHandler < StatementHandler
3131
# @yieldparam row [Hash{Symbol=>Object}]
3232
def perform_query(db, sql_last_value, jdbc_paging_enabled, jdbc_page_size)
3333
query = build_query(db, sql_last_value)
34-
if jdbc_paging_enabled
35-
query.each_page(jdbc_page_size) do |paged_dataset|
36-
paged_dataset.each do |row|
34+
# Execute query in transaction cause PG driver require autocommit off for set fetch count
35+
# See: https://jdbc.postgresql.org/documentation/head/query.html
36+
db.transaction(rollback: :always) do
37+
if jdbc_paging_enabled
38+
query.each_page(jdbc_page_size) do |paged_dataset|
39+
paged_dataset.each do |row|
40+
yield row
41+
end
42+
end
43+
else
44+
query.each do |row|
3745
yield row
3846
end
3947
end
40-
else
41-
query.each do |row|
42-
yield row
43-
end
4448
end
4549
end
4650

spec/inputs/jdbc_spec.rb

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1020,7 +1020,6 @@
10201020
{
10211021
"statement" => "SELECT * FROM test_table",
10221022
"jdbc_pool_timeout" => 0,
1023-
"jdbc_connection_string" => 'mock://localhost:1527/db',
10241023
"sequel_opts" => {
10251024
"max_connections" => 1
10261025
}
@@ -1076,7 +1075,7 @@
10761075
end
10771076

10781077
it "should report the statements to logging" do
1079-
expect(plugin.logger).to receive(:debug).once
1078+
expect(plugin.logger).to receive(:debug).thrice
10801079
plugin.run(queue)
10811080
end
10821081
end

0 commit comments

Comments
 (0)