openzeppelin_relayer/repositories/notification/
notification_in_memory.rs1use crate::{
7 models::{NotificationConfig, NotificationRepoModel, RepositoryError},
8 repositories::*,
9};
10use async_trait::async_trait;
11use std::collections::HashMap;
12use tokio::sync::{Mutex, MutexGuard};
13
14#[derive(Debug)]
15pub struct InMemoryNotificationRepository {
16 store: Mutex<HashMap<String, NotificationRepoModel>>,
17}
18
19impl Clone for InMemoryNotificationRepository {
20 fn clone(&self) -> Self {
21 let data = self
23 .store
24 .try_lock()
25 .map(|guard| guard.clone())
26 .unwrap_or_else(|_| HashMap::new());
27
28 Self {
29 store: Mutex::new(data),
30 }
31 }
32}
33
34#[allow(dead_code)]
35impl InMemoryNotificationRepository {
36 pub fn new() -> Self {
37 Self {
38 store: Mutex::new(HashMap::new()),
39 }
40 }
41
42 async fn acquire_lock<T>(lock: &Mutex<T>) -> Result<MutexGuard<T>, RepositoryError> {
43 Ok(lock.lock().await)
44 }
45}
46
47impl Default for InMemoryNotificationRepository {
48 fn default() -> Self {
49 Self::new()
50 }
51}
52
53#[async_trait]
54impl Repository<NotificationRepoModel, String> for InMemoryNotificationRepository {
55 async fn create(
56 &self,
57 notification: NotificationRepoModel,
58 ) -> Result<NotificationRepoModel, RepositoryError> {
59 let mut store = Self::acquire_lock(&self.store).await?;
60 if store.contains_key(¬ification.id) {
61 return Err(RepositoryError::ConstraintViolation(format!(
62 "Notification with ID '{}' already exists",
63 notification.id
64 )));
65 }
66 store.insert(notification.id.clone(), notification.clone());
67 Ok(notification)
68 }
69
70 async fn get_by_id(&self, id: String) -> Result<NotificationRepoModel, RepositoryError> {
71 let store = Self::acquire_lock(&self.store).await?;
72 match store.get(&id) {
73 Some(entity) => Ok(entity.clone()),
74 None => Err(RepositoryError::NotFound(format!(
75 "Notification with ID '{id}' not found"
76 ))),
77 }
78 }
79
80 #[allow(clippy::map_entry)]
81 async fn update(
82 &self,
83 id: String,
84 notification: NotificationRepoModel,
85 ) -> Result<NotificationRepoModel, RepositoryError> {
86 let mut store = Self::acquire_lock(&self.store).await?;
87
88 if !store.contains_key(&id) {
90 return Err(RepositoryError::NotFound(format!(
91 "Notification with ID '{id}' not found"
92 )));
93 }
94
95 if id != notification.id {
96 return Err(RepositoryError::InvalidData(format!(
97 "ID mismatch: URL parameter '{}' does not match entity ID '{}'",
98 id, notification.id
99 )));
100 }
101
102 store.insert(id, notification.clone());
103 Ok(notification)
104 }
105
106 async fn delete_by_id(&self, id: String) -> Result<(), RepositoryError> {
107 let mut store = Self::acquire_lock(&self.store).await?;
108
109 match store.remove(&id) {
110 Some(_) => Ok(()),
111 None => Err(RepositoryError::NotFound(format!(
112 "Notification with ID {id} not found"
113 ))),
114 }
115 }
116
117 async fn list_all(&self) -> Result<Vec<NotificationRepoModel>, RepositoryError> {
118 let store = Self::acquire_lock(&self.store).await?;
119 let notifications: Vec<NotificationRepoModel> = store.values().cloned().collect();
120 Ok(notifications)
121 }
122
123 async fn list_paginated(
124 &self,
125 query: PaginationQuery,
126 ) -> Result<PaginatedResult<NotificationRepoModel>, RepositoryError> {
127 let total = self.count().await?;
128 let start = ((query.page - 1) * query.per_page) as usize;
129 let items: Vec<NotificationRepoModel> = self
130 .store
131 .lock()
132 .await
133 .values()
134 .skip(start)
135 .take(query.per_page as usize)
136 .cloned()
137 .collect();
138
139 Ok(PaginatedResult {
140 items,
141 total: total as u64,
142 page: query.page,
143 per_page: query.per_page,
144 })
145 }
146
147 async fn count(&self) -> Result<usize, RepositoryError> {
148 let store = Self::acquire_lock(&self.store).await?;
149 let length = store.len();
150 Ok(length)
151 }
152
153 async fn has_entries(&self) -> Result<bool, RepositoryError> {
154 let store = Self::acquire_lock(&self.store).await?;
155 Ok(!store.is_empty())
156 }
157
158 async fn drop_all_entries(&self) -> Result<(), RepositoryError> {
159 let mut store = Self::acquire_lock(&self.store).await?;
160 store.clear();
161 Ok(())
162 }
163}
164
165impl TryFrom<NotificationConfig> for NotificationRepoModel {
166 type Error = ConversionError;
167
168 fn try_from(config: NotificationConfig) -> Result<Self, Self::Error> {
169 let signing_key = config.get_signing_key().map_err(|e| {
170 ConversionError::InvalidConfig(format!("Failed to get signing key: {e}"))
171 })?;
172
173 Ok(NotificationRepoModel {
174 id: config.id.clone(),
175 url: config.url.clone(),
176 notification_type: config.r#type,
177 signing_key,
178 })
179 }
180}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::models::NotificationType;

    /// URL written by the update tests.
    const UPDATED_URL: &str = "http://updated.example.com";

    /// Builds a webhook notification with the given ID, a localhost URL and
    /// no signing key.
    fn create_test_notification(id: String) -> NotificationRepoModel {
        NotificationRepoModel {
            id,
            url: String::from("http://localhost"),
            notification_type: NotificationType::Webhook,
            signing_key: None,
        }
    }

    #[actix_web::test]
    async fn test_new_repository_is_empty() {
        let repo = InMemoryNotificationRepository::new();
        let entries = repo.count().await.unwrap();
        assert_eq!(entries, 0);
    }

    #[actix_web::test]
    async fn test_add_notification() {
        let repo = InMemoryNotificationRepository::new();
        let model = create_test_notification("test".to_string());

        repo.create(model.clone()).await.unwrap();
        assert_eq!(repo.count().await.unwrap(), 1);

        let fetched = repo.get_by_id("test".to_string()).await.unwrap();
        assert_eq!(fetched.id, model.id);
    }

    #[actix_web::test]
    async fn test_update_notification() {
        let repo = InMemoryNotificationRepository::new();
        let original = create_test_notification("test".to_string());

        repo.create(original.clone()).await.unwrap();

        // Same ID, new URL.
        let replacement = NotificationRepoModel {
            url: UPDATED_URL.to_string(),
            ..original
        };

        let outcome = repo.update("test".to_string(), replacement).await;
        assert!(outcome.is_ok());

        let returned = outcome.unwrap();
        assert_eq!(returned.id, "test");
        assert_eq!(returned.url, "http://updated.example.com");

        // The stored entry must reflect the update as well.
        let fetched = repo.get_by_id("test".to_string()).await.unwrap();
        assert_eq!(fetched.url, "http://updated.example.com");
    }

    #[actix_web::test]
    async fn test_list_notifications() {
        let repo = InMemoryNotificationRepository::new();

        for id in ["test", "test2"] {
            repo.create(create_test_notification(id.to_string()))
                .await
                .unwrap();
        }

        let listed = repo.list_all().await.unwrap();
        assert_eq!(listed.len(), 2);
    }

    #[actix_web::test]
    async fn test_update_nonexistent_notification() {
        let repo = InMemoryNotificationRepository::new();
        let model = create_test_notification("test".to_string());

        let outcome = repo.update("test2".to_string(), model).await;
        assert!(matches!(outcome, Err(RepositoryError::NotFound(_))));
    }

    #[actix_web::test]
    async fn test_get_nonexistent_notification() {
        let repo = InMemoryNotificationRepository::new();

        let outcome = repo.get_by_id("test".to_string()).await;
        assert!(matches!(outcome, Err(RepositoryError::NotFound(_))));
    }

    #[actix_web::test]
    async fn test_has_entries() {
        let repo = InMemoryNotificationRepository::new();
        assert!(!repo.has_entries().await.unwrap());

        repo.create(create_test_notification("test".to_string()))
            .await
            .unwrap();
        assert!(repo.has_entries().await.unwrap());
    }

    #[actix_web::test]
    async fn test_drop_all_entries() {
        let repo = InMemoryNotificationRepository::new();

        repo.create(create_test_notification("test".to_string()))
            .await
            .unwrap();
        assert!(repo.has_entries().await.unwrap());

        repo.drop_all_entries().await.unwrap();
        assert!(!repo.has_entries().await.unwrap());
    }

    #[actix_web::test]
    async fn test_delete_notification() {
        let repo = InMemoryNotificationRepository::new();

        repo.create(create_test_notification("test".to_string()))
            .await
            .unwrap();
        assert_eq!(repo.count().await.unwrap(), 1);

        let removal = repo.delete_by_id("test".to_string()).await;
        assert!(removal.is_ok());

        // Entry is gone from both the count and direct lookup.
        assert_eq!(repo.count().await.unwrap(), 0);
        let lookup = repo.get_by_id("test".to_string()).await;
        assert!(matches!(lookup, Err(RepositoryError::NotFound(_))));
    }

    #[actix_web::test]
    async fn test_delete_nonexistent_notification() {
        let repo = InMemoryNotificationRepository::new();

        let removal = repo.delete_by_id("nonexistent".to_string()).await;
        assert!(matches!(removal, Err(RepositoryError::NotFound(_))));
    }

    #[actix_web::test]
    async fn test_update_with_id_mismatch() {
        let repo = InMemoryNotificationRepository::new();
        let original = create_test_notification("test".to_string());

        repo.create(original.clone()).await.unwrap();

        // Entity ID no longer matches the ID passed to `update`.
        let mismatched = NotificationRepoModel {
            id: "different-id".to_string(),
            ..original
        };

        let outcome = repo.update("test".to_string(), mismatched).await;
        assert!(matches!(outcome, Err(RepositoryError::InvalidData(_))));
    }

    #[actix_web::test]
    async fn test_update_delete_integration() {
        let repo = InMemoryNotificationRepository::new();
        let first = create_test_notification("test1".to_string());
        let second = create_test_notification("test2".to_string());

        repo.create(first.clone()).await.unwrap();
        repo.create(second).await.unwrap();
        assert_eq!(repo.count().await.unwrap(), 2);

        // Update the first entry.
        let first_updated = NotificationRepoModel {
            url: UPDATED_URL.to_string(),
            ..first
        };
        let update_outcome = repo.update("test1".to_string(), first_updated).await;
        assert!(update_outcome.is_ok());

        let fetched = repo.get_by_id("test1".to_string()).await.unwrap();
        assert_eq!(fetched.url, "http://updated.example.com");

        // Delete the second entry.
        let delete_outcome = repo.delete_by_id("test2".to_string()).await;
        assert!(delete_outcome.is_ok());

        // Only the updated first entry remains.
        assert_eq!(repo.count().await.unwrap(), 1);
        let survivors = repo.list_all().await.unwrap();
        assert_eq!(survivors.len(), 1);
        assert_eq!(survivors[0].id, "test1");
        assert_eq!(survivors[0].url, "http://updated.example.com");
    }
}