Skip to content

Commit 728bb27

Browse files
committed
Improve Go Aeron tests and coverage
1 parent a38e16e commit 728bb27

File tree

4 files changed

+410
-57
lines changed

4 files changed

+410
-57
lines changed

go/aeron_transport.go

Lines changed: 76 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -36,13 +36,79 @@ type AeronStats struct {
3636

3737
type AeronTransport struct {
3838
config AeronConfig
39-
handle *C.epoch_aeron_transport_t
39+
handle aeronHandle
4040
closed bool
4141
}
4242

4343
const aeronFrameLength = 56
4444
const aeronFrameVersion = 1
4545

46+
type aeronHandle = unsafe.Pointer
47+
48+
var (
49+
aeronOpen = func(config AeronConfig, errBuf []byte) aeronHandle {
50+
cChannel := C.CString(config.Channel)
51+
defer C.free(unsafe.Pointer(cChannel))
52+
var cDir *C.char
53+
if config.AeronDirectory != "" {
54+
cDir = C.CString(config.AeronDirectory)
55+
defer C.free(unsafe.Pointer(cDir))
56+
}
57+
cConfig := C.epoch_aeron_config_t{
58+
channel: cChannel,
59+
stream_id: C.int32_t(config.StreamID),
60+
aeron_directory: cDir,
61+
fragment_limit: C.int32_t(config.FragmentLimit),
62+
offer_max_attempts: C.int32_t(config.OfferMaxAttempts),
63+
}
64+
handle := C.epoch_aeron_open(
65+
&cConfig,
66+
(*C.char)(unsafe.Pointer(&errBuf[0])),
67+
C.size_t(len(errBuf)),
68+
)
69+
return unsafe.Pointer(handle)
70+
}
71+
aeronSend = func(handle aeronHandle, frame []byte, errBuf []byte) int {
72+
return int(C.epoch_aeron_send(
73+
(*C.epoch_aeron_transport_t)(handle),
74+
(*C.uint8_t)(unsafe.Pointer(&frame[0])),
75+
C.size_t(len(frame)),
76+
(*C.char)(unsafe.Pointer(&errBuf[0])),
77+
C.size_t(len(errBuf)),
78+
))
79+
}
80+
aeronPoll = func(handle aeronHandle, frameBuf []byte, max int, errBuf []byte) (int, int) {
81+
var count C.size_t
82+
result := C.epoch_aeron_poll(
83+
(*C.epoch_aeron_transport_t)(handle),
84+
(*C.uint8_t)(unsafe.Pointer(&frameBuf[0])),
85+
C.size_t(max),
86+
&count,
87+
(*C.char)(unsafe.Pointer(&errBuf[0])),
88+
C.size_t(len(errBuf)),
89+
)
90+
return int(result), int(count)
91+
}
92+
aeronStats = func(handle aeronHandle, out *AeronStats) int {
93+
var stats C.epoch_aeron_stats_t
94+
if C.epoch_aeron_stats((*C.epoch_aeron_transport_t)(handle), &stats) < 0 {
95+
return -1
96+
}
97+
out.SentCount = int64(stats.sent_count)
98+
out.ReceivedCount = int64(stats.received_count)
99+
out.OfferBackPressure = int64(stats.offer_back_pressure)
100+
out.OfferNotConnected = int64(stats.offer_not_connected)
101+
out.OfferAdminAction = int64(stats.offer_admin_action)
102+
out.OfferClosed = int64(stats.offer_closed)
103+
out.OfferMaxPosition = int64(stats.offer_max_position)
104+
out.OfferFailed = int64(stats.offer_failed)
105+
return 0
106+
}
107+
aeronClose = func(handle aeronHandle) {
108+
C.epoch_aeron_close((*C.epoch_aeron_transport_t)(handle))
109+
}
110+
)
111+
46112
func encodeAeronFrame(message Message) ([]byte, error) {
47113
buffer := make([]byte, aeronFrameLength)
48114
buffer[0] = aeronFrameVersion
@@ -75,26 +141,8 @@ func decodeAeronFrame(buffer []byte) (Message, error) {
75141
}
76142

77143
func NewAeronTransport(config AeronConfig) *AeronTransport {
78-
cChannel := C.CString(config.Channel)
79-
defer C.free(unsafe.Pointer(cChannel))
80-
var cDir *C.char
81-
if config.AeronDirectory != "" {
82-
cDir = C.CString(config.AeronDirectory)
83-
defer C.free(unsafe.Pointer(cDir))
84-
}
85-
cConfig := C.epoch_aeron_config_t{
86-
channel: cChannel,
87-
stream_id: C.int32_t(config.StreamID),
88-
aeron_directory: cDir,
89-
fragment_limit: C.int32_t(config.FragmentLimit),
90-
offer_max_attempts: C.int32_t(config.OfferMaxAttempts),
91-
}
92144
errBuf := make([]byte, 256)
93-
handle := C.epoch_aeron_open(
94-
&cConfig,
95-
(*C.char)(unsafe.Pointer(&errBuf[0])),
96-
C.size_t(len(errBuf)),
97-
)
145+
handle := aeronOpen(config, errBuf)
98146
if handle == nil {
99147
panic(fmt.Errorf("aeron open failed: %s", trimCString(errBuf)))
100148
}
@@ -110,13 +158,7 @@ func (t *AeronTransport) Send(message Message) {
110158
panic(err)
111159
}
112160
errBuf := make([]byte, 256)
113-
result := C.epoch_aeron_send(
114-
t.handle,
115-
(*C.uint8_t)(unsafe.Pointer(&frame[0])),
116-
C.size_t(len(frame)),
117-
(*C.char)(unsafe.Pointer(&errBuf[0])),
118-
C.size_t(len(errBuf)),
119-
)
161+
result := aeronSend(t.handle, frame, errBuf)
120162
if result < 0 {
121163
panic(fmt.Errorf("aeron send failed: %s", trimCString(errBuf)))
122164
}
@@ -128,20 +170,12 @@ func (t *AeronTransport) Poll(max int) []Message {
128170
}
129171
frameBuf := make([]byte, max*aeronFrameLength)
130172
errBuf := make([]byte, 256)
131-
var count C.size_t
132-
result := C.epoch_aeron_poll(
133-
t.handle,
134-
(*C.uint8_t)(unsafe.Pointer(&frameBuf[0])),
135-
C.size_t(max),
136-
&count,
137-
(*C.char)(unsafe.Pointer(&errBuf[0])),
138-
C.size_t(len(errBuf)),
139-
)
173+
result, count := aeronPoll(t.handle, frameBuf, max, errBuf)
140174
if result < 0 {
141175
panic(fmt.Errorf("aeron poll failed: %s", trimCString(errBuf)))
142176
}
143-
out := make([]Message, 0, int(count))
144-
for i := 0; i < int(count); i++ {
177+
out := make([]Message, 0, count)
178+
for i := 0; i < count; i++ {
145179
offset := i * aeronFrameLength
146180
message, err := decodeAeronFrame(frameBuf[offset : offset+aeronFrameLength])
147181
if err != nil {
@@ -158,7 +192,7 @@ func (t *AeronTransport) Close() {
158192
}
159193
t.closed = true
160194
if t.handle != nil {
161-
C.epoch_aeron_close(t.handle)
195+
aeronClose(t.handle)
162196
t.handle = nil
163197
}
164198
}
@@ -167,20 +201,11 @@ func (t *AeronTransport) Stats() AeronStats {
167201
if t.handle == nil {
168202
return AeronStats{}
169203
}
170-
var stats C.epoch_aeron_stats_t
171-
if C.epoch_aeron_stats(t.handle, &stats) < 0 {
204+
var stats AeronStats
205+
if aeronStats(t.handle, &stats) < 0 {
172206
return AeronStats{}
173207
}
174-
return AeronStats{
175-
SentCount: int64(stats.sent_count),
176-
ReceivedCount: int64(stats.received_count),
177-
OfferBackPressure: int64(stats.offer_back_pressure),
178-
OfferNotConnected: int64(stats.offer_not_connected),
179-
OfferAdminAction: int64(stats.offer_admin_action),
180-
OfferClosed: int64(stats.offer_closed),
181-
OfferMaxPosition: int64(stats.offer_max_position),
182-
OfferFailed: int64(stats.offer_failed),
183-
}
208+
return stats
184209
}
185210

186211
func trimCString(buffer []byte) string {

0 commit comments

Comments
 (0)