Skip to content

Commit 873e543

Browse files
committed
⚡️ Update defaults for processor/batching configuration
1 parent ae7c365 commit 873e543

8 files changed

+26
-113
lines changed

lib/sequin/runtime/azure_event_hub_pipeline.ex

+4-12
Original file line numberDiff line numberDiff line change
@@ -16,21 +16,13 @@ defmodule Sequin.Runtime.AzureEventHubPipeline do
1616
end
1717

1818
@impl SinkPipeline
19-
def processors_config(%SinkConsumer{max_waiting: max_waiting}) do
20-
[
21-
default: [
22-
concurrency: max_waiting,
23-
max_demand: 100
24-
]
25-
]
26-
end
19+
def batchers_config(_consumer) do
20+
concurrency = min(System.schedulers_online() * 2, 80)
2721

28-
@impl SinkPipeline
29-
def batchers_config(%SinkConsumer{batch_size: batch_size}) do
3022
[
3123
default: [
32-
concurrency: 1,
33-
batch_size: batch_size,
24+
concurrency: concurrency,
25+
batch_size: 10,
3426
batch_timeout: 50
3527
]
3628
]

lib/sequin/runtime/gcp_pubsub_pipeline.ex

+4-12
Original file line numberDiff line numberDiff line change
@@ -15,22 +15,14 @@ defmodule Sequin.Runtime.GcpPubsubPipeline do
1515
Map.put(context, :pubsub_client, GcpPubsubSink.pubsub_client(sink))
1616
end
1717

18-
@impl SinkPipeline
19-
def processors_config(%SinkConsumer{max_waiting: max_waiting}) do
20-
[
21-
default: [
22-
concurrency: max_waiting,
23-
max_demand: 100
24-
]
25-
]
26-
end
27-
2818
@impl SinkPipeline
2919
def batchers_config(_consumer) do
20+
concurrency = min(System.schedulers_online() * 2, 80)
21+
3022
[
3123
default: [
32-
concurrency: 100,
33-
batch_size: 100,
24+
concurrency: concurrency,
25+
batch_size: 10,
3426
batch_timeout: 50
3527
]
3628
]

lib/sequin/runtime/http_push_pipeline.ex

-22
Original file line numberDiff line numberDiff line change
@@ -25,28 +25,6 @@ defmodule Sequin.Runtime.HttpPushPipeline do
2525
|> Map.put(:features, features)
2626
end
2727

28-
@impl SinkPipeline
29-
def processors_config(%SinkConsumer{max_waiting: max_waiting}) do
30-
[
31-
default: [
32-
concurrency: max_waiting,
33-
max_demand: 10,
34-
min_demand: 5
35-
]
36-
]
37-
end
38-
39-
@impl SinkPipeline
40-
def batchers_config(%SinkConsumer{batch_size: batch_size}) do
41-
[
42-
default: [
43-
concurrency: 1,
44-
batch_size: batch_size,
45-
batch_timeout: 50
46-
]
47-
]
48-
end
49-
5028
@impl SinkPipeline
5129
def handle_batch(:default, messages, _batch_info, context) do
5230
%{

lib/sequin/runtime/nats_pipeline.ex

+4-14
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@ defmodule Sequin.Runtime.NatsPipeline do
22
@moduledoc false
33
@behaviour Sequin.Runtime.SinkPipeline
44

5-
alias Sequin.Consumers.SinkConsumer
65
alias Sequin.Error
76
alias Sequin.Runtime.SinkPipeline
87
alias Sequin.Sinks.Nats
@@ -15,22 +14,13 @@ defmodule Sequin.Runtime.NatsPipeline do
1514
end
1615

1716
@impl SinkPipeline
18-
def processors_config(%SinkConsumer{max_waiting: max_waiting}) do
19-
[
20-
default: [
21-
concurrency: max_waiting,
22-
max_demand: 10,
23-
min_demand: 5
24-
]
25-
]
26-
end
17+
def batchers_config(_consumer) do
18+
concurrency = min(System.schedulers_online() * 2, 80)
2719

28-
@impl SinkPipeline
29-
def batchers_config(%SinkConsumer{batch_size: batch_size}) do
3020
[
3121
default: [
32-
concurrency: 1,
33-
batch_size: batch_size,
22+
concurrency: concurrency,
23+
batch_size: 10,
3424
batch_timeout: 50
3525
]
3626
]

lib/sequin/runtime/rabbitmq_pipeline.ex

+4-14
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@ defmodule Sequin.Runtime.RabbitMqPipeline do
22
@moduledoc false
33
@behaviour Sequin.Runtime.SinkPipeline
44

5-
alias Sequin.Consumers.SinkConsumer
65
alias Sequin.Error
76
alias Sequin.Runtime.SinkPipeline
87
alias Sequin.Sinks.RabbitMq
@@ -15,22 +14,13 @@ defmodule Sequin.Runtime.RabbitMqPipeline do
1514
end
1615

1716
@impl SinkPipeline
18-
def processors_config(%SinkConsumer{max_waiting: max_waiting}) do
19-
[
20-
default: [
21-
concurrency: max_waiting,
22-
max_demand: 10,
23-
min_demand: 5
24-
]
25-
]
26-
end
17+
def batchers_config(_consumer) do
18+
concurrency = min(System.schedulers_online() * 2, 80)
2719

28-
@impl SinkPipeline
29-
def batchers_config(%SinkConsumer{batch_size: batch_size}) do
3020
[
3121
default: [
32-
concurrency: 1,
33-
batch_size: batch_size,
22+
concurrency: concurrency,
23+
batch_size: 10,
3424
batch_timeout: 50
3525
]
3626
]

lib/sequin/runtime/redis_pipeline.ex

-22
Original file line numberDiff line numberDiff line change
@@ -14,28 +14,6 @@ defmodule Sequin.Runtime.RedisPipeline do
1414
context
1515
end
1616

17-
@impl SinkPipeline
18-
def processors_config(%SinkConsumer{max_waiting: max_waiting}) do
19-
[
20-
default: [
21-
concurrency: max_waiting,
22-
max_demand: 10,
23-
min_demand: 5
24-
]
25-
]
26-
end
27-
28-
@impl SinkPipeline
29-
def batchers_config(%SinkConsumer{batch_size: batch_size}) do
30-
[
31-
default: [
32-
concurrency: 1,
33-
batch_size: batch_size,
34-
batch_timeout: 50
35-
]
36-
]
37-
end
38-
3917
@impl SinkPipeline
4018
def handle_batch(:default, messages, _batch_info, context) do
4119
%{consumer: %SinkConsumer{sink: %RedisSink{} = sink}, test_pid: test_pid} = context

lib/sequin/runtime/sink_pipeline.ex

+6-4
Original file line numberDiff line numberDiff line change
@@ -188,7 +188,7 @@ defmodule Sequin.Runtime.SinkPipeline do
188188
defp processors_config(pipeline_mod, consumer) do
189189
default = [
190190
default: [
191-
concurrency: 1,
191+
concurrency: System.schedulers_online(),
192192
max_demand: 10,
193193
min_demand: 5
194194
]
@@ -202,11 +202,13 @@ defmodule Sequin.Runtime.SinkPipeline do
202202
end
203203

204204
defp batchers_config(pipeline_mod, consumer) do
205+
concurrency = min(System.schedulers_online() * 2, 80)
206+
205207
default = [
206208
default: [
207-
concurrency: 1,
208-
batch_size: 1,
209-
batch_timeout: 0
209+
concurrency: concurrency,
210+
batch_size: consumer.batch_size,
211+
batch_timeout: 50
210212
]
211213
]
212214

lib/sequin/runtime/sqs_pipeline.ex

+4-13
Original file line numberDiff line numberDiff line change
@@ -16,22 +16,13 @@ defmodule Sequin.Runtime.SqsPipeline do
1616
end
1717

1818
@impl SinkPipeline
19-
def processors_config(%SinkConsumer{max_waiting: max_waiting}) do
20-
[
21-
default: [
22-
concurrency: max_waiting,
23-
max_demand: 10,
24-
min_demand: 5
25-
]
26-
]
27-
end
19+
def batchers_config(_consumer) do
20+
concurrency = min(System.schedulers_online() * 2, 80)
2821

29-
@impl SinkPipeline
30-
def batchers_config(%SinkConsumer{batch_size: batch_size}) do
3122
[
3223
default: [
33-
concurrency: 1,
34-
batch_size: batch_size,
24+
concurrency: concurrency,
25+
batch_size: 10,
3526
batch_timeout: 50
3627
]
3728
]

0 commit comments

Comments
 (0)