@@ -2271,20 +2271,293 @@ class Mqtt5ClientTests: XCBaseTestCase, @unchecked Sendable {
22712271 try await stopClient ( client: client, testContext: testContext)
22722272 }
22732273
2274- // /*
2275- // * [ManualPuback-UC3] Calling acquirePublishAcknowledgement() twice on the same QoS 1 PUBLISH returns nil
2276- // */
2277- // func testMqtt5ManualPubackAcquireDoubleCallReturnsNil() async throws {
2278-
2279- // /*
2280- // * [ManualPuback-UC4] Calling acquirePublishAcknowledgement() after the callback returns also returns nil
2281- // */
2282- // func testMqtt5ManualPubackAcquirePostCallbackReturnsNil() async throws {
2283-
2284- // /*
2285- // * [ManualPuback-UC5] acquirePublishAcknowledgement is nil for QoS 0 messages
2286- // */
2287-
2274+ /*
2275+ * [ManualPuback-UC3] Calling acquirePublishAcknowledgement() twice on the same QoS 1 PUBLISH returns nil
2276+ */
2277+ func testMqtt5ManualPubackAcquireDoubleCallReturnsNil( ) async throws {
2278+ try skipIfPlatformDoesntSupportTLS ( )
2279+ let inputHost = try getEnvironmentVarOrSkipTest (
2280+ environmentVarName: " AWS_TEST_MQTT5_IOT_CORE_HOST " )
2281+ let inputCert = try getEnvironmentVarOrSkipTest (
2282+ environmentVarName: " AWS_TEST_MQTT5_IOT_CORE_RSA_CERT " )
2283+ let inputKey = try getEnvironmentVarOrSkipTest (
2284+ environmentVarName: " AWS_TEST_MQTT5_IOT_CORE_RSA_KEY " )
2285+
2286+ let tlsOptions = try TLSContextOptions . makeMTLS (
2287+ certificatePath: inputCert,
2288+ privateKeyPath: inputKey
2289+ )
2290+ let tlsContext = try TLSContext ( options: tlsOptions, mode: . client)
2291+
2292+ let clientId = createClientId ( )
2293+ let topic = " test/MQTT5_ManualPuback_Swift_ " + clientId
2294+ let payloadData = " Hello World " . data ( using: . utf8) !
2295+
2296+ // Expectation: callback completes with both acquire calls made.
2297+ let callbackDoneExpectation = XCTestExpectation (
2298+ description: " Publish callback completed both acquire calls " )
2299+
2300+ // Actor-protected state to record the results of the two acquire calls.
2301+ actor DoubleCallResult {
2302+ var firstHandleNonNil : Bool = false
2303+ var secondHandleNil : Bool = false
2304+ func set( firstNonNil: Bool , secondNil: Bool ) {
2305+ firstHandleNonNil = firstNonNil
2306+ secondHandleNil = secondNil
2307+ }
2308+ }
2309+ let result = DoubleCallResult ( )
2310+
2311+ let onPublishReceived : OnPublishReceived = { publishData in
2312+ // Both acquire calls must be made synchronously within the callback.
2313+ let firstHandle = publishData. acquirePublishAcknowledgement ? ( )
2314+ let secondHandle = publishData. acquirePublishAcknowledgement ? ( )
2315+
2316+ if let payloadString = publishData. publishPacket. payloadAsString ( ) {
2317+ print (
2318+ " ManualPubackDoubleCall Mqtt5ClientTests: onPublishReceived. "
2319+ + " Topic:' \( publishData. publishPacket. topic) ' "
2320+ + " QoS: \( publishData. publishPacket. qos) "
2321+ + " payload:' \( payloadString) ' " )
2322+ }
2323+
2324+ Task {
2325+ await result. set (
2326+ firstNonNil: firstHandle != nil ,
2327+ secondNil: secondHandle == nil )
2328+ callbackDoneExpectation. fulfill ( )
2329+ }
2330+ }
2331+
2332+ let connectOptions = MqttConnectOptions ( clientId: clientId)
2333+ let clientOptions = MqttClientOptions (
2334+ hostName: inputHost,
2335+ port: UInt32 ( 8883 ) ,
2336+ tlsCtx: tlsContext,
2337+ connectOptions: connectOptions)
2338+
2339+ let testContext = MqttTestContext (
2340+ contextName: " ManualPubackDoubleCall " ,
2341+ onPublishReceived: onPublishReceived)
2342+ let client = try createClient ( clientOptions: clientOptions, testContext: testContext)
2343+ try await connectClient ( client: client, testContext: testContext)
2344+
2345+ // Subscribe to the topic with QoS 1
2346+ let subscribePacket = SubscribePacket ( topicFilter: topic, qos: QoS . atLeastOnce, noLocal: false )
2347+ _ = try await withTimeout ( client: client, seconds: 5 ) {
2348+ try await client. subscribe ( subscribePacket: subscribePacket)
2349+ }
2350+
2351+ // Publish a QoS 1 message
2352+ let publishPacket = PublishPacket (
2353+ qos: QoS . atLeastOnce, topic: topic, payload: payloadData)
2354+ _ = try await withTimeout ( client: client, seconds: 5 ) {
2355+ try await client. publish ( publishPacket: publishPacket)
2356+ }
2357+
2358+ // Wait for the callback to complete both acquire calls
2359+ let callbackResult = await awaitExpectationResult ( [ callbackDoneExpectation] , 10 )
2360+ XCTAssertEqual ( callbackResult, . completed, " Timed out waiting for publish callback to complete " )
2361+
2362+ let firstNonNil = await result. firstHandleNonNil
2363+ let secondNil = await result. secondHandleNil
2364+ XCTAssertTrue (
2365+ firstNonNil,
2366+ " First call to acquirePublishAcknowledgement() should return a non-nil handle " )
2367+ XCTAssertTrue (
2368+ secondNil,
2369+ " Second call to acquirePublishAcknowledgement() on the same message should return nil " )
2370+
2371+ try await stopClient ( client: client, testContext: testContext)
2372+ }
2373+
2374+ /*
2375+ * [ManualPuback-UC4] Calling acquirePublishAcknowledgement() after the callback returns also returns nil
2376+ */
2377+ func testMqtt5ManualPubackAcquirePostCallbackReturnsNil( ) async throws {
2378+ try skipIfPlatformDoesntSupportTLS ( )
2379+ let inputHost = try getEnvironmentVarOrSkipTest (
2380+ environmentVarName: " AWS_TEST_MQTT5_IOT_CORE_HOST " )
2381+ let inputCert = try getEnvironmentVarOrSkipTest (
2382+ environmentVarName: " AWS_TEST_MQTT5_IOT_CORE_RSA_CERT " )
2383+ let inputKey = try getEnvironmentVarOrSkipTest (
2384+ environmentVarName: " AWS_TEST_MQTT5_IOT_CORE_RSA_KEY " )
2385+
2386+ let tlsOptions = try TLSContextOptions . makeMTLS (
2387+ certificatePath: inputCert,
2388+ privateKeyPath: inputKey
2389+ )
2390+ let tlsContext = try TLSContext ( options: tlsOptions, mode: . client)
2391+
2392+ let clientId = createClientId ( )
2393+ let topic = " test/MQTT5_ManualPuback_Swift_ " + clientId
2394+ let payloadData = " Hello World " . data ( using: . utf8) !
2395+
2396+ // Expectation: callback has returned.
2397+ let callbackDoneExpectation = XCTestExpectation (
2398+ description: " Publish callback has returned " )
2399+
2400+ // Save the acquirePublishAcknowledgement closure so we can call it after the callback returns.
2401+ // The closure is saved synchronously in the callback body.
2402+ actor SavedAcquireFn {
2403+ var fn : ( @Sendable ( ) -> PublishAcknowledgementHandle ? ) ? = nil
2404+ func set( _ value: ( @Sendable ( ) -> PublishAcknowledgementHandle ? ) ? ) { fn = value }
2405+ }
2406+ let savedAcquireFn = SavedAcquireFn ( )
2407+
2408+ let onPublishReceived : OnPublishReceived = { publishData in
2409+ // Save the closure synchronously but do NOT call it.
2410+ // The closure itself becomes invalid once the callback returns.
2411+ let acquireFn = publishData. acquirePublishAcknowledgement
2412+
2413+ if let payloadString = publishData. publishPacket. payloadAsString ( ) {
2414+ print (
2415+ " ManualPubackPostCallback Mqtt5ClientTests: onPublishReceived. "
2416+ + " Topic:' \( publishData. publishPacket. topic) ' "
2417+ + " QoS: \( publishData. publishPacket. qos) "
2418+ + " payload:' \( payloadString) ' " )
2419+ }
2420+
2421+ Task {
2422+ await savedAcquireFn. set ( acquireFn)
2423+ callbackDoneExpectation. fulfill ( )
2424+ // Callback has now returned, acquirePublishAcknowledgement should return nil if called
2425+ }
2426+ }
2427+
2428+ let connectOptions = MqttConnectOptions ( clientId: clientId)
2429+ let clientOptions = MqttClientOptions (
2430+ hostName: inputHost,
2431+ port: UInt32 ( 8883 ) ,
2432+ tlsCtx: tlsContext,
2433+ connectOptions: connectOptions)
2434+
2435+ let testContext = MqttTestContext (
2436+ contextName: " ManualPubackPostCallback " ,
2437+ onPublishReceived: onPublishReceived)
2438+ let client = try createClient ( clientOptions: clientOptions, testContext: testContext)
2439+ try await connectClient ( client: client, testContext: testContext)
2440+
2441+ // Subscribe to the topic with QoS 1
2442+ let subscribePacket = SubscribePacket ( topicFilter: topic, qos: QoS . atLeastOnce, noLocal: false )
2443+ _ = try await withTimeout ( client: client, seconds: 5 ) {
2444+ try await client. subscribe ( subscribePacket: subscribePacket)
2445+ }
2446+
2447+ // Publish a QoS 1 message
2448+ let publishPacket = PublishPacket (
2449+ qos: QoS . atLeastOnce, topic: topic, payload: payloadData)
2450+ _ = try await withTimeout ( client: client, seconds: 5 ) {
2451+ try await client. publish ( publishPacket: publishPacket)
2452+ }
2453+
2454+ // Wait for the callback to complete
2455+ let callbackResult = await awaitExpectationResult ( [ callbackDoneExpectation] , 10 )
2456+ XCTAssertEqual ( callbackResult, . completed, " Timed out waiting for publish callback to complete " )
2457+
2458+ // Give the callback Task a moment to fully return before calling acquirePublishAcknowledgement
2459+ try await Task . sleep ( nanoseconds: 100_000_000 ) // 100ms
2460+
2461+ // Now call acquirePublishAcknowledgement() after the callback has returned
2462+ let acquireFn = await savedAcquireFn. fn
2463+ XCTAssertNotNil ( acquireFn, " acquirePublishAcknowledgement closure should have been saved " )
2464+ let lateHandle = acquireFn ? ( )
2465+ XCTAssertNil (
2466+ lateHandle,
2467+ " acquirePublishAcknowledgement() should return nil after the callback has returned " )
2468+
2469+ try await stopClient ( client: client, testContext: testContext)
2470+ }
2471+
2472+ /*
2473+ * [ManualPuback-UC5] acquirePublishAcknowledgement is nil for QoS 0 messages
2474+ */
2475+ func testMqtt5ManualPubackQoS0AcquireIsNil( ) async throws {
2476+ try skipIfPlatformDoesntSupportTLS ( )
2477+ let inputHost = try getEnvironmentVarOrSkipTest (
2478+ environmentVarName: " AWS_TEST_MQTT5_IOT_CORE_HOST " )
2479+ let inputCert = try getEnvironmentVarOrSkipTest (
2480+ environmentVarName: " AWS_TEST_MQTT5_IOT_CORE_RSA_CERT " )
2481+ let inputKey = try getEnvironmentVarOrSkipTest (
2482+ environmentVarName: " AWS_TEST_MQTT5_IOT_CORE_RSA_KEY " )
2483+
2484+ let tlsOptions = try TLSContextOptions . makeMTLS (
2485+ certificatePath: inputCert,
2486+ privateKeyPath: inputKey
2487+ )
2488+ let tlsContext = try TLSContext ( options: tlsOptions, mode: . client)
2489+
2490+ let clientId = createClientId ( )
2491+ let topic = " test/MQTT5_ManualPuback_Swift_ " + clientId
2492+ let payloadData = " Hello World " . data ( using: . utf8) !
2493+
2494+ // Expectation: callback completes.
2495+ let callbackDoneExpectation = XCTestExpectation (
2496+ description: " Publish callback completed " )
2497+
2498+ // Actor-protected state to record whether acquirePublishAcknowledgement was nil.
2499+ actor AcquireResult {
2500+ var acquirePropertyWasNil : Bool = false
2501+ func set( _ value: Bool ) { acquirePropertyWasNil = value }
2502+ }
2503+ let acquireResult = AcquireResult ( )
2504+
2505+ let onPublishReceived : OnPublishReceived = { publishData in
2506+ // For QoS 0, acquirePublishAcknowledgement should be nil.
2507+ let isNil = publishData. acquirePublishAcknowledgement == nil
2508+
2509+ if let payloadString = publishData. publishPacket. payloadAsString ( ) {
2510+ print (
2511+ " ManualPubackQoS0 Mqtt5ClientTests: onPublishReceived. "
2512+ + " Topic:' \( publishData. publishPacket. topic) ' "
2513+ + " QoS: \( publishData. publishPacket. qos) "
2514+ + " payload:' \( payloadString) ' " )
2515+ }
2516+
2517+ Task {
2518+ await acquireResult. set ( isNil)
2519+ callbackDoneExpectation. fulfill ( )
2520+ }
2521+ }
2522+
2523+ let connectOptions = MqttConnectOptions ( clientId: clientId)
2524+ let clientOptions = MqttClientOptions (
2525+ hostName: inputHost,
2526+ port: UInt32 ( 8883 ) ,
2527+ tlsCtx: tlsContext,
2528+ connectOptions: connectOptions)
2529+
2530+ let testContext = MqttTestContext (
2531+ contextName: " ManualPubackQoS0 " ,
2532+ onPublishReceived: onPublishReceived)
2533+ let client = try createClient ( clientOptions: clientOptions, testContext: testContext)
2534+ try await connectClient ( client: client, testContext: testContext)
2535+
2536+ // Subscribe with QoS 1 (so the broker delivers at QoS 0 downgraded from our sub)
2537+ let subscribePacket = SubscribePacket ( topicFilter: topic, qos: QoS . atLeastOnce, noLocal: false )
2538+ _ = try await withTimeout ( client: client, seconds: 5 ) {
2539+ try await client. subscribe ( subscribePacket: subscribePacket)
2540+ }
2541+
2542+ // Publish at QoS 0 — there is no PUBACK involved
2543+ let publishPacket = PublishPacket (
2544+ qos: QoS . atMostOnce, topic: topic, payload: payloadData)
2545+ _ = try await withTimeout ( client: client, seconds: 5 ) {
2546+ try await client. publish ( publishPacket: publishPacket)
2547+ }
2548+
2549+ // Wait for the callback to complete (with timeout to prevent hanging)
2550+ let callbackResult = await awaitExpectationResult ( [ callbackDoneExpectation] , 10 )
2551+ XCTAssertEqual ( callbackResult, . completed, " Timed out waiting for publish callback to complete " )
2552+
2553+ let wasNil = await acquireResult. acquirePropertyWasNil
2554+ XCTAssertTrue (
2555+ wasNil,
2556+ " acquirePublishAcknowledgement should be nil for QoS 0 messages " )
2557+
2558+ try await stopClient ( client: client, testContext: testContext)
2559+ }
2560+
22882561 /*===============================================================
22892562 RETAIN TESTS
22902563 =================================================================*/
0 commit comments