@@ -2079,11 +2079,12 @@ class Mqtt5ClientTests: XCBaseTestCase, @unchecked Sendable {
20792079
20802080 let onPublishReceived : OnPublishReceived = { publishData in
20812081 let receivedPayload = publishData. publishPacket. payload ?? Data ( )
2082+ // Acquire the handle synchronously within the callback before the callback returns
2083+ let handle = publishData. acquirePublishAcknowledgement ? ( )
20822084 Task {
20832085 let isFirst = await !state. firstDelivered
20842086 if isFirst {
2085- // First delivery: acquire manual publish acknowledgement control to hold the acknowledgement
2086- let handle = publishData. acquirePublishAcknowledgement ? ( )
2087+ // First delivery: hold the acknowledgement by keeping the handle
20872088 await state. setFirstDelivery ( payload: receivedPayload, handle: handle)
20882089 await firstDeliverySemaphore. signal ( )
20892090 } else {
@@ -2195,11 +2196,12 @@ class Mqtt5ClientTests: XCBaseTestCase, @unchecked Sendable {
21952196
21962197 let onPublishReceived : OnPublishReceived = { publishData in
21972198 let receivedPayload = publishData. publishPacket. payload ?? Data ( )
2199+ // Acquire the handle synchronously within the callback before the callback returns
2200+ let handle = publishData. acquirePublishAcknowledgement ? ( )
21982201 Task {
21992202 let isFirst = await !state. firstDelivered
22002203 if isFirst {
22012204 // First delivery: acquire manual PUBACK control
2202- let handle = publishData. acquirePublishAcknowledgement ? ( )
22032205 await state. setFirstDelivered ( handle: handle)
22042206 await firstDeliverySemaphore. signal ( )
22052207 } else if receivedPayload == payloadData {
@@ -2303,17 +2305,17 @@ class Mqtt5ClientTests: XCBaseTestCase, @unchecked Sendable {
23032305 let callbackDoneSemaphore = TestSemaphore ( value: 0 )
23042306
23052307 let onPublishReceived : OnPublishReceived = { publishData in
2308+ // Both acquire calls must be made synchronously within the callback (before it returns),
2309+ // as required by the acquirePublishAcknowledgement contract.
2310+ let firstHandle = publishData. acquirePublishAcknowledgement ? ( )
2311+ let secondHandle = publishData. acquirePublishAcknowledgement ? ( )
23062312 Task {
2307- // First call should succeed and return a non-nil handle
2308- guard let firstHandle = publishData. acquirePublishAcknowledgement ? ( ) else {
2313+ guard firstHandle != nil else {
23092314 await doubleCallResult. set ( " first_call_failed " )
23102315 await callbackDoneSemaphore. signal ( )
23112316 return
23122317 }
2313- _ = firstHandle // suppress unused warning
2314-
23152318 // Second call on the same message should return nil (already acquired)
2316- let secondHandle = publishData. acquirePublishAcknowledgement ? ( )
23172319 if secondHandle == nil {
23182320 await doubleCallResult. set ( " double_call_returned_nil " )
23192321 } else {
@@ -2347,7 +2349,15 @@ class Mqtt5ClientTests: XCBaseTestCase, @unchecked Sendable {
23472349 try await client. publish ( publishPacket: publishPacket)
23482350 }
23492351
2350- await callbackDoneSemaphore. wait ( )
2352+ let callbackCompleted = await withTaskGroup ( of: Bool . self) { group in
2353+ group. addTask { await callbackDoneSemaphore. wait ( ) ; return true }
2354+ group. addTask { try ? await Task . sleep ( nanoseconds: 10_000_000_000 ) ; return false }
2355+ let completed = await group. next ( ) !
2356+ group. cancelAll ( )
2357+ return completed
2358+ }
2359+ XCTAssertTrue ( callbackCompleted, " Timed out waiting for publish callback to complete " )
2360+
23512361 let result = await doubleCallResult. result
23522362 XCTAssertEqual (
23532363 result, " double_call_returned_nil " ,
@@ -2421,8 +2431,15 @@ class Mqtt5ClientTests: XCBaseTestCase, @unchecked Sendable {
24212431 try await client. publish ( publishPacket: publishPacket)
24222432 }
24232433
2424- // Wait for the callback to complete
2425- await callbackDoneSemaphore. wait ( )
2434+ // Wait for the callback to complete (with timeout to prevent hanging in CI)
2435+ let callbackCompleted = await withTaskGroup ( of: Bool . self) { group in
2436+ group. addTask { await callbackDoneSemaphore. wait ( ) ; return true }
2437+ group. addTask { try ? await Task . sleep ( nanoseconds: 10_000_000_000 ) ; return false }
2438+ let completed = await group. next ( ) !
2439+ group. cancelAll ( )
2440+ return completed
2441+ }
2442+ XCTAssertTrue ( callbackCompleted, " Timed out waiting for publish callback to complete " )
24262443
24272444 // Give the callback Task a moment to fully return before we call acquirePublishAcknowledgement
24282445 try await Task . sleep ( nanoseconds: 100_000_000 ) // 100ms
@@ -2506,7 +2523,15 @@ class Mqtt5ClientTests: XCBaseTestCase, @unchecked Sendable {
25062523 try await client. publish ( publishPacket: publishPacket)
25072524 }
25082525
2509- await callbackDoneSemaphore. wait ( )
2526+ let callbackCompleted = await withTaskGroup ( of: Bool . self) { group in
2527+ group. addTask { await callbackDoneSemaphore. wait ( ) ; return true }
2528+ group. addTask { try ? await Task . sleep ( nanoseconds: 10_000_000_000 ) ; return false }
2529+ let completed = await group. next ( ) !
2530+ group. cancelAll ( )
2531+ return completed
2532+ }
2533+ XCTAssertTrue ( callbackCompleted, " Timed out waiting for publish callback to complete " )
2534+
25102535 let wasNil = await acquireResult. acquirePropertyWasNil
25112536 XCTAssertTrue ( wasNil, " acquirePublishAcknowledgement should be nil for QoS 0 messages " )
25122537
0 commit comments