openzeppelin_relayer/jobs/handlers/
notification_handler.rs1use actix_web::web::ThinData;
7use apalis::prelude::{Attempt, Data, *};
8use eyre::Result;
9use tracing::{debug, instrument};
10
11use crate::{
12 constants::WORKER_NOTIFICATION_SENDER_RETRIES,
13 jobs::{handle_result, Job, NotificationSend},
14 models::DefaultAppState,
15 observability::request_id::set_request_id,
16 repositories::Repository,
17 services::WebhookNotificationService,
18};
19
20#[instrument(
29 level = "debug",
30 skip(job, context),
31 fields(
32 request_id = ?job.request_id,
33 job_id = %job.message_id,
34 job_type = %job.job_type.to_string(),
35 attempt = %attempt.current(),
36 notification_id = %job.data.notification_id,
37 )
38)]
39pub async fn notification_handler(
40 job: Job<NotificationSend>,
41 context: Data<ThinData<DefaultAppState>>,
42 attempt: Attempt,
43) -> Result<(), Error> {
44 if let Some(request_id) = job.request_id.clone() {
45 set_request_id(request_id);
46 }
47
48 debug!("handling notification {}", job.data.notification_id);
49
50 let result = handle_request(job.data, context).await;
51
52 handle_result(
53 result,
54 attempt,
55 "Notification",
56 WORKER_NOTIFICATION_SENDER_RETRIES,
57 )
58}
59
60async fn handle_request(
61 request: NotificationSend,
62 context: Data<ThinData<DefaultAppState>>,
63) -> Result<()> {
64 debug!("sending notification {}", request.notification_id);
65 let notification = context
66 .notification_repository
67 .get_by_id(request.notification_id)
68 .await?;
69
70 let notification_service =
71 WebhookNotificationService::new(notification.url, notification.signing_key);
72
73 notification_service
74 .send_notification(request.notification)
75 .await?;
76
77 Ok(())
78}
79
80#[cfg(test)]
81mod tests {
82 use super::*;
83 use crate::models::{
84 EvmTransactionResponse, NetworkType, RelayerDisabledPayload, RelayerEvmPolicy,
85 RelayerNetworkPolicyResponse, RelayerResponse, TransactionResponse, TransactionStatus,
86 WebhookNotification, WebhookPayload, U256,
87 };
88
89 #[tokio::test]
90 async fn test_notification_job_creation() {
91 let payload = WebhookPayload::Transaction(TransactionResponse::Evm(Box::new(
93 EvmTransactionResponse {
94 id: "tx123".to_string(),
95 hash: Some("0x123".to_string()),
96 status: TransactionStatus::Confirmed,
97 status_reason: None,
98 created_at: "2025-01-27T15:31:10.777083+00:00".to_string(),
99 sent_at: Some("2025-01-27T15:31:10.777083+00:00".to_string()),
100 confirmed_at: Some("2025-01-27T15:31:10.777083+00:00".to_string()),
101 gas_price: Some(1000000000),
102 gas_limit: Some(21000),
103 nonce: Some(1),
104 value: U256::from(1000000000000000000_u64),
105 from: "0xabc".to_string(),
106 to: Some("0xdef".to_string()),
107 relayer_id: "relayer-1".to_string(),
108 data: None,
109 max_fee_per_gas: None,
110 max_priority_fee_per_gas: None,
111 signature: None,
112 speed: None,
113 },
114 )));
115
116 let notification = WebhookNotification::new("test_event".to_string(), payload);
118 let notification_job =
119 NotificationSend::new("notification-1".to_string(), notification.clone());
120
121 let job = Job::new(crate::jobs::JobType::NotificationSend, notification_job);
123
124 assert_eq!(job.data.notification_id, "notification-1");
126 assert_eq!(job.data.notification.event, "test_event");
127 }
128
129 #[tokio::test]
130 async fn test_notification_job_with_different_payloads() {
131 let transaction_payload = WebhookPayload::Transaction(TransactionResponse::Evm(Box::new(
134 EvmTransactionResponse {
135 id: "tx123".to_string(),
136 hash: Some("0x123".to_string()),
137 status: TransactionStatus::Confirmed,
138 status_reason: None,
139 created_at: "2025-01-27T15:31:10.777083+00:00".to_string(),
140 sent_at: Some("2025-01-27T15:31:10.777083+00:00".to_string()),
141 confirmed_at: Some("2025-01-27T15:31:10.777083+00:00".to_string()),
142 gas_price: Some(1000000000),
143 gas_limit: Some(21000),
144 nonce: Some(1),
145 value: U256::from(1000000000000000000_u64),
146 from: "0xabc".to_string(),
147 to: Some("0xdef".to_string()),
148 relayer_id: "relayer-1".to_string(),
149 data: None,
150 max_fee_per_gas: None,
151 max_priority_fee_per_gas: None,
152 signature: None,
153 speed: None,
154 },
155 )));
156
157 let string_notification =
158 WebhookNotification::new("transaction_payload".to_string(), transaction_payload);
159 let job = NotificationSend::new("notification-string".to_string(), string_notification);
160 assert_eq!(job.notification.event, "transaction_payload");
161
162 let relayer_disabled = WebhookPayload::RelayerDisabled(Box::new(RelayerDisabledPayload {
163 relayer: RelayerResponse {
164 id: "relayer-1".to_string(),
165 name: "relayer-1".to_string(),
166 network: "ethereum".to_string(),
167 network_type: NetworkType::Evm,
168 paused: false,
169 policies: Some(RelayerNetworkPolicyResponse::Evm(
170 RelayerEvmPolicy {
171 gas_price_cap: None,
172 whitelist_receivers: None,
173 eip1559_pricing: None,
174 private_transactions: Some(false),
175 min_balance: Some(0),
176 gas_limit_estimation: None,
177 }
178 .into(),
179 )),
180 signer_id: "signer-1".to_string(),
181 notification_id: None,
182 custom_rpc_urls: None,
183 address: Some("0xabc".to_string()),
184 system_disabled: Some(false),
185 ..Default::default()
186 },
187 disable_reason: "test".to_string(),
188 }));
189 let object_notification =
190 WebhookNotification::new("object_event".to_string(), relayer_disabled);
191 let job = NotificationSend::new("notification-object".to_string(), object_notification);
192 assert_eq!(job.notification.event, "object_event");
193 }
194}