openzeppelin_relayer/repositories/plugin/
plugin_redis.rs

1//! Redis-backed implementation of the PluginRepository.
2
3use crate::models::{PaginationQuery, PluginModel, RepositoryError};
4use crate::repositories::redis_base::RedisRepository;
5use crate::repositories::{BatchRetrievalResult, PaginatedResult, PluginRepositoryTrait};
6use async_trait::async_trait;
7use redis::aio::ConnectionManager;
8use redis::AsyncCommands;
9use std::fmt;
10use std::sync::Arc;
11use tracing::{debug, error, warn};
12
13const PLUGIN_PREFIX: &str = "plugin";
14const PLUGIN_LIST_KEY: &str = "plugin_list";
15
16#[derive(Clone)]
17pub struct RedisPluginRepository {
18    pub client: Arc<ConnectionManager>,
19    pub key_prefix: String,
20}
21
22impl RedisRepository for RedisPluginRepository {}
23
24impl RedisPluginRepository {
25    pub fn new(
26        connection_manager: Arc<ConnectionManager>,
27        key_prefix: String,
28    ) -> Result<Self, RepositoryError> {
29        if key_prefix.is_empty() {
30            return Err(RepositoryError::InvalidData(
31                "Redis key prefix cannot be empty".to_string(),
32            ));
33        }
34
35        Ok(Self {
36            client: connection_manager,
37            key_prefix,
38        })
39    }
40
41    /// Generate key for plugin data: plugin:{plugin_id}
42    fn plugin_key(&self, plugin_id: &str) -> String {
43        format!("{}:{}:{}", self.key_prefix, PLUGIN_PREFIX, plugin_id)
44    }
45
46    /// Generate key for plugin list: plugin_list (paginated list of plugin IDs)
47    fn plugin_list_key(&self) -> String {
48        format!("{}:{}", self.key_prefix, PLUGIN_LIST_KEY)
49    }
50
51    /// Get plugin by ID using an existing connection.
52    /// This method is useful to prevent creating new connections for
53    /// getting individual plugins on list operations.
54    ///
55    /// # Arguments
56    ///
57    /// * `id` - The ID of the plugin to get.
58    /// * `conn` - The connection to use.
59    async fn get_by_id_with_connection(
60        &self,
61        id: &str,
62        conn: &mut ConnectionManager,
63    ) -> Result<Option<PluginModel>, RepositoryError> {
64        if id.is_empty() {
65            return Err(RepositoryError::InvalidData(
66                "Plugin ID cannot be empty".to_string(),
67            ));
68        }
69        let key = self.plugin_key(id);
70
71        debug!(plugin_id = %id, "fetching plugin data");
72
73        let json: Option<String> = conn
74            .get(&key)
75            .await
76            .map_err(|e| self.map_redis_error(e, &format!("get_plugin_by_id_{id}")))?;
77
78        match json {
79            Some(json) => {
80                debug!(plugin_id = %id, "found plugin data");
81                let plugin = self.deserialize_entity::<PluginModel>(&json, id, "plugin")?;
82                Ok(Some(plugin))
83            }
84            None => {
85                debug!(plugin_id = %id, "no plugin found");
86                Ok(None)
87            }
88        }
89    }
90
91    async fn get_by_ids(
92        &self,
93        ids: &[String],
94    ) -> Result<BatchRetrievalResult<PluginModel>, RepositoryError> {
95        if ids.is_empty() {
96            debug!("no plugin IDs provided for batch fetch");
97            return Ok(BatchRetrievalResult {
98                results: vec![],
99                failed_ids: vec![],
100            });
101        }
102
103        let mut conn = self.client.as_ref().clone();
104        let keys: Vec<String> = ids.iter().map(|id| self.plugin_key(id)).collect();
105
106        let values: Vec<Option<String>> = conn
107            .mget(&keys)
108            .await
109            .map_err(|e| self.map_redis_error(e, "batch_fetch_plugins"))?;
110
111        let mut plugins = Vec::new();
112        let mut failed_count = 0;
113        let mut failed_ids = Vec::new();
114        for (i, value) in values.into_iter().enumerate() {
115            match value {
116                Some(json) => match self.deserialize_entity(&json, &ids[i], "plugin") {
117                    Ok(plugin) => plugins.push(plugin),
118                    Err(e) => {
119                        failed_count += 1;
120                        error!(plugin_id = %ids[i], error = %e, "failed to deserialize plugin");
121                        failed_ids.push(ids[i].clone());
122                    }
123                },
124                None => {
125                    warn!(plugin_id = %ids[i], "plugin not found in batch fetch");
126                }
127            }
128        }
129
130        if failed_count > 0 {
131            warn!(failed_count = %failed_count, total_count = %ids.len(), "failed to deserialize plugins in batch");
132        }
133
134        Ok(BatchRetrievalResult {
135            results: plugins,
136            failed_ids,
137        })
138    }
139}
140
141impl fmt::Debug for RedisPluginRepository {
142    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
143        f.debug_struct("RedisPluginRepository")
144            .field("client", &"<ConnectionManager>")
145            .field("key_prefix", &self.key_prefix)
146            .finish()
147    }
148}
149
150#[async_trait]
151impl PluginRepositoryTrait for RedisPluginRepository {
152    async fn get_by_id(&self, id: &str) -> Result<Option<PluginModel>, RepositoryError> {
153        let mut conn = self.client.as_ref().clone();
154        self.get_by_id_with_connection(id, &mut conn).await
155    }
156
157    async fn add(&self, plugin: PluginModel) -> Result<(), RepositoryError> {
158        if plugin.id.is_empty() {
159            return Err(RepositoryError::InvalidData(
160                "Plugin ID cannot be empty".to_string(),
161            ));
162        }
163
164        if plugin.path.is_empty() {
165            return Err(RepositoryError::InvalidData(
166                "Plugin path cannot be empty".to_string(),
167            ));
168        }
169
170        let mut conn = self.client.as_ref().clone();
171        let key = self.plugin_key(&plugin.id);
172        let list_key = self.plugin_list_key();
173
174        debug!(plugin_id = %plugin.id, "adding plugin");
175
176        // Check if plugin already exists
177        let exists: bool = conn
178            .exists(&key)
179            .await
180            .map_err(|e| self.map_redis_error(e, &format!("check_plugin_exists_{}", plugin.id)))?;
181
182        if exists {
183            return Err(RepositoryError::ConstraintViolation(format!(
184                "Plugin with ID {} already exists",
185                plugin.id
186            )));
187        }
188
189        // Serialize plugin
190        let json = self.serialize_entity(&plugin, |p| &p.id, "plugin")?;
191
192        // Use a pipeline to ensure atomicity
193        let mut pipe = redis::pipe();
194        pipe.atomic();
195        pipe.set(&key, &json);
196        pipe.sadd(&list_key, &plugin.id);
197
198        pipe.exec_async(&mut conn).await.map_err(|e| {
199            error!(plugin_id = %plugin.id, error = %e, "failed to add plugin");
200            self.map_redis_error(e, &format!("add_plugin_{}", plugin.id))
201        })?;
202
203        debug!(plugin_id = %plugin.id, "successfully added plugin");
204        Ok(())
205    }
206
207    async fn list_paginated(
208        &self,
209        query: PaginationQuery,
210    ) -> Result<PaginatedResult<PluginModel>, RepositoryError> {
211        if query.page == 0 {
212            return Err(RepositoryError::InvalidData(
213                "Page number must be greater than 0".to_string(),
214            ));
215        }
216
217        if query.per_page == 0 {
218            return Err(RepositoryError::InvalidData(
219                "Per page count must be greater than 0".to_string(),
220            ));
221        }
222
223        let mut conn = self.client.as_ref().clone();
224        let plugin_list_key = self.plugin_list_key();
225
226        // Get total count
227        let total: u64 = conn
228            .scard(&plugin_list_key)
229            .await
230            .map_err(|e| self.map_redis_error(e, "list_paginated_count"))?;
231
232        if total == 0 {
233            return Ok(PaginatedResult {
234                items: vec![],
235                total: 0,
236                page: query.page,
237                per_page: query.per_page,
238            });
239        }
240
241        // Get all IDs and paginate in memory
242        let all_ids: Vec<String> = conn
243            .smembers(&plugin_list_key)
244            .await
245            .map_err(|e| self.map_redis_error(e, "list_paginated_members"))?;
246
247        let start = ((query.page - 1) * query.per_page) as usize;
248        let end = (start + query.per_page as usize).min(all_ids.len());
249
250        let ids_to_query = &all_ids[start..end];
251        let items = self.get_by_ids(ids_to_query).await?;
252
253        Ok(PaginatedResult {
254            items: items.results.clone(),
255            total,
256            page: query.page,
257            per_page: query.per_page,
258        })
259    }
260
261    async fn count(&self) -> Result<usize, RepositoryError> {
262        let mut conn = self.client.as_ref().clone();
263        let plugin_list_key = self.plugin_list_key();
264
265        let count: u64 = conn
266            .scard(&plugin_list_key)
267            .await
268            .map_err(|e| self.map_redis_error(e, "count_plugins"))?;
269
270        Ok(count as usize)
271    }
272
273    async fn has_entries(&self) -> Result<bool, RepositoryError> {
274        let mut conn = self.client.as_ref().clone();
275        let plugin_list_key = self.plugin_list_key();
276
277        debug!("checking if plugin entries exist");
278
279        let exists: bool = conn
280            .exists(&plugin_list_key)
281            .await
282            .map_err(|e| self.map_redis_error(e, "has_entries_check"))?;
283
284        debug!(exists = %exists, "plugin entries exist");
285        Ok(exists)
286    }
287
288    async fn drop_all_entries(&self) -> Result<(), RepositoryError> {
289        let mut conn = self.client.as_ref().clone();
290        let plugin_list_key = self.plugin_list_key();
291
292        debug!("dropping all plugin entries");
293
294        // Get all plugin IDs first
295        let plugin_ids: Vec<String> = conn
296            .smembers(&plugin_list_key)
297            .await
298            .map_err(|e| self.map_redis_error(e, "drop_all_entries_get_ids"))?;
299
300        if plugin_ids.is_empty() {
301            debug!("no plugin entries to drop");
302            return Ok(());
303        }
304
305        // Use pipeline for atomic operations
306        let mut pipe = redis::pipe();
307        pipe.atomic();
308
309        // Delete all individual plugin entries
310        for plugin_id in &plugin_ids {
311            let plugin_key = self.plugin_key(plugin_id);
312            pipe.del(&plugin_key);
313        }
314
315        // Delete the plugin list key
316        pipe.del(&plugin_list_key);
317
318        pipe.exec_async(&mut conn)
319            .await
320            .map_err(|e| self.map_redis_error(e, "drop_all_entries_pipeline"))?;
321
322        debug!(count = %plugin_ids.len(), "dropped plugin entries");
323        Ok(())
324    }
325}
326
327#[cfg(test)]
328mod tests {
329    use super::*;
330    use crate::constants::DEFAULT_PLUGIN_TIMEOUT_SECONDS;
331    use crate::models::PluginModel;
332    use std::{sync::Arc, time::Duration};
333
334    fn create_test_plugin(id: &str, path: &str) -> PluginModel {
335        PluginModel {
336            id: id.to_string(),
337            path: path.to_string(),
338            timeout: Duration::from_secs(DEFAULT_PLUGIN_TIMEOUT_SECONDS),
339            emit_logs: false,
340            emit_traces: false,
341        }
342    }
343
344    async fn setup_test_repo() -> RedisPluginRepository {
345        let redis_url =
346            std::env::var("REDIS_URL").unwrap_or_else(|_| "redis://127.0.0.1:6379/".to_string());
347        let client = redis::Client::open(redis_url).expect("Failed to create Redis client");
348        let mut connection_manager = ConnectionManager::new(client)
349            .await
350            .expect("Failed to create Redis connection manager");
351
352        // Clear the plugin lists
353        connection_manager
354            .del::<&str, ()>("test_plugin:plugin_list")
355            .await
356            .unwrap();
357
358        RedisPluginRepository::new(Arc::new(connection_manager), "test_plugin".to_string())
359            .expect("Failed to create Redis plugin repository")
360    }
361
362    #[tokio::test]
363    #[ignore = "Requires active Redis instance"]
364    async fn test_new_repository_creation() {
365        let repo = setup_test_repo().await;
366        assert_eq!(repo.key_prefix, "test_plugin");
367    }
368
369    #[tokio::test]
370    #[ignore = "Requires active Redis instance"]
371    async fn test_new_repository_empty_prefix_fails() {
372        let client =
373            redis::Client::open("redis://127.0.0.1:6379/").expect("Failed to create Redis client");
374        let connection_manager = redis::aio::ConnectionManager::new(client)
375            .await
376            .expect("Failed to create Redis connection manager");
377
378        let result = RedisPluginRepository::new(Arc::new(connection_manager), "".to_string());
379        assert!(result.is_err());
380        assert!(result
381            .unwrap_err()
382            .to_string()
383            .contains("key prefix cannot be empty"));
384    }
385
386    #[tokio::test]
387    #[ignore = "Requires active Redis instance"]
388    async fn test_key_generation() {
389        let repo = setup_test_repo().await;
390
391        let plugin_key = repo.plugin_key("test-plugin");
392        assert_eq!(plugin_key, "test_plugin:plugin:test-plugin");
393
394        let list_key = repo.plugin_list_key();
395        assert_eq!(list_key, "test_plugin:plugin_list");
396    }
397
398    #[tokio::test]
399    #[ignore = "Requires active Redis instance"]
400    async fn test_serialize_deserialize_plugin() {
401        let repo = setup_test_repo().await;
402        let plugin = create_test_plugin("test-plugin", "/path/to/plugin");
403
404        let json = repo.serialize_entity(&plugin, |p| &p.id, "plugin").unwrap();
405        let deserialized: PluginModel = repo
406            .deserialize_entity(&json, &plugin.id, "plugin")
407            .unwrap();
408
409        assert_eq!(plugin.id, deserialized.id);
410        assert_eq!(plugin.path, deserialized.path);
411    }
412
413    #[tokio::test]
414    #[ignore = "Requires active Redis instance"]
415    async fn test_add_plugin() {
416        let repo = setup_test_repo().await;
417        let plugin_name = uuid::Uuid::new_v4().to_string();
418        let plugin = create_test_plugin(&plugin_name, "/path/to/plugin");
419
420        let result = repo.add(plugin).await;
421        assert!(result.is_ok());
422    }
423
424    #[tokio::test]
425    #[ignore = "Requires active Redis instance"]
426    async fn test_get_plugin() {
427        let repo = setup_test_repo().await;
428        let plugin_name = uuid::Uuid::new_v4().to_string();
429        let plugin = create_test_plugin(&plugin_name, "/path/to/plugin");
430
431        // Add the plugin first
432        repo.add(plugin.clone()).await.unwrap();
433
434        // Get the plugin
435        let retrieved = repo.get_by_id(&plugin_name).await.unwrap();
436        assert!(retrieved.is_some());
437        let retrieved = retrieved.unwrap();
438        assert_eq!(retrieved.id, plugin.id);
439        assert_eq!(retrieved.path, plugin.path);
440    }
441
442    #[tokio::test]
443    #[ignore = "Requires active Redis instance"]
444    async fn test_get_nonexistent_plugin() {
445        let repo = setup_test_repo().await;
446
447        let result = repo.get_by_id("nonexistent-plugin").await;
448        assert!(matches!(result, Ok(None)));
449    }
450
451    #[tokio::test]
452    #[ignore = "Requires active Redis instance"]
453    async fn test_duplicate_plugin_addition() {
454        let repo = setup_test_repo().await;
455        let plugin_name = uuid::Uuid::new_v4().to_string();
456        let plugin = create_test_plugin(&plugin_name, "/path/to/plugin");
457
458        // Add the plugin first time
459        repo.add(plugin.clone()).await.unwrap();
460
461        // Try to add the same plugin again
462        let result = repo.add(plugin).await;
463        assert!(result.is_err());
464
465        if let Err(RepositoryError::ConstraintViolation(msg)) = result {
466            assert!(msg.contains("already exists"));
467        } else {
468            panic!("Expected ConstraintViolation error");
469        }
470    }
471
472    #[tokio::test]
473    #[ignore = "Requires active Redis instance"]
474    async fn test_debug_implementation() {
475        let repo = setup_test_repo().await;
476        let debug_str = format!("{:?}", repo);
477        assert!(debug_str.contains("RedisPluginRepository"));
478        assert!(debug_str.contains("test_plugin"));
479    }
480
481    #[tokio::test]
482    #[ignore = "Requires active Redis instance"]
483    async fn test_error_handling_empty_id() {
484        let repo = setup_test_repo().await;
485
486        let result = repo.get_by_id("").await;
487        assert!(result.is_err());
488        assert!(result
489            .unwrap_err()
490            .to_string()
491            .contains("ID cannot be empty"));
492    }
493
494    #[tokio::test]
495    #[ignore = "Requires active Redis instance"]
496    async fn test_add_plugin_with_empty_id() {
497        let repo = setup_test_repo().await;
498        let plugin = create_test_plugin("", "/path/to/plugin");
499
500        let result = repo.add(plugin).await;
501        assert!(result.is_err());
502        assert!(result
503            .unwrap_err()
504            .to_string()
505            .contains("ID cannot be empty"));
506    }
507
508    #[tokio::test]
509    #[ignore = "Requires active Redis instance"]
510    async fn test_add_plugin_with_empty_path() {
511        let repo = setup_test_repo().await;
512        let plugin = create_test_plugin("test-plugin", "");
513
514        let result = repo.add(plugin).await;
515        assert!(result.is_err());
516        assert!(result
517            .unwrap_err()
518            .to_string()
519            .contains("path cannot be empty"));
520    }
521
522    #[tokio::test]
523    #[ignore = "Requires active Redis instance"]
524    async fn test_get_by_ids_plugins() {
525        let repo = setup_test_repo().await;
526        let plugin_name1 = uuid::Uuid::new_v4().to_string();
527        let plugin_name2 = uuid::Uuid::new_v4().to_string();
528        let plugin1 = create_test_plugin(&plugin_name1, "/path/to/plugin1");
529        let plugin2 = create_test_plugin(&plugin_name2, "/path/to/plugin2");
530
531        repo.add(plugin1.clone()).await.unwrap();
532        repo.add(plugin2.clone()).await.unwrap();
533
534        let retrieved = repo
535            .get_by_ids(&[plugin1.id.clone(), plugin2.id.clone()])
536            .await
537            .unwrap();
538        assert!(retrieved.results.len() == 2);
539        assert_eq!(retrieved.results[0].id, plugin2.id);
540        assert_eq!(retrieved.results[1].id, plugin1.id);
541        assert_eq!(retrieved.failed_ids.len(), 0);
542    }
543
544    #[tokio::test]
545    #[ignore = "Requires active Redis instance"]
546    async fn test_list_paginated_plugins() {
547        let repo = setup_test_repo().await;
548
549        let plugin_id1 = uuid::Uuid::new_v4().to_string();
550        let plugin_id2 = uuid::Uuid::new_v4().to_string();
551        let plugin_id3 = uuid::Uuid::new_v4().to_string();
552        let plugin1 = create_test_plugin(&plugin_id1, "/path/to/plugin1");
553        let plugin2 = create_test_plugin(&plugin_id2, "/path/to/plugin2");
554        let plugin3 = create_test_plugin(&plugin_id3, "/path/to/plugin3");
555
556        repo.add(plugin1.clone()).await.unwrap();
557        repo.add(plugin2.clone()).await.unwrap();
558        repo.add(plugin3.clone()).await.unwrap();
559
560        let query = PaginationQuery {
561            page: 1,
562            per_page: 2,
563        };
564
565        let result = repo.list_paginated(query).await;
566        assert!(result.is_ok());
567        let result = result.unwrap();
568        assert!(result.items.len() == 2);
569    }
570
571    #[tokio::test]
572    #[ignore = "Requires active Redis instance"]
573    async fn test_has_entries() {
574        let repo = setup_test_repo().await;
575        assert!(!repo.has_entries().await.unwrap());
576        repo.add(create_test_plugin("test-plugin", "/path/to/plugin"))
577            .await
578            .unwrap();
579        assert!(repo.has_entries().await.unwrap());
580        repo.drop_all_entries().await.unwrap();
581        assert!(!repo.has_entries().await.unwrap());
582    }
583
584    #[tokio::test]
585    #[ignore = "Requires active Redis instance"]
586    async fn test_drop_all_entries() {
587        let repo = setup_test_repo().await;
588        repo.add(create_test_plugin("test-plugin", "/path/to/plugin"))
589            .await
590            .unwrap();
591        assert!(repo.has_entries().await.unwrap());
592        repo.drop_all_entries().await.unwrap();
593        assert!(!repo.has_entries().await.unwrap());
594    }
595}