1use crate::{
10 jobs::{
11 Job, NotificationSend, Queue, RelayerHealthCheck, TransactionRequest, TransactionSend,
12 TransactionStatusCheck,
13 },
14 models::RelayerError,
15 observability::request_id::get_request_id,
16};
17use apalis::prelude::Storage;
18use apalis_redis::RedisError;
19use async_trait::async_trait;
20use serde::Serialize;
21use thiserror::Error;
22use tokio::sync::Mutex;
23use tracing::{debug, error};
24
25use super::{JobType, SolanaTokenSwapRequest};
26
27#[cfg(test)]
28use mockall::automock;
29
30#[derive(Debug, Error, Serialize)]
31pub enum JobProducerError {
32 #[error("Queue error: {0}")]
33 QueueError(String),
34}
35
36impl From<RedisError> for JobProducerError {
37 fn from(_: RedisError) -> Self {
38 JobProducerError::QueueError("Queue error".to_string())
39 }
40}
41
42impl From<JobProducerError> for RelayerError {
43 fn from(_: JobProducerError) -> Self {
44 RelayerError::QueueError("Queue error".to_string())
45 }
46}
47
48#[derive(Debug)]
49pub struct JobProducer {
50 queue: Mutex<Queue>,
51}
52
53impl Clone for JobProducer {
54 fn clone(&self) -> Self {
55 let queue = self
58 .queue
59 .try_lock()
60 .expect("Failed to lock queue for cloning")
61 .clone();
62
63 Self {
64 queue: Mutex::new(queue),
65 }
66 }
67}
68
69#[async_trait]
70#[cfg_attr(test, automock)]
71pub trait JobProducerTrait: Send + Sync {
72 async fn produce_transaction_request_job(
73 &self,
74 transaction_process_job: TransactionRequest,
75 scheduled_on: Option<i64>,
76 ) -> Result<(), JobProducerError>;
77
78 async fn produce_submit_transaction_job(
79 &self,
80 transaction_submit_job: TransactionSend,
81 scheduled_on: Option<i64>,
82 ) -> Result<(), JobProducerError>;
83
84 async fn produce_check_transaction_status_job(
85 &self,
86 transaction_status_check_job: TransactionStatusCheck,
87 scheduled_on: Option<i64>,
88 ) -> Result<(), JobProducerError>;
89
90 async fn produce_send_notification_job(
91 &self,
92 notification_send_job: NotificationSend,
93 scheduled_on: Option<i64>,
94 ) -> Result<(), JobProducerError>;
95
96 async fn produce_solana_token_swap_request_job(
97 &self,
98 solana_swap_request_job: SolanaTokenSwapRequest,
99 scheduled_on: Option<i64>,
100 ) -> Result<(), JobProducerError>;
101
102 async fn produce_relayer_health_check_job(
103 &self,
104 relayer_health_check_job: RelayerHealthCheck,
105 scheduled_on: Option<i64>,
106 ) -> Result<(), JobProducerError>;
107
108 async fn get_queue(&self) -> Result<Queue, JobProducerError>;
109}
110
111impl JobProducer {
112 pub fn new(queue: Queue) -> Self {
113 Self {
114 queue: Mutex::new(queue.clone()),
115 }
116 }
117
118 pub async fn get_queue(&self) -> Result<Queue, JobProducerError> {
119 let queue = self.queue.lock().await;
120
121 Ok(queue.clone())
122 }
123}
124
125#[async_trait]
126impl JobProducerTrait for JobProducer {
127 async fn get_queue(&self) -> Result<Queue, JobProducerError> {
128 let queue = self.queue.lock().await;
129
130 Ok(queue.clone())
131 }
132
133 async fn produce_transaction_request_job(
134 &self,
135 transaction_process_job: TransactionRequest,
136 scheduled_on: Option<i64>,
137 ) -> Result<(), JobProducerError> {
138 debug!(
139 "Producing transaction request job: {:?}",
140 transaction_process_job
141 );
142 let mut queue = self.queue.lock().await;
143 let job = Job::new(JobType::TransactionRequest, transaction_process_job)
144 .with_request_id(get_request_id());
145
146 match scheduled_on {
147 Some(scheduled_on) => {
148 queue
149 .transaction_request_queue
150 .schedule(job, scheduled_on)
151 .await?;
152 }
153 None => {
154 queue.transaction_request_queue.push(job).await?;
155 }
156 }
157 debug!("Transaction job produced successfully");
158
159 Ok(())
160 }
161
162 async fn produce_submit_transaction_job(
163 &self,
164 transaction_submit_job: TransactionSend,
165 scheduled_on: Option<i64>,
166 ) -> Result<(), JobProducerError> {
167 let mut queue = self.queue.lock().await;
168 let job = Job::new(JobType::TransactionSend, transaction_submit_job)
169 .with_request_id(get_request_id());
170
171 match scheduled_on {
172 Some(on) => {
173 queue.transaction_submission_queue.schedule(job, on).await?;
174 }
175 None => {
176 queue.transaction_submission_queue.push(job).await?;
177 }
178 }
179 debug!("Transaction Submit job produced successfully");
180
181 Ok(())
182 }
183
184 async fn produce_check_transaction_status_job(
185 &self,
186 transaction_status_check_job: TransactionStatusCheck,
187 scheduled_on: Option<i64>,
188 ) -> Result<(), JobProducerError> {
189 let mut queue = self.queue.lock().await;
190 let job = Job::new(
191 JobType::TransactionStatusCheck,
192 transaction_status_check_job.clone(),
193 )
194 .with_request_id(get_request_id());
195
196 use crate::models::NetworkType;
198 let status_queue = match transaction_status_check_job.network_type {
199 Some(NetworkType::Evm) => &mut queue.transaction_status_queue_evm,
200 Some(NetworkType::Stellar) => &mut queue.transaction_status_queue_stellar,
201 _ => &mut queue.transaction_status_queue, };
203
204 match scheduled_on {
205 Some(on) => {
206 status_queue.schedule(job, on).await?;
207 }
208 None => {
209 status_queue.push(job).await?;
210 }
211 }
212 debug!(
213 network_type = ?transaction_status_check_job.network_type,
214 "Transaction Status Check job produced successfully"
215 );
216 Ok(())
217 }
218
219 async fn produce_send_notification_job(
220 &self,
221 notification_send_job: NotificationSend,
222 scheduled_on: Option<i64>,
223 ) -> Result<(), JobProducerError> {
224 let mut queue = self.queue.lock().await;
225 let job = Job::new(JobType::NotificationSend, notification_send_job)
226 .with_request_id(get_request_id());
227
228 match scheduled_on {
229 Some(on) => {
230 queue.notification_queue.schedule(job, on).await?;
231 }
232 None => {
233 queue.notification_queue.push(job).await?;
234 }
235 }
236
237 debug!("Notification Send job produced successfully");
238 Ok(())
239 }
240
241 async fn produce_solana_token_swap_request_job(
242 &self,
243 solana_swap_request_job: SolanaTokenSwapRequest,
244 scheduled_on: Option<i64>,
245 ) -> Result<(), JobProducerError> {
246 let mut queue = self.queue.lock().await;
247 let job = Job::new(JobType::SolanaTokenSwapRequest, solana_swap_request_job)
248 .with_request_id(get_request_id());
249
250 match scheduled_on {
251 Some(on) => {
252 queue
253 .solana_token_swap_request_queue
254 .schedule(job, on)
255 .await?;
256 }
257 None => {
258 queue.solana_token_swap_request_queue.push(job).await?;
259 }
260 }
261
262 debug!("Solana token swap job produced successfully");
263 Ok(())
264 }
265
266 async fn produce_relayer_health_check_job(
267 &self,
268 relayer_health_check_job: RelayerHealthCheck,
269 scheduled_on: Option<i64>,
270 ) -> Result<(), JobProducerError> {
271 let job = Job::new(
272 JobType::RelayerHealthCheck,
273 relayer_health_check_job.clone(),
274 )
275 .with_request_id(get_request_id());
276
277 let mut queue = self.queue.lock().await;
278
279 match scheduled_on {
280 Some(scheduled_on) => {
281 queue
282 .relayer_health_check_queue
283 .schedule(job, scheduled_on)
284 .await?;
285 }
286 None => {
287 queue.relayer_health_check_queue.push(job).await?;
288 }
289 }
290
291 Ok(())
292 }
293}
294
295#[cfg(test)]
296mod tests {
297 use super::*;
298 use crate::models::{
299 EvmTransactionResponse, TransactionResponse, TransactionStatus, WebhookNotification,
300 WebhookPayload, U256,
301 };
302 use crate::utils::calculate_scheduled_timestamp;
303
304 #[derive(Clone, Debug)]
305 struct TestRedisStorage<T> {
307 pub push_called: bool,
308 pub schedule_called: bool,
309 _phantom: std::marker::PhantomData<T>,
310 }
311
312 impl<T> TestRedisStorage<T> {
313 fn new() -> Self {
314 Self {
315 push_called: false,
316 schedule_called: false,
317 _phantom: std::marker::PhantomData,
318 }
319 }
320
321 async fn push(&mut self, _job: T) -> Result<(), JobProducerError> {
322 self.push_called = true;
323 Ok(())
324 }
325
326 async fn schedule(&mut self, _job: T, _timestamp: i64) -> Result<(), JobProducerError> {
327 self.schedule_called = true;
328 Ok(())
329 }
330 }
331
332 #[derive(Clone, Debug)]
334 struct TestQueue {
335 pub transaction_request_queue: TestRedisStorage<Job<TransactionRequest>>,
336 pub transaction_submission_queue: TestRedisStorage<Job<TransactionSend>>,
337 pub transaction_status_queue: TestRedisStorage<Job<TransactionStatusCheck>>,
338 pub transaction_status_queue_evm: TestRedisStorage<Job<TransactionStatusCheck>>,
339 pub transaction_status_queue_stellar: TestRedisStorage<Job<TransactionStatusCheck>>,
340 pub notification_queue: TestRedisStorage<Job<NotificationSend>>,
341 pub solana_token_swap_request_queue: TestRedisStorage<Job<SolanaTokenSwapRequest>>,
342 pub relayer_health_check_queue: TestRedisStorage<Job<RelayerHealthCheck>>,
343 }
344
345 impl TestQueue {
346 fn new() -> Self {
347 Self {
348 transaction_request_queue: TestRedisStorage::new(),
349 transaction_submission_queue: TestRedisStorage::new(),
350 transaction_status_queue: TestRedisStorage::new(),
351 transaction_status_queue_evm: TestRedisStorage::new(),
352 transaction_status_queue_stellar: TestRedisStorage::new(),
353 notification_queue: TestRedisStorage::new(),
354 solana_token_swap_request_queue: TestRedisStorage::new(),
355 relayer_health_check_queue: TestRedisStorage::new(),
356 }
357 }
358 }
359
360 struct TestJobProducer {
362 queue: Mutex<TestQueue>,
363 }
364
365 impl Clone for TestJobProducer {
366 fn clone(&self) -> Self {
367 let queue = self
368 .queue
369 .try_lock()
370 .expect("Failed to lock queue for cloning")
371 .clone();
372 Self {
373 queue: Mutex::new(queue),
374 }
375 }
376 }
377
378 impl TestJobProducer {
379 fn new() -> Self {
380 Self {
381 queue: Mutex::new(TestQueue::new()),
382 }
383 }
384
385 async fn get_queue(&self) -> TestQueue {
386 self.queue.lock().await.clone()
387 }
388 }
389
390 #[async_trait]
391 impl JobProducerTrait for TestJobProducer {
392 async fn get_queue(&self) -> Result<Queue, JobProducerError> {
393 unimplemented!("get_queue not used in tests")
394 }
395
396 async fn produce_transaction_request_job(
397 &self,
398 transaction_process_job: TransactionRequest,
399 scheduled_on: Option<i64>,
400 ) -> Result<(), JobProducerError> {
401 let mut queue = self.queue.lock().await;
402 let job = Job::new(JobType::TransactionRequest, transaction_process_job);
403
404 match scheduled_on {
405 Some(scheduled_on) => {
406 queue
407 .transaction_request_queue
408 .schedule(job, scheduled_on)
409 .await?;
410 }
411 None => {
412 queue.transaction_request_queue.push(job).await?;
413 }
414 }
415
416 Ok(())
417 }
418
419 async fn produce_submit_transaction_job(
420 &self,
421 transaction_submit_job: TransactionSend,
422 scheduled_on: Option<i64>,
423 ) -> Result<(), JobProducerError> {
424 let mut queue = self.queue.lock().await;
425 let job = Job::new(JobType::TransactionSend, transaction_submit_job);
426
427 match scheduled_on {
428 Some(on) => {
429 queue.transaction_submission_queue.schedule(job, on).await?;
430 }
431 None => {
432 queue.transaction_submission_queue.push(job).await?;
433 }
434 }
435
436 Ok(())
437 }
438
439 async fn produce_check_transaction_status_job(
440 &self,
441 transaction_status_check_job: TransactionStatusCheck,
442 scheduled_on: Option<i64>,
443 ) -> Result<(), JobProducerError> {
444 let mut queue = self.queue.lock().await;
445 let job = Job::new(
446 JobType::TransactionStatusCheck,
447 transaction_status_check_job.clone(),
448 );
449
450 use crate::models::NetworkType;
452 let status_queue = match transaction_status_check_job.network_type {
453 Some(NetworkType::Evm) => &mut queue.transaction_status_queue_evm,
454 Some(NetworkType::Stellar) => &mut queue.transaction_status_queue_stellar,
455 Some(NetworkType::Solana) => &mut queue.transaction_status_queue, None => &mut queue.transaction_status_queue, };
458
459 match scheduled_on {
460 Some(on) => {
461 status_queue.schedule(job, on).await?;
462 }
463 None => {
464 status_queue.push(job).await?;
465 }
466 }
467
468 Ok(())
469 }
470
471 async fn produce_send_notification_job(
472 &self,
473 notification_send_job: NotificationSend,
474 scheduled_on: Option<i64>,
475 ) -> Result<(), JobProducerError> {
476 let mut queue = self.queue.lock().await;
477 let job = Job::new(JobType::NotificationSend, notification_send_job);
478
479 match scheduled_on {
480 Some(on) => {
481 queue.notification_queue.schedule(job, on).await?;
482 }
483 None => {
484 queue.notification_queue.push(job).await?;
485 }
486 }
487
488 Ok(())
489 }
490
491 async fn produce_solana_token_swap_request_job(
492 &self,
493 solana_token_swap_request_job: SolanaTokenSwapRequest,
494 scheduled_on: Option<i64>,
495 ) -> Result<(), JobProducerError> {
496 let mut queue = self.queue.lock().await;
497 let job = Job::new(
498 JobType::SolanaTokenSwapRequest,
499 solana_token_swap_request_job,
500 );
501
502 match scheduled_on {
503 Some(on) => {
504 queue
505 .solana_token_swap_request_queue
506 .schedule(job, on)
507 .await?;
508 }
509 None => {
510 queue.solana_token_swap_request_queue.push(job).await?;
511 }
512 }
513
514 Ok(())
515 }
516
517 async fn produce_relayer_health_check_job(
518 &self,
519 relayer_health_check_job: RelayerHealthCheck,
520 scheduled_on: Option<i64>,
521 ) -> Result<(), JobProducerError> {
522 let mut queue = self.queue.lock().await;
523 let job = Job::new(JobType::RelayerHealthCheck, relayer_health_check_job);
524
525 match scheduled_on {
526 Some(scheduled_on) => {
527 queue
528 .relayer_health_check_queue
529 .schedule(job, scheduled_on)
530 .await?;
531 }
532 None => {
533 queue.relayer_health_check_queue.push(job).await?;
534 }
535 }
536
537 Ok(())
538 }
539 }
540
541 #[tokio::test]
542 async fn test_job_producer_operations() {
543 let producer = TestJobProducer::new();
544
545 let request = TransactionRequest::new("tx123", "relayer-1");
547 let result = producer
548 .produce_transaction_request_job(request, None)
549 .await;
550 assert!(result.is_ok());
551
552 let queue = producer.get_queue().await;
553 assert!(queue.transaction_request_queue.push_called);
554
555 let producer = TestJobProducer::new();
557 let request = TransactionRequest::new("tx123", "relayer-1");
558 let scheduled_timestamp = calculate_scheduled_timestamp(10); let result = producer
560 .produce_transaction_request_job(request, Some(scheduled_timestamp))
561 .await;
562 assert!(result.is_ok());
563
564 let queue = producer.get_queue().await;
565 assert!(queue.transaction_request_queue.schedule_called);
566 }
567
568 #[tokio::test]
569 async fn test_submit_transaction_job() {
570 let producer = TestJobProducer::new();
571
572 let submit_job = TransactionSend::submit("tx123", "relayer-1");
574 let result = producer
575 .produce_submit_transaction_job(submit_job, None)
576 .await;
577 assert!(result.is_ok());
578
579 let queue = producer.get_queue().await;
580 assert!(queue.transaction_submission_queue.push_called);
581 }
582
583 #[tokio::test]
584 async fn test_check_status_job() {
585 use crate::models::NetworkType;
586 let producer = TestJobProducer::new();
587
588 let status_job = TransactionStatusCheck::new("tx123", "relayer-1", NetworkType::Evm);
590 let result = producer
591 .produce_check_transaction_status_job(status_job, None)
592 .await;
593 assert!(result.is_ok());
594
595 let queue = producer.get_queue().await;
596 assert!(queue.transaction_status_queue_evm.push_called);
597 }
598
599 #[tokio::test]
600 async fn test_notification_job() {
601 let producer = TestJobProducer::new();
602
603 let notification = WebhookNotification::new(
605 "test_event".to_string(),
606 WebhookPayload::Transaction(TransactionResponse::Evm(Box::new(
607 EvmTransactionResponse {
608 id: "tx123".to_string(),
609 hash: Some("0x123".to_string()),
610 status: TransactionStatus::Confirmed,
611 status_reason: None,
612 created_at: "2025-01-27T15:31:10.777083+00:00".to_string(),
613 sent_at: Some("2025-01-27T15:31:10.777083+00:00".to_string()),
614 confirmed_at: Some("2025-01-27T15:31:10.777083+00:00".to_string()),
615 gas_price: Some(1000000000),
616 gas_limit: Some(21000),
617 nonce: Some(1),
618 value: U256::from(1000000000000000000_u64),
619 from: "0xabc".to_string(),
620 to: Some("0xdef".to_string()),
621 relayer_id: "relayer-1".to_string(),
622 data: None,
623 max_fee_per_gas: None,
624 max_priority_fee_per_gas: None,
625 signature: None,
626 speed: None,
627 },
628 ))),
629 );
630 let job = NotificationSend::new("notification-1".to_string(), notification);
631
632 let result = producer.produce_send_notification_job(job, None).await;
633 assert!(result.is_ok());
634
635 let queue = producer.get_queue().await;
636 assert!(queue.notification_queue.push_called);
637 }
638
639 #[tokio::test]
640 async fn test_relayer_health_check_job() {
641 let producer = TestJobProducer::new();
642
643 let health_check = RelayerHealthCheck::new("relayer-1".to_string());
645 let result = producer
646 .produce_relayer_health_check_job(health_check, None)
647 .await;
648 assert!(result.is_ok());
649
650 let queue = producer.get_queue().await;
651 assert!(queue.relayer_health_check_queue.push_called);
652
653 let producer = TestJobProducer::new();
655 let health_check = RelayerHealthCheck::new("relayer-1".to_string());
656 let scheduled_timestamp = calculate_scheduled_timestamp(60);
657 let result = producer
658 .produce_relayer_health_check_job(health_check, Some(scheduled_timestamp))
659 .await;
660 assert!(result.is_ok());
661
662 let queue = producer.get_queue().await;
663 assert!(queue.relayer_health_check_queue.schedule_called);
664 }
665
666 #[test]
667 fn test_job_producer_error_conversion() {
668 let job_error = JobProducerError::QueueError("Test error".to_string());
670 let relayer_error: RelayerError = job_error.into();
671
672 match relayer_error {
673 RelayerError::QueueError(msg) => {
674 assert_eq!(msg, "Queue error");
675 }
676 _ => panic!("Unexpected error type"),
677 }
678 }
679
680 #[tokio::test]
681 async fn test_get_queue() {
682 let producer = TestJobProducer::new();
683
684 let queue = producer.get_queue().await;
686
687 assert!(!queue.transaction_request_queue.push_called);
689 assert!(!queue.transaction_request_queue.schedule_called);
690 assert!(!queue.transaction_submission_queue.push_called);
691 assert!(!queue.notification_queue.push_called);
692 assert!(!queue.solana_token_swap_request_queue.push_called);
693 assert!(!queue.relayer_health_check_queue.push_called);
694 }
695
696 #[tokio::test]
697 async fn test_produce_relayer_health_check_job_immediate() {
698 let producer = TestJobProducer::new();
699
700 let health_check = RelayerHealthCheck::new("relayer-1".to_string());
702 let result = producer
703 .produce_relayer_health_check_job(health_check, None)
704 .await;
705
706 assert!(result.is_ok());
708
709 let queue = producer.get_queue().await;
711 assert!(queue.relayer_health_check_queue.push_called);
712 assert!(!queue.relayer_health_check_queue.schedule_called);
713
714 assert!(!queue.transaction_request_queue.push_called);
716 assert!(!queue.transaction_submission_queue.push_called);
717 assert!(!queue.transaction_status_queue.push_called);
718 assert!(!queue.notification_queue.push_called);
719 assert!(!queue.solana_token_swap_request_queue.push_called);
720 }
721
722 #[tokio::test]
723 async fn test_produce_relayer_health_check_job_scheduled() {
724 let producer = TestJobProducer::new();
725
726 let health_check = RelayerHealthCheck::new("relayer-2".to_string());
728 let scheduled_timestamp = calculate_scheduled_timestamp(300); let result = producer
730 .produce_relayer_health_check_job(health_check, Some(scheduled_timestamp))
731 .await;
732
733 assert!(result.is_ok());
735
736 let queue = producer.get_queue().await;
738 assert!(queue.relayer_health_check_queue.schedule_called);
739 assert!(!queue.relayer_health_check_queue.push_called);
740
741 assert!(!queue.transaction_request_queue.push_called);
743 assert!(!queue.transaction_submission_queue.push_called);
744 assert!(!queue.transaction_status_queue.push_called);
745 assert!(!queue.notification_queue.push_called);
746 assert!(!queue.solana_token_swap_request_queue.push_called);
747 }
748
749 #[tokio::test]
750 async fn test_produce_relayer_health_check_job_multiple_relayers() {
751 let producer = TestJobProducer::new();
752
753 let relayer_ids = vec!["relayer-1", "relayer-2", "relayer-3"];
755
756 for relayer_id in &relayer_ids {
757 let health_check = RelayerHealthCheck::new(relayer_id.to_string());
758 let result = producer
759 .produce_relayer_health_check_job(health_check, None)
760 .await;
761 assert!(result.is_ok());
762 }
763
764 let queue = producer.get_queue().await;
766 assert!(queue.relayer_health_check_queue.push_called);
767 }
768
769 #[tokio::test]
770 async fn test_status_check_routes_to_evm_queue() {
771 use crate::models::NetworkType;
772 let producer = TestJobProducer::new();
773
774 let status_job = TransactionStatusCheck::new("tx-evm", "relayer-1", NetworkType::Evm);
775 let result = producer
776 .produce_check_transaction_status_job(status_job, None)
777 .await;
778
779 assert!(result.is_ok());
780 let queue = producer.get_queue().await;
781 assert!(queue.transaction_status_queue_evm.push_called);
782 assert!(!queue.transaction_status_queue_stellar.push_called);
783 assert!(!queue.transaction_status_queue.push_called);
784 }
785
786 #[tokio::test]
787 async fn test_status_check_routes_to_stellar_queue() {
788 use crate::models::NetworkType;
789 let producer = TestJobProducer::new();
790
791 let status_job =
792 TransactionStatusCheck::new("tx-stellar", "relayer-2", NetworkType::Stellar);
793 let result = producer
794 .produce_check_transaction_status_job(status_job, None)
795 .await;
796
797 assert!(result.is_ok());
798 let queue = producer.get_queue().await;
799 assert!(queue.transaction_status_queue_stellar.push_called);
800 assert!(!queue.transaction_status_queue_evm.push_called);
801 assert!(!queue.transaction_status_queue.push_called);
802 }
803
804 #[tokio::test]
805 async fn test_status_check_routes_to_default_queue_for_solana() {
806 use crate::models::NetworkType;
807 let producer = TestJobProducer::new();
808
809 let status_job = TransactionStatusCheck::new("tx-solana", "relayer-3", NetworkType::Solana);
810 let result = producer
811 .produce_check_transaction_status_job(status_job, None)
812 .await;
813
814 assert!(result.is_ok());
815 let queue = producer.get_queue().await;
816 assert!(queue.transaction_status_queue.push_called);
817 assert!(!queue.transaction_status_queue_evm.push_called);
818 assert!(!queue.transaction_status_queue_stellar.push_called);
819 }
820
821 #[tokio::test]
822 async fn test_status_check_scheduled_evm() {
823 use crate::models::NetworkType;
824 let producer = TestJobProducer::new();
825
826 let status_job =
827 TransactionStatusCheck::new("tx-evm-scheduled", "relayer-1", NetworkType::Evm);
828 let scheduled_timestamp = calculate_scheduled_timestamp(30);
829 let result = producer
830 .produce_check_transaction_status_job(status_job, Some(scheduled_timestamp))
831 .await;
832
833 assert!(result.is_ok());
834 let queue = producer.get_queue().await;
835 assert!(queue.transaction_status_queue_evm.schedule_called);
836 assert!(!queue.transaction_status_queue_evm.push_called);
837 }
838
839 #[tokio::test]
840 async fn test_submit_transaction_scheduled() {
841 let producer = TestJobProducer::new();
842
843 let submit_job = TransactionSend::submit("tx-scheduled", "relayer-1");
844 let scheduled_timestamp = calculate_scheduled_timestamp(15);
845 let result = producer
846 .produce_submit_transaction_job(submit_job, Some(scheduled_timestamp))
847 .await;
848
849 assert!(result.is_ok());
850 let queue = producer.get_queue().await;
851 assert!(queue.transaction_submission_queue.schedule_called);
852 assert!(!queue.transaction_submission_queue.push_called);
853 }
854
855 #[tokio::test]
856 async fn test_notification_job_scheduled() {
857 let producer = TestJobProducer::new();
858
859 let notification = WebhookNotification::new(
860 "test_scheduled_event".to_string(),
861 WebhookPayload::Transaction(TransactionResponse::Evm(Box::new(
862 EvmTransactionResponse {
863 id: "tx-notify-scheduled".to_string(),
864 hash: Some("0xabc123".to_string()),
865 status: TransactionStatus::Confirmed,
866 status_reason: None,
867 created_at: "2025-01-27T15:31:10.777083+00:00".to_string(),
868 sent_at: Some("2025-01-27T15:31:10.777083+00:00".to_string()),
869 confirmed_at: Some("2025-01-27T15:31:10.777083+00:00".to_string()),
870 gas_price: Some(1000000000),
871 gas_limit: Some(21000),
872 nonce: Some(1),
873 value: U256::from(1000000000000000000_u64),
874 from: "0xabc".to_string(),
875 to: Some("0xdef".to_string()),
876 relayer_id: "relayer-1".to_string(),
877 data: None,
878 max_fee_per_gas: None,
879 max_priority_fee_per_gas: None,
880 signature: None,
881 speed: None,
882 },
883 ))),
884 );
885 let job = NotificationSend::new("notification-scheduled".to_string(), notification);
886
887 let scheduled_timestamp = calculate_scheduled_timestamp(5);
888 let result = producer
889 .produce_send_notification_job(job, Some(scheduled_timestamp))
890 .await;
891
892 assert!(result.is_ok());
893 let queue = producer.get_queue().await;
894 assert!(queue.notification_queue.schedule_called);
895 assert!(!queue.notification_queue.push_called);
896 }
897
898 #[tokio::test]
899 async fn test_solana_swap_job_immediate() {
900 let producer = TestJobProducer::new();
901
902 let swap_job = SolanaTokenSwapRequest::new("relayer-solana".to_string());
903 let result = producer
904 .produce_solana_token_swap_request_job(swap_job, None)
905 .await;
906
907 assert!(result.is_ok());
908 let queue = producer.get_queue().await;
909 assert!(queue.solana_token_swap_request_queue.push_called);
910 assert!(!queue.solana_token_swap_request_queue.schedule_called);
911 }
912
913 #[tokio::test]
914 async fn test_solana_swap_job_scheduled() {
915 let producer = TestJobProducer::new();
916
917 let swap_job = SolanaTokenSwapRequest::new("relayer-solana".to_string());
918 let scheduled_timestamp = calculate_scheduled_timestamp(20);
919 let result = producer
920 .produce_solana_token_swap_request_job(swap_job, Some(scheduled_timestamp))
921 .await;
922
923 assert!(result.is_ok());
924 let queue = producer.get_queue().await;
925 assert!(queue.solana_token_swap_request_queue.schedule_called);
926 assert!(!queue.solana_token_swap_request_queue.push_called);
927 }
928
929 #[tokio::test]
930 async fn test_transaction_send_cancel_job() {
931 let producer = TestJobProducer::new();
932
933 let cancel_job = TransactionSend::cancel("tx-cancel", "relayer-1", "user requested");
934 let result = producer
935 .produce_submit_transaction_job(cancel_job, None)
936 .await;
937
938 assert!(result.is_ok());
939 let queue = producer.get_queue().await;
940 assert!(queue.transaction_submission_queue.push_called);
941 }
942
943 #[tokio::test]
944 async fn test_transaction_send_resubmit_job() {
945 let producer = TestJobProducer::new();
946
947 let resubmit_job = TransactionSend::resubmit("tx-resubmit", "relayer-1");
948 let result = producer
949 .produce_submit_transaction_job(resubmit_job, None)
950 .await;
951
952 assert!(result.is_ok());
953 let queue = producer.get_queue().await;
954 assert!(queue.transaction_submission_queue.push_called);
955 }
956
957 #[tokio::test]
958 async fn test_transaction_send_resend_job() {
959 let producer = TestJobProducer::new();
960
961 let resend_job = TransactionSend::resend("tx-resend", "relayer-1");
962 let result = producer
963 .produce_submit_transaction_job(resend_job, None)
964 .await;
965
966 assert!(result.is_ok());
967 let queue = producer.get_queue().await;
968 assert!(queue.transaction_submission_queue.push_called);
969 }
970
971 #[tokio::test]
972 async fn test_multiple_jobs_different_queues() {
973 let producer = TestJobProducer::new();
974
975 let request = TransactionRequest::new("tx1", "relayer-1");
977 producer
978 .produce_transaction_request_job(request, None)
979 .await
980 .unwrap();
981
982 let submit = TransactionSend::submit("tx2", "relayer-1");
983 producer
984 .produce_submit_transaction_job(submit, None)
985 .await
986 .unwrap();
987
988 use crate::models::NetworkType;
989 let status = TransactionStatusCheck::new("tx3", "relayer-1", NetworkType::Evm);
990 producer
991 .produce_check_transaction_status_job(status, None)
992 .await
993 .unwrap();
994
995 let queue = producer.get_queue().await;
997 assert!(queue.transaction_request_queue.push_called);
998 assert!(queue.transaction_submission_queue.push_called);
999 assert!(queue.transaction_status_queue_evm.push_called);
1000 }
1001
1002 #[test]
1003 fn test_job_producer_clone() {
1004 let producer = TestJobProducer::new();
1005 let cloned_producer = producer.clone();
1006
1007 assert!(std::ptr::addr_of!(producer) != std::ptr::addr_of!(cloned_producer));
1010 }
1011
1012 #[tokio::test]
1013 async fn test_transaction_request_with_metadata() {
1014 let producer = TestJobProducer::new();
1015
1016 let mut metadata = std::collections::HashMap::new();
1017 metadata.insert("retry_count".to_string(), "3".to_string());
1018
1019 let request = TransactionRequest::new("tx-meta", "relayer-1").with_metadata(metadata);
1020
1021 let result = producer
1022 .produce_transaction_request_job(request, None)
1023 .await;
1024
1025 assert!(result.is_ok());
1026 let queue = producer.get_queue().await;
1027 assert!(queue.transaction_request_queue.push_called);
1028 }
1029
1030 #[tokio::test]
1031 async fn test_status_check_with_metadata() {
1032 use crate::models::NetworkType;
1033 let producer = TestJobProducer::new();
1034
1035 let mut metadata = std::collections::HashMap::new();
1036 metadata.insert("attempt".to_string(), "2".to_string());
1037
1038 let status =
1039 TransactionStatusCheck::new("tx-status-meta", "relayer-1", NetworkType::Stellar)
1040 .with_metadata(metadata);
1041
1042 let result = producer
1043 .produce_check_transaction_status_job(status, None)
1044 .await;
1045
1046 assert!(result.is_ok());
1047 let queue = producer.get_queue().await;
1048 assert!(queue.transaction_status_queue_stellar.push_called);
1049 }
1050
1051 #[tokio::test]
1052 async fn test_scheduled_jobs_with_different_delays() {
1053 let producer = TestJobProducer::new();
1054
1055 let delays = vec![1, 10, 60, 300, 3600]; for (idx, delay) in delays.iter().enumerate() {
1059 let request = TransactionRequest::new(format!("tx-delay-{}", idx), "relayer-1");
1060 let timestamp = calculate_scheduled_timestamp(*delay);
1061
1062 let result = producer
1063 .produce_transaction_request_job(request, Some(timestamp))
1064 .await;
1065
1066 assert!(
1067 result.is_ok(),
1068 "Failed to schedule job with delay {}",
1069 delay
1070 );
1071 }
1072 }
1073
1074 #[test]
1075 fn test_job_producer_error_display() {
1076 let error = JobProducerError::QueueError("Test queue error".to_string());
1077 let error_string = error.to_string();
1078
1079 assert!(error_string.contains("Queue error"));
1080 assert!(error_string.contains("Test queue error"));
1081 }
1082
1083 #[test]
1084 fn test_job_producer_error_to_relayer_error() {
1085 let job_error = JobProducerError::QueueError("Connection failed".to_string());
1086 let relayer_error: RelayerError = job_error.into();
1087
1088 match relayer_error {
1089 RelayerError::QueueError(msg) => {
1090 assert_eq!(msg, "Queue error");
1091 }
1092 _ => panic!("Expected QueueError variant"),
1093 }
1094 }
1095}