1use crate::models::{
4 NetworkTransactionData, PaginationQuery, RepositoryError, TransactionRepoModel,
5 TransactionStatus, TransactionUpdateRequest,
6};
7use crate::repositories::redis_base::RedisRepository;
8use crate::repositories::{
9 BatchRetrievalResult, PaginatedResult, Repository, TransactionRepository,
10};
11use async_trait::async_trait;
12use redis::aio::ConnectionManager;
13use redis::AsyncCommands;
14use std::fmt;
15use std::sync::Arc;
16use tracing::{debug, error, warn};
17
// Key-segment literals combined by the key-builder methods of
// `RedisTransactionRepository`. Segments are joined with ':'.

/// Segment that follows the key prefix in every per-relayer key.
const RELAYER_PREFIX: &str = "relayer";
/// Segment that precedes the transaction ID in a transaction data key.
const TX_PREFIX: &str = "tx";
/// Segment that precedes the status name in a per-relayer status index key.
const STATUS_PREFIX: &str = "status";
/// Segment that precedes the nonce value in a per-relayer nonce index key.
const NONCE_PREFIX: &str = "nonce";
/// Segment that precedes the transaction ID in the reverse (transaction -> relayer) key.
const TX_TO_RELAYER_PREFIX: &str = "tx_to_relayer";
/// Final segment of the key holding the set of relayer IDs known to this repository.
const RELAYER_LIST_KEY: &str = "relayer_list";
24
/// Redis-backed repository for [`TransactionRepoModel`] records.
///
/// Serialized transactions live under per-relayer keys. Additional keys maintain a
/// per-relayer status set, a per-relayer nonce lookup, a transaction -> relayer reverse
/// mapping and a set of known relayer IDs.
#[derive(Clone)]
pub struct RedisTransactionRepository {
    /// Shared connection manager; each operation clones it to get its own handle.
    pub client: Arc<ConnectionManager>,
    /// Prefix placed in front of every key this repository reads or writes.
    /// `new` rejects an empty prefix.
    pub key_prefix: String,
}
30
// Opts into the `RedisRepository` trait with no overrides. Presumably this is what supplies
// `serialize_entity`, `deserialize_entity` and `map_redis_error` used throughout this file
// (NOTE(review): confirm against `redis_base`).
impl RedisRepository for RedisTransactionRepository {}
32
33impl RedisTransactionRepository {
34 pub fn new(
35 connection_manager: Arc<ConnectionManager>,
36 key_prefix: String,
37 ) -> Result<Self, RepositoryError> {
38 if key_prefix.is_empty() {
39 return Err(RepositoryError::InvalidData(
40 "Redis key prefix cannot be empty".to_string(),
41 ));
42 }
43
44 Ok(Self {
45 client: connection_manager,
46 key_prefix,
47 })
48 }
49
50 fn tx_key(&self, relayer_id: &str, tx_id: &str) -> String {
52 format!(
53 "{}:{}:{}:{}:{}",
54 self.key_prefix, RELAYER_PREFIX, relayer_id, TX_PREFIX, tx_id
55 )
56 }
57
58 fn tx_to_relayer_key(&self, tx_id: &str) -> String {
60 format!(
61 "{}:{}:{}:{}",
62 self.key_prefix, RELAYER_PREFIX, TX_TO_RELAYER_PREFIX, tx_id
63 )
64 }
65
66 fn relayer_status_key(&self, relayer_id: &str, status: &TransactionStatus) -> String {
68 format!(
69 "{}:{}:{}:{}:{}",
70 self.key_prefix, RELAYER_PREFIX, relayer_id, STATUS_PREFIX, status
71 )
72 }
73
74 fn relayer_nonce_key(&self, relayer_id: &str, nonce: u64) -> String {
76 format!(
77 "{}:{}:{}:{}:{}",
78 self.key_prefix, RELAYER_PREFIX, relayer_id, NONCE_PREFIX, nonce
79 )
80 }
81
82 fn relayer_list_key(&self) -> String {
84 format!("{}:{}", self.key_prefix, RELAYER_LIST_KEY)
85 }
86
87 async fn get_transactions_by_ids(
89 &self,
90 ids: &[String],
91 ) -> Result<BatchRetrievalResult<TransactionRepoModel>, RepositoryError> {
92 if ids.is_empty() {
93 debug!("no transaction IDs provided for batch fetch");
94 return Ok(BatchRetrievalResult {
95 results: vec![],
96 failed_ids: vec![],
97 });
98 }
99
100 let mut conn = self.client.as_ref().clone();
101
102 let reverse_keys: Vec<String> = ids.iter().map(|id| self.tx_to_relayer_key(id)).collect();
103
104 debug!(count = %ids.len(), "fetching relayer IDs for transactions");
105
106 let relayer_ids: Vec<Option<String>> = conn
107 .mget(&reverse_keys)
108 .await
109 .map_err(|e| self.map_redis_error(e, "batch_fetch_relayer_ids"))?;
110
111 let mut tx_keys = Vec::new();
112 let mut valid_ids = Vec::new();
113 let mut failed_ids = Vec::new();
114 for (i, relayer_id) in relayer_ids.into_iter().enumerate() {
115 match relayer_id {
116 Some(relayer_id) => {
117 tx_keys.push(self.tx_key(&relayer_id, &ids[i]));
118 valid_ids.push(ids[i].clone());
119 }
120 None => {
121 warn!(tx_id = %ids[i], "no relayer found for transaction");
122 failed_ids.push(ids[i].clone());
123 }
124 }
125 }
126
127 if tx_keys.is_empty() {
128 debug!("no valid transactions found for batch fetch");
129 return Ok(BatchRetrievalResult {
130 results: vec![],
131 failed_ids,
132 });
133 }
134
135 debug!(count = %tx_keys.len(), "batch fetching transaction data");
136
137 let values: Vec<Option<String>> = conn
138 .mget(&tx_keys)
139 .await
140 .map_err(|e| self.map_redis_error(e, "batch_fetch_transactions"))?;
141
142 let mut transactions = Vec::new();
143 let mut failed_count = 0;
144 let mut failed_ids = Vec::new();
145 for (i, value) in values.into_iter().enumerate() {
146 match value {
147 Some(json) => {
148 match self.deserialize_entity::<TransactionRepoModel>(
149 &json,
150 &valid_ids[i],
151 "transaction",
152 ) {
153 Ok(tx) => transactions.push(tx),
154 Err(e) => {
155 failed_count += 1;
156 error!(tx_id = %valid_ids[i], error = %e, "failed to deserialize transaction");
157 }
159 }
160 }
161 None => {
162 warn!(tx_id = %valid_ids[i], "transaction not found in batch fetch");
163 failed_ids.push(valid_ids[i].clone());
164 }
165 }
166 }
167
168 if failed_count > 0 {
169 warn!(failed_count = %failed_count, total_count = %valid_ids.len(), "failed to deserialize transactions in batch");
170 }
171
172 debug!(count = %transactions.len(), "successfully fetched transactions");
173 Ok(BatchRetrievalResult {
174 results: transactions,
175 failed_ids,
176 })
177 }
178
179 fn extract_nonce(&self, network_data: &NetworkTransactionData) -> Option<u64> {
181 match network_data.get_evm_transaction_data() {
182 Ok(tx_data) => tx_data.nonce,
183 Err(_) => {
184 debug!("no EVM transaction data available for nonce extraction");
185 None
186 }
187 }
188 }
189
190 async fn update_indexes(
192 &self,
193 tx: &TransactionRepoModel,
194 old_tx: Option<&TransactionRepoModel>,
195 ) -> Result<(), RepositoryError> {
196 let mut conn = self.client.as_ref().clone();
197 let mut pipe = redis::pipe();
198 pipe.atomic();
199
200 debug!(tx_id = %tx.id, "updating indexes for transaction");
201
202 let relayer_list_key = self.relayer_list_key();
204 pipe.sadd(&relayer_list_key, &tx.relayer_id);
205
206 let new_status_key = self.relayer_status_key(&tx.relayer_id, &tx.status);
208 pipe.sadd(&new_status_key, &tx.id);
209
210 if let Some(nonce) = self.extract_nonce(&tx.network_data) {
211 let nonce_key = self.relayer_nonce_key(&tx.relayer_id, nonce);
212 pipe.set(&nonce_key, &tx.id);
213 debug!(tx_id = %tx.id, nonce = %nonce, "added nonce index for transaction");
214 }
215
216 if let Some(old) = old_tx {
218 if old.status != tx.status {
219 let old_status_key = self.relayer_status_key(&old.relayer_id, &old.status);
220 pipe.srem(&old_status_key, &tx.id);
221 debug!(tx_id = %tx.id, old_status = %old.status, new_status = %tx.status, "removing old status index for transaction");
222 }
223
224 if let Some(old_nonce) = self.extract_nonce(&old.network_data) {
226 let new_nonce = self.extract_nonce(&tx.network_data);
227 if Some(old_nonce) != new_nonce {
228 let old_nonce_key = self.relayer_nonce_key(&old.relayer_id, old_nonce);
229 pipe.del(&old_nonce_key);
230 debug!(tx_id = %tx.id, old_nonce = %old_nonce, new_nonce = ?new_nonce, "removing old nonce index for transaction");
231 }
232 }
233 }
234
235 pipe.exec_async(&mut conn).await.map_err(|e| {
237 error!(tx_id = %tx.id, error = %e, "index update pipeline failed for transaction");
238 self.map_redis_error(e, &format!("update_indexes_for_tx_{}", tx.id))
239 })?;
240
241 debug!(tx_id = %tx.id, "successfully updated indexes for transaction");
242 Ok(())
243 }
244
245 async fn remove_all_indexes(&self, tx: &TransactionRepoModel) -> Result<(), RepositoryError> {
247 let mut conn = self.client.as_ref().clone();
248 let mut pipe = redis::pipe();
249 pipe.atomic();
250
251 debug!(tx_id = %tx.id, "removing all indexes for transaction");
252
253 let status_key = self.relayer_status_key(&tx.relayer_id, &tx.status);
255 pipe.srem(&status_key, &tx.id);
256
257 if let Some(nonce) = self.extract_nonce(&tx.network_data) {
259 let nonce_key = self.relayer_nonce_key(&tx.relayer_id, nonce);
260 pipe.del(&nonce_key);
261 debug!(tx_id = %tx.id, nonce = %nonce, "removing nonce index for transaction");
262 }
263
264 let reverse_key = self.tx_to_relayer_key(&tx.id);
266 pipe.del(&reverse_key);
267
268 pipe.exec_async(&mut conn).await.map_err(|e| {
269 error!(tx_id = %tx.id, error = %e, "index removal failed for transaction");
270 self.map_redis_error(e, &format!("remove_indexes_for_tx_{}", tx.id))
271 })?;
272
273 debug!(tx_id = %tx.id, "successfully removed all indexes for transaction");
274 Ok(())
275 }
276}
277
278impl fmt::Debug for RedisTransactionRepository {
279 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
280 f.debug_struct("RedisTransactionRepository")
281 .field("client", &"<ConnectionManager>")
282 .field("key_prefix", &self.key_prefix)
283 .finish()
284 }
285}
286
287#[async_trait]
288impl Repository<TransactionRepoModel, String> for RedisTransactionRepository {
289 async fn create(
290 &self,
291 entity: TransactionRepoModel,
292 ) -> Result<TransactionRepoModel, RepositoryError> {
293 if entity.id.is_empty() {
294 return Err(RepositoryError::InvalidData(
295 "Transaction ID cannot be empty".to_string(),
296 ));
297 }
298
299 let key = self.tx_key(&entity.relayer_id, &entity.id);
300 let reverse_key = self.tx_to_relayer_key(&entity.id);
301 let mut conn = self.client.as_ref().clone();
302
303 debug!(tx_id = %entity.id, "creating transaction");
304
305 let value = self.serialize_entity(&entity, |t| &t.id, "transaction")?;
306
307 let existing: Option<String> = conn
309 .get(&reverse_key)
310 .await
311 .map_err(|e| self.map_redis_error(e, "create_transaction_check"))?;
312
313 if existing.is_some() {
314 return Err(RepositoryError::ConstraintViolation(format!(
315 "Transaction with ID {} already exists",
316 entity.id
317 )));
318 }
319
320 let mut pipe = redis::pipe();
322 pipe.atomic();
323 pipe.set(&key, &value);
324 pipe.set(&reverse_key, &entity.relayer_id);
325
326 pipe.exec_async(&mut conn)
327 .await
328 .map_err(|e| self.map_redis_error(e, "create_transaction"))?;
329
330 if let Err(e) = self.update_indexes(&entity, None).await {
332 error!(tx_id = %entity.id, error = %e, "failed to update indexes for new transaction");
333 return Err(e);
334 }
335
336 debug!(tx_id = %entity.id, "successfully created transaction");
337 Ok(entity)
338 }
339
340 async fn get_by_id(&self, id: String) -> Result<TransactionRepoModel, RepositoryError> {
341 if id.is_empty() {
342 return Err(RepositoryError::InvalidData(
343 "Transaction ID cannot be empty".to_string(),
344 ));
345 }
346
347 let mut conn = self.client.as_ref().clone();
348
349 debug!(tx_id = %id, "fetching transaction");
350
351 let reverse_key = self.tx_to_relayer_key(&id);
352 let relayer_id: Option<String> = conn
353 .get(&reverse_key)
354 .await
355 .map_err(|e| self.map_redis_error(e, "get_transaction_reverse_lookup"))?;
356
357 let relayer_id = match relayer_id {
358 Some(relayer_id) => relayer_id,
359 None => {
360 debug!(tx_id = %id, "transaction not found (no reverse lookup)");
361 return Err(RepositoryError::NotFound(format!(
362 "Transaction with ID {id} not found"
363 )));
364 }
365 };
366
367 let key = self.tx_key(&relayer_id, &id);
368 let value: Option<String> = conn
369 .get(&key)
370 .await
371 .map_err(|e| self.map_redis_error(e, "get_transaction_by_id"))?;
372
373 match value {
374 Some(json) => {
375 let tx =
376 self.deserialize_entity::<TransactionRepoModel>(&json, &id, "transaction")?;
377 debug!(tx_id = %id, "successfully fetched transaction");
378 Ok(tx)
379 }
380 None => {
381 debug!(tx_id = %id, "transaction not found");
382 Err(RepositoryError::NotFound(format!(
383 "Transaction with ID {id} not found"
384 )))
385 }
386 }
387 }
388
389 async fn list_all(&self) -> Result<Vec<TransactionRepoModel>, RepositoryError> {
390 let mut conn = self.client.as_ref().clone();
391
392 debug!("fetching all transaction IDs");
393
394 let relayer_list_key = self.relayer_list_key();
396 let relayer_ids: Vec<String> = conn
397 .smembers(&relayer_list_key)
398 .await
399 .map_err(|e| self.map_redis_error(e, "list_all_relayer_ids"))?;
400
401 debug!(count = %relayer_ids.len(), "found relayers");
402
403 let mut all_tx_ids = Vec::new();
405 for relayer_id in relayer_ids {
406 let pattern = format!(
407 "{}:{}:{}:{}:*",
408 self.key_prefix, RELAYER_PREFIX, relayer_id, TX_PREFIX
409 );
410 let mut cursor = 0;
411 loop {
412 let (next_cursor, keys): (u64, Vec<String>) = redis::cmd("SCAN")
413 .cursor_arg(cursor)
414 .arg("MATCH")
415 .arg(&pattern)
416 .query_async(&mut conn)
417 .await
418 .map_err(|e| self.map_redis_error(e, "list_all_scan_keys"))?;
419
420 for key in keys {
422 if let Some(tx_id) = key.split(':').next_back() {
423 all_tx_ids.push(tx_id.to_string());
424 }
425 }
426
427 cursor = next_cursor;
428 if cursor == 0 {
429 break;
430 }
431 }
432 }
433
434 debug!(count = %all_tx_ids.len(), "found transaction IDs");
435
436 let transactions = self.get_transactions_by_ids(&all_tx_ids).await?;
437 Ok(transactions.results)
438 }
439
440 async fn list_paginated(
441 &self,
442 query: PaginationQuery,
443 ) -> Result<PaginatedResult<TransactionRepoModel>, RepositoryError> {
444 if query.per_page == 0 {
445 return Err(RepositoryError::InvalidData(
446 "per_page must be greater than 0".to_string(),
447 ));
448 }
449
450 let mut conn = self.client.as_ref().clone();
451
452 debug!(page = %query.page, per_page = %query.per_page, "fetching paginated transactions");
453
454 let relayer_list_key = self.relayer_list_key();
456 let relayer_ids: Vec<String> = conn
457 .smembers(&relayer_list_key)
458 .await
459 .map_err(|e| self.map_redis_error(e, "list_paginated_relayer_ids"))?;
460
461 let mut all_tx_ids = Vec::new();
463 for relayer_id in relayer_ids {
464 let pattern = format!(
465 "{}:{}:{}:{}:*",
466 self.key_prefix, RELAYER_PREFIX, relayer_id, TX_PREFIX
467 );
468 let mut cursor = 0;
469 loop {
470 let (next_cursor, keys): (u64, Vec<String>) = redis::cmd("SCAN")
471 .cursor_arg(cursor)
472 .arg("MATCH")
473 .arg(&pattern)
474 .query_async(&mut conn)
475 .await
476 .map_err(|e| self.map_redis_error(e, "list_paginated_scan_keys"))?;
477
478 for key in keys {
480 if let Some(tx_id) = key.split(':').next_back() {
481 all_tx_ids.push(tx_id.to_string());
482 }
483 }
484
485 cursor = next_cursor;
486 if cursor == 0 {
487 break;
488 }
489 }
490 }
491
492 let total = all_tx_ids.len() as u64;
493 let start = ((query.page - 1) * query.per_page) as usize;
494 let end = (start + query.per_page as usize).min(all_tx_ids.len());
495
496 if start >= all_tx_ids.len() {
497 debug!(page = %query.page, total = %total, "page is beyond available data");
498 return Ok(PaginatedResult {
499 items: vec![],
500 total,
501 page: query.page,
502 per_page: query.per_page,
503 });
504 }
505
506 let page_ids = &all_tx_ids[start..end];
507 let items = self.get_transactions_by_ids(page_ids).await?;
508
509 debug!(count = %items.results.len(), page = %query.page, "successfully fetched transactions for page");
510
511 Ok(PaginatedResult {
512 items: items.results.clone(),
513 total,
514 page: query.page,
515 per_page: query.per_page,
516 })
517 }
518
519 async fn update(
520 &self,
521 id: String,
522 entity: TransactionRepoModel,
523 ) -> Result<TransactionRepoModel, RepositoryError> {
524 if id.is_empty() {
525 return Err(RepositoryError::InvalidData(
526 "Transaction ID cannot be empty".to_string(),
527 ));
528 }
529
530 debug!(tx_id = %id, "updating transaction");
531
532 let old_tx = self.get_by_id(id.clone()).await?;
534
535 let key = self.tx_key(&entity.relayer_id, &id);
536 let mut conn = self.client.as_ref().clone();
537
538 let value = self.serialize_entity(&entity, |t| &t.id, "transaction")?;
539
540 let _: () = conn
542 .set(&key, value)
543 .await
544 .map_err(|e| self.map_redis_error(e, "update_transaction"))?;
545
546 self.update_indexes(&entity, Some(&old_tx)).await?;
548
549 debug!(tx_id = %id, "successfully updated transaction");
550 Ok(entity)
551 }
552
553 async fn delete_by_id(&self, id: String) -> Result<(), RepositoryError> {
554 if id.is_empty() {
555 return Err(RepositoryError::InvalidData(
556 "Transaction ID cannot be empty".to_string(),
557 ));
558 }
559
560 debug!(tx_id = %id, "deleting transaction");
561
562 let tx = self.get_by_id(id.clone()).await?;
564
565 let key = self.tx_key(&tx.relayer_id, &id);
566 let reverse_key = self.tx_to_relayer_key(&id);
567 let mut conn = self.client.as_ref().clone();
568
569 let mut pipe = redis::pipe();
570 pipe.atomic();
571 pipe.del(&key);
572 pipe.del(&reverse_key);
573
574 pipe.exec_async(&mut conn)
575 .await
576 .map_err(|e| self.map_redis_error(e, "delete_transaction"))?;
577
578 if let Err(e) = self.remove_all_indexes(&tx).await {
580 error!(tx_id = %id, error = %e, "failed to remove indexes for deleted transaction");
581 }
582
583 debug!(tx_id = %id, "successfully deleted transaction");
584 Ok(())
585 }
586
587 async fn count(&self) -> Result<usize, RepositoryError> {
588 let mut conn = self.client.as_ref().clone();
589
590 debug!("counting transactions");
591
592 let relayer_list_key = self.relayer_list_key();
594 let relayer_ids: Vec<String> = conn
595 .smembers(&relayer_list_key)
596 .await
597 .map_err(|e| self.map_redis_error(e, "count_relayer_ids"))?;
598
599 let mut total_count = 0;
601 for relayer_id in relayer_ids {
602 let pattern = format!(
603 "{}:{}:{}:{}:*",
604 self.key_prefix, RELAYER_PREFIX, relayer_id, TX_PREFIX
605 );
606 let mut cursor = 0;
607 loop {
608 let (next_cursor, keys): (u64, Vec<String>) = redis::cmd("SCAN")
609 .cursor_arg(cursor)
610 .arg("MATCH")
611 .arg(&pattern)
612 .query_async(&mut conn)
613 .await
614 .map_err(|e| self.map_redis_error(e, "count_scan_keys"))?;
615
616 total_count += keys.len();
617
618 cursor = next_cursor;
619 if cursor == 0 {
620 break;
621 }
622 }
623 }
624
625 debug!(count = %total_count, "transaction count");
626 Ok(total_count)
627 }
628
629 async fn has_entries(&self) -> Result<bool, RepositoryError> {
630 let mut conn = self.client.as_ref().clone();
631 let relayer_list_key = self.relayer_list_key();
632
633 debug!("checking if transaction entries exist");
634
635 let exists: bool = conn
636 .exists(&relayer_list_key)
637 .await
638 .map_err(|e| self.map_redis_error(e, "has_entries_check"))?;
639
640 debug!(exists = %exists, "transaction entries exist");
641 Ok(exists)
642 }
643
644 async fn drop_all_entries(&self) -> Result<(), RepositoryError> {
645 let mut conn = self.client.as_ref().clone();
646 let relayer_list_key = self.relayer_list_key();
647
648 debug!("dropping all transaction entries");
649
650 let relayer_ids: Vec<String> = conn
652 .smembers(&relayer_list_key)
653 .await
654 .map_err(|e| self.map_redis_error(e, "drop_all_entries_get_relayer_ids"))?;
655
656 if relayer_ids.is_empty() {
657 debug!("no transaction entries to drop");
658 return Ok(());
659 }
660
661 let mut pipe = redis::pipe();
663 pipe.atomic();
664
665 for relayer_id in &relayer_ids {
667 let pattern = format!(
669 "{}:{}:{}:{}:*",
670 self.key_prefix, RELAYER_PREFIX, relayer_id, TX_PREFIX
671 );
672 let mut cursor = 0;
673 let mut tx_ids = Vec::new();
674
675 loop {
676 let (next_cursor, keys): (u64, Vec<String>) = redis::cmd("SCAN")
677 .cursor_arg(cursor)
678 .arg("MATCH")
679 .arg(&pattern)
680 .query_async(&mut conn)
681 .await
682 .map_err(|e| self.map_redis_error(e, "drop_all_entries_scan"))?;
683
684 for key in keys {
686 pipe.del(&key);
687 if let Some(tx_id) = key.split(':').next_back() {
688 tx_ids.push(tx_id.to_string());
689 }
690 }
691
692 cursor = next_cursor;
693 if cursor == 0 {
694 break;
695 }
696 }
697
698 for tx_id in tx_ids {
700 let reverse_key = self.tx_to_relayer_key(&tx_id);
701 pipe.del(&reverse_key);
702
703 for status in &[
705 TransactionStatus::Pending,
706 TransactionStatus::Sent,
707 TransactionStatus::Confirmed,
708 TransactionStatus::Failed,
709 TransactionStatus::Canceled,
710 ] {
711 let status_key = self.relayer_status_key(relayer_id, status);
712 pipe.srem(&status_key, &tx_id);
713 }
714 }
715 }
716
717 pipe.del(&relayer_list_key);
719
720 pipe.exec_async(&mut conn)
721 .await
722 .map_err(|e| self.map_redis_error(e, "drop_all_entries_pipeline"))?;
723
724 debug!(count = %relayer_ids.len(), "dropped all transaction entries for relayers");
725 Ok(())
726 }
727}
728
729#[async_trait]
730impl TransactionRepository for RedisTransactionRepository {
731 async fn find_by_relayer_id(
732 &self,
733 relayer_id: &str,
734 query: PaginationQuery,
735 ) -> Result<PaginatedResult<TransactionRepoModel>, RepositoryError> {
736 let mut conn = self.client.as_ref().clone();
737
738 let pattern = format!(
740 "{}:{}:{}:{}:*",
741 self.key_prefix, RELAYER_PREFIX, relayer_id, TX_PREFIX
742 );
743 let mut all_tx_ids = Vec::new();
744 let mut cursor = 0;
745
746 loop {
747 let (next_cursor, keys): (u64, Vec<String>) = redis::cmd("SCAN")
748 .cursor_arg(cursor)
749 .arg("MATCH")
750 .arg(&pattern)
751 .query_async(&mut conn)
752 .await
753 .map_err(|e| self.map_redis_error(e, "find_by_relayer_id_scan"))?;
754
755 for key in keys {
757 if let Some(tx_id) = key.split(':').next_back() {
758 all_tx_ids.push(tx_id.to_string());
759 }
760 }
761
762 cursor = next_cursor;
763 if cursor == 0 {
764 break;
765 }
766 }
767
768 let total = all_tx_ids.len() as u64;
769 let start = ((query.page - 1) * query.per_page) as usize;
770 let end = (start + query.per_page as usize).min(all_tx_ids.len());
771
772 let page_ids = &all_tx_ids[start..end];
773 let items = self.get_transactions_by_ids(page_ids).await?;
774
775 Ok(PaginatedResult {
776 items: items.results.clone(),
777 total,
778 page: query.page,
779 per_page: query.per_page,
780 })
781 }
782
783 async fn find_by_status(
784 &self,
785 relayer_id: &str,
786 statuses: &[TransactionStatus],
787 ) -> Result<Vec<TransactionRepoModel>, RepositoryError> {
788 let mut conn = self.client.as_ref().clone();
789 let mut all_ids = Vec::new();
790
791 for status in statuses {
793 let status_key = self.relayer_status_key(relayer_id, status);
794 let ids: Vec<String> = conn
795 .smembers(status_key)
796 .await
797 .map_err(|e| self.map_redis_error(e, "find_by_status"))?;
798
799 all_ids.extend(ids);
800 }
801
802 all_ids.sort();
804 all_ids.dedup();
805
806 let transactions = self.get_transactions_by_ids(&all_ids).await?;
807 Ok(transactions.results)
808 }
809
810 async fn find_by_nonce(
811 &self,
812 relayer_id: &str,
813 nonce: u64,
814 ) -> Result<Option<TransactionRepoModel>, RepositoryError> {
815 let mut conn = self.client.as_ref().clone();
816 let nonce_key = self.relayer_nonce_key(relayer_id, nonce);
817
818 let tx_id: Option<String> = conn
820 .get(nonce_key)
821 .await
822 .map_err(|e| self.map_redis_error(e, "find_by_nonce"))?;
823
824 match tx_id {
825 Some(tx_id) => {
826 match self.get_by_id(tx_id.clone()).await {
827 Ok(tx) => Ok(Some(tx)),
828 Err(RepositoryError::NotFound(_)) => {
829 warn!(relayer_id = %relayer_id, nonce = %nonce, "stale nonce index found for relayer");
831 Ok(None)
832 }
833 Err(e) => Err(e),
834 }
835 }
836 None => Ok(None),
837 }
838 }
839
840 async fn update_status(
841 &self,
842 tx_id: String,
843 status: TransactionStatus,
844 ) -> Result<TransactionRepoModel, RepositoryError> {
845 let update = TransactionUpdateRequest {
846 status: Some(status),
847 ..Default::default()
848 };
849 self.partial_update(tx_id, update).await
850 }
851
852 async fn partial_update(
853 &self,
854 tx_id: String,
855 update: TransactionUpdateRequest,
856 ) -> Result<TransactionRepoModel, RepositoryError> {
857 const MAX_RETRIES: u32 = 3;
858 const BACKOFF_MS: u64 = 100;
859
860 let original_tx = self.get_by_id(tx_id.clone()).await?;
866 let mut updated_tx = original_tx.clone();
867 updated_tx.apply_partial_update(update.clone());
868
869 let key = self.tx_key(&updated_tx.relayer_id, &tx_id);
870 let value = self.serialize_entity(&updated_tx, |t| &t.id, "transaction")?;
871
872 let mut last_error = None;
873
874 for attempt in 0..MAX_RETRIES {
875 let mut conn = self.client.as_ref().clone();
876
877 let result: Result<(), _> = conn.set(&key, &value).await;
879 match result {
880 Ok(_) => {}
881 Err(e) => {
882 if attempt < MAX_RETRIES - 1 {
883 warn!(tx_id = %tx_id, attempt = %attempt, error = %e, "failed to set transaction data, retrying");
884 last_error = Some(self.map_redis_error(e, "partial_update"));
885 tokio::time::sleep(tokio::time::Duration::from_millis(BACKOFF_MS)).await;
886 continue;
887 }
888 return Err(self.map_redis_error(e, "partial_update"));
889 }
890 }
891
892 match self.update_indexes(&updated_tx, Some(&original_tx)).await {
895 Ok(_) => {
896 debug!(tx_id = %tx_id, attempt = %attempt, "successfully updated transaction");
897 return Ok(updated_tx);
898 }
899 Err(e) if attempt < MAX_RETRIES - 1 => {
900 warn!(tx_id = %tx_id, attempt = %attempt, error = %e, "failed to update indexes, retrying");
901 last_error = Some(e);
902 tokio::time::sleep(tokio::time::Duration::from_millis(BACKOFF_MS)).await;
903 continue;
904 }
905 Err(e) => return Err(e),
906 }
907 }
908
909 Err(last_error.unwrap_or_else(|| {
910 RepositoryError::UnexpectedError("partial_update exhausted retries".to_string())
911 }))
912 }
913
914 async fn update_network_data(
915 &self,
916 tx_id: String,
917 network_data: NetworkTransactionData,
918 ) -> Result<TransactionRepoModel, RepositoryError> {
919 let update = TransactionUpdateRequest {
920 network_data: Some(network_data),
921 ..Default::default()
922 };
923 self.partial_update(tx_id, update).await
924 }
925
926 async fn set_sent_at(
927 &self,
928 tx_id: String,
929 sent_at: String,
930 ) -> Result<TransactionRepoModel, RepositoryError> {
931 let update = TransactionUpdateRequest {
932 sent_at: Some(sent_at),
933 ..Default::default()
934 };
935 self.partial_update(tx_id, update).await
936 }
937
938 async fn set_confirmed_at(
939 &self,
940 tx_id: String,
941 confirmed_at: String,
942 ) -> Result<TransactionRepoModel, RepositoryError> {
943 let update = TransactionUpdateRequest {
944 confirmed_at: Some(confirmed_at),
945 ..Default::default()
946 };
947 self.partial_update(tx_id, update).await
948 }
949}
950
951#[cfg(test)]
952mod tests {
953 use super::*;
954 use crate::models::{evm::Speed, EvmTransactionData, NetworkType};
955 use alloy::primitives::U256;
956 use lazy_static::lazy_static;
957 use redis::Client;
958 use std::str::FromStr;
959 use tokio;
960 use uuid::Uuid;
961
962 use tokio::sync::Mutex;
963
lazy_static! {
    // Lazily-initialized async mutex shared by the tests in this module.
    // NOTE(review): presumably serializes tests that touch process environment
    // variables; no use is visible in this part of the file, so confirm before relying on it.
    static ref ENV_MUTEX: Mutex<()> = Mutex::new(());
}
968
969 fn create_test_transaction(id: &str) -> TransactionRepoModel {
971 TransactionRepoModel {
972 id: id.to_string(),
973 relayer_id: "relayer-1".to_string(),
974 status: TransactionStatus::Pending,
975 status_reason: None,
976 created_at: "2025-01-27T15:31:10.777083+00:00".to_string(),
977 sent_at: Some("2025-01-27T15:31:10.777083+00:00".to_string()),
978 confirmed_at: Some("2025-01-27T15:31:10.777083+00:00".to_string()),
979 valid_until: None,
980 delete_at: None,
981 network_type: NetworkType::Evm,
982 priced_at: None,
983 hashes: vec![],
984 network_data: NetworkTransactionData::Evm(EvmTransactionData {
985 gas_price: Some(1000000000),
986 gas_limit: Some(21000),
987 nonce: Some(1),
988 value: U256::from_str("1000000000000000000").unwrap(),
989 data: Some("0x".to_string()),
990 from: "0xSender".to_string(),
991 to: Some("0xRecipient".to_string()),
992 chain_id: 1,
993 signature: None,
994 hash: Some(format!("0x{}", id)),
995 speed: Some(Speed::Fast),
996 max_fee_per_gas: None,
997 max_priority_fee_per_gas: None,
998 raw: None,
999 }),
1000 noop_count: None,
1001 is_canceled: Some(false),
1002 }
1003 }
1004
1005 fn create_test_transaction_with_relayer(id: &str, relayer_id: &str) -> TransactionRepoModel {
1006 let mut tx = create_test_transaction(id);
1007 tx.relayer_id = relayer_id.to_string();
1008 tx
1009 }
1010
1011 fn create_test_transaction_with_status(
1012 id: &str,
1013 relayer_id: &str,
1014 status: TransactionStatus,
1015 ) -> TransactionRepoModel {
1016 let mut tx = create_test_transaction_with_relayer(id, relayer_id);
1017 tx.status = status;
1018 tx
1019 }
1020
1021 fn create_test_transaction_with_nonce(
1022 id: &str,
1023 nonce: u64,
1024 relayer_id: &str,
1025 ) -> TransactionRepoModel {
1026 let mut tx = create_test_transaction_with_relayer(id, relayer_id);
1027 if let NetworkTransactionData::Evm(ref mut evm_data) = tx.network_data {
1028 evm_data.nonce = Some(nonce);
1029 }
1030 tx
1031 }
1032
1033 async fn setup_test_repo() -> RedisTransactionRepository {
1034 let redis_url = std::env::var("REDIS_TEST_URL")
1036 .unwrap_or_else(|_| "redis://127.0.0.1:6379".to_string());
1037
1038 let client = Client::open(redis_url).expect("Failed to create Redis client");
1039 let connection_manager = ConnectionManager::new(client)
1040 .await
1041 .expect("Failed to create connection manager");
1042
1043 let random_id = Uuid::new_v4().to_string();
1044 let key_prefix = format!("test_prefix:{}", random_id);
1045
1046 RedisTransactionRepository::new(Arc::new(connection_manager), key_prefix)
1047 .expect("Failed to create RedisTransactionRepository")
1048 }
1049
1050 #[tokio::test]
1051 #[ignore = "Requires active Redis instance"]
1052 async fn test_new_repository_creation() {
1053 let repo = setup_test_repo().await;
1054 assert!(repo.key_prefix.contains("test_prefix"));
1055 }
1056
1057 #[tokio::test]
1058 #[ignore = "Requires active Redis instance"]
1059 async fn test_new_repository_empty_prefix_fails() {
1060 let redis_url = std::env::var("REDIS_TEST_URL")
1061 .unwrap_or_else(|_| "redis://127.0.0.1:6379".to_string());
1062 let client = Client::open(redis_url).expect("Failed to create Redis client");
1063 let connection_manager = ConnectionManager::new(client)
1064 .await
1065 .expect("Failed to create connection manager");
1066
1067 let result = RedisTransactionRepository::new(Arc::new(connection_manager), "".to_string());
1068 assert!(matches!(result, Err(RepositoryError::InvalidData(_))));
1069 }
1070
1071 #[tokio::test]
1072 #[ignore = "Requires active Redis instance"]
1073 async fn test_key_generation() {
1074 let repo = setup_test_repo().await;
1075
1076 assert!(repo
1077 .tx_key("relayer-1", "test-id")
1078 .contains(":relayer:relayer-1:tx:test-id"));
1079 assert!(repo
1080 .tx_to_relayer_key("test-id")
1081 .contains(":relayer:tx_to_relayer:test-id"));
1082 assert!(repo.relayer_list_key().contains(":relayer_list"));
1083 assert!(repo
1084 .relayer_status_key("relayer-1", &TransactionStatus::Pending)
1085 .contains(":relayer:relayer-1:status:Pending"));
1086 assert!(repo
1087 .relayer_nonce_key("relayer-1", 42)
1088 .contains(":relayer:relayer-1:nonce:42"));
1089 }
1090
1091 #[tokio::test]
1092 #[ignore = "Requires active Redis instance"]
1093 async fn test_serialize_deserialize_transaction() {
1094 let repo = setup_test_repo().await;
1095 let tx = create_test_transaction("test-1");
1096
1097 let serialized = repo
1098 .serialize_entity(&tx, |t| &t.id, "transaction")
1099 .expect("Serialization should succeed");
1100 let deserialized: TransactionRepoModel = repo
1101 .deserialize_entity(&serialized, "test-1", "transaction")
1102 .expect("Deserialization should succeed");
1103
1104 assert_eq!(tx.id, deserialized.id);
1105 assert_eq!(tx.relayer_id, deserialized.relayer_id);
1106 assert_eq!(tx.status, deserialized.status);
1107 }
1108
1109 #[tokio::test]
1110 #[ignore = "Requires active Redis instance"]
1111 async fn test_extract_nonce() {
1112 let repo = setup_test_repo().await;
1113 let random_id = Uuid::new_v4().to_string();
1114 let relayer_id = Uuid::new_v4().to_string();
1115 let tx_with_nonce = create_test_transaction_with_nonce(&random_id, 42, &relayer_id);
1116
1117 let nonce = repo.extract_nonce(&tx_with_nonce.network_data);
1118 assert_eq!(nonce, Some(42));
1119 }
1120
1121 #[tokio::test]
1122 #[ignore = "Requires active Redis instance"]
1123 async fn test_create_transaction() {
1124 let repo = setup_test_repo().await;
1125 let random_id = Uuid::new_v4().to_string();
1126 let tx = create_test_transaction(&random_id);
1127
1128 let result = repo.create(tx.clone()).await.unwrap();
1129 assert_eq!(result.id, tx.id);
1130 }
1131
1132 #[tokio::test]
1133 #[ignore = "Requires active Redis instance"]
1134 async fn test_get_transaction() {
1135 let repo = setup_test_repo().await;
1136 let random_id = Uuid::new_v4().to_string();
1137 let tx = create_test_transaction(&random_id);
1138
1139 repo.create(tx.clone()).await.unwrap();
1140 let stored = repo.get_by_id(random_id.to_string()).await.unwrap();
1141 assert_eq!(stored.id, tx.id);
1142 assert_eq!(stored.relayer_id, tx.relayer_id);
1143 }
1144
1145 #[tokio::test]
1146 #[ignore = "Requires active Redis instance"]
1147 async fn test_update_transaction() {
1148 let repo = setup_test_repo().await;
1149 let random_id = Uuid::new_v4().to_string();
1150 let mut tx = create_test_transaction(&random_id);
1151
1152 repo.create(tx.clone()).await.unwrap();
1153 tx.status = TransactionStatus::Confirmed;
1154
1155 let updated = repo.update(random_id.to_string(), tx).await.unwrap();
1156 assert!(matches!(updated.status, TransactionStatus::Confirmed));
1157 }
1158
1159 #[tokio::test]
1160 #[ignore = "Requires active Redis instance"]
1161 async fn test_delete_transaction() {
1162 let repo = setup_test_repo().await;
1163 let random_id = Uuid::new_v4().to_string();
1164 let tx = create_test_transaction(&random_id);
1165
1166 repo.create(tx).await.unwrap();
1167 repo.delete_by_id(random_id.to_string()).await.unwrap();
1168
1169 let result = repo.get_by_id(random_id.to_string()).await;
1170 assert!(matches!(result, Err(RepositoryError::NotFound(_))));
1171 }
1172
1173 #[tokio::test]
1174 #[ignore = "Requires active Redis instance"]
1175 async fn test_list_all_transactions() {
1176 let repo = setup_test_repo().await;
1177 let random_id = Uuid::new_v4().to_string();
1178 let random_id2 = Uuid::new_v4().to_string();
1179
1180 let tx1 = create_test_transaction(&random_id);
1181 let tx2 = create_test_transaction(&random_id2);
1182
1183 repo.create(tx1).await.unwrap();
1184 repo.create(tx2).await.unwrap();
1185
1186 let transactions = repo.list_all().await.unwrap();
1187 assert!(transactions.len() >= 2);
1188 }
1189
1190 #[tokio::test]
1191 #[ignore = "Requires active Redis instance"]
1192 async fn test_count_transactions() {
1193 let repo = setup_test_repo().await;
1194 let random_id = Uuid::new_v4().to_string();
1195 let tx = create_test_transaction(&random_id);
1196
1197 let count = repo.count().await.unwrap();
1198 repo.create(tx).await.unwrap();
1199 assert!(repo.count().await.unwrap() > count);
1200 }
1201
1202 #[tokio::test]
1203 #[ignore = "Requires active Redis instance"]
1204 async fn test_get_nonexistent_transaction() {
1205 let repo = setup_test_repo().await;
1206 let result = repo.get_by_id("nonexistent".to_string()).await;
1207 assert!(matches!(result, Err(RepositoryError::NotFound(_))));
1208 }
1209
1210 #[tokio::test]
1211 #[ignore = "Requires active Redis instance"]
1212 async fn test_duplicate_transaction_creation() {
1213 let repo = setup_test_repo().await;
1214 let random_id = Uuid::new_v4().to_string();
1215
1216 let tx = create_test_transaction(&random_id);
1217
1218 repo.create(tx.clone()).await.unwrap();
1219 let result = repo.create(tx).await;
1220
1221 assert!(matches!(
1222 result,
1223 Err(RepositoryError::ConstraintViolation(_))
1224 ));
1225 }
1226
1227 #[tokio::test]
1228 #[ignore = "Requires active Redis instance"]
1229 async fn test_update_nonexistent_transaction() {
1230 let repo = setup_test_repo().await;
1231 let tx = create_test_transaction("test-1");
1232
1233 let result = repo.update("nonexistent".to_string(), tx).await;
1234 assert!(matches!(result, Err(RepositoryError::NotFound(_))));
1235 }
1236
1237 #[tokio::test]
1238 #[ignore = "Requires active Redis instance"]
1239 async fn test_list_paginated() {
1240 let repo = setup_test_repo().await;
1241
1242 for _ in 1..=10 {
1244 let random_id = Uuid::new_v4().to_string();
1245 let tx = create_test_transaction(&random_id);
1246 repo.create(tx).await.unwrap();
1247 }
1248
1249 let query = PaginationQuery {
1251 page: 1,
1252 per_page: 3,
1253 };
1254 let result = repo.list_paginated(query).await.unwrap();
1255 assert_eq!(result.items.len(), 3);
1256 assert!(result.total >= 10);
1257 assert_eq!(result.page, 1);
1258 assert_eq!(result.per_page, 3);
1259
1260 let query = PaginationQuery {
1262 page: 1000,
1263 per_page: 3,
1264 };
1265 let result = repo.list_paginated(query).await.unwrap();
1266 assert_eq!(result.items.len(), 0);
1267 }
1268
1269 #[tokio::test]
1270 #[ignore = "Requires active Redis instance"]
1271 async fn test_find_by_relayer_id() {
1272 let repo = setup_test_repo().await;
1273 let random_id = Uuid::new_v4().to_string();
1274 let random_id2 = Uuid::new_v4().to_string();
1275 let random_id3 = Uuid::new_v4().to_string();
1276
1277 let tx1 = create_test_transaction_with_relayer(&random_id, "relayer-1");
1278 let tx2 = create_test_transaction_with_relayer(&random_id2, "relayer-1");
1279 let tx3 = create_test_transaction_with_relayer(&random_id3, "relayer-2");
1280
1281 repo.create(tx1).await.unwrap();
1282 repo.create(tx2).await.unwrap();
1283 repo.create(tx3).await.unwrap();
1284
1285 let query = PaginationQuery {
1287 page: 1,
1288 per_page: 10,
1289 };
1290 let result = repo
1291 .find_by_relayer_id("relayer-1", query.clone())
1292 .await
1293 .unwrap();
1294 assert!(result.total >= 2);
1295 assert!(result.items.len() >= 2);
1296 assert!(result.items.iter().all(|tx| tx.relayer_id == "relayer-1"));
1297
1298 let result = repo
1300 .find_by_relayer_id("relayer-2", query.clone())
1301 .await
1302 .unwrap();
1303 assert!(result.total >= 1);
1304 assert!(!result.items.is_empty());
1305 assert!(result.items.iter().all(|tx| tx.relayer_id == "relayer-2"));
1306
1307 let result = repo
1309 .find_by_relayer_id("non-existent", query.clone())
1310 .await
1311 .unwrap();
1312 assert_eq!(result.total, 0);
1313 assert_eq!(result.items.len(), 0);
1314 }
1315
1316 #[tokio::test]
1317 #[ignore = "Requires active Redis instance"]
1318 async fn test_find_by_status() {
1319 let repo = setup_test_repo().await;
1320 let random_id = Uuid::new_v4().to_string();
1321 let random_id2 = Uuid::new_v4().to_string();
1322 let random_id3 = Uuid::new_v4().to_string();
1323 let relayer_id = Uuid::new_v4().to_string();
1324 let tx1 = create_test_transaction_with_status(
1325 &random_id,
1326 &relayer_id,
1327 TransactionStatus::Pending,
1328 );
1329 let tx2 =
1330 create_test_transaction_with_status(&random_id2, &relayer_id, TransactionStatus::Sent);
1331 let tx3 = create_test_transaction_with_status(
1332 &random_id3,
1333 &relayer_id,
1334 TransactionStatus::Confirmed,
1335 );
1336
1337 repo.create(tx1).await.unwrap();
1338 repo.create(tx2).await.unwrap();
1339 repo.create(tx3).await.unwrap();
1340
1341 let result = repo
1343 .find_by_status(&relayer_id, &[TransactionStatus::Pending])
1344 .await
1345 .unwrap();
1346 assert_eq!(result.len(), 1);
1347 assert_eq!(result[0].status, TransactionStatus::Pending);
1348
1349 let result = repo
1351 .find_by_status(
1352 &relayer_id,
1353 &[TransactionStatus::Pending, TransactionStatus::Sent],
1354 )
1355 .await
1356 .unwrap();
1357 assert_eq!(result.len(), 2);
1358
1359 let result = repo
1361 .find_by_status(&relayer_id, &[TransactionStatus::Failed])
1362 .await
1363 .unwrap();
1364 assert_eq!(result.len(), 0);
1365 }
1366
1367 #[tokio::test]
1368 #[ignore = "Requires active Redis instance"]
1369 async fn test_find_by_nonce() {
1370 let repo = setup_test_repo().await;
1371 let random_id = Uuid::new_v4().to_string();
1372 let random_id2 = Uuid::new_v4().to_string();
1373 let relayer_id = Uuid::new_v4().to_string();
1374
1375 let tx1 = create_test_transaction_with_nonce(&random_id, 42, &relayer_id);
1376 let tx2 = create_test_transaction_with_nonce(&random_id2, 43, &relayer_id);
1377
1378 repo.create(tx1.clone()).await.unwrap();
1379 repo.create(tx2).await.unwrap();
1380
1381 let result = repo.find_by_nonce(&relayer_id, 42).await.unwrap();
1383 assert!(result.is_some());
1384 assert_eq!(result.unwrap().id, random_id);
1385
1386 let result = repo.find_by_nonce(&relayer_id, 99).await.unwrap();
1388 assert!(result.is_none());
1389
1390 let result = repo.find_by_nonce("non-existent", 42).await.unwrap();
1392 assert!(result.is_none());
1393 }
1394
1395 #[tokio::test]
1396 #[ignore = "Requires active Redis instance"]
1397 async fn test_update_status() {
1398 let repo = setup_test_repo().await;
1399 let random_id = Uuid::new_v4().to_string();
1400 let tx = create_test_transaction(&random_id);
1401
1402 repo.create(tx).await.unwrap();
1403 let updated = repo
1404 .update_status(random_id.to_string(), TransactionStatus::Confirmed)
1405 .await
1406 .unwrap();
1407 assert_eq!(updated.status, TransactionStatus::Confirmed);
1408 }
1409
1410 #[tokio::test]
1411 #[ignore = "Requires active Redis instance"]
1412 async fn test_partial_update() {
1413 let repo = setup_test_repo().await;
1414 let random_id = Uuid::new_v4().to_string();
1415 let tx = create_test_transaction(&random_id);
1416
1417 repo.create(tx).await.unwrap();
1418
1419 let update = TransactionUpdateRequest {
1420 status: Some(TransactionStatus::Sent),
1421 status_reason: Some("Transaction sent".to_string()),
1422 sent_at: Some("2025-01-27T16:00:00.000000+00:00".to_string()),
1423 confirmed_at: None,
1424 network_data: None,
1425 hashes: None,
1426 is_canceled: None,
1427 priced_at: None,
1428 noop_count: None,
1429 delete_at: None,
1430 };
1431
1432 let updated = repo
1433 .partial_update(random_id.to_string(), update)
1434 .await
1435 .unwrap();
1436 assert_eq!(updated.status, TransactionStatus::Sent);
1437 assert_eq!(updated.status_reason, Some("Transaction sent".to_string()));
1438 assert_eq!(
1439 updated.sent_at,
1440 Some("2025-01-27T16:00:00.000000+00:00".to_string())
1441 );
1442 }
1443
1444 #[tokio::test]
1445 #[ignore = "Requires active Redis instance"]
1446 async fn test_set_sent_at() {
1447 let repo = setup_test_repo().await;
1448 let random_id = Uuid::new_v4().to_string();
1449 let tx = create_test_transaction(&random_id);
1450
1451 repo.create(tx).await.unwrap();
1452 let updated = repo
1453 .set_sent_at(
1454 random_id.to_string(),
1455 "2025-01-27T16:00:00.000000+00:00".to_string(),
1456 )
1457 .await
1458 .unwrap();
1459 assert_eq!(
1460 updated.sent_at,
1461 Some("2025-01-27T16:00:00.000000+00:00".to_string())
1462 );
1463 }
1464
1465 #[tokio::test]
1466 #[ignore = "Requires active Redis instance"]
1467 async fn test_set_confirmed_at() {
1468 let repo = setup_test_repo().await;
1469 let random_id = Uuid::new_v4().to_string();
1470 let tx = create_test_transaction(&random_id);
1471
1472 repo.create(tx).await.unwrap();
1473 let updated = repo
1474 .set_confirmed_at(
1475 random_id.to_string(),
1476 "2025-01-27T16:00:00.000000+00:00".to_string(),
1477 )
1478 .await
1479 .unwrap();
1480 assert_eq!(
1481 updated.confirmed_at,
1482 Some("2025-01-27T16:00:00.000000+00:00".to_string())
1483 );
1484 }
1485
1486 #[tokio::test]
1487 #[ignore = "Requires active Redis instance"]
1488 async fn test_update_network_data() {
1489 let repo = setup_test_repo().await;
1490 let random_id = Uuid::new_v4().to_string();
1491 let tx = create_test_transaction(&random_id);
1492
1493 repo.create(tx).await.unwrap();
1494
1495 let new_network_data = NetworkTransactionData::Evm(EvmTransactionData {
1496 gas_price: Some(2000000000),
1497 gas_limit: Some(42000),
1498 nonce: Some(2),
1499 value: U256::from_str("2000000000000000000").unwrap(),
1500 data: Some("0x1234".to_string()),
1501 from: "0xNewSender".to_string(),
1502 to: Some("0xNewRecipient".to_string()),
1503 chain_id: 1,
1504 signature: None,
1505 hash: Some("0xnewhash".to_string()),
1506 speed: Some(Speed::SafeLow),
1507 max_fee_per_gas: None,
1508 max_priority_fee_per_gas: None,
1509 raw: None,
1510 });
1511
1512 let updated = repo
1513 .update_network_data(random_id.to_string(), new_network_data.clone())
1514 .await
1515 .unwrap();
1516 assert_eq!(
1517 updated
1518 .network_data
1519 .get_evm_transaction_data()
1520 .unwrap()
1521 .hash,
1522 new_network_data.get_evm_transaction_data().unwrap().hash
1523 );
1524 }
1525
1526 #[tokio::test]
1527 #[ignore = "Requires active Redis instance"]
1528 async fn test_debug_implementation() {
1529 let repo = setup_test_repo().await;
1530 let debug_str = format!("{:?}", repo);
1531 assert!(debug_str.contains("RedisTransactionRepository"));
1532 assert!(debug_str.contains("test_prefix"));
1533 }
1534
1535 #[tokio::test]
1536 #[ignore = "Requires active Redis instance"]
1537 async fn test_error_handling_empty_id() {
1538 let repo = setup_test_repo().await;
1539
1540 let result = repo.get_by_id("".to_string()).await;
1541 assert!(matches!(result, Err(RepositoryError::InvalidData(_))));
1542
1543 let result = repo
1544 .update("".to_string(), create_test_transaction("test"))
1545 .await;
1546 assert!(matches!(result, Err(RepositoryError::InvalidData(_))));
1547
1548 let result = repo.delete_by_id("".to_string()).await;
1549 assert!(matches!(result, Err(RepositoryError::InvalidData(_))));
1550 }
1551
1552 #[tokio::test]
1553 #[ignore = "Requires active Redis instance"]
1554 async fn test_pagination_validation() {
1555 let repo = setup_test_repo().await;
1556
1557 let query = PaginationQuery {
1558 page: 1,
1559 per_page: 0,
1560 };
1561 let result = repo.list_paginated(query).await;
1562 assert!(matches!(result, Err(RepositoryError::InvalidData(_))));
1563 }
1564
1565 #[tokio::test]
1566 #[ignore = "Requires active Redis instance"]
1567 async fn test_index_consistency() {
1568 let repo = setup_test_repo().await;
1569 let random_id = Uuid::new_v4().to_string();
1570 let relayer_id = Uuid::new_v4().to_string();
1571 let tx = create_test_transaction_with_nonce(&random_id, 42, &relayer_id);
1572
1573 repo.create(tx.clone()).await.unwrap();
1575
1576 let found = repo.find_by_nonce(&relayer_id, 42).await.unwrap();
1578 assert!(found.is_some());
1579
1580 let mut updated_tx = tx.clone();
1582 if let NetworkTransactionData::Evm(ref mut evm_data) = updated_tx.network_data {
1583 evm_data.nonce = Some(43);
1584 }
1585
1586 repo.update(random_id.to_string(), updated_tx)
1587 .await
1588 .unwrap();
1589
1590 let old_nonce_result = repo.find_by_nonce(&relayer_id, 42).await.unwrap();
1592 assert!(old_nonce_result.is_none());
1593
1594 let new_nonce_result = repo.find_by_nonce(&relayer_id, 43).await.unwrap();
1596 assert!(new_nonce_result.is_some());
1597 }
1598
1599 #[tokio::test]
1600 #[ignore = "Requires active Redis instance"]
1601 async fn test_has_entries() {
1602 let repo = setup_test_repo().await;
1603 assert!(!repo.has_entries().await.unwrap());
1604
1605 let tx_id = uuid::Uuid::new_v4().to_string();
1606 let tx = create_test_transaction(&tx_id);
1607 repo.create(tx.clone()).await.unwrap();
1608
1609 assert!(repo.has_entries().await.unwrap());
1610 }
1611
1612 #[tokio::test]
1613 #[ignore = "Requires active Redis instance"]
1614 async fn test_drop_all_entries() {
1615 let repo = setup_test_repo().await;
1616 let tx_id = uuid::Uuid::new_v4().to_string();
1617 let tx = create_test_transaction(&tx_id);
1618 repo.create(tx.clone()).await.unwrap();
1619 assert!(repo.has_entries().await.unwrap());
1620
1621 repo.drop_all_entries().await.unwrap();
1622 assert!(!repo.has_entries().await.unwrap());
1623 }
1624
1625 #[tokio::test]
1627 #[ignore = "Requires active Redis instance"]
1628 async fn test_update_status_sets_delete_at_for_final_statuses() {
1629 let _lock = ENV_MUTEX.lock().await;
1630
1631 use chrono::{DateTime, Duration, Utc};
1632 use std::env;
1633
1634 env::set_var("TRANSACTION_EXPIRATION_HOURS", "6");
1636
1637 let repo = setup_test_repo().await;
1638
1639 let final_statuses = [
1640 TransactionStatus::Canceled,
1641 TransactionStatus::Confirmed,
1642 TransactionStatus::Failed,
1643 TransactionStatus::Expired,
1644 ];
1645
1646 for (i, status) in final_statuses.iter().enumerate() {
1647 let tx_id = format!("test-final-{}-{}", i, Uuid::new_v4());
1648 let mut tx = create_test_transaction(&tx_id);
1649
1650 tx.delete_at = None;
1652 tx.status = TransactionStatus::Pending;
1653
1654 repo.create(tx).await.unwrap();
1655
1656 let before_update = Utc::now();
1657
1658 let updated = repo
1660 .update_status(tx_id.clone(), status.clone())
1661 .await
1662 .unwrap();
1663
1664 assert!(
1666 updated.delete_at.is_some(),
1667 "delete_at should be set for status: {:?}",
1668 status
1669 );
1670
1671 let delete_at_str = updated.delete_at.unwrap();
1673 let delete_at = DateTime::parse_from_rfc3339(&delete_at_str)
1674 .expect("delete_at should be valid RFC3339")
1675 .with_timezone(&Utc);
1676
1677 let duration_from_before = delete_at.signed_duration_since(before_update);
1678 let expected_duration = Duration::hours(6);
1679 let tolerance = Duration::minutes(5);
1680
1681 assert!(
1682 duration_from_before >= expected_duration - tolerance &&
1683 duration_from_before <= expected_duration + tolerance,
1684 "delete_at should be approximately 6 hours from now for status: {:?}. Duration: {:?}",
1685 status, duration_from_before
1686 );
1687 }
1688
1689 env::remove_var("TRANSACTION_EXPIRATION_HOURS");
1691 }
1692
1693 #[tokio::test]
1694 #[ignore = "Requires active Redis instance"]
1695 async fn test_update_status_does_not_set_delete_at_for_non_final_statuses() {
1696 let _lock = ENV_MUTEX.lock().await;
1697
1698 use std::env;
1699
1700 env::set_var("TRANSACTION_EXPIRATION_HOURS", "4");
1701
1702 let repo = setup_test_repo().await;
1703
1704 let non_final_statuses = [
1705 TransactionStatus::Pending,
1706 TransactionStatus::Sent,
1707 TransactionStatus::Submitted,
1708 TransactionStatus::Mined,
1709 ];
1710
1711 for (i, status) in non_final_statuses.iter().enumerate() {
1712 let tx_id = format!("test-non-final-{}-{}", i, Uuid::new_v4());
1713 let mut tx = create_test_transaction(&tx_id);
1714 tx.delete_at = None;
1715 tx.status = TransactionStatus::Pending;
1716
1717 repo.create(tx).await.unwrap();
1718
1719 let updated = repo
1721 .update_status(tx_id.clone(), status.clone())
1722 .await
1723 .unwrap();
1724
1725 assert!(
1727 updated.delete_at.is_none(),
1728 "delete_at should NOT be set for status: {:?}",
1729 status
1730 );
1731 }
1732
1733 env::remove_var("TRANSACTION_EXPIRATION_HOURS");
1735 }
1736
1737 #[tokio::test]
1738 #[ignore = "Requires active Redis instance"]
1739 async fn test_partial_update_sets_delete_at_for_final_statuses() {
1740 let _lock = ENV_MUTEX.lock().await;
1741
1742 use chrono::{DateTime, Duration, Utc};
1743 use std::env;
1744
1745 env::set_var("TRANSACTION_EXPIRATION_HOURS", "8");
1746
1747 let repo = setup_test_repo().await;
1748 let tx_id = format!("test-partial-final-{}", Uuid::new_v4());
1749 let mut tx = create_test_transaction(&tx_id);
1750 tx.delete_at = None;
1751 tx.status = TransactionStatus::Pending;
1752
1753 repo.create(tx).await.unwrap();
1754
1755 let before_update = Utc::now();
1756
1757 let update = TransactionUpdateRequest {
1759 status: Some(TransactionStatus::Confirmed),
1760 status_reason: Some("Transaction completed".to_string()),
1761 confirmed_at: Some("2023-01-01T12:05:00Z".to_string()),
1762 ..Default::default()
1763 };
1764
1765 let updated = repo.partial_update(tx_id.clone(), update).await.unwrap();
1766
1767 assert!(
1769 updated.delete_at.is_some(),
1770 "delete_at should be set when updating to Confirmed status"
1771 );
1772
1773 let delete_at_str = updated.delete_at.unwrap();
1775 let delete_at = DateTime::parse_from_rfc3339(&delete_at_str)
1776 .expect("delete_at should be valid RFC3339")
1777 .with_timezone(&Utc);
1778
1779 let duration_from_before = delete_at.signed_duration_since(before_update);
1780 let expected_duration = Duration::hours(8);
1781 let tolerance = Duration::minutes(5);
1782
1783 assert!(
1784 duration_from_before >= expected_duration - tolerance
1785 && duration_from_before <= expected_duration + tolerance,
1786 "delete_at should be approximately 8 hours from now. Duration: {:?}",
1787 duration_from_before
1788 );
1789
1790 assert_eq!(updated.status, TransactionStatus::Confirmed);
1792 assert_eq!(
1793 updated.status_reason,
1794 Some("Transaction completed".to_string())
1795 );
1796 assert_eq!(
1797 updated.confirmed_at,
1798 Some("2023-01-01T12:05:00Z".to_string())
1799 );
1800
1801 env::remove_var("TRANSACTION_EXPIRATION_HOURS");
1803 }
1804
1805 #[tokio::test]
1806 #[ignore = "Requires active Redis instance"]
1807 async fn test_update_status_preserves_existing_delete_at() {
1808 let _lock = ENV_MUTEX.lock().await;
1809
1810 use std::env;
1811
1812 env::set_var("TRANSACTION_EXPIRATION_HOURS", "2");
1813
1814 let repo = setup_test_repo().await;
1815 let tx_id = format!("test-preserve-delete-at-{}", Uuid::new_v4());
1816 let mut tx = create_test_transaction(&tx_id);
1817
1818 let existing_delete_at = "2025-01-01T12:00:00Z".to_string();
1820 tx.delete_at = Some(existing_delete_at.clone());
1821 tx.status = TransactionStatus::Pending;
1822
1823 repo.create(tx).await.unwrap();
1824
1825 let updated = repo
1827 .update_status(tx_id.clone(), TransactionStatus::Confirmed)
1828 .await
1829 .unwrap();
1830
1831 assert_eq!(
1833 updated.delete_at,
1834 Some(existing_delete_at),
1835 "Existing delete_at should be preserved when updating to final status"
1836 );
1837
1838 env::remove_var("TRANSACTION_EXPIRATION_HOURS");
1840 }
1841 #[tokio::test]
1842 #[ignore = "Requires active Redis instance"]
1843 async fn test_partial_update_without_status_change_preserves_delete_at() {
1844 let _lock = ENV_MUTEX.lock().await;
1845
1846 use std::env;
1847
1848 env::set_var("TRANSACTION_EXPIRATION_HOURS", "3");
1849
1850 let repo = setup_test_repo().await;
1851 let tx_id = format!("test-preserve-no-status-{}", Uuid::new_v4());
1852 let mut tx = create_test_transaction(&tx_id);
1853 tx.delete_at = None;
1854 tx.status = TransactionStatus::Pending;
1855
1856 repo.create(tx).await.unwrap();
1857
1858 let updated1 = repo
1860 .update_status(tx_id.clone(), TransactionStatus::Confirmed)
1861 .await
1862 .unwrap();
1863
1864 assert!(updated1.delete_at.is_some());
1865 let original_delete_at = updated1.delete_at.clone();
1866
1867 let update = TransactionUpdateRequest {
1869 status: None, status_reason: Some("Updated reason".to_string()),
1871 confirmed_at: Some("2023-01-01T12:10:00Z".to_string()),
1872 ..Default::default()
1873 };
1874
1875 let updated2 = repo.partial_update(tx_id.clone(), update).await.unwrap();
1876
1877 assert_eq!(
1879 updated2.delete_at, original_delete_at,
1880 "delete_at should be preserved when status is not updated"
1881 );
1882
1883 assert_eq!(updated2.status, TransactionStatus::Confirmed); assert_eq!(updated2.status_reason, Some("Updated reason".to_string()));
1886 assert_eq!(
1887 updated2.confirmed_at,
1888 Some("2023-01-01T12:10:00Z".to_string())
1889 );
1890
1891 env::remove_var("TRANSACTION_EXPIRATION_HOURS");
1893 }
1894}