openzeppelin_relayer/repositories/relayer/
mod.rs

//! Relayer Repository Module
//!
//! This module provides the relayer repository layer for the OpenZeppelin Relayer service.
//! It implements the Repository pattern to abstract relayer data persistence operations,
//! supporting both in-memory and Redis-backed storage implementations.
//!
//! ## Features
//!
//! - **CRUD Operations**: Create, read, update, and delete relayer configurations
//! - **Status Management**: Enable/disable relayers and track their state
//! - **Policy Management**: Update relayer network policies
//! - **Partial Updates**: Support for partial relayer configuration updates
//! - **Active Filtering**: Query for active (non-paused) relayers
//! - **Pagination Support**: Efficient paginated listing of relayers
//!
//! ## Repository Implementations
//!
//! - [`InMemoryRelayerRepository`]: Fast in-memory storage for testing/development
//! - [`RedisRelayerRepository`]: Redis-backed storage for production environments
//!
21
22mod relayer_in_memory;
23mod relayer_redis;
24
25pub use relayer_in_memory::*;
26pub use relayer_redis::*;
27
28use crate::{
29    models::UpdateRelayerRequest,
30    models::{
31        DisabledReason, PaginationQuery, RelayerNetworkPolicy, RelayerRepoModel, RepositoryError,
32    },
33    repositories::{PaginatedResult, Repository},
34};
35use async_trait::async_trait;
36use redis::aio::ConnectionManager;
37use std::sync::Arc;
38
39#[async_trait]
40pub trait RelayerRepository: Repository<RelayerRepoModel, String> + Send + Sync {
41    async fn list_active(&self) -> Result<Vec<RelayerRepoModel>, RepositoryError>;
42    async fn list_by_signer_id(
43        &self,
44        signer_id: &str,
45    ) -> Result<Vec<RelayerRepoModel>, RepositoryError>;
46    async fn list_by_notification_id(
47        &self,
48        notification_id: &str,
49    ) -> Result<Vec<RelayerRepoModel>, RepositoryError>;
50    async fn partial_update(
51        &self,
52        id: String,
53        update: UpdateRelayerRequest,
54    ) -> Result<RelayerRepoModel, RepositoryError>;
55    async fn enable_relayer(&self, relayer_id: String)
56        -> Result<RelayerRepoModel, RepositoryError>;
57    async fn disable_relayer(
58        &self,
59        relayer_id: String,
60        reason: DisabledReason,
61    ) -> Result<RelayerRepoModel, RepositoryError>;
62    async fn update_policy(
63        &self,
64        id: String,
65        policy: RelayerNetworkPolicy,
66    ) -> Result<RelayerRepoModel, RepositoryError>;
67}
68
69/// Enum wrapper for different relayer repository implementations
70#[derive(Debug, Clone)]
71pub enum RelayerRepositoryStorage {
72    InMemory(InMemoryRelayerRepository),
73    Redis(RedisRelayerRepository),
74}
75
76impl RelayerRepositoryStorage {
77    pub fn new_in_memory() -> Self {
78        Self::InMemory(InMemoryRelayerRepository::new())
79    }
80
81    pub fn new_redis(
82        connection_manager: Arc<ConnectionManager>,
83        key_prefix: String,
84    ) -> Result<Self, RepositoryError> {
85        Ok(Self::Redis(RedisRelayerRepository::new(
86            connection_manager,
87            key_prefix,
88        )?))
89    }
90}
91
92impl Default for RelayerRepositoryStorage {
93    fn default() -> Self {
94        Self::new_in_memory()
95    }
96}
97
98#[async_trait]
99impl Repository<RelayerRepoModel, String> for RelayerRepositoryStorage {
100    async fn create(&self, entity: RelayerRepoModel) -> Result<RelayerRepoModel, RepositoryError> {
101        match self {
102            RelayerRepositoryStorage::InMemory(repo) => repo.create(entity).await,
103            RelayerRepositoryStorage::Redis(repo) => repo.create(entity).await,
104        }
105    }
106
107    async fn get_by_id(&self, id: String) -> Result<RelayerRepoModel, RepositoryError> {
108        match self {
109            RelayerRepositoryStorage::InMemory(repo) => repo.get_by_id(id).await,
110            RelayerRepositoryStorage::Redis(repo) => repo.get_by_id(id).await,
111        }
112    }
113
114    async fn list_all(&self) -> Result<Vec<RelayerRepoModel>, RepositoryError> {
115        match self {
116            RelayerRepositoryStorage::InMemory(repo) => repo.list_all().await,
117            RelayerRepositoryStorage::Redis(repo) => repo.list_all().await,
118        }
119    }
120
121    async fn list_paginated(
122        &self,
123        query: PaginationQuery,
124    ) -> Result<PaginatedResult<RelayerRepoModel>, RepositoryError> {
125        match self {
126            RelayerRepositoryStorage::InMemory(repo) => repo.list_paginated(query).await,
127            RelayerRepositoryStorage::Redis(repo) => repo.list_paginated(query).await,
128        }
129    }
130
131    async fn update(
132        &self,
133        id: String,
134        entity: RelayerRepoModel,
135    ) -> Result<RelayerRepoModel, RepositoryError> {
136        match self {
137            RelayerRepositoryStorage::InMemory(repo) => repo.update(id, entity).await,
138            RelayerRepositoryStorage::Redis(repo) => repo.update(id, entity).await,
139        }
140    }
141
142    async fn delete_by_id(&self, id: String) -> Result<(), RepositoryError> {
143        match self {
144            RelayerRepositoryStorage::InMemory(repo) => repo.delete_by_id(id).await,
145            RelayerRepositoryStorage::Redis(repo) => repo.delete_by_id(id).await,
146        }
147    }
148
149    async fn count(&self) -> Result<usize, RepositoryError> {
150        match self {
151            RelayerRepositoryStorage::InMemory(repo) => repo.count().await,
152            RelayerRepositoryStorage::Redis(repo) => repo.count().await,
153        }
154    }
155
156    async fn has_entries(&self) -> Result<bool, RepositoryError> {
157        match self {
158            RelayerRepositoryStorage::InMemory(repo) => repo.has_entries().await,
159            RelayerRepositoryStorage::Redis(repo) => repo.has_entries().await,
160        }
161    }
162
163    async fn drop_all_entries(&self) -> Result<(), RepositoryError> {
164        match self {
165            RelayerRepositoryStorage::InMemory(repo) => repo.drop_all_entries().await,
166            RelayerRepositoryStorage::Redis(repo) => repo.drop_all_entries().await,
167        }
168    }
169}
170
171#[async_trait]
172impl RelayerRepository for RelayerRepositoryStorage {
173    async fn list_active(&self) -> Result<Vec<RelayerRepoModel>, RepositoryError> {
174        match self {
175            RelayerRepositoryStorage::InMemory(repo) => repo.list_active().await,
176            RelayerRepositoryStorage::Redis(repo) => repo.list_active().await,
177        }
178    }
179
180    async fn list_by_signer_id(
181        &self,
182        signer_id: &str,
183    ) -> Result<Vec<RelayerRepoModel>, RepositoryError> {
184        match self {
185            RelayerRepositoryStorage::InMemory(repo) => repo.list_by_signer_id(signer_id).await,
186            RelayerRepositoryStorage::Redis(repo) => repo.list_by_signer_id(signer_id).await,
187        }
188    }
189
190    async fn list_by_notification_id(
191        &self,
192        notification_id: &str,
193    ) -> Result<Vec<RelayerRepoModel>, RepositoryError> {
194        match self {
195            RelayerRepositoryStorage::InMemory(repo) => {
196                repo.list_by_notification_id(notification_id).await
197            }
198            RelayerRepositoryStorage::Redis(repo) => {
199                repo.list_by_notification_id(notification_id).await
200            }
201        }
202    }
203
204    async fn partial_update(
205        &self,
206        id: String,
207        update: UpdateRelayerRequest,
208    ) -> Result<RelayerRepoModel, RepositoryError> {
209        match self {
210            RelayerRepositoryStorage::InMemory(repo) => repo.partial_update(id, update).await,
211            RelayerRepositoryStorage::Redis(repo) => repo.partial_update(id, update).await,
212        }
213    }
214
215    async fn enable_relayer(
216        &self,
217        relayer_id: String,
218    ) -> Result<RelayerRepoModel, RepositoryError> {
219        match self {
220            RelayerRepositoryStorage::InMemory(repo) => repo.enable_relayer(relayer_id).await,
221            RelayerRepositoryStorage::Redis(repo) => repo.enable_relayer(relayer_id).await,
222        }
223    }
224
225    async fn disable_relayer(
226        &self,
227        relayer_id: String,
228        reason: DisabledReason,
229    ) -> Result<RelayerRepoModel, RepositoryError> {
230        match self {
231            RelayerRepositoryStorage::InMemory(repo) => {
232                repo.disable_relayer(relayer_id, reason).await
233            }
234            RelayerRepositoryStorage::Redis(repo) => repo.disable_relayer(relayer_id, reason).await,
235        }
236    }
237
238    async fn update_policy(
239        &self,
240        id: String,
241        policy: RelayerNetworkPolicy,
242    ) -> Result<RelayerRepoModel, RepositoryError> {
243        match self {
244            RelayerRepositoryStorage::InMemory(repo) => repo.update_policy(id, policy).await,
245            RelayerRepositoryStorage::Redis(repo) => repo.update_policy(id, policy).await,
246        }
247    }
248}
249
#[cfg(test)]
mod tests {
    use super::*;
    use crate::models::{NetworkType, RelayerEvmPolicy, RelayerNetworkPolicy};

    /// Builds an unpaused, non-disabled EVM relayer fixture whose id is `id`
    /// and whose name is `"Relayer <id>"`.
    fn create_test_relayer(id: String) -> RelayerRepoModel {
        let name = format!("Relayer {}", id);
        let policies = RelayerNetworkPolicy::Evm(RelayerEvmPolicy {
            min_balance: Some(0),
            gas_limit_estimation: Some(true),
            gas_price_cap: None,
            whitelist_receivers: None,
            eip1559_pricing: Some(false),
            private_transactions: Some(false),
        });

        RelayerRepoModel {
            id,
            name,
            network: "TestNet".to_string(),
            paused: false,
            network_type: NetworkType::Evm,
            policies,
            signer_id: "test".to_string(),
            address: "0x".to_string(),
            notification_id: None,
            system_disabled: false,
            custom_rpc_urls: None,
            ..Default::default()
        }
    }

    /// Exercises the generic CRUD operations through the in-memory wrapper.
    #[actix_web::test]
    async fn test_in_memory_repository_impl() {
        let storage = RelayerRepositoryStorage::new_in_memory();
        let fixture = create_test_relayer("test-relayer".to_string());

        // create
        let stored = storage.create(fixture.clone()).await.unwrap();
        assert_eq!(stored.id, fixture.id);

        // get_by_id
        let fetched = storage
            .get_by_id("test-relayer".to_string())
            .await
            .unwrap();
        assert_eq!(fetched.id, fixture.id);

        // list_all
        let everything = storage.list_all().await.unwrap();
        assert!(!everything.is_empty());

        // count
        let total = storage.count().await.unwrap();
        assert!(total >= 1);

        // update
        let renamed = RelayerRepoModel {
            name: "Updated Name".to_string(),
            ..fixture.clone()
        };
        let after_update = storage
            .update(fixture.id.clone(), renamed)
            .await
            .unwrap();
        assert_eq!(after_update.name, "Updated Name");

        // delete_by_id, then confirm the relayer can no longer be fetched
        storage.delete_by_id(fixture.id.clone()).await.unwrap();
        let lookup = storage.get_by_id("test-relayer".to_string()).await;
        assert!(lookup.is_err());
    }

    /// Exercises the relayer-specific trait methods through the in-memory wrapper.
    #[actix_web::test]
    async fn test_relayer_repository_trait_methods() {
        let storage = RelayerRepositoryStorage::new_in_memory();
        let fixture = create_test_relayer("test-relayer".to_string());

        // The relayer must exist before any of the calls below.
        storage.create(fixture.clone()).await.unwrap();

        // list_active
        let active = storage.list_active().await.unwrap();
        assert!(!active.is_empty());

        // partial_update
        let pause_request = UpdateRelayerRequest {
            paused: Some(true),
            ..Default::default()
        };
        let paused = storage
            .partial_update(fixture.id.clone(), pause_request)
            .await
            .unwrap();
        assert!(paused.paused);

        // disable_relayer
        let reason = DisabledReason::BalanceCheckFailed("Test disable reason".to_string());
        let disabled = storage
            .disable_relayer(fixture.id.clone(), reason)
            .await
            .unwrap();
        assert!(disabled.system_disabled);
        assert_eq!(
            disabled.disabled_reason,
            Some(DisabledReason::BalanceCheckFailed(
                "Test disable reason".to_string()
            ))
        );

        // enable_relayer
        let enabled = storage.enable_relayer(fixture.id.clone()).await.unwrap();
        assert!(!enabled.system_disabled);
        assert_eq!(enabled.disabled_reason, None);

        // update_policy
        let replacement_policy = RelayerNetworkPolicy::Evm(RelayerEvmPolicy {
            min_balance: Some(1000000000000000000),
            gas_limit_estimation: Some(true),
            gas_price_cap: Some(50_000_000_000),
            whitelist_receivers: None,
            eip1559_pricing: Some(true),
            private_transactions: Some(false),
        });
        let with_new_policy = storage
            .update_policy(fixture.id.clone(), replacement_policy)
            .await
            .unwrap();

        match with_new_policy.policies {
            RelayerNetworkPolicy::Evm(evm_policy) => {
                assert_eq!(evm_policy.gas_price_cap, Some(50_000_000_000));
                assert_eq!(evm_policy.eip1559_pricing, Some(true));
            }
            _ => panic!("Expected EVM policy"),
        }
    }

    /// `new_in_memory` must produce the `InMemory` variant.
    #[actix_web::test]
    async fn test_create_repository_in_memory() {
        let storage = RelayerRepositoryStorage::new_in_memory();

        assert!(matches!(storage, RelayerRepositoryStorage::InMemory(_)));
    }

    /// Paginated listing reports the requested page parameters and a total
    /// covering the created relayers.
    #[actix_web::test]
    async fn test_pagination() {
        let storage = RelayerRepositoryStorage::new_in_memory();
        let first = create_test_relayer("test-relayer-1".to_string());
        let second = create_test_relayer("test-relayer-2".to_string());

        storage.create(first).await.unwrap();
        storage.create(second).await.unwrap();

        let page_request = PaginationQuery {
            page: 1,
            per_page: 10,
        };

        let page = storage.list_paginated(page_request).await.unwrap();
        assert!(page.total >= 2);
        assert_eq!(page.page, 1);
        assert_eq!(page.per_page, 10);
    }

    /// Deleting removes the relayer; deleting an unknown id is an error.
    #[actix_web::test]
    async fn test_delete_relayer() {
        let storage = RelayerRepositoryStorage::new_in_memory();
        let fixture = create_test_relayer("delete-test".to_string());

        // Create, then delete.
        storage.create(fixture.clone()).await.unwrap();
        storage
            .delete_by_id("delete-test".to_string())
            .await
            .unwrap();

        // The lookup must now fail with `NotFound`.
        let lookup = storage.get_by_id("delete-test".to_string()).await;
        assert!(lookup.is_err());
        assert!(matches!(lookup, Err(RepositoryError::NotFound(_))));

        // Deleting an id that was never created must fail.
        let missing = storage.delete_by_id("nonexistent".to_string()).await;
        assert!(missing.is_err());
    }

    /// `has_entries` tracks creation and deletion on the in-memory repository.
    #[actix_web::test]
    async fn test_has_entries() {
        let in_memory = InMemoryRelayerRepository::new();
        assert!(!in_memory.has_entries().await.unwrap());

        let fixture = create_test_relayer("test".to_string());

        in_memory.create(fixture.clone()).await.unwrap();
        assert!(in_memory.has_entries().await.unwrap());

        in_memory.delete_by_id(fixture.id.clone()).await.unwrap();
        assert!(!in_memory.has_entries().await.unwrap());
    }

    /// `drop_all_entries` leaves the in-memory repository empty.
    #[actix_web::test]
    async fn test_drop_all_entries() {
        let in_memory = InMemoryRelayerRepository::new();
        let fixture = create_test_relayer("test".to_string());

        in_memory.create(fixture.clone()).await.unwrap();
        assert!(in_memory.has_entries().await.unwrap());

        in_memory.drop_all_entries().await.unwrap();
        assert!(!in_memory.has_entries().await.unwrap());
    }
}
463
// Test-only mock declared via `mockall::mock!`, listing the methods of both
// the generic `Repository` trait and the relayer-specific `RelayerRepository`
// trait so unit tests can substitute it for a real backend.
#[cfg(test)]
mockall::mock! {
    pub RelayerRepository {}

    // Generic CRUD surface.
    #[async_trait]
    impl Repository<RelayerRepoModel, String> for RelayerRepository {
        async fn create(
            &self,
            entity: RelayerRepoModel
        ) -> Result<RelayerRepoModel, RepositoryError>;

        async fn get_by_id(&self, id: String) -> Result<RelayerRepoModel, RepositoryError>;

        async fn list_all(&self) -> Result<Vec<RelayerRepoModel>, RepositoryError>;

        async fn list_paginated(
            &self,
            query: PaginationQuery
        ) -> Result<PaginatedResult<RelayerRepoModel>, RepositoryError>;

        async fn update(
            &self,
            id: String,
            entity: RelayerRepoModel
        ) -> Result<RelayerRepoModel, RepositoryError>;

        async fn delete_by_id(&self, id: String) -> Result<(), RepositoryError>;

        async fn count(&self) -> Result<usize, RepositoryError>;

        async fn has_entries(&self) -> Result<bool, RepositoryError>;

        async fn drop_all_entries(&self) -> Result<(), RepositoryError>;
    }

    // Relayer-specific surface.
    #[async_trait]
    impl RelayerRepository for RelayerRepository {
        async fn list_active(&self) -> Result<Vec<RelayerRepoModel>, RepositoryError>;

        async fn list_by_signer_id(
            &self,
            signer_id: &str
        ) -> Result<Vec<RelayerRepoModel>, RepositoryError>;

        async fn list_by_notification_id(
            &self,
            notification_id: &str
        ) -> Result<Vec<RelayerRepoModel>, RepositoryError>;

        async fn partial_update(
            &self,
            id: String,
            update: UpdateRelayerRequest
        ) -> Result<RelayerRepoModel, RepositoryError>;

        async fn enable_relayer(
            &self,
            relayer_id: String
        ) -> Result<RelayerRepoModel, RepositoryError>;

        async fn disable_relayer(
            &self,
            relayer_id: String,
            reason: DisabledReason
        ) -> Result<RelayerRepoModel, RepositoryError>;

        async fn update_policy(
            &self,
            id: String,
            policy: RelayerNetworkPolicy
        ) -> Result<RelayerRepoModel, RepositoryError>;
    }
}