Skip to content

Commit e4711f3

Browse files
authored
Merge pull request #181 from rabbitmq/issue-178
Add a CloseDeadline function to Connection
2 parents 3acf42c + 9a62e97 commit e4711f3

File tree

7 files changed

+135
-10
lines changed

7 files changed

+135
-10
lines changed

.github/workflows/tests.yml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,4 +49,6 @@ jobs:
4949
go-version: ${{ matrix.go-version }}
5050
check-latest: true
5151
- name: Tests
52+
env:
53+
RABBITMQ_RABBITMQCTL_PATH: DOCKER:${{ job.services.rabbitmq.id }}
5254
run: make check-fmt tests

CONTRIBUTING.md

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,11 +9,13 @@ Here is the recommended workflow:
99
1. Run Static Checks
1010
1. Run integration tests (see below)
1111
1. **Implement tests**
12-
1. Implement fixs
13-
1. Commit your changes (`git commit -am 'Add some feature'`)
12+
1. Implement fixes
13+
1. Commit your changes. Use a [good, descriptive, commit message][good-commit].
1414
1. Push to a branch (`git push -u origin my-new-feature`)
1515
1. Submit a pull request
1616

17+
[good-commit]: https://cbea.ms/git-commit/
18+
1719
## Running Static Checks
1820

1921
golangci-lint must be installed to run the static checks. See [installation
@@ -43,6 +45,18 @@ The integration tests can be run via:
4345
make tests
4446
```
4547

48+
Some tests require access to `rabbitmqctl` CLI. Use the environment variable
49+
`RABBITMQ_RABBITMQCTL_PATH=/some/path/to/rabbitmqctl` to run those tests.
50+
51+
If you have Docker available in your machine, you can run:
52+
53+
```shell
54+
make tests-docker
55+
```
56+
57+
This target will start a RabbitMQ container, run the test suite with the environment
58+
variable setup, and stop RabbitMQ container after a successful run.
59+
4660
All integration tests should use the `integrationConnection(...)` test
4761
helpers defined in `integration_test.go` to setup the integration environment
4862
and logging.

Makefile

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,11 @@ fmt: ## Run go fmt against code
1919
tests: ## Run all tests and requires a running rabbitmq-server. Use GO_TEST_FLAGS to add extra flags to go test
2020
go test -race -v -tags integration $(GO_TEST_FLAGS)
2121

22+
.PHONY: tests-docker
23+
tests-docker: rabbitmq-server
24+
RABBITMQ_RABBITMQCTL_PATH="DOCKER:$(CONTAINER_NAME)" go test -race -v -tags integration $(GO_TEST_FLAGS)
25+
$(MAKE) stop-rabbitmq-server
26+
2227
.PHONY: check
2328
check:
2429
golangci-lint run ./...

connection.go

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -399,12 +399,47 @@ func (c *Connection) Close() error {
399399
)
400400
}
401401

402+
// CloseDeadline requests and waits for the response to close this AMQP connection.
403+
//
404+
// Accepts a deadline for waiting the server response. The deadline is passed
405+
// to the low-level connection i.e. network socket.
406+
//
407+
// Regardless of the error returned, the connection is considered closed, and it
408+
// should not be used after calling this function.
409+
//
410+
// In the event of an I/O timeout, connection-closed listeners are NOT informed.
411+
//
412+
// After returning from this call, all resources associated with this connection,
413+
// including the underlying io, Channels, Notify listeners and Channel consumers
414+
// will also be closed.
415+
func (c *Connection) CloseDeadline(deadline time.Time) error {
416+
if c.IsClosed() {
417+
return ErrClosed
418+
}
419+
420+
defer c.shutdown(nil)
421+
422+
err := c.setDeadline(deadline)
423+
if err != nil {
424+
return err
425+
}
426+
427+
return c.call(
428+
&connectionClose{
429+
ReplyCode: replySuccess,
430+
ReplyText: "kthxbai",
431+
},
432+
&connectionCloseOk{},
433+
)
434+
}
435+
402436
func (c *Connection) closeWith(err *Error) error {
403437
if c.IsClosed() {
404438
return ErrClosed
405439
}
406440

407441
defer c.shutdown(err)
442+
408443
return c.call(
409444
&connectionClose{
410445
ReplyCode: uint16(err.Code),
@@ -420,6 +455,18 @@ func (c *Connection) IsClosed() bool {
420455
return atomic.LoadInt32(&c.closed) == 1
421456
}
422457

458+
// setDeadline is a wrapper to type assert Connection.conn and set an I/O
459+
// deadline in the underlying TCP connection socket, by calling
460+
// net.Conn.SetDeadline(). It returns an error, in case the type assertion fails,
461+
// although this should never happen.
462+
func (c *Connection) setDeadline(t time.Time) error {
463+
con, ok := c.conn.(net.Conn)
464+
if !ok {
465+
return errInvalidTypeAssertion
466+
}
467+
return con.SetDeadline(t)
468+
}
469+
423470
func (c *Connection) send(f frame) error {
424471
if c.IsClosed() {
425472
return ErrClosed

connection_test.go

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,14 +9,20 @@
99
package amqp091
1010

1111
import (
12+
"context"
1213
"crypto/tls"
1314
"net"
15+
"os"
16+
"os/exec"
1417
"regexp"
18+
"strings"
1519
"sync"
1620
"testing"
1721
"time"
1822
)
1923

24+
const rabbitmqctlEnvKey = "RABBITMQ_RABBITMQCTL_PATH"
25+
2026
func TestRequiredServerLocale(t *testing.T) {
2127
conn := integrationConnection(t, "AMQP 0-9-1 required server locale")
2228
t.Cleanup(func() { conn.Close() })
@@ -332,3 +338,56 @@ func TestNewConnectionProperties_HasDefaultProperties(t *testing.T) {
332338
t.Fatalf("Version in NewConnectionProperties is not a valid semver value: %s", version)
333339
}
334340
}
341+
342+
// Connection and channels should be closeable when a memory alarm is active.
343+
// https://github.com/rabbitmq/amqp091-go/issues/178
344+
func TestConnection_Close_WhenMemoryAlarmIsActive(t *testing.T) {
345+
err := rabbitmqctl(t, "set_vm_memory_high_watermark", "0.0001")
346+
if err != nil {
347+
t.Fatal(err)
348+
}
349+
t.Cleanup(func() {
350+
_ = rabbitmqctl(t, "set_vm_memory_high_watermark", "0.4")
351+
conn, ch := integrationQueue(t, t.Name())
352+
integrationQueueDelete(t, ch, t.Name())
353+
_ = ch.Close()
354+
_ = conn.Close()
355+
})
356+
357+
conn, ch := integrationQueue(t, t.Name())
358+
359+
go func() {
360+
// simulate a producer
361+
// required to block the connection
362+
_ = ch.PublishWithContext(context.Background(), "", t.Name(), false, false, Publishing{
363+
Body: []byte("this is a test"),
364+
})
365+
}()
366+
<-time.After(time.Second * 1)
367+
368+
err = conn.CloseDeadline(time.Now().Add(time.Second * 2))
369+
if err == nil {
370+
t.Fatal("expected error, got nil")
371+
}
372+
if !conn.IsClosed() {
373+
t.Fatal("expected connection to be closed")
374+
}
375+
}
376+
377+
func rabbitmqctl(t *testing.T, args ...string) error {
378+
rabbitmqctlPath, found := os.LookupEnv(rabbitmqctlEnvKey)
379+
if !found {
380+
t.Skipf("variable for %s for rabbitmqctl not found, skipping", rabbitmqctlEnvKey)
381+
}
382+
383+
var cmd *exec.Cmd
384+
if strings.HasPrefix(rabbitmqctlPath, "DOCKER:") {
385+
containerName := strings.Split(rabbitmqctlPath, ":")[1]
386+
cmd = exec.Command("docker", "exec", containerName, "rabbitmqctl")
387+
cmd.Args = append(cmd.Args, args...)
388+
} else {
389+
cmd = exec.Command(rabbitmqctlPath, args...)
390+
}
391+
392+
return cmd.Run()
393+
}

integration_test.go

Lines changed: 1 addition & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -2008,13 +2008,6 @@ func TestShouldNotWaitAfterConnectionClosedIssue44(t *testing.T) {
20082008
* Support for integration tests
20092009
*/
20102010

2011-
func loggedConnection(t *testing.T, conn *Connection, name string) *Connection {
2012-
if name != "" {
2013-
conn.conn = &logIO{t, name, conn.conn}
2014-
}
2015-
return conn
2016-
}
2017-
20182011
// Returns a connection to the AMQP if the AMQP_URL environment
20192012
// variable is set and a connection can be established.
20202013
func integrationConnection(t *testing.T, name string) *Connection {
@@ -2023,7 +2016,7 @@ func integrationConnection(t *testing.T, name string) *Connection {
20232016
t.Fatalf("cannot dial integration server. Is the rabbitmq-server service running? %s", err)
20242017
return nil
20252018
}
2026-
return loggedConnection(t, conn, name)
2019+
return conn
20272020
}
20282021

20292022
// Returns a connection, channel and declares a queue when the AMQP_URL is in the environment

types.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,11 @@ var (
6363
ErrFieldType = &Error{Code: SyntaxError, Reason: "unsupported table field type"}
6464
)
6565

66+
// internal errors used inside the library
67+
var (
68+
errInvalidTypeAssertion = &Error{Code: InternalError, Reason: "type assertion unsuccessful", Server: false, Recover: true}
69+
)
70+
6671
// Error captures the code and reason a channel or connection has been closed
6772
// by the server.
6873
type Error struct {

0 commit comments

Comments
 (0)