@@ -14,6 +14,8 @@ use std::error::Error;
1414use uuid:: Uuid ;
1515
1616pub ( crate ) async fn get_expires ( ) -> Result < Vec < BusEventJob > , Box < dyn Error + Send + Sync > > {
17+ let table_name = get_queue_config ( ) ?. connection ( ) . table_name ( ) ;
18+
1719 let query = Statement :: from_sql_and_values (
1820 sea_orm:: DatabaseBackend :: MySql ,
1921 format ! (
@@ -26,8 +28,7 @@ pub(crate) async fn get_expires() -> Result<Vec<BusEventJob>, Box<dyn Error + Se
2628 AND expires_at > ?
2729 ORDER BY expires_at ASC
2830 LIMIT 10
29- "# ,
30- table_name = get_queue_config( ) ?. connection( ) . table_name( ) . to_string( )
31+ "#
3132 ) ,
3233 vec ! [
3334 Value :: String ( Some ( Box :: new( EventStatusEnum :: Processing . to_string( ) ) ) ) ,
@@ -70,6 +71,8 @@ pub(crate) async fn get_expires() -> Result<Vec<BusEventJob>, Box<dyn Error + Se
7071pub ( crate ) async fn get_id_status (
7172 status : EventStatusEnum ,
7273) -> Result < Vec < IdArchiveJob > , Box < dyn Error + Send + Sync > > {
74+ let table_name = get_queue_config ( ) ?. connection ( ) . table_name ( ) ;
75+
7376 get_db_conn ( ) ?
7477 . query_all ( Statement :: from_sql_and_values (
7578 sea_orm:: DatabaseBackend :: MySql ,
@@ -79,8 +82,7 @@ pub(crate) async fn get_id_status(
7982 FROM {table_name}
8083 WHERE status = ?
8184 LIMIT 10
82- "# ,
83- table_name = get_queue_config( ) ?. connection( ) . table_name( ) . to_string( )
85+ "#
8486 ) ,
8587 vec ! [ Value :: String ( Some ( Box :: new( status. to_string( ) ) ) ) ] ,
8688 ) )
@@ -104,13 +106,12 @@ pub(crate) async fn get_id_status(
104106}
105107
106108pub ( crate ) async fn delete ( id : Uuid ) -> Result < ( ) , Box < dyn Error + Send + Sync > > {
109+ let table_name = get_queue_config ( ) ?. connection ( ) . table_name ( ) ;
110+
107111 get_db_conn ( ) ?
108112 . execute ( Statement :: from_sql_and_values (
109113 DbBackend :: MySql ,
110- format ! (
111- r#"DELETE FROM {table_name} WHERE id = ?;"# ,
112- table_name = get_queue_config( ) ?. connection( ) . table_name( ) . to_string( )
113- ) ,
114+ format ! ( r#"DELETE FROM {table_name} WHERE id = ?;"# ) ,
114115 vec ! [ Value :: Bytes ( Some ( Box :: new( id. as_bytes( ) . to_vec( ) ) ) ) ] ,
115116 ) )
116117 . await ?;
@@ -122,6 +123,8 @@ pub(crate) async fn update_scheduled_at(
122123 scheduled_at : chrono:: Duration ,
123124 err : Box < dyn Error + Send + Sync > ,
124125) -> Result < ( ) , Box < dyn Error + Send + Sync > > {
126+ let table_name = get_queue_config ( ) ?. connection ( ) . table_name ( ) ;
127+
125128 get_db_conn ( ) ?
126129 . execute ( Statement :: from_sql_and_values (
127130 DbBackend :: MySql ,
@@ -135,8 +138,7 @@ pub(crate) async fn update_scheduled_at(
135138 should_start_at = ?,
136139 updated_at = (NOW() AT TIME ZONE 'UTC')
137140 WHERE id = ?
138- "# ,
139- table_name = get_queue_config( ) ?. connection( ) . table_name( ) . to_string( )
141+ "#
140142 ) ,
141143 vec ! [
142144 Value :: String ( Some ( Box :: new( EventStatusEnum :: Pending . to_string( ) ) ) ) ,
@@ -154,11 +156,8 @@ pub(crate) async fn update_scheduled_at(
154156pub ( crate ) async fn archive ( id : Uuid ) -> Result < ( ) , BusError > {
155157 let db = get_db_conn ( ) ?;
156158 let txn = db. begin ( ) . await ?;
157- let table_name = get_queue_config ( ) ?. connection ( ) . table_name ( ) . to_string ( ) ;
158- let table_name_archive = get_queue_config ( ) ?
159- . connection ( )
160- . table_name_archive ( )
161- . to_string ( ) ;
159+ let table_name = get_queue_config ( ) ?. connection ( ) . table_name ( ) ;
160+ let table_name_archive = get_queue_config ( ) ?. connection ( ) . table_name_archive ( ) ;
162161
163162 let select_stmt = Statement :: from_sql_and_values (
164163 DbBackend :: MySql ,
@@ -169,8 +168,7 @@ pub(crate) async fn archive(id: Uuid) -> Result<(), BusError> {
169168 status, payload_json, payload_bin, latest_error, updated_at
170169 FROM {table_name}
171170 WHERE id = ?
172- "# ,
173- table_name = table_name
171+ "#
174172 ) ,
175173 vec ! [ Value :: Bytes ( Some ( Box :: new( id. as_bytes( ) . to_vec( ) ) ) ) ] ,
176174 ) ;
@@ -217,12 +215,11 @@ pub(crate) async fn archive(id: Uuid) -> Result<(), BusError> {
217215 DbBackend :: MySql ,
218216 format ! (
219217 r#"
220- INSERT INTO {table_name } (
218+ INSERT INTO {table_name_archive } (
221219 id, queue_name, type_name_event, type_name_handler,
222220 status, payload_json, payload_bin, latest_error, created_at
223221 ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
224- "# ,
225- table_name = table_name_archive
222+ "#
226223 ) ,
227224 vec ! [
228225 Value :: Bytes ( Some ( Box :: new( event_archive. id. as_bytes( ) . to_vec( ) ) ) ) ,
@@ -241,10 +238,7 @@ pub(crate) async fn archive(id: Uuid) -> Result<(), BusError> {
241238
242239 let delete_stmt = Statement :: from_sql_and_values (
243240 DbBackend :: MySql ,
244- format ! (
245- r#"DELETE FROM {table_name} WHERE id = $1"# ,
246- table_name = table_name
247- ) ,
241+ format ! ( r#"DELETE FROM {table_name} WHERE id = $1"# ) ,
248242 vec ! [ Value :: Bytes ( Some ( Box :: new( id. as_bytes( ) . to_vec( ) ) ) ) ] ,
249243 ) ;
250244
@@ -258,13 +252,14 @@ pub(crate) async fn change_status(
258252 status : EventStatusEnum ,
259253 err : Option < Box < dyn Error + Send + Sync > > ,
260254) -> Result < ( ) , BusError > {
255+ let table_name = get_queue_config ( ) ?. connection ( ) . table_name ( ) ;
256+
261257 let mut sql = format ! (
262258 r#"
263259 UPDATE {table_name}
264260 SET status = ?,
265261 updated_at = (NOW() AT TIME ZONE 'UTC')
266- "# ,
267- table_name = get_queue_config( ) ?. connection( ) . table_name( ) . to_string( )
262+ "#
268263 ) ;
269264
270265 let mut values: Vec < Value > = vec ! [ Value :: String ( Some ( Box :: new( status. to_string( ) ) ) ) ] ;
@@ -290,7 +285,7 @@ pub(crate) async fn get_queues_items(
290285) -> Result < Vec < BusEventJob > , BusError > {
291286 let now = Utc :: now ( ) . naive_utc ( ) ;
292287 let limit = queue_config. batch_size ( ) ;
293- let table_name = get_queue_config ( ) ?. connection ( ) . table_name ( ) . to_string ( ) ;
288+ let table_name = get_queue_config ( ) ?. connection ( ) . table_name ( ) ;
294289 let txn = get_db_conn ( ) ?. begin ( ) . await ?;
295290
296291 let select_ids_stmt = Statement :: from_sql_and_values (
@@ -305,8 +300,7 @@ pub(crate) async fn get_queues_items(
305300 ORDER BY should_start_at ASC
306301 LIMIT ?
307302 FOR UPDATE SKIP LOCKED
308- "# ,
309- table_name = table_name
303+ "#
310304 ) ,
311305 vec ! [
312306 Value :: String ( Some ( Box :: new( queue_config. queue_name( ) . to_string( ) ) ) ) ,
@@ -352,9 +346,7 @@ pub(crate) async fn get_queues_items(
352346 expires_at = DATE_ADD(UTC_TIMESTAMP(), INTERVAL expires_interval SECOND),
353347 updated_at = UTC_TIMESTAMP()
354348 WHERE id IN ({placeholders})
355- "# ,
356- table_name = table_name,
357- placeholders = placeholders
349+ "#
358350 ) ,
359351 update_values,
360352 ) ;
@@ -383,9 +375,7 @@ pub(crate) async fn get_queues_items(
383375 expires_interval
384376 FROM {table_name}
385377 WHERE id IN ({placeholders})
386- "# ,
387- table_name = table_name,
388- placeholders = placeholders
378+ "#
389379 ) ,
390380 select_values,
391381 ) ;
@@ -425,6 +415,7 @@ pub(crate) async fn insert_to_events(
425415 bus_event : BusEvent ,
426416) -> Result < ( ) , BusError > {
427417 let db_config = get_queue_config ( ) ?;
418+ let table_name = db_config. connection ( ) . table_name ( ) ;
428419
429420 let payload_json_value = {
430421 #[ cfg( feature = "json-payload" ) ]
@@ -447,8 +438,7 @@ pub(crate) async fn insert_to_events(
447438 archive_mode, should_start_at, expires_at, expires_interval, created_at, updated_at
448439 )
449440 VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
450- "# ,
451- table_name = db_config. connection( ) . table_name( ) . to_string( )
441+ "#
452442 ) ,
453443 vec ! [
454444 Value :: Bytes ( Some ( Box :: new( bus_event. id. as_bytes( ) . to_vec( ) ) ) ) ,
0 commit comments