1use crate::constants::{
7 MAXIMUM_SOLANA_TX_ATTEMPTS, SOLANA_DEFAULT_TX_VALID_TIMESPAN,
8 SOLANA_MIN_AGE_FOR_RESUBMIT_CHECK_SECONDS, SOLANA_PENDING_RECOVERY_TRIGGER_SECONDS,
9 SOLANA_PENDING_TIMEOUT_MINUTES, SOLANA_SENT_TIMEOUT_MINUTES,
10};
11use crate::models::{NetworkTransactionData, SolanaTransactionData};
12use crate::services::provider::SolanaProviderError;
13use chrono::{DateTime, Duration, Utc};
14use solana_commitment_config::CommitmentConfig;
15use solana_sdk::{signature::Signature, transaction::Transaction as SolanaTransaction};
16use std::str::FromStr;
17use tracing::{debug, error, info, warn};
18
19use super::{utils::decode_solana_transaction, SolanaRelayerTransaction};
20use crate::domain::transaction::common::is_final_state;
21use crate::domain::transaction::solana::utils::{
22 is_resubmitable, map_solana_status_to_transaction_status, too_many_solana_attempts,
23};
24use crate::{
25 jobs::{JobProducerTrait, TransactionRequest, TransactionSend},
26 models::{
27 RelayerRepoModel, SolanaTransactionStatus, TransactionError, TransactionRepoModel,
28 TransactionStatus, TransactionUpdateRequest,
29 },
30 repositories::{transaction::TransactionRepository, RelayerRepository, Repository},
31 services::{provider::SolanaProviderTrait, signer::SolanaSignTrait},
32};
33
34impl<P, RR, TR, J, S> SolanaRelayerTransaction<P, RR, TR, J, S>
35where
36 P: SolanaProviderTrait + Send + Sync + 'static,
37 RR: RelayerRepository + Repository<RelayerRepoModel, String> + Send + Sync + 'static,
38 TR: TransactionRepository + Repository<TransactionRepoModel, String> + Send + Sync + 'static,
39 J: JobProducerTrait + Send + Sync + 'static,
40 S: SolanaSignTrait + Send + Sync + 'static,
41{
42 pub async fn handle_transaction_status_impl(
49 &self,
50 mut tx: TransactionRepoModel,
51 ) -> Result<TransactionRepoModel, TransactionError> {
52 debug!(tx_id = %tx.id, status = ?tx.status, "handling solana transaction status");
53
54 if is_final_state(&tx.status) {
56 debug!(status = ?tx.status, "transaction already in final state");
57 return Ok(tx);
58 }
59
60 let detected_status = self.check_onchain_transaction_status(&tx).await?;
62
63 if tx.status != detected_status {
67 tx = self
68 .transaction_repository()
69 .get_by_id(tx.id.clone())
70 .await?;
71 }
72
73 match detected_status {
75 TransactionStatus::Pending => {
76 self.handle_pending_status(tx).await
78 }
79 TransactionStatus::Sent | TransactionStatus::Submitted => {
80 self.handle_resubmit_or_expiration(tx).await
82 }
83 TransactionStatus::Mined
84 | TransactionStatus::Confirmed
85 | TransactionStatus::Failed
86 | TransactionStatus::Canceled
87 | TransactionStatus::Expired => {
88 self.update_transaction_status_if_needed(tx, detected_status)
89 .await
90 }
91 }
92 }
93
94 async fn check_onchain_transaction_status(
100 &self,
101 tx: &TransactionRepoModel,
102 ) -> Result<TransactionStatus, TransactionError> {
103 match tx.status {
105 TransactionStatus::Pending | TransactionStatus::Sent => {
106 return Ok(tx.status.clone());
107 }
108 _ => {}
109 }
110
111 let solana_data = tx.network_data.get_solana_transaction_data()?;
113 let signature_str = solana_data.signature.as_ref().ok_or_else(|| {
114 TransactionError::ValidationError("Transaction signature is missing".to_string())
115 })?;
116
117 let signature = Signature::from_str(signature_str).map_err(|e| {
118 TransactionError::ValidationError(format!("Invalid signature format: {e}"))
119 })?;
120
121 match self.provider().get_transaction_status(&signature).await {
123 Ok(solana_status) => {
124 Ok(map_solana_status_to_transaction_status(solana_status))
126 }
127 Err(e) => {
128 warn!(
130 tx_id = %tx.id,
131 signature = %signature_str,
132 error = %e,
133 "error getting transaction status from chain"
134 );
135 Ok(tx.status.clone())
137 }
138 }
139 }
140
141 async fn update_transaction_status_and_send_notification(
149 &self,
150 tx: TransactionRepoModel,
151 new_status: TransactionStatus,
152 network_data: Option<crate::models::NetworkTransactionData>,
153 ) -> Result<TransactionRepoModel, TransactionError> {
154 let update_request = TransactionUpdateRequest {
155 status: Some(new_status.clone()),
156 network_data,
157 confirmed_at: if matches!(new_status, TransactionStatus::Confirmed) {
158 Some(Utc::now().to_rfc3339())
159 } else {
160 None
161 },
162 ..Default::default()
163 };
164
165 let updated_tx = self
167 .transaction_repository()
168 .partial_update(tx.id.clone(), update_request)
169 .await
170 .map_err(|e| TransactionError::UnexpectedError(e.to_string()))?;
171
172 if let Err(e) = self.send_transaction_update_notification(&updated_tx).await {
175 error!(
176 tx_id = %updated_tx.id,
177 status = ?new_status,
178 "sending transaction update notification failed: {:?}",
179 e
180 );
181 }
182
183 Ok(updated_tx)
184 }
185
186 async fn update_transaction_status_if_needed(
190 &self,
191 tx: TransactionRepoModel,
192 new_status: TransactionStatus,
193 ) -> Result<TransactionRepoModel, TransactionError> {
194 if tx.status != new_status {
195 return self
196 .update_transaction_status_and_send_notification(tx, new_status, None)
197 .await;
198 }
199 Ok(tx)
200 }
201
202 async fn handle_pending_status(
207 &self,
208 tx: TransactionRepoModel,
209 ) -> Result<TransactionRepoModel, TransactionError> {
210 if self.is_valid_until_expired(&tx) {
212 info!(
213 tx_id = %tx.id,
214 valid_until = ?tx.valid_until,
215 "pending transaction valid_until has expired"
216 );
217 return self
218 .mark_as_expired(
219 tx,
220 "Transaction valid_until timestamp has expired".to_string(),
221 )
222 .await;
223 }
224
225 if self.has_exceeded_timeout(&tx)? {
228 warn!(
229 tx_id = %tx.id,
230 timeout_minutes = SOLANA_PENDING_TIMEOUT_MINUTES,
231 "pending transaction has exceeded timeout, marking as failed"
232 );
233 return self
234 .mark_as_failed(
235 tx,
236 format!(
237 "Transaction stuck in Pending status for more than {SOLANA_PENDING_TIMEOUT_MINUTES} minutes"
238 ),
239 )
240 .await;
241 }
242
243 let age = self.get_time_since_sent_or_created_at(&tx).ok_or_else(|| {
246 TransactionError::UnexpectedError(
247 "Both sent_at and created_at are missing or invalid".to_string(),
248 )
249 })?;
250
251 if age.num_seconds() >= SOLANA_PENDING_RECOVERY_TRIGGER_SECONDS {
254 info!(
255 tx_id = %tx.id,
256 age_seconds = age.num_seconds(),
257 "pending transaction may be stuck, scheduling recovery job"
258 );
259
260 let transaction_request = TransactionRequest::new(tx.id.clone(), tx.relayer_id.clone());
261
262 self.job_producer()
263 .produce_transaction_request_job(transaction_request, None)
264 .await
265 .map_err(|e| {
266 TransactionError::UnexpectedError(format!(
267 "Failed to enqueue transaction request job: {e}"
268 ))
269 })?;
270 } else {
271 debug!(
272 tx_id = %tx.id,
273 age_seconds = age.num_seconds(),
274 "pending transaction too young for recovery check"
275 );
276 }
277
278 Ok(tx)
279 }
280
281 fn get_time_since_sent_or_created_at(&self, tx: &TransactionRepoModel) -> Option<Duration> {
286 let timestamp = tx.sent_at.as_ref().or(Some(&tx.created_at))?;
288 match DateTime::parse_from_rfc3339(timestamp) {
289 Ok(dt) => Some(Utc::now().signed_duration_since(dt.with_timezone(&Utc))),
290 Err(e) => {
291 warn!(tx_id = %tx.id, ts = %timestamp, error = %e, "failed to parse timestamp");
292 None
293 }
294 }
295 }
296
297 async fn check_any_signature_on_chain(
311 &self,
312 tx: &TransactionRepoModel,
313 ) -> Result<Option<(String, SolanaTransactionStatus)>, TransactionError> {
314 for (idx, sig_str) in tx.hashes.iter().enumerate() {
316 let signature = match Signature::from_str(sig_str) {
317 Ok(sig) => sig,
318 Err(e) => {
319 warn!(
320 tx_id = %tx.id,
321 signature = %sig_str,
322 error = %e,
323 "invalid signature format in hashes, skipping"
324 );
325 continue;
326 }
327 };
328
329 match self.provider().get_transaction_status(&signature).await {
330 Ok(solana_status) => {
331 info!(
333 tx_id = %tx.id,
334 signature = %sig_str,
335 signature_idx = idx,
336 on_chain_status = ?solana_status,
337 "found transaction on-chain with previous signature"
338 );
339 return Ok(Some((sig_str.clone(), solana_status)));
340 }
341 Err(e) => {
342 debug!(
344 tx_id = %tx.id,
345 signature = %sig_str,
346 signature_idx = idx,
347 error = %e,
348 "signature not found on-chain or RPC error"
349 );
350 continue;
351 }
352 }
353 }
354
355 Ok(None)
357 }
358
359 async fn is_blockhash_valid(
363 &self,
364 transaction: &SolanaTransaction,
365 ) -> Result<bool, TransactionError> {
366 let blockhash = transaction.message.recent_blockhash;
367
368 match self
369 .provider()
370 .is_blockhash_valid(&blockhash, CommitmentConfig::confirmed())
371 .await
372 {
373 Ok(is_valid) => Ok(is_valid),
374 Err(e) => {
375 if matches!(e, SolanaProviderError::BlockhashNotFound(_)) {
377 info!("blockhash not found on chain, treating as expired");
378 return Ok(false);
379 }
380
381 warn!(
383 error = %e,
384 "error checking blockhash validity, propagating error for retry"
385 );
386 Err(TransactionError::UnderlyingSolanaProvider(e))
387 }
388 }
389 }
390
391 async fn mark_as_expired(
393 &self,
394 tx: TransactionRepoModel,
395 reason: String,
396 ) -> Result<TransactionRepoModel, TransactionError> {
397 warn!(tx_id = %tx.id, reason = %reason, "marking transaction as expired");
398
399 let update_request = TransactionUpdateRequest {
400 status: Some(TransactionStatus::Expired),
401 status_reason: Some(reason),
402 ..Default::default()
403 };
404
405 self.transaction_repository()
406 .partial_update(tx.id.clone(), update_request)
407 .await
408 .map_err(|e| TransactionError::UnexpectedError(e.to_string()))
409 }
410
411 async fn mark_as_failed(
413 &self,
414 tx: TransactionRepoModel,
415 reason: String,
416 ) -> Result<TransactionRepoModel, TransactionError> {
417 warn!(tx_id = %tx.id, reason = %reason, "marking transaction as failed");
418
419 let update_request = TransactionUpdateRequest {
420 status: Some(TransactionStatus::Failed),
421 status_reason: Some(reason),
422 ..Default::default()
423 };
424
425 self.transaction_repository()
426 .partial_update(tx.id.clone(), update_request)
427 .await
428 .map_err(|e| TransactionError::UnexpectedError(e.to_string()))
429 }
430
431 fn is_valid_until_expired(&self, tx: &TransactionRepoModel) -> bool {
437 if let Some(valid_until_str) = &tx.valid_until {
439 if let Ok(valid_until) = DateTime::parse_from_rfc3339(valid_until_str) {
440 return Utc::now() > valid_until.with_timezone(&Utc);
441 }
442 }
443
444 if let Ok(created_at) = DateTime::parse_from_rfc3339(&tx.created_at) {
446 let default_valid_until = created_at.with_timezone(&Utc)
447 + Duration::milliseconds(SOLANA_DEFAULT_TX_VALID_TIMESPAN);
448 return Utc::now() > default_valid_until;
449 }
450
451 false
454 }
455
456 fn has_exceeded_timeout(&self, tx: &TransactionRepoModel) -> Result<bool, TransactionError> {
458 let age = self.get_time_since_sent_or_created_at(tx).ok_or_else(|| {
459 TransactionError::UnexpectedError(
460 "Both sent_at and created_at are missing or invalid".to_string(),
461 )
462 })?;
463
464 let timeout = match tx.status {
465 TransactionStatus::Pending => Duration::minutes(SOLANA_PENDING_TIMEOUT_MINUTES),
466 TransactionStatus::Sent => Duration::minutes(SOLANA_SENT_TIMEOUT_MINUTES),
467 _ => return Ok(false), };
470
471 Ok(age >= timeout)
472 }
473
474 async fn handle_resubmit_or_expiration(
486 &self,
487 tx: TransactionRepoModel,
488 ) -> Result<TransactionRepoModel, TransactionError> {
489 if self.is_valid_until_expired(&tx) {
491 info!(
492 tx_id = %tx.id,
493 valid_until = ?tx.valid_until,
494 "transaction valid_until has expired"
495 );
496 return self
497 .mark_as_expired(
498 tx,
499 "Transaction valid_until timestamp has expired".to_string(),
500 )
501 .await;
502 }
503
504 if tx.status == TransactionStatus::Submitted {
506 if too_many_solana_attempts(&tx) {
508 let attempt_count = tx.hashes.len();
509 warn!(
510 tx_id = %tx.id,
511 attempt_count = attempt_count,
512 max_attempts = MAXIMUM_SOLANA_TX_ATTEMPTS,
513 "transaction has exceeded maximum resubmission attempts"
514 );
515 return self
516 .mark_as_failed(
517 tx,
518 format!(
519 "Transaction exceeded maximum resubmission attempts ({attempt_count} > {MAXIMUM_SOLANA_TX_ATTEMPTS})"
520 ),
521 )
522 .await;
523 }
524 } else if self.has_exceeded_timeout(&tx)? {
525 let timeout_minutes = match tx.status {
527 TransactionStatus::Pending => SOLANA_PENDING_TIMEOUT_MINUTES,
528 TransactionStatus::Sent => SOLANA_SENT_TIMEOUT_MINUTES,
529 _ => 0,
530 };
531 let status = tx.status.clone();
532 warn!(
533 tx_id = %tx.id,
534 status = ?status,
535 timeout_minutes = timeout_minutes,
536 "transaction has exceeded timeout for status"
537 );
538 return self
539 .mark_as_failed(
540 tx,
541 format!(
542 "Transaction stuck in {status:?} status for more than {timeout_minutes} minutes"
543 ),
544 )
545 .await;
546 }
547
548 let time_since_sent = match self.get_time_since_sent_or_created_at(&tx) {
550 Some(duration) => duration,
551 None => {
552 debug!(tx_id = %tx.id, "both sent_at and created_at are missing or invalid, skipping resubmit check");
553 return Ok(tx);
554 }
555 };
556
557 if time_since_sent.num_seconds() < SOLANA_MIN_AGE_FOR_RESUBMIT_CHECK_SECONDS {
558 debug!(
559 tx_id = %tx.id,
560 time_since_sent_secs = time_since_sent.num_seconds(),
561 min_age = SOLANA_MIN_AGE_FOR_RESUBMIT_CHECK_SECONDS,
562 "transaction too young for blockhash expiration check"
563 );
564 return Ok(tx);
565 }
566
567 if let Some((found_signature, solana_status)) =
574 self.check_any_signature_on_chain(&tx).await?
575 {
576 info!(
577 tx_id = %tx.id,
578 signature = %found_signature,
579 on_chain_status = ?solana_status,
580 "transaction found on-chain with previous signature, updating to final state"
581 );
582
583 let new_status = map_solana_status_to_transaction_status(solana_status);
585
586 let solana_data = tx.network_data.get_solana_transaction_data()?;
588 let updated_solana_data = SolanaTransactionData {
589 signature: Some(found_signature),
590 ..solana_data
591 };
592 let updated_network_data = NetworkTransactionData::Solana(updated_solana_data);
593
594 return self
596 .update_transaction_status_and_send_notification(
597 tx,
598 new_status,
599 Some(updated_network_data),
600 )
601 .await;
602 }
603
604 let transaction = decode_solana_transaction(&tx)?;
606
607 let blockhash_valid = self.is_blockhash_valid(&transaction).await?;
609
610 if blockhash_valid {
611 debug!(
612 tx_id = %tx.id,
613 "blockhash still valid, no action needed"
614 );
615 return Ok(tx);
616 }
617
618 info!(
619 tx_id = %tx.id,
620 "blockhash has expired, checking if transaction can be resubmitted"
621 );
622
623 if is_resubmitable(&transaction) {
625 info!(
626 tx_id = %tx.id,
627 "transaction is resubmitable, enqueuing resubmit job"
628 );
629
630 self.job_producer()
632 .produce_submit_transaction_job(
633 TransactionSend::resubmit(tx.id.clone(), tx.relayer_id.clone()),
634 None,
635 )
636 .await
637 .map_err(|e| {
638 TransactionError::UnexpectedError(format!(
639 "Failed to enqueue resubmit job: {e}"
640 ))
641 })?;
642
643 info!(tx_id = %tx.id, "resubmit job enqueued successfully");
644 Ok(tx)
645 } else {
646 warn!(
648 tx_id = %tx.id,
649 num_signatures = transaction.message.header.num_required_signatures,
650 "transaction has expired blockhash but cannot be resubmitted (multi-sig)"
651 );
652
653 self.mark_as_expired(
654 tx,
655 format!(
656 "Blockhash expired and transaction requires {} signatures (cannot resubmit)",
657 transaction.message.header.num_required_signatures
658 ),
659 )
660 .await
661 }
662 }
663}
664
665#[cfg(test)]
666mod tests {
667 use super::*;
668 use crate::{
669 jobs::{MockJobProducerTrait, TransactionCommand},
670 models::{NetworkTransactionData, SolanaTransactionData},
671 repositories::{MockRelayerRepository, MockTransactionRepository},
672 services::{
673 provider::{MockSolanaProviderTrait, SolanaProviderError},
674 signer::MockSolanaSignTrait,
675 },
676 utils::{
677 base64_encode,
678 mocks::mockutils::{create_mock_solana_relayer, create_mock_solana_transaction},
679 },
680 };
681 use eyre::Result;
682 use mockall::predicate::*;
683 use solana_sdk::{hash::Hash, message::Message, pubkey::Pubkey};
684 use solana_system_interface::instruction as system_instruction;
685 use std::sync::Arc;
686
687 fn create_tx_with_signature(
689 status: TransactionStatus,
690 signature: Option<&str>,
691 ) -> TransactionRepoModel {
692 let mut tx = create_mock_solana_transaction();
693 tx.status = status;
694 if let Some(sig) = signature {
695 tx.network_data = NetworkTransactionData::Solana(SolanaTransactionData {
696 transaction: Some("test".to_string()),
697 instructions: None,
698 signature: Some(sig.to_string()),
699 });
700 }
701 tx
702 }
703
704 #[tokio::test]
705 async fn test_handle_status_already_final() {
706 let provider = MockSolanaProviderTrait::new();
707 let relayer_repo = Arc::new(MockRelayerRepository::new());
708 let tx_repo = Arc::new(MockTransactionRepository::new());
709 let job_producer = Arc::new(MockJobProducerTrait::new());
710 let relayer = create_mock_solana_relayer("test-relayer".to_string(), false);
711
712 let handler = SolanaRelayerTransaction::new(
713 relayer,
714 relayer_repo,
715 Arc::new(provider),
716 tx_repo,
717 job_producer,
718 Arc::new(MockSolanaSignTrait::new()),
719 )
720 .unwrap();
721
722 let tx_confirmed = create_tx_with_signature(TransactionStatus::Confirmed, None);
724 let result = handler
725 .handle_transaction_status_impl(tx_confirmed.clone())
726 .await;
727 assert!(result.is_ok());
728 assert_eq!(result.unwrap().id, tx_confirmed.id);
729
730 let tx_failed = create_tx_with_signature(TransactionStatus::Failed, None);
732 let result = handler
733 .handle_transaction_status_impl(tx_failed.clone())
734 .await;
735 assert!(result.is_ok());
736 assert_eq!(result.unwrap().id, tx_failed.id);
737
738 let tx_expired = create_tx_with_signature(TransactionStatus::Expired, None);
740 let result = handler
741 .handle_transaction_status_impl(tx_expired.clone())
742 .await;
743 assert!(result.is_ok());
744 assert_eq!(result.unwrap().id, tx_expired.id);
745 }
746
747 #[tokio::test]
748 async fn test_handle_status_processed() -> Result<()> {
749 let mut provider = MockSolanaProviderTrait::new();
750 let relayer_repo = Arc::new(MockRelayerRepository::new());
751 let mut tx_repo = MockTransactionRepository::new();
752 let job_producer = MockJobProducerTrait::new();
753
754 let signature_str =
755 "4XFPmbPT4TRchFWNmQD2N8BhjxJQKqYdXWQG7kJJtxCBZ8Y9WtNDoPAwQaHFYnVynCjMVyF9TCMrpPFkEpG7LpZr";
756 let tx = create_tx_with_signature(TransactionStatus::Submitted, Some(signature_str));
758
759 provider
761 .expect_get_transaction_status()
762 .with(eq(Signature::from_str(signature_str)?))
763 .times(1)
764 .returning(|_| Box::pin(async { Ok(SolanaTransactionStatus::Processed) }));
765
766 let tx_id = tx.id.clone();
767 let tx_id_clone = tx_id.clone();
768
769 tx_repo
771 .expect_get_by_id()
772 .with(eq(tx_id.clone()))
773 .times(1)
774 .returning(move |_| {
775 Ok(create_tx_with_signature(
776 TransactionStatus::Submitted, Some(signature_str),
778 ))
779 });
780
781 tx_repo
783 .expect_partial_update()
784 .withf(move |tx_id_param, update_req| {
785 tx_id_param == &tx_id_clone && update_req.status == Some(TransactionStatus::Mined)
786 })
787 .times(1)
788 .returning(move |_, _| {
789 Ok(create_tx_with_signature(
790 TransactionStatus::Mined,
791 Some(signature_str),
792 ))
793 });
794
795 let handler = SolanaRelayerTransaction::new(
796 create_mock_solana_relayer("test-relayer".to_string(), false),
797 relayer_repo,
798 Arc::new(provider),
799 Arc::new(tx_repo),
800 Arc::new(job_producer),
801 Arc::new(MockSolanaSignTrait::new()),
802 )?;
803
804 let result = handler.handle_transaction_status_impl(tx.clone()).await;
805
806 assert!(result.is_ok());
807 let updated_tx = result.unwrap();
808 assert_eq!(updated_tx.id, tx.id);
809 assert_eq!(updated_tx.status, TransactionStatus::Mined);
811 Ok(())
812 }
813
814 #[tokio::test]
815 async fn test_handle_status_confirmed() -> Result<()> {
816 let mut provider = MockSolanaProviderTrait::new();
817 let relayer_repo = Arc::new(MockRelayerRepository::new());
818 let mut tx_repo = MockTransactionRepository::new();
819 let job_producer = MockJobProducerTrait::new();
820
821 let signature_str =
822 "4XFPmbPT4TRchFWNmQD2N8BhjxJQKqYdXWQG7kJJtxCBZ8Y9WtNDoPAwQaHFYnVynCjMVyF9TCMrpPFkEpG7LpZr";
823 let tx = create_tx_with_signature(TransactionStatus::Submitted, Some(signature_str));
824
825 provider
826 .expect_get_transaction_status()
827 .with(eq(Signature::from_str(signature_str)?))
828 .times(1)
829 .returning(|_| Box::pin(async { Ok(SolanaTransactionStatus::Confirmed) }));
830
831 let tx_id = tx.id.clone();
832 let tx_id_clone = tx_id.clone();
833
834 tx_repo
836 .expect_get_by_id()
837 .with(eq(tx_id.clone()))
838 .times(1)
839 .returning(move |_| {
840 Ok(create_tx_with_signature(
841 TransactionStatus::Submitted,
842 Some(signature_str),
843 ))
844 });
845
846 tx_repo
847 .expect_partial_update()
848 .withf(move |tx_id_param, update_req| {
849 tx_id_param == &tx_id_clone && update_req.status == Some(TransactionStatus::Mined)
850 })
851 .times(1)
852 .returning(move |_, _| {
853 Ok(create_tx_with_signature(
854 TransactionStatus::Mined,
855 Some(signature_str),
856 ))
857 });
858
859 let handler = SolanaRelayerTransaction::new(
860 create_mock_solana_relayer("test-relayer".to_string(), false),
861 relayer_repo,
862 Arc::new(provider),
863 Arc::new(tx_repo),
864 Arc::new(job_producer),
865 Arc::new(MockSolanaSignTrait::new()),
866 )?;
867
868 let result = handler.handle_transaction_status_impl(tx.clone()).await;
869
870 assert!(result.is_ok());
871 let updated_tx = result.unwrap();
872 assert_eq!(updated_tx.id, tx.id);
873 assert_eq!(updated_tx.status, TransactionStatus::Mined);
874 Ok(())
875 }
876
877 #[tokio::test]
878 async fn test_handle_status_finalized() -> Result<()> {
879 let mut provider = MockSolanaProviderTrait::new();
880 let relayer_repo = Arc::new(MockRelayerRepository::new());
881 let mut tx_repo = MockTransactionRepository::new();
882 let job_producer = MockJobProducerTrait::new();
883
884 let signature_str =
885 "4XFPmbPT4TRchFWNmQD2N8BhjxJQKqYdXWQG7kJJtxCBZ8Y9WtNDoPAwQaHFYnVynCjMVyF9TCMrpPFkEpG7LpZr";
886 let tx = create_tx_with_signature(TransactionStatus::Mined, Some(signature_str));
887
888 provider
889 .expect_get_transaction_status()
890 .with(eq(Signature::from_str(signature_str)?))
891 .times(1)
892 .returning(|_| Box::pin(async { Ok(SolanaTransactionStatus::Finalized) }));
893
894 let tx_id = tx.id.clone();
895 let tx_id_clone = tx_id.clone();
896
897 tx_repo
899 .expect_get_by_id()
900 .with(eq(tx_id.clone()))
901 .times(1)
902 .returning(move |_| {
903 Ok(create_tx_with_signature(
904 TransactionStatus::Mined,
905 Some(signature_str),
906 ))
907 });
908
909 tx_repo
910 .expect_partial_update()
911 .withf(move |tx_id_param, update_req| {
912 tx_id_param == &tx_id_clone
913 && update_req.status == Some(TransactionStatus::Confirmed)
914 })
915 .times(1)
916 .returning(move |_, _| {
917 Ok(create_tx_with_signature(
918 TransactionStatus::Confirmed,
919 Some(signature_str),
920 ))
921 });
922
923 let handler = SolanaRelayerTransaction::new(
924 create_mock_solana_relayer("test-relayer".to_string(), false),
925 relayer_repo,
926 Arc::new(provider),
927 Arc::new(tx_repo),
928 Arc::new(job_producer),
929 Arc::new(MockSolanaSignTrait::new()),
930 )?;
931
932 let result = handler.handle_transaction_status_impl(tx.clone()).await;
933
934 assert!(result.is_ok());
935 let updated_tx = result.unwrap();
936 assert_eq!(updated_tx.id, tx.id);
937 assert_eq!(updated_tx.status, TransactionStatus::Confirmed);
938 Ok(())
939 }
940
941 #[tokio::test]
942 async fn test_handle_status_provider_error() -> Result<()> {
943 let mut provider = MockSolanaProviderTrait::new();
944 let relayer_repo = Arc::new(MockRelayerRepository::new());
945 let tx_repo = Arc::new(MockTransactionRepository::new());
946 let job_producer = MockJobProducerTrait::new();
947
948 let signature_str = "4XFPmbPT4TRchFWNmQD2N8BhjxJQKqYdXWQG7kJJtxCBZ8Y9WtNDoPAwQaHFYnVynCjMVyF9TCMrpPFkEpG7LpZr";
949 let tx = create_tx_with_signature(TransactionStatus::Submitted, Some(signature_str));
951 let error_message = "Provider is down";
952
953 provider
956 .expect_get_transaction_status()
957 .with(eq(Signature::from_str(signature_str)?))
958 .times(1)
959 .returning(move |_| {
960 Box::pin(async { Err(SolanaProviderError::RpcError(error_message.to_string())) })
961 });
962
963 let handler = SolanaRelayerTransaction::new(
967 create_mock_solana_relayer("test-relayer".to_string(), false),
968 relayer_repo,
969 Arc::new(provider),
970 tx_repo,
971 Arc::new(job_producer),
972 Arc::new(MockSolanaSignTrait::new()),
973 )?;
974
975 let result = handler.handle_transaction_status_impl(tx.clone()).await;
976
977 assert!(result.is_ok());
980 let updated_tx = result.unwrap();
981 assert_eq!(updated_tx.status, TransactionStatus::Submitted); Ok(())
983 }
984
985 #[tokio::test]
986 async fn test_handle_status_failed() -> Result<()> {
987 let mut provider = MockSolanaProviderTrait::new();
988 let relayer_repo = Arc::new(MockRelayerRepository::new());
989 let mut tx_repo = MockTransactionRepository::new();
990 let job_producer = MockJobProducerTrait::new();
991
992 let signature_str =
993 "4XFPmbPT4TRchFWNmQD2N8BhjxJQKqYdXWQG7kJJtxCBZ8Y9WtNDoPAwQaHFYnVynCjMVyF9TCMrpPFkEpG7LpZr";
994 let tx = create_tx_with_signature(TransactionStatus::Submitted, Some(signature_str));
995
996 provider
997 .expect_get_transaction_status()
998 .with(eq(Signature::from_str(signature_str)?))
999 .times(1)
1000 .returning(|_| Box::pin(async { Ok(SolanaTransactionStatus::Failed) }));
1001
1002 let tx_id = tx.id.clone();
1003 let tx_id_clone = tx_id.clone();
1004
1005 tx_repo
1007 .expect_get_by_id()
1008 .with(eq(tx_id.clone()))
1009 .times(1)
1010 .returning(move |_| {
1011 Ok(create_tx_with_signature(
1012 TransactionStatus::Submitted,
1013 Some(signature_str),
1014 ))
1015 });
1016
1017 tx_repo
1018 .expect_partial_update()
1019 .withf(move |tx_id_param, update_req| {
1020 tx_id_param == &tx_id_clone && update_req.status == Some(TransactionStatus::Failed)
1021 })
1022 .times(1)
1023 .returning(move |_, _| {
1024 Ok(create_tx_with_signature(
1025 TransactionStatus::Failed,
1026 Some(signature_str),
1027 ))
1028 });
1029
1030 let handler = SolanaRelayerTransaction::new(
1031 create_mock_solana_relayer("test-relayer".to_string(), false),
1032 relayer_repo,
1033 Arc::new(provider),
1034 Arc::new(tx_repo),
1035 Arc::new(job_producer),
1036 Arc::new(MockSolanaSignTrait::new()),
1037 )?;
1038
1039 let result = handler.handle_transaction_status_impl(tx.clone()).await;
1040
1041 assert!(result.is_ok());
1042 let updated_tx = result.unwrap();
1043 assert_eq!(updated_tx.id, tx.id);
1044 assert_eq!(updated_tx.status, TransactionStatus::Failed);
1045 Ok(())
1046 }
1047
1048 #[tokio::test]
1049 async fn test_default_valid_until_expired() -> Result<()> {
1050 let provider = MockSolanaProviderTrait::new();
1051 let relayer_repo = Arc::new(MockRelayerRepository::new());
1052 let mut tx_repo = MockTransactionRepository::new();
1053 let job_producer = MockJobProducerTrait::new();
1054
1055 let old_created_at = (Utc::now()
1057 - Duration::milliseconds(SOLANA_DEFAULT_TX_VALID_TIMESPAN + 60000))
1058 .to_rfc3339();
1059 let mut tx = create_tx_with_signature(TransactionStatus::Pending, None);
1060 tx.created_at = old_created_at;
1061 tx.valid_until = None; let tx_id = tx.id.clone();
1064
1065 tx_repo
1067 .expect_partial_update()
1068 .withf(move |tx_id_param, update_req| {
1069 tx_id_param == &tx_id && update_req.status == Some(TransactionStatus::Expired)
1070 })
1071 .times(1)
1072 .returning(move |_, _| {
1073 let mut expired_tx = create_tx_with_signature(TransactionStatus::Expired, None);
1074 expired_tx.status = TransactionStatus::Expired;
1075 Ok(expired_tx)
1076 });
1077
1078 let handler = SolanaRelayerTransaction::new(
1079 create_mock_solana_relayer("test-relayer".to_string(), false),
1080 relayer_repo,
1081 Arc::new(provider),
1082 Arc::new(tx_repo),
1083 Arc::new(job_producer),
1084 Arc::new(MockSolanaSignTrait::new()),
1085 )?;
1086
1087 let result = handler.handle_transaction_status_impl(tx).await;
1088
1089 assert!(result.is_ok());
1090 let updated_tx = result.unwrap();
1091 assert_eq!(updated_tx.status, TransactionStatus::Expired);
1092 Ok(())
1093 }
1094
1095 #[tokio::test]
1096 async fn test_default_valid_until_not_expired() -> Result<()> {
1097 let mut provider = MockSolanaProviderTrait::new();
1098 let relayer_repo = Arc::new(MockRelayerRepository::new());
1099 let mut tx_repo = MockTransactionRepository::new();
1100 let job_producer = MockJobProducerTrait::new();
1101
1102 let recent_created_at = (Utc::now()
1104 - Duration::milliseconds(SOLANA_DEFAULT_TX_VALID_TIMESPAN - 60000))
1105 .to_rfc3339();
1106 let signature_str =
1107 "4XFPmbPT4TRchFWNmQD2N8BhjxJQKqYdXWQG7kJJtxCBZ8Y9WtNDoPAwQaHFYnVynCjMVyF9TCMrpPFkEpG7LpZr";
1108 let mut tx = create_tx_with_signature(TransactionStatus::Submitted, Some(signature_str));
1109 tx.created_at = recent_created_at.clone();
1110 tx.valid_until = None; let tx_id = tx.id.clone();
1113 let tx_id_clone = tx_id.clone();
1114 let recent_created_at_clone = recent_created_at.clone();
1115
1116 provider
1118 .expect_get_transaction_status()
1119 .with(eq(Signature::from_str(signature_str)?))
1120 .times(1)
1121 .returning(|_| Box::pin(async { Ok(SolanaTransactionStatus::Processed) }));
1122
1123 tx_repo
1125 .expect_get_by_id()
1126 .with(eq(tx_id.clone()))
1127 .times(1)
1128 .returning(move |_| {
1129 let mut tx =
1130 create_tx_with_signature(TransactionStatus::Submitted, Some(signature_str));
1131 tx.created_at = recent_created_at_clone.clone();
1132 tx.valid_until = None;
1133 Ok(tx)
1134 });
1135
1136 tx_repo
1138 .expect_partial_update()
1139 .withf(move |tx_id_param, update_req| {
1140 tx_id_param == &tx_id_clone && update_req.status == Some(TransactionStatus::Mined)
1141 })
1142 .times(1)
1143 .returning(move |_, _| {
1144 Ok(create_tx_with_signature(
1145 TransactionStatus::Mined,
1146 Some(signature_str),
1147 ))
1148 });
1149
1150 let handler = SolanaRelayerTransaction::new(
1151 create_mock_solana_relayer("test-relayer".to_string(), false),
1152 relayer_repo,
1153 Arc::new(provider),
1154 Arc::new(tx_repo),
1155 Arc::new(job_producer),
1156 Arc::new(MockSolanaSignTrait::new()),
1157 )?;
1158
1159 let result = handler.handle_transaction_status_impl(tx.clone()).await;
1160
1161 assert!(result.is_ok());
1162 let updated_tx = result.unwrap();
1163 assert_eq!(updated_tx.status, TransactionStatus::Mined);
1165 Ok(())
1166 }
1167
1168 #[tokio::test]
1169 async fn test_too_many_resubmission_attempts() -> Result<()> {
1170 let mut provider = MockSolanaProviderTrait::new();
1171 let relayer_repo = Arc::new(MockRelayerRepository::new());
1172 let mut tx_repo = MockTransactionRepository::new();
1173 let job_producer = MockJobProducerTrait::new();
1174
1175 let signature_str =
1177 "4XFPmbPT4TRchFWNmQD2N8BhjxJQKqYdXWQG7kJJtxCBZ8Y9WtNDoPAwQaHFYnVynCjMVyF9TCMrpPFkEpG7LpZr";
1178 let mut tx = create_tx_with_signature(TransactionStatus::Submitted, Some(signature_str));
1179 tx.hashes = vec!["sig".to_string(); MAXIMUM_SOLANA_TX_ATTEMPTS + 1];
1180 tx.sent_at = Some(Utc::now().to_rfc3339()); let tx_id = tx.id.clone();
1183
1184 provider
1186 .expect_get_transaction_status()
1187 .with(eq(Signature::from_str(signature_str)?))
1188 .times(1)
1189 .returning(|_| {
1190 Box::pin(async {
1191 Err(crate::services::provider::SolanaProviderError::RpcError(
1192 "test error".to_string(),
1193 ))
1194 })
1195 });
1196
1197 tx_repo
1199 .expect_partial_update()
1200 .withf(move |tx_id_param, update_req| {
1201 tx_id_param == &tx_id && update_req.status == Some(TransactionStatus::Failed)
1202 })
1203 .times(1)
1204 .returning(move |_, _| {
1205 let mut failed_tx = create_tx_with_signature(TransactionStatus::Failed, None);
1206 failed_tx.status = TransactionStatus::Failed;
1207 Ok(failed_tx)
1208 });
1209
1210 let handler = SolanaRelayerTransaction::new(
1211 create_mock_solana_relayer("test-relayer".to_string(), false),
1212 relayer_repo,
1213 Arc::new(provider),
1214 Arc::new(tx_repo),
1215 Arc::new(job_producer),
1216 Arc::new(MockSolanaSignTrait::new()),
1217 )?;
1218
1219 let result = handler.handle_transaction_status_impl(tx).await;
1220
1221 assert!(result.is_ok());
1222 let updated_tx = result.unwrap();
1223 assert_eq!(updated_tx.status, TransactionStatus::Failed);
1224 Ok(())
1225 }
1226
1227 #[tokio::test]
1228 async fn test_handle_pending_status_schedules_recovery_job() -> Result<()> {
1229 let provider = MockSolanaProviderTrait::new();
1230 let relayer_repo = Arc::new(MockRelayerRepository::new());
1231 let tx_repo = Arc::new(MockTransactionRepository::new());
1232 let mut job_producer = MockJobProducerTrait::new();
1233
1234 let mut tx = create_tx_with_signature(TransactionStatus::Pending, None);
1236 tx.created_at = (Utc::now()
1237 - Duration::seconds(SOLANA_PENDING_RECOVERY_TRIGGER_SECONDS + 10))
1238 .to_rfc3339();
1239
1240 let tx_id = tx.id.clone();
1241
1242 job_producer
1244 .expect_produce_transaction_request_job()
1245 .withf(move |job, _delay| job.transaction_id == tx_id)
1246 .times(1)
1247 .returning(|_, _| Box::pin(async { Ok(()) }));
1248
1249 let handler = SolanaRelayerTransaction::new(
1250 create_mock_solana_relayer("test-relayer".to_string(), false),
1251 relayer_repo,
1252 Arc::new(provider),
1253 tx_repo,
1254 Arc::new(job_producer),
1255 Arc::new(MockSolanaSignTrait::new()),
1256 )?;
1257
1258 let result = handler.handle_pending_status(tx.clone()).await;
1259
1260 assert!(result.is_ok());
1261 let returned_tx = result.unwrap();
1262 assert_eq!(returned_tx.status, TransactionStatus::Pending); Ok(())
1264 }
1265
1266 #[tokio::test]
1267 async fn test_handle_pending_status_too_young() -> Result<()> {
1268 let provider = MockSolanaProviderTrait::new();
1269 let relayer_repo = Arc::new(MockRelayerRepository::new());
1270 let tx_repo = Arc::new(MockTransactionRepository::new());
1271 let job_producer = Arc::new(MockJobProducerTrait::new());
1272
1273 let mut tx = create_tx_with_signature(TransactionStatus::Pending, None);
1275 tx.created_at = (Utc::now()
1276 - Duration::seconds(SOLANA_PENDING_RECOVERY_TRIGGER_SECONDS - 10))
1277 .to_rfc3339();
1278
1279 let handler = SolanaRelayerTransaction::new(
1280 create_mock_solana_relayer("test-relayer".to_string(), false),
1281 relayer_repo,
1282 Arc::new(provider),
1283 tx_repo,
1284 job_producer,
1285 Arc::new(MockSolanaSignTrait::new()),
1286 )?;
1287
1288 let result = handler.handle_pending_status(tx.clone()).await;
1289
1290 assert!(result.is_ok());
1291 let returned_tx = result.unwrap();
1292 assert_eq!(returned_tx.status, TransactionStatus::Pending); Ok(())
1294 }
1295
1296 #[tokio::test]
1297 async fn test_handle_pending_status_timeout() -> Result<()> {
1298 let provider = MockSolanaProviderTrait::new();
1299 let relayer_repo = Arc::new(MockRelayerRepository::new());
1300 let mut tx_repo = MockTransactionRepository::new();
1301 let job_producer = Arc::new(MockJobProducerTrait::new());
1302
1303 let mut tx = create_tx_with_signature(TransactionStatus::Pending, None);
1305 tx.created_at =
1306 (Utc::now() - Duration::minutes(SOLANA_PENDING_TIMEOUT_MINUTES + 1)).to_rfc3339();
1307
1308 let tx_id = tx.id.clone();
1309
1310 tx_repo
1312 .expect_partial_update()
1313 .withf(move |tx_id_param, update_req| {
1314 tx_id_param == &tx_id && update_req.status == Some(TransactionStatus::Failed)
1315 })
1316 .times(1)
1317 .returning(move |_, _| {
1318 let mut failed_tx = create_tx_with_signature(TransactionStatus::Failed, None);
1319 failed_tx.status = TransactionStatus::Failed;
1320 Ok(failed_tx)
1321 });
1322
1323 let handler = SolanaRelayerTransaction::new(
1324 create_mock_solana_relayer("test-relayer".to_string(), false),
1325 relayer_repo,
1326 Arc::new(provider),
1327 Arc::new(tx_repo),
1328 job_producer,
1329 Arc::new(MockSolanaSignTrait::new()),
1330 )?;
1331
1332 let result = handler.handle_pending_status(tx).await;
1333
1334 assert!(result.is_ok());
1335 let updated_tx = result.unwrap();
1336 assert_eq!(updated_tx.status, TransactionStatus::Failed);
1337 Ok(())
1338 }
1339
1340 #[tokio::test]
1341 async fn test_handle_resubmit_blockhash_expired_resubmitable() -> Result<()> {
1342 let mut provider = MockSolanaProviderTrait::new();
1343 let relayer_repo = Arc::new(MockRelayerRepository::new());
1344 let tx_repo = Arc::new(MockTransactionRepository::new());
1345 let mut job_producer = MockJobProducerTrait::new();
1346
1347 let payer = Pubkey::new_unique();
1349 let instruction =
1350 solana_system_interface::instruction::transfer(&payer, &Pubkey::new_unique(), 1000);
1351 let mut transaction = SolanaTransaction::new_with_payer(&[instruction], Some(&payer));
1352 transaction.message.recent_blockhash = Hash::from_str("11111111111111111111111111111112")?;
1353 let transaction_bytes = bincode::serialize(&transaction)?;
1354 let transaction_b64 = base64_encode(&transaction_bytes);
1355
1356 let signature_str = "4XFPmbPT4TRchFWNmQD2N8BhjxJQKqYdXWQG7kJJtxCBZ8Y9WtNDoPAwQaHFYnVynCjMVyF9TCMrpPFkEpG7LpZr";
1358 let mut tx = create_tx_with_signature(TransactionStatus::Submitted, Some(signature_str));
1359 tx.sent_at = Some(
1360 (Utc::now() - Duration::seconds(SOLANA_MIN_AGE_FOR_RESUBMIT_CHECK_SECONDS + 10))
1361 .to_rfc3339(),
1362 );
1363 tx.network_data = NetworkTransactionData::Solana(SolanaTransactionData {
1364 transaction: Some(transaction_b64),
1365 instructions: None,
1366 signature: Some(signature_str.to_string()),
1367 });
1368
1369 let tx_id = tx.id.clone();
1370
1371 provider
1373 .expect_is_blockhash_valid()
1374 .with(
1375 eq(Hash::from_str("11111111111111111111111111111112")?),
1376 eq(CommitmentConfig::confirmed()),
1377 )
1378 .times(1)
1379 .returning(|_, _| Box::pin(async { Ok(false) })); job_producer
1383 .expect_produce_submit_transaction_job()
1384 .withf(move |job, _delay| {
1385 matches!(job.command, TransactionCommand::Resubmit) && job.transaction_id == tx_id
1386 })
1387 .times(1)
1388 .returning(|_, _| Box::pin(async { Ok(()) }));
1389
1390 let handler = SolanaRelayerTransaction::new(
1391 create_mock_solana_relayer("test-relayer".to_string(), false),
1392 relayer_repo,
1393 Arc::new(provider),
1394 tx_repo,
1395 Arc::new(job_producer),
1396 Arc::new(MockSolanaSignTrait::new()),
1397 )?;
1398
1399 let result = handler.handle_resubmit_or_expiration(tx.clone()).await;
1400
1401 assert!(result.is_ok());
1402 let returned_tx = result.unwrap();
1403 assert_eq!(returned_tx.status, TransactionStatus::Submitted); Ok(())
1405 }
1406
1407 #[tokio::test]
1408 async fn test_handle_resubmit_blockhash_expired_not_resubmitable() -> Result<()> {
1409 let mut provider = MockSolanaProviderTrait::new();
1410 let relayer_repo = Arc::new(MockRelayerRepository::new());
1411 let mut tx_repo = MockTransactionRepository::new();
1412 let job_producer = Arc::new(MockJobProducerTrait::new());
1413
1414 let payer = Pubkey::new_unique();
1416 let recipient = Pubkey::new_unique();
1417 let additional_signer = Pubkey::new_unique();
1418 let instruction = system_instruction::transfer(&payer, &recipient, 1000);
1419
1420 let mut message = Message::new(&[instruction], Some(&payer));
1422 message.account_keys.push(additional_signer);
1423 message.header.num_required_signatures = 2; message.recent_blockhash = Hash::from_str("11111111111111111111111111111112")?;
1425
1426 let transaction = SolanaTransaction::new_unsigned(message);
1427 let transaction_bytes = bincode::serialize(&transaction)?;
1428 let transaction_b64 = base64_encode(&transaction_bytes);
1429
1430 let signature_str = "4XFPmbPT4TRchFWNmQD2N8BhjxJQKqYdXWQG7kJJtxCBZ8Y9WtNDoPAwQaHFYnVynCjMVyF9TCMrpPFkEpG7LpZr";
1431 let mut tx = create_tx_with_signature(TransactionStatus::Submitted, Some(signature_str));
1432 tx.sent_at = Some(
1433 (Utc::now() - Duration::seconds(SOLANA_MIN_AGE_FOR_RESUBMIT_CHECK_SECONDS + 10))
1434 .to_rfc3339(),
1435 );
1436 tx.network_data = NetworkTransactionData::Solana(SolanaTransactionData {
1437 transaction: Some(transaction_b64),
1438 instructions: None,
1439 signature: Some(signature_str.to_string()),
1440 });
1441
1442 let tx_id = tx.id.clone();
1443
1444 provider
1446 .expect_is_blockhash_valid()
1447 .with(
1448 eq(Hash::from_str("11111111111111111111111111111112")?),
1449 eq(CommitmentConfig::confirmed()),
1450 )
1451 .times(1)
1452 .returning(|_, _| Box::pin(async { Ok(false) })); tx_repo
1456 .expect_partial_update()
1457 .withf(move |tx_id_param, update_req| {
1458 tx_id_param == &tx_id && update_req.status == Some(TransactionStatus::Expired)
1459 })
1460 .times(1)
1461 .returning(move |_, _| {
1462 let mut expired_tx = create_tx_with_signature(TransactionStatus::Expired, None);
1463 expired_tx.status = TransactionStatus::Expired;
1464 Ok(expired_tx)
1465 });
1466
1467 let handler = SolanaRelayerTransaction::new(
1468 create_mock_solana_relayer("test-relayer".to_string(), false),
1469 relayer_repo,
1470 Arc::new(provider),
1471 Arc::new(tx_repo),
1472 job_producer,
1473 Arc::new(MockSolanaSignTrait::new()),
1474 )?;
1475
1476 let result = handler.handle_resubmit_or_expiration(tx).await;
1477
1478 assert!(result.is_ok());
1479 let updated_tx = result.unwrap();
1480 assert_eq!(updated_tx.status, TransactionStatus::Expired);
1481 Ok(())
1482 }
1483
1484 #[tokio::test]
1485 async fn test_check_any_signature_on_chain_found() -> Result<()> {
1486 let mut provider = MockSolanaProviderTrait::new();
1487 let relayer_repo = Arc::new(MockRelayerRepository::new());
1488 let tx_repo = Arc::new(MockTransactionRepository::new());
1489 let job_producer = Arc::new(MockJobProducerTrait::new());
1490
1491 let signature1 = "4XFPmbPT4TRchFWNmQD2N8BhjxJQKqYdXWQG7kJJtxCBZ8Y9WtNDoPAwQaHFYnVynCjMVyF9TCMrpPFkEpG7LpZr";
1492 let signature2 = "3XFPmbPT4TRchFWNmQD2N8BhjxJQKqYdXWQG7kJJtxCBZ8Y9WtNDoPAwQaHFYnVynCjMVyF9TCMrpPFkEpG7LpZr";
1493
1494 let mut tx = create_tx_with_signature(TransactionStatus::Submitted, Some(signature1));
1495 tx.hashes = vec![signature1.to_string(), signature2.to_string()];
1496
1497 provider
1499 .expect_get_transaction_status()
1500 .with(eq(Signature::from_str(signature1)?))
1501 .times(1)
1502 .returning(|_| {
1503 Box::pin(async { Err(SolanaProviderError::RpcError("not found".to_string())) })
1504 });
1505
1506 provider
1507 .expect_get_transaction_status()
1508 .with(eq(Signature::from_str(signature2)?))
1509 .times(1)
1510 .returning(|_| Box::pin(async { Ok(SolanaTransactionStatus::Processed) }));
1511
1512 let handler = SolanaRelayerTransaction::new(
1513 create_mock_solana_relayer("test-relayer".to_string(), false),
1514 relayer_repo,
1515 Arc::new(provider),
1516 tx_repo,
1517 job_producer,
1518 Arc::new(MockSolanaSignTrait::new()),
1519 )?;
1520
1521 let result = handler.check_any_signature_on_chain(&tx).await;
1522
1523 assert!(result.is_ok());
1524 let found = result.unwrap();
1525 assert!(found.is_some());
1526 let (found_sig, status) = found.unwrap();
1527 assert_eq!(found_sig, signature2);
1528 assert_eq!(status, SolanaTransactionStatus::Processed);
1529 Ok(())
1530 }
1531
1532 #[tokio::test]
1533 async fn test_check_any_signature_on_chain_not_found() -> Result<()> {
1534 let mut provider = MockSolanaProviderTrait::new();
1535 let relayer_repo = Arc::new(MockRelayerRepository::new());
1536 let tx_repo = Arc::new(MockTransactionRepository::new());
1537 let job_producer = Arc::new(MockJobProducerTrait::new());
1538
1539 let signature1 = "4XFPmbPT4TRchFWNmQD2N8BhjxJQKqYdXWQG7kJJtxCBZ8Y9WtNDoPAwQaHFYnVynCjMVyF9TCMrpPFkEpG7LpZr";
1540 let signature2 = "3XFPmbPT4TRchFWNmQD2N8BhjxJQKqYdXWQG7kJJtxCBZ8Y9WtNDoPAwQaHFYnVynCjMVyF9TCMrpPFkEpG7LpZr";
1541
1542 let mut tx = create_tx_with_signature(TransactionStatus::Submitted, Some(signature1));
1543 tx.hashes = vec![signature1.to_string(), signature2.to_string()];
1544
1545 provider
1547 .expect_get_transaction_status()
1548 .with(eq(Signature::from_str(signature1)?))
1549 .times(1)
1550 .returning(|_| {
1551 Box::pin(async { Err(SolanaProviderError::RpcError("not found".to_string())) })
1552 });
1553
1554 provider
1555 .expect_get_transaction_status()
1556 .with(eq(Signature::from_str(signature2)?))
1557 .times(1)
1558 .returning(|_| {
1559 Box::pin(async { Err(SolanaProviderError::RpcError("not found".to_string())) })
1560 });
1561
1562 let handler = SolanaRelayerTransaction::new(
1563 create_mock_solana_relayer("test-relayer".to_string(), false),
1564 relayer_repo,
1565 Arc::new(provider),
1566 tx_repo,
1567 job_producer,
1568 Arc::new(MockSolanaSignTrait::new()),
1569 )?;
1570
1571 let result = handler.check_any_signature_on_chain(&tx).await;
1572
1573 assert!(result.is_ok());
1574 let found = result.unwrap();
1575 assert!(found.is_none());
1576 Ok(())
1577 }
1578
1579 #[tokio::test]
1580 async fn test_is_blockhash_valid_true() -> Result<()> {
1581 let mut provider = MockSolanaProviderTrait::new();
1582 let relayer_repo = Arc::new(MockRelayerRepository::new());
1583 let tx_repo = Arc::new(MockTransactionRepository::new());
1584 let job_producer = Arc::new(MockJobProducerTrait::new());
1585
1586 let blockhash = Hash::from_str("11111111111111111111111111111112")?;
1587
1588 provider
1589 .expect_is_blockhash_valid()
1590 .with(eq(blockhash), eq(CommitmentConfig::confirmed()))
1591 .times(1)
1592 .returning(|_, _| Box::pin(async { Ok(true) }));
1593
1594 let handler = SolanaRelayerTransaction::new(
1595 create_mock_solana_relayer("test-relayer".to_string(), false),
1596 relayer_repo,
1597 Arc::new(provider),
1598 tx_repo,
1599 job_producer,
1600 Arc::new(MockSolanaSignTrait::new()),
1601 )?;
1602
1603 let mut transaction =
1604 SolanaTransaction::new_unsigned(Message::new(&[], Some(&Pubkey::new_unique())));
1605 transaction.message.recent_blockhash = blockhash;
1606
1607 let result = handler.is_blockhash_valid(&transaction).await;
1608
1609 assert!(result.is_ok());
1610 assert!(result.unwrap());
1611 Ok(())
1612 }
1613
1614 #[tokio::test]
1615 async fn test_is_blockhash_valid_false() -> Result<()> {
1616 let mut provider = MockSolanaProviderTrait::new();
1617 let relayer_repo = Arc::new(MockRelayerRepository::new());
1618 let tx_repo = Arc::new(MockTransactionRepository::new());
1619 let job_producer = Arc::new(MockJobProducerTrait::new());
1620
1621 let blockhash = Hash::from_str("11111111111111111111111111111112")?;
1622
1623 provider
1624 .expect_is_blockhash_valid()
1625 .with(eq(blockhash), eq(CommitmentConfig::confirmed()))
1626 .times(1)
1627 .returning(|_, _| Box::pin(async { Ok(false) }));
1628
1629 let handler = SolanaRelayerTransaction::new(
1630 create_mock_solana_relayer("test-relayer".to_string(), false),
1631 relayer_repo,
1632 Arc::new(provider),
1633 tx_repo,
1634 job_producer,
1635 Arc::new(MockSolanaSignTrait::new()),
1636 )?;
1637
1638 let mut transaction =
1639 SolanaTransaction::new_unsigned(Message::new(&[], Some(&Pubkey::new_unique())));
1640 transaction.message.recent_blockhash = blockhash;
1641
1642 let result = handler.is_blockhash_valid(&transaction).await;
1643
1644 assert!(result.is_ok());
1645 assert!(!result.unwrap());
1646 Ok(())
1647 }
1648
1649 #[tokio::test]
1650 async fn test_is_blockhash_valid_error() -> Result<()> {
1651 let mut provider = MockSolanaProviderTrait::new();
1652 let relayer_repo = Arc::new(MockRelayerRepository::new());
1653 let tx_repo = Arc::new(MockTransactionRepository::new());
1654 let job_producer = Arc::new(MockJobProducerTrait::new());
1655
1656 let blockhash = Hash::from_str("11111111111111111111111111111112")?;
1657
1658 provider
1659 .expect_is_blockhash_valid()
1660 .with(eq(blockhash), eq(CommitmentConfig::confirmed()))
1661 .times(1)
1662 .returning(|_, _| {
1663 Box::pin(async { Err(SolanaProviderError::RpcError("test error".to_string())) })
1664 });
1665
1666 let handler = SolanaRelayerTransaction::new(
1667 create_mock_solana_relayer("test-relayer".to_string(), false),
1668 relayer_repo,
1669 Arc::new(provider),
1670 tx_repo,
1671 job_producer,
1672 Arc::new(MockSolanaSignTrait::new()),
1673 )?;
1674
1675 let mut transaction =
1676 SolanaTransaction::new_unsigned(Message::new(&[], Some(&Pubkey::new_unique())));
1677 transaction.message.recent_blockhash = blockhash;
1678
1679 let result = handler.is_blockhash_valid(&transaction).await;
1680
1681 assert!(result.is_err());
1682 let error = result.unwrap_err();
1683 match error {
1684 TransactionError::UnderlyingSolanaProvider(_) => {} _ => panic!("Expected UnderlyingSolanaProvider error"),
1686 }
1687 Ok(())
1688 }
1689
1690 #[tokio::test]
1691 async fn test_get_time_since_sent_or_created_at_with_sent_at() {
1692 let provider = MockSolanaProviderTrait::new();
1693 let relayer_repo = Arc::new(MockRelayerRepository::new());
1694 let tx_repo = Arc::new(MockTransactionRepository::new());
1695 let job_producer = Arc::new(MockJobProducerTrait::new());
1696
1697 let handler = SolanaRelayerTransaction::new(
1698 create_mock_solana_relayer("test-relayer".to_string(), false),
1699 relayer_repo,
1700 Arc::new(provider),
1701 tx_repo,
1702 job_producer,
1703 Arc::new(MockSolanaSignTrait::new()),
1704 )
1705 .unwrap();
1706
1707 let mut tx = create_tx_with_signature(TransactionStatus::Pending, None);
1708 let past_time = Utc::now() - Duration::minutes(5);
1709 tx.sent_at = Some(past_time.to_rfc3339());
1710
1711 let result = handler.get_time_since_sent_or_created_at(&tx);
1712
1713 assert!(result.is_some());
1714 let duration = result.unwrap();
1715 assert!(duration.num_minutes() >= 5);
1716 }
1717
1718 #[tokio::test]
1719 async fn test_get_time_since_sent_or_created_at_with_created_at() {
1720 let provider = MockSolanaProviderTrait::new();
1721 let relayer_repo = Arc::new(MockRelayerRepository::new());
1722 let tx_repo = Arc::new(MockTransactionRepository::new());
1723 let job_producer = Arc::new(MockJobProducerTrait::new());
1724
1725 let handler = SolanaRelayerTransaction::new(
1726 create_mock_solana_relayer("test-relayer".to_string(), false),
1727 relayer_repo,
1728 Arc::new(provider),
1729 tx_repo,
1730 job_producer,
1731 Arc::new(MockSolanaSignTrait::new()),
1732 )
1733 .unwrap();
1734
1735 let mut tx = create_tx_with_signature(TransactionStatus::Pending, None);
1736 let past_time = Utc::now() - Duration::minutes(10);
1737 tx.created_at = past_time.to_rfc3339();
1738 tx.sent_at = None; let result = handler.get_time_since_sent_or_created_at(&tx);
1741
1742 assert!(result.is_some());
1743 let duration = result.unwrap();
1744 assert!(duration.num_minutes() >= 10);
1745 }
1746
1747 #[tokio::test]
1748 async fn test_has_exceeded_timeout_pending() {
1749 let provider = MockSolanaProviderTrait::new();
1750 let relayer_repo = Arc::new(MockRelayerRepository::new());
1751 let tx_repo = Arc::new(MockTransactionRepository::new());
1752 let job_producer = Arc::new(MockJobProducerTrait::new());
1753
1754 let handler = SolanaRelayerTransaction::new(
1755 create_mock_solana_relayer("test-relayer".to_string(), false),
1756 relayer_repo,
1757 Arc::new(provider),
1758 tx_repo,
1759 job_producer,
1760 Arc::new(MockSolanaSignTrait::new()),
1761 )
1762 .unwrap();
1763
1764 let mut tx = create_tx_with_signature(TransactionStatus::Pending, None);
1765 tx.created_at =
1766 (Utc::now() - Duration::minutes(SOLANA_PENDING_TIMEOUT_MINUTES + 1)).to_rfc3339();
1767
1768 let result = handler.has_exceeded_timeout(&tx);
1769
1770 assert!(result.is_ok());
1771 assert!(result.unwrap());
1772 }
1773
1774 #[tokio::test]
1775 async fn test_has_exceeded_timeout_sent() {
1776 let provider = MockSolanaProviderTrait::new();
1777 let relayer_repo = Arc::new(MockRelayerRepository::new());
1778 let tx_repo = Arc::new(MockTransactionRepository::new());
1779 let job_producer = Arc::new(MockJobProducerTrait::new());
1780
1781 let handler = SolanaRelayerTransaction::new(
1782 create_mock_solana_relayer("test-relayer".to_string(), false),
1783 relayer_repo,
1784 Arc::new(provider),
1785 tx_repo,
1786 job_producer,
1787 Arc::new(MockSolanaSignTrait::new()),
1788 )
1789 .unwrap();
1790
1791 let mut tx = create_tx_with_signature(TransactionStatus::Sent, None);
1792 tx.sent_at =
1793 Some((Utc::now() - Duration::minutes(SOLANA_SENT_TIMEOUT_MINUTES + 1)).to_rfc3339());
1794
1795 let result = handler.has_exceeded_timeout(&tx);
1796
1797 assert!(result.is_ok());
1798 assert!(result.unwrap());
1799 }
1800
1801 #[tokio::test]
1802 async fn test_is_valid_until_expired_user_provided() {
1803 let provider = MockSolanaProviderTrait::new();
1804 let relayer_repo = Arc::new(MockRelayerRepository::new());
1805 let tx_repo = Arc::new(MockTransactionRepository::new());
1806 let job_producer = Arc::new(MockJobProducerTrait::new());
1807
1808 let handler = SolanaRelayerTransaction::new(
1809 create_mock_solana_relayer("test-relayer".to_string(), false),
1810 relayer_repo,
1811 Arc::new(provider),
1812 tx_repo,
1813 job_producer,
1814 Arc::new(MockSolanaSignTrait::new()),
1815 )
1816 .unwrap();
1817
1818 let mut tx = create_tx_with_signature(TransactionStatus::Pending, None);
1819 let past_time = Utc::now() - Duration::minutes(1);
1820 tx.valid_until = Some(past_time.to_rfc3339());
1821
1822 assert!(handler.is_valid_until_expired(&tx));
1823 }
1824
1825 #[tokio::test]
1826 async fn test_is_valid_until_expired_default() {
1827 let provider = MockSolanaProviderTrait::new();
1828 let relayer_repo = Arc::new(MockRelayerRepository::new());
1829 let tx_repo = Arc::new(MockTransactionRepository::new());
1830 let job_producer = Arc::new(MockJobProducerTrait::new());
1831
1832 let handler = SolanaRelayerTransaction::new(
1833 create_mock_solana_relayer("test-relayer".to_string(), false),
1834 relayer_repo,
1835 Arc::new(provider),
1836 tx_repo,
1837 job_producer,
1838 Arc::new(MockSolanaSignTrait::new()),
1839 )
1840 .unwrap();
1841
1842 let mut tx = create_tx_with_signature(TransactionStatus::Pending, None);
1843 let past_time =
1844 Utc::now() - Duration::milliseconds(SOLANA_DEFAULT_TX_VALID_TIMESPAN + 1000);
1845 tx.created_at = past_time.to_rfc3339();
1846 tx.valid_until = None; assert!(handler.is_valid_until_expired(&tx));
1849 }
1850
1851 #[tokio::test]
1852 async fn test_mark_as_expired() -> Result<()> {
1853 let provider = MockSolanaProviderTrait::new();
1854 let relayer_repo = Arc::new(MockRelayerRepository::new());
1855 let mut tx_repo = MockTransactionRepository::new();
1856 let job_producer = Arc::new(MockJobProducerTrait::new());
1857
1858 let tx = create_tx_with_signature(TransactionStatus::Pending, None);
1859 let tx_id = tx.id.clone();
1860 let reason = "Test expiration";
1861
1862 tx_repo
1863 .expect_partial_update()
1864 .withf(move |tx_id_param, update_req| {
1865 tx_id_param == &tx_id
1866 && update_req.status == Some(TransactionStatus::Expired)
1867 && update_req.status_reason == Some(reason.to_string())
1868 })
1869 .times(1)
1870 .returning(move |_, _| {
1871 let mut expired_tx = create_tx_with_signature(TransactionStatus::Expired, None);
1872 expired_tx.status = TransactionStatus::Expired;
1873 Ok(expired_tx)
1874 });
1875
1876 let handler = SolanaRelayerTransaction::new(
1877 create_mock_solana_relayer("test-relayer".to_string(), false),
1878 relayer_repo,
1879 Arc::new(provider),
1880 Arc::new(tx_repo),
1881 job_producer,
1882 Arc::new(MockSolanaSignTrait::new()),
1883 )?;
1884
1885 let result = handler.mark_as_expired(tx, reason.to_string()).await;
1886
1887 assert!(result.is_ok());
1888 let updated_tx = result.unwrap();
1889 assert_eq!(updated_tx.status, TransactionStatus::Expired);
1890 Ok(())
1891 }
1892
1893 #[tokio::test]
1894 async fn test_mark_as_failed() -> Result<()> {
1895 let provider = MockSolanaProviderTrait::new();
1896 let relayer_repo = Arc::new(MockRelayerRepository::new());
1897 let mut tx_repo = MockTransactionRepository::new();
1898 let job_producer = Arc::new(MockJobProducerTrait::new());
1899
1900 let tx = create_tx_with_signature(TransactionStatus::Pending, None);
1901 let tx_id = tx.id.clone();
1902 let reason = "Test failure";
1903
1904 tx_repo
1905 .expect_partial_update()
1906 .withf(move |tx_id_param, update_req| {
1907 tx_id_param == &tx_id
1908 && update_req.status == Some(TransactionStatus::Failed)
1909 && update_req.status_reason == Some(reason.to_string())
1910 })
1911 .times(1)
1912 .returning(move |_, _| {
1913 let mut failed_tx = create_tx_with_signature(TransactionStatus::Failed, None);
1914 failed_tx.status = TransactionStatus::Failed;
1915 Ok(failed_tx)
1916 });
1917
1918 let handler = SolanaRelayerTransaction::new(
1919 create_mock_solana_relayer("test-relayer".to_string(), false),
1920 relayer_repo,
1921 Arc::new(provider),
1922 Arc::new(tx_repo),
1923 job_producer,
1924 Arc::new(MockSolanaSignTrait::new()),
1925 )?;
1926
1927 let result = handler.mark_as_failed(tx, reason.to_string()).await;
1928
1929 assert!(result.is_ok());
1930 let updated_tx = result.unwrap();
1931 assert_eq!(updated_tx.status, TransactionStatus::Failed);
1932 Ok(())
1933 }
1934
1935 #[tokio::test]
1936 async fn test_update_transaction_status_and_send_notification() -> Result<()> {
1937 let provider = MockSolanaProviderTrait::new();
1938 let relayer_repo = Arc::new(MockRelayerRepository::new());
1939 let mut tx_repo = MockTransactionRepository::new();
1940 let mut job_producer = MockJobProducerTrait::new();
1941
1942 let mut relayer = create_mock_solana_relayer("test-relayer".to_string(), false);
1944 relayer.notification_id = Some("test-notification".to_string());
1945
1946 let tx = create_tx_with_signature(TransactionStatus::Submitted, None);
1947 let tx_id = tx.id.clone();
1948 let new_status = TransactionStatus::Confirmed;
1949
1950 tx_repo
1951 .expect_partial_update()
1952 .withf(move |tx_id_param, update_req| {
1953 tx_id_param == &tx_id && update_req.status == Some(TransactionStatus::Confirmed)
1954 })
1955 .times(1)
1956 .returning(move |_, _| {
1957 let mut confirmed_tx = create_tx_with_signature(TransactionStatus::Confirmed, None);
1958 confirmed_tx.status = TransactionStatus::Confirmed;
1959 Ok(confirmed_tx)
1960 });
1961
1962 job_producer
1963 .expect_produce_send_notification_job()
1964 .times(1)
1965 .returning(|_, _| Box::pin(async { Ok(()) }));
1966
1967 let handler = SolanaRelayerTransaction::new(
1968 relayer,
1969 relayer_repo,
1970 Arc::new(provider),
1971 Arc::new(tx_repo),
1972 Arc::new(job_producer),
1973 Arc::new(MockSolanaSignTrait::new()),
1974 )?;
1975
1976 let result = handler
1977 .update_transaction_status_and_send_notification(tx, new_status, None)
1978 .await;
1979
1980 assert!(result.is_ok());
1981 let updated_tx = result.unwrap();
1982 assert_eq!(updated_tx.status, TransactionStatus::Confirmed);
1983 Ok(())
1984 }
1985}