openzeppelin_relayer/repositories/transaction/
transaction_in_memory.rs

1//! This module defines an in-memory transaction repository for managing
2//! transaction data. It provides asynchronous methods for creating, retrieving,
3//! updating, and deleting transactions, as well as querying transactions by
4//! various criteria such as relayer ID, status, and nonce. The repository
5//! is implemented using a `Mutex`-protected `HashMap` to store transaction
6//! data, ensuring thread-safe access in an asynchronous context.
7use crate::{
8    models::{
9        NetworkTransactionData, TransactionRepoModel, TransactionStatus, TransactionUpdateRequest,
10    },
11    repositories::*,
12};
13use async_trait::async_trait;
14use eyre::Result;
15use itertools::Itertools;
16use std::collections::HashMap;
17use tokio::sync::{Mutex, MutexGuard};
18
/// In-memory transaction repository.
///
/// All transactions live in a single `HashMap` guarded by a
/// `tokio::sync::Mutex`, so every operation first awaits that one lock.
#[derive(Debug)]
pub struct InMemoryTransactionRepository {
    // Stored transactions, keyed by transaction ID.
    store: Mutex<HashMap<String, TransactionRepoModel>>,
}
23
24impl Clone for InMemoryTransactionRepository {
25    fn clone(&self) -> Self {
26        // Try to get the current data, or use empty HashMap if lock fails
27        let data = self
28            .store
29            .try_lock()
30            .map(|guard| guard.clone())
31            .unwrap_or_else(|_| HashMap::new());
32
33        Self {
34            store: Mutex::new(data),
35        }
36    }
37}
38
39impl InMemoryTransactionRepository {
40    pub fn new() -> Self {
41        Self {
42            store: Mutex::new(HashMap::new()),
43        }
44    }
45
46    async fn acquire_lock<T>(lock: &Mutex<T>) -> Result<MutexGuard<T>, RepositoryError> {
47        Ok(lock.lock().await)
48    }
49}
50
51// Implement both traits for InMemoryTransactionRepository
52
53#[async_trait]
54impl Repository<TransactionRepoModel, String> for InMemoryTransactionRepository {
55    async fn create(
56        &self,
57        tx: TransactionRepoModel,
58    ) -> Result<TransactionRepoModel, RepositoryError> {
59        let mut store = Self::acquire_lock(&self.store).await?;
60        if store.contains_key(&tx.id) {
61            return Err(RepositoryError::ConstraintViolation(format!(
62                "Transaction with ID {} already exists",
63                tx.id
64            )));
65        }
66        store.insert(tx.id.clone(), tx.clone());
67        Ok(tx)
68    }
69
70    async fn get_by_id(&self, id: String) -> Result<TransactionRepoModel, RepositoryError> {
71        let store = Self::acquire_lock(&self.store).await?;
72        store
73            .get(&id)
74            .cloned()
75            .ok_or_else(|| RepositoryError::NotFound(format!("Transaction with ID {id} not found")))
76    }
77
78    #[allow(clippy::map_entry)]
79    async fn update(
80        &self,
81        id: String,
82        tx: TransactionRepoModel,
83    ) -> Result<TransactionRepoModel, RepositoryError> {
84        let mut store = Self::acquire_lock(&self.store).await?;
85        if store.contains_key(&id) {
86            let mut updated_tx = tx;
87            updated_tx.id = id.clone();
88            store.insert(id, updated_tx.clone());
89            Ok(updated_tx)
90        } else {
91            Err(RepositoryError::NotFound(format!(
92                "Transaction with ID {id} not found"
93            )))
94        }
95    }
96
97    async fn delete_by_id(&self, id: String) -> Result<(), RepositoryError> {
98        let mut store = Self::acquire_lock(&self.store).await?;
99        if store.remove(&id).is_some() {
100            Ok(())
101        } else {
102            Err(RepositoryError::NotFound(format!(
103                "Transaction with ID {id} not found"
104            )))
105        }
106    }
107
108    async fn list_all(&self) -> Result<Vec<TransactionRepoModel>, RepositoryError> {
109        let store = Self::acquire_lock(&self.store).await?;
110        Ok(store.values().cloned().collect())
111    }
112
113    async fn list_paginated(
114        &self,
115        query: PaginationQuery,
116    ) -> Result<PaginatedResult<TransactionRepoModel>, RepositoryError> {
117        let total = self.count().await?;
118        let start = ((query.page - 1) * query.per_page) as usize;
119        let store = Self::acquire_lock(&self.store).await?;
120        let items: Vec<TransactionRepoModel> = store
121            .values()
122            .skip(start)
123            .take(query.per_page as usize)
124            .cloned()
125            .collect();
126
127        Ok(PaginatedResult {
128            items,
129            total: total as u64,
130            page: query.page,
131            per_page: query.per_page,
132        })
133    }
134
135    async fn count(&self) -> Result<usize, RepositoryError> {
136        let store = Self::acquire_lock(&self.store).await?;
137        Ok(store.len())
138    }
139
140    async fn has_entries(&self) -> Result<bool, RepositoryError> {
141        let store = Self::acquire_lock(&self.store).await?;
142        Ok(!store.is_empty())
143    }
144
145    async fn drop_all_entries(&self) -> Result<(), RepositoryError> {
146        let mut store = Self::acquire_lock(&self.store).await?;
147        store.clear();
148        Ok(())
149    }
150}
151
152#[async_trait]
153impl TransactionRepository for InMemoryTransactionRepository {
154    async fn find_by_relayer_id(
155        &self,
156        relayer_id: &str,
157        query: PaginationQuery,
158    ) -> Result<PaginatedResult<TransactionRepoModel>, RepositoryError> {
159        let store = Self::acquire_lock(&self.store).await?;
160        let filtered: Vec<TransactionRepoModel> = store
161            .values()
162            .filter(|tx| tx.relayer_id == relayer_id)
163            .cloned()
164            .collect();
165
166        let total = filtered.len() as u64;
167
168        if total == 0 {
169            return Ok(PaginatedResult::<TransactionRepoModel> {
170                items: vec![],
171                total: 0,
172                page: query.page,
173                per_page: query.per_page,
174            });
175        }
176
177        let start = ((query.page - 1) * query.per_page) as usize;
178
179        // Sort and paginate
180        let items = filtered
181            .into_iter()
182            .sorted_by(|a, b| a.created_at.cmp(&b.created_at)) // Sort by created_at
183            .skip(start)
184            .take(query.per_page as usize)
185            .collect();
186
187        Ok(PaginatedResult {
188            items,
189            total,
190            page: query.page,
191            per_page: query.per_page,
192        })
193    }
194
195    async fn find_by_status(
196        &self,
197        relayer_id: &str,
198        statuses: &[TransactionStatus],
199    ) -> Result<Vec<TransactionRepoModel>, RepositoryError> {
200        let store = Self::acquire_lock(&self.store).await?;
201        let filtered: Vec<TransactionRepoModel> = store
202            .values()
203            .filter(|tx| tx.relayer_id == relayer_id && statuses.contains(&tx.status))
204            .cloned()
205            .collect();
206
207        // Sort by created_at (oldest first)
208        let sorted = filtered
209            .into_iter()
210            .sorted_by_key(|tx| tx.created_at.clone())
211            .collect();
212
213        Ok(sorted)
214    }
215
216    async fn find_by_nonce(
217        &self,
218        relayer_id: &str,
219        nonce: u64,
220    ) -> Result<Option<TransactionRepoModel>, RepositoryError> {
221        let store = Self::acquire_lock(&self.store).await?;
222        let filtered: Vec<TransactionRepoModel> = store
223            .values()
224            .filter(|tx| {
225                tx.relayer_id == relayer_id
226                    && match &tx.network_data {
227                        NetworkTransactionData::Evm(data) => data.nonce == Some(nonce),
228                        _ => false,
229                    }
230            })
231            .cloned()
232            .collect();
233
234        Ok(filtered.into_iter().next())
235    }
236
237    async fn update_status(
238        &self,
239        tx_id: String,
240        status: TransactionStatus,
241    ) -> Result<TransactionRepoModel, RepositoryError> {
242        let update = TransactionUpdateRequest {
243            status: Some(status),
244            ..Default::default()
245        };
246        self.partial_update(tx_id, update).await
247    }
248
249    async fn partial_update(
250        &self,
251        tx_id: String,
252        update: TransactionUpdateRequest,
253    ) -> Result<TransactionRepoModel, RepositoryError> {
254        let mut store = Self::acquire_lock(&self.store).await?;
255
256        if let Some(tx) = store.get_mut(&tx_id) {
257            // Apply partial updates using the model's business logic
258            tx.apply_partial_update(update);
259            Ok(tx.clone())
260        } else {
261            Err(RepositoryError::NotFound(format!(
262                "Transaction with ID {tx_id} not found"
263            )))
264        }
265    }
266
267    async fn update_network_data(
268        &self,
269        tx_id: String,
270        network_data: NetworkTransactionData,
271    ) -> Result<TransactionRepoModel, RepositoryError> {
272        let mut tx = self.get_by_id(tx_id.clone()).await?;
273        tx.network_data = network_data;
274        self.update(tx_id, tx).await
275    }
276
277    async fn set_sent_at(
278        &self,
279        tx_id: String,
280        sent_at: String,
281    ) -> Result<TransactionRepoModel, RepositoryError> {
282        let mut tx = self.get_by_id(tx_id.clone()).await?;
283        tx.sent_at = Some(sent_at);
284        self.update(tx_id, tx).await
285    }
286
287    async fn set_confirmed_at(
288        &self,
289        tx_id: String,
290        confirmed_at: String,
291    ) -> Result<TransactionRepoModel, RepositoryError> {
292        let mut tx = self.get_by_id(tx_id.clone()).await?;
293        tx.confirmed_at = Some(confirmed_at);
294        self.update(tx_id, tx).await
295    }
296}
297
298impl Default for InMemoryTransactionRepository {
299    fn default() -> Self {
300        Self::new()
301    }
302}
303
304#[cfg(test)]
305mod tests {
306    use crate::models::{evm::Speed, EvmTransactionData, NetworkType};
307    use lazy_static::lazy_static;
308    use std::str::FromStr;
309
310    use crate::models::U256;
311
312    use super::*;
313
314    use tokio::sync::Mutex;
315
    // Lazily-initialised async mutex around a unit value.
    // NOTE(review): none of the tests visible here lock it; judging by the
    // name it is presumably meant to serialise tests that touch process
    // environment variables — confirm, or remove if unused.
    lazy_static! {
        static ref ENV_MUTEX: Mutex<()> = Mutex::new(());
    }
319    // Helper function to create test transactions
320    fn create_test_transaction(id: &str) -> TransactionRepoModel {
321        TransactionRepoModel {
322            id: id.to_string(),
323            relayer_id: "relayer-1".to_string(),
324            status: TransactionStatus::Pending,
325            status_reason: None,
326            created_at: "2025-01-27T15:31:10.777083+00:00".to_string(),
327            sent_at: Some("2025-01-27T15:31:10.777083+00:00".to_string()),
328            confirmed_at: Some("2025-01-27T15:31:10.777083+00:00".to_string()),
329            valid_until: None,
330            delete_at: None,
331            network_type: NetworkType::Evm,
332            priced_at: None,
333            hashes: vec![],
334            network_data: NetworkTransactionData::Evm(EvmTransactionData {
335                gas_price: Some(1000000000),
336                gas_limit: Some(21000),
337                nonce: Some(1),
338                value: U256::from_str("1000000000000000000").unwrap(),
339                data: Some("0x".to_string()),
340                from: "0xSender".to_string(),
341                to: Some("0xRecipient".to_string()),
342                chain_id: 1,
343                signature: None,
344                hash: Some(format!("0x{}", id)),
345                speed: Some(Speed::Fast),
346                max_fee_per_gas: None,
347                max_priority_fee_per_gas: None,
348                raw: None,
349            }),
350            noop_count: None,
351            is_canceled: Some(false),
352        }
353    }
354
355    fn create_test_transaction_pending_state(id: &str) -> TransactionRepoModel {
356        TransactionRepoModel {
357            id: id.to_string(),
358            relayer_id: "relayer-1".to_string(),
359            status: TransactionStatus::Pending,
360            status_reason: None,
361            created_at: "2025-01-27T15:31:10.777083+00:00".to_string(),
362            sent_at: None,
363            confirmed_at: None,
364            valid_until: None,
365            delete_at: None,
366            network_type: NetworkType::Evm,
367            priced_at: None,
368            hashes: vec![],
369            network_data: NetworkTransactionData::Evm(EvmTransactionData {
370                gas_price: Some(1000000000),
371                gas_limit: Some(21000),
372                nonce: Some(1),
373                value: U256::from_str("1000000000000000000").unwrap(),
374                data: Some("0x".to_string()),
375                from: "0xSender".to_string(),
376                to: Some("0xRecipient".to_string()),
377                chain_id: 1,
378                signature: None,
379                hash: Some(format!("0x{}", id)),
380                speed: Some(Speed::Fast),
381                max_fee_per_gas: None,
382                max_priority_fee_per_gas: None,
383                raw: None,
384            }),
385            noop_count: None,
386            is_canceled: Some(false),
387        }
388    }
389
390    #[tokio::test]
391    async fn test_create_transaction() {
392        let repo = InMemoryTransactionRepository::new();
393        let tx = create_test_transaction("test-1");
394
395        let result = repo.create(tx.clone()).await.unwrap();
396        assert_eq!(result.id, tx.id);
397        assert_eq!(repo.count().await.unwrap(), 1);
398    }
399
400    #[tokio::test]
401    async fn test_get_transaction() {
402        let repo = InMemoryTransactionRepository::new();
403        let tx = create_test_transaction("test-1");
404
405        repo.create(tx.clone()).await.unwrap();
406        let stored = repo.get_by_id("test-1".to_string()).await.unwrap();
407        if let NetworkTransactionData::Evm(stored_data) = &stored.network_data {
408            if let NetworkTransactionData::Evm(tx_data) = &tx.network_data {
409                assert_eq!(stored_data.hash, tx_data.hash);
410            }
411        }
412    }
413
414    #[tokio::test]
415    async fn test_update_transaction() {
416        let repo = InMemoryTransactionRepository::new();
417        let mut tx = create_test_transaction("test-1");
418
419        repo.create(tx.clone()).await.unwrap();
420        tx.status = TransactionStatus::Confirmed;
421
422        let updated = repo.update("test-1".to_string(), tx).await.unwrap();
423        assert!(matches!(updated.status, TransactionStatus::Confirmed));
424    }
425
426    #[tokio::test]
427    async fn test_delete_transaction() {
428        let repo = InMemoryTransactionRepository::new();
429        let tx = create_test_transaction("test-1");
430
431        repo.create(tx).await.unwrap();
432        repo.delete_by_id("test-1".to_string()).await.unwrap();
433
434        let result = repo.get_by_id("test-1".to_string()).await;
435        assert!(result.is_err());
436    }
437
438    #[tokio::test]
439    async fn test_list_all_transactions() {
440        let repo = InMemoryTransactionRepository::new();
441        let tx1 = create_test_transaction("test-1");
442        let tx2 = create_test_transaction("test-2");
443
444        repo.create(tx1).await.unwrap();
445        repo.create(tx2).await.unwrap();
446
447        let transactions = repo.list_all().await.unwrap();
448        assert_eq!(transactions.len(), 2);
449    }
450
451    #[tokio::test]
452    async fn test_count_transactions() {
453        let repo = InMemoryTransactionRepository::new();
454        let tx = create_test_transaction("test-1");
455
456        assert_eq!(repo.count().await.unwrap(), 0);
457        repo.create(tx).await.unwrap();
458        assert_eq!(repo.count().await.unwrap(), 1);
459    }
460
461    #[tokio::test]
462    async fn test_get_nonexistent_transaction() {
463        let repo = InMemoryTransactionRepository::new();
464        let result = repo.get_by_id("nonexistent".to_string()).await;
465        assert!(matches!(result, Err(RepositoryError::NotFound(_))));
466    }
467
468    #[tokio::test]
469    async fn test_duplicate_transaction_creation() {
470        let repo = InMemoryTransactionRepository::new();
471        let tx = create_test_transaction("test-1");
472
473        repo.create(tx.clone()).await.unwrap();
474        let result = repo.create(tx).await;
475
476        assert!(matches!(
477            result,
478            Err(RepositoryError::ConstraintViolation(_))
479        ));
480    }
481
482    #[tokio::test]
483    async fn test_update_nonexistent_transaction() {
484        let repo = InMemoryTransactionRepository::new();
485        let tx = create_test_transaction("test-1");
486
487        let result = repo.update("nonexistent".to_string(), tx).await;
488        assert!(matches!(result, Err(RepositoryError::NotFound(_))));
489    }
490
491    #[tokio::test]
492    async fn test_partial_update() {
493        let repo = InMemoryTransactionRepository::new();
494        let tx = create_test_transaction_pending_state("test-tx-id");
495        repo.create(tx.clone()).await.unwrap();
496
497        // Test updating only status
498        let update1 = TransactionUpdateRequest {
499            status: Some(TransactionStatus::Sent),
500            status_reason: None,
501            sent_at: None,
502            confirmed_at: None,
503            network_data: None,
504            hashes: None,
505            priced_at: None,
506            noop_count: None,
507            is_canceled: None,
508            delete_at: None,
509        };
510        let updated_tx1 = repo
511            .partial_update("test-tx-id".to_string(), update1)
512            .await
513            .unwrap();
514        assert_eq!(updated_tx1.status, TransactionStatus::Sent);
515        assert_eq!(updated_tx1.sent_at, None);
516
517        // Test updating multiple fields
518        let update2 = TransactionUpdateRequest {
519            status: Some(TransactionStatus::Confirmed),
520            status_reason: None,
521            sent_at: Some("2023-01-01T12:00:00Z".to_string()),
522            confirmed_at: Some("2023-01-01T12:05:00Z".to_string()),
523            network_data: None,
524            hashes: None,
525            priced_at: None,
526            noop_count: None,
527            is_canceled: None,
528            delete_at: None,
529        };
530        let updated_tx2 = repo
531            .partial_update("test-tx-id".to_string(), update2)
532            .await
533            .unwrap();
534        assert_eq!(updated_tx2.status, TransactionStatus::Confirmed);
535        assert_eq!(
536            updated_tx2.sent_at,
537            Some("2023-01-01T12:00:00Z".to_string())
538        );
539        assert_eq!(
540            updated_tx2.confirmed_at,
541            Some("2023-01-01T12:05:00Z".to_string())
542        );
543
544        // Test updating non-existent transaction
545        let update3 = TransactionUpdateRequest {
546            status: Some(TransactionStatus::Failed),
547            status_reason: None,
548            sent_at: None,
549            confirmed_at: None,
550            network_data: None,
551            hashes: None,
552            priced_at: None,
553            noop_count: None,
554            is_canceled: None,
555            delete_at: None,
556        };
557        let result = repo
558            .partial_update("non-existent-id".to_string(), update3)
559            .await;
560        assert!(result.is_err());
561        assert!(matches!(result.unwrap_err(), RepositoryError::NotFound(_)));
562    }
563
564    #[tokio::test]
565    async fn test_update_status() {
566        let repo = InMemoryTransactionRepository::new();
567        let tx = create_test_transaction("test-1");
568
569        repo.create(tx).await.unwrap();
570
571        // Update status to Confirmed
572        let updated = repo
573            .update_status("test-1".to_string(), TransactionStatus::Confirmed)
574            .await
575            .unwrap();
576
577        // Verify the status was updated in the returned transaction
578        assert_eq!(updated.status, TransactionStatus::Confirmed);
579
580        // Also verify by getting the transaction directly
581        let stored = repo.get_by_id("test-1".to_string()).await.unwrap();
582        assert_eq!(stored.status, TransactionStatus::Confirmed);
583
584        // Update status to Failed
585        let updated = repo
586            .update_status("test-1".to_string(), TransactionStatus::Failed)
587            .await
588            .unwrap();
589
590        // Verify the status was updated
591        assert_eq!(updated.status, TransactionStatus::Failed);
592
593        // Verify updating a non-existent transaction
594        let result = repo
595            .update_status("non-existent".to_string(), TransactionStatus::Confirmed)
596            .await;
597        assert!(matches!(result, Err(RepositoryError::NotFound(_))));
598    }
599
600    #[tokio::test]
601    async fn test_list_paginated() {
602        let repo = InMemoryTransactionRepository::new();
603
604        // Create multiple transactions
605        for i in 1..=10 {
606            let tx = create_test_transaction(&format!("test-{}", i));
607            repo.create(tx).await.unwrap();
608        }
609
610        // Test first page with 3 items per page
611        let query = PaginationQuery {
612            page: 1,
613            per_page: 3,
614        };
615        let result = repo.list_paginated(query).await.unwrap();
616        assert_eq!(result.items.len(), 3);
617        assert_eq!(result.total, 10);
618        assert_eq!(result.page, 1);
619        assert_eq!(result.per_page, 3);
620
621        // Test second page with 3 items per page
622        let query = PaginationQuery {
623            page: 2,
624            per_page: 3,
625        };
626        let result = repo.list_paginated(query).await.unwrap();
627        assert_eq!(result.items.len(), 3);
628        assert_eq!(result.total, 10);
629        assert_eq!(result.page, 2);
630        assert_eq!(result.per_page, 3);
631
632        // Test page with fewer items than per_page
633        let query = PaginationQuery {
634            page: 4,
635            per_page: 3,
636        };
637        let result = repo.list_paginated(query).await.unwrap();
638        assert_eq!(result.items.len(), 1);
639        assert_eq!(result.total, 10);
640        assert_eq!(result.page, 4);
641        assert_eq!(result.per_page, 3);
642
643        // Test empty page (beyond total items)
644        let query = PaginationQuery {
645            page: 5,
646            per_page: 3,
647        };
648        let result = repo.list_paginated(query).await.unwrap();
649        assert_eq!(result.items.len(), 0);
650        assert_eq!(result.total, 10);
651    }
652
653    #[tokio::test]
654    async fn test_find_by_nonce() {
655        let repo = InMemoryTransactionRepository::new();
656
657        // Create transactions with different nonces
658        let tx1 = create_test_transaction("test-1");
659
660        let mut tx2 = create_test_transaction("test-2");
661        if let NetworkTransactionData::Evm(ref mut data) = tx2.network_data {
662            data.nonce = Some(2);
663        }
664
665        let mut tx3 = create_test_transaction("test-3");
666        tx3.relayer_id = "relayer-2".to_string();
667        if let NetworkTransactionData::Evm(ref mut data) = tx3.network_data {
668            data.nonce = Some(1);
669        }
670
671        repo.create(tx1).await.unwrap();
672        repo.create(tx2).await.unwrap();
673        repo.create(tx3).await.unwrap();
674
675        // Test finding transaction with specific relayer_id and nonce
676        let result = repo.find_by_nonce("relayer-1", 1).await.unwrap();
677        assert!(result.is_some());
678        assert_eq!(result.as_ref().unwrap().id, "test-1");
679
680        // Test finding transaction with a different nonce
681        let result = repo.find_by_nonce("relayer-1", 2).await.unwrap();
682        assert!(result.is_some());
683        assert_eq!(result.as_ref().unwrap().id, "test-2");
684
685        // Test finding transaction from a different relayer
686        let result = repo.find_by_nonce("relayer-2", 1).await.unwrap();
687        assert!(result.is_some());
688        assert_eq!(result.as_ref().unwrap().id, "test-3");
689
690        // Test finding transaction that doesn't exist
691        let result = repo.find_by_nonce("relayer-1", 99).await.unwrap();
692        assert!(result.is_none());
693    }
694
695    #[tokio::test]
696    async fn test_update_network_data() {
697        let repo = InMemoryTransactionRepository::new();
698        let tx = create_test_transaction("test-1");
699
700        repo.create(tx.clone()).await.unwrap();
701
702        // Create new network data with updated values
703        let updated_network_data = NetworkTransactionData::Evm(EvmTransactionData {
704            gas_price: Some(2000000000),
705            gas_limit: Some(30000),
706            nonce: Some(2),
707            value: U256::from_str("2000000000000000000").unwrap(),
708            data: Some("0xUpdated".to_string()),
709            from: "0xSender".to_string(),
710            to: Some("0xRecipient".to_string()),
711            chain_id: 1,
712            signature: None,
713            hash: Some("0xUpdated".to_string()),
714            raw: None,
715            speed: None,
716            max_fee_per_gas: None,
717            max_priority_fee_per_gas: None,
718        });
719
720        let updated = repo
721            .update_network_data("test-1".to_string(), updated_network_data)
722            .await
723            .unwrap();
724
725        // Verify the network data was updated
726        if let NetworkTransactionData::Evm(data) = &updated.network_data {
727            assert_eq!(data.gas_price, Some(2000000000));
728            assert_eq!(data.gas_limit, Some(30000));
729            assert_eq!(data.nonce, Some(2));
730            assert_eq!(data.hash, Some("0xUpdated".to_string()));
731            assert_eq!(data.data, Some("0xUpdated".to_string()));
732        } else {
733            panic!("Expected EVM network data");
734        }
735    }
736
737    #[tokio::test]
738    async fn test_set_sent_at() {
739        let repo = InMemoryTransactionRepository::new();
740        let tx = create_test_transaction("test-1");
741
742        repo.create(tx).await.unwrap();
743
744        // Updated sent_at timestamp
745        let new_sent_at = "2025-02-01T10:00:00.000000+00:00".to_string();
746
747        let updated = repo
748            .set_sent_at("test-1".to_string(), new_sent_at.clone())
749            .await
750            .unwrap();
751
752        // Verify the sent_at timestamp was updated
753        assert_eq!(updated.sent_at, Some(new_sent_at.clone()));
754
755        // Also verify by getting the transaction directly
756        let stored = repo.get_by_id("test-1".to_string()).await.unwrap();
757        assert_eq!(stored.sent_at, Some(new_sent_at.clone()));
758    }
759
760    #[tokio::test]
761    async fn test_set_confirmed_at() {
762        let repo = InMemoryTransactionRepository::new();
763        let tx = create_test_transaction("test-1");
764
765        repo.create(tx).await.unwrap();
766
767        // Updated confirmed_at timestamp
768        let new_confirmed_at = "2025-02-01T11:30:45.123456+00:00".to_string();
769
770        let updated = repo
771            .set_confirmed_at("test-1".to_string(), new_confirmed_at.clone())
772            .await
773            .unwrap();
774
775        // Verify the confirmed_at timestamp was updated
776        assert_eq!(updated.confirmed_at, Some(new_confirmed_at.clone()));
777
778        // Also verify by getting the transaction directly
779        let stored = repo.get_by_id("test-1".to_string()).await.unwrap();
780        assert_eq!(stored.confirmed_at, Some(new_confirmed_at.clone()));
781    }
782
783    #[tokio::test]
784    async fn test_find_by_relayer_id() {
785        let repo = InMemoryTransactionRepository::new();
786        let tx1 = create_test_transaction("test-1");
787        let tx2 = create_test_transaction("test-2");
788
789        // Create a transaction with a different relayer_id
790        let mut tx3 = create_test_transaction("test-3");
791        tx3.relayer_id = "relayer-2".to_string();
792
793        repo.create(tx1).await.unwrap();
794        repo.create(tx2).await.unwrap();
795        repo.create(tx3).await.unwrap();
796
797        // Test finding transactions for relayer-1
798        let query = PaginationQuery {
799            page: 1,
800            per_page: 10,
801        };
802        let result = repo
803            .find_by_relayer_id("relayer-1", query.clone())
804            .await
805            .unwrap();
806        assert_eq!(result.total, 2);
807        assert_eq!(result.items.len(), 2);
808        assert!(result.items.iter().all(|tx| tx.relayer_id == "relayer-1"));
809
810        // Test finding transactions for relayer-2
811        let result = repo
812            .find_by_relayer_id("relayer-2", query.clone())
813            .await
814            .unwrap();
815        assert_eq!(result.total, 1);
816        assert_eq!(result.items.len(), 1);
817        assert!(result.items.iter().all(|tx| tx.relayer_id == "relayer-2"));
818
819        // Test finding transactions for non-existent relayer
820        let result = repo
821            .find_by_relayer_id("non-existent", query.clone())
822            .await
823            .unwrap();
824        assert_eq!(result.total, 0);
825        assert_eq!(result.items.len(), 0);
826    }
827
828    #[tokio::test]
829    async fn test_find_by_status() {
830        let repo = InMemoryTransactionRepository::new();
831        let tx1 = create_test_transaction_pending_state("tx1");
832        let mut tx2 = create_test_transaction_pending_state("tx2");
833        tx2.status = TransactionStatus::Submitted;
834        let mut tx3 = create_test_transaction_pending_state("tx3");
835        tx3.relayer_id = "relayer-2".to_string();
836        tx3.status = TransactionStatus::Pending;
837
838        repo.create(tx1.clone()).await.unwrap();
839        repo.create(tx2.clone()).await.unwrap();
840        repo.create(tx3.clone()).await.unwrap();
841
842        // Test finding by single status
843        let pending_txs = repo
844            .find_by_status("relayer-1", &[TransactionStatus::Pending])
845            .await
846            .unwrap();
847        assert_eq!(pending_txs.len(), 1);
848        assert_eq!(pending_txs[0].id, "tx1");
849
850        let submitted_txs = repo
851            .find_by_status("relayer-1", &[TransactionStatus::Submitted])
852            .await
853            .unwrap();
854        assert_eq!(submitted_txs.len(), 1);
855        assert_eq!(submitted_txs[0].id, "tx2");
856
857        // Test finding by multiple statuses
858        let multiple_status_txs = repo
859            .find_by_status(
860                "relayer-1",
861                &[TransactionStatus::Pending, TransactionStatus::Submitted],
862            )
863            .await
864            .unwrap();
865        assert_eq!(multiple_status_txs.len(), 2);
866
867        // Test finding for different relayer
868        let relayer2_pending = repo
869            .find_by_status("relayer-2", &[TransactionStatus::Pending])
870            .await
871            .unwrap();
872        assert_eq!(relayer2_pending.len(), 1);
873        assert_eq!(relayer2_pending[0].id, "tx3");
874
875        // Test finding for non-existent relayer
876        let no_txs = repo
877            .find_by_status("non-existent", &[TransactionStatus::Pending])
878            .await
879            .unwrap();
880        assert_eq!(no_txs.len(), 0);
881    }
882
883    #[tokio::test]
884    async fn test_find_by_status_sorted_by_created_at() {
885        let repo = InMemoryTransactionRepository::new();
886
887        // Helper function to create transaction with custom created_at timestamp
888        let create_tx_with_timestamp = |id: &str, timestamp: &str| -> TransactionRepoModel {
889            let mut tx = create_test_transaction_pending_state(id);
890            tx.created_at = timestamp.to_string();
891            tx.status = TransactionStatus::Pending;
892            tx
893        };
894
895        // Create transactions with different timestamps (out of chronological order)
896        let tx3 = create_tx_with_timestamp("tx3", "2025-01-27T17:00:00.000000+00:00"); // Latest
897        let tx1 = create_tx_with_timestamp("tx1", "2025-01-27T15:00:00.000000+00:00"); // Earliest
898        let tx2 = create_tx_with_timestamp("tx2", "2025-01-27T16:00:00.000000+00:00"); // Middle
899
900        // Create them in reverse chronological order to test sorting
901        repo.create(tx3.clone()).await.unwrap();
902        repo.create(tx1.clone()).await.unwrap();
903        repo.create(tx2.clone()).await.unwrap();
904
905        // Find by status
906        let result = repo
907            .find_by_status("relayer-1", &[TransactionStatus::Pending])
908            .await
909            .unwrap();
910
911        // Verify they are sorted by created_at (oldest first)
912        assert_eq!(result.len(), 3);
913        assert_eq!(result[0].id, "tx1"); // Earliest
914        assert_eq!(result[1].id, "tx2"); // Middle
915        assert_eq!(result[2].id, "tx3"); // Latest
916
917        // Verify the timestamps are in ascending order
918        assert_eq!(result[0].created_at, "2025-01-27T15:00:00.000000+00:00");
919        assert_eq!(result[1].created_at, "2025-01-27T16:00:00.000000+00:00");
920        assert_eq!(result[2].created_at, "2025-01-27T17:00:00.000000+00:00");
921    }
922
923    #[tokio::test]
924    async fn test_has_entries() {
925        let repo = InMemoryTransactionRepository::new();
926        assert!(!repo.has_entries().await.unwrap());
927
928        let tx = create_test_transaction("test");
929        repo.create(tx.clone()).await.unwrap();
930
931        assert!(repo.has_entries().await.unwrap());
932    }
933
934    #[tokio::test]
935    async fn test_drop_all_entries() {
936        let repo = InMemoryTransactionRepository::new();
937        let tx = create_test_transaction("test");
938        repo.create(tx.clone()).await.unwrap();
939
940        assert!(repo.has_entries().await.unwrap());
941
942        repo.drop_all_entries().await.unwrap();
943        assert!(!repo.has_entries().await.unwrap());
944    }
945
946    // Tests for delete_at field setting on final status updates
947
948    #[tokio::test]
949    async fn test_update_status_sets_delete_at_for_final_statuses() {
950        let _lock = ENV_MUTEX.lock().await;
951
952        use chrono::{DateTime, Duration, Utc};
953        use std::env;
954
955        // Use a unique test environment variable to avoid conflicts
956        env::set_var("TRANSACTION_EXPIRATION_HOURS", "6");
957
958        let repo = InMemoryTransactionRepository::new();
959
960        let final_statuses = [
961            TransactionStatus::Canceled,
962            TransactionStatus::Confirmed,
963            TransactionStatus::Failed,
964            TransactionStatus::Expired,
965        ];
966
967        for (i, status) in final_statuses.iter().enumerate() {
968            let tx_id = format!("test-final-{}", i);
969            let tx = create_test_transaction_pending_state(&tx_id);
970
971            // Ensure transaction has no delete_at initially
972            assert!(tx.delete_at.is_none());
973
974            repo.create(tx).await.unwrap();
975
976            let before_update = Utc::now();
977
978            // Update to final status
979            let updated = repo
980                .update_status(tx_id.clone(), status.clone())
981                .await
982                .unwrap();
983
984            // Should have delete_at set
985            assert!(
986                updated.delete_at.is_some(),
987                "delete_at should be set for status: {:?}",
988                status
989            );
990
991            // Verify the timestamp is reasonable (approximately 6 hours from now)
992            let delete_at_str = updated.delete_at.unwrap();
993            let delete_at = DateTime::parse_from_rfc3339(&delete_at_str)
994                .expect("delete_at should be valid RFC3339")
995                .with_timezone(&Utc);
996
997            let duration_from_before = delete_at.signed_duration_since(before_update);
998            let expected_duration = Duration::hours(6);
999            let tolerance = Duration::minutes(5);
1000
1001            assert!(
1002                duration_from_before >= expected_duration - tolerance &&
1003                duration_from_before <= expected_duration + tolerance,
1004                "delete_at should be approximately 6 hours from now for status: {:?}. Duration: {:?}",
1005                status, duration_from_before
1006            );
1007        }
1008
1009        // Cleanup
1010        env::remove_var("TRANSACTION_EXPIRATION_HOURS");
1011    }
1012
1013    #[tokio::test]
1014    async fn test_update_status_does_not_set_delete_at_for_non_final_statuses() {
1015        let _lock = ENV_MUTEX.lock().await;
1016
1017        use std::env;
1018
1019        env::set_var("TRANSACTION_EXPIRATION_HOURS", "4");
1020
1021        let repo = InMemoryTransactionRepository::new();
1022
1023        let non_final_statuses = [
1024            TransactionStatus::Pending,
1025            TransactionStatus::Sent,
1026            TransactionStatus::Submitted,
1027            TransactionStatus::Mined,
1028        ];
1029
1030        for (i, status) in non_final_statuses.iter().enumerate() {
1031            let tx_id = format!("test-non-final-{}", i);
1032            let tx = create_test_transaction_pending_state(&tx_id);
1033
1034            repo.create(tx).await.unwrap();
1035
1036            // Update to non-final status
1037            let updated = repo
1038                .update_status(tx_id.clone(), status.clone())
1039                .await
1040                .unwrap();
1041
1042            // Should NOT have delete_at set
1043            assert!(
1044                updated.delete_at.is_none(),
1045                "delete_at should NOT be set for status: {:?}",
1046                status
1047            );
1048        }
1049
1050        // Cleanup
1051        env::remove_var("TRANSACTION_EXPIRATION_HOURS");
1052    }
1053
1054    #[tokio::test]
1055    async fn test_partial_update_sets_delete_at_for_final_statuses() {
1056        let _lock = ENV_MUTEX.lock().await;
1057
1058        use chrono::{DateTime, Duration, Utc};
1059        use std::env;
1060
1061        env::set_var("TRANSACTION_EXPIRATION_HOURS", "8");
1062
1063        let repo = InMemoryTransactionRepository::new();
1064        let tx = create_test_transaction_pending_state("test-partial-final");
1065
1066        repo.create(tx).await.unwrap();
1067
1068        let before_update = Utc::now();
1069
1070        // Use partial_update to set status to Confirmed (final status)
1071        let update = TransactionUpdateRequest {
1072            status: Some(TransactionStatus::Confirmed),
1073            status_reason: Some("Transaction completed".to_string()),
1074            confirmed_at: Some("2023-01-01T12:05:00Z".to_string()),
1075            ..Default::default()
1076        };
1077
1078        let updated = repo
1079            .partial_update("test-partial-final".to_string(), update)
1080            .await
1081            .unwrap();
1082
1083        // Should have delete_at set
1084        assert!(
1085            updated.delete_at.is_some(),
1086            "delete_at should be set when updating to Confirmed status"
1087        );
1088
1089        // Verify the timestamp is reasonable (approximately 8 hours from now)
1090        let delete_at_str = updated.delete_at.unwrap();
1091        let delete_at = DateTime::parse_from_rfc3339(&delete_at_str)
1092            .expect("delete_at should be valid RFC3339")
1093            .with_timezone(&Utc);
1094
1095        let duration_from_before = delete_at.signed_duration_since(before_update);
1096        let expected_duration = Duration::hours(8);
1097        let tolerance = Duration::minutes(5);
1098
1099        assert!(
1100            duration_from_before >= expected_duration - tolerance
1101                && duration_from_before <= expected_duration + tolerance,
1102            "delete_at should be approximately 8 hours from now. Duration: {:?}",
1103            duration_from_before
1104        );
1105
1106        // Also verify other fields were updated
1107        assert_eq!(updated.status, TransactionStatus::Confirmed);
1108        assert_eq!(
1109            updated.status_reason,
1110            Some("Transaction completed".to_string())
1111        );
1112        assert_eq!(
1113            updated.confirmed_at,
1114            Some("2023-01-01T12:05:00Z".to_string())
1115        );
1116
1117        // Cleanup
1118        env::remove_var("TRANSACTION_EXPIRATION_HOURS");
1119    }
1120
1121    #[tokio::test]
1122    async fn test_update_status_preserves_existing_delete_at() {
1123        let _lock = ENV_MUTEX.lock().await;
1124
1125        use std::env;
1126
1127        env::set_var("TRANSACTION_EXPIRATION_HOURS", "2");
1128
1129        let repo = InMemoryTransactionRepository::new();
1130        let mut tx = create_test_transaction_pending_state("test-preserve-delete-at");
1131
1132        // Set an existing delete_at value
1133        let existing_delete_at = "2025-01-01T12:00:00Z".to_string();
1134        tx.delete_at = Some(existing_delete_at.clone());
1135
1136        repo.create(tx).await.unwrap();
1137
1138        // Update to final status
1139        let updated = repo
1140            .update_status(
1141                "test-preserve-delete-at".to_string(),
1142                TransactionStatus::Confirmed,
1143            )
1144            .await
1145            .unwrap();
1146
1147        // Should preserve the existing delete_at value
1148        assert_eq!(
1149            updated.delete_at,
1150            Some(existing_delete_at),
1151            "Existing delete_at should be preserved when updating to final status"
1152        );
1153
1154        // Cleanup
1155        env::remove_var("TRANSACTION_EXPIRATION_HOURS");
1156    }
1157
1158    #[tokio::test]
1159    async fn test_partial_update_without_status_change_preserves_delete_at() {
1160        let _lock = ENV_MUTEX.lock().await;
1161
1162        use std::env;
1163
1164        env::set_var("TRANSACTION_EXPIRATION_HOURS", "3");
1165
1166        let repo = InMemoryTransactionRepository::new();
1167        let tx = create_test_transaction_pending_state("test-preserve-no-status");
1168
1169        repo.create(tx).await.unwrap();
1170
1171        // First, update to final status to set delete_at
1172        let updated1 = repo
1173            .update_status(
1174                "test-preserve-no-status".to_string(),
1175                TransactionStatus::Confirmed,
1176            )
1177            .await
1178            .unwrap();
1179
1180        assert!(updated1.delete_at.is_some());
1181        let original_delete_at = updated1.delete_at.clone();
1182
1183        // Now update other fields without changing status
1184        let update = TransactionUpdateRequest {
1185            status: None, // No status change
1186            status_reason: Some("Updated reason".to_string()),
1187            confirmed_at: Some("2023-01-01T12:10:00Z".to_string()),
1188            ..Default::default()
1189        };
1190
1191        let updated2 = repo
1192            .partial_update("test-preserve-no-status".to_string(), update)
1193            .await
1194            .unwrap();
1195
1196        // delete_at should be preserved
1197        assert_eq!(
1198            updated2.delete_at, original_delete_at,
1199            "delete_at should be preserved when status is not updated"
1200        );
1201
1202        // Other fields should be updated
1203        assert_eq!(updated2.status, TransactionStatus::Confirmed); // Unchanged
1204        assert_eq!(updated2.status_reason, Some("Updated reason".to_string()));
1205        assert_eq!(
1206            updated2.confirmed_at,
1207            Some("2023-01-01T12:10:00Z".to_string())
1208        );
1209
1210        // Cleanup
1211        env::remove_var("TRANSACTION_EXPIRATION_HOURS");
1212    }
1213
1214    #[tokio::test]
1215    async fn test_update_status_multiple_updates_idempotent() {
1216        let _lock = ENV_MUTEX.lock().await;
1217
1218        use std::env;
1219
1220        env::set_var("TRANSACTION_EXPIRATION_HOURS", "12");
1221
1222        let repo = InMemoryTransactionRepository::new();
1223        let tx = create_test_transaction_pending_state("test-idempotent");
1224
1225        repo.create(tx).await.unwrap();
1226
1227        // First update to final status
1228        let updated1 = repo
1229            .update_status("test-idempotent".to_string(), TransactionStatus::Confirmed)
1230            .await
1231            .unwrap();
1232
1233        assert!(updated1.delete_at.is_some());
1234        let first_delete_at = updated1.delete_at.clone();
1235
1236        // Second update to another final status
1237        let updated2 = repo
1238            .update_status("test-idempotent".to_string(), TransactionStatus::Failed)
1239            .await
1240            .unwrap();
1241
1242        // delete_at should remain the same (idempotent)
1243        assert_eq!(
1244            updated2.delete_at, first_delete_at,
1245            "delete_at should not change on subsequent final status updates"
1246        );
1247
1248        // Status should be updated
1249        assert_eq!(updated2.status, TransactionStatus::Failed);
1250
1251        // Cleanup
1252        env::remove_var("TRANSACTION_EXPIRATION_HOURS");
1253    }
1254}