openzeppelin_relayer/jobs/handlers/
notification_handler.rs

1//! Notification handling worker implementation.
2//!
3//! This module implements the notification handling worker that processes
4//! notification jobs from the queue.
5
6use actix_web::web::ThinData;
7use apalis::prelude::{Attempt, Data, *};
8use eyre::Result;
9use tracing::{debug, instrument};
10
11use crate::{
12    constants::WORKER_NOTIFICATION_SENDER_RETRIES,
13    jobs::{handle_result, Job, NotificationSend},
14    models::DefaultAppState,
15    observability::request_id::set_request_id,
16    repositories::Repository,
17    services::WebhookNotificationService,
18};
19
20/// Handles incoming notification jobs from the queue.
21///
22/// # Arguments
23/// * `job` - The notification job containing recipient and message details
24/// * `context` - Application state containing notification services
25///
26/// # Returns
27/// * `Result<(), Error>` - Success or failure of notification processing
28#[instrument(
29    level = "debug",
30    skip(job, context),
31    fields(
32        request_id = ?job.request_id,
33        job_id = %job.message_id,
34        job_type = %job.job_type.to_string(),
35        attempt = %attempt.current(),
36        notification_id = %job.data.notification_id,
37    )
38)]
39pub async fn notification_handler(
40    job: Job<NotificationSend>,
41    context: Data<ThinData<DefaultAppState>>,
42    attempt: Attempt,
43) -> Result<(), Error> {
44    if let Some(request_id) = job.request_id.clone() {
45        set_request_id(request_id);
46    }
47
48    debug!("handling notification {}", job.data.notification_id);
49
50    let result = handle_request(job.data, context).await;
51
52    handle_result(
53        result,
54        attempt,
55        "Notification",
56        WORKER_NOTIFICATION_SENDER_RETRIES,
57    )
58}
59
60async fn handle_request(
61    request: NotificationSend,
62    context: Data<ThinData<DefaultAppState>>,
63) -> Result<()> {
64    debug!("sending notification {}", request.notification_id);
65    let notification = context
66        .notification_repository
67        .get_by_id(request.notification_id)
68        .await?;
69
70    let notification_service =
71        WebhookNotificationService::new(notification.url, notification.signing_key);
72
73    notification_service
74        .send_notification(request.notification)
75        .await?;
76
77    Ok(())
78}
79
80#[cfg(test)]
81mod tests {
82    use super::*;
83    use crate::models::{
84        EvmTransactionResponse, NetworkType, RelayerDisabledPayload, RelayerEvmPolicy,
85        RelayerNetworkPolicyResponse, RelayerResponse, TransactionResponse, TransactionStatus,
86        WebhookNotification, WebhookPayload, U256,
87    };
88
89    #[tokio::test]
90    async fn test_notification_job_creation() {
91        // Create a basic notification webhook payload
92        let payload = WebhookPayload::Transaction(TransactionResponse::Evm(Box::new(
93            EvmTransactionResponse {
94                id: "tx123".to_string(),
95                hash: Some("0x123".to_string()),
96                status: TransactionStatus::Confirmed,
97                status_reason: None,
98                created_at: "2025-01-27T15:31:10.777083+00:00".to_string(),
99                sent_at: Some("2025-01-27T15:31:10.777083+00:00".to_string()),
100                confirmed_at: Some("2025-01-27T15:31:10.777083+00:00".to_string()),
101                gas_price: Some(1000000000),
102                gas_limit: Some(21000),
103                nonce: Some(1),
104                value: U256::from(1000000000000000000_u64),
105                from: "0xabc".to_string(),
106                to: Some("0xdef".to_string()),
107                relayer_id: "relayer-1".to_string(),
108                data: None,
109                max_fee_per_gas: None,
110                max_priority_fee_per_gas: None,
111                signature: None,
112                speed: None,
113            },
114        )));
115
116        // Create a notification
117        let notification = WebhookNotification::new("test_event".to_string(), payload);
118        let notification_job =
119            NotificationSend::new("notification-1".to_string(), notification.clone());
120
121        // Create the job
122        let job = Job::new(crate::jobs::JobType::NotificationSend, notification_job);
123
124        // Test the job structure
125        assert_eq!(job.data.notification_id, "notification-1");
126        assert_eq!(job.data.notification.event, "test_event");
127    }
128
129    #[tokio::test]
130    async fn test_notification_job_with_different_payloads() {
131        // Test with different payload types
132
133        let transaction_payload = WebhookPayload::Transaction(TransactionResponse::Evm(Box::new(
134            EvmTransactionResponse {
135                id: "tx123".to_string(),
136                hash: Some("0x123".to_string()),
137                status: TransactionStatus::Confirmed,
138                status_reason: None,
139                created_at: "2025-01-27T15:31:10.777083+00:00".to_string(),
140                sent_at: Some("2025-01-27T15:31:10.777083+00:00".to_string()),
141                confirmed_at: Some("2025-01-27T15:31:10.777083+00:00".to_string()),
142                gas_price: Some(1000000000),
143                gas_limit: Some(21000),
144                nonce: Some(1),
145                value: U256::from(1000000000000000000_u64),
146                from: "0xabc".to_string(),
147                to: Some("0xdef".to_string()),
148                relayer_id: "relayer-1".to_string(),
149                data: None,
150                max_fee_per_gas: None,
151                max_priority_fee_per_gas: None,
152                signature: None,
153                speed: None,
154            },
155        )));
156
157        let string_notification =
158            WebhookNotification::new("transaction_payload".to_string(), transaction_payload);
159        let job = NotificationSend::new("notification-string".to_string(), string_notification);
160        assert_eq!(job.notification.event, "transaction_payload");
161
162        let relayer_disabled = WebhookPayload::RelayerDisabled(Box::new(RelayerDisabledPayload {
163            relayer: RelayerResponse {
164                id: "relayer-1".to_string(),
165                name: "relayer-1".to_string(),
166                network: "ethereum".to_string(),
167                network_type: NetworkType::Evm,
168                paused: false,
169                policies: Some(RelayerNetworkPolicyResponse::Evm(
170                    RelayerEvmPolicy {
171                        gas_price_cap: None,
172                        whitelist_receivers: None,
173                        eip1559_pricing: None,
174                        private_transactions: Some(false),
175                        min_balance: Some(0),
176                        gas_limit_estimation: None,
177                    }
178                    .into(),
179                )),
180                signer_id: "signer-1".to_string(),
181                notification_id: None,
182                custom_rpc_urls: None,
183                address: Some("0xabc".to_string()),
184                system_disabled: Some(false),
185                ..Default::default()
186            },
187            disable_reason: "test".to_string(),
188        }));
189        let object_notification =
190            WebhookNotification::new("object_event".to_string(), relayer_disabled);
191        let job = NotificationSend::new("notification-object".to_string(), object_notification);
192        assert_eq!(job.notification.event, "object_event");
193    }
194}