openzeppelin_relayer/repositories/plugin/
plugin_redis.rs1use crate::models::{PaginationQuery, PluginModel, RepositoryError};
4use crate::repositories::redis_base::RedisRepository;
5use crate::repositories::{BatchRetrievalResult, PaginatedResult, PluginRepositoryTrait};
6use async_trait::async_trait;
7use redis::aio::ConnectionManager;
8use redis::AsyncCommands;
9use std::fmt;
10use std::sync::Arc;
11use tracing::{debug, error, warn};
12
13const PLUGIN_PREFIX: &str = "plugin";
14const PLUGIN_LIST_KEY: &str = "plugin_list";
15
16#[derive(Clone)]
17pub struct RedisPluginRepository {
18 pub client: Arc<ConnectionManager>,
19 pub key_prefix: String,
20}
21
22impl RedisRepository for RedisPluginRepository {}
23
24impl RedisPluginRepository {
25 pub fn new(
26 connection_manager: Arc<ConnectionManager>,
27 key_prefix: String,
28 ) -> Result<Self, RepositoryError> {
29 if key_prefix.is_empty() {
30 return Err(RepositoryError::InvalidData(
31 "Redis key prefix cannot be empty".to_string(),
32 ));
33 }
34
35 Ok(Self {
36 client: connection_manager,
37 key_prefix,
38 })
39 }
40
41 fn plugin_key(&self, plugin_id: &str) -> String {
43 format!("{}:{}:{}", self.key_prefix, PLUGIN_PREFIX, plugin_id)
44 }
45
46 fn plugin_list_key(&self) -> String {
48 format!("{}:{}", self.key_prefix, PLUGIN_LIST_KEY)
49 }
50
51 async fn get_by_id_with_connection(
60 &self,
61 id: &str,
62 conn: &mut ConnectionManager,
63 ) -> Result<Option<PluginModel>, RepositoryError> {
64 if id.is_empty() {
65 return Err(RepositoryError::InvalidData(
66 "Plugin ID cannot be empty".to_string(),
67 ));
68 }
69 let key = self.plugin_key(id);
70
71 debug!(plugin_id = %id, "fetching plugin data");
72
73 let json: Option<String> = conn
74 .get(&key)
75 .await
76 .map_err(|e| self.map_redis_error(e, &format!("get_plugin_by_id_{id}")))?;
77
78 match json {
79 Some(json) => {
80 debug!(plugin_id = %id, "found plugin data");
81 let plugin = self.deserialize_entity::<PluginModel>(&json, id, "plugin")?;
82 Ok(Some(plugin))
83 }
84 None => {
85 debug!(plugin_id = %id, "no plugin found");
86 Ok(None)
87 }
88 }
89 }
90
91 async fn get_by_ids(
92 &self,
93 ids: &[String],
94 ) -> Result<BatchRetrievalResult<PluginModel>, RepositoryError> {
95 if ids.is_empty() {
96 debug!("no plugin IDs provided for batch fetch");
97 return Ok(BatchRetrievalResult {
98 results: vec![],
99 failed_ids: vec![],
100 });
101 }
102
103 let mut conn = self.client.as_ref().clone();
104 let keys: Vec<String> = ids.iter().map(|id| self.plugin_key(id)).collect();
105
106 let values: Vec<Option<String>> = conn
107 .mget(&keys)
108 .await
109 .map_err(|e| self.map_redis_error(e, "batch_fetch_plugins"))?;
110
111 let mut plugins = Vec::new();
112 let mut failed_count = 0;
113 let mut failed_ids = Vec::new();
114 for (i, value) in values.into_iter().enumerate() {
115 match value {
116 Some(json) => match self.deserialize_entity(&json, &ids[i], "plugin") {
117 Ok(plugin) => plugins.push(plugin),
118 Err(e) => {
119 failed_count += 1;
120 error!(plugin_id = %ids[i], error = %e, "failed to deserialize plugin");
121 failed_ids.push(ids[i].clone());
122 }
123 },
124 None => {
125 warn!(plugin_id = %ids[i], "plugin not found in batch fetch");
126 }
127 }
128 }
129
130 if failed_count > 0 {
131 warn!(failed_count = %failed_count, total_count = %ids.len(), "failed to deserialize plugins in batch");
132 }
133
134 Ok(BatchRetrievalResult {
135 results: plugins,
136 failed_ids,
137 })
138 }
139}
140
141impl fmt::Debug for RedisPluginRepository {
142 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
143 f.debug_struct("RedisPluginRepository")
144 .field("client", &"<ConnectionManager>")
145 .field("key_prefix", &self.key_prefix)
146 .finish()
147 }
148}
149
150#[async_trait]
151impl PluginRepositoryTrait for RedisPluginRepository {
152 async fn get_by_id(&self, id: &str) -> Result<Option<PluginModel>, RepositoryError> {
153 let mut conn = self.client.as_ref().clone();
154 self.get_by_id_with_connection(id, &mut conn).await
155 }
156
157 async fn add(&self, plugin: PluginModel) -> Result<(), RepositoryError> {
158 if plugin.id.is_empty() {
159 return Err(RepositoryError::InvalidData(
160 "Plugin ID cannot be empty".to_string(),
161 ));
162 }
163
164 if plugin.path.is_empty() {
165 return Err(RepositoryError::InvalidData(
166 "Plugin path cannot be empty".to_string(),
167 ));
168 }
169
170 let mut conn = self.client.as_ref().clone();
171 let key = self.plugin_key(&plugin.id);
172 let list_key = self.plugin_list_key();
173
174 debug!(plugin_id = %plugin.id, "adding plugin");
175
176 let exists: bool = conn
178 .exists(&key)
179 .await
180 .map_err(|e| self.map_redis_error(e, &format!("check_plugin_exists_{}", plugin.id)))?;
181
182 if exists {
183 return Err(RepositoryError::ConstraintViolation(format!(
184 "Plugin with ID {} already exists",
185 plugin.id
186 )));
187 }
188
189 let json = self.serialize_entity(&plugin, |p| &p.id, "plugin")?;
191
192 let mut pipe = redis::pipe();
194 pipe.atomic();
195 pipe.set(&key, &json);
196 pipe.sadd(&list_key, &plugin.id);
197
198 pipe.exec_async(&mut conn).await.map_err(|e| {
199 error!(plugin_id = %plugin.id, error = %e, "failed to add plugin");
200 self.map_redis_error(e, &format!("add_plugin_{}", plugin.id))
201 })?;
202
203 debug!(plugin_id = %plugin.id, "successfully added plugin");
204 Ok(())
205 }
206
207 async fn list_paginated(
208 &self,
209 query: PaginationQuery,
210 ) -> Result<PaginatedResult<PluginModel>, RepositoryError> {
211 if query.page == 0 {
212 return Err(RepositoryError::InvalidData(
213 "Page number must be greater than 0".to_string(),
214 ));
215 }
216
217 if query.per_page == 0 {
218 return Err(RepositoryError::InvalidData(
219 "Per page count must be greater than 0".to_string(),
220 ));
221 }
222
223 let mut conn = self.client.as_ref().clone();
224 let plugin_list_key = self.plugin_list_key();
225
226 let total: u64 = conn
228 .scard(&plugin_list_key)
229 .await
230 .map_err(|e| self.map_redis_error(e, "list_paginated_count"))?;
231
232 if total == 0 {
233 return Ok(PaginatedResult {
234 items: vec![],
235 total: 0,
236 page: query.page,
237 per_page: query.per_page,
238 });
239 }
240
241 let all_ids: Vec<String> = conn
243 .smembers(&plugin_list_key)
244 .await
245 .map_err(|e| self.map_redis_error(e, "list_paginated_members"))?;
246
247 let start = ((query.page - 1) * query.per_page) as usize;
248 let end = (start + query.per_page as usize).min(all_ids.len());
249
250 let ids_to_query = &all_ids[start..end];
251 let items = self.get_by_ids(ids_to_query).await?;
252
253 Ok(PaginatedResult {
254 items: items.results.clone(),
255 total,
256 page: query.page,
257 per_page: query.per_page,
258 })
259 }
260
261 async fn count(&self) -> Result<usize, RepositoryError> {
262 let mut conn = self.client.as_ref().clone();
263 let plugin_list_key = self.plugin_list_key();
264
265 let count: u64 = conn
266 .scard(&plugin_list_key)
267 .await
268 .map_err(|e| self.map_redis_error(e, "count_plugins"))?;
269
270 Ok(count as usize)
271 }
272
273 async fn has_entries(&self) -> Result<bool, RepositoryError> {
274 let mut conn = self.client.as_ref().clone();
275 let plugin_list_key = self.plugin_list_key();
276
277 debug!("checking if plugin entries exist");
278
279 let exists: bool = conn
280 .exists(&plugin_list_key)
281 .await
282 .map_err(|e| self.map_redis_error(e, "has_entries_check"))?;
283
284 debug!(exists = %exists, "plugin entries exist");
285 Ok(exists)
286 }
287
288 async fn drop_all_entries(&self) -> Result<(), RepositoryError> {
289 let mut conn = self.client.as_ref().clone();
290 let plugin_list_key = self.plugin_list_key();
291
292 debug!("dropping all plugin entries");
293
294 let plugin_ids: Vec<String> = conn
296 .smembers(&plugin_list_key)
297 .await
298 .map_err(|e| self.map_redis_error(e, "drop_all_entries_get_ids"))?;
299
300 if plugin_ids.is_empty() {
301 debug!("no plugin entries to drop");
302 return Ok(());
303 }
304
305 let mut pipe = redis::pipe();
307 pipe.atomic();
308
309 for plugin_id in &plugin_ids {
311 let plugin_key = self.plugin_key(plugin_id);
312 pipe.del(&plugin_key);
313 }
314
315 pipe.del(&plugin_list_key);
317
318 pipe.exec_async(&mut conn)
319 .await
320 .map_err(|e| self.map_redis_error(e, "drop_all_entries_pipeline"))?;
321
322 debug!(count = %plugin_ids.len(), "dropped plugin entries");
323 Ok(())
324 }
325}
326
327#[cfg(test)]
328mod tests {
329 use super::*;
330 use crate::constants::DEFAULT_PLUGIN_TIMEOUT_SECONDS;
331 use crate::models::PluginModel;
332 use std::{sync::Arc, time::Duration};
333
334 fn create_test_plugin(id: &str, path: &str) -> PluginModel {
335 PluginModel {
336 id: id.to_string(),
337 path: path.to_string(),
338 timeout: Duration::from_secs(DEFAULT_PLUGIN_TIMEOUT_SECONDS),
339 emit_logs: false,
340 emit_traces: false,
341 }
342 }
343
344 async fn setup_test_repo() -> RedisPluginRepository {
345 let redis_url =
346 std::env::var("REDIS_URL").unwrap_or_else(|_| "redis://127.0.0.1:6379/".to_string());
347 let client = redis::Client::open(redis_url).expect("Failed to create Redis client");
348 let mut connection_manager = ConnectionManager::new(client)
349 .await
350 .expect("Failed to create Redis connection manager");
351
352 connection_manager
354 .del::<&str, ()>("test_plugin:plugin_list")
355 .await
356 .unwrap();
357
358 RedisPluginRepository::new(Arc::new(connection_manager), "test_plugin".to_string())
359 .expect("Failed to create Redis plugin repository")
360 }
361
362 #[tokio::test]
363 #[ignore = "Requires active Redis instance"]
364 async fn test_new_repository_creation() {
365 let repo = setup_test_repo().await;
366 assert_eq!(repo.key_prefix, "test_plugin");
367 }
368
369 #[tokio::test]
370 #[ignore = "Requires active Redis instance"]
371 async fn test_new_repository_empty_prefix_fails() {
372 let client =
373 redis::Client::open("redis://127.0.0.1:6379/").expect("Failed to create Redis client");
374 let connection_manager = redis::aio::ConnectionManager::new(client)
375 .await
376 .expect("Failed to create Redis connection manager");
377
378 let result = RedisPluginRepository::new(Arc::new(connection_manager), "".to_string());
379 assert!(result.is_err());
380 assert!(result
381 .unwrap_err()
382 .to_string()
383 .contains("key prefix cannot be empty"));
384 }
385
386 #[tokio::test]
387 #[ignore = "Requires active Redis instance"]
388 async fn test_key_generation() {
389 let repo = setup_test_repo().await;
390
391 let plugin_key = repo.plugin_key("test-plugin");
392 assert_eq!(plugin_key, "test_plugin:plugin:test-plugin");
393
394 let list_key = repo.plugin_list_key();
395 assert_eq!(list_key, "test_plugin:plugin_list");
396 }
397
398 #[tokio::test]
399 #[ignore = "Requires active Redis instance"]
400 async fn test_serialize_deserialize_plugin() {
401 let repo = setup_test_repo().await;
402 let plugin = create_test_plugin("test-plugin", "/path/to/plugin");
403
404 let json = repo.serialize_entity(&plugin, |p| &p.id, "plugin").unwrap();
405 let deserialized: PluginModel = repo
406 .deserialize_entity(&json, &plugin.id, "plugin")
407 .unwrap();
408
409 assert_eq!(plugin.id, deserialized.id);
410 assert_eq!(plugin.path, deserialized.path);
411 }
412
413 #[tokio::test]
414 #[ignore = "Requires active Redis instance"]
415 async fn test_add_plugin() {
416 let repo = setup_test_repo().await;
417 let plugin_name = uuid::Uuid::new_v4().to_string();
418 let plugin = create_test_plugin(&plugin_name, "/path/to/plugin");
419
420 let result = repo.add(plugin).await;
421 assert!(result.is_ok());
422 }
423
424 #[tokio::test]
425 #[ignore = "Requires active Redis instance"]
426 async fn test_get_plugin() {
427 let repo = setup_test_repo().await;
428 let plugin_name = uuid::Uuid::new_v4().to_string();
429 let plugin = create_test_plugin(&plugin_name, "/path/to/plugin");
430
431 repo.add(plugin.clone()).await.unwrap();
433
434 let retrieved = repo.get_by_id(&plugin_name).await.unwrap();
436 assert!(retrieved.is_some());
437 let retrieved = retrieved.unwrap();
438 assert_eq!(retrieved.id, plugin.id);
439 assert_eq!(retrieved.path, plugin.path);
440 }
441
442 #[tokio::test]
443 #[ignore = "Requires active Redis instance"]
444 async fn test_get_nonexistent_plugin() {
445 let repo = setup_test_repo().await;
446
447 let result = repo.get_by_id("nonexistent-plugin").await;
448 assert!(matches!(result, Ok(None)));
449 }
450
451 #[tokio::test]
452 #[ignore = "Requires active Redis instance"]
453 async fn test_duplicate_plugin_addition() {
454 let repo = setup_test_repo().await;
455 let plugin_name = uuid::Uuid::new_v4().to_string();
456 let plugin = create_test_plugin(&plugin_name, "/path/to/plugin");
457
458 repo.add(plugin.clone()).await.unwrap();
460
461 let result = repo.add(plugin).await;
463 assert!(result.is_err());
464
465 if let Err(RepositoryError::ConstraintViolation(msg)) = result {
466 assert!(msg.contains("already exists"));
467 } else {
468 panic!("Expected ConstraintViolation error");
469 }
470 }
471
472 #[tokio::test]
473 #[ignore = "Requires active Redis instance"]
474 async fn test_debug_implementation() {
475 let repo = setup_test_repo().await;
476 let debug_str = format!("{:?}", repo);
477 assert!(debug_str.contains("RedisPluginRepository"));
478 assert!(debug_str.contains("test_plugin"));
479 }
480
481 #[tokio::test]
482 #[ignore = "Requires active Redis instance"]
483 async fn test_error_handling_empty_id() {
484 let repo = setup_test_repo().await;
485
486 let result = repo.get_by_id("").await;
487 assert!(result.is_err());
488 assert!(result
489 .unwrap_err()
490 .to_string()
491 .contains("ID cannot be empty"));
492 }
493
494 #[tokio::test]
495 #[ignore = "Requires active Redis instance"]
496 async fn test_add_plugin_with_empty_id() {
497 let repo = setup_test_repo().await;
498 let plugin = create_test_plugin("", "/path/to/plugin");
499
500 let result = repo.add(plugin).await;
501 assert!(result.is_err());
502 assert!(result
503 .unwrap_err()
504 .to_string()
505 .contains("ID cannot be empty"));
506 }
507
508 #[tokio::test]
509 #[ignore = "Requires active Redis instance"]
510 async fn test_add_plugin_with_empty_path() {
511 let repo = setup_test_repo().await;
512 let plugin = create_test_plugin("test-plugin", "");
513
514 let result = repo.add(plugin).await;
515 assert!(result.is_err());
516 assert!(result
517 .unwrap_err()
518 .to_string()
519 .contains("path cannot be empty"));
520 }
521
522 #[tokio::test]
523 #[ignore = "Requires active Redis instance"]
524 async fn test_get_by_ids_plugins() {
525 let repo = setup_test_repo().await;
526 let plugin_name1 = uuid::Uuid::new_v4().to_string();
527 let plugin_name2 = uuid::Uuid::new_v4().to_string();
528 let plugin1 = create_test_plugin(&plugin_name1, "/path/to/plugin1");
529 let plugin2 = create_test_plugin(&plugin_name2, "/path/to/plugin2");
530
531 repo.add(plugin1.clone()).await.unwrap();
532 repo.add(plugin2.clone()).await.unwrap();
533
534 let retrieved = repo
535 .get_by_ids(&[plugin1.id.clone(), plugin2.id.clone()])
536 .await
537 .unwrap();
538 assert!(retrieved.results.len() == 2);
539 assert_eq!(retrieved.results[0].id, plugin2.id);
540 assert_eq!(retrieved.results[1].id, plugin1.id);
541 assert_eq!(retrieved.failed_ids.len(), 0);
542 }
543
544 #[tokio::test]
545 #[ignore = "Requires active Redis instance"]
546 async fn test_list_paginated_plugins() {
547 let repo = setup_test_repo().await;
548
549 let plugin_id1 = uuid::Uuid::new_v4().to_string();
550 let plugin_id2 = uuid::Uuid::new_v4().to_string();
551 let plugin_id3 = uuid::Uuid::new_v4().to_string();
552 let plugin1 = create_test_plugin(&plugin_id1, "/path/to/plugin1");
553 let plugin2 = create_test_plugin(&plugin_id2, "/path/to/plugin2");
554 let plugin3 = create_test_plugin(&plugin_id3, "/path/to/plugin3");
555
556 repo.add(plugin1.clone()).await.unwrap();
557 repo.add(plugin2.clone()).await.unwrap();
558 repo.add(plugin3.clone()).await.unwrap();
559
560 let query = PaginationQuery {
561 page: 1,
562 per_page: 2,
563 };
564
565 let result = repo.list_paginated(query).await;
566 assert!(result.is_ok());
567 let result = result.unwrap();
568 assert!(result.items.len() == 2);
569 }
570
571 #[tokio::test]
572 #[ignore = "Requires active Redis instance"]
573 async fn test_has_entries() {
574 let repo = setup_test_repo().await;
575 assert!(!repo.has_entries().await.unwrap());
576 repo.add(create_test_plugin("test-plugin", "/path/to/plugin"))
577 .await
578 .unwrap();
579 assert!(repo.has_entries().await.unwrap());
580 repo.drop_all_entries().await.unwrap();
581 assert!(!repo.has_entries().await.unwrap());
582 }
583
584 #[tokio::test]
585 #[ignore = "Requires active Redis instance"]
586 async fn test_drop_all_entries() {
587 let repo = setup_test_repo().await;
588 repo.add(create_test_plugin("test-plugin", "/path/to/plugin"))
589 .await
590 .unwrap();
591 assert!(repo.has_entries().await.unwrap());
592 repo.drop_all_entries().await.unwrap();
593 assert!(!repo.has_entries().await.unwrap());
594 }
595}