openzeppelin_relayer/jobs/
job_producer.rs

1//! Job producer module for enqueueing jobs to Redis queues.
2//!
3//! Provides functionality for producing various types of jobs:
4//! - Transaction processing jobs
5//! - Transaction submission jobs
6//! - Status monitoring jobs
7//! - Notification jobs
8
9use crate::{
10    jobs::{
11        Job, NotificationSend, Queue, RelayerHealthCheck, TransactionRequest, TransactionSend,
12        TransactionStatusCheck,
13    },
14    models::RelayerError,
15    observability::request_id::get_request_id,
16};
17use apalis::prelude::Storage;
18use apalis_redis::RedisError;
19use async_trait::async_trait;
20use serde::Serialize;
21use thiserror::Error;
22use tokio::sync::Mutex;
23use tracing::{debug, error};
24
25use super::{JobType, SolanaTokenSwapRequest};
26
27#[cfg(test)]
28use mockall::automock;
29
30#[derive(Debug, Error, Serialize)]
31pub enum JobProducerError {
32    #[error("Queue error: {0}")]
33    QueueError(String),
34}
35
36impl From<RedisError> for JobProducerError {
37    fn from(_: RedisError) -> Self {
38        JobProducerError::QueueError("Queue error".to_string())
39    }
40}
41
42impl From<JobProducerError> for RelayerError {
43    fn from(_: JobProducerError) -> Self {
44        RelayerError::QueueError("Queue error".to_string())
45    }
46}
47
48#[derive(Debug)]
49pub struct JobProducer {
50    queue: Mutex<Queue>,
51}
52
53impl Clone for JobProducer {
54    fn clone(&self) -> Self {
55        // We can't clone the Mutex directly, but we can create a new one with a cloned Queue
56        // This requires getting the lock first
57        let queue = self
58            .queue
59            .try_lock()
60            .expect("Failed to lock queue for cloning")
61            .clone();
62
63        Self {
64            queue: Mutex::new(queue),
65        }
66    }
67}
68
69#[async_trait]
70#[cfg_attr(test, automock)]
71pub trait JobProducerTrait: Send + Sync {
72    async fn produce_transaction_request_job(
73        &self,
74        transaction_process_job: TransactionRequest,
75        scheduled_on: Option<i64>,
76    ) -> Result<(), JobProducerError>;
77
78    async fn produce_submit_transaction_job(
79        &self,
80        transaction_submit_job: TransactionSend,
81        scheduled_on: Option<i64>,
82    ) -> Result<(), JobProducerError>;
83
84    async fn produce_check_transaction_status_job(
85        &self,
86        transaction_status_check_job: TransactionStatusCheck,
87        scheduled_on: Option<i64>,
88    ) -> Result<(), JobProducerError>;
89
90    async fn produce_send_notification_job(
91        &self,
92        notification_send_job: NotificationSend,
93        scheduled_on: Option<i64>,
94    ) -> Result<(), JobProducerError>;
95
96    async fn produce_solana_token_swap_request_job(
97        &self,
98        solana_swap_request_job: SolanaTokenSwapRequest,
99        scheduled_on: Option<i64>,
100    ) -> Result<(), JobProducerError>;
101
102    async fn produce_relayer_health_check_job(
103        &self,
104        relayer_health_check_job: RelayerHealthCheck,
105        scheduled_on: Option<i64>,
106    ) -> Result<(), JobProducerError>;
107
108    async fn get_queue(&self) -> Result<Queue, JobProducerError>;
109}
110
111impl JobProducer {
112    pub fn new(queue: Queue) -> Self {
113        Self {
114            queue: Mutex::new(queue.clone()),
115        }
116    }
117
118    pub async fn get_queue(&self) -> Result<Queue, JobProducerError> {
119        let queue = self.queue.lock().await;
120
121        Ok(queue.clone())
122    }
123}
124
125#[async_trait]
126impl JobProducerTrait for JobProducer {
127    async fn get_queue(&self) -> Result<Queue, JobProducerError> {
128        let queue = self.queue.lock().await;
129
130        Ok(queue.clone())
131    }
132
133    async fn produce_transaction_request_job(
134        &self,
135        transaction_process_job: TransactionRequest,
136        scheduled_on: Option<i64>,
137    ) -> Result<(), JobProducerError> {
138        debug!(
139            "Producing transaction request job: {:?}",
140            transaction_process_job
141        );
142        let mut queue = self.queue.lock().await;
143        let job = Job::new(JobType::TransactionRequest, transaction_process_job)
144            .with_request_id(get_request_id());
145
146        match scheduled_on {
147            Some(scheduled_on) => {
148                queue
149                    .transaction_request_queue
150                    .schedule(job, scheduled_on)
151                    .await?;
152            }
153            None => {
154                queue.transaction_request_queue.push(job).await?;
155            }
156        }
157        debug!("Transaction job produced successfully");
158
159        Ok(())
160    }
161
162    async fn produce_submit_transaction_job(
163        &self,
164        transaction_submit_job: TransactionSend,
165        scheduled_on: Option<i64>,
166    ) -> Result<(), JobProducerError> {
167        let mut queue = self.queue.lock().await;
168        let job = Job::new(JobType::TransactionSend, transaction_submit_job)
169            .with_request_id(get_request_id());
170
171        match scheduled_on {
172            Some(on) => {
173                queue.transaction_submission_queue.schedule(job, on).await?;
174            }
175            None => {
176                queue.transaction_submission_queue.push(job).await?;
177            }
178        }
179        debug!("Transaction Submit job produced successfully");
180
181        Ok(())
182    }
183
184    async fn produce_check_transaction_status_job(
185        &self,
186        transaction_status_check_job: TransactionStatusCheck,
187        scheduled_on: Option<i64>,
188    ) -> Result<(), JobProducerError> {
189        let mut queue = self.queue.lock().await;
190        let job = Job::new(
191            JobType::TransactionStatusCheck,
192            transaction_status_check_job.clone(),
193        )
194        .with_request_id(get_request_id());
195
196        // Route to the appropriate queue based on network type
197        use crate::models::NetworkType;
198        let status_queue = match transaction_status_check_job.network_type {
199            Some(NetworkType::Evm) => &mut queue.transaction_status_queue_evm,
200            Some(NetworkType::Stellar) => &mut queue.transaction_status_queue_stellar,
201            _ => &mut queue.transaction_status_queue, // Generic queue or legacy messages without network_type
202        };
203
204        match scheduled_on {
205            Some(on) => {
206                status_queue.schedule(job, on).await?;
207            }
208            None => {
209                status_queue.push(job).await?;
210            }
211        }
212        debug!(
213            network_type = ?transaction_status_check_job.network_type,
214            "Transaction Status Check job produced successfully"
215        );
216        Ok(())
217    }
218
219    async fn produce_send_notification_job(
220        &self,
221        notification_send_job: NotificationSend,
222        scheduled_on: Option<i64>,
223    ) -> Result<(), JobProducerError> {
224        let mut queue = self.queue.lock().await;
225        let job = Job::new(JobType::NotificationSend, notification_send_job)
226            .with_request_id(get_request_id());
227
228        match scheduled_on {
229            Some(on) => {
230                queue.notification_queue.schedule(job, on).await?;
231            }
232            None => {
233                queue.notification_queue.push(job).await?;
234            }
235        }
236
237        debug!("Notification Send job produced successfully");
238        Ok(())
239    }
240
241    async fn produce_solana_token_swap_request_job(
242        &self,
243        solana_swap_request_job: SolanaTokenSwapRequest,
244        scheduled_on: Option<i64>,
245    ) -> Result<(), JobProducerError> {
246        let mut queue = self.queue.lock().await;
247        let job = Job::new(JobType::SolanaTokenSwapRequest, solana_swap_request_job)
248            .with_request_id(get_request_id());
249
250        match scheduled_on {
251            Some(on) => {
252                queue
253                    .solana_token_swap_request_queue
254                    .schedule(job, on)
255                    .await?;
256            }
257            None => {
258                queue.solana_token_swap_request_queue.push(job).await?;
259            }
260        }
261
262        debug!("Solana token swap job produced successfully");
263        Ok(())
264    }
265
266    async fn produce_relayer_health_check_job(
267        &self,
268        relayer_health_check_job: RelayerHealthCheck,
269        scheduled_on: Option<i64>,
270    ) -> Result<(), JobProducerError> {
271        let job = Job::new(
272            JobType::RelayerHealthCheck,
273            relayer_health_check_job.clone(),
274        )
275        .with_request_id(get_request_id());
276
277        let mut queue = self.queue.lock().await;
278
279        match scheduled_on {
280            Some(scheduled_on) => {
281                queue
282                    .relayer_health_check_queue
283                    .schedule(job, scheduled_on)
284                    .await?;
285            }
286            None => {
287                queue.relayer_health_check_queue.push(job).await?;
288            }
289        }
290
291        Ok(())
292    }
293}
294
295#[cfg(test)]
296mod tests {
297    use super::*;
298    use crate::models::{
299        EvmTransactionResponse, TransactionResponse, TransactionStatus, WebhookNotification,
300        WebhookPayload, U256,
301    };
302    use crate::utils::calculate_scheduled_timestamp;
303
304    #[derive(Clone, Debug)]
305    // Define a simplified queue for testing without using complex mocks
306    struct TestRedisStorage<T> {
307        pub push_called: bool,
308        pub schedule_called: bool,
309        _phantom: std::marker::PhantomData<T>,
310    }
311
312    impl<T> TestRedisStorage<T> {
313        fn new() -> Self {
314            Self {
315                push_called: false,
316                schedule_called: false,
317                _phantom: std::marker::PhantomData,
318            }
319        }
320
321        async fn push(&mut self, _job: T) -> Result<(), JobProducerError> {
322            self.push_called = true;
323            Ok(())
324        }
325
326        async fn schedule(&mut self, _job: T, _timestamp: i64) -> Result<(), JobProducerError> {
327            self.schedule_called = true;
328            Ok(())
329        }
330    }
331
332    // A test version of the Queue
333    #[derive(Clone, Debug)]
334    struct TestQueue {
335        pub transaction_request_queue: TestRedisStorage<Job<TransactionRequest>>,
336        pub transaction_submission_queue: TestRedisStorage<Job<TransactionSend>>,
337        pub transaction_status_queue: TestRedisStorage<Job<TransactionStatusCheck>>,
338        pub transaction_status_queue_evm: TestRedisStorage<Job<TransactionStatusCheck>>,
339        pub transaction_status_queue_stellar: TestRedisStorage<Job<TransactionStatusCheck>>,
340        pub notification_queue: TestRedisStorage<Job<NotificationSend>>,
341        pub solana_token_swap_request_queue: TestRedisStorage<Job<SolanaTokenSwapRequest>>,
342        pub relayer_health_check_queue: TestRedisStorage<Job<RelayerHealthCheck>>,
343    }
344
345    impl TestQueue {
346        fn new() -> Self {
347            Self {
348                transaction_request_queue: TestRedisStorage::new(),
349                transaction_submission_queue: TestRedisStorage::new(),
350                transaction_status_queue: TestRedisStorage::new(),
351                transaction_status_queue_evm: TestRedisStorage::new(),
352                transaction_status_queue_stellar: TestRedisStorage::new(),
353                notification_queue: TestRedisStorage::new(),
354                solana_token_swap_request_queue: TestRedisStorage::new(),
355                relayer_health_check_queue: TestRedisStorage::new(),
356            }
357        }
358    }
359
360    // A test version of JobProducer
361    struct TestJobProducer {
362        queue: Mutex<TestQueue>,
363    }
364
365    impl Clone for TestJobProducer {
366        fn clone(&self) -> Self {
367            let queue = self
368                .queue
369                .try_lock()
370                .expect("Failed to lock queue for cloning")
371                .clone();
372            Self {
373                queue: Mutex::new(queue),
374            }
375        }
376    }
377
378    impl TestJobProducer {
379        fn new() -> Self {
380            Self {
381                queue: Mutex::new(TestQueue::new()),
382            }
383        }
384
385        async fn get_queue(&self) -> TestQueue {
386            self.queue.lock().await.clone()
387        }
388    }
389
390    #[async_trait]
391    impl JobProducerTrait for TestJobProducer {
392        async fn get_queue(&self) -> Result<Queue, JobProducerError> {
393            unimplemented!("get_queue not used in tests")
394        }
395
396        async fn produce_transaction_request_job(
397            &self,
398            transaction_process_job: TransactionRequest,
399            scheduled_on: Option<i64>,
400        ) -> Result<(), JobProducerError> {
401            let mut queue = self.queue.lock().await;
402            let job = Job::new(JobType::TransactionRequest, transaction_process_job);
403
404            match scheduled_on {
405                Some(scheduled_on) => {
406                    queue
407                        .transaction_request_queue
408                        .schedule(job, scheduled_on)
409                        .await?;
410                }
411                None => {
412                    queue.transaction_request_queue.push(job).await?;
413                }
414            }
415
416            Ok(())
417        }
418
419        async fn produce_submit_transaction_job(
420            &self,
421            transaction_submit_job: TransactionSend,
422            scheduled_on: Option<i64>,
423        ) -> Result<(), JobProducerError> {
424            let mut queue = self.queue.lock().await;
425            let job = Job::new(JobType::TransactionSend, transaction_submit_job);
426
427            match scheduled_on {
428                Some(on) => {
429                    queue.transaction_submission_queue.schedule(job, on).await?;
430                }
431                None => {
432                    queue.transaction_submission_queue.push(job).await?;
433                }
434            }
435
436            Ok(())
437        }
438
439        async fn produce_check_transaction_status_job(
440            &self,
441            transaction_status_check_job: TransactionStatusCheck,
442            scheduled_on: Option<i64>,
443        ) -> Result<(), JobProducerError> {
444            let mut queue = self.queue.lock().await;
445            let job = Job::new(
446                JobType::TransactionStatusCheck,
447                transaction_status_check_job.clone(),
448            );
449
450            // Route to the appropriate queue based on network type
451            use crate::models::NetworkType;
452            let status_queue = match transaction_status_check_job.network_type {
453                Some(NetworkType::Evm) => &mut queue.transaction_status_queue_evm,
454                Some(NetworkType::Stellar) => &mut queue.transaction_status_queue_stellar,
455                Some(NetworkType::Solana) => &mut queue.transaction_status_queue, // Use default queue
456                None => &mut queue.transaction_status_queue, // Legacy messages without network_type
457            };
458
459            match scheduled_on {
460                Some(on) => {
461                    status_queue.schedule(job, on).await?;
462                }
463                None => {
464                    status_queue.push(job).await?;
465                }
466            }
467
468            Ok(())
469        }
470
471        async fn produce_send_notification_job(
472            &self,
473            notification_send_job: NotificationSend,
474            scheduled_on: Option<i64>,
475        ) -> Result<(), JobProducerError> {
476            let mut queue = self.queue.lock().await;
477            let job = Job::new(JobType::NotificationSend, notification_send_job);
478
479            match scheduled_on {
480                Some(on) => {
481                    queue.notification_queue.schedule(job, on).await?;
482                }
483                None => {
484                    queue.notification_queue.push(job).await?;
485                }
486            }
487
488            Ok(())
489        }
490
491        async fn produce_solana_token_swap_request_job(
492            &self,
493            solana_token_swap_request_job: SolanaTokenSwapRequest,
494            scheduled_on: Option<i64>,
495        ) -> Result<(), JobProducerError> {
496            let mut queue = self.queue.lock().await;
497            let job = Job::new(
498                JobType::SolanaTokenSwapRequest,
499                solana_token_swap_request_job,
500            );
501
502            match scheduled_on {
503                Some(on) => {
504                    queue
505                        .solana_token_swap_request_queue
506                        .schedule(job, on)
507                        .await?;
508                }
509                None => {
510                    queue.solana_token_swap_request_queue.push(job).await?;
511                }
512            }
513
514            Ok(())
515        }
516
517        async fn produce_relayer_health_check_job(
518            &self,
519            relayer_health_check_job: RelayerHealthCheck,
520            scheduled_on: Option<i64>,
521        ) -> Result<(), JobProducerError> {
522            let mut queue = self.queue.lock().await;
523            let job = Job::new(JobType::RelayerHealthCheck, relayer_health_check_job);
524
525            match scheduled_on {
526                Some(scheduled_on) => {
527                    queue
528                        .relayer_health_check_queue
529                        .schedule(job, scheduled_on)
530                        .await?;
531                }
532                None => {
533                    queue.relayer_health_check_queue.push(job).await?;
534                }
535            }
536
537            Ok(())
538        }
539    }
540
541    #[tokio::test]
542    async fn test_job_producer_operations() {
543        let producer = TestJobProducer::new();
544
545        // Test transaction request job
546        let request = TransactionRequest::new("tx123", "relayer-1");
547        let result = producer
548            .produce_transaction_request_job(request, None)
549            .await;
550        assert!(result.is_ok());
551
552        let queue = producer.get_queue().await;
553        assert!(queue.transaction_request_queue.push_called);
554
555        // Test scheduled job
556        let producer = TestJobProducer::new();
557        let request = TransactionRequest::new("tx123", "relayer-1");
558        let scheduled_timestamp = calculate_scheduled_timestamp(10); // Schedule for 10 seconds from now
559        let result = producer
560            .produce_transaction_request_job(request, Some(scheduled_timestamp))
561            .await;
562        assert!(result.is_ok());
563
564        let queue = producer.get_queue().await;
565        assert!(queue.transaction_request_queue.schedule_called);
566    }
567
568    #[tokio::test]
569    async fn test_submit_transaction_job() {
570        let producer = TestJobProducer::new();
571
572        // Test submit transaction job
573        let submit_job = TransactionSend::submit("tx123", "relayer-1");
574        let result = producer
575            .produce_submit_transaction_job(submit_job, None)
576            .await;
577        assert!(result.is_ok());
578
579        let queue = producer.get_queue().await;
580        assert!(queue.transaction_submission_queue.push_called);
581    }
582
583    #[tokio::test]
584    async fn test_check_status_job() {
585        use crate::models::NetworkType;
586        let producer = TestJobProducer::new();
587
588        // Test status check job for EVM
589        let status_job = TransactionStatusCheck::new("tx123", "relayer-1", NetworkType::Evm);
590        let result = producer
591            .produce_check_transaction_status_job(status_job, None)
592            .await;
593        assert!(result.is_ok());
594
595        let queue = producer.get_queue().await;
596        assert!(queue.transaction_status_queue_evm.push_called);
597    }
598
599    #[tokio::test]
600    async fn test_notification_job() {
601        let producer = TestJobProducer::new();
602
603        // Create a simple notification for testing
604        let notification = WebhookNotification::new(
605            "test_event".to_string(),
606            WebhookPayload::Transaction(TransactionResponse::Evm(Box::new(
607                EvmTransactionResponse {
608                    id: "tx123".to_string(),
609                    hash: Some("0x123".to_string()),
610                    status: TransactionStatus::Confirmed,
611                    status_reason: None,
612                    created_at: "2025-01-27T15:31:10.777083+00:00".to_string(),
613                    sent_at: Some("2025-01-27T15:31:10.777083+00:00".to_string()),
614                    confirmed_at: Some("2025-01-27T15:31:10.777083+00:00".to_string()),
615                    gas_price: Some(1000000000),
616                    gas_limit: Some(21000),
617                    nonce: Some(1),
618                    value: U256::from(1000000000000000000_u64),
619                    from: "0xabc".to_string(),
620                    to: Some("0xdef".to_string()),
621                    relayer_id: "relayer-1".to_string(),
622                    data: None,
623                    max_fee_per_gas: None,
624                    max_priority_fee_per_gas: None,
625                    signature: None,
626                    speed: None,
627                },
628            ))),
629        );
630        let job = NotificationSend::new("notification-1".to_string(), notification);
631
632        let result = producer.produce_send_notification_job(job, None).await;
633        assert!(result.is_ok());
634
635        let queue = producer.get_queue().await;
636        assert!(queue.notification_queue.push_called);
637    }
638
639    #[tokio::test]
640    async fn test_relayer_health_check_job() {
641        let producer = TestJobProducer::new();
642
643        // Test immediate health check job
644        let health_check = RelayerHealthCheck::new("relayer-1".to_string());
645        let result = producer
646            .produce_relayer_health_check_job(health_check, None)
647            .await;
648        assert!(result.is_ok());
649
650        let queue = producer.get_queue().await;
651        assert!(queue.relayer_health_check_queue.push_called);
652
653        // Test scheduled health check job
654        let producer = TestJobProducer::new();
655        let health_check = RelayerHealthCheck::new("relayer-1".to_string());
656        let scheduled_timestamp = calculate_scheduled_timestamp(60);
657        let result = producer
658            .produce_relayer_health_check_job(health_check, Some(scheduled_timestamp))
659            .await;
660        assert!(result.is_ok());
661
662        let queue = producer.get_queue().await;
663        assert!(queue.relayer_health_check_queue.schedule_called);
664    }
665
666    #[test]
667    fn test_job_producer_error_conversion() {
668        // Test error conversion without using specific Redis error types
669        let job_error = JobProducerError::QueueError("Test error".to_string());
670        let relayer_error: RelayerError = job_error.into();
671
672        match relayer_error {
673            RelayerError::QueueError(msg) => {
674                assert_eq!(msg, "Queue error");
675            }
676            _ => panic!("Unexpected error type"),
677        }
678    }
679
680    #[tokio::test]
681    async fn test_get_queue() {
682        let producer = TestJobProducer::new();
683
684        // Get the queue
685        let queue = producer.get_queue().await;
686
687        // Verify the queue is valid and has the expected structure
688        assert!(!queue.transaction_request_queue.push_called);
689        assert!(!queue.transaction_request_queue.schedule_called);
690        assert!(!queue.transaction_submission_queue.push_called);
691        assert!(!queue.notification_queue.push_called);
692        assert!(!queue.solana_token_swap_request_queue.push_called);
693        assert!(!queue.relayer_health_check_queue.push_called);
694    }
695
696    #[tokio::test]
697    async fn test_produce_relayer_health_check_job_immediate() {
698        let producer = TestJobProducer::new();
699
700        // Test immediate health check job (no scheduling)
701        let health_check = RelayerHealthCheck::new("relayer-1".to_string());
702        let result = producer
703            .produce_relayer_health_check_job(health_check, None)
704            .await;
705
706        // Should succeed
707        assert!(result.is_ok());
708
709        // Verify the job was pushed (not scheduled)
710        let queue = producer.get_queue().await;
711        assert!(queue.relayer_health_check_queue.push_called);
712        assert!(!queue.relayer_health_check_queue.schedule_called);
713
714        // Other queues should not be affected
715        assert!(!queue.transaction_request_queue.push_called);
716        assert!(!queue.transaction_submission_queue.push_called);
717        assert!(!queue.transaction_status_queue.push_called);
718        assert!(!queue.notification_queue.push_called);
719        assert!(!queue.solana_token_swap_request_queue.push_called);
720    }
721
722    #[tokio::test]
723    async fn test_produce_relayer_health_check_job_scheduled() {
724        let producer = TestJobProducer::new();
725
726        // Test scheduled health check job
727        let health_check = RelayerHealthCheck::new("relayer-2".to_string());
728        let scheduled_timestamp = calculate_scheduled_timestamp(300); // 5 minutes from now
729        let result = producer
730            .produce_relayer_health_check_job(health_check, Some(scheduled_timestamp))
731            .await;
732
733        // Should succeed
734        assert!(result.is_ok());
735
736        // Verify the job was scheduled (not pushed)
737        let queue = producer.get_queue().await;
738        assert!(queue.relayer_health_check_queue.schedule_called);
739        assert!(!queue.relayer_health_check_queue.push_called);
740
741        // Other queues should not be affected
742        assert!(!queue.transaction_request_queue.push_called);
743        assert!(!queue.transaction_submission_queue.push_called);
744        assert!(!queue.transaction_status_queue.push_called);
745        assert!(!queue.notification_queue.push_called);
746        assert!(!queue.solana_token_swap_request_queue.push_called);
747    }
748
749    #[tokio::test]
750    async fn test_produce_relayer_health_check_job_multiple_relayers() {
751        let producer = TestJobProducer::new();
752
753        // Produce health check jobs for multiple relayers
754        let relayer_ids = vec!["relayer-1", "relayer-2", "relayer-3"];
755
756        for relayer_id in &relayer_ids {
757            let health_check = RelayerHealthCheck::new(relayer_id.to_string());
758            let result = producer
759                .produce_relayer_health_check_job(health_check, None)
760                .await;
761            assert!(result.is_ok());
762        }
763
764        // Verify jobs were produced
765        let queue = producer.get_queue().await;
766        assert!(queue.relayer_health_check_queue.push_called);
767    }
768
769    #[tokio::test]
770    async fn test_status_check_routes_to_evm_queue() {
771        use crate::models::NetworkType;
772        let producer = TestJobProducer::new();
773
774        let status_job = TransactionStatusCheck::new("tx-evm", "relayer-1", NetworkType::Evm);
775        let result = producer
776            .produce_check_transaction_status_job(status_job, None)
777            .await;
778
779        assert!(result.is_ok());
780        let queue = producer.get_queue().await;
781        assert!(queue.transaction_status_queue_evm.push_called);
782        assert!(!queue.transaction_status_queue_stellar.push_called);
783        assert!(!queue.transaction_status_queue.push_called);
784    }
785
786    #[tokio::test]
787    async fn test_status_check_routes_to_stellar_queue() {
788        use crate::models::NetworkType;
789        let producer = TestJobProducer::new();
790
791        let status_job =
792            TransactionStatusCheck::new("tx-stellar", "relayer-2", NetworkType::Stellar);
793        let result = producer
794            .produce_check_transaction_status_job(status_job, None)
795            .await;
796
797        assert!(result.is_ok());
798        let queue = producer.get_queue().await;
799        assert!(queue.transaction_status_queue_stellar.push_called);
800        assert!(!queue.transaction_status_queue_evm.push_called);
801        assert!(!queue.transaction_status_queue.push_called);
802    }
803
804    #[tokio::test]
805    async fn test_status_check_routes_to_default_queue_for_solana() {
806        use crate::models::NetworkType;
807        let producer = TestJobProducer::new();
808
809        let status_job = TransactionStatusCheck::new("tx-solana", "relayer-3", NetworkType::Solana);
810        let result = producer
811            .produce_check_transaction_status_job(status_job, None)
812            .await;
813
814        assert!(result.is_ok());
815        let queue = producer.get_queue().await;
816        assert!(queue.transaction_status_queue.push_called);
817        assert!(!queue.transaction_status_queue_evm.push_called);
818        assert!(!queue.transaction_status_queue_stellar.push_called);
819    }
820
821    #[tokio::test]
822    async fn test_status_check_scheduled_evm() {
823        use crate::models::NetworkType;
824        let producer = TestJobProducer::new();
825
826        let status_job =
827            TransactionStatusCheck::new("tx-evm-scheduled", "relayer-1", NetworkType::Evm);
828        let scheduled_timestamp = calculate_scheduled_timestamp(30);
829        let result = producer
830            .produce_check_transaction_status_job(status_job, Some(scheduled_timestamp))
831            .await;
832
833        assert!(result.is_ok());
834        let queue = producer.get_queue().await;
835        assert!(queue.transaction_status_queue_evm.schedule_called);
836        assert!(!queue.transaction_status_queue_evm.push_called);
837    }
838
839    #[tokio::test]
840    async fn test_submit_transaction_scheduled() {
841        let producer = TestJobProducer::new();
842
843        let submit_job = TransactionSend::submit("tx-scheduled", "relayer-1");
844        let scheduled_timestamp = calculate_scheduled_timestamp(15);
845        let result = producer
846            .produce_submit_transaction_job(submit_job, Some(scheduled_timestamp))
847            .await;
848
849        assert!(result.is_ok());
850        let queue = producer.get_queue().await;
851        assert!(queue.transaction_submission_queue.schedule_called);
852        assert!(!queue.transaction_submission_queue.push_called);
853    }
854
855    #[tokio::test]
856    async fn test_notification_job_scheduled() {
857        let producer = TestJobProducer::new();
858
859        let notification = WebhookNotification::new(
860            "test_scheduled_event".to_string(),
861            WebhookPayload::Transaction(TransactionResponse::Evm(Box::new(
862                EvmTransactionResponse {
863                    id: "tx-notify-scheduled".to_string(),
864                    hash: Some("0xabc123".to_string()),
865                    status: TransactionStatus::Confirmed,
866                    status_reason: None,
867                    created_at: "2025-01-27T15:31:10.777083+00:00".to_string(),
868                    sent_at: Some("2025-01-27T15:31:10.777083+00:00".to_string()),
869                    confirmed_at: Some("2025-01-27T15:31:10.777083+00:00".to_string()),
870                    gas_price: Some(1000000000),
871                    gas_limit: Some(21000),
872                    nonce: Some(1),
873                    value: U256::from(1000000000000000000_u64),
874                    from: "0xabc".to_string(),
875                    to: Some("0xdef".to_string()),
876                    relayer_id: "relayer-1".to_string(),
877                    data: None,
878                    max_fee_per_gas: None,
879                    max_priority_fee_per_gas: None,
880                    signature: None,
881                    speed: None,
882                },
883            ))),
884        );
885        let job = NotificationSend::new("notification-scheduled".to_string(), notification);
886
887        let scheduled_timestamp = calculate_scheduled_timestamp(5);
888        let result = producer
889            .produce_send_notification_job(job, Some(scheduled_timestamp))
890            .await;
891
892        assert!(result.is_ok());
893        let queue = producer.get_queue().await;
894        assert!(queue.notification_queue.schedule_called);
895        assert!(!queue.notification_queue.push_called);
896    }
897
898    #[tokio::test]
899    async fn test_solana_swap_job_immediate() {
900        let producer = TestJobProducer::new();
901
902        let swap_job = SolanaTokenSwapRequest::new("relayer-solana".to_string());
903        let result = producer
904            .produce_solana_token_swap_request_job(swap_job, None)
905            .await;
906
907        assert!(result.is_ok());
908        let queue = producer.get_queue().await;
909        assert!(queue.solana_token_swap_request_queue.push_called);
910        assert!(!queue.solana_token_swap_request_queue.schedule_called);
911    }
912
913    #[tokio::test]
914    async fn test_solana_swap_job_scheduled() {
915        let producer = TestJobProducer::new();
916
917        let swap_job = SolanaTokenSwapRequest::new("relayer-solana".to_string());
918        let scheduled_timestamp = calculate_scheduled_timestamp(20);
919        let result = producer
920            .produce_solana_token_swap_request_job(swap_job, Some(scheduled_timestamp))
921            .await;
922
923        assert!(result.is_ok());
924        let queue = producer.get_queue().await;
925        assert!(queue.solana_token_swap_request_queue.schedule_called);
926        assert!(!queue.solana_token_swap_request_queue.push_called);
927    }
928
929    #[tokio::test]
930    async fn test_transaction_send_cancel_job() {
931        let producer = TestJobProducer::new();
932
933        let cancel_job = TransactionSend::cancel("tx-cancel", "relayer-1", "user requested");
934        let result = producer
935            .produce_submit_transaction_job(cancel_job, None)
936            .await;
937
938        assert!(result.is_ok());
939        let queue = producer.get_queue().await;
940        assert!(queue.transaction_submission_queue.push_called);
941    }
942
943    #[tokio::test]
944    async fn test_transaction_send_resubmit_job() {
945        let producer = TestJobProducer::new();
946
947        let resubmit_job = TransactionSend::resubmit("tx-resubmit", "relayer-1");
948        let result = producer
949            .produce_submit_transaction_job(resubmit_job, None)
950            .await;
951
952        assert!(result.is_ok());
953        let queue = producer.get_queue().await;
954        assert!(queue.transaction_submission_queue.push_called);
955    }
956
957    #[tokio::test]
958    async fn test_transaction_send_resend_job() {
959        let producer = TestJobProducer::new();
960
961        let resend_job = TransactionSend::resend("tx-resend", "relayer-1");
962        let result = producer
963            .produce_submit_transaction_job(resend_job, None)
964            .await;
965
966        assert!(result.is_ok());
967        let queue = producer.get_queue().await;
968        assert!(queue.transaction_submission_queue.push_called);
969    }
970
971    #[tokio::test]
972    async fn test_multiple_jobs_different_queues() {
973        let producer = TestJobProducer::new();
974
975        // Produce different types of jobs
976        let request = TransactionRequest::new("tx1", "relayer-1");
977        producer
978            .produce_transaction_request_job(request, None)
979            .await
980            .unwrap();
981
982        let submit = TransactionSend::submit("tx2", "relayer-1");
983        producer
984            .produce_submit_transaction_job(submit, None)
985            .await
986            .unwrap();
987
988        use crate::models::NetworkType;
989        let status = TransactionStatusCheck::new("tx3", "relayer-1", NetworkType::Evm);
990        producer
991            .produce_check_transaction_status_job(status, None)
992            .await
993            .unwrap();
994
995        // Verify all queues were used
996        let queue = producer.get_queue().await;
997        assert!(queue.transaction_request_queue.push_called);
998        assert!(queue.transaction_submission_queue.push_called);
999        assert!(queue.transaction_status_queue_evm.push_called);
1000    }
1001
1002    #[test]
1003    fn test_job_producer_clone() {
1004        let producer = TestJobProducer::new();
1005        let cloned_producer = producer.clone();
1006
1007        // Both should be valid instances
1008        // The clone creates a new Mutex with a cloned Queue
1009        assert!(std::ptr::addr_of!(producer) != std::ptr::addr_of!(cloned_producer));
1010    }
1011
1012    #[tokio::test]
1013    async fn test_transaction_request_with_metadata() {
1014        let producer = TestJobProducer::new();
1015
1016        let mut metadata = std::collections::HashMap::new();
1017        metadata.insert("retry_count".to_string(), "3".to_string());
1018
1019        let request = TransactionRequest::new("tx-meta", "relayer-1").with_metadata(metadata);
1020
1021        let result = producer
1022            .produce_transaction_request_job(request, None)
1023            .await;
1024
1025        assert!(result.is_ok());
1026        let queue = producer.get_queue().await;
1027        assert!(queue.transaction_request_queue.push_called);
1028    }
1029
1030    #[tokio::test]
1031    async fn test_status_check_with_metadata() {
1032        use crate::models::NetworkType;
1033        let producer = TestJobProducer::new();
1034
1035        let mut metadata = std::collections::HashMap::new();
1036        metadata.insert("attempt".to_string(), "2".to_string());
1037
1038        let status =
1039            TransactionStatusCheck::new("tx-status-meta", "relayer-1", NetworkType::Stellar)
1040                .with_metadata(metadata);
1041
1042        let result = producer
1043            .produce_check_transaction_status_job(status, None)
1044            .await;
1045
1046        assert!(result.is_ok());
1047        let queue = producer.get_queue().await;
1048        assert!(queue.transaction_status_queue_stellar.push_called);
1049    }
1050
1051    #[tokio::test]
1052    async fn test_scheduled_jobs_with_different_delays() {
1053        let producer = TestJobProducer::new();
1054
1055        // Test with various scheduling delays
1056        let delays = vec![1, 10, 60, 300, 3600]; // 1s, 10s, 1m, 5m, 1h
1057
1058        for (idx, delay) in delays.iter().enumerate() {
1059            let request = TransactionRequest::new(format!("tx-delay-{}", idx), "relayer-1");
1060            let timestamp = calculate_scheduled_timestamp(*delay);
1061
1062            let result = producer
1063                .produce_transaction_request_job(request, Some(timestamp))
1064                .await;
1065
1066            assert!(
1067                result.is_ok(),
1068                "Failed to schedule job with delay {}",
1069                delay
1070            );
1071        }
1072    }
1073
1074    #[test]
1075    fn test_job_producer_error_display() {
1076        let error = JobProducerError::QueueError("Test queue error".to_string());
1077        let error_string = error.to_string();
1078
1079        assert!(error_string.contains("Queue error"));
1080        assert!(error_string.contains("Test queue error"));
1081    }
1082
1083    #[test]
1084    fn test_job_producer_error_to_relayer_error() {
1085        let job_error = JobProducerError::QueueError("Connection failed".to_string());
1086        let relayer_error: RelayerError = job_error.into();
1087
1088        match relayer_error {
1089            RelayerError::QueueError(msg) => {
1090                assert_eq!(msg, "Queue error");
1091            }
1092            _ => panic!("Expected QueueError variant"),
1093        }
1094    }
1095}