1use actix_web::web::ThinData;
8use apalis::prelude::{Attempt, Data, *};
9use chrono::{DateTime, Utc};
10use eyre::Result;
11use std::sync::Arc;
12use tracing::{debug, error, info, instrument, warn};
13
14use crate::{
15 constants::{FINAL_TRANSACTION_STATUSES, WORKER_TRANSACTION_CLEANUP_RETRIES},
16 jobs::handle_result,
17 models::{DefaultAppState, RelayerRepoModel, TransactionRepoModel},
18 repositories::{Repository, TransactionRepository},
19};
20
/// Upper bound on how many relayers are cleaned up concurrently; passed to
/// `buffer_unordered` in `process_relayers_in_batches`.
const MAX_CONCURRENT_RELAYERS: usize = 10;
23
/// Upper bound on how many transaction deletions run concurrently for a single relayer;
/// passed to `buffer_unordered` in `process_transactions_for_cleanup`.
const MAX_CONCURRENT_TRANSACTIONS_PER_RELAYER: usize = 50;
26
27#[instrument(
44 level = "debug",
45 skip(job, data),
46 fields(
47 job_type = "transaction_cleanup",
48 attempt = %attempt.current(),
49 ),
50 err
51)]
52pub async fn transaction_cleanup_handler(
53 job: TransactionCleanupCronReminder,
54 data: Data<ThinData<DefaultAppState>>,
55 attempt: Attempt,
56) -> Result<(), Error> {
57 let result = handle_request(job, data, attempt.clone()).await;
58
59 handle_result(
60 result,
61 attempt,
62 "TransactionCleanup",
63 WORKER_TRANSACTION_CLEANUP_RETRIES,
64 )
65}
66
/// Job payload accepted by `transaction_cleanup_handler`; it carries no data.
#[derive(Default, Debug, Clone)]
pub struct TransactionCleanupCronReminder();
70
71async fn handle_request(
81 _job: TransactionCleanupCronReminder,
82 data: Data<ThinData<DefaultAppState>>,
83 _attempt: Attempt,
84) -> Result<()> {
85 let now = Utc::now();
86 info!(
87 timestamp = %now.to_rfc3339(),
88 "executing transaction cleanup from storage"
89 );
90
91 let transaction_repo = data.transaction_repository();
92 let relayer_repo = data.relayer_repository();
93
94 let relayers = relayer_repo.list_all().await.map_err(|e| {
96 error!(
97 error = %e,
98 "failed to fetch relayers for cleanup"
99 );
100 eyre::eyre!("Failed to fetch relayers: {}", e)
101 })?;
102
103 info!(
104 relayer_count = relayers.len(),
105 "found relayers to process for cleanup"
106 );
107
108 let cleanup_results = process_relayers_in_batches(relayers, transaction_repo, now).await;
110
111 report_cleanup_results(cleanup_results).await
113}
114
115async fn process_relayers_in_batches(
125 relayers: Vec<RelayerRepoModel>,
126 transaction_repo: Arc<impl TransactionRepository>,
127 now: DateTime<Utc>,
128) -> Vec<RelayerCleanupResult> {
129 use futures::stream::{self, StreamExt};
130
131 let results: Vec<RelayerCleanupResult> = stream::iter(relayers)
133 .map(|relayer| {
134 let repo_clone = Arc::clone(&transaction_repo);
135 async move { process_single_relayer(relayer, repo_clone, now).await }
136 })
137 .buffer_unordered(MAX_CONCURRENT_RELAYERS)
138 .collect()
139 .await;
140
141 results
142}
143
/// Outcome of a cleanup pass for a single relayer, as produced by `process_single_relayer`.
#[derive(Debug)]
struct RelayerCleanupResult {
    /// Id of the relayer this result belongs to.
    relayer_id: String,
    /// Number of transactions that were successfully deleted for the relayer.
    cleaned_count: usize,
    /// Error text when fetching the relayer's final transactions failed; `None` otherwise.
    error: Option<String>,
}
151
152async fn process_single_relayer(
162 relayer: RelayerRepoModel,
163 transaction_repo: Arc<impl TransactionRepository>,
164 now: DateTime<Utc>,
165) -> RelayerCleanupResult {
166 debug!(
167 relayer_id = %relayer.id,
168 "processing cleanup for relayer"
169 );
170
171 match fetch_final_transactions(&relayer.id, &transaction_repo).await {
172 Ok(final_transactions) => {
173 debug!(
174 transaction_count = final_transactions.len(),
175 relayer_id = %relayer.id,
176 "found transactions with final statuses"
177 );
178
179 let cleaned_count = process_transactions_for_cleanup(
180 final_transactions,
181 &transaction_repo,
182 &relayer.id,
183 now,
184 )
185 .await;
186
187 if cleaned_count > 0 {
188 info!(
189 cleaned_count,
190 relayer_id = %relayer.id,
191 "cleaned up expired transactions"
192 );
193 }
194
195 RelayerCleanupResult {
196 relayer_id: relayer.id,
197 cleaned_count,
198 error: None,
199 }
200 }
201 Err(e) => {
202 error!(
203 error = %e,
204 relayer_id = %relayer.id,
205 "failed to fetch final transactions"
206 );
207 RelayerCleanupResult {
208 relayer_id: relayer.id,
209 cleaned_count: 0,
210 error: Some(e.to_string()),
211 }
212 }
213 }
214}
215
216async fn fetch_final_transactions(
225 relayer_id: &str,
226 transaction_repo: &Arc<impl TransactionRepository>,
227) -> Result<Vec<TransactionRepoModel>> {
228 transaction_repo
229 .find_by_status(relayer_id, FINAL_TRANSACTION_STATUSES)
230 .await
231 .map_err(|e| {
232 eyre::eyre!(
233 "Failed to fetch final transactions for relayer {}: {}",
234 relayer_id,
235 e
236 )
237 })
238}
239
240async fn process_transactions_for_cleanup(
254 transactions: Vec<TransactionRepoModel>,
255 transaction_repo: &Arc<impl Repository<TransactionRepoModel, String>>,
256 relayer_id: &str,
257 now: DateTime<Utc>,
258) -> usize {
259 use futures::stream::{self, StreamExt};
260
261 if transactions.is_empty() {
262 return 0;
263 }
264
265 debug!(
266 transaction_count = transactions.len(),
267 relayer_id = %relayer_id,
268 "processing transactions in parallel"
269 );
270
271 let expired_transactions: Vec<TransactionRepoModel> = transactions
273 .into_iter()
274 .filter(|tx| should_delete_transaction(tx, now))
275 .collect();
276
277 if expired_transactions.is_empty() {
278 debug!(
279 relayer_id = %relayer_id,
280 "no expired transactions found"
281 );
282 return 0;
283 }
284
285 debug!(
286 expired_count = expired_transactions.len(),
287 relayer_id = %relayer_id,
288 "found expired transactions to delete"
289 );
290
291 let deletion_results: Vec<bool> = stream::iter(expired_transactions)
293 .map(|transaction| {
294 let repo_clone = Arc::clone(transaction_repo);
295 let relayer_id = relayer_id.to_string();
296 async move {
297 match delete_expired_transaction(&transaction, &repo_clone, &relayer_id).await {
298 Ok(()) => true,
299 Err(e) => {
300 error!(
301 tx_id = %transaction.id,
302 error = %e,
303 "failed to delete expired transaction"
304 );
305 false
306 }
307 }
308 }
309 })
310 .buffer_unordered(MAX_CONCURRENT_TRANSACTIONS_PER_RELAYER)
311 .collect()
312 .await;
313
314 let cleaned_count = deletion_results.iter().filter(|&&success| success).count();
316
317 debug!(
318 cleaned_count,
319 total_attempted = deletion_results.len(),
320 relayer_id = %relayer_id,
321 "successfully deleted expired transactions"
322 );
323
324 cleaned_count
325}
326
327fn should_delete_transaction(transaction: &TransactionRepoModel, now: DateTime<Utc>) -> bool {
336 transaction
337 .delete_at
338 .as_ref()
339 .and_then(|delete_at_str| DateTime::parse_from_rfc3339(delete_at_str).ok())
340 .map(|delete_at| {
341 let is_expired = now >= delete_at.with_timezone(&Utc);
342 if is_expired {
343 debug!(
344 tx_id = %transaction.id,
345 expired_at = %delete_at.to_rfc3339(),
346 "transaction is expired"
347 );
348 }
349 is_expired
350 })
351 .unwrap_or_else(|| {
352 if transaction.delete_at.is_some() {
353 warn!(
354 tx_id = %transaction.id,
355 "transaction has invalid delete_at timestamp"
356 );
357 }
358 false
359 })
360}
361
362async fn delete_expired_transaction(
372 transaction: &TransactionRepoModel,
373 transaction_repo: &Arc<impl Repository<TransactionRepoModel, String>>,
374 relayer_id: &str,
375) -> Result<()> {
376 if !FINAL_TRANSACTION_STATUSES.contains(&transaction.status) {
378 return Err(eyre::eyre!(
379 "Transaction {} is not in a final state (current: {:?})",
380 transaction.id,
381 transaction.status
382 ));
383 }
384
385 debug!(
386 tx_id = %transaction.id,
387 status = ?transaction.status,
388 relayer_id = %relayer_id,
389 "deleting expired transaction"
390 );
391
392 transaction_repo
393 .delete_by_id(transaction.id.clone())
394 .await
395 .map_err(|e| eyre::eyre!("Failed to delete transaction {}: {}", transaction.id, e))?;
396
397 info!(
398 tx_id = %transaction.id,
399 status = ?transaction.status,
400 relayer_id = %relayer_id,
401 "successfully deleted expired transaction"
402 );
403
404 Ok(())
405}
406
407async fn report_cleanup_results(cleanup_results: Vec<RelayerCleanupResult>) -> Result<()> {
415 let total_cleaned: usize = cleanup_results.iter().map(|r| r.cleaned_count).sum();
416 let total_errors = cleanup_results.iter().filter(|r| r.error.is_some()).count();
417 let total_relayers = cleanup_results.len();
418
419 for result in &cleanup_results {
421 if let Some(error) = &result.error {
422 error!(
423 relayer_id = %result.relayer_id,
424 error = %error,
425 "failed to cleanup transactions for relayer"
426 );
427 }
428 }
429
430 if total_errors > 0 {
431 warn!(
432 total_errors,
433 total_relayers, total_cleaned, "transaction cleanup completed with errors"
434 );
435
436 Err(eyre::eyre!(
439 "Cleanup completed with {} errors out of {} relayers",
440 total_errors,
441 total_relayers
442 ))
443 } else {
444 info!(
445 total_cleaned,
446 total_relayers, "transaction cleanup completed successfully"
447 );
448 Ok(())
449 }
450}
451
452#[cfg(test)]
453mod tests {
454
455 use super::*;
456 use crate::{
457 models::{
458 NetworkType, RelayerEvmPolicy, RelayerNetworkPolicy, RelayerRepoModel,
459 TransactionRepoModel, TransactionStatus,
460 },
461 repositories::{InMemoryTransactionRepository, Repository},
462 utils::mocks::mockutils::create_mock_transaction,
463 };
464 use chrono::{Duration, Utc};
465
466 fn create_test_transaction(
467 id: &str,
468 relayer_id: &str,
469 status: TransactionStatus,
470 delete_at: Option<String>,
471 ) -> TransactionRepoModel {
472 let mut tx = create_mock_transaction();
473 tx.id = id.to_string();
474 tx.relayer_id = relayer_id.to_string();
475 tx.status = status;
476 tx.delete_at = delete_at;
477 tx
478 }
479
480 #[tokio::test]
481 async fn test_should_delete_transaction_expired() {
482 let now = Utc::now();
483 let expired_delete_at = (now - Duration::hours(1)).to_rfc3339();
484
485 let transaction = create_test_transaction(
486 "test-tx",
487 "test-relayer",
488 TransactionStatus::Confirmed,
489 Some(expired_delete_at),
490 );
491
492 assert!(should_delete_transaction(&transaction, now));
493 }
494
495 #[tokio::test]
496 async fn test_should_delete_transaction_not_expired() {
497 let now = Utc::now();
498 let future_delete_at = (now + Duration::hours(1)).to_rfc3339();
499
500 let transaction = create_test_transaction(
501 "test-tx",
502 "test-relayer",
503 TransactionStatus::Confirmed,
504 Some(future_delete_at),
505 );
506
507 assert!(!should_delete_transaction(&transaction, now));
508 }
509
510 #[tokio::test]
511 async fn test_should_delete_transaction_no_delete_at() {
512 let now = Utc::now();
513
514 let transaction = create_test_transaction(
515 "test-tx",
516 "test-relayer",
517 TransactionStatus::Confirmed,
518 None,
519 );
520
521 assert!(!should_delete_transaction(&transaction, now));
522 }
523
524 #[tokio::test]
525 async fn test_should_delete_transaction_invalid_timestamp() {
526 let now = Utc::now();
527
528 let transaction = create_test_transaction(
529 "test-tx",
530 "test-relayer",
531 TransactionStatus::Confirmed,
532 Some("invalid-timestamp".to_string()),
533 );
534
535 assert!(!should_delete_transaction(&transaction, now));
536 }
537
538 #[tokio::test]
539 async fn test_process_transactions_for_cleanup_parallel() {
540 let transaction_repo = Arc::new(InMemoryTransactionRepository::new());
541 let relayer_id = "test-relayer";
542 let now = Utc::now();
543
544 let expired_delete_at = (now - Duration::hours(1)).to_rfc3339();
546 let future_delete_at = (now + Duration::hours(1)).to_rfc3339();
547
548 let expired_tx = create_test_transaction(
549 "expired-tx",
550 relayer_id,
551 TransactionStatus::Confirmed,
552 Some(expired_delete_at),
553 );
554 let future_tx = create_test_transaction(
555 "future-tx",
556 relayer_id,
557 TransactionStatus::Failed,
558 Some(future_delete_at),
559 );
560 let no_delete_tx = create_test_transaction(
561 "no-delete-tx",
562 relayer_id,
563 TransactionStatus::Canceled,
564 None,
565 );
566
567 transaction_repo.create(expired_tx.clone()).await.unwrap();
569 transaction_repo.create(future_tx.clone()).await.unwrap();
570 transaction_repo.create(no_delete_tx.clone()).await.unwrap();
571
572 let transactions = vec![expired_tx, future_tx, no_delete_tx];
573
574 let cleaned_count =
576 process_transactions_for_cleanup(transactions, &transaction_repo, relayer_id, now)
577 .await;
578
579 assert_eq!(cleaned_count, 1);
581
582 assert!(transaction_repo
584 .get_by_id("expired-tx".to_string())
585 .await
586 .is_err());
587
588 assert!(transaction_repo
590 .get_by_id("future-tx".to_string())
591 .await
592 .is_ok());
593 assert!(transaction_repo
594 .get_by_id("no-delete-tx".to_string())
595 .await
596 .is_ok());
597 }
598
599 #[tokio::test]
600 async fn test_delete_expired_transaction() {
601 let transaction_repo = Arc::new(InMemoryTransactionRepository::new());
602 let relayer_id = "test-relayer";
603
604 let transaction = create_test_transaction(
605 "test-tx",
606 relayer_id,
607 TransactionStatus::Confirmed, Some(Utc::now().to_rfc3339()),
609 );
610
611 transaction_repo.create(transaction.clone()).await.unwrap();
613
614 assert!(transaction_repo
616 .get_by_id("test-tx".to_string())
617 .await
618 .is_ok());
619
620 let result = delete_expired_transaction(&transaction, &transaction_repo, relayer_id).await;
622 assert!(result.is_ok());
623
624 assert!(transaction_repo
626 .get_by_id("test-tx".to_string())
627 .await
628 .is_err());
629 }
630
631 #[tokio::test]
632 async fn test_delete_expired_transaction_validates_final_status() {
633 let transaction_repo = Arc::new(InMemoryTransactionRepository::new());
634 let relayer_id = "test-relayer";
635
636 let transaction = create_test_transaction(
637 "test-tx",
638 relayer_id,
639 TransactionStatus::Pending, Some(Utc::now().to_rfc3339()),
641 );
642
643 transaction_repo.create(transaction.clone()).await.unwrap();
645
646 assert!(transaction_repo
648 .get_by_id("test-tx".to_string())
649 .await
650 .is_ok());
651
652 let result = delete_expired_transaction(&transaction, &transaction_repo, relayer_id).await;
654 assert!(result.is_err());
655
656 let error_message = result.unwrap_err().to_string();
657 assert!(error_message.contains("is not in a final state"));
658 assert!(error_message.contains("Pending"));
659
660 assert!(transaction_repo
662 .get_by_id("test-tx".to_string())
663 .await
664 .is_ok());
665 }
666
667 #[tokio::test]
668 async fn test_delete_expired_transaction_validates_all_final_statuses() {
669 let transaction_repo = Arc::new(InMemoryTransactionRepository::new());
670 let relayer_id = "test-relayer";
671
672 let final_statuses = [
674 TransactionStatus::Confirmed,
675 TransactionStatus::Failed,
676 TransactionStatus::Canceled,
677 TransactionStatus::Expired,
678 ];
679
680 for (i, status) in final_statuses.iter().enumerate() {
681 let tx_id = format!("test-tx-{}", i);
682 let transaction = create_test_transaction(
683 &tx_id,
684 relayer_id,
685 status.clone(),
686 Some(Utc::now().to_rfc3339()),
687 );
688
689 transaction_repo.create(transaction.clone()).await.unwrap();
691
692 let result =
694 delete_expired_transaction(&transaction, &transaction_repo, relayer_id).await;
695 assert!(
696 result.is_ok(),
697 "Failed to delete transaction with status: {:?}",
698 status
699 );
700
701 assert!(transaction_repo.get_by_id(tx_id).await.is_err());
703 }
704 }
705
706 #[tokio::test]
707 async fn test_fetch_final_transactions() {
708 let transaction_repo = Arc::new(InMemoryTransactionRepository::new());
709 let relayer_id = "test-relayer";
710
711 let confirmed_tx = create_test_transaction(
713 "confirmed-tx",
714 relayer_id,
715 TransactionStatus::Confirmed,
716 None,
717 );
718 let pending_tx =
719 create_test_transaction("pending-tx", relayer_id, TransactionStatus::Pending, None);
720 let failed_tx =
721 create_test_transaction("failed-tx", relayer_id, TransactionStatus::Failed, None);
722
723 transaction_repo.create(confirmed_tx).await.unwrap();
725 transaction_repo.create(pending_tx).await.unwrap();
726 transaction_repo.create(failed_tx).await.unwrap();
727
728 let final_transactions = fetch_final_transactions(relayer_id, &transaction_repo)
730 .await
731 .unwrap();
732
733 assert_eq!(final_transactions.len(), 2);
735 let final_ids: Vec<&String> = final_transactions.iter().map(|tx| &tx.id).collect();
736 assert!(final_ids.contains(&&"confirmed-tx".to_string()));
737 assert!(final_ids.contains(&&"failed-tx".to_string()));
738 assert!(!final_ids.contains(&&"pending-tx".to_string()));
739 }
740
741 #[tokio::test]
742 async fn test_report_cleanup_results_success() {
743 let results = vec![
744 RelayerCleanupResult {
745 relayer_id: "relayer-1".to_string(),
746 cleaned_count: 2,
747 error: None,
748 },
749 RelayerCleanupResult {
750 relayer_id: "relayer-2".to_string(),
751 cleaned_count: 1,
752 error: None,
753 },
754 ];
755
756 let result = report_cleanup_results(results).await;
757 assert!(result.is_ok());
758 }
759
760 #[tokio::test]
761 async fn test_report_cleanup_results_with_errors() {
762 let results = vec![
763 RelayerCleanupResult {
764 relayer_id: "relayer-1".to_string(),
765 cleaned_count: 2,
766 error: None,
767 },
768 RelayerCleanupResult {
769 relayer_id: "relayer-2".to_string(),
770 cleaned_count: 0,
771 error: Some("Database error".to_string()),
772 },
773 ];
774
775 let result = report_cleanup_results(results).await;
776 assert!(result.is_err());
777 }
778
779 #[tokio::test]
780 async fn test_process_single_relayer_success() {
781 let transaction_repo = Arc::new(InMemoryTransactionRepository::new());
782 let relayer = RelayerRepoModel {
783 id: "test-relayer".to_string(),
784 name: "Test Relayer".to_string(),
785 network: "ethereum".to_string(),
786 paused: false,
787 network_type: NetworkType::Evm,
788 signer_id: "test-signer".to_string(),
789 policies: RelayerNetworkPolicy::Evm(RelayerEvmPolicy::default()),
790 address: "0x1234567890123456789012345678901234567890".to_string(),
791 notification_id: None,
792 system_disabled: false,
793 custom_rpc_urls: None,
794 ..Default::default()
795 };
796 let now = Utc::now();
797
798 let expired_tx = create_test_transaction(
800 "expired-tx",
801 &relayer.id,
802 TransactionStatus::Confirmed,
803 Some((now - Duration::hours(1)).to_rfc3339()),
804 );
805 let future_tx = create_test_transaction(
806 "future-tx",
807 &relayer.id,
808 TransactionStatus::Failed,
809 Some((now + Duration::hours(1)).to_rfc3339()),
810 );
811
812 transaction_repo.create(expired_tx).await.unwrap();
813 transaction_repo.create(future_tx).await.unwrap();
814
815 let result = process_single_relayer(relayer.clone(), transaction_repo.clone(), now).await;
816
817 assert_eq!(result.relayer_id, relayer.id);
818 assert_eq!(result.cleaned_count, 1);
819 assert!(result.error.is_none());
820 }
821
822 #[tokio::test]
823 async fn test_process_single_relayer_no_transactions() {
824 let transaction_repo = Arc::new(InMemoryTransactionRepository::new());
826 let relayer = RelayerRepoModel {
827 id: "empty-relayer".to_string(),
828 name: "Empty Relayer".to_string(),
829 network: "ethereum".to_string(),
830 paused: false,
831 network_type: NetworkType::Evm,
832 signer_id: "test-signer".to_string(),
833 policies: RelayerNetworkPolicy::Evm(RelayerEvmPolicy::default()),
834 address: "0x1234567890123456789012345678901234567890".to_string(),
835 notification_id: None,
836 system_disabled: false,
837 custom_rpc_urls: None,
838 ..Default::default()
839 };
840 let now = Utc::now();
841
842 let result = process_single_relayer(relayer.clone(), transaction_repo, now).await;
844
845 assert_eq!(result.relayer_id, relayer.id);
846 assert_eq!(result.cleaned_count, 0);
847 assert!(result.error.is_none()); }
849
850 #[tokio::test]
851 async fn test_process_transactions_with_empty_list() {
852 let transaction_repo = Arc::new(InMemoryTransactionRepository::new());
853 let relayer_id = "test-relayer";
854 let now = Utc::now();
855 let transactions = vec![];
856
857 let cleaned_count =
858 process_transactions_for_cleanup(transactions, &transaction_repo, relayer_id, now)
859 .await;
860
861 assert_eq!(cleaned_count, 0);
862 }
863
864 #[tokio::test]
865 async fn test_process_transactions_with_no_expired() {
866 let transaction_repo = Arc::new(InMemoryTransactionRepository::new());
867 let relayer_id = "test-relayer";
868 let now = Utc::now();
869
870 let future_tx1 = create_test_transaction(
872 "future-tx-1",
873 relayer_id,
874 TransactionStatus::Confirmed,
875 Some((now + Duration::hours(1)).to_rfc3339()),
876 );
877 let future_tx2 = create_test_transaction(
878 "future-tx-2",
879 relayer_id,
880 TransactionStatus::Failed,
881 Some((now + Duration::hours(2)).to_rfc3339()),
882 );
883 let no_delete_tx = create_test_transaction(
884 "no-delete-tx",
885 relayer_id,
886 TransactionStatus::Canceled,
887 None,
888 );
889
890 let transactions = vec![future_tx1, future_tx2, no_delete_tx];
891
892 let cleaned_count =
893 process_transactions_for_cleanup(transactions, &transaction_repo, relayer_id, now)
894 .await;
895
896 assert_eq!(cleaned_count, 0);
897 }
898
899 #[tokio::test]
900 async fn test_should_delete_transaction_exactly_at_expiry_time() {
901 let now = Utc::now();
902 let exact_expiry_time = now.to_rfc3339();
903
904 let transaction = create_test_transaction(
905 "test-tx",
906 "test-relayer",
907 TransactionStatus::Confirmed,
908 Some(exact_expiry_time),
909 );
910
911 assert!(should_delete_transaction(&transaction, now));
913 }
914
915 #[tokio::test]
916 async fn test_parallel_processing_with_mixed_results() {
917 let transaction_repo = Arc::new(InMemoryTransactionRepository::new());
918 let relayer_id = "test-relayer";
919 let now = Utc::now();
920
921 let expired_tx1 = create_test_transaction(
923 "expired-tx-1",
924 relayer_id,
925 TransactionStatus::Confirmed,
926 Some((now - Duration::hours(1)).to_rfc3339()),
927 );
928 let expired_tx2 = create_test_transaction(
929 "expired-tx-2",
930 relayer_id,
931 TransactionStatus::Failed,
932 Some((now - Duration::hours(2)).to_rfc3339()),
933 );
934 let expired_tx3 = create_test_transaction(
935 "expired-tx-3",
936 relayer_id,
937 TransactionStatus::Canceled,
938 Some((now - Duration::hours(3)).to_rfc3339()),
939 );
940
941 transaction_repo.create(expired_tx1.clone()).await.unwrap();
943 transaction_repo.create(expired_tx2.clone()).await.unwrap();
944 let transactions = vec![expired_tx1, expired_tx2, expired_tx3];
947
948 let cleaned_count =
949 process_transactions_for_cleanup(transactions, &transaction_repo, relayer_id, now)
950 .await;
951
952 assert_eq!(cleaned_count, 2);
954 }
955
956 #[tokio::test]
957 async fn test_delete_expired_transaction_repository_error() {
958 let transaction_repo = Arc::new(InMemoryTransactionRepository::new());
959 let relayer_id = "test-relayer";
960
961 let transaction = create_test_transaction(
962 "nonexistent-tx",
963 relayer_id,
964 TransactionStatus::Confirmed,
965 Some(Utc::now().to_rfc3339()),
966 );
967
968 let result = delete_expired_transaction(&transaction, &transaction_repo, relayer_id).await;
970
971 assert!(result.is_err());
972 let error_message = result.unwrap_err().to_string();
973 assert!(error_message.contains("Failed to delete transaction"));
974 }
975
976 #[tokio::test]
977 async fn test_report_cleanup_results_empty() {
978 let results = vec![];
979 let result = report_cleanup_results(results).await;
980 assert!(result.is_ok());
981 }
982
983 #[tokio::test]
984 async fn test_fetch_final_transactions_with_mixed_statuses() {
985 let transaction_repo = Arc::new(InMemoryTransactionRepository::new());
986 let relayer_id = "test-relayer";
987
988 let confirmed_tx = create_test_transaction(
990 "confirmed-tx",
991 relayer_id,
992 TransactionStatus::Confirmed,
993 None,
994 );
995 let failed_tx =
996 create_test_transaction("failed-tx", relayer_id, TransactionStatus::Failed, None);
997 let canceled_tx =
998 create_test_transaction("canceled-tx", relayer_id, TransactionStatus::Canceled, None);
999 let expired_tx =
1000 create_test_transaction("expired-tx", relayer_id, TransactionStatus::Expired, None);
1001 let pending_tx =
1002 create_test_transaction("pending-tx", relayer_id, TransactionStatus::Pending, None);
1003 let sent_tx = create_test_transaction("sent-tx", relayer_id, TransactionStatus::Sent, None);
1004
1005 transaction_repo.create(confirmed_tx).await.unwrap();
1007 transaction_repo.create(failed_tx).await.unwrap();
1008 transaction_repo.create(canceled_tx).await.unwrap();
1009 transaction_repo.create(expired_tx).await.unwrap();
1010 transaction_repo.create(pending_tx).await.unwrap();
1011 transaction_repo.create(sent_tx).await.unwrap();
1012
1013 let final_transactions = fetch_final_transactions(relayer_id, &transaction_repo)
1015 .await
1016 .unwrap();
1017
1018 assert_eq!(final_transactions.len(), 4);
1020 let final_ids: Vec<&String> = final_transactions.iter().map(|tx| &tx.id).collect();
1021 assert!(final_ids.contains(&&"confirmed-tx".to_string()));
1022 assert!(final_ids.contains(&&"failed-tx".to_string()));
1023 assert!(final_ids.contains(&&"canceled-tx".to_string()));
1024 assert!(final_ids.contains(&&"expired-tx".to_string()));
1025 assert!(!final_ids.contains(&&"pending-tx".to_string()));
1026 assert!(!final_ids.contains(&&"sent-tx".to_string()));
1027 }
1028}