openzeppelin_relayer/jobs/
job.rs

1//! Job processing module for handling asynchronous tasks.
2//!
3//! Provides generic job structure for different types of operations:
4//! - Transaction processing
5//! - Status monitoring
6//! - Notifications
7use crate::models::{NetworkType, WebhookNotification};
8use chrono::Utc;
9use serde::{Deserialize, Serialize};
10use std::collections::HashMap;
11use strum::Display;
12use uuid::Uuid;
13
14// Common message structure
15#[derive(Debug, Serialize, Deserialize, Clone)]
16pub struct Job<T> {
17    pub message_id: String,
18    pub version: String,
19    pub timestamp: String,
20    pub job_type: JobType,
21    pub data: T,
22    #[serde(skip_serializing_if = "Option::is_none")]
23    pub request_id: Option<String>,
24}
25
26impl<T> Job<T> {
27    pub fn new(job_type: JobType, data: T) -> Self {
28        Self {
29            message_id: Uuid::new_v4().to_string(),
30            version: "1.0".to_string(),
31            timestamp: Utc::now().timestamp().to_string(),
32            job_type,
33            data,
34            request_id: None,
35        }
36    }
37    pub fn with_request_id(mut self, id: Option<String>) -> Self {
38        self.request_id = id;
39        self
40    }
41}
42
43// Enum to represent different message types
44#[derive(Debug, Serialize, Deserialize, Display, Clone)]
45#[serde(tag = "type", rename_all = "snake_case")]
46pub enum JobType {
47    TransactionRequest,
48    TransactionSend,
49    TransactionStatusCheck,
50    NotificationSend,
51    SolanaTokenSwapRequest,
52    RelayerHealthCheck,
53}
54
55// Example message data for transaction request
56#[derive(Debug, Serialize, Deserialize, Clone)]
57pub struct TransactionRequest {
58    pub transaction_id: String,
59    pub relayer_id: String,
60    pub metadata: Option<HashMap<String, String>>,
61}
62
63impl TransactionRequest {
64    pub fn new(transaction_id: impl Into<String>, relayer_id: impl Into<String>) -> Self {
65        Self {
66            transaction_id: transaction_id.into(),
67            relayer_id: relayer_id.into(),
68            metadata: None,
69        }
70    }
71
72    pub fn with_metadata(mut self, metadata: HashMap<String, String>) -> Self {
73        self.metadata = Some(metadata);
74        self
75    }
76}
77
78#[derive(Debug, Serialize, Deserialize, Clone)]
79pub enum TransactionCommand {
80    Submit,
81    Cancel { reason: String },
82    Resubmit,
83    Resend,
84}
85
86// Example message data for order creation
87#[derive(Debug, Serialize, Deserialize, Clone)]
88pub struct TransactionSend {
89    pub transaction_id: String,
90    pub relayer_id: String,
91    pub command: TransactionCommand,
92    pub metadata: Option<HashMap<String, String>>,
93}
94
95impl TransactionSend {
96    pub fn submit(transaction_id: impl Into<String>, relayer_id: impl Into<String>) -> Self {
97        Self {
98            transaction_id: transaction_id.into(),
99            relayer_id: relayer_id.into(),
100            command: TransactionCommand::Submit,
101            metadata: None,
102        }
103    }
104
105    pub fn cancel(
106        transaction_id: impl Into<String>,
107        relayer_id: impl Into<String>,
108        reason: impl Into<String>,
109    ) -> Self {
110        Self {
111            transaction_id: transaction_id.into(),
112            relayer_id: relayer_id.into(),
113            command: TransactionCommand::Cancel {
114                reason: reason.into(),
115            },
116            metadata: None,
117        }
118    }
119
120    pub fn resubmit(transaction_id: impl Into<String>, relayer_id: impl Into<String>) -> Self {
121        Self {
122            transaction_id: transaction_id.into(),
123            relayer_id: relayer_id.into(),
124            command: TransactionCommand::Resubmit,
125            metadata: None,
126        }
127    }
128
129    pub fn resend(transaction_id: impl Into<String>, relayer_id: impl Into<String>) -> Self {
130        Self {
131            transaction_id: transaction_id.into(),
132            relayer_id: relayer_id.into(),
133            command: TransactionCommand::Resend,
134            metadata: None,
135        }
136    }
137
138    pub fn with_metadata(mut self, metadata: HashMap<String, String>) -> Self {
139        self.metadata = Some(metadata);
140        self
141    }
142}
143
144// Struct for individual order item
145#[derive(Debug, Serialize, Deserialize, Clone)]
146pub struct TransactionStatusCheck {
147    pub transaction_id: String,
148    pub relayer_id: String,
149    /// Network type for this transaction status check.
150    /// Optional for backward compatibility with older queued messages.
151    #[serde(default)]
152    pub network_type: Option<NetworkType>,
153    pub metadata: Option<HashMap<String, String>>,
154}
155
156impl TransactionStatusCheck {
157    pub fn new(
158        transaction_id: impl Into<String>,
159        relayer_id: impl Into<String>,
160        network_type: NetworkType,
161    ) -> Self {
162        Self {
163            transaction_id: transaction_id.into(),
164            relayer_id: relayer_id.into(),
165            network_type: Some(network_type),
166            metadata: None,
167        }
168    }
169
170    pub fn with_metadata(mut self, metadata: HashMap<String, String>) -> Self {
171        self.metadata = Some(metadata);
172        self
173    }
174}
175
176#[derive(Debug, Serialize, Deserialize, Clone, PartialEq)]
177pub struct NotificationSend {
178    pub notification_id: String,
179    pub notification: WebhookNotification,
180}
181
182impl NotificationSend {
183    pub fn new(notification_id: String, notification: WebhookNotification) -> Self {
184        Self {
185            notification_id,
186            notification,
187        }
188    }
189}
190
191#[derive(Debug, Serialize, Deserialize, Clone, PartialEq)]
192pub struct SolanaTokenSwapRequest {
193    pub relayer_id: String,
194}
195
196impl SolanaTokenSwapRequest {
197    pub fn new(relayer_id: String) -> Self {
198        Self { relayer_id }
199    }
200}
201
202#[derive(Debug, Serialize, Deserialize, Clone, PartialEq)]
203pub struct RelayerHealthCheck {
204    pub relayer_id: String,
205    pub retry_count: u32,
206}
207
208impl RelayerHealthCheck {
209    pub fn new(relayer_id: String) -> Self {
210        Self {
211            relayer_id,
212            retry_count: 0,
213        }
214    }
215
216    pub fn with_retry_count(relayer_id: String, retry_count: u32) -> Self {
217        Self {
218            relayer_id,
219            retry_count,
220        }
221    }
222}
223
224#[cfg(test)]
225mod tests {
226    use std::collections::HashMap;
227    use std::str::FromStr;
228
229    use crate::models::{
230        evm::Speed, EvmTransactionDataSignature, EvmTransactionResponse, TransactionResponse,
231        TransactionStatus, WebhookNotification, WebhookPayload, U256,
232    };
233
234    use super::*;
235
236    #[test]
237    fn test_job_creation() {
238        let job_data = TransactionRequest::new("tx123", "relayer-1");
239        let job = Job::new(JobType::TransactionRequest, job_data.clone());
240
241        assert_eq!(job.job_type.to_string(), "TransactionRequest");
242        assert_eq!(job.version, "1.0");
243        assert_eq!(job.data.transaction_id, "tx123");
244        assert_eq!(job.data.relayer_id, "relayer-1");
245        assert!(job.data.metadata.is_none());
246    }
247
248    #[test]
249    fn test_transaction_request_with_metadata() {
250        let mut metadata = HashMap::new();
251        metadata.insert("chain_id".to_string(), "1".to_string());
252        metadata.insert("gas_price".to_string(), "20000000000".to_string());
253
254        let tx_request =
255            TransactionRequest::new("tx123", "relayer-1").with_metadata(metadata.clone());
256
257        assert_eq!(tx_request.transaction_id, "tx123");
258        assert_eq!(tx_request.relayer_id, "relayer-1");
259        assert!(tx_request.metadata.is_some());
260        assert_eq!(tx_request.metadata.unwrap(), metadata);
261    }
262
263    #[test]
264    fn test_transaction_send_methods() {
265        // Test submit
266        let tx_submit = TransactionSend::submit("tx123", "relayer-1");
267        assert_eq!(tx_submit.transaction_id, "tx123");
268        assert_eq!(tx_submit.relayer_id, "relayer-1");
269        matches!(tx_submit.command, TransactionCommand::Submit);
270
271        // Test cancel
272        let tx_cancel = TransactionSend::cancel("tx123", "relayer-1", "user requested");
273        matches!(tx_cancel.command, TransactionCommand::Cancel { reason } if reason == "user requested");
274
275        // Test resubmit
276        let tx_resubmit = TransactionSend::resubmit("tx123", "relayer-1");
277        matches!(tx_resubmit.command, TransactionCommand::Resubmit);
278
279        // Test resend
280        let tx_resend = TransactionSend::resend("tx123", "relayer-1");
281        matches!(tx_resend.command, TransactionCommand::Resend);
282
283        // Test with_metadata
284        let mut metadata = HashMap::new();
285        metadata.insert("nonce".to_string(), "5".to_string());
286
287        let tx_with_metadata =
288            TransactionSend::submit("tx123", "relayer-1").with_metadata(metadata.clone());
289
290        assert!(tx_with_metadata.metadata.is_some());
291        assert_eq!(tx_with_metadata.metadata.unwrap(), metadata);
292    }
293
294    #[test]
295    fn test_transaction_status_check() {
296        let tx_status = TransactionStatusCheck::new("tx123", "relayer-1", NetworkType::Evm);
297        assert_eq!(tx_status.transaction_id, "tx123");
298        assert_eq!(tx_status.relayer_id, "relayer-1");
299        assert_eq!(tx_status.network_type, Some(NetworkType::Evm));
300        assert!(tx_status.metadata.is_none());
301
302        let mut metadata = HashMap::new();
303        metadata.insert("retries".to_string(), "3".to_string());
304
305        let tx_status_with_metadata =
306            TransactionStatusCheck::new("tx123", "relayer-1", NetworkType::Stellar)
307                .with_metadata(metadata.clone());
308
309        assert!(tx_status_with_metadata.metadata.is_some());
310        assert_eq!(tx_status_with_metadata.metadata.unwrap(), metadata);
311    }
312
313    #[test]
314    fn test_transaction_status_check_backward_compatibility() {
315        // Simulate an old message without network_type field
316        let old_json = r#"{
317            "transaction_id": "tx456",
318            "relayer_id": "relayer-2",
319            "metadata": null
320        }"#;
321
322        // Should deserialize successfully with network_type defaulting to None
323        let deserialized: TransactionStatusCheck = serde_json::from_str(old_json).unwrap();
324        assert_eq!(deserialized.transaction_id, "tx456");
325        assert_eq!(deserialized.relayer_id, "relayer-2");
326        assert_eq!(deserialized.network_type, None);
327        assert!(deserialized.metadata.is_none());
328
329        // New messages should include network_type
330        let new_status = TransactionStatusCheck::new("tx789", "relayer-3", NetworkType::Solana);
331        assert_eq!(new_status.network_type, Some(NetworkType::Solana));
332    }
333
334    #[test]
335    fn test_job_serialization() {
336        let tx_request = TransactionRequest::new("tx123", "relayer-1");
337        let job = Job::new(JobType::TransactionRequest, tx_request);
338
339        let serialized = serde_json::to_string(&job).unwrap();
340        let deserialized: Job<TransactionRequest> = serde_json::from_str(&serialized).unwrap();
341
342        assert_eq!(deserialized.job_type.to_string(), "TransactionRequest");
343        assert_eq!(deserialized.data.transaction_id, "tx123");
344        assert_eq!(deserialized.data.relayer_id, "relayer-1");
345    }
346
347    #[test]
348    fn test_notification_send_serialization() {
349        let payload = WebhookPayload::Transaction(TransactionResponse::Evm(Box::new(
350            EvmTransactionResponse {
351                id: "tx123".to_string(),
352                hash: Some("0x123".to_string()),
353                status: TransactionStatus::Confirmed,
354                status_reason: None,
355                created_at: "2025-01-27T15:31:10.777083+00:00".to_string(),
356                sent_at: Some("2025-01-27T15:31:10.777083+00:00".to_string()),
357                confirmed_at: Some("2025-01-27T15:31:10.777083+00:00".to_string()),
358                gas_price: Some(1000000000),
359                gas_limit: Some(21000),
360                nonce: Some(1),
361                value: U256::from_str("1000000000000000000").unwrap(),
362                from: "0xabc".to_string(),
363                to: Some("0xdef".to_string()),
364                relayer_id: "relayer-1".to_string(),
365                data: Some("0x123".to_string()),
366                max_fee_per_gas: Some(1000000000),
367                max_priority_fee_per_gas: Some(1000000000),
368                signature: Some(EvmTransactionDataSignature {
369                    r: "0x123".to_string(),
370                    s: "0x123".to_string(),
371                    v: 1,
372                    sig: "0x123".to_string(),
373                }),
374                speed: Some(Speed::Fast),
375            },
376        )));
377
378        let notification = WebhookNotification::new("transaction".to_string(), payload);
379        let notification_send =
380            NotificationSend::new("notification-test".to_string(), notification);
381
382        let serialized = serde_json::to_string(&notification_send).unwrap();
383
384        match serde_json::from_str::<NotificationSend>(&serialized) {
385            Ok(deserialized) => {
386                assert_eq!(notification_send, deserialized);
387            }
388            Err(e) => {
389                panic!("Deserialization error: {}", e);
390            }
391        }
392    }
393
394    #[test]
395    fn test_notification_send_serialization_none_values() {
396        let payload = WebhookPayload::Transaction(TransactionResponse::Evm(Box::new(
397            EvmTransactionResponse {
398                id: "tx123".to_string(),
399                hash: None,
400                status: TransactionStatus::Confirmed,
401                status_reason: None,
402                created_at: "2025-01-27T15:31:10.777083+00:00".to_string(),
403                sent_at: None,
404                confirmed_at: None,
405                gas_price: None,
406                gas_limit: Some(21000),
407                nonce: None,
408                value: U256::from_str("1000000000000000000").unwrap(),
409                from: "0xabc".to_string(),
410                to: None,
411                relayer_id: "relayer-1".to_string(),
412                data: None,
413                max_fee_per_gas: None,
414                max_priority_fee_per_gas: None,
415                signature: None,
416                speed: None,
417            },
418        )));
419
420        let notification = WebhookNotification::new("transaction".to_string(), payload);
421        let notification_send =
422            NotificationSend::new("notification-test".to_string(), notification);
423
424        let serialized = serde_json::to_string(&notification_send).unwrap();
425
426        match serde_json::from_str::<NotificationSend>(&serialized) {
427            Ok(deserialized) => {
428                assert_eq!(notification_send, deserialized);
429            }
430            Err(e) => {
431                panic!("Deserialization error: {}", e);
432            }
433        }
434    }
435
436    #[test]
437    fn test_relayer_health_check_new() {
438        let health_check = RelayerHealthCheck::new("relayer-1".to_string());
439
440        assert_eq!(health_check.relayer_id, "relayer-1");
441        assert_eq!(health_check.retry_count, 0);
442    }
443
444    #[test]
445    fn test_relayer_health_check_with_retry_count() {
446        let health_check = RelayerHealthCheck::with_retry_count("relayer-1".to_string(), 5);
447
448        assert_eq!(health_check.relayer_id, "relayer-1");
449        assert_eq!(health_check.retry_count, 5);
450    }
451
452    #[test]
453    fn test_relayer_health_check_correct_field_values() {
454        // Test with zero retry count
455        let health_check_zero = RelayerHealthCheck::new("relayer-test-123".to_string());
456        assert_eq!(health_check_zero.relayer_id, "relayer-test-123");
457        assert_eq!(health_check_zero.retry_count, 0);
458
459        // Test with specific retry count
460        let health_check_custom =
461            RelayerHealthCheck::with_retry_count("relayer-abc".to_string(), 10);
462        assert_eq!(health_check_custom.relayer_id, "relayer-abc");
463        assert_eq!(health_check_custom.retry_count, 10);
464
465        // Test with large retry count
466        let health_check_large =
467            RelayerHealthCheck::with_retry_count("relayer-xyz".to_string(), 999);
468        assert_eq!(health_check_large.relayer_id, "relayer-xyz");
469        assert_eq!(health_check_large.retry_count, 999);
470    }
471
472    #[test]
473    fn test_relayer_health_check_job_serialization() {
474        let health_check = RelayerHealthCheck::new("relayer-1".to_string());
475        let job = Job::new(JobType::RelayerHealthCheck, health_check);
476
477        let serialized = serde_json::to_string(&job).unwrap();
478        let deserialized: Job<RelayerHealthCheck> = serde_json::from_str(&serialized).unwrap();
479
480        assert_eq!(deserialized.job_type.to_string(), "RelayerHealthCheck");
481        assert_eq!(deserialized.data.relayer_id, "relayer-1");
482        assert_eq!(deserialized.data.retry_count, 0);
483    }
484
485    #[test]
486    fn test_relayer_health_check_job_serialization_with_retry_count() {
487        let health_check = RelayerHealthCheck::with_retry_count("relayer-2".to_string(), 3);
488        let job = Job::new(JobType::RelayerHealthCheck, health_check.clone());
489
490        let serialized = serde_json::to_string(&job).unwrap();
491        let deserialized: Job<RelayerHealthCheck> = serde_json::from_str(&serialized).unwrap();
492
493        assert_eq!(deserialized.job_type.to_string(), "RelayerHealthCheck");
494        assert_eq!(deserialized.data.relayer_id, health_check.relayer_id);
495        assert_eq!(deserialized.data.retry_count, health_check.retry_count);
496        assert_eq!(deserialized.data, health_check);
497    }
498
499    #[test]
500    fn test_relayer_health_check_equality_after_deserialization() {
501        let original_health_check =
502            RelayerHealthCheck::with_retry_count("relayer-test".to_string(), 7);
503        let job = Job::new(JobType::RelayerHealthCheck, original_health_check.clone());
504
505        let serialized = serde_json::to_string(&job).unwrap();
506        let deserialized: Job<RelayerHealthCheck> = serde_json::from_str(&serialized).unwrap();
507
508        // Assert job type string
509        assert_eq!(deserialized.job_type.to_string(), "RelayerHealthCheck");
510
511        // Assert data equality
512        assert_eq!(deserialized.data, original_health_check);
513        assert_eq!(
514            deserialized.data.relayer_id,
515            original_health_check.relayer_id
516        );
517        assert_eq!(
518            deserialized.data.retry_count,
519            original_health_check.retry_count
520        );
521    }
522}