openzeppelin_relayer/jobs/handlers/
transaction_cleanup_handler.rs

1//! Transaction cleanup worker implementation.
2//!
3//! This module implements the transaction cleanup worker that processes
4//! expired transactions marked for deletion. It runs as a cron job to
5//! automatically clean up transactions that have passed their delete_at timestamp.
6
7use actix_web::web::ThinData;
8use apalis::prelude::{Attempt, Data, *};
9use chrono::{DateTime, Utc};
10use eyre::Result;
11use std::sync::Arc;
12use tracing::{debug, error, info, instrument, warn};
13
14use crate::{
15    constants::{FINAL_TRANSACTION_STATUSES, WORKER_TRANSACTION_CLEANUP_RETRIES},
16    jobs::handle_result,
17    models::{DefaultAppState, RelayerRepoModel, TransactionRepoModel},
18    repositories::{Repository, TransactionRepository},
19};
20
/// Upper bound on how many relayers are cleaned up at the same time.
///
/// Used as the `buffer_unordered` limit when relayers are processed.
const MAX_CONCURRENT_RELAYERS: usize = 10;

/// Upper bound on how many transaction deletions run at the same time
/// for a single relayer.
///
/// Used as the `buffer_unordered` limit when expired transactions are deleted.
const MAX_CONCURRENT_TRANSACTIONS_PER_RELAYER: usize = 50;
26
27/// Handles periodic transaction cleanup jobs from the queue.
28///
29/// This function processes expired transactions by:
30/// 1. Fetching all relayers from the system
31/// 2. For each relayer, finding transactions with final statuses
32/// 3. Checking if their delete_at timestamp has passed
33/// 4. Validating transactions are in final states before deletion
34/// 5. Deleting transactions that have expired (in parallel)
35///
36/// # Arguments
37/// * `job` - The cron reminder job triggering the cleanup
38/// * `data` - Application state containing repositories
39/// * `attempt` - Current attempt number for retry logic
40///
41/// # Returns
42/// * `Result<(), Error>` - Success or failure of cleanup processing
43#[instrument(
44    level = "debug",
45    skip(job, data),
46    fields(
47        job_type = "transaction_cleanup",
48        attempt = %attempt.current(),
49    ),
50    err
51)]
52pub async fn transaction_cleanup_handler(
53    job: TransactionCleanupCronReminder,
54    data: Data<ThinData<DefaultAppState>>,
55    attempt: Attempt,
56) -> Result<(), Error> {
57    let result = handle_request(job, data, attempt.clone()).await;
58
59    handle_result(
60        result,
61        attempt,
62        "TransactionCleanup",
63        WORKER_TRANSACTION_CLEANUP_RETRIES,
64    )
65}
66
/// Marker job type used to trigger a transaction cleanup run.
///
/// It carries no data; the handler only needs to know that it is time to run.
#[derive(Default, Debug, Clone)]
pub struct TransactionCleanupCronReminder();
70
71/// Handles the actual transaction cleanup request logic.
72///
73/// # Arguments
74/// * `_job` - The cron reminder job (currently unused)
75/// * `data` - Application state containing repositories
76/// * `_attempt` - Current attempt number (currently unused)
77///
78/// # Returns
79/// * `Result<()>` - Success or failure of the cleanup operation
80async fn handle_request(
81    _job: TransactionCleanupCronReminder,
82    data: Data<ThinData<DefaultAppState>>,
83    _attempt: Attempt,
84) -> Result<()> {
85    let now = Utc::now();
86    info!(
87        timestamp = %now.to_rfc3339(),
88        "executing transaction cleanup from storage"
89    );
90
91    let transaction_repo = data.transaction_repository();
92    let relayer_repo = data.relayer_repository();
93
94    // Fetch all relayers
95    let relayers = relayer_repo.list_all().await.map_err(|e| {
96        error!(
97            error = %e,
98            "failed to fetch relayers for cleanup"
99        );
100        eyre::eyre!("Failed to fetch relayers: {}", e)
101    })?;
102
103    info!(
104        relayer_count = relayers.len(),
105        "found relayers to process for cleanup"
106    );
107
108    // Process relayers in parallel batches
109    let cleanup_results = process_relayers_in_batches(relayers, transaction_repo, now).await;
110
111    // Aggregate and report results
112    report_cleanup_results(cleanup_results).await
113}
114
115/// Processes multiple relayers in parallel batches for cleanup.
116///
117/// # Arguments
118/// * `relayers` - List of relayers to process
119/// * `transaction_repo` - Reference to the transaction repository
120/// * `now` - Current UTC timestamp for comparison
121///
122/// # Returns
123/// * `Vec<RelayerCleanupResult>` - Results from processing each relayer
124async fn process_relayers_in_batches(
125    relayers: Vec<RelayerRepoModel>,
126    transaction_repo: Arc<impl TransactionRepository>,
127    now: DateTime<Utc>,
128) -> Vec<RelayerCleanupResult> {
129    use futures::stream::{self, StreamExt};
130
131    // Process relayers with limited concurrency to avoid overwhelming the system
132    let results: Vec<RelayerCleanupResult> = stream::iter(relayers)
133        .map(|relayer| {
134            let repo_clone = Arc::clone(&transaction_repo);
135            async move { process_single_relayer(relayer, repo_clone, now).await }
136        })
137        .buffer_unordered(MAX_CONCURRENT_RELAYERS)
138        .collect()
139        .await;
140
141    results
142}
143
/// Outcome of one relayer's cleanup pass.
#[derive(Debug)]
struct RelayerCleanupResult {
    /// ID of the relayer this result belongs to.
    relayer_id: String,
    /// Number of transactions that were successfully deleted.
    cleaned_count: usize,
    /// Error text when the relayer's transactions could not be fetched,
    /// `None` otherwise.
    error: Option<String>,
}
151
152/// Processes cleanup for a single relayer.
153///
154/// # Arguments
155/// * `relayer` - The relayer to process
156/// * `transaction_repo` - Reference to the transaction repository
157/// * `now` - Current UTC timestamp for comparison
158///
159/// # Returns
160/// * `RelayerCleanupResult` - Result of processing this relayer
161async fn process_single_relayer(
162    relayer: RelayerRepoModel,
163    transaction_repo: Arc<impl TransactionRepository>,
164    now: DateTime<Utc>,
165) -> RelayerCleanupResult {
166    debug!(
167        relayer_id = %relayer.id,
168        "processing cleanup for relayer"
169    );
170
171    match fetch_final_transactions(&relayer.id, &transaction_repo).await {
172        Ok(final_transactions) => {
173            debug!(
174                transaction_count = final_transactions.len(),
175                relayer_id = %relayer.id,
176                "found transactions with final statuses"
177            );
178
179            let cleaned_count = process_transactions_for_cleanup(
180                final_transactions,
181                &transaction_repo,
182                &relayer.id,
183                now,
184            )
185            .await;
186
187            if cleaned_count > 0 {
188                info!(
189                    cleaned_count,
190                    relayer_id = %relayer.id,
191                    "cleaned up expired transactions"
192                );
193            }
194
195            RelayerCleanupResult {
196                relayer_id: relayer.id,
197                cleaned_count,
198                error: None,
199            }
200        }
201        Err(e) => {
202            error!(
203                error = %e,
204                relayer_id = %relayer.id,
205                "failed to fetch final transactions"
206            );
207            RelayerCleanupResult {
208                relayer_id: relayer.id,
209                cleaned_count: 0,
210                error: Some(e.to_string()),
211            }
212        }
213    }
214}
215
216/// Fetches all transactions with final statuses for a specific relayer.
217///
218/// # Arguments
219/// * `relayer_id` - ID of the relayer
220/// * `transaction_repo` - Reference to the transaction repository
221///
222/// # Returns
223/// * `Result<Vec<TransactionRepoModel>>` - List of transactions with final statuses or error
224async fn fetch_final_transactions(
225    relayer_id: &str,
226    transaction_repo: &Arc<impl TransactionRepository>,
227) -> Result<Vec<TransactionRepoModel>> {
228    transaction_repo
229        .find_by_status(relayer_id, FINAL_TRANSACTION_STATUSES)
230        .await
231        .map_err(|e| {
232            eyre::eyre!(
233                "Failed to fetch final transactions for relayer {}: {}",
234                relayer_id,
235                e
236            )
237        })
238}
239
240/// Processes a list of transactions for cleanup in parallel, deleting expired ones.
241///
242/// This function validates that transactions are in final states before deletion,
243/// ensuring data integrity by preventing accidental deletion of active transactions.
244///
245/// # Arguments
246/// * `transactions` - List of transactions to process
247/// * `transaction_repo` - Reference to the transaction repository
248/// * `relayer_id` - ID of the relayer (for logging)
249/// * `now` - Current UTC timestamp for comparison
250///
251/// # Returns
252/// * `usize` - Number of transactions successfully cleaned up
253async fn process_transactions_for_cleanup(
254    transactions: Vec<TransactionRepoModel>,
255    transaction_repo: &Arc<impl Repository<TransactionRepoModel, String>>,
256    relayer_id: &str,
257    now: DateTime<Utc>,
258) -> usize {
259    use futures::stream::{self, StreamExt};
260
261    if transactions.is_empty() {
262        return 0;
263    }
264
265    debug!(
266        transaction_count = transactions.len(),
267        relayer_id = %relayer_id,
268        "processing transactions in parallel"
269    );
270
271    // Filter expired transactions first (this is fast and synchronous)
272    let expired_transactions: Vec<TransactionRepoModel> = transactions
273        .into_iter()
274        .filter(|tx| should_delete_transaction(tx, now))
275        .collect();
276
277    if expired_transactions.is_empty() {
278        debug!(
279            relayer_id = %relayer_id,
280            "no expired transactions found"
281        );
282        return 0;
283    }
284
285    debug!(
286        expired_count = expired_transactions.len(),
287        relayer_id = %relayer_id,
288        "found expired transactions to delete"
289    );
290
291    // Process deletions in parallel with limited concurrency
292    let deletion_results: Vec<bool> = stream::iter(expired_transactions)
293        .map(|transaction| {
294            let repo_clone = Arc::clone(transaction_repo);
295            let relayer_id = relayer_id.to_string();
296            async move {
297                match delete_expired_transaction(&transaction, &repo_clone, &relayer_id).await {
298                    Ok(()) => true,
299                    Err(e) => {
300                        error!(
301                            tx_id = %transaction.id,
302                            error = %e,
303                            "failed to delete expired transaction"
304                        );
305                        false
306                    }
307                }
308            }
309        })
310        .buffer_unordered(MAX_CONCURRENT_TRANSACTIONS_PER_RELAYER)
311        .collect()
312        .await;
313
314    // Count successful deletions
315    let cleaned_count = deletion_results.iter().filter(|&&success| success).count();
316
317    debug!(
318        cleaned_count,
319        total_attempted = deletion_results.len(),
320        relayer_id = %relayer_id,
321        "successfully deleted expired transactions"
322    );
323
324    cleaned_count
325}
326
327/// Determines if a transaction should be deleted based on its delete_at timestamp.
328///
329/// # Arguments
330/// * `transaction` - The transaction to check
331/// * `now` - Current UTC timestamp for comparison
332///
333/// # Returns
334/// * `bool` - True if the transaction should be deleted, false otherwise
335fn should_delete_transaction(transaction: &TransactionRepoModel, now: DateTime<Utc>) -> bool {
336    transaction
337        .delete_at
338        .as_ref()
339        .and_then(|delete_at_str| DateTime::parse_from_rfc3339(delete_at_str).ok())
340        .map(|delete_at| {
341            let is_expired = now >= delete_at.with_timezone(&Utc);
342            if is_expired {
343                debug!(
344                    tx_id = %transaction.id,
345                    expired_at = %delete_at.to_rfc3339(),
346                    "transaction is expired"
347                );
348            }
349            is_expired
350        })
351        .unwrap_or_else(|| {
352            if transaction.delete_at.is_some() {
353                warn!(
354                    tx_id = %transaction.id,
355                    "transaction has invalid delete_at timestamp"
356                );
357            }
358            false
359        })
360}
361
362/// Deletes an expired transaction from the repository.
363///
364/// # Arguments
365/// * `transaction` - The transaction to delete
366/// * `transaction_repo` - Reference to the transaction repository
367/// * `relayer_id` - ID of the relayer (for logging)
368///
369/// # Returns
370/// * `Result<()>` - Success or failure of the deletion
371async fn delete_expired_transaction(
372    transaction: &TransactionRepoModel,
373    transaction_repo: &Arc<impl Repository<TransactionRepoModel, String>>,
374    relayer_id: &str,
375) -> Result<()> {
376    // Validate that the transaction is in a final state before deletion
377    if !FINAL_TRANSACTION_STATUSES.contains(&transaction.status) {
378        return Err(eyre::eyre!(
379            "Transaction {} is not in a final state (current: {:?})",
380            transaction.id,
381            transaction.status
382        ));
383    }
384
385    debug!(
386        tx_id = %transaction.id,
387        status = ?transaction.status,
388        relayer_id = %relayer_id,
389        "deleting expired transaction"
390    );
391
392    transaction_repo
393        .delete_by_id(transaction.id.clone())
394        .await
395        .map_err(|e| eyre::eyre!("Failed to delete transaction {}: {}", transaction.id, e))?;
396
397    info!(
398        tx_id = %transaction.id,
399        status = ?transaction.status,
400        relayer_id = %relayer_id,
401        "successfully deleted expired transaction"
402    );
403
404    Ok(())
405}
406
407/// Reports the aggregated results of the cleanup operation.
408///
409/// # Arguments
410/// * `cleanup_results` - Results from processing all relayers
411///
412/// # Returns
413/// * `Result<()>` - Success if all went well, error if there were failures
414async fn report_cleanup_results(cleanup_results: Vec<RelayerCleanupResult>) -> Result<()> {
415    let total_cleaned: usize = cleanup_results.iter().map(|r| r.cleaned_count).sum();
416    let total_errors = cleanup_results.iter().filter(|r| r.error.is_some()).count();
417    let total_relayers = cleanup_results.len();
418
419    // Log detailed results for relayers with errors
420    for result in &cleanup_results {
421        if let Some(error) = &result.error {
422            error!(
423                relayer_id = %result.relayer_id,
424                error = %error,
425                "failed to cleanup transactions for relayer"
426            );
427        }
428    }
429
430    if total_errors > 0 {
431        warn!(
432            total_errors,
433            total_relayers, total_cleaned, "transaction cleanup completed with errors"
434        );
435
436        // Return error if there were failures, but don't fail the entire job
437        // This allows for partial success and retry of failed relayers
438        Err(eyre::eyre!(
439            "Cleanup completed with {} errors out of {} relayers",
440            total_errors,
441            total_relayers
442        ))
443    } else {
444        info!(
445            total_cleaned,
446            total_relayers, "transaction cleanup completed successfully"
447        );
448        Ok(())
449    }
450}
451
452#[cfg(test)]
453mod tests {
454
455    use super::*;
456    use crate::{
457        models::{
458            NetworkType, RelayerEvmPolicy, RelayerNetworkPolicy, RelayerRepoModel,
459            TransactionRepoModel, TransactionStatus,
460        },
461        repositories::{InMemoryTransactionRepository, Repository},
462        utils::mocks::mockutils::create_mock_transaction,
463    };
464    use chrono::{Duration, Utc};
465
466    fn create_test_transaction(
467        id: &str,
468        relayer_id: &str,
469        status: TransactionStatus,
470        delete_at: Option<String>,
471    ) -> TransactionRepoModel {
472        let mut tx = create_mock_transaction();
473        tx.id = id.to_string();
474        tx.relayer_id = relayer_id.to_string();
475        tx.status = status;
476        tx.delete_at = delete_at;
477        tx
478    }
479
480    #[tokio::test]
481    async fn test_should_delete_transaction_expired() {
482        let now = Utc::now();
483        let expired_delete_at = (now - Duration::hours(1)).to_rfc3339();
484
485        let transaction = create_test_transaction(
486            "test-tx",
487            "test-relayer",
488            TransactionStatus::Confirmed,
489            Some(expired_delete_at),
490        );
491
492        assert!(should_delete_transaction(&transaction, now));
493    }
494
495    #[tokio::test]
496    async fn test_should_delete_transaction_not_expired() {
497        let now = Utc::now();
498        let future_delete_at = (now + Duration::hours(1)).to_rfc3339();
499
500        let transaction = create_test_transaction(
501            "test-tx",
502            "test-relayer",
503            TransactionStatus::Confirmed,
504            Some(future_delete_at),
505        );
506
507        assert!(!should_delete_transaction(&transaction, now));
508    }
509
510    #[tokio::test]
511    async fn test_should_delete_transaction_no_delete_at() {
512        let now = Utc::now();
513
514        let transaction = create_test_transaction(
515            "test-tx",
516            "test-relayer",
517            TransactionStatus::Confirmed,
518            None,
519        );
520
521        assert!(!should_delete_transaction(&transaction, now));
522    }
523
524    #[tokio::test]
525    async fn test_should_delete_transaction_invalid_timestamp() {
526        let now = Utc::now();
527
528        let transaction = create_test_transaction(
529            "test-tx",
530            "test-relayer",
531            TransactionStatus::Confirmed,
532            Some("invalid-timestamp".to_string()),
533        );
534
535        assert!(!should_delete_transaction(&transaction, now));
536    }
537
538    #[tokio::test]
539    async fn test_process_transactions_for_cleanup_parallel() {
540        let transaction_repo = Arc::new(InMemoryTransactionRepository::new());
541        let relayer_id = "test-relayer";
542        let now = Utc::now();
543
544        // Create test transactions
545        let expired_delete_at = (now - Duration::hours(1)).to_rfc3339();
546        let future_delete_at = (now + Duration::hours(1)).to_rfc3339();
547
548        let expired_tx = create_test_transaction(
549            "expired-tx",
550            relayer_id,
551            TransactionStatus::Confirmed,
552            Some(expired_delete_at),
553        );
554        let future_tx = create_test_transaction(
555            "future-tx",
556            relayer_id,
557            TransactionStatus::Failed,
558            Some(future_delete_at),
559        );
560        let no_delete_tx = create_test_transaction(
561            "no-delete-tx",
562            relayer_id,
563            TransactionStatus::Canceled,
564            None,
565        );
566
567        // Store transactions
568        transaction_repo.create(expired_tx.clone()).await.unwrap();
569        transaction_repo.create(future_tx.clone()).await.unwrap();
570        transaction_repo.create(no_delete_tx.clone()).await.unwrap();
571
572        let transactions = vec![expired_tx, future_tx, no_delete_tx];
573
574        // Process transactions
575        let cleaned_count =
576            process_transactions_for_cleanup(transactions, &transaction_repo, relayer_id, now)
577                .await;
578
579        // Should have cleaned up 1 expired transaction
580        assert_eq!(cleaned_count, 1);
581
582        // Verify expired transaction was deleted
583        assert!(transaction_repo
584            .get_by_id("expired-tx".to_string())
585            .await
586            .is_err());
587
588        // Verify non-expired transactions still exist
589        assert!(transaction_repo
590            .get_by_id("future-tx".to_string())
591            .await
592            .is_ok());
593        assert!(transaction_repo
594            .get_by_id("no-delete-tx".to_string())
595            .await
596            .is_ok());
597    }
598
599    #[tokio::test]
600    async fn test_delete_expired_transaction() {
601        let transaction_repo = Arc::new(InMemoryTransactionRepository::new());
602        let relayer_id = "test-relayer";
603
604        let transaction = create_test_transaction(
605            "test-tx",
606            relayer_id,
607            TransactionStatus::Confirmed, // Final status
608            Some(Utc::now().to_rfc3339()),
609        );
610
611        // Store transaction
612        transaction_repo.create(transaction.clone()).await.unwrap();
613
614        // Verify it exists
615        assert!(transaction_repo
616            .get_by_id("test-tx".to_string())
617            .await
618            .is_ok());
619
620        // Delete it
621        let result = delete_expired_transaction(&transaction, &transaction_repo, relayer_id).await;
622        assert!(result.is_ok());
623
624        // Verify it was deleted
625        assert!(transaction_repo
626            .get_by_id("test-tx".to_string())
627            .await
628            .is_err());
629    }
630
631    #[tokio::test]
632    async fn test_delete_expired_transaction_validates_final_status() {
633        let transaction_repo = Arc::new(InMemoryTransactionRepository::new());
634        let relayer_id = "test-relayer";
635
636        let transaction = create_test_transaction(
637            "test-tx",
638            relayer_id,
639            TransactionStatus::Pending, // Non-final status
640            Some(Utc::now().to_rfc3339()),
641        );
642
643        // Store transaction
644        transaction_repo.create(transaction.clone()).await.unwrap();
645
646        // Verify it exists
647        assert!(transaction_repo
648            .get_by_id("test-tx".to_string())
649            .await
650            .is_ok());
651
652        // Try to delete it - should fail due to validation
653        let result = delete_expired_transaction(&transaction, &transaction_repo, relayer_id).await;
654        assert!(result.is_err());
655
656        let error_message = result.unwrap_err().to_string();
657        assert!(error_message.contains("is not in a final state"));
658        assert!(error_message.contains("Pending"));
659
660        // Verify it still exists (wasn't deleted)
661        assert!(transaction_repo
662            .get_by_id("test-tx".to_string())
663            .await
664            .is_ok());
665    }
666
667    #[tokio::test]
668    async fn test_delete_expired_transaction_validates_all_final_statuses() {
669        let transaction_repo = Arc::new(InMemoryTransactionRepository::new());
670        let relayer_id = "test-relayer";
671
672        // Test each final status to ensure they all pass validation
673        let final_statuses = [
674            TransactionStatus::Confirmed,
675            TransactionStatus::Failed,
676            TransactionStatus::Canceled,
677            TransactionStatus::Expired,
678        ];
679
680        for (i, status) in final_statuses.iter().enumerate() {
681            let tx_id = format!("test-tx-{}", i);
682            let transaction = create_test_transaction(
683                &tx_id,
684                relayer_id,
685                status.clone(),
686                Some(Utc::now().to_rfc3339()),
687            );
688
689            // Store transaction
690            transaction_repo.create(transaction.clone()).await.unwrap();
691
692            // Delete it - should succeed for all final statuses
693            let result =
694                delete_expired_transaction(&transaction, &transaction_repo, relayer_id).await;
695            assert!(
696                result.is_ok(),
697                "Failed to delete transaction with status: {:?}",
698                status
699            );
700
701            // Verify it was deleted
702            assert!(transaction_repo.get_by_id(tx_id).await.is_err());
703        }
704    }
705
706    #[tokio::test]
707    async fn test_fetch_final_transactions() {
708        let transaction_repo = Arc::new(InMemoryTransactionRepository::new());
709        let relayer_id = "test-relayer";
710
711        // Create transactions with different statuses
712        let confirmed_tx = create_test_transaction(
713            "confirmed-tx",
714            relayer_id,
715            TransactionStatus::Confirmed,
716            None,
717        );
718        let pending_tx =
719            create_test_transaction("pending-tx", relayer_id, TransactionStatus::Pending, None);
720        let failed_tx =
721            create_test_transaction("failed-tx", relayer_id, TransactionStatus::Failed, None);
722
723        // Store transactions
724        transaction_repo.create(confirmed_tx).await.unwrap();
725        transaction_repo.create(pending_tx).await.unwrap();
726        transaction_repo.create(failed_tx).await.unwrap();
727
728        // Fetch final transactions
729        let final_transactions = fetch_final_transactions(relayer_id, &transaction_repo)
730            .await
731            .unwrap();
732
733        // Should only return transactions with final statuses (Confirmed, Failed)
734        assert_eq!(final_transactions.len(), 2);
735        let final_ids: Vec<&String> = final_transactions.iter().map(|tx| &tx.id).collect();
736        assert!(final_ids.contains(&&"confirmed-tx".to_string()));
737        assert!(final_ids.contains(&&"failed-tx".to_string()));
738        assert!(!final_ids.contains(&&"pending-tx".to_string()));
739    }
740
741    #[tokio::test]
742    async fn test_report_cleanup_results_success() {
743        let results = vec![
744            RelayerCleanupResult {
745                relayer_id: "relayer-1".to_string(),
746                cleaned_count: 2,
747                error: None,
748            },
749            RelayerCleanupResult {
750                relayer_id: "relayer-2".to_string(),
751                cleaned_count: 1,
752                error: None,
753            },
754        ];
755
756        let result = report_cleanup_results(results).await;
757        assert!(result.is_ok());
758    }
759
760    #[tokio::test]
761    async fn test_report_cleanup_results_with_errors() {
762        let results = vec![
763            RelayerCleanupResult {
764                relayer_id: "relayer-1".to_string(),
765                cleaned_count: 2,
766                error: None,
767            },
768            RelayerCleanupResult {
769                relayer_id: "relayer-2".to_string(),
770                cleaned_count: 0,
771                error: Some("Database error".to_string()),
772            },
773        ];
774
775        let result = report_cleanup_results(results).await;
776        assert!(result.is_err());
777    }
778
779    #[tokio::test]
780    async fn test_process_single_relayer_success() {
781        let transaction_repo = Arc::new(InMemoryTransactionRepository::new());
782        let relayer = RelayerRepoModel {
783            id: "test-relayer".to_string(),
784            name: "Test Relayer".to_string(),
785            network: "ethereum".to_string(),
786            paused: false,
787            network_type: NetworkType::Evm,
788            signer_id: "test-signer".to_string(),
789            policies: RelayerNetworkPolicy::Evm(RelayerEvmPolicy::default()),
790            address: "0x1234567890123456789012345678901234567890".to_string(),
791            notification_id: None,
792            system_disabled: false,
793            custom_rpc_urls: None,
794            ..Default::default()
795        };
796        let now = Utc::now();
797
798        // Create expired and non-expired transactions
799        let expired_tx = create_test_transaction(
800            "expired-tx",
801            &relayer.id,
802            TransactionStatus::Confirmed,
803            Some((now - Duration::hours(1)).to_rfc3339()),
804        );
805        let future_tx = create_test_transaction(
806            "future-tx",
807            &relayer.id,
808            TransactionStatus::Failed,
809            Some((now + Duration::hours(1)).to_rfc3339()),
810        );
811
812        transaction_repo.create(expired_tx).await.unwrap();
813        transaction_repo.create(future_tx).await.unwrap();
814
815        let result = process_single_relayer(relayer.clone(), transaction_repo.clone(), now).await;
816
817        assert_eq!(result.relayer_id, relayer.id);
818        assert_eq!(result.cleaned_count, 1);
819        assert!(result.error.is_none());
820    }
821
822    #[tokio::test]
823    async fn test_process_single_relayer_no_transactions() {
824        // Create a relayer with no transactions in the repo
825        let transaction_repo = Arc::new(InMemoryTransactionRepository::new());
826        let relayer = RelayerRepoModel {
827            id: "empty-relayer".to_string(),
828            name: "Empty Relayer".to_string(),
829            network: "ethereum".to_string(),
830            paused: false,
831            network_type: NetworkType::Evm,
832            signer_id: "test-signer".to_string(),
833            policies: RelayerNetworkPolicy::Evm(RelayerEvmPolicy::default()),
834            address: "0x1234567890123456789012345678901234567890".to_string(),
835            notification_id: None,
836            system_disabled: false,
837            custom_rpc_urls: None,
838            ..Default::default()
839        };
840        let now = Utc::now();
841
842        // This should succeed but find no transactions
843        let result = process_single_relayer(relayer.clone(), transaction_repo, now).await;
844
845        assert_eq!(result.relayer_id, relayer.id);
846        assert_eq!(result.cleaned_count, 0);
847        assert!(result.error.is_none()); // No error, just no transactions found
848    }
849
850    #[tokio::test]
851    async fn test_process_transactions_with_empty_list() {
852        let transaction_repo = Arc::new(InMemoryTransactionRepository::new());
853        let relayer_id = "test-relayer";
854        let now = Utc::now();
855        let transactions = vec![];
856
857        let cleaned_count =
858            process_transactions_for_cleanup(transactions, &transaction_repo, relayer_id, now)
859                .await;
860
861        assert_eq!(cleaned_count, 0);
862    }
863
864    #[tokio::test]
865    async fn test_process_transactions_with_no_expired() {
866        let transaction_repo = Arc::new(InMemoryTransactionRepository::new());
867        let relayer_id = "test-relayer";
868        let now = Utc::now();
869
870        // Create only non-expired transactions
871        let future_tx1 = create_test_transaction(
872            "future-tx-1",
873            relayer_id,
874            TransactionStatus::Confirmed,
875            Some((now + Duration::hours(1)).to_rfc3339()),
876        );
877        let future_tx2 = create_test_transaction(
878            "future-tx-2",
879            relayer_id,
880            TransactionStatus::Failed,
881            Some((now + Duration::hours(2)).to_rfc3339()),
882        );
883        let no_delete_tx = create_test_transaction(
884            "no-delete-tx",
885            relayer_id,
886            TransactionStatus::Canceled,
887            None,
888        );
889
890        let transactions = vec![future_tx1, future_tx2, no_delete_tx];
891
892        let cleaned_count =
893            process_transactions_for_cleanup(transactions, &transaction_repo, relayer_id, now)
894                .await;
895
896        assert_eq!(cleaned_count, 0);
897    }
898
899    #[tokio::test]
900    async fn test_should_delete_transaction_exactly_at_expiry_time() {
901        let now = Utc::now();
902        let exact_expiry_time = now.to_rfc3339();
903
904        let transaction = create_test_transaction(
905            "test-tx",
906            "test-relayer",
907            TransactionStatus::Confirmed,
908            Some(exact_expiry_time),
909        );
910
911        // Should be considered expired when exactly at expiry time
912        assert!(should_delete_transaction(&transaction, now));
913    }
914
915    #[tokio::test]
916    async fn test_parallel_processing_with_mixed_results() {
917        let transaction_repo = Arc::new(InMemoryTransactionRepository::new());
918        let relayer_id = "test-relayer";
919        let now = Utc::now();
920
921        // Create multiple expired transactions
922        let expired_tx1 = create_test_transaction(
923            "expired-tx-1",
924            relayer_id,
925            TransactionStatus::Confirmed,
926            Some((now - Duration::hours(1)).to_rfc3339()),
927        );
928        let expired_tx2 = create_test_transaction(
929            "expired-tx-2",
930            relayer_id,
931            TransactionStatus::Failed,
932            Some((now - Duration::hours(2)).to_rfc3339()),
933        );
934        let expired_tx3 = create_test_transaction(
935            "expired-tx-3",
936            relayer_id,
937            TransactionStatus::Canceled,
938            Some((now - Duration::hours(3)).to_rfc3339()),
939        );
940
941        // Store only some transactions (others will fail deletion due to NotFound)
942        transaction_repo.create(expired_tx1.clone()).await.unwrap();
943        transaction_repo.create(expired_tx2.clone()).await.unwrap();
944        // Don't store expired_tx3 - it will fail deletion
945
946        let transactions = vec![expired_tx1, expired_tx2, expired_tx3];
947
948        let cleaned_count =
949            process_transactions_for_cleanup(transactions, &transaction_repo, relayer_id, now)
950                .await;
951
952        // Should have cleaned 2 out of 3 transactions (one failed due to NotFound)
953        assert_eq!(cleaned_count, 2);
954    }
955
956    #[tokio::test]
957    async fn test_delete_expired_transaction_repository_error() {
958        let transaction_repo = Arc::new(InMemoryTransactionRepository::new());
959        let relayer_id = "test-relayer";
960
961        let transaction = create_test_transaction(
962            "nonexistent-tx",
963            relayer_id,
964            TransactionStatus::Confirmed,
965            Some(Utc::now().to_rfc3339()),
966        );
967
968        // Don't store the transaction, so delete will fail with NotFound
969        let result = delete_expired_transaction(&transaction, &transaction_repo, relayer_id).await;
970
971        assert!(result.is_err());
972        let error_message = result.unwrap_err().to_string();
973        assert!(error_message.contains("Failed to delete transaction"));
974    }
975
976    #[tokio::test]
977    async fn test_report_cleanup_results_empty() {
978        let results = vec![];
979        let result = report_cleanup_results(results).await;
980        assert!(result.is_ok());
981    }
982
983    #[tokio::test]
984    async fn test_fetch_final_transactions_with_mixed_statuses() {
985        let transaction_repo = Arc::new(InMemoryTransactionRepository::new());
986        let relayer_id = "test-relayer";
987
988        // Create transactions with all possible statuses
989        let confirmed_tx = create_test_transaction(
990            "confirmed-tx",
991            relayer_id,
992            TransactionStatus::Confirmed,
993            None,
994        );
995        let failed_tx =
996            create_test_transaction("failed-tx", relayer_id, TransactionStatus::Failed, None);
997        let canceled_tx =
998            create_test_transaction("canceled-tx", relayer_id, TransactionStatus::Canceled, None);
999        let expired_tx =
1000            create_test_transaction("expired-tx", relayer_id, TransactionStatus::Expired, None);
1001        let pending_tx =
1002            create_test_transaction("pending-tx", relayer_id, TransactionStatus::Pending, None);
1003        let sent_tx = create_test_transaction("sent-tx", relayer_id, TransactionStatus::Sent, None);
1004
1005        // Store all transactions
1006        transaction_repo.create(confirmed_tx).await.unwrap();
1007        transaction_repo.create(failed_tx).await.unwrap();
1008        transaction_repo.create(canceled_tx).await.unwrap();
1009        transaction_repo.create(expired_tx).await.unwrap();
1010        transaction_repo.create(pending_tx).await.unwrap();
1011        transaction_repo.create(sent_tx).await.unwrap();
1012
1013        // Fetch final transactions
1014        let final_transactions = fetch_final_transactions(relayer_id, &transaction_repo)
1015            .await
1016            .unwrap();
1017
1018        // Should only return the 4 final status transactions
1019        assert_eq!(final_transactions.len(), 4);
1020        let final_ids: Vec<&String> = final_transactions.iter().map(|tx| &tx.id).collect();
1021        assert!(final_ids.contains(&&"confirmed-tx".to_string()));
1022        assert!(final_ids.contains(&&"failed-tx".to_string()));
1023        assert!(final_ids.contains(&&"canceled-tx".to_string()));
1024        assert!(final_ids.contains(&&"expired-tx".to_string()));
1025        assert!(!final_ids.contains(&&"pending-tx".to_string()));
1026        assert!(!final_ids.contains(&&"sent-tx".to_string()));
1027    }
1028}