@@ -13,11 +13,13 @@ use postgres_stream::types::{EventIdentifier, PgLsn, StreamId, TriggeredEvent};
1313use uuid:: Uuid ;
1414
1515/// Creates a test event with the given payload key.
16+ /// Includes an `id` field in payload (as users would do via payload_extensions).
1617fn make_test_event ( key : & str ) -> TriggeredEvent {
18+ let id = Uuid :: new_v4 ( ) . to_string ( ) ;
1719 TriggeredEvent {
18- id : EventIdentifier :: new ( Uuid :: new_v4 ( ) . to_string ( ) , Utc :: now ( ) ) ,
20+ id : EventIdentifier :: new ( id . clone ( ) , Utc :: now ( ) ) ,
1921 stream_id : StreamId :: default ( ) ,
20- payload : serde_json:: json!( { "key" : key, "value" : "test_data" } ) ,
22+ payload : serde_json:: json!( { "id" : id , " key": key, "value" : "test_data" } ) ,
2123 metadata : Some ( serde_json:: json!( { "source" : "test" } ) ) ,
2224 lsn : Some ( PgLsn :: from ( 12345u64 ) ) ,
2325 }
@@ -109,14 +111,15 @@ async fn test_meilisearch_sink_indexes_only_payload() {
109111 . expect ( "Failed to create sink" ) ;
110112
111113 // Create event with metadata.
114+ // Include `id` in payload (as users would do via payload_extensions).
115+ let event_id = Uuid :: new_v4 ( ) . to_string ( ) ;
112116 let event = TriggeredEvent {
113- id : EventIdentifier :: new ( Uuid :: new_v4 ( ) . to_string ( ) , Utc :: now ( ) ) ,
117+ id : EventIdentifier :: new ( event_id . clone ( ) , Utc :: now ( ) ) ,
114118 stream_id : StreamId :: default ( ) ,
115- payload : serde_json:: json!( { "action" : "created" , "user_id" : 456 } ) ,
119+ payload : serde_json:: json!( { "id" : event_id , " action": "created" , "user_id" : 456 } ) ,
116120 metadata : Some ( serde_json:: json!( { "source" : "api" } ) ) ,
117121 lsn : Some ( PgLsn :: from ( 99999u64 ) ) ,
118122 } ;
119- let event_id = event. id . id . clone ( ) ;
120123
121124 sink. publish_events ( vec ! [ event] )
122125 . await
@@ -131,10 +134,10 @@ async fn test_meilisearch_sink_indexes_only_payload() {
131134 . await
132135 . expect ( "Failed to get document" ) ;
133136
134- // Only payload fields + injected id should be present .
137+ // Only payload fields should be present ( id from payload, not injected) .
135138 assert_eq ! ( doc[ "action" ] , "created" ) ;
136139 assert_eq ! ( doc[ "user_id" ] , 456 ) ;
137- assert_eq ! ( doc[ "id" ] , event_id) ; // Injected for primary key .
140+ assert_eq ! ( doc[ "id" ] , event_id) ; // From payload, not injected .
138141 // No envelope fields.
139142 assert ! ( doc. get( "created_at" ) . is_none( ) ) ;
140143 assert ! ( doc. get( "metadata" ) . is_none( ) ) ;
@@ -206,14 +209,15 @@ async fn test_meilisearch_sink_uses_index_from_metadata() {
206209 . expect ( "Failed to create sink" ) ;
207210
208211 // Create event with index in metadata.
212+ // Include `id` in payload (as users would do via payload_extensions).
213+ let event_id = Uuid :: new_v4 ( ) . to_string ( ) ;
209214 let event = TriggeredEvent {
210- id : EventIdentifier :: new ( Uuid :: new_v4 ( ) . to_string ( ) , Utc :: now ( ) ) ,
215+ id : EventIdentifier :: new ( event_id . clone ( ) , Utc :: now ( ) ) ,
211216 stream_id : StreamId :: default ( ) ,
212- payload : serde_json:: json!( { "routed" : true } ) ,
217+ payload : serde_json:: json!( { "id" : event_id , " routed": true } ) ,
213218 metadata : Some ( serde_json:: json!( { "index" : metadata_index } ) ) ,
214219 lsn : None ,
215220 } ;
216- let event_id = event. id . id . clone ( ) ;
217221
218222 sink. publish_events ( vec ! [ event] )
219223 . await
0 commit comments