openzeppelin_relayer/repositories/notification/
notification_in_memory.rs

1//! This module defines an in-memory notification repository for managing
2//! notifications. It provides full CRUD functionality including create, retrieve,
3//! update, delete, and list operations. The repository is implemented using a
4//! `Mutex`-protected `HashMap` to ensure thread safety in asynchronous contexts.
5
6use crate::{
7    models::{NotificationConfig, NotificationRepoModel, RepositoryError},
8    repositories::*,
9};
10use async_trait::async_trait;
11use std::collections::HashMap;
12use tokio::sync::{Mutex, MutexGuard};
13
/// In-memory notification repository.
///
/// Notifications are kept in a `HashMap` keyed by notification ID and guarded by an async
/// `Mutex`, so every operation works through `&self`.
#[derive(Debug)]
pub struct InMemoryNotificationRepository {
    /// Notification ID -> stored notification.
    store: Mutex<HashMap<String, NotificationRepoModel>>,
}
18
19impl Clone for InMemoryNotificationRepository {
20    fn clone(&self) -> Self {
21        // Try to get the current data, or use empty HashMap if lock fails
22        let data = self
23            .store
24            .try_lock()
25            .map(|guard| guard.clone())
26            .unwrap_or_else(|_| HashMap::new());
27
28        Self {
29            store: Mutex::new(data),
30        }
31    }
32}
33
34#[allow(dead_code)]
35impl InMemoryNotificationRepository {
36    pub fn new() -> Self {
37        Self {
38            store: Mutex::new(HashMap::new()),
39        }
40    }
41
42    async fn acquire_lock<T>(lock: &Mutex<T>) -> Result<MutexGuard<T>, RepositoryError> {
43        Ok(lock.lock().await)
44    }
45}
46
47impl Default for InMemoryNotificationRepository {
48    fn default() -> Self {
49        Self::new()
50    }
51}
52
53#[async_trait]
54impl Repository<NotificationRepoModel, String> for InMemoryNotificationRepository {
55    async fn create(
56        &self,
57        notification: NotificationRepoModel,
58    ) -> Result<NotificationRepoModel, RepositoryError> {
59        let mut store = Self::acquire_lock(&self.store).await?;
60        if store.contains_key(&notification.id) {
61            return Err(RepositoryError::ConstraintViolation(format!(
62                "Notification with ID '{}' already exists",
63                notification.id
64            )));
65        }
66        store.insert(notification.id.clone(), notification.clone());
67        Ok(notification)
68    }
69
70    async fn get_by_id(&self, id: String) -> Result<NotificationRepoModel, RepositoryError> {
71        let store = Self::acquire_lock(&self.store).await?;
72        match store.get(&id) {
73            Some(entity) => Ok(entity.clone()),
74            None => Err(RepositoryError::NotFound(format!(
75                "Notification with ID '{id}' not found"
76            ))),
77        }
78    }
79
80    #[allow(clippy::map_entry)]
81    async fn update(
82        &self,
83        id: String,
84        notification: NotificationRepoModel,
85    ) -> Result<NotificationRepoModel, RepositoryError> {
86        let mut store = Self::acquire_lock(&self.store).await?;
87
88        // Check if notification exists
89        if !store.contains_key(&id) {
90            return Err(RepositoryError::NotFound(format!(
91                "Notification with ID '{id}' not found"
92            )));
93        }
94
95        if id != notification.id {
96            return Err(RepositoryError::InvalidData(format!(
97                "ID mismatch: URL parameter '{}' does not match entity ID '{}'",
98                id, notification.id
99            )));
100        }
101
102        store.insert(id, notification.clone());
103        Ok(notification)
104    }
105
106    async fn delete_by_id(&self, id: String) -> Result<(), RepositoryError> {
107        let mut store = Self::acquire_lock(&self.store).await?;
108
109        match store.remove(&id) {
110            Some(_) => Ok(()),
111            None => Err(RepositoryError::NotFound(format!(
112                "Notification with ID {id} not found"
113            ))),
114        }
115    }
116
117    async fn list_all(&self) -> Result<Vec<NotificationRepoModel>, RepositoryError> {
118        let store = Self::acquire_lock(&self.store).await?;
119        let notifications: Vec<NotificationRepoModel> = store.values().cloned().collect();
120        Ok(notifications)
121    }
122
123    async fn list_paginated(
124        &self,
125        query: PaginationQuery,
126    ) -> Result<PaginatedResult<NotificationRepoModel>, RepositoryError> {
127        let total = self.count().await?;
128        let start = ((query.page - 1) * query.per_page) as usize;
129        let items: Vec<NotificationRepoModel> = self
130            .store
131            .lock()
132            .await
133            .values()
134            .skip(start)
135            .take(query.per_page as usize)
136            .cloned()
137            .collect();
138
139        Ok(PaginatedResult {
140            items,
141            total: total as u64,
142            page: query.page,
143            per_page: query.per_page,
144        })
145    }
146
147    async fn count(&self) -> Result<usize, RepositoryError> {
148        let store = Self::acquire_lock(&self.store).await?;
149        let length = store.len();
150        Ok(length)
151    }
152
153    async fn has_entries(&self) -> Result<bool, RepositoryError> {
154        let store = Self::acquire_lock(&self.store).await?;
155        Ok(!store.is_empty())
156    }
157
158    async fn drop_all_entries(&self) -> Result<(), RepositoryError> {
159        let mut store = Self::acquire_lock(&self.store).await?;
160        store.clear();
161        Ok(())
162    }
163}
164
165impl TryFrom<NotificationConfig> for NotificationRepoModel {
166    type Error = ConversionError;
167
168    fn try_from(config: NotificationConfig) -> Result<Self, Self::Error> {
169        let signing_key = config.get_signing_key().map_err(|e| {
170            ConversionError::InvalidConfig(format!("Failed to get signing key: {e}"))
171        })?;
172
173        Ok(NotificationRepoModel {
174            id: config.id.clone(),
175            url: config.url.clone(),
176            notification_type: config.r#type,
177            signing_key,
178        })
179    }
180}
#[cfg(test)]
mod tests {
    use crate::models::NotificationType;

    use super::*;

    /// Builds a webhook notification with the given ID, a fixed URL and no signing key.
    fn create_test_notification(id: String) -> NotificationRepoModel {
        NotificationRepoModel {
            id,
            url: "http://localhost".to_string(),
            notification_type: NotificationType::Webhook,
            signing_key: None,
        }
    }

    #[actix_web::test]
    async fn test_new_repository_is_empty() {
        let repo = InMemoryNotificationRepository::new();
        assert_eq!(repo.count().await.unwrap(), 0);
    }

    #[actix_web::test]
    async fn test_add_notification() {
        let repo = InMemoryNotificationRepository::new();
        let notification = create_test_notification("test".to_string());

        repo.create(notification.clone()).await.unwrap();
        assert_eq!(repo.count().await.unwrap(), 1);

        let stored = repo.get_by_id("test".to_string()).await.unwrap();
        assert_eq!(stored.id, notification.id);
    }

    // Creating the same ID twice must be rejected and must not change the store.
    #[actix_web::test]
    async fn test_create_duplicate_notification() {
        let repo = InMemoryNotificationRepository::new();
        let notification = create_test_notification("test".to_string());

        repo.create(notification.clone()).await.unwrap();

        let result = repo.create(notification).await;
        assert!(matches!(
            result,
            Err(RepositoryError::ConstraintViolation(_))
        ));
        assert_eq!(repo.count().await.unwrap(), 1);
    }

    #[actix_web::test]
    async fn test_update_notification() {
        let repo = InMemoryNotificationRepository::new();
        let notification = create_test_notification("test".to_string());

        // First create the notification
        repo.create(notification.clone()).await.unwrap();

        // Update the notification
        let mut updated_notification = notification.clone();
        updated_notification.url = "http://updated.example.com".to_string();

        let result = repo
            .update("test".to_string(), updated_notification.clone())
            .await;
        assert!(result.is_ok());

        let updated = result.unwrap();
        assert_eq!(updated.id, "test");
        assert_eq!(updated.url, "http://updated.example.com");

        // Verify the update persisted
        let stored = repo.get_by_id("test".to_string()).await.unwrap();
        assert_eq!(stored.url, "http://updated.example.com");
    }

    #[actix_web::test]
    async fn test_list_notifications() {
        let repo = InMemoryNotificationRepository::new();
        let notification1 = create_test_notification("test".to_string());
        let notification2 = create_test_notification("test2".to_string());

        repo.create(notification1.clone()).await.unwrap();
        repo.create(notification2).await.unwrap();

        let notifications = repo.list_all().await.unwrap();
        assert_eq!(notifications.len(), 2);
    }

    #[actix_web::test]
    async fn test_update_nonexistent_notification() {
        let repo = InMemoryNotificationRepository::new();
        let notification = create_test_notification("test".to_string());

        let result = repo.update("test2".to_string(), notification).await;
        assert!(matches!(result, Err(RepositoryError::NotFound(_))));
    }

    #[actix_web::test]
    async fn test_get_nonexistent_notification() {
        let repo = InMemoryNotificationRepository::new();

        let result = repo.get_by_id("test".to_string()).await;
        assert!(matches!(result, Err(RepositoryError::NotFound(_))));
    }

    // test has_entries
    #[actix_web::test]
    async fn test_has_entries() {
        let repo = InMemoryNotificationRepository::new();
        assert!(!repo.has_entries().await.unwrap());

        let notification = create_test_notification("test".to_string());

        repo.create(notification.clone()).await.unwrap();
        assert!(repo.has_entries().await.unwrap());
    }

    #[actix_web::test]
    async fn test_drop_all_entries() {
        let repo = InMemoryNotificationRepository::new();
        let notification = create_test_notification("test".to_string());

        repo.create(notification.clone()).await.unwrap();
        assert!(repo.has_entries().await.unwrap());

        repo.drop_all_entries().await.unwrap();
        assert!(!repo.has_entries().await.unwrap());
    }

    #[actix_web::test]
    async fn test_delete_notification() {
        let repo = InMemoryNotificationRepository::new();
        let notification = create_test_notification("test".to_string());

        // Create the notification first
        repo.create(notification.clone()).await.unwrap();
        assert_eq!(repo.count().await.unwrap(), 1);

        // Delete the notification
        let result = repo.delete_by_id("test".to_string()).await;
        assert!(result.is_ok());

        // Verify it's gone
        assert_eq!(repo.count().await.unwrap(), 0);
        let get_result = repo.get_by_id("test".to_string()).await;
        assert!(matches!(get_result, Err(RepositoryError::NotFound(_))));
    }

    #[actix_web::test]
    async fn test_delete_nonexistent_notification() {
        let repo = InMemoryNotificationRepository::new();

        let result = repo.delete_by_id("nonexistent".to_string()).await;
        assert!(matches!(result, Err(RepositoryError::NotFound(_))));
    }

    #[actix_web::test]
    async fn test_update_with_id_mismatch() {
        let repo = InMemoryNotificationRepository::new();
        let notification = create_test_notification("test".to_string());

        // Create the notification first
        repo.create(notification.clone()).await.unwrap();

        // Try to update with mismatched ID
        let mut updated_notification = notification.clone();
        updated_notification.id = "different-id".to_string();

        let result = repo.update("test".to_string(), updated_notification).await;
        assert!(matches!(result, Err(RepositoryError::InvalidData(_))));
    }

    // A clone taken while the lock is free is a snapshot: later writes to the source
    // repository must not show up in it.
    #[actix_web::test]
    async fn test_clone_is_independent_snapshot() {
        let repo = InMemoryNotificationRepository::new();
        repo.create(create_test_notification("test".to_string()))
            .await
            .unwrap();

        let snapshot = repo.clone();
        assert_eq!(snapshot.count().await.unwrap(), 1);

        repo.create(create_test_notification("test2".to_string()))
            .await
            .unwrap();
        assert_eq!(repo.count().await.unwrap(), 2);
        assert_eq!(snapshot.count().await.unwrap(), 1);
    }

    #[actix_web::test]
    async fn test_update_delete_integration() {
        let repo = InMemoryNotificationRepository::new();
        let notification1 = create_test_notification("test1".to_string());
        let notification2 = create_test_notification("test2".to_string());

        // Create two notifications
        repo.create(notification1.clone()).await.unwrap();
        repo.create(notification2.clone()).await.unwrap();
        assert_eq!(repo.count().await.unwrap(), 2);

        // Update the first notification
        let mut updated_notification1 = notification1.clone();
        updated_notification1.url = "http://updated.example.com".to_string();

        let update_result = repo
            .update("test1".to_string(), updated_notification1)
            .await;
        assert!(update_result.is_ok());

        // Verify the update
        let stored = repo.get_by_id("test1".to_string()).await.unwrap();
        assert_eq!(stored.url, "http://updated.example.com");

        // Delete the second notification
        let delete_result = repo.delete_by_id("test2".to_string()).await;
        assert!(delete_result.is_ok());

        // Verify final state
        assert_eq!(repo.count().await.unwrap(), 1);
        let remaining = repo.list_all().await.unwrap();
        assert_eq!(remaining.len(), 1);
        assert_eq!(remaining[0].id, "test1");
        assert_eq!(remaining[0].url, "http://updated.example.com");
    }
}