openzeppelin_relayer/repositories/transaction/transaction_redis.rs

//! Redis-backed implementation of the TransactionRepository.
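//!
//! Key layout (every key is namespaced by the configured `key_prefix`):
//! - `{prefix}:relayer:{relayer_id}:tx:{tx_id}`       -> serialized transaction record
//! - `{prefix}:relayer:tx_to_relayer:{tx_id}`         -> owning relayer ID (reverse lookup)
//! - `{prefix}:relayer:{relayer_id}:status:{status}`  -> set of transaction IDs in that status
//! - `{prefix}:relayer:{relayer_id}:nonce:{nonce}`    -> transaction ID using that nonce
//! - `{prefix}:relayer_list`                          -> set of all relayer IDs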

use crate::models::{
    NetworkTransactionData, PaginationQuery, RepositoryError, TransactionRepoModel,
    TransactionStatus, TransactionUpdateRequest,
};
use crate::repositories::redis_base::RedisRepository;
use crate::repositories::{
    BatchRetrievalResult, PaginatedResult, Repository, TransactionRepository,
};
use async_trait::async_trait;
use redis::aio::ConnectionManager;
use redis::AsyncCommands;
use std::fmt;
use std::sync::Arc;
use tracing::{debug, error, warn};

const RELAYER_PREFIX: &str = "relayer";
const TX_PREFIX: &str = "tx";
const STATUS_PREFIX: &str = "status";
const NONCE_PREFIX: &str = "nonce";
const TX_TO_RELAYER_PREFIX: &str = "tx_to_relayer";
const RELAYER_LIST_KEY: &str = "relayer_list";

#[derive(Clone)]
pub struct RedisTransactionRepository {
    pub client: Arc<ConnectionManager>,
    pub key_prefix: String,
}

impl RedisRepository for RedisTransactionRepository {}

impl RedisTransactionRepository {
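    /// Build a repository from a shared [`ConnectionManager`] and a non-empty key prefix.
    ///
    /// Minimal usage sketch (assumes a reachable Redis instance; the URL and prefix are
    /// illustrative only):
    /// ```ignore
    /// let client = redis::Client::open("redis://127.0.0.1:6379")?;
    /// let manager = redis::aio::ConnectionManager::new(client).await?;
    /// let repo = RedisTransactionRepository::new(std::sync::Arc::new(manager), "oz-relayer".to_string())?;
    /// ```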
    pub fn new(
        connection_manager: Arc<ConnectionManager>,
        key_prefix: String,
    ) -> Result<Self, RepositoryError> {
        if key_prefix.is_empty() {
            return Err(RepositoryError::InvalidData(
                "Redis key prefix cannot be empty".to_string(),
            ));
        }

        Ok(Self {
            client: connection_manager,
            key_prefix,
        })
    }

    /// Generate key for transaction data: relayer:{relayer_id}:tx:{tx_id}
    fn tx_key(&self, relayer_id: &str, tx_id: &str) -> String {
        format!(
            "{}:{}:{}:{}:{}",
            self.key_prefix, RELAYER_PREFIX, relayer_id, TX_PREFIX, tx_id
        )
    }

    /// Generate key for reverse lookup: relayer:tx_to_relayer:{tx_id}
    fn tx_to_relayer_key(&self, tx_id: &str) -> String {
        format!(
            "{}:{}:{}:{}",
            self.key_prefix, RELAYER_PREFIX, TX_TO_RELAYER_PREFIX, tx_id
        )
    }

    /// Generate key for relayer status index: relayer:{relayer_id}:status:{status}
    fn relayer_status_key(&self, relayer_id: &str, status: &TransactionStatus) -> String {
        format!(
            "{}:{}:{}:{}:{}",
            self.key_prefix, RELAYER_PREFIX, relayer_id, STATUS_PREFIX, status
        )
    }

    /// Generate key for relayer nonce index: relayer:{relayer_id}:nonce:{nonce}
    fn relayer_nonce_key(&self, relayer_id: &str, nonce: u64) -> String {
        format!(
            "{}:{}:{}:{}:{}",
            self.key_prefix, RELAYER_PREFIX, relayer_id, NONCE_PREFIX, nonce
        )
    }

    /// Generate key for relayer list: relayer_list (set of all relayer IDs)
    fn relayer_list_key(&self) -> String {
        format!("{}:{}", self.key_prefix, RELAYER_LIST_KEY)
    }

    /// Batch fetch transactions by IDs using reverse lookup
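    ///
    /// Two MGET round trips: the `tx_to_relayer` reverse keys resolve each ID's relayer,
    /// then the per-relayer transaction keys are fetched in a single batch. IDs with no
    /// reverse mapping or no stored value are reported in `failed_ids`; entries that fail
    /// to deserialize are logged and skipped.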
    async fn get_transactions_by_ids(
        &self,
        ids: &[String],
    ) -> Result<BatchRetrievalResult<TransactionRepoModel>, RepositoryError> {
        if ids.is_empty() {
            debug!("no transaction IDs provided for batch fetch");
            return Ok(BatchRetrievalResult {
                results: vec![],
                failed_ids: vec![],
            });
        }

        let mut conn = self.client.as_ref().clone();

        let reverse_keys: Vec<String> = ids.iter().map(|id| self.tx_to_relayer_key(id)).collect();

        debug!(count = %ids.len(), "fetching relayer IDs for transactions");

        let relayer_ids: Vec<Option<String>> = conn
            .mget(&reverse_keys)
            .await
            .map_err(|e| self.map_redis_error(e, "batch_fetch_relayer_ids"))?;

        let mut tx_keys = Vec::new();
        let mut valid_ids = Vec::new();
        let mut failed_ids = Vec::new();
        for (i, relayer_id) in relayer_ids.into_iter().enumerate() {
            match relayer_id {
                Some(relayer_id) => {
                    tx_keys.push(self.tx_key(&relayer_id, &ids[i]));
                    valid_ids.push(ids[i].clone());
                }
                None => {
                    warn!(tx_id = %ids[i], "no relayer found for transaction");
                    failed_ids.push(ids[i].clone());
                }
            }
        }

        if tx_keys.is_empty() {
            debug!("no valid transactions found for batch fetch");
            return Ok(BatchRetrievalResult {
                results: vec![],
                failed_ids,
            });
        }

        debug!(count = %tx_keys.len(), "batch fetching transaction data");

        let values: Vec<Option<String>> = conn
            .mget(&tx_keys)
            .await
            .map_err(|e| self.map_redis_error(e, "batch_fetch_transactions"))?;

        let mut transactions = Vec::new();
        let mut failed_count = 0;
        // Keep extending `failed_ids` from the reverse-lookup phase so missing reverse
        // mappings are reported alongside missing transaction data.
        for (i, value) in values.into_iter().enumerate() {
            match value {
                Some(json) => {
                    match self.deserialize_entity::<TransactionRepoModel>(
                        &json,
                        &valid_ids[i],
                        "transaction",
                    ) {
                        Ok(tx) => transactions.push(tx),
                        Err(e) => {
                            failed_count += 1;
                            error!(tx_id = %valid_ids[i], error = %e, "failed to deserialize transaction");
                            // Continue processing other transactions
                        }
                    }
                }
                None => {
                    warn!(tx_id = %valid_ids[i], "transaction not found in batch fetch");
                    failed_ids.push(valid_ids[i].clone());
                }
            }
        }

        if failed_count > 0 {
            warn!(failed_count = %failed_count, total_count = %valid_ids.len(), "failed to deserialize transactions in batch");
        }

        debug!(count = %transactions.len(), "successfully fetched transactions");
        Ok(BatchRetrievalResult {
            results: transactions,
            failed_ids,
        })
    }

    /// Extract nonce from EVM transaction data
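    ///
    /// Returns `None` when the transaction carries no EVM data or no nonce is set.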
    fn extract_nonce(&self, network_data: &NetworkTransactionData) -> Option<u64> {
        match network_data.get_evm_transaction_data() {
            Ok(tx_data) => tx_data.nonce,
            Err(_) => {
                debug!("no EVM transaction data available for nonce extraction");
                None
            }
        }
    }

    /// Update indexes atomically with comprehensive error handling
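    ///
    /// Registers the relayer in the global relayer list, writes the status and nonce
    /// indexes for `tx`, and, when `old_tx` is provided, removes index entries that
    /// no longer apply (changed status or nonce).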
    async fn update_indexes(
        &self,
        tx: &TransactionRepoModel,
        old_tx: Option<&TransactionRepoModel>,
    ) -> Result<(), RepositoryError> {
        let mut conn = self.client.as_ref().clone();
        let mut pipe = redis::pipe();
        pipe.atomic();
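        // `atomic()` queues the commands below into a MULTI/EXEC transaction so the
        // index writes are applied together when the pipeline executes.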

        debug!(tx_id = %tx.id, "updating indexes for transaction");

        // Add relayer to the global relayer list
        let relayer_list_key = self.relayer_list_key();
        pipe.sadd(&relayer_list_key, &tx.relayer_id);

        // Handle status index updates
        let new_status_key = self.relayer_status_key(&tx.relayer_id, &tx.status);
        pipe.sadd(&new_status_key, &tx.id);

        if let Some(nonce) = self.extract_nonce(&tx.network_data) {
            let nonce_key = self.relayer_nonce_key(&tx.relayer_id, nonce);
            pipe.set(&nonce_key, &tx.id);
            debug!(tx_id = %tx.id, nonce = %nonce, "added nonce index for transaction");
        }

        // Remove old indexes if updating
        if let Some(old) = old_tx {
            if old.status != tx.status {
                let old_status_key = self.relayer_status_key(&old.relayer_id, &old.status);
                pipe.srem(&old_status_key, &tx.id);
                debug!(tx_id = %tx.id, old_status = %old.status, new_status = %tx.status, "removing old status index for transaction");
            }

            // Handle nonce index cleanup
            if let Some(old_nonce) = self.extract_nonce(&old.network_data) {
                let new_nonce = self.extract_nonce(&tx.network_data);
                if Some(old_nonce) != new_nonce {
                    let old_nonce_key = self.relayer_nonce_key(&old.relayer_id, old_nonce);
                    pipe.del(&old_nonce_key);
                    debug!(tx_id = %tx.id, old_nonce = %old_nonce, new_nonce = ?new_nonce, "removing old nonce index for transaction");
                }
            }
        }

        // Execute all operations in a single pipeline
        pipe.exec_async(&mut conn).await.map_err(|e| {
            error!(tx_id = %tx.id, error = %e, "index update pipeline failed for transaction");
            self.map_redis_error(e, &format!("update_indexes_for_tx_{}", tx.id))
        })?;

        debug!(tx_id = %tx.id, "successfully updated indexes for transaction");
        Ok(())
    }

    /// Remove all indexes with error recovery
    async fn remove_all_indexes(&self, tx: &TransactionRepoModel) -> Result<(), RepositoryError> {
        let mut conn = self.client.as_ref().clone();
        let mut pipe = redis::pipe();
        pipe.atomic();

        debug!(tx_id = %tx.id, "removing all indexes for transaction");

        // Remove from status index
        let status_key = self.relayer_status_key(&tx.relayer_id, &tx.status);
        pipe.srem(&status_key, &tx.id);

        // Remove nonce index if exists
        if let Some(nonce) = self.extract_nonce(&tx.network_data) {
            let nonce_key = self.relayer_nonce_key(&tx.relayer_id, nonce);
            pipe.del(&nonce_key);
            debug!(tx_id = %tx.id, nonce = %nonce, "removing nonce index for transaction");
        }

        // Remove reverse lookup
        let reverse_key = self.tx_to_relayer_key(&tx.id);
        pipe.del(&reverse_key);

        pipe.exec_async(&mut conn).await.map_err(|e| {
            error!(tx_id = %tx.id, error = %e, "index removal failed for transaction");
            self.map_redis_error(e, &format!("remove_indexes_for_tx_{}", tx.id))
        })?;

        debug!(tx_id = %tx.id, "successfully removed all indexes for transaction");
        Ok(())
    }
}

impl fmt::Debug for RedisTransactionRepository {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("RedisTransactionRepository")
            .field("client", &"<ConnectionManager>")
            .field("key_prefix", &self.key_prefix)
            .finish()
    }
}

#[async_trait]
impl Repository<TransactionRepoModel, String> for RedisTransactionRepository {
    async fn create(
        &self,
        entity: TransactionRepoModel,
    ) -> Result<TransactionRepoModel, RepositoryError> {
        if entity.id.is_empty() {
            return Err(RepositoryError::InvalidData(
                "Transaction ID cannot be empty".to_string(),
            ));
        }

        let key = self.tx_key(&entity.relayer_id, &entity.id);
        let reverse_key = self.tx_to_relayer_key(&entity.id);
        let mut conn = self.client.as_ref().clone();

        debug!(tx_id = %entity.id, "creating transaction");

        let value = self.serialize_entity(&entity, |t| &t.id, "transaction")?;

        // Check if transaction already exists by checking reverse lookup
        let existing: Option<String> = conn
            .get(&reverse_key)
            .await
            .map_err(|e| self.map_redis_error(e, "create_transaction_check"))?;

        if existing.is_some() {
            return Err(RepositoryError::ConstraintViolation(format!(
                "Transaction with ID {} already exists",
                entity.id
            )));
        }

        // Use atomic pipeline for consistency
        let mut pipe = redis::pipe();
        pipe.atomic();
        pipe.set(&key, &value);
        pipe.set(&reverse_key, &entity.relayer_id);

        pipe.exec_async(&mut conn)
            .await
            .map_err(|e| self.map_redis_error(e, "create_transaction"))?;

        // Update indexes separately to handle partial failures gracefully
        if let Err(e) = self.update_indexes(&entity, None).await {
            error!(tx_id = %entity.id, error = %e, "failed to update indexes for new transaction");
            return Err(e);
        }

        debug!(tx_id = %entity.id, "successfully created transaction");
        Ok(entity)
    }

    async fn get_by_id(&self, id: String) -> Result<TransactionRepoModel, RepositoryError> {
        if id.is_empty() {
            return Err(RepositoryError::InvalidData(
                "Transaction ID cannot be empty".to_string(),
            ));
        }

        let mut conn = self.client.as_ref().clone();

        debug!(tx_id = %id, "fetching transaction");

        let reverse_key = self.tx_to_relayer_key(&id);
        let relayer_id: Option<String> = conn
            .get(&reverse_key)
            .await
            .map_err(|e| self.map_redis_error(e, "get_transaction_reverse_lookup"))?;

        let relayer_id = match relayer_id {
            Some(relayer_id) => relayer_id,
            None => {
                debug!(tx_id = %id, "transaction not found (no reverse lookup)");
                return Err(RepositoryError::NotFound(format!(
                    "Transaction with ID {id} not found"
                )));
            }
        };

        let key = self.tx_key(&relayer_id, &id);
        let value: Option<String> = conn
            .get(&key)
            .await
            .map_err(|e| self.map_redis_error(e, "get_transaction_by_id"))?;

        match value {
            Some(json) => {
                let tx =
                    self.deserialize_entity::<TransactionRepoModel>(&json, &id, "transaction")?;
                debug!(tx_id = %id, "successfully fetched transaction");
                Ok(tx)
            }
            None => {
                debug!(tx_id = %id, "transaction not found");
                Err(RepositoryError::NotFound(format!(
                    "Transaction with ID {id} not found"
                )))
            }
        }
    }

    async fn list_all(&self) -> Result<Vec<TransactionRepoModel>, RepositoryError> {
        let mut conn = self.client.as_ref().clone();

        debug!("fetching all transaction IDs");

        // Get all relayer IDs
        let relayer_list_key = self.relayer_list_key();
        let relayer_ids: Vec<String> = conn
            .smembers(&relayer_list_key)
            .await
            .map_err(|e| self.map_redis_error(e, "list_all_relayer_ids"))?;

        debug!(count = %relayer_ids.len(), "found relayers");

        // Collect all transaction IDs from all relayers
        let mut all_tx_ids = Vec::new();
        for relayer_id in relayer_ids {
            let pattern = format!(
                "{}:{}:{}:{}:*",
                self.key_prefix, RELAYER_PREFIX, relayer_id, TX_PREFIX
            );
            let mut cursor = 0;
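            // SCAN walks the keyspace incrementally (cursor-based) instead of KEYS,
            // so large relayers are listed without blocking the Redis server.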
            loop {
                let (next_cursor, keys): (u64, Vec<String>) = redis::cmd("SCAN")
                    .cursor_arg(cursor)
                    .arg("MATCH")
                    .arg(&pattern)
                    .query_async(&mut conn)
                    .await
                    .map_err(|e| self.map_redis_error(e, "list_all_scan_keys"))?;

                // Extract transaction IDs from keys
                for key in keys {
                    if let Some(tx_id) = key.split(':').next_back() {
                        all_tx_ids.push(tx_id.to_string());
                    }
                }

                cursor = next_cursor;
                if cursor == 0 {
                    break;
                }
            }
        }

        debug!(count = %all_tx_ids.len(), "found transaction IDs");

        let transactions = self.get_transactions_by_ids(&all_tx_ids).await?;
        Ok(transactions.results)
    }

    async fn list_paginated(
        &self,
        query: PaginationQuery,
    ) -> Result<PaginatedResult<TransactionRepoModel>, RepositoryError> {
        if query.per_page == 0 {
            return Err(RepositoryError::InvalidData(
                "per_page must be greater than 0".to_string(),
            ));
        }

        let mut conn = self.client.as_ref().clone();

        debug!(page = %query.page, per_page = %query.per_page, "fetching paginated transactions");

        // Get all relayer IDs
        let relayer_list_key = self.relayer_list_key();
        let relayer_ids: Vec<String> = conn
            .smembers(&relayer_list_key)
            .await
            .map_err(|e| self.map_redis_error(e, "list_paginated_relayer_ids"))?;

        // Collect all transaction IDs from all relayers
        let mut all_tx_ids = Vec::new();
        for relayer_id in relayer_ids {
            let pattern = format!(
                "{}:{}:{}:{}:*",
                self.key_prefix, RELAYER_PREFIX, relayer_id, TX_PREFIX
            );
            let mut cursor = 0;
            loop {
                let (next_cursor, keys): (u64, Vec<String>) = redis::cmd("SCAN")
                    .cursor_arg(cursor)
                    .arg("MATCH")
                    .arg(&pattern)
                    .query_async(&mut conn)
                    .await
                    .map_err(|e| self.map_redis_error(e, "list_paginated_scan_keys"))?;

                // Extract transaction IDs from keys
                for key in keys {
                    if let Some(tx_id) = key.split(':').next_back() {
                        all_tx_ids.push(tx_id.to_string());
                    }
                }

                cursor = next_cursor;
                if cursor == 0 {
                    break;
                }
            }
        }

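        // Pagination is computed in memory over the scanned IDs: `page` is 1-based,
        // so page N covers the half-open index range [(N-1)*per_page, N*per_page).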
        let total = all_tx_ids.len() as u64;
        let start = ((query.page - 1) * query.per_page) as usize;
        let end = (start + query.per_page as usize).min(all_tx_ids.len());

        if start >= all_tx_ids.len() {
            debug!(page = %query.page, total = %total, "page is beyond available data");
            return Ok(PaginatedResult {
                items: vec![],
                total,
                page: query.page,
                per_page: query.per_page,
            });
        }

        let page_ids = &all_tx_ids[start..end];
        let items = self.get_transactions_by_ids(page_ids).await?;

        debug!(count = %items.results.len(), page = %query.page, "successfully fetched transactions for page");

        Ok(PaginatedResult {
            items: items.results.clone(),
            total,
            page: query.page,
            per_page: query.per_page,
        })
    }

    async fn update(
        &self,
        id: String,
        entity: TransactionRepoModel,
    ) -> Result<TransactionRepoModel, RepositoryError> {
        if id.is_empty() {
            return Err(RepositoryError::InvalidData(
                "Transaction ID cannot be empty".to_string(),
            ));
        }

        debug!(tx_id = %id, "updating transaction");

        // Get the old transaction for index cleanup
        let old_tx = self.get_by_id(id.clone()).await?;

        let key = self.tx_key(&entity.relayer_id, &id);
        let mut conn = self.client.as_ref().clone();

        let value = self.serialize_entity(&entity, |t| &t.id, "transaction")?;

        // Update transaction
        let _: () = conn
            .set(&key, value)
            .await
            .map_err(|e| self.map_redis_error(e, "update_transaction"))?;

        // Update indexes
        self.update_indexes(&entity, Some(&old_tx)).await?;

        debug!(tx_id = %id, "successfully updated transaction");
        Ok(entity)
    }

    async fn delete_by_id(&self, id: String) -> Result<(), RepositoryError> {
        if id.is_empty() {
            return Err(RepositoryError::InvalidData(
                "Transaction ID cannot be empty".to_string(),
            ));
        }

        debug!(tx_id = %id, "deleting transaction");

        // Get transaction first for index cleanup
        let tx = self.get_by_id(id.clone()).await?;

        let key = self.tx_key(&tx.relayer_id, &id);
        let reverse_key = self.tx_to_relayer_key(&id);
        let mut conn = self.client.as_ref().clone();

        let mut pipe = redis::pipe();
        pipe.atomic();
        pipe.del(&key);
        pipe.del(&reverse_key);

        pipe.exec_async(&mut conn)
            .await
            .map_err(|e| self.map_redis_error(e, "delete_transaction"))?;

        // Remove indexes (log errors but don't fail the delete)
        if let Err(e) = self.remove_all_indexes(&tx).await {
            error!(tx_id = %id, error = %e, "failed to remove indexes for deleted transaction");
        }

        debug!(tx_id = %id, "successfully deleted transaction");
        Ok(())
    }

    async fn count(&self) -> Result<usize, RepositoryError> {
        let mut conn = self.client.as_ref().clone();

        debug!("counting transactions");

        // Get all relayer IDs
        let relayer_list_key = self.relayer_list_key();
        let relayer_ids: Vec<String> = conn
            .smembers(&relayer_list_key)
            .await
            .map_err(|e| self.map_redis_error(e, "count_relayer_ids"))?;

        // Count transactions across all relayers
        let mut total_count = 0;
        for relayer_id in relayer_ids {
            let pattern = format!(
                "{}:{}:{}:{}:*",
                self.key_prefix, RELAYER_PREFIX, relayer_id, TX_PREFIX
            );
            let mut cursor = 0;
            loop {
                let (next_cursor, keys): (u64, Vec<String>) = redis::cmd("SCAN")
                    .cursor_arg(cursor)
                    .arg("MATCH")
                    .arg(&pattern)
                    .query_async(&mut conn)
                    .await
                    .map_err(|e| self.map_redis_error(e, "count_scan_keys"))?;

                total_count += keys.len();

                cursor = next_cursor;
                if cursor == 0 {
                    break;
                }
            }
        }

        debug!(count = %total_count, "transaction count");
        Ok(total_count)
    }

    async fn has_entries(&self) -> Result<bool, RepositoryError> {
        let mut conn = self.client.as_ref().clone();
        let relayer_list_key = self.relayer_list_key();

        debug!("checking if transaction entries exist");

        let exists: bool = conn
            .exists(&relayer_list_key)
            .await
            .map_err(|e| self.map_redis_error(e, "has_entries_check"))?;

        debug!(exists = %exists, "transaction entries exist");
        Ok(exists)
    }

    async fn drop_all_entries(&self) -> Result<(), RepositoryError> {
        let mut conn = self.client.as_ref().clone();
        let relayer_list_key = self.relayer_list_key();

        debug!("dropping all transaction entries");

        // Get all relayer IDs first
        let relayer_ids: Vec<String> = conn
            .smembers(&relayer_list_key)
            .await
            .map_err(|e| self.map_redis_error(e, "drop_all_entries_get_relayer_ids"))?;

        if relayer_ids.is_empty() {
            debug!("no transaction entries to drop");
            return Ok(());
        }

        // Use pipeline for atomic operations
        let mut pipe = redis::pipe();
        pipe.atomic();

        // Delete all transactions and their indexes for each relayer
        for relayer_id in &relayer_ids {
            // Get all transaction IDs for this relayer
            let pattern = format!(
                "{}:{}:{}:{}:*",
                self.key_prefix, RELAYER_PREFIX, relayer_id, TX_PREFIX
            );
            let mut cursor = 0;
            let mut tx_ids = Vec::new();

            loop {
                let (next_cursor, keys): (u64, Vec<String>) = redis::cmd("SCAN")
                    .cursor_arg(cursor)
                    .arg("MATCH")
                    .arg(&pattern)
                    .query_async(&mut conn)
                    .await
                    .map_err(|e| self.map_redis_error(e, "drop_all_entries_scan"))?;

                // Extract transaction IDs from keys and delete keys
                for key in keys {
                    pipe.del(&key);
                    if let Some(tx_id) = key.split(':').next_back() {
                        tx_ids.push(tx_id.to_string());
                    }
                }

                cursor = next_cursor;
                if cursor == 0 {
                    break;
                }
            }

            // Delete reverse lookup keys and indexes
            for tx_id in tx_ids {
                let reverse_key = self.tx_to_relayer_key(&tx_id);
                pipe.del(&reverse_key);

                // Delete status indexes (we can't know the specific status, so we'll clean up known ones)
                for status in &[
                    TransactionStatus::Pending,
                    TransactionStatus::Sent,
                    TransactionStatus::Confirmed,
                    TransactionStatus::Failed,
                    TransactionStatus::Canceled,
                ] {
                    let status_key = self.relayer_status_key(relayer_id, status);
                    pipe.srem(&status_key, &tx_id);
                }
            }
        }

        // Delete the relayer list key
        pipe.del(&relayer_list_key);

        pipe.exec_async(&mut conn)
            .await
            .map_err(|e| self.map_redis_error(e, "drop_all_entries_pipeline"))?;

        debug!(count = %relayer_ids.len(), "dropped all transaction entries for relayers");
        Ok(())
    }
}

#[async_trait]
impl TransactionRepository for RedisTransactionRepository {
    async fn find_by_relayer_id(
        &self,
        relayer_id: &str,
        query: PaginationQuery,
    ) -> Result<PaginatedResult<TransactionRepoModel>, RepositoryError> {
        let mut conn = self.client.as_ref().clone();

        // Scan for all transaction keys for this relayer
        let pattern = format!(
            "{}:{}:{}:{}:*",
            self.key_prefix, RELAYER_PREFIX, relayer_id, TX_PREFIX
        );
        let mut all_tx_ids = Vec::new();
        let mut cursor = 0;

        loop {
            let (next_cursor, keys): (u64, Vec<String>) = redis::cmd("SCAN")
                .cursor_arg(cursor)
                .arg("MATCH")
                .arg(&pattern)
                .query_async(&mut conn)
                .await
                .map_err(|e| self.map_redis_error(e, "find_by_relayer_id_scan"))?;

            // Extract transaction IDs from keys
            for key in keys {
                if let Some(tx_id) = key.split(':').next_back() {
                    all_tx_ids.push(tx_id.to_string());
                }
            }

            cursor = next_cursor;
            if cursor == 0 {
                break;
            }
        }

        let total = all_tx_ids.len() as u64;
        let start = ((query.page - 1) * query.per_page) as usize;
        let end = (start + query.per_page as usize).min(all_tx_ids.len());

        // Guard against pages beyond the available data; slicing with start > end would panic
        if start >= all_tx_ids.len() {
            return Ok(PaginatedResult {
                items: vec![],
                total,
                page: query.page,
                per_page: query.per_page,
            });
        }

        let page_ids = &all_tx_ids[start..end];
        let items = self.get_transactions_by_ids(page_ids).await?;

        Ok(PaginatedResult {
            items: items.results.clone(),
            total,
            page: query.page,
            per_page: query.per_page,
        })
    }

    async fn find_by_status(
        &self,
        relayer_id: &str,
        statuses: &[TransactionStatus],
    ) -> Result<Vec<TransactionRepoModel>, RepositoryError> {
        let mut conn = self.client.as_ref().clone();
        let mut all_ids = Vec::new();

        // Collect IDs from all status sets
        for status in statuses {
            let status_key = self.relayer_status_key(relayer_id, status);
            let ids: Vec<String> = conn
                .smembers(status_key)
                .await
                .map_err(|e| self.map_redis_error(e, "find_by_status"))?;

            all_ids.extend(ids);
        }

        // Remove duplicates and batch fetch
        all_ids.sort();
        all_ids.dedup();

        let transactions = self.get_transactions_by_ids(&all_ids).await?;
        Ok(transactions.results)
    }

    async fn find_by_nonce(
        &self,
        relayer_id: &str,
        nonce: u64,
    ) -> Result<Option<TransactionRepoModel>, RepositoryError> {
        let mut conn = self.client.as_ref().clone();
        let nonce_key = self.relayer_nonce_key(relayer_id, nonce);

        // Get transaction ID with this nonce for this relayer (should be single value)
        let tx_id: Option<String> = conn
            .get(nonce_key)
            .await
            .map_err(|e| self.map_redis_error(e, "find_by_nonce"))?;

        match tx_id {
            Some(tx_id) => {
                match self.get_by_id(tx_id.clone()).await {
                    Ok(tx) => Ok(Some(tx)),
                    Err(RepositoryError::NotFound(_)) => {
                        // Transaction was deleted but index wasn't cleaned up
                        warn!(relayer_id = %relayer_id, nonce = %nonce, "stale nonce index found for relayer");
                        Ok(None)
                    }
                    Err(e) => Err(e),
                }
            }
            None => Ok(None),
        }
    }

    async fn update_status(
        &self,
        tx_id: String,
        status: TransactionStatus,
    ) -> Result<TransactionRepoModel, RepositoryError> {
        let update = TransactionUpdateRequest {
            status: Some(status),
            ..Default::default()
        };
        self.partial_update(tx_id, update).await
    }

    async fn partial_update(
        &self,
        tx_id: String,
        update: TransactionUpdateRequest,
    ) -> Result<TransactionRepoModel, RepositoryError> {
        const MAX_RETRIES: u32 = 3;
        const BACKOFF_MS: u64 = 100;

        // Fetch the original transaction state ONCE before retrying.
        // This is critical: if conn.set() succeeds but update_indexes() fails,
        // subsequent retries must still reference the original state to remove
        // stale index entries. Otherwise, get_by_id() returns the already-updated
        // record and update_indexes() skips removing the old indexes.
        let original_tx = self.get_by_id(tx_id.clone()).await?;
        let mut updated_tx = original_tx.clone();
        updated_tx.apply_partial_update(update.clone());

        let key = self.tx_key(&updated_tx.relayer_id, &tx_id);
        let value = self.serialize_entity(&updated_tx, |t| &t.id, "transaction")?;

        let mut last_error = None;

        for attempt in 0..MAX_RETRIES {
            let mut conn = self.client.as_ref().clone();

            // Try to update transaction data
            let result: Result<(), _> = conn.set(&key, &value).await;
            match result {
                Ok(_) => {}
                Err(e) => {
                    if attempt < MAX_RETRIES - 1 {
                        warn!(tx_id = %tx_id, attempt = %attempt, error = %e, "failed to set transaction data, retrying");
                        last_error = Some(self.map_redis_error(e, "partial_update"));
                        tokio::time::sleep(tokio::time::Duration::from_millis(BACKOFF_MS)).await;
                        continue;
                    }
                    return Err(self.map_redis_error(e, "partial_update"));
                }
            }

            // Try to update indexes with the original pre-update state
            // This ensures stale indexes are removed even on retry attempts
            match self.update_indexes(&updated_tx, Some(&original_tx)).await {
                Ok(_) => {
                    debug!(tx_id = %tx_id, attempt = %attempt, "successfully updated transaction");
                    return Ok(updated_tx);
                }
                Err(e) if attempt < MAX_RETRIES - 1 => {
                    warn!(tx_id = %tx_id, attempt = %attempt, error = %e, "failed to update indexes, retrying");
                    last_error = Some(e);
                    tokio::time::sleep(tokio::time::Duration::from_millis(BACKOFF_MS)).await;
                    continue;
                }
                Err(e) => return Err(e),
            }
        }

        Err(last_error.unwrap_or_else(|| {
            RepositoryError::UnexpectedError("partial_update exhausted retries".to_string())
        }))
    }

    async fn update_network_data(
        &self,
        tx_id: String,
        network_data: NetworkTransactionData,
    ) -> Result<TransactionRepoModel, RepositoryError> {
        let update = TransactionUpdateRequest {
            network_data: Some(network_data),
            ..Default::default()
        };
        self.partial_update(tx_id, update).await
    }

    async fn set_sent_at(
        &self,
        tx_id: String,
        sent_at: String,
    ) -> Result<TransactionRepoModel, RepositoryError> {
        let update = TransactionUpdateRequest {
            sent_at: Some(sent_at),
            ..Default::default()
        };
        self.partial_update(tx_id, update).await
    }

    async fn set_confirmed_at(
        &self,
        tx_id: String,
        confirmed_at: String,
    ) -> Result<TransactionRepoModel, RepositoryError> {
        let update = TransactionUpdateRequest {
            confirmed_at: Some(confirmed_at),
            ..Default::default()
        };
        self.partial_update(tx_id, update).await
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::models::{evm::Speed, EvmTransactionData, NetworkType};
    use alloy::primitives::U256;
    use lazy_static::lazy_static;
    use redis::Client;
    use std::str::FromStr;
    use tokio;
    use uuid::Uuid;

    use tokio::sync::Mutex;

    // Use a mutex to ensure tests don't run in parallel when modifying env vars
    lazy_static! {
        static ref ENV_MUTEX: Mutex<()> = Mutex::new(());
    }
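
    // These tests are marked `#[ignore]` because they need a live Redis instance.
    // Point REDIS_TEST_URL at one (default: redis://127.0.0.1:6379) and run:
    //   cargo test -- --ignored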
968
969    // Helper function to create test transactions
970    fn create_test_transaction(id: &str) -> TransactionRepoModel {
971        TransactionRepoModel {
972            id: id.to_string(),
973            relayer_id: "relayer-1".to_string(),
974            status: TransactionStatus::Pending,
975            status_reason: None,
976            created_at: "2025-01-27T15:31:10.777083+00:00".to_string(),
977            sent_at: Some("2025-01-27T15:31:10.777083+00:00".to_string()),
978            confirmed_at: Some("2025-01-27T15:31:10.777083+00:00".to_string()),
979            valid_until: None,
980            delete_at: None,
981            network_type: NetworkType::Evm,
982            priced_at: None,
983            hashes: vec![],
984            network_data: NetworkTransactionData::Evm(EvmTransactionData {
985                gas_price: Some(1000000000),
986                gas_limit: Some(21000),
987                nonce: Some(1),
988                value: U256::from_str("1000000000000000000").unwrap(),
989                data: Some("0x".to_string()),
990                from: "0xSender".to_string(),
991                to: Some("0xRecipient".to_string()),
992                chain_id: 1,
993                signature: None,
994                hash: Some(format!("0x{}", id)),
995                speed: Some(Speed::Fast),
996                max_fee_per_gas: None,
997                max_priority_fee_per_gas: None,
998                raw: None,
999            }),
1000            noop_count: None,
1001            is_canceled: Some(false),
1002        }
1003    }
1004
1005    fn create_test_transaction_with_relayer(id: &str, relayer_id: &str) -> TransactionRepoModel {
1006        let mut tx = create_test_transaction(id);
1007        tx.relayer_id = relayer_id.to_string();
1008        tx
1009    }
1010
1011    fn create_test_transaction_with_status(
1012        id: &str,
1013        relayer_id: &str,
1014        status: TransactionStatus,
1015    ) -> TransactionRepoModel {
1016        let mut tx = create_test_transaction_with_relayer(id, relayer_id);
1017        tx.status = status;
1018        tx
1019    }
1020
1021    fn create_test_transaction_with_nonce(
1022        id: &str,
1023        nonce: u64,
1024        relayer_id: &str,
1025    ) -> TransactionRepoModel {
1026        let mut tx = create_test_transaction_with_relayer(id, relayer_id);
1027        if let NetworkTransactionData::Evm(ref mut evm_data) = tx.network_data {
1028            evm_data.nonce = Some(nonce);
1029        }
1030        tx
1031    }
1032
1033    async fn setup_test_repo() -> RedisTransactionRepository {
1034        // Use a mock Redis URL - in real integration tests, this would connect to a test Redis instance
1035        let redis_url = std::env::var("REDIS_TEST_URL")
1036            .unwrap_or_else(|_| "redis://127.0.0.1:6379".to_string());
1037
1038        let client = Client::open(redis_url).expect("Failed to create Redis client");
1039        let connection_manager = ConnectionManager::new(client)
1040            .await
1041            .expect("Failed to create connection manager");
1042
1043        let random_id = Uuid::new_v4().to_string();
1044        let key_prefix = format!("test_prefix:{}", random_id);
1045
1046        RedisTransactionRepository::new(Arc::new(connection_manager), key_prefix)
1047            .expect("Failed to create RedisTransactionRepository")
1048    }
1049
1050    #[tokio::test]
1051    #[ignore = "Requires active Redis instance"]
1052    async fn test_new_repository_creation() {
1053        let repo = setup_test_repo().await;
1054        assert!(repo.key_prefix.contains("test_prefix"));
1055    }
1056
1057    #[tokio::test]
1058    #[ignore = "Requires active Redis instance"]
1059    async fn test_new_repository_empty_prefix_fails() {
1060        let redis_url = std::env::var("REDIS_TEST_URL")
1061            .unwrap_or_else(|_| "redis://127.0.0.1:6379".to_string());
1062        let client = Client::open(redis_url).expect("Failed to create Redis client");
1063        let connection_manager = ConnectionManager::new(client)
1064            .await
1065            .expect("Failed to create connection manager");
1066
1067        let result = RedisTransactionRepository::new(Arc::new(connection_manager), "".to_string());
1068        assert!(matches!(result, Err(RepositoryError::InvalidData(_))));
1069    }
1070
1071    #[tokio::test]
1072    #[ignore = "Requires active Redis instance"]
1073    async fn test_key_generation() {
1074        let repo = setup_test_repo().await;
1075
1076        assert!(repo
1077            .tx_key("relayer-1", "test-id")
1078            .contains(":relayer:relayer-1:tx:test-id"));
1079        assert!(repo
1080            .tx_to_relayer_key("test-id")
1081            .contains(":relayer:tx_to_relayer:test-id"));
1082        assert!(repo.relayer_list_key().contains(":relayer_list"));
1083        assert!(repo
1084            .relayer_status_key("relayer-1", &TransactionStatus::Pending)
1085            .contains(":relayer:relayer-1:status:Pending"));
1086        assert!(repo
1087            .relayer_nonce_key("relayer-1", 42)
1088            .contains(":relayer:relayer-1:nonce:42"));
1089    }
1090
1091    #[tokio::test]
1092    #[ignore = "Requires active Redis instance"]
1093    async fn test_serialize_deserialize_transaction() {
1094        let repo = setup_test_repo().await;
1095        let tx = create_test_transaction("test-1");
1096
1097        let serialized = repo
1098            .serialize_entity(&tx, |t| &t.id, "transaction")
1099            .expect("Serialization should succeed");
1100        let deserialized: TransactionRepoModel = repo
1101            .deserialize_entity(&serialized, "test-1", "transaction")
1102            .expect("Deserialization should succeed");
1103
1104        assert_eq!(tx.id, deserialized.id);
1105        assert_eq!(tx.relayer_id, deserialized.relayer_id);
1106        assert_eq!(tx.status, deserialized.status);
1107    }
1108
1109    #[tokio::test]
1110    #[ignore = "Requires active Redis instance"]
1111    async fn test_extract_nonce() {
1112        let repo = setup_test_repo().await;
1113        let random_id = Uuid::new_v4().to_string();
1114        let relayer_id = Uuid::new_v4().to_string();
1115        let tx_with_nonce = create_test_transaction_with_nonce(&random_id, 42, &relayer_id);
1116
1117        let nonce = repo.extract_nonce(&tx_with_nonce.network_data);
1118        assert_eq!(nonce, Some(42));
1119    }
1120
1121    #[tokio::test]
1122    #[ignore = "Requires active Redis instance"]
1123    async fn test_create_transaction() {
1124        let repo = setup_test_repo().await;
1125        let random_id = Uuid::new_v4().to_string();
1126        let tx = create_test_transaction(&random_id);
1127
1128        let result = repo.create(tx.clone()).await.unwrap();
1129        assert_eq!(result.id, tx.id);
1130    }
1131
1132    #[tokio::test]
1133    #[ignore = "Requires active Redis instance"]
1134    async fn test_get_transaction() {
1135        let repo = setup_test_repo().await;
1136        let random_id = Uuid::new_v4().to_string();
1137        let tx = create_test_transaction(&random_id);
1138
1139        repo.create(tx.clone()).await.unwrap();
1140        let stored = repo.get_by_id(random_id.to_string()).await.unwrap();
1141        assert_eq!(stored.id, tx.id);
1142        assert_eq!(stored.relayer_id, tx.relayer_id);
1143    }
1144
1145    #[tokio::test]
1146    #[ignore = "Requires active Redis instance"]
1147    async fn test_update_transaction() {
1148        let repo = setup_test_repo().await;
1149        let random_id = Uuid::new_v4().to_string();
1150        let mut tx = create_test_transaction(&random_id);
1151
1152        repo.create(tx.clone()).await.unwrap();
1153        tx.status = TransactionStatus::Confirmed;
1154
1155        let updated = repo.update(random_id.to_string(), tx).await.unwrap();
1156        assert!(matches!(updated.status, TransactionStatus::Confirmed));
1157    }
1158
1159    #[tokio::test]
1160    #[ignore = "Requires active Redis instance"]
1161    async fn test_delete_transaction() {
1162        let repo = setup_test_repo().await;
1163        let random_id = Uuid::new_v4().to_string();
1164        let tx = create_test_transaction(&random_id);
1165
1166        repo.create(tx).await.unwrap();
1167        repo.delete_by_id(random_id.to_string()).await.unwrap();
1168
1169        let result = repo.get_by_id(random_id.to_string()).await;
1170        assert!(matches!(result, Err(RepositoryError::NotFound(_))));
1171    }
1172
1173    #[tokio::test]
1174    #[ignore = "Requires active Redis instance"]
1175    async fn test_list_all_transactions() {
1176        let repo = setup_test_repo().await;
1177        let random_id = Uuid::new_v4().to_string();
1178        let random_id2 = Uuid::new_v4().to_string();
1179
1180        let tx1 = create_test_transaction(&random_id);
1181        let tx2 = create_test_transaction(&random_id2);
1182
1183        repo.create(tx1).await.unwrap();
1184        repo.create(tx2).await.unwrap();
1185
1186        let transactions = repo.list_all().await.unwrap();
1187        assert!(transactions.len() >= 2);
1188    }
1189
1190    #[tokio::test]
1191    #[ignore = "Requires active Redis instance"]
1192    async fn test_count_transactions() {
1193        let repo = setup_test_repo().await;
1194        let random_id = Uuid::new_v4().to_string();
1195        let tx = create_test_transaction(&random_id);
1196
1197        let count = repo.count().await.unwrap();
1198        repo.create(tx).await.unwrap();
1199        assert!(repo.count().await.unwrap() > count);
1200    }
1201
1202    #[tokio::test]
1203    #[ignore = "Requires active Redis instance"]
1204    async fn test_get_nonexistent_transaction() {
1205        let repo = setup_test_repo().await;
1206        let result = repo.get_by_id("nonexistent".to_string()).await;
1207        assert!(matches!(result, Err(RepositoryError::NotFound(_))));
1208    }
1209
1210    #[tokio::test]
1211    #[ignore = "Requires active Redis instance"]
1212    async fn test_duplicate_transaction_creation() {
1213        let repo = setup_test_repo().await;
1214        let random_id = Uuid::new_v4().to_string();
1215
1216        let tx = create_test_transaction(&random_id);
1217
1218        repo.create(tx.clone()).await.unwrap();
1219        let result = repo.create(tx).await;
1220
1221        assert!(matches!(
1222            result,
1223            Err(RepositoryError::ConstraintViolation(_))
1224        ));
1225    }
1226
1227    #[tokio::test]
1228    #[ignore = "Requires active Redis instance"]
1229    async fn test_update_nonexistent_transaction() {
1230        let repo = setup_test_repo().await;
1231        let tx = create_test_transaction("test-1");
1232
1233        let result = repo.update("nonexistent".to_string(), tx).await;
1234        assert!(matches!(result, Err(RepositoryError::NotFound(_))));
1235    }
1236
1237    #[tokio::test]
1238    #[ignore = "Requires active Redis instance"]
1239    async fn test_list_paginated() {
1240        let repo = setup_test_repo().await;
1241
1242        // Create multiple transactions
1243        for _ in 1..=10 {
1244            let random_id = Uuid::new_v4().to_string();
1245            let tx = create_test_transaction(&random_id);
1246            repo.create(tx).await.unwrap();
1247        }
1248
1249        // Test first page with 3 items per page
1250        let query = PaginationQuery {
1251            page: 1,
1252            per_page: 3,
1253        };
1254        let result = repo.list_paginated(query).await.unwrap();
1255        assert_eq!(result.items.len(), 3);
1256        assert!(result.total >= 10);
1257        assert_eq!(result.page, 1);
1258        assert_eq!(result.per_page, 3);
1259
1260        // Test empty page (beyond total items)
1261        let query = PaginationQuery {
1262            page: 1000,
1263            per_page: 3,
1264        };
1265        let result = repo.list_paginated(query).await.unwrap();
1266        assert_eq!(result.items.len(), 0);
1267    }
1268
1269    #[tokio::test]
1270    #[ignore = "Requires active Redis instance"]
1271    async fn test_find_by_relayer_id() {
1272        let repo = setup_test_repo().await;
1273        let random_id = Uuid::new_v4().to_string();
1274        let random_id2 = Uuid::new_v4().to_string();
1275        let random_id3 = Uuid::new_v4().to_string();
1276
1277        let tx1 = create_test_transaction_with_relayer(&random_id, "relayer-1");
1278        let tx2 = create_test_transaction_with_relayer(&random_id2, "relayer-1");
1279        let tx3 = create_test_transaction_with_relayer(&random_id3, "relayer-2");
1280
1281        repo.create(tx1).await.unwrap();
1282        repo.create(tx2).await.unwrap();
1283        repo.create(tx3).await.unwrap();
1284
1285        // Test finding transactions for relayer-1
1286        let query = PaginationQuery {
1287            page: 1,
1288            per_page: 10,
1289        };
1290        let result = repo
1291            .find_by_relayer_id("relayer-1", query.clone())
1292            .await
1293            .unwrap();
1294        assert!(result.total >= 2);
1295        assert!(result.items.len() >= 2);
1296        assert!(result.items.iter().all(|tx| tx.relayer_id == "relayer-1"));
1297
1298        // Test finding transactions for relayer-2
1299        let result = repo
1300            .find_by_relayer_id("relayer-2", query.clone())
1301            .await
1302            .unwrap();
1303        assert!(result.total >= 1);
1304        assert!(!result.items.is_empty());
1305        assert!(result.items.iter().all(|tx| tx.relayer_id == "relayer-2"));
1306
1307        // Test finding transactions for non-existent relayer
1308        let result = repo
1309            .find_by_relayer_id("non-existent", query.clone())
1310            .await
1311            .unwrap();
1312        assert_eq!(result.total, 0);
1313        assert_eq!(result.items.len(), 0);
1314    }
1315
1316    #[tokio::test]
1317    #[ignore = "Requires active Redis instance"]
1318    async fn test_find_by_status() {
1319        let repo = setup_test_repo().await;
1320        let random_id = Uuid::new_v4().to_string();
1321        let random_id2 = Uuid::new_v4().to_string();
1322        let random_id3 = Uuid::new_v4().to_string();
1323        let relayer_id = Uuid::new_v4().to_string();
1324        let tx1 = create_test_transaction_with_status(
1325            &random_id,
1326            &relayer_id,
1327            TransactionStatus::Pending,
1328        );
1329        let tx2 =
1330            create_test_transaction_with_status(&random_id2, &relayer_id, TransactionStatus::Sent);
1331        let tx3 = create_test_transaction_with_status(
1332            &random_id3,
1333            &relayer_id,
1334            TransactionStatus::Confirmed,
1335        );
1336
1337        repo.create(tx1).await.unwrap();
1338        repo.create(tx2).await.unwrap();
1339        repo.create(tx3).await.unwrap();
1340
1341        // Test finding pending transactions
1342        let result = repo
1343            .find_by_status(&relayer_id, &[TransactionStatus::Pending])
1344            .await
1345            .unwrap();
1346        assert_eq!(result.len(), 1);
1347        assert_eq!(result[0].status, TransactionStatus::Pending);
1348
1349        // Test finding multiple statuses
1350        let result = repo
1351            .find_by_status(
1352                &relayer_id,
1353                &[TransactionStatus::Pending, TransactionStatus::Sent],
1354            )
1355            .await
1356            .unwrap();
1357        assert_eq!(result.len(), 2);
1358
1359        // Test finding non-existent status
1360        let result = repo
1361            .find_by_status(&relayer_id, &[TransactionStatus::Failed])
1362            .await
1363            .unwrap();
1364        assert_eq!(result.len(), 0);
1365    }
1366
1367    #[tokio::test]
1368    #[ignore = "Requires active Redis instance"]
1369    async fn test_find_by_nonce() {
1370        let repo = setup_test_repo().await;
1371        let random_id = Uuid::new_v4().to_string();
1372        let random_id2 = Uuid::new_v4().to_string();
1373        let relayer_id = Uuid::new_v4().to_string();
1374
1375        let tx1 = create_test_transaction_with_nonce(&random_id, 42, &relayer_id);
1376        let tx2 = create_test_transaction_with_nonce(&random_id2, 43, &relayer_id);
1377
1378        repo.create(tx1.clone()).await.unwrap();
1379        repo.create(tx2).await.unwrap();
1380
1381        // Test finding existing nonce
1382        let result = repo.find_by_nonce(&relayer_id, 42).await.unwrap();
1383        assert!(result.is_some());
1384        assert_eq!(result.unwrap().id, random_id);
1385
1386        // Test finding non-existent nonce
1387        let result = repo.find_by_nonce(&relayer_id, 99).await.unwrap();
1388        assert!(result.is_none());
1389
1390        // Test finding nonce for non-existent relayer
1391        let result = repo.find_by_nonce("non-existent", 42).await.unwrap();
1392        assert!(result.is_none());
1393    }
1394
1395    #[tokio::test]
1396    #[ignore = "Requires active Redis instance"]
1397    async fn test_update_status() {
1398        let repo = setup_test_repo().await;
1399        let random_id = Uuid::new_v4().to_string();
1400        let tx = create_test_transaction(&random_id);
1401
1402        repo.create(tx).await.unwrap();
1403        let updated = repo
1404            .update_status(random_id.to_string(), TransactionStatus::Confirmed)
1405            .await
1406            .unwrap();
1407        assert_eq!(updated.status, TransactionStatus::Confirmed);
1408    }
1409
1410    #[tokio::test]
1411    #[ignore = "Requires active Redis instance"]
1412    async fn test_partial_update() {
1413        let repo = setup_test_repo().await;
1414        let random_id = Uuid::new_v4().to_string();
1415        let tx = create_test_transaction(&random_id);
1416
1417        repo.create(tx).await.unwrap();
1418
1419        let update = TransactionUpdateRequest {
1420            status: Some(TransactionStatus::Sent),
1421            status_reason: Some("Transaction sent".to_string()),
1422            sent_at: Some("2025-01-27T16:00:00.000000+00:00".to_string()),
1423            confirmed_at: None,
1424            network_data: None,
1425            hashes: None,
1426            is_canceled: None,
1427            priced_at: None,
1428            noop_count: None,
1429            delete_at: None,
1430        };

        let updated = repo
            .partial_update(random_id.to_string(), update)
            .await
            .unwrap();
        assert_eq!(updated.status, TransactionStatus::Sent);
        assert_eq!(updated.status_reason, Some("Transaction sent".to_string()));
        assert_eq!(
            updated.sent_at,
            Some("2025-01-27T16:00:00.000000+00:00".to_string())
        );
    }

    #[tokio::test]
    #[ignore = "Requires active Redis instance"]
    async fn test_set_sent_at() {
        let repo = setup_test_repo().await;
        let random_id = Uuid::new_v4().to_string();
        let tx = create_test_transaction(&random_id);

        repo.create(tx).await.unwrap();
        let updated = repo
            .set_sent_at(
                random_id.to_string(),
                "2025-01-27T16:00:00.000000+00:00".to_string(),
            )
            .await
            .unwrap();
        assert_eq!(
            updated.sent_at,
            Some("2025-01-27T16:00:00.000000+00:00".to_string())
        );
    }

    #[tokio::test]
    #[ignore = "Requires active Redis instance"]
    async fn test_set_confirmed_at() {
        let repo = setup_test_repo().await;
        let random_id = Uuid::new_v4().to_string();
        let tx = create_test_transaction(&random_id);

        repo.create(tx).await.unwrap();
        let updated = repo
            .set_confirmed_at(
                random_id.to_string(),
                "2025-01-27T16:00:00.000000+00:00".to_string(),
            )
            .await
            .unwrap();
        assert_eq!(
            updated.confirmed_at,
            Some("2025-01-27T16:00:00.000000+00:00".to_string())
        );
    }

    #[tokio::test]
    #[ignore = "Requires active Redis instance"]
    async fn test_update_network_data() {
        let repo = setup_test_repo().await;
        let random_id = Uuid::new_v4().to_string();
        let tx = create_test_transaction(&random_id);

        repo.create(tx).await.unwrap();

        let new_network_data = NetworkTransactionData::Evm(EvmTransactionData {
            gas_price: Some(2000000000),
            gas_limit: Some(42000),
            nonce: Some(2),
            value: U256::from_str("2000000000000000000").unwrap(),
            data: Some("0x1234".to_string()),
            from: "0xNewSender".to_string(),
            to: Some("0xNewRecipient".to_string()),
            chain_id: 1,
            signature: None,
            hash: Some("0xnewhash".to_string()),
            speed: Some(Speed::SafeLow),
            max_fee_per_gas: None,
            max_priority_fee_per_gas: None,
            raw: None,
        });

        let updated = repo
            .update_network_data(random_id.to_string(), new_network_data.clone())
            .await
            .unwrap();
        assert_eq!(
            updated
                .network_data
                .get_evm_transaction_data()
                .unwrap()
                .hash,
            new_network_data.get_evm_transaction_data().unwrap().hash
        );
    }

    #[tokio::test]
    #[ignore = "Requires active Redis instance"]
    async fn test_debug_implementation() {
        let repo = setup_test_repo().await;
        let debug_str = format!("{:?}", repo);
        assert!(debug_str.contains("RedisTransactionRepository"));
        assert!(debug_str.contains("test_prefix"));
    }

    #[tokio::test]
    #[ignore = "Requires active Redis instance"]
    async fn test_error_handling_empty_id() {
        let repo = setup_test_repo().await;

        let result = repo.get_by_id("".to_string()).await;
        assert!(matches!(result, Err(RepositoryError::InvalidData(_))));

        let result = repo
            .update("".to_string(), create_test_transaction("test"))
            .await;
        assert!(matches!(result, Err(RepositoryError::InvalidData(_))));

        let result = repo.delete_by_id("".to_string()).await;
        assert!(matches!(result, Err(RepositoryError::InvalidData(_))));
    }

    #[tokio::test]
    #[ignore = "Requires active Redis instance"]
    async fn test_pagination_validation() {
        let repo = setup_test_repo().await;

        let query = PaginationQuery {
            page: 1,
            per_page: 0,
        };
        let result = repo.list_paginated(query).await;
        assert!(matches!(result, Err(RepositoryError::InvalidData(_))));
    }

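    // Nonce lookups are served from a per-relayer nonce index, so an update that changes
    // the nonce must also move the index entry. The test below verifies exactly that:
    // after bumping the nonce from 42 to 43, find_by_nonce(.., 42) returns None while
    // find_by_nonce(.., 43) finds the transaction again.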
    #[tokio::test]
    #[ignore = "Requires active Redis instance"]
    async fn test_index_consistency() {
        let repo = setup_test_repo().await;
        let random_id = Uuid::new_v4().to_string();
        let relayer_id = Uuid::new_v4().to_string();
        let tx = create_test_transaction_with_nonce(&random_id, 42, &relayer_id);

        // Create transaction
        repo.create(tx.clone()).await.unwrap();

        // Verify it can be found by nonce
        let found = repo.find_by_nonce(&relayer_id, 42).await.unwrap();
        assert!(found.is_some());

        // Update the transaction with a new nonce
        let mut updated_tx = tx.clone();
        if let NetworkTransactionData::Evm(ref mut evm_data) = updated_tx.network_data {
            evm_data.nonce = Some(43);
        }

        repo.update(random_id.to_string(), updated_tx)
            .await
            .unwrap();

        // Verify old nonce index is cleaned up
        let old_nonce_result = repo.find_by_nonce(&relayer_id, 42).await.unwrap();
        assert!(old_nonce_result.is_none());

        // Verify new nonce index works
        let new_nonce_result = repo.find_by_nonce(&relayer_id, 43).await.unwrap();
        assert!(new_nonce_result.is_some());
    }

    #[tokio::test]
    #[ignore = "Requires active Redis instance"]
    async fn test_has_entries() {
        let repo = setup_test_repo().await;
        assert!(!repo.has_entries().await.unwrap());

        let tx_id = uuid::Uuid::new_v4().to_string();
        let tx = create_test_transaction(&tx_id);
        repo.create(tx.clone()).await.unwrap();

        assert!(repo.has_entries().await.unwrap());
    }

    #[tokio::test]
    #[ignore = "Requires active Redis instance"]
    async fn test_drop_all_entries() {
        let repo = setup_test_repo().await;
        let tx_id = uuid::Uuid::new_v4().to_string();
        let tx = create_test_transaction(&tx_id);
        repo.create(tx.clone()).await.unwrap();
        assert!(repo.has_entries().await.unwrap());

        repo.drop_all_entries().await.unwrap();
        assert!(!repo.has_entries().await.unwrap());
    }

    // Tests for delete_at field setting on final status updates
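    //
    // Sketch of the expiration behaviour these tests pin down (inferred from the assertions
    // below, not a quote of the repository implementation):
    //
    //     // hypothetical illustration only; DEFAULT_EXPIRATION_HOURS is a made-up name
    //     let hours: i64 = std::env::var("TRANSACTION_EXPIRATION_HOURS")
    //         .ok()
    //         .and_then(|v| v.parse().ok())
    //         .unwrap_or(DEFAULT_EXPIRATION_HOURS);
    //     let delete_at = (Utc::now() + chrono::Duration::hours(hours)).to_rfc3339();
    //
    // Final statuses (Canceled, Confirmed, Failed, Expired) get delete_at populated when it is
    // not already set; non-final statuses and pre-existing delete_at values are left untouched.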
    #[tokio::test]
    #[ignore = "Requires active Redis instance"]
    async fn test_update_status_sets_delete_at_for_final_statuses() {
        let _lock = ENV_MUTEX.lock().await;

        use chrono::{DateTime, Duration, Utc};
        use std::env;

        // Set the expiration window for this test; ENV_MUTEX serializes tests that mutate this shared variable
        env::set_var("TRANSACTION_EXPIRATION_HOURS", "6");

        let repo = setup_test_repo().await;

        let final_statuses = [
            TransactionStatus::Canceled,
            TransactionStatus::Confirmed,
            TransactionStatus::Failed,
            TransactionStatus::Expired,
        ];

        for (i, status) in final_statuses.iter().enumerate() {
            let tx_id = format!("test-final-{}-{}", i, Uuid::new_v4());
            let mut tx = create_test_transaction(&tx_id);

            // Ensure transaction has no delete_at initially and is in pending state
            tx.delete_at = None;
            tx.status = TransactionStatus::Pending;

            repo.create(tx).await.unwrap();

            let before_update = Utc::now();

            // Update to final status
            let updated = repo
                .update_status(tx_id.clone(), status.clone())
                .await
                .unwrap();

            // Should have delete_at set
            assert!(
                updated.delete_at.is_some(),
                "delete_at should be set for status: {:?}",
                status
            );

            // Verify the timestamp is reasonable (approximately 6 hours from now)
            let delete_at_str = updated.delete_at.unwrap();
            let delete_at = DateTime::parse_from_rfc3339(&delete_at_str)
                .expect("delete_at should be valid RFC3339")
                .with_timezone(&Utc);

            let duration_from_before = delete_at.signed_duration_since(before_update);
            let expected_duration = Duration::hours(6);
            let tolerance = Duration::minutes(5);

            assert!(
                duration_from_before >= expected_duration - tolerance
                    && duration_from_before <= expected_duration + tolerance,
                "delete_at should be approximately 6 hours from now for status: {:?}. Duration: {:?}",
                status, duration_from_before
            );
        }

        // Cleanup
        env::remove_var("TRANSACTION_EXPIRATION_HOURS");
    }

    #[tokio::test]
    #[ignore = "Requires active Redis instance"]
    async fn test_update_status_does_not_set_delete_at_for_non_final_statuses() {
        let _lock = ENV_MUTEX.lock().await;

        use std::env;

        env::set_var("TRANSACTION_EXPIRATION_HOURS", "4");

        let repo = setup_test_repo().await;

        let non_final_statuses = [
            TransactionStatus::Pending,
            TransactionStatus::Sent,
            TransactionStatus::Submitted,
            TransactionStatus::Mined,
        ];

        for (i, status) in non_final_statuses.iter().enumerate() {
            let tx_id = format!("test-non-final-{}-{}", i, Uuid::new_v4());
            let mut tx = create_test_transaction(&tx_id);
            tx.delete_at = None;
            tx.status = TransactionStatus::Pending;

            repo.create(tx).await.unwrap();

            // Update to non-final status
            let updated = repo
                .update_status(tx_id.clone(), status.clone())
                .await
                .unwrap();

            // Should NOT have delete_at set
            assert!(
                updated.delete_at.is_none(),
                "delete_at should NOT be set for status: {:?}",
                status
            );
        }

        // Cleanup
        env::remove_var("TRANSACTION_EXPIRATION_HOURS");
    }

    #[tokio::test]
    #[ignore = "Requires active Redis instance"]
    async fn test_partial_update_sets_delete_at_for_final_statuses() {
        let _lock = ENV_MUTEX.lock().await;

        use chrono::{DateTime, Duration, Utc};
        use std::env;

        env::set_var("TRANSACTION_EXPIRATION_HOURS", "8");

        let repo = setup_test_repo().await;
        let tx_id = format!("test-partial-final-{}", Uuid::new_v4());
        let mut tx = create_test_transaction(&tx_id);
        tx.delete_at = None;
        tx.status = TransactionStatus::Pending;

        repo.create(tx).await.unwrap();

        let before_update = Utc::now();

        // Use partial_update to set status to Confirmed (final status)
        let update = TransactionUpdateRequest {
            status: Some(TransactionStatus::Confirmed),
            status_reason: Some("Transaction completed".to_string()),
            confirmed_at: Some("2023-01-01T12:05:00Z".to_string()),
            ..Default::default()
        };

        let updated = repo.partial_update(tx_id.clone(), update).await.unwrap();

        // Should have delete_at set
        assert!(
            updated.delete_at.is_some(),
            "delete_at should be set when updating to Confirmed status"
        );

        // Verify the timestamp is reasonable (approximately 8 hours from now)
        let delete_at_str = updated.delete_at.unwrap();
        let delete_at = DateTime::parse_from_rfc3339(&delete_at_str)
            .expect("delete_at should be valid RFC3339")
            .with_timezone(&Utc);

        let duration_from_before = delete_at.signed_duration_since(before_update);
        let expected_duration = Duration::hours(8);
        let tolerance = Duration::minutes(5);

        assert!(
            duration_from_before >= expected_duration - tolerance
                && duration_from_before <= expected_duration + tolerance,
            "delete_at should be approximately 8 hours from now. Duration: {:?}",
            duration_from_before
        );

        // Also verify other fields were updated
        assert_eq!(updated.status, TransactionStatus::Confirmed);
        assert_eq!(
            updated.status_reason,
            Some("Transaction completed".to_string())
        );
        assert_eq!(
            updated.confirmed_at,
            Some("2023-01-01T12:05:00Z".to_string())
        );

        // Cleanup
        env::remove_var("TRANSACTION_EXPIRATION_HOURS");
    }

    #[tokio::test]
    #[ignore = "Requires active Redis instance"]
    async fn test_update_status_preserves_existing_delete_at() {
        let _lock = ENV_MUTEX.lock().await;

        use std::env;

        env::set_var("TRANSACTION_EXPIRATION_HOURS", "2");

        let repo = setup_test_repo().await;
        let tx_id = format!("test-preserve-delete-at-{}", Uuid::new_v4());
        let mut tx = create_test_transaction(&tx_id);

        // Set an existing delete_at value
        let existing_delete_at = "2025-01-01T12:00:00Z".to_string();
        tx.delete_at = Some(existing_delete_at.clone());
        tx.status = TransactionStatus::Pending;

        repo.create(tx).await.unwrap();

        // Update to final status
        let updated = repo
            .update_status(tx_id.clone(), TransactionStatus::Confirmed)
            .await
            .unwrap();

        // Should preserve the existing delete_at value
        assert_eq!(
            updated.delete_at,
            Some(existing_delete_at),
            "Existing delete_at should be preserved when updating to final status"
        );

        // Cleanup
        env::remove_var("TRANSACTION_EXPIRATION_HOURS");
    }

    #[tokio::test]
    #[ignore = "Requires active Redis instance"]
    async fn test_partial_update_without_status_change_preserves_delete_at() {
        let _lock = ENV_MUTEX.lock().await;

        use std::env;

        env::set_var("TRANSACTION_EXPIRATION_HOURS", "3");

        let repo = setup_test_repo().await;
        let tx_id = format!("test-preserve-no-status-{}", Uuid::new_v4());
        let mut tx = create_test_transaction(&tx_id);
        tx.delete_at = None;
        tx.status = TransactionStatus::Pending;

        repo.create(tx).await.unwrap();

        // First, update to final status to set delete_at
        let updated1 = repo
            .update_status(tx_id.clone(), TransactionStatus::Confirmed)
            .await
            .unwrap();

        assert!(updated1.delete_at.is_some());
        let original_delete_at = updated1.delete_at.clone();

        // Now update other fields without changing status
        let update = TransactionUpdateRequest {
            status: None, // No status change
            status_reason: Some("Updated reason".to_string()),
            confirmed_at: Some("2023-01-01T12:10:00Z".to_string()),
            ..Default::default()
        };

        let updated2 = repo.partial_update(tx_id.clone(), update).await.unwrap();

        // delete_at should be preserved
        assert_eq!(
            updated2.delete_at, original_delete_at,
            "delete_at should be preserved when status is not updated"
        );

        // Other fields should be updated
        assert_eq!(updated2.status, TransactionStatus::Confirmed); // Unchanged
        assert_eq!(updated2.status_reason, Some("Updated reason".to_string()));
        assert_eq!(
            updated2.confirmed_at,
            Some("2023-01-01T12:10:00Z".to_string())
        );

        // Cleanup
        env::remove_var("TRANSACTION_EXPIRATION_HOURS");
    }
}