openzeppelin_relayer/repositories/plugin/
mod.rs

1//! Plugin Repository Module
2//!
3//! This module provides the plugin repository layer for the OpenZeppelin Relayer service.
4//! It implements a specialized repository pattern for managing plugin configurations,
5//! supporting both in-memory and Redis-backed storage implementations.
6//!
7//! ## Features
8//!
9//! - **Plugin Management**: Store and retrieve plugin configurations
10//! - **Path Resolution**: Manage plugin script paths for execution
11//! - **Duplicate Prevention**: Ensure unique plugin IDs
12//! - **Configuration Loading**: Convert from file configurations to repository models
13//!
14//! ## Repository Implementations
15//!
16//! - [`InMemoryPluginRepository`]: Fast in-memory storage for testing/development
17//! - [`RedisPluginRepository`]: Redis-backed storage for production environments
18//!
19//! ## Plugin System
20//!
21//! The plugin system allows extending the relayer functionality through external scripts.
22//! Each plugin is identified by a unique ID and contains a path to the executable script.
23//!
24
25pub mod plugin_in_memory;
26pub mod plugin_redis;
27
28pub use plugin_in_memory::*;
29pub use plugin_redis::*;
30
31use async_trait::async_trait;
32use redis::aio::ConnectionManager;
33use std::{sync::Arc, time::Duration};
34
35#[cfg(test)]
36use mockall::automock;
37
38use crate::{
39    config::PluginFileConfig,
40    constants::DEFAULT_PLUGIN_TIMEOUT_SECONDS,
41    models::{PaginationQuery, PluginModel, RepositoryError},
42    repositories::{ConversionError, PaginatedResult},
43};
44
45#[async_trait]
46#[allow(dead_code)]
47#[cfg_attr(test, automock)]
48pub trait PluginRepositoryTrait {
49    async fn get_by_id(&self, id: &str) -> Result<Option<PluginModel>, RepositoryError>;
50    async fn add(&self, plugin: PluginModel) -> Result<(), RepositoryError>;
51    async fn list_paginated(
52        &self,
53        query: PaginationQuery,
54    ) -> Result<PaginatedResult<PluginModel>, RepositoryError>;
55    async fn count(&self) -> Result<usize, RepositoryError>;
56    async fn has_entries(&self) -> Result<bool, RepositoryError>;
57    async fn drop_all_entries(&self) -> Result<(), RepositoryError>;
58}
59
60/// Enum wrapper for different plugin repository implementations
61#[derive(Debug, Clone)]
62pub enum PluginRepositoryStorage {
63    InMemory(InMemoryPluginRepository),
64    Redis(RedisPluginRepository),
65}
66
67impl PluginRepositoryStorage {
68    pub fn new_in_memory() -> Self {
69        Self::InMemory(InMemoryPluginRepository::new())
70    }
71
72    pub fn new_redis(
73        connection_manager: Arc<ConnectionManager>,
74        key_prefix: String,
75    ) -> Result<Self, RepositoryError> {
76        let redis_repo = RedisPluginRepository::new(connection_manager, key_prefix)?;
77        Ok(Self::Redis(redis_repo))
78    }
79}
80
81#[async_trait]
82impl PluginRepositoryTrait for PluginRepositoryStorage {
83    async fn get_by_id(&self, id: &str) -> Result<Option<PluginModel>, RepositoryError> {
84        match self {
85            PluginRepositoryStorage::InMemory(repo) => repo.get_by_id(id).await,
86            PluginRepositoryStorage::Redis(repo) => repo.get_by_id(id).await,
87        }
88    }
89
90    async fn add(&self, plugin: PluginModel) -> Result<(), RepositoryError> {
91        match self {
92            PluginRepositoryStorage::InMemory(repo) => repo.add(plugin).await,
93            PluginRepositoryStorage::Redis(repo) => repo.add(plugin).await,
94        }
95    }
96
97    async fn list_paginated(
98        &self,
99        query: PaginationQuery,
100    ) -> Result<PaginatedResult<PluginModel>, RepositoryError> {
101        match self {
102            PluginRepositoryStorage::InMemory(repo) => repo.list_paginated(query).await,
103            PluginRepositoryStorage::Redis(repo) => repo.list_paginated(query).await,
104        }
105    }
106
107    async fn count(&self) -> Result<usize, RepositoryError> {
108        match self {
109            PluginRepositoryStorage::InMemory(repo) => repo.count().await,
110            PluginRepositoryStorage::Redis(repo) => repo.count().await,
111        }
112    }
113
114    async fn has_entries(&self) -> Result<bool, RepositoryError> {
115        match self {
116            PluginRepositoryStorage::InMemory(repo) => repo.has_entries().await,
117            PluginRepositoryStorage::Redis(repo) => repo.has_entries().await,
118        }
119    }
120
121    async fn drop_all_entries(&self) -> Result<(), RepositoryError> {
122        match self {
123            PluginRepositoryStorage::InMemory(repo) => repo.drop_all_entries().await,
124            PluginRepositoryStorage::Redis(repo) => repo.drop_all_entries().await,
125        }
126    }
127}
128
129impl TryFrom<PluginFileConfig> for PluginModel {
130    type Error = ConversionError;
131
132    fn try_from(config: PluginFileConfig) -> Result<Self, Self::Error> {
133        let timeout = Duration::from_secs(config.timeout.unwrap_or(DEFAULT_PLUGIN_TIMEOUT_SECONDS));
134
135        Ok(PluginModel {
136            id: config.id.clone(),
137            path: config.path.clone(),
138            timeout,
139            emit_logs: config.emit_logs,
140            emit_traces: config.emit_traces,
141        })
142    }
143}
144
145impl PartialEq for PluginModel {
146    fn eq(&self, other: &Self) -> bool {
147        self.id == other.id && self.path == other.path
148    }
149}
150
#[cfg(test)]
mod tests {
    use crate::{config::PluginFileConfig, constants::DEFAULT_PLUGIN_TIMEOUT_SECONDS};
    use std::time::Duration;

    use super::*;

    /// Builds a plugin with the given identity and fixed values for every other field.
    fn create_test_plugin(id: &str, path: &str) -> PluginModel {
        PluginModel {
            id: id.to_string(),
            path: path.to_string(),
            timeout: Duration::from_secs(30),
            emit_logs: false,
            emit_traces: false,
        }
    }

    /// Creates an in-memory storage and adds one plugin per `(id, path)` pair.
    async fn storage_with(entries: &[(&str, &str)]) -> PluginRepositoryStorage {
        let storage = PluginRepositoryStorage::new_in_memory();
        for &(id, path) in entries {
            storage.add(create_test_plugin(id, path)).await.unwrap();
        }
        storage
    }

    // The conversion is synchronous, so no async runtime is needed here.
    #[test]
    fn test_try_from() {
        let plugin = PluginFileConfig {
            id: "test-plugin".to_string(),
            path: "test-path".to_string(),
            timeout: None,
            emit_logs: false,
            emit_traces: false,
        };

        let model = PluginModel::try_from(plugin).expect("conversion should succeed");

        // `PluginModel`'s `PartialEq` only compares `id` and `path`, so each field is
        // asserted individually; otherwise the timeout and flags would go unchecked.
        assert_eq!(model.id, "test-plugin");
        assert_eq!(model.path, "test-path");
        assert_eq!(
            model.timeout,
            Duration::from_secs(DEFAULT_PLUGIN_TIMEOUT_SECONDS)
        );
        assert!(!model.emit_logs);
        assert!(!model.emit_traces);
    }

    #[test]
    fn test_try_from_explicit_timeout() {
        let plugin = PluginFileConfig {
            id: "test-plugin".to_string(),
            path: "test-path".to_string(),
            timeout: Some(7),
            emit_logs: true,
            emit_traces: true,
        };

        let model = PluginModel::try_from(plugin).expect("conversion should succeed");

        assert_eq!(model.id, "test-plugin");
        assert_eq!(model.path, "test-path");
        assert_eq!(model.timeout, Duration::from_secs(7));
        assert!(model.emit_logs);
        assert!(model.emit_traces);
    }

    #[tokio::test]
    async fn test_plugin_repository_storage_get_by_id_existing() {
        let storage = PluginRepositoryStorage::new_in_memory();
        let plugin = create_test_plugin("test-plugin", "/path/to/script.js");

        // Add the plugin first
        storage.add(plugin.clone()).await.unwrap();

        // Get the plugin
        let result = storage.get_by_id("test-plugin").await.unwrap();
        assert_eq!(result, Some(plugin));
    }

    #[tokio::test]
    async fn test_plugin_repository_storage_get_by_id_non_existing() {
        let storage = PluginRepositoryStorage::new_in_memory();

        let result = storage.get_by_id("non-existent").await.unwrap();
        assert_eq!(result, None);
    }

    #[tokio::test]
    async fn test_plugin_repository_storage_add_success() {
        let storage = PluginRepositoryStorage::new_in_memory();
        let plugin = create_test_plugin("test-plugin", "/path/to/script.js");

        let result = storage.add(plugin).await;
        assert!(result.is_ok());
    }

    #[tokio::test]
    async fn test_plugin_repository_storage_add_duplicate() {
        let storage = PluginRepositoryStorage::new_in_memory();
        let plugin = create_test_plugin("test-plugin", "/path/to/script.js");

        // Add the plugin first time
        storage.add(plugin.clone()).await.unwrap();

        // Try to add the same plugin again - should succeed (overwrite)
        let result = storage.add(plugin).await;
        assert!(result.is_ok());
    }

    #[tokio::test]
    async fn test_plugin_repository_storage_count_empty() {
        let storage = PluginRepositoryStorage::new_in_memory();

        let count = storage.count().await.unwrap();
        assert_eq!(count, 0);
    }

    #[tokio::test]
    async fn test_plugin_repository_storage_count_with_plugins() {
        let storage = storage_with(&[
            ("plugin1", "/path/1.js"),
            ("plugin2", "/path/2.js"),
            ("plugin3", "/path/3.js"),
        ])
        .await;

        let count = storage.count().await.unwrap();
        assert_eq!(count, 3);
    }

    #[tokio::test]
    async fn test_plugin_repository_storage_has_entries_empty() {
        let storage = PluginRepositoryStorage::new_in_memory();

        let has_entries = storage.has_entries().await.unwrap();
        assert!(!has_entries);
    }

    #[tokio::test]
    async fn test_plugin_repository_storage_has_entries_with_plugins() {
        let storage = storage_with(&[("plugin1", "/path/1.js")]).await;

        let has_entries = storage.has_entries().await.unwrap();
        assert!(has_entries);
    }

    #[tokio::test]
    async fn test_plugin_repository_storage_drop_all_entries_empty() {
        let storage = PluginRepositoryStorage::new_in_memory();

        let result = storage.drop_all_entries().await;
        assert!(result.is_ok());

        let count = storage.count().await.unwrap();
        assert_eq!(count, 0);
    }

    #[tokio::test]
    async fn test_plugin_repository_storage_drop_all_entries_with_plugins() {
        let storage = storage_with(&[("plugin1", "/path/1.js"), ("plugin2", "/path/2.js")]).await;

        let result = storage.drop_all_entries().await;
        assert!(result.is_ok());

        let count = storage.count().await.unwrap();
        assert_eq!(count, 0);

        let has_entries = storage.has_entries().await.unwrap();
        assert!(!has_entries);
    }

    #[tokio::test]
    async fn test_plugin_repository_storage_list_paginated_empty() {
        let storage = PluginRepositoryStorage::new_in_memory();

        let query = PaginationQuery {
            page: 1,
            per_page: 10,
        };
        let result = storage.list_paginated(query).await.unwrap();

        assert_eq!(result.items.len(), 0);
        assert_eq!(result.total, 0);
        assert_eq!(result.page, 1);
        assert_eq!(result.per_page, 10);
    }

    #[tokio::test]
    async fn test_plugin_repository_storage_list_paginated_with_plugins() {
        let storage = storage_with(&[
            ("plugin1", "/path/1.js"),
            ("plugin2", "/path/2.js"),
            ("plugin3", "/path/3.js"),
        ])
        .await;

        // Ask for fewer items than are stored so the page is truncated.
        let query = PaginationQuery {
            page: 1,
            per_page: 2,
        };
        let result = storage.list_paginated(query).await.unwrap();

        assert_eq!(result.items.len(), 2);
        assert_eq!(result.total, 3);
        assert_eq!(result.page, 1);
        assert_eq!(result.per_page, 2);
    }

    #[tokio::test]
    async fn test_plugin_repository_storage_workflow() {
        let storage = PluginRepositoryStorage::new_in_memory();

        // Initially empty
        assert!(!storage.has_entries().await.unwrap());
        assert_eq!(storage.count().await.unwrap(), 0);

        // Add plugins
        let plugin1 = create_test_plugin("auth-plugin", "/scripts/auth.js");
        let plugin2 = create_test_plugin("email-plugin", "/scripts/email.js");

        storage.add(plugin1.clone()).await.unwrap();
        storage.add(plugin2.clone()).await.unwrap();

        // Check state
        assert!(storage.has_entries().await.unwrap());
        assert_eq!(storage.count().await.unwrap(), 2);

        // Retrieve specific plugin
        let retrieved = storage.get_by_id("auth-plugin").await.unwrap();
        assert_eq!(retrieved, Some(plugin1));

        // List all plugins
        let query = PaginationQuery {
            page: 1,
            per_page: 10,
        };
        let result = storage.list_paginated(query).await.unwrap();
        assert_eq!(result.items.len(), 2);
        assert_eq!(result.total, 2);

        // Clear all plugins
        storage.drop_all_entries().await.unwrap();
        assert!(!storage.has_entries().await.unwrap());
        assert_eq!(storage.count().await.unwrap(), 0);
    }
}