Skip to content

Commit ef19afd

Browse files
committed
test
Signed-off-by: Gabriele Santomaggio <G.santomaggio@gmail.com>
1 parent b937581 commit ef19afd

4 files changed

Lines changed: 37 additions & 18 deletions

File tree

.ci/ubuntu/gha-setup.sh

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,7 @@ function start_rabbitmq
8585
--network "$docker_network_name" \
8686
--volume "$GITHUB_WORKSPACE/.ci/ubuntu/enabled_plugins:/etc/rabbitmq/enabled_plugins" \
8787
--volume "$GITHUB_WORKSPACE/.ci/ubuntu/rabbitmq.conf:/etc/rabbitmq/rabbitmq.conf:ro" \
88-
--volume "$GITHUB_WORKSPACE/.ci/ubuntu/definitions.json:/etc/rabbitmq/definitions.json:ro" \
88+
--volume "$GITHUB_WORKSPACE/.ci/ubuntu/definitions.json:/etc/rabbitmq/definitions.json:ro" \
8989
--volume "$GITHUB_WORKSPACE/.ci/ubuntu/advanced.config:/etc/rabbitmq/advanced.config:ro" \
9090
--volume "$GITHUB_WORKSPACE/.ci/certs:/etc/rabbitmq/certs:ro" \
9191
--volume "$GITHUB_WORKSPACE/.ci/ubuntu/log:/var/log/rabbitmq" \
6.37 MB
Binary file not shown.

docs/examples/async_confirmation/async_confirmation.go

Lines changed: 23 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,12 @@ import (
1010

1111
func main() {
1212

13-
env := rmq.NewEnvironment("amqp://guest:guest@localhost:5672/", nil)
13+
// create 10 million UUIDs
14+
15+
env := rmq.NewEnvironment("amqp://test:test@192.168.1.124:5672/", &rmq.AmqpConnOptions{
16+
17+
MaxFrameSize: 2048,
18+
})
1419

1520
// Open a connection to the AMQP 1.0 server ( RabbitMQ >= 4.0)
1621
amqpConnection, err := env.NewConnection(context.Background())
@@ -20,7 +25,7 @@ func main() {
2025
}
2126

2227
amqpConnection.Management().DeleteQueue(context.Background(), "test")
23-
amqpConnection.Management().DeclareQueue(context.Background(), &rmq.QuorumQueueSpecification{
28+
amqpConnection.Management().DeclareQueue(context.Background(), &rmq.StreamQueueSpecification{
2429
Name: "test",
2530
})
2631

@@ -38,16 +43,23 @@ func main() {
3843

3944
switch state.(type) {
4045
case *amqp.StateAccepted:
41-
if atomic.AddInt32(&confirmed, 1)%20_000 == 0 {
46+
if atomic.AddInt32(&confirmed, 1)%50_000 == 0 {
4247
// confirmations per second
4348
rmq.Info("Confirmations per second", "value", float64(confirmed)/time.Since(startDate).Seconds())
4449
}
50+
51+
if atomic.LoadInt32(&confirmed) == 2_500_000 {
52+
// time since the start
53+
rmq.Info("Time to confirm all messages", "value", time.Since(startDate).Seconds())
54+
}
4555
default:
4656
panic("Message not accepted")
4757

4858
}
4959

5060
}
61+
// 1kb to bytes
62+
//bytes := make([]byte, 1_000)
5163

5264
//for i := 0; i < 500_000; i++ {
5365
// _, err := publisher.Publish(context.Background(), rmq.NewMessage(make([]byte, 1)))
@@ -58,14 +70,19 @@ func main() {
5870
// // message per second
5971
// rmq.Info("Sync Messages per second", "value", float64(i)/time.Since(startDate).Seconds())
6072
// }
73+
//
74+
// if i == 500_000-1 {
75+
// // time since the start
76+
// rmq.Info("Time to confirm all messages", "value", time.Since(startDate).Seconds())
77+
// }
6178
//}
6279

63-
for i := 0; i < 500_000; i++ {
64-
err := publisher.PublishAsyncConfirmation(context.Background(), rmq.NewMessage(make([]byte, 1)), f)
80+
for i := 0; i < 2_500_000; i++ {
81+
err := publisher.PublishAsyncConfirmation(context.Background(), rmq.NewMessage(make([]byte, 1_000)), f)
6582
if err != nil {
6683
rmq.Error("Error publishing message", err)
6784
}
68-
if i%20_000 == 0 {
85+
if i%50_000 == 0 {
6986
// message per second
7087
rmq.Info("Messages per second", "value", float64(i)/time.Since(startDate).Seconds())
7188
}

pkg/rabbitmqamqp/amqp_publisher.go

Lines changed: 13 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ func newPublisher(ctx context.Context, connection *AmqpConnection, destinationAd
3838

3939
r := &Publisher{connection: connection, linkName: getLinkName(options),
4040
destinationAdd: destinationAdd, id: id,
41-
maxInFlight: 10_00,
41+
maxInFlight: 1_000,
4242
condition: sync.NewCond(&sync.Mutex{})}
4343
connection.entitiesTracker.storeOrReplaceProducer(r)
4444
err := r.createSender(ctx)
@@ -121,22 +121,24 @@ func (m *Publisher) Publish(ctx context.Context, message *amqp.Message) (*Publis
121121
type CallbackConfirmation func(message *amqp.Message, state DeliveryState, err error)
122122

123123
func (m *Publisher) PublishAsyncConfirmation(ctx context.Context, message *amqp.Message, callback CallbackConfirmation) error {
124-
//go func() {
125-
r, err := m.sender.Load().SendWithReceipt(ctx, message, nil)
126-
if err != nil {
127-
return err
128-
}
129-
if atomic.AddInt32(&m.pending, 1) == int32(m.maxInFlight) {
124+
if atomic.AddInt32(&m.pending, 1) >= int32(m.maxInFlight) {
130125
m.condition.L.Lock()
131126
m.condition.Wait()
132127
m.condition.L.Unlock()
133128
}
134-
go func() {
135-
state, err := r.Wait(ctx)
129+
130+
//go func() {
131+
sendReceipt, err := m.sender.Load().SendWithReceipt(ctx, message, nil)
132+
if err != nil {
133+
return err
134+
}
135+
136+
go func(sr amqp.SendReceipt) {
137+
state, err := sr.Wait(ctx)
136138
atomic.AddInt32(&m.pending, -1)
137-
//m.condition.Signal()
139+
m.condition.Signal()
138140
callback(message, state, err)
139-
}()
141+
}(sendReceipt)
140142
//}()
141143
return nil
142144
}

0 commit comments

Comments
 (0)