1use crate::{
8 models::{
9 NetworkTransactionData, TransactionRepoModel, TransactionStatus, TransactionUpdateRequest,
10 },
11 repositories::*,
12};
13use async_trait::async_trait;
14use eyre::Result;
15use itertools::Itertools;
16use std::collections::HashMap;
17use tokio::sync::{Mutex, MutexGuard};
18
19#[derive(Debug)]
20pub struct InMemoryTransactionRepository {
21 store: Mutex<HashMap<String, TransactionRepoModel>>,
22}
23
24impl Clone for InMemoryTransactionRepository {
25 fn clone(&self) -> Self {
26 let data = self
28 .store
29 .try_lock()
30 .map(|guard| guard.clone())
31 .unwrap_or_else(|_| HashMap::new());
32
33 Self {
34 store: Mutex::new(data),
35 }
36 }
37}
38
39impl InMemoryTransactionRepository {
40 pub fn new() -> Self {
41 Self {
42 store: Mutex::new(HashMap::new()),
43 }
44 }
45
46 async fn acquire_lock<T>(lock: &Mutex<T>) -> Result<MutexGuard<T>, RepositoryError> {
47 Ok(lock.lock().await)
48 }
49}
50
51#[async_trait]
54impl Repository<TransactionRepoModel, String> for InMemoryTransactionRepository {
55 async fn create(
56 &self,
57 tx: TransactionRepoModel,
58 ) -> Result<TransactionRepoModel, RepositoryError> {
59 let mut store = Self::acquire_lock(&self.store).await?;
60 if store.contains_key(&tx.id) {
61 return Err(RepositoryError::ConstraintViolation(format!(
62 "Transaction with ID {} already exists",
63 tx.id
64 )));
65 }
66 store.insert(tx.id.clone(), tx.clone());
67 Ok(tx)
68 }
69
70 async fn get_by_id(&self, id: String) -> Result<TransactionRepoModel, RepositoryError> {
71 let store = Self::acquire_lock(&self.store).await?;
72 store
73 .get(&id)
74 .cloned()
75 .ok_or_else(|| RepositoryError::NotFound(format!("Transaction with ID {id} not found")))
76 }
77
78 #[allow(clippy::map_entry)]
79 async fn update(
80 &self,
81 id: String,
82 tx: TransactionRepoModel,
83 ) -> Result<TransactionRepoModel, RepositoryError> {
84 let mut store = Self::acquire_lock(&self.store).await?;
85 if store.contains_key(&id) {
86 let mut updated_tx = tx;
87 updated_tx.id = id.clone();
88 store.insert(id, updated_tx.clone());
89 Ok(updated_tx)
90 } else {
91 Err(RepositoryError::NotFound(format!(
92 "Transaction with ID {id} not found"
93 )))
94 }
95 }
96
97 async fn delete_by_id(&self, id: String) -> Result<(), RepositoryError> {
98 let mut store = Self::acquire_lock(&self.store).await?;
99 if store.remove(&id).is_some() {
100 Ok(())
101 } else {
102 Err(RepositoryError::NotFound(format!(
103 "Transaction with ID {id} not found"
104 )))
105 }
106 }
107
108 async fn list_all(&self) -> Result<Vec<TransactionRepoModel>, RepositoryError> {
109 let store = Self::acquire_lock(&self.store).await?;
110 Ok(store.values().cloned().collect())
111 }
112
113 async fn list_paginated(
114 &self,
115 query: PaginationQuery,
116 ) -> Result<PaginatedResult<TransactionRepoModel>, RepositoryError> {
117 let total = self.count().await?;
118 let start = ((query.page - 1) * query.per_page) as usize;
119 let store = Self::acquire_lock(&self.store).await?;
120 let items: Vec<TransactionRepoModel> = store
121 .values()
122 .skip(start)
123 .take(query.per_page as usize)
124 .cloned()
125 .collect();
126
127 Ok(PaginatedResult {
128 items,
129 total: total as u64,
130 page: query.page,
131 per_page: query.per_page,
132 })
133 }
134
135 async fn count(&self) -> Result<usize, RepositoryError> {
136 let store = Self::acquire_lock(&self.store).await?;
137 Ok(store.len())
138 }
139
140 async fn has_entries(&self) -> Result<bool, RepositoryError> {
141 let store = Self::acquire_lock(&self.store).await?;
142 Ok(!store.is_empty())
143 }
144
145 async fn drop_all_entries(&self) -> Result<(), RepositoryError> {
146 let mut store = Self::acquire_lock(&self.store).await?;
147 store.clear();
148 Ok(())
149 }
150}
151
152#[async_trait]
153impl TransactionRepository for InMemoryTransactionRepository {
154 async fn find_by_relayer_id(
155 &self,
156 relayer_id: &str,
157 query: PaginationQuery,
158 ) -> Result<PaginatedResult<TransactionRepoModel>, RepositoryError> {
159 let store = Self::acquire_lock(&self.store).await?;
160 let filtered: Vec<TransactionRepoModel> = store
161 .values()
162 .filter(|tx| tx.relayer_id == relayer_id)
163 .cloned()
164 .collect();
165
166 let total = filtered.len() as u64;
167
168 if total == 0 {
169 return Ok(PaginatedResult::<TransactionRepoModel> {
170 items: vec![],
171 total: 0,
172 page: query.page,
173 per_page: query.per_page,
174 });
175 }
176
177 let start = ((query.page - 1) * query.per_page) as usize;
178
179 let items = filtered
181 .into_iter()
182 .sorted_by(|a, b| a.created_at.cmp(&b.created_at)) .skip(start)
184 .take(query.per_page as usize)
185 .collect();
186
187 Ok(PaginatedResult {
188 items,
189 total,
190 page: query.page,
191 per_page: query.per_page,
192 })
193 }
194
195 async fn find_by_status(
196 &self,
197 relayer_id: &str,
198 statuses: &[TransactionStatus],
199 ) -> Result<Vec<TransactionRepoModel>, RepositoryError> {
200 let store = Self::acquire_lock(&self.store).await?;
201 let filtered: Vec<TransactionRepoModel> = store
202 .values()
203 .filter(|tx| tx.relayer_id == relayer_id && statuses.contains(&tx.status))
204 .cloned()
205 .collect();
206
207 let sorted = filtered
209 .into_iter()
210 .sorted_by_key(|tx| tx.created_at.clone())
211 .collect();
212
213 Ok(sorted)
214 }
215
216 async fn find_by_nonce(
217 &self,
218 relayer_id: &str,
219 nonce: u64,
220 ) -> Result<Option<TransactionRepoModel>, RepositoryError> {
221 let store = Self::acquire_lock(&self.store).await?;
222 let filtered: Vec<TransactionRepoModel> = store
223 .values()
224 .filter(|tx| {
225 tx.relayer_id == relayer_id
226 && match &tx.network_data {
227 NetworkTransactionData::Evm(data) => data.nonce == Some(nonce),
228 _ => false,
229 }
230 })
231 .cloned()
232 .collect();
233
234 Ok(filtered.into_iter().next())
235 }
236
237 async fn update_status(
238 &self,
239 tx_id: String,
240 status: TransactionStatus,
241 ) -> Result<TransactionRepoModel, RepositoryError> {
242 let update = TransactionUpdateRequest {
243 status: Some(status),
244 ..Default::default()
245 };
246 self.partial_update(tx_id, update).await
247 }
248
249 async fn partial_update(
250 &self,
251 tx_id: String,
252 update: TransactionUpdateRequest,
253 ) -> Result<TransactionRepoModel, RepositoryError> {
254 let mut store = Self::acquire_lock(&self.store).await?;
255
256 if let Some(tx) = store.get_mut(&tx_id) {
257 tx.apply_partial_update(update);
259 Ok(tx.clone())
260 } else {
261 Err(RepositoryError::NotFound(format!(
262 "Transaction with ID {tx_id} not found"
263 )))
264 }
265 }
266
267 async fn update_network_data(
268 &self,
269 tx_id: String,
270 network_data: NetworkTransactionData,
271 ) -> Result<TransactionRepoModel, RepositoryError> {
272 let mut tx = self.get_by_id(tx_id.clone()).await?;
273 tx.network_data = network_data;
274 self.update(tx_id, tx).await
275 }
276
277 async fn set_sent_at(
278 &self,
279 tx_id: String,
280 sent_at: String,
281 ) -> Result<TransactionRepoModel, RepositoryError> {
282 let mut tx = self.get_by_id(tx_id.clone()).await?;
283 tx.sent_at = Some(sent_at);
284 self.update(tx_id, tx).await
285 }
286
287 async fn set_confirmed_at(
288 &self,
289 tx_id: String,
290 confirmed_at: String,
291 ) -> Result<TransactionRepoModel, RepositoryError> {
292 let mut tx = self.get_by_id(tx_id.clone()).await?;
293 tx.confirmed_at = Some(confirmed_at);
294 self.update(tx_id, tx).await
295 }
296}
297
298impl Default for InMemoryTransactionRepository {
299 fn default() -> Self {
300 Self::new()
301 }
302}
303
304#[cfg(test)]
305mod tests {
306 use crate::models::{evm::Speed, EvmTransactionData, NetworkType};
307 use lazy_static::lazy_static;
308 use std::str::FromStr;
309
310 use crate::models::U256;
311
312 use super::*;
313
314 use tokio::sync::Mutex;
315
316 lazy_static! {
317 static ref ENV_MUTEX: Mutex<()> = Mutex::new(());
318 }
319 fn create_test_transaction(id: &str) -> TransactionRepoModel {
321 TransactionRepoModel {
322 id: id.to_string(),
323 relayer_id: "relayer-1".to_string(),
324 status: TransactionStatus::Pending,
325 status_reason: None,
326 created_at: "2025-01-27T15:31:10.777083+00:00".to_string(),
327 sent_at: Some("2025-01-27T15:31:10.777083+00:00".to_string()),
328 confirmed_at: Some("2025-01-27T15:31:10.777083+00:00".to_string()),
329 valid_until: None,
330 delete_at: None,
331 network_type: NetworkType::Evm,
332 priced_at: None,
333 hashes: vec![],
334 network_data: NetworkTransactionData::Evm(EvmTransactionData {
335 gas_price: Some(1000000000),
336 gas_limit: Some(21000),
337 nonce: Some(1),
338 value: U256::from_str("1000000000000000000").unwrap(),
339 data: Some("0x".to_string()),
340 from: "0xSender".to_string(),
341 to: Some("0xRecipient".to_string()),
342 chain_id: 1,
343 signature: None,
344 hash: Some(format!("0x{}", id)),
345 speed: Some(Speed::Fast),
346 max_fee_per_gas: None,
347 max_priority_fee_per_gas: None,
348 raw: None,
349 }),
350 noop_count: None,
351 is_canceled: Some(false),
352 }
353 }
354
355 fn create_test_transaction_pending_state(id: &str) -> TransactionRepoModel {
356 TransactionRepoModel {
357 id: id.to_string(),
358 relayer_id: "relayer-1".to_string(),
359 status: TransactionStatus::Pending,
360 status_reason: None,
361 created_at: "2025-01-27T15:31:10.777083+00:00".to_string(),
362 sent_at: None,
363 confirmed_at: None,
364 valid_until: None,
365 delete_at: None,
366 network_type: NetworkType::Evm,
367 priced_at: None,
368 hashes: vec![],
369 network_data: NetworkTransactionData::Evm(EvmTransactionData {
370 gas_price: Some(1000000000),
371 gas_limit: Some(21000),
372 nonce: Some(1),
373 value: U256::from_str("1000000000000000000").unwrap(),
374 data: Some("0x".to_string()),
375 from: "0xSender".to_string(),
376 to: Some("0xRecipient".to_string()),
377 chain_id: 1,
378 signature: None,
379 hash: Some(format!("0x{}", id)),
380 speed: Some(Speed::Fast),
381 max_fee_per_gas: None,
382 max_priority_fee_per_gas: None,
383 raw: None,
384 }),
385 noop_count: None,
386 is_canceled: Some(false),
387 }
388 }
389
390 #[tokio::test]
391 async fn test_create_transaction() {
392 let repo = InMemoryTransactionRepository::new();
393 let tx = create_test_transaction("test-1");
394
395 let result = repo.create(tx.clone()).await.unwrap();
396 assert_eq!(result.id, tx.id);
397 assert_eq!(repo.count().await.unwrap(), 1);
398 }
399
400 #[tokio::test]
401 async fn test_get_transaction() {
402 let repo = InMemoryTransactionRepository::new();
403 let tx = create_test_transaction("test-1");
404
405 repo.create(tx.clone()).await.unwrap();
406 let stored = repo.get_by_id("test-1".to_string()).await.unwrap();
407 if let NetworkTransactionData::Evm(stored_data) = &stored.network_data {
408 if let NetworkTransactionData::Evm(tx_data) = &tx.network_data {
409 assert_eq!(stored_data.hash, tx_data.hash);
410 }
411 }
412 }
413
414 #[tokio::test]
415 async fn test_update_transaction() {
416 let repo = InMemoryTransactionRepository::new();
417 let mut tx = create_test_transaction("test-1");
418
419 repo.create(tx.clone()).await.unwrap();
420 tx.status = TransactionStatus::Confirmed;
421
422 let updated = repo.update("test-1".to_string(), tx).await.unwrap();
423 assert!(matches!(updated.status, TransactionStatus::Confirmed));
424 }
425
426 #[tokio::test]
427 async fn test_delete_transaction() {
428 let repo = InMemoryTransactionRepository::new();
429 let tx = create_test_transaction("test-1");
430
431 repo.create(tx).await.unwrap();
432 repo.delete_by_id("test-1".to_string()).await.unwrap();
433
434 let result = repo.get_by_id("test-1".to_string()).await;
435 assert!(result.is_err());
436 }
437
438 #[tokio::test]
439 async fn test_list_all_transactions() {
440 let repo = InMemoryTransactionRepository::new();
441 let tx1 = create_test_transaction("test-1");
442 let tx2 = create_test_transaction("test-2");
443
444 repo.create(tx1).await.unwrap();
445 repo.create(tx2).await.unwrap();
446
447 let transactions = repo.list_all().await.unwrap();
448 assert_eq!(transactions.len(), 2);
449 }
450
451 #[tokio::test]
452 async fn test_count_transactions() {
453 let repo = InMemoryTransactionRepository::new();
454 let tx = create_test_transaction("test-1");
455
456 assert_eq!(repo.count().await.unwrap(), 0);
457 repo.create(tx).await.unwrap();
458 assert_eq!(repo.count().await.unwrap(), 1);
459 }
460
461 #[tokio::test]
462 async fn test_get_nonexistent_transaction() {
463 let repo = InMemoryTransactionRepository::new();
464 let result = repo.get_by_id("nonexistent".to_string()).await;
465 assert!(matches!(result, Err(RepositoryError::NotFound(_))));
466 }
467
468 #[tokio::test]
469 async fn test_duplicate_transaction_creation() {
470 let repo = InMemoryTransactionRepository::new();
471 let tx = create_test_transaction("test-1");
472
473 repo.create(tx.clone()).await.unwrap();
474 let result = repo.create(tx).await;
475
476 assert!(matches!(
477 result,
478 Err(RepositoryError::ConstraintViolation(_))
479 ));
480 }
481
482 #[tokio::test]
483 async fn test_update_nonexistent_transaction() {
484 let repo = InMemoryTransactionRepository::new();
485 let tx = create_test_transaction("test-1");
486
487 let result = repo.update("nonexistent".to_string(), tx).await;
488 assert!(matches!(result, Err(RepositoryError::NotFound(_))));
489 }
490
491 #[tokio::test]
492 async fn test_partial_update() {
493 let repo = InMemoryTransactionRepository::new();
494 let tx = create_test_transaction_pending_state("test-tx-id");
495 repo.create(tx.clone()).await.unwrap();
496
497 let update1 = TransactionUpdateRequest {
499 status: Some(TransactionStatus::Sent),
500 status_reason: None,
501 sent_at: None,
502 confirmed_at: None,
503 network_data: None,
504 hashes: None,
505 priced_at: None,
506 noop_count: None,
507 is_canceled: None,
508 delete_at: None,
509 };
510 let updated_tx1 = repo
511 .partial_update("test-tx-id".to_string(), update1)
512 .await
513 .unwrap();
514 assert_eq!(updated_tx1.status, TransactionStatus::Sent);
515 assert_eq!(updated_tx1.sent_at, None);
516
517 let update2 = TransactionUpdateRequest {
519 status: Some(TransactionStatus::Confirmed),
520 status_reason: None,
521 sent_at: Some("2023-01-01T12:00:00Z".to_string()),
522 confirmed_at: Some("2023-01-01T12:05:00Z".to_string()),
523 network_data: None,
524 hashes: None,
525 priced_at: None,
526 noop_count: None,
527 is_canceled: None,
528 delete_at: None,
529 };
530 let updated_tx2 = repo
531 .partial_update("test-tx-id".to_string(), update2)
532 .await
533 .unwrap();
534 assert_eq!(updated_tx2.status, TransactionStatus::Confirmed);
535 assert_eq!(
536 updated_tx2.sent_at,
537 Some("2023-01-01T12:00:00Z".to_string())
538 );
539 assert_eq!(
540 updated_tx2.confirmed_at,
541 Some("2023-01-01T12:05:00Z".to_string())
542 );
543
544 let update3 = TransactionUpdateRequest {
546 status: Some(TransactionStatus::Failed),
547 status_reason: None,
548 sent_at: None,
549 confirmed_at: None,
550 network_data: None,
551 hashes: None,
552 priced_at: None,
553 noop_count: None,
554 is_canceled: None,
555 delete_at: None,
556 };
557 let result = repo
558 .partial_update("non-existent-id".to_string(), update3)
559 .await;
560 assert!(result.is_err());
561 assert!(matches!(result.unwrap_err(), RepositoryError::NotFound(_)));
562 }
563
564 #[tokio::test]
565 async fn test_update_status() {
566 let repo = InMemoryTransactionRepository::new();
567 let tx = create_test_transaction("test-1");
568
569 repo.create(tx).await.unwrap();
570
571 let updated = repo
573 .update_status("test-1".to_string(), TransactionStatus::Confirmed)
574 .await
575 .unwrap();
576
577 assert_eq!(updated.status, TransactionStatus::Confirmed);
579
580 let stored = repo.get_by_id("test-1".to_string()).await.unwrap();
582 assert_eq!(stored.status, TransactionStatus::Confirmed);
583
584 let updated = repo
586 .update_status("test-1".to_string(), TransactionStatus::Failed)
587 .await
588 .unwrap();
589
590 assert_eq!(updated.status, TransactionStatus::Failed);
592
593 let result = repo
595 .update_status("non-existent".to_string(), TransactionStatus::Confirmed)
596 .await;
597 assert!(matches!(result, Err(RepositoryError::NotFound(_))));
598 }
599
600 #[tokio::test]
601 async fn test_list_paginated() {
602 let repo = InMemoryTransactionRepository::new();
603
604 for i in 1..=10 {
606 let tx = create_test_transaction(&format!("test-{}", i));
607 repo.create(tx).await.unwrap();
608 }
609
610 let query = PaginationQuery {
612 page: 1,
613 per_page: 3,
614 };
615 let result = repo.list_paginated(query).await.unwrap();
616 assert_eq!(result.items.len(), 3);
617 assert_eq!(result.total, 10);
618 assert_eq!(result.page, 1);
619 assert_eq!(result.per_page, 3);
620
621 let query = PaginationQuery {
623 page: 2,
624 per_page: 3,
625 };
626 let result = repo.list_paginated(query).await.unwrap();
627 assert_eq!(result.items.len(), 3);
628 assert_eq!(result.total, 10);
629 assert_eq!(result.page, 2);
630 assert_eq!(result.per_page, 3);
631
632 let query = PaginationQuery {
634 page: 4,
635 per_page: 3,
636 };
637 let result = repo.list_paginated(query).await.unwrap();
638 assert_eq!(result.items.len(), 1);
639 assert_eq!(result.total, 10);
640 assert_eq!(result.page, 4);
641 assert_eq!(result.per_page, 3);
642
643 let query = PaginationQuery {
645 page: 5,
646 per_page: 3,
647 };
648 let result = repo.list_paginated(query).await.unwrap();
649 assert_eq!(result.items.len(), 0);
650 assert_eq!(result.total, 10);
651 }
652
653 #[tokio::test]
654 async fn test_find_by_nonce() {
655 let repo = InMemoryTransactionRepository::new();
656
657 let tx1 = create_test_transaction("test-1");
659
660 let mut tx2 = create_test_transaction("test-2");
661 if let NetworkTransactionData::Evm(ref mut data) = tx2.network_data {
662 data.nonce = Some(2);
663 }
664
665 let mut tx3 = create_test_transaction("test-3");
666 tx3.relayer_id = "relayer-2".to_string();
667 if let NetworkTransactionData::Evm(ref mut data) = tx3.network_data {
668 data.nonce = Some(1);
669 }
670
671 repo.create(tx1).await.unwrap();
672 repo.create(tx2).await.unwrap();
673 repo.create(tx3).await.unwrap();
674
675 let result = repo.find_by_nonce("relayer-1", 1).await.unwrap();
677 assert!(result.is_some());
678 assert_eq!(result.as_ref().unwrap().id, "test-1");
679
680 let result = repo.find_by_nonce("relayer-1", 2).await.unwrap();
682 assert!(result.is_some());
683 assert_eq!(result.as_ref().unwrap().id, "test-2");
684
685 let result = repo.find_by_nonce("relayer-2", 1).await.unwrap();
687 assert!(result.is_some());
688 assert_eq!(result.as_ref().unwrap().id, "test-3");
689
690 let result = repo.find_by_nonce("relayer-1", 99).await.unwrap();
692 assert!(result.is_none());
693 }
694
695 #[tokio::test]
696 async fn test_update_network_data() {
697 let repo = InMemoryTransactionRepository::new();
698 let tx = create_test_transaction("test-1");
699
700 repo.create(tx.clone()).await.unwrap();
701
702 let updated_network_data = NetworkTransactionData::Evm(EvmTransactionData {
704 gas_price: Some(2000000000),
705 gas_limit: Some(30000),
706 nonce: Some(2),
707 value: U256::from_str("2000000000000000000").unwrap(),
708 data: Some("0xUpdated".to_string()),
709 from: "0xSender".to_string(),
710 to: Some("0xRecipient".to_string()),
711 chain_id: 1,
712 signature: None,
713 hash: Some("0xUpdated".to_string()),
714 raw: None,
715 speed: None,
716 max_fee_per_gas: None,
717 max_priority_fee_per_gas: None,
718 });
719
720 let updated = repo
721 .update_network_data("test-1".to_string(), updated_network_data)
722 .await
723 .unwrap();
724
725 if let NetworkTransactionData::Evm(data) = &updated.network_data {
727 assert_eq!(data.gas_price, Some(2000000000));
728 assert_eq!(data.gas_limit, Some(30000));
729 assert_eq!(data.nonce, Some(2));
730 assert_eq!(data.hash, Some("0xUpdated".to_string()));
731 assert_eq!(data.data, Some("0xUpdated".to_string()));
732 } else {
733 panic!("Expected EVM network data");
734 }
735 }
736
737 #[tokio::test]
738 async fn test_set_sent_at() {
739 let repo = InMemoryTransactionRepository::new();
740 let tx = create_test_transaction("test-1");
741
742 repo.create(tx).await.unwrap();
743
744 let new_sent_at = "2025-02-01T10:00:00.000000+00:00".to_string();
746
747 let updated = repo
748 .set_sent_at("test-1".to_string(), new_sent_at.clone())
749 .await
750 .unwrap();
751
752 assert_eq!(updated.sent_at, Some(new_sent_at.clone()));
754
755 let stored = repo.get_by_id("test-1".to_string()).await.unwrap();
757 assert_eq!(stored.sent_at, Some(new_sent_at.clone()));
758 }
759
760 #[tokio::test]
761 async fn test_set_confirmed_at() {
762 let repo = InMemoryTransactionRepository::new();
763 let tx = create_test_transaction("test-1");
764
765 repo.create(tx).await.unwrap();
766
767 let new_confirmed_at = "2025-02-01T11:30:45.123456+00:00".to_string();
769
770 let updated = repo
771 .set_confirmed_at("test-1".to_string(), new_confirmed_at.clone())
772 .await
773 .unwrap();
774
775 assert_eq!(updated.confirmed_at, Some(new_confirmed_at.clone()));
777
778 let stored = repo.get_by_id("test-1".to_string()).await.unwrap();
780 assert_eq!(stored.confirmed_at, Some(new_confirmed_at.clone()));
781 }
782
783 #[tokio::test]
784 async fn test_find_by_relayer_id() {
785 let repo = InMemoryTransactionRepository::new();
786 let tx1 = create_test_transaction("test-1");
787 let tx2 = create_test_transaction("test-2");
788
789 let mut tx3 = create_test_transaction("test-3");
791 tx3.relayer_id = "relayer-2".to_string();
792
793 repo.create(tx1).await.unwrap();
794 repo.create(tx2).await.unwrap();
795 repo.create(tx3).await.unwrap();
796
797 let query = PaginationQuery {
799 page: 1,
800 per_page: 10,
801 };
802 let result = repo
803 .find_by_relayer_id("relayer-1", query.clone())
804 .await
805 .unwrap();
806 assert_eq!(result.total, 2);
807 assert_eq!(result.items.len(), 2);
808 assert!(result.items.iter().all(|tx| tx.relayer_id == "relayer-1"));
809
810 let result = repo
812 .find_by_relayer_id("relayer-2", query.clone())
813 .await
814 .unwrap();
815 assert_eq!(result.total, 1);
816 assert_eq!(result.items.len(), 1);
817 assert!(result.items.iter().all(|tx| tx.relayer_id == "relayer-2"));
818
819 let result = repo
821 .find_by_relayer_id("non-existent", query.clone())
822 .await
823 .unwrap();
824 assert_eq!(result.total, 0);
825 assert_eq!(result.items.len(), 0);
826 }
827
828 #[tokio::test]
829 async fn test_find_by_status() {
830 let repo = InMemoryTransactionRepository::new();
831 let tx1 = create_test_transaction_pending_state("tx1");
832 let mut tx2 = create_test_transaction_pending_state("tx2");
833 tx2.status = TransactionStatus::Submitted;
834 let mut tx3 = create_test_transaction_pending_state("tx3");
835 tx3.relayer_id = "relayer-2".to_string();
836 tx3.status = TransactionStatus::Pending;
837
838 repo.create(tx1.clone()).await.unwrap();
839 repo.create(tx2.clone()).await.unwrap();
840 repo.create(tx3.clone()).await.unwrap();
841
842 let pending_txs = repo
844 .find_by_status("relayer-1", &[TransactionStatus::Pending])
845 .await
846 .unwrap();
847 assert_eq!(pending_txs.len(), 1);
848 assert_eq!(pending_txs[0].id, "tx1");
849
850 let submitted_txs = repo
851 .find_by_status("relayer-1", &[TransactionStatus::Submitted])
852 .await
853 .unwrap();
854 assert_eq!(submitted_txs.len(), 1);
855 assert_eq!(submitted_txs[0].id, "tx2");
856
857 let multiple_status_txs = repo
859 .find_by_status(
860 "relayer-1",
861 &[TransactionStatus::Pending, TransactionStatus::Submitted],
862 )
863 .await
864 .unwrap();
865 assert_eq!(multiple_status_txs.len(), 2);
866
867 let relayer2_pending = repo
869 .find_by_status("relayer-2", &[TransactionStatus::Pending])
870 .await
871 .unwrap();
872 assert_eq!(relayer2_pending.len(), 1);
873 assert_eq!(relayer2_pending[0].id, "tx3");
874
875 let no_txs = repo
877 .find_by_status("non-existent", &[TransactionStatus::Pending])
878 .await
879 .unwrap();
880 assert_eq!(no_txs.len(), 0);
881 }
882
883 #[tokio::test]
884 async fn test_find_by_status_sorted_by_created_at() {
885 let repo = InMemoryTransactionRepository::new();
886
887 let create_tx_with_timestamp = |id: &str, timestamp: &str| -> TransactionRepoModel {
889 let mut tx = create_test_transaction_pending_state(id);
890 tx.created_at = timestamp.to_string();
891 tx.status = TransactionStatus::Pending;
892 tx
893 };
894
895 let tx3 = create_tx_with_timestamp("tx3", "2025-01-27T17:00:00.000000+00:00"); let tx1 = create_tx_with_timestamp("tx1", "2025-01-27T15:00:00.000000+00:00"); let tx2 = create_tx_with_timestamp("tx2", "2025-01-27T16:00:00.000000+00:00"); repo.create(tx3.clone()).await.unwrap();
902 repo.create(tx1.clone()).await.unwrap();
903 repo.create(tx2.clone()).await.unwrap();
904
905 let result = repo
907 .find_by_status("relayer-1", &[TransactionStatus::Pending])
908 .await
909 .unwrap();
910
911 assert_eq!(result.len(), 3);
913 assert_eq!(result[0].id, "tx1"); assert_eq!(result[1].id, "tx2"); assert_eq!(result[2].id, "tx3"); assert_eq!(result[0].created_at, "2025-01-27T15:00:00.000000+00:00");
919 assert_eq!(result[1].created_at, "2025-01-27T16:00:00.000000+00:00");
920 assert_eq!(result[2].created_at, "2025-01-27T17:00:00.000000+00:00");
921 }
922
923 #[tokio::test]
924 async fn test_has_entries() {
925 let repo = InMemoryTransactionRepository::new();
926 assert!(!repo.has_entries().await.unwrap());
927
928 let tx = create_test_transaction("test");
929 repo.create(tx.clone()).await.unwrap();
930
931 assert!(repo.has_entries().await.unwrap());
932 }
933
934 #[tokio::test]
935 async fn test_drop_all_entries() {
936 let repo = InMemoryTransactionRepository::new();
937 let tx = create_test_transaction("test");
938 repo.create(tx.clone()).await.unwrap();
939
940 assert!(repo.has_entries().await.unwrap());
941
942 repo.drop_all_entries().await.unwrap();
943 assert!(!repo.has_entries().await.unwrap());
944 }
945
946 #[tokio::test]
949 async fn test_update_status_sets_delete_at_for_final_statuses() {
950 let _lock = ENV_MUTEX.lock().await;
951
952 use chrono::{DateTime, Duration, Utc};
953 use std::env;
954
955 env::set_var("TRANSACTION_EXPIRATION_HOURS", "6");
957
958 let repo = InMemoryTransactionRepository::new();
959
960 let final_statuses = [
961 TransactionStatus::Canceled,
962 TransactionStatus::Confirmed,
963 TransactionStatus::Failed,
964 TransactionStatus::Expired,
965 ];
966
967 for (i, status) in final_statuses.iter().enumerate() {
968 let tx_id = format!("test-final-{}", i);
969 let tx = create_test_transaction_pending_state(&tx_id);
970
971 assert!(tx.delete_at.is_none());
973
974 repo.create(tx).await.unwrap();
975
976 let before_update = Utc::now();
977
978 let updated = repo
980 .update_status(tx_id.clone(), status.clone())
981 .await
982 .unwrap();
983
984 assert!(
986 updated.delete_at.is_some(),
987 "delete_at should be set for status: {:?}",
988 status
989 );
990
991 let delete_at_str = updated.delete_at.unwrap();
993 let delete_at = DateTime::parse_from_rfc3339(&delete_at_str)
994 .expect("delete_at should be valid RFC3339")
995 .with_timezone(&Utc);
996
997 let duration_from_before = delete_at.signed_duration_since(before_update);
998 let expected_duration = Duration::hours(6);
999 let tolerance = Duration::minutes(5);
1000
1001 assert!(
1002 duration_from_before >= expected_duration - tolerance &&
1003 duration_from_before <= expected_duration + tolerance,
1004 "delete_at should be approximately 6 hours from now for status: {:?}. Duration: {:?}",
1005 status, duration_from_before
1006 );
1007 }
1008
1009 env::remove_var("TRANSACTION_EXPIRATION_HOURS");
1011 }
1012
1013 #[tokio::test]
1014 async fn test_update_status_does_not_set_delete_at_for_non_final_statuses() {
1015 let _lock = ENV_MUTEX.lock().await;
1016
1017 use std::env;
1018
1019 env::set_var("TRANSACTION_EXPIRATION_HOURS", "4");
1020
1021 let repo = InMemoryTransactionRepository::new();
1022
1023 let non_final_statuses = [
1024 TransactionStatus::Pending,
1025 TransactionStatus::Sent,
1026 TransactionStatus::Submitted,
1027 TransactionStatus::Mined,
1028 ];
1029
1030 for (i, status) in non_final_statuses.iter().enumerate() {
1031 let tx_id = format!("test-non-final-{}", i);
1032 let tx = create_test_transaction_pending_state(&tx_id);
1033
1034 repo.create(tx).await.unwrap();
1035
1036 let updated = repo
1038 .update_status(tx_id.clone(), status.clone())
1039 .await
1040 .unwrap();
1041
1042 assert!(
1044 updated.delete_at.is_none(),
1045 "delete_at should NOT be set for status: {:?}",
1046 status
1047 );
1048 }
1049
1050 env::remove_var("TRANSACTION_EXPIRATION_HOURS");
1052 }
1053
1054 #[tokio::test]
1055 async fn test_partial_update_sets_delete_at_for_final_statuses() {
1056 let _lock = ENV_MUTEX.lock().await;
1057
1058 use chrono::{DateTime, Duration, Utc};
1059 use std::env;
1060
1061 env::set_var("TRANSACTION_EXPIRATION_HOURS", "8");
1062
1063 let repo = InMemoryTransactionRepository::new();
1064 let tx = create_test_transaction_pending_state("test-partial-final");
1065
1066 repo.create(tx).await.unwrap();
1067
1068 let before_update = Utc::now();
1069
1070 let update = TransactionUpdateRequest {
1072 status: Some(TransactionStatus::Confirmed),
1073 status_reason: Some("Transaction completed".to_string()),
1074 confirmed_at: Some("2023-01-01T12:05:00Z".to_string()),
1075 ..Default::default()
1076 };
1077
1078 let updated = repo
1079 .partial_update("test-partial-final".to_string(), update)
1080 .await
1081 .unwrap();
1082
1083 assert!(
1085 updated.delete_at.is_some(),
1086 "delete_at should be set when updating to Confirmed status"
1087 );
1088
1089 let delete_at_str = updated.delete_at.unwrap();
1091 let delete_at = DateTime::parse_from_rfc3339(&delete_at_str)
1092 .expect("delete_at should be valid RFC3339")
1093 .with_timezone(&Utc);
1094
1095 let duration_from_before = delete_at.signed_duration_since(before_update);
1096 let expected_duration = Duration::hours(8);
1097 let tolerance = Duration::minutes(5);
1098
1099 assert!(
1100 duration_from_before >= expected_duration - tolerance
1101 && duration_from_before <= expected_duration + tolerance,
1102 "delete_at should be approximately 8 hours from now. Duration: {:?}",
1103 duration_from_before
1104 );
1105
1106 assert_eq!(updated.status, TransactionStatus::Confirmed);
1108 assert_eq!(
1109 updated.status_reason,
1110 Some("Transaction completed".to_string())
1111 );
1112 assert_eq!(
1113 updated.confirmed_at,
1114 Some("2023-01-01T12:05:00Z".to_string())
1115 );
1116
1117 env::remove_var("TRANSACTION_EXPIRATION_HOURS");
1119 }
1120
1121 #[tokio::test]
1122 async fn test_update_status_preserves_existing_delete_at() {
1123 let _lock = ENV_MUTEX.lock().await;
1124
1125 use std::env;
1126
1127 env::set_var("TRANSACTION_EXPIRATION_HOURS", "2");
1128
1129 let repo = InMemoryTransactionRepository::new();
1130 let mut tx = create_test_transaction_pending_state("test-preserve-delete-at");
1131
1132 let existing_delete_at = "2025-01-01T12:00:00Z".to_string();
1134 tx.delete_at = Some(existing_delete_at.clone());
1135
1136 repo.create(tx).await.unwrap();
1137
1138 let updated = repo
1140 .update_status(
1141 "test-preserve-delete-at".to_string(),
1142 TransactionStatus::Confirmed,
1143 )
1144 .await
1145 .unwrap();
1146
1147 assert_eq!(
1149 updated.delete_at,
1150 Some(existing_delete_at),
1151 "Existing delete_at should be preserved when updating to final status"
1152 );
1153
1154 env::remove_var("TRANSACTION_EXPIRATION_HOURS");
1156 }
1157
1158 #[tokio::test]
1159 async fn test_partial_update_without_status_change_preserves_delete_at() {
1160 let _lock = ENV_MUTEX.lock().await;
1161
1162 use std::env;
1163
1164 env::set_var("TRANSACTION_EXPIRATION_HOURS", "3");
1165
1166 let repo = InMemoryTransactionRepository::new();
1167 let tx = create_test_transaction_pending_state("test-preserve-no-status");
1168
1169 repo.create(tx).await.unwrap();
1170
1171 let updated1 = repo
1173 .update_status(
1174 "test-preserve-no-status".to_string(),
1175 TransactionStatus::Confirmed,
1176 )
1177 .await
1178 .unwrap();
1179
1180 assert!(updated1.delete_at.is_some());
1181 let original_delete_at = updated1.delete_at.clone();
1182
1183 let update = TransactionUpdateRequest {
1185 status: None, status_reason: Some("Updated reason".to_string()),
1187 confirmed_at: Some("2023-01-01T12:10:00Z".to_string()),
1188 ..Default::default()
1189 };
1190
1191 let updated2 = repo
1192 .partial_update("test-preserve-no-status".to_string(), update)
1193 .await
1194 .unwrap();
1195
1196 assert_eq!(
1198 updated2.delete_at, original_delete_at,
1199 "delete_at should be preserved when status is not updated"
1200 );
1201
1202 assert_eq!(updated2.status, TransactionStatus::Confirmed); assert_eq!(updated2.status_reason, Some("Updated reason".to_string()));
1205 assert_eq!(
1206 updated2.confirmed_at,
1207 Some("2023-01-01T12:10:00Z".to_string())
1208 );
1209
1210 env::remove_var("TRANSACTION_EXPIRATION_HOURS");
1212 }
1213
1214 #[tokio::test]
1215 async fn test_update_status_multiple_updates_idempotent() {
1216 let _lock = ENV_MUTEX.lock().await;
1217
1218 use std::env;
1219
1220 env::set_var("TRANSACTION_EXPIRATION_HOURS", "12");
1221
1222 let repo = InMemoryTransactionRepository::new();
1223 let tx = create_test_transaction_pending_state("test-idempotent");
1224
1225 repo.create(tx).await.unwrap();
1226
1227 let updated1 = repo
1229 .update_status("test-idempotent".to_string(), TransactionStatus::Confirmed)
1230 .await
1231 .unwrap();
1232
1233 assert!(updated1.delete_at.is_some());
1234 let first_delete_at = updated1.delete_at.clone();
1235
1236 let updated2 = repo
1238 .update_status("test-idempotent".to_string(), TransactionStatus::Failed)
1239 .await
1240 .unwrap();
1241
1242 assert_eq!(
1244 updated2.delete_at, first_delete_at,
1245 "delete_at should not change on subsequent final status updates"
1246 );
1247
1248 assert_eq!(updated2.status, TransactionStatus::Failed);
1250
1251 env::remove_var("TRANSACTION_EXPIRATION_HOURS");
1253 }
1254}