1use crate::{
7 constants::DEFAULT_STELLAR_CONCURRENT_TRANSACTIONS,
8 domain::transaction::{stellar::fetch_next_sequence_from_chain, Transaction},
9 jobs::{JobProducer, JobProducerTrait, TransactionRequest},
10 models::{
11 produce_transaction_update_notification_payload, NetworkTransactionRequest,
12 RelayerNetworkPolicy, RelayerRepoModel, TransactionError, TransactionRepoModel,
13 TransactionStatus, TransactionUpdateRequest,
14 },
15 repositories::{
16 RelayerRepositoryStorage, Repository, TransactionCounterRepositoryStorage,
17 TransactionCounterTrait, TransactionRepository, TransactionRepositoryStorage,
18 },
19 services::{
20 provider::{StellarProvider, StellarProviderTrait},
21 signer::{Signer, StellarSigner},
22 },
23 utils::calculate_scheduled_timestamp,
24};
25use async_trait::async_trait;
26use std::sync::Arc;
27use tracing::{error, info};
28
29use super::lane_gate;
30
31#[allow(dead_code)]
32pub struct StellarRelayerTransaction<R, T, J, S, P, C>
33where
34 R: Repository<RelayerRepoModel, String>,
35 T: TransactionRepository,
36 J: JobProducerTrait,
37 S: Signer,
38 P: StellarProviderTrait,
39 C: TransactionCounterTrait,
40{
41 relayer: RelayerRepoModel,
42 relayer_repository: Arc<R>,
43 transaction_repository: Arc<T>,
44 job_producer: Arc<J>,
45 signer: Arc<S>,
46 provider: P,
47 transaction_counter_service: Arc<C>,
48}
49
50#[allow(dead_code)]
51impl<R, T, J, S, P, C> StellarRelayerTransaction<R, T, J, S, P, C>
52where
53 R: Repository<RelayerRepoModel, String>,
54 T: TransactionRepository,
55 J: JobProducerTrait,
56 S: Signer,
57 P: StellarProviderTrait,
58 C: TransactionCounterTrait,
59{
60 #[allow(clippy::too_many_arguments)]
76 pub fn new(
77 relayer: RelayerRepoModel,
78 relayer_repository: Arc<R>,
79 transaction_repository: Arc<T>,
80 job_producer: Arc<J>,
81 signer: Arc<S>,
82 provider: P,
83 transaction_counter_service: Arc<C>,
84 ) -> Result<Self, TransactionError> {
85 Ok(Self {
86 relayer,
87 relayer_repository,
88 transaction_repository,
89 job_producer,
90 signer,
91 provider,
92 transaction_counter_service,
93 })
94 }
95
96 pub fn provider(&self) -> &P {
97 &self.provider
98 }
99
100 pub fn relayer(&self) -> &RelayerRepoModel {
101 &self.relayer
102 }
103
104 pub fn job_producer(&self) -> &J {
105 &self.job_producer
106 }
107
108 pub fn transaction_repository(&self) -> &T {
109 &self.transaction_repository
110 }
111
112 pub fn signer(&self) -> &S {
113 &self.signer
114 }
115
116 pub fn transaction_counter_service(&self) -> &C {
117 &self.transaction_counter_service
118 }
119
120 pub fn concurrent_transactions_enabled(&self) -> bool {
121 if let RelayerNetworkPolicy::Stellar(policy) = &self.relayer().policies {
122 policy
123 .concurrent_transactions
124 .unwrap_or(DEFAULT_STELLAR_CONCURRENT_TRANSACTIONS)
125 } else {
126 DEFAULT_STELLAR_CONCURRENT_TRANSACTIONS
127 }
128 }
129
130 pub async fn send_transaction_request_job(
132 &self,
133 tx: &TransactionRepoModel,
134 delay_seconds: Option<i64>,
135 ) -> Result<(), TransactionError> {
136 let job = TransactionRequest::new(tx.id.clone(), tx.relayer_id.clone());
137 let scheduled_on = delay_seconds.map(calculate_scheduled_timestamp);
138 self.job_producer()
139 .produce_transaction_request_job(job, scheduled_on)
140 .await?;
141 Ok(())
142 }
143
144 pub(super) async fn send_transaction_update_notification(&self, tx: &TransactionRepoModel) {
149 if let Some(notification_id) = &self.relayer().notification_id {
150 if let Err(e) = self
151 .job_producer()
152 .produce_send_notification_job(
153 produce_transaction_update_notification_payload(notification_id, tx),
154 None,
155 )
156 .await
157 {
158 error!(error = %e, "failed to produce notification job");
159 }
160 }
161 }
162
163 pub async fn finalize_transaction_state(
165 &self,
166 tx_id: String,
167 update_req: TransactionUpdateRequest,
168 ) -> Result<TransactionRepoModel, TransactionError> {
169 let updated_tx = self
170 .transaction_repository()
171 .partial_update(tx_id, update_req)
172 .await?;
173
174 self.send_transaction_update_notification(&updated_tx).await;
175 Ok(updated_tx)
176 }
177
178 pub async fn enqueue_next_pending_transaction(
179 &self,
180 finished_tx_id: &str,
181 ) -> Result<(), TransactionError> {
182 if !self.concurrent_transactions_enabled() {
183 if let Some(next) = self
184 .find_oldest_pending_for_relayer(&self.relayer().id)
185 .await?
186 {
187 info!(to_tx_id = %next.id, finished_tx_id = %finished_tx_id, "handing over lane");
189 lane_gate::pass_to(&self.relayer().id, finished_tx_id, &next.id);
190 self.send_transaction_request_job(&next, None).await?;
191 } else {
192 info!(finished_tx_id = %finished_tx_id, "releasing relayer lane");
193 lane_gate::free(&self.relayer().id, finished_tx_id);
194 }
195 }
196 Ok(())
197 }
198
199 async fn find_oldest_pending_for_relayer(
201 &self,
202 relayer_id: &str,
203 ) -> Result<Option<TransactionRepoModel>, TransactionError> {
204 let pending_txs = self
205 .transaction_repository()
206 .find_by_status(relayer_id, &[TransactionStatus::Pending])
207 .await
208 .map_err(TransactionError::from)?;
209
210 Ok(pending_txs.into_iter().next())
211 }
212
213 pub async fn sync_sequence_from_chain(
216 &self,
217 relayer_address: &str,
218 ) -> Result<(), TransactionError> {
219 info!(address = %relayer_address, "syncing sequence number from chain");
220
221 let next_usable_seq = fetch_next_sequence_from_chain(self.provider(), relayer_address)
223 .await
224 .map_err(TransactionError::UnexpectedError)?;
225
226 self.transaction_counter_service()
228 .set(&self.relayer().id, relayer_address, next_usable_seq)
229 .await
230 .map_err(|e| {
231 TransactionError::UnexpectedError(format!("Failed to update sequence counter: {e}"))
232 })?;
233
234 info!(sequence = %next_usable_seq, "updated local sequence counter");
235 Ok(())
236 }
237
238 pub async fn reset_transaction_for_retry(
241 &self,
242 tx: TransactionRepoModel,
243 ) -> Result<TransactionRepoModel, TransactionError> {
244 info!("resetting transaction for retry through pipeline");
245
246 let update_req = tx.create_reset_update_request()?;
248
249 let reset_tx = self
251 .transaction_repository()
252 .partial_update(tx.id.clone(), update_req)
253 .await?;
254
255 info!("transaction reset successfully to pre-prepare state");
256 Ok(reset_tx)
257 }
258}
259
260#[async_trait]
261impl<R, T, J, S, P, C> Transaction for StellarRelayerTransaction<R, T, J, S, P, C>
262where
263 R: Repository<RelayerRepoModel, String> + Send + Sync,
264 T: TransactionRepository + Send + Sync,
265 J: JobProducerTrait + Send + Sync,
266 S: Signer + Send + Sync,
267 P: StellarProviderTrait + Send + Sync,
268 C: TransactionCounterTrait + Send + Sync,
269{
270 async fn prepare_transaction(
271 &self,
272 tx: TransactionRepoModel,
273 ) -> Result<TransactionRepoModel, TransactionError> {
274 self.prepare_transaction_impl(tx).await
275 }
276
277 async fn submit_transaction(
278 &self,
279 tx: TransactionRepoModel,
280 ) -> Result<TransactionRepoModel, TransactionError> {
281 self.submit_transaction_impl(tx).await
282 }
283
284 async fn resubmit_transaction(
285 &self,
286 tx: TransactionRepoModel,
287 ) -> Result<TransactionRepoModel, TransactionError> {
288 Ok(tx)
289 }
290
291 async fn handle_transaction_status(
292 &self,
293 tx: TransactionRepoModel,
294 ) -> Result<TransactionRepoModel, TransactionError> {
295 self.handle_transaction_status_impl(tx).await
296 }
297
298 async fn cancel_transaction(
299 &self,
300 tx: TransactionRepoModel,
301 ) -> Result<TransactionRepoModel, TransactionError> {
302 Ok(tx)
303 }
304
305 async fn replace_transaction(
306 &self,
307 _old_tx: TransactionRepoModel,
308 _new_tx_request: NetworkTransactionRequest,
309 ) -> Result<TransactionRepoModel, TransactionError> {
310 Ok(_old_tx)
311 }
312
313 async fn sign_transaction(
314 &self,
315 tx: TransactionRepoModel,
316 ) -> Result<TransactionRepoModel, TransactionError> {
317 Ok(tx)
318 }
319
320 async fn validate_transaction(
321 &self,
322 _tx: TransactionRepoModel,
323 ) -> Result<bool, TransactionError> {
324 Ok(true)
325 }
326}
327
/// [`StellarRelayerTransaction`] instantiated with the concrete storage-backed
/// repositories, [`JobProducer`], [`StellarSigner`], [`StellarProvider`] and
/// counter store imported by this module.
pub type DefaultStellarTransaction = StellarRelayerTransaction<
    RelayerRepositoryStorage,
    TransactionRepositoryStorage,
    JobProducer,
    StellarSigner,
    StellarProvider,
    TransactionCounterRepositoryStorage,
>;
336
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{
        models::{NetworkTransactionData, RepositoryError},
        services::provider::ProviderError,
    };
    use std::sync::Arc;

    use crate::domain::transaction::stellar::test_helpers::*;

    /// Builds the account entry for `TEST_PK` that the provider mock returns,
    /// with the given on-chain sequence number.
    fn account_entry_with_sequence(seq_num: i64) -> soroban_rs::xdr::AccountEntry {
        use soroban_rs::xdr::{
            AccountEntry, AccountEntryExt, AccountId, PublicKey, SequenceNumber, String32,
            Thresholds, Uint256,
        };
        use stellar_strkey::ed25519;

        let pk = ed25519::PublicKey::from_string(TEST_PK).unwrap();
        let account_id = AccountId(PublicKey::PublicKeyTypeEd25519(Uint256(pk.0)));

        AccountEntry {
            account_id,
            balance: 1000000,
            seq_num: SequenceNumber(seq_num),
            num_sub_entries: 0,
            inflation_dest: None,
            flags: 0,
            home_domain: String32::default(),
            thresholds: Thresholds([1, 1, 1, 1]),
            signers: Default::default(),
            ext: AccountEntryExt::V0,
        }
    }

    /// The constructor succeeds with default mocks.
    #[test]
    fn new_returns_ok() {
        let relayer = create_test_relayer();
        let mocks = default_test_mocks();
        let result = StellarRelayerTransaction::new(
            relayer,
            Arc::new(mocks.relayer_repo),
            Arc::new(mocks.tx_repo),
            Arc::new(mocks.job_producer),
            Arc::new(mocks.signer),
            mocks.provider,
            Arc::new(mocks.counter),
        );
        assert!(result.is_ok());
    }

    /// Accessors expose the relayer passed in and the other collaborators.
    #[test]
    fn accessor_methods_return_correct_references() {
        let relayer = create_test_relayer();
        let mocks = default_test_mocks();
        let handler = make_stellar_tx_handler(relayer, mocks);

        assert_eq!(handler.relayer().id, "relayer-1");
        assert_eq!(handler.relayer().address, TEST_PK);

        // The remaining accessors are only checked for being callable.
        let _ = handler.provider();
        let _ = handler.job_producer();
        let _ = handler.transaction_repository();
        let _ = handler.signer();
        let _ = handler.transaction_counter_service();
    }

    /// Without a delay the job is produced with no scheduled time.
    #[tokio::test]
    async fn send_transaction_request_job_success() {
        let relayer = create_test_relayer();
        let mut mocks = default_test_mocks();

        mocks
            .job_producer
            .expect_produce_transaction_request_job()
            .withf(|job, delay| {
                job.transaction_id == "tx-1" && job.relayer_id == "relayer-1" && delay.is_none()
            })
            .times(1)
            .returning(|_, _| Box::pin(async { Ok(()) }));

        let handler = make_stellar_tx_handler(relayer.clone(), mocks);
        let tx = create_test_transaction(&relayer.id);

        let result = handler.send_transaction_request_job(&tx, None).await;
        assert!(result.is_ok());
    }

    /// With a delay the job is produced with a scheduled time in the future.
    #[tokio::test]
    async fn send_transaction_request_job_with_delay() {
        let relayer = create_test_relayer();
        let mut mocks = default_test_mocks();

        mocks
            .job_producer
            .expect_produce_transaction_request_job()
            .withf(|job, delay| {
                job.transaction_id == "tx-1"
                    && job.relayer_id == "relayer-1"
                    && matches!(
                        delay,
                        Some(scheduled_on) if *scheduled_on > chrono::Utc::now().timestamp()
                    )
            })
            .times(1)
            .returning(|_, _| Box::pin(async { Ok(()) }));

        let handler = make_stellar_tx_handler(relayer.clone(), mocks);
        let tx = create_test_transaction(&relayer.id);

        let result = handler.send_transaction_request_job(&tx, Some(60)).await;
        assert!(result.is_ok());
    }

    /// Finalizing applies the update through the repository and sends one notification.
    #[tokio::test]
    async fn finalize_transaction_state_success() {
        let relayer = create_test_relayer();
        let mut mocks = default_test_mocks();

        mocks
            .tx_repo
            .expect_partial_update()
            .withf(|tx_id, update| {
                tx_id == "tx-1"
                    && update.status == Some(TransactionStatus::Confirmed)
                    && update.status_reason == Some("Transaction confirmed".to_string())
            })
            .times(1)
            .returning(|tx_id, update| {
                let mut tx = create_test_transaction("relayer-1");
                tx.id = tx_id;
                tx.status = update.status.unwrap();
                tx.status_reason = update.status_reason;
                tx.confirmed_at = update.confirmed_at;
                Ok::<_, RepositoryError>(tx)
            });

        mocks
            .job_producer
            .expect_produce_send_notification_job()
            .times(1)
            .returning(|_, _| Box::pin(async { Ok(()) }));

        let handler = make_stellar_tx_handler(relayer, mocks);

        let update_request = TransactionUpdateRequest {
            status: Some(TransactionStatus::Confirmed),
            status_reason: Some("Transaction confirmed".to_string()),
            confirmed_at: Some("2023-01-01T00:00:00Z".to_string()),
            ..Default::default()
        };

        let result = handler
            .finalize_transaction_state("tx-1".to_string(), update_request)
            .await;

        assert!(result.is_ok());
        let updated_tx = result.unwrap();
        assert_eq!(updated_tx.status, TransactionStatus::Confirmed);
        assert_eq!(
            updated_tx.status_reason,
            Some("Transaction confirmed".to_string())
        );
    }

    /// With a pending transaction available, a job is enqueued for it.
    #[tokio::test]
    async fn enqueue_next_pending_transaction_with_pending_tx() {
        let relayer = create_test_relayer();
        let mut mocks = default_test_mocks();

        mocks
            .tx_repo
            .expect_find_by_status()
            .withf(|relayer_id, statuses| {
                relayer_id == "relayer-1" && statuses == [TransactionStatus::Pending]
            })
            .times(1)
            .returning(|_, _| {
                let mut tx = create_test_transaction("relayer-1");
                tx.id = "pending-tx-1".to_string();
                Ok(vec![tx])
            });

        mocks
            .job_producer
            .expect_produce_transaction_request_job()
            .withf(|job, delay| job.transaction_id == "pending-tx-1" && delay.is_none())
            .times(1)
            .returning(|_, _| Box::pin(async { Ok(()) }));

        let handler = make_stellar_tx_handler(relayer, mocks);

        let result = handler
            .enqueue_next_pending_transaction("finished-tx")
            .await;
        assert!(result.is_ok());
    }

    /// With no pending transaction the call still succeeds and enqueues nothing.
    #[tokio::test]
    async fn enqueue_next_pending_transaction_no_pending_tx() {
        let relayer = create_test_relayer();
        let mut mocks = default_test_mocks();

        mocks
            .tx_repo
            .expect_find_by_status()
            .times(1)
            .returning(|_, _| Ok(vec![]));

        let handler = make_stellar_tx_handler(relayer, mocks);

        let result = handler
            .enqueue_next_pending_transaction("finished-tx")
            .await;
        assert!(result.is_ok());
    }

    /// An on-chain sequence of 100 results in the counter being set to 101.
    #[tokio::test]
    async fn test_sync_sequence_from_chain() {
        let relayer = create_test_relayer();
        let mut mocks = default_test_mocks();

        mocks
            .provider
            .expect_get_account()
            .withf(|addr| addr == TEST_PK)
            .times(1)
            .returning(|_| Box::pin(async { Ok(account_entry_with_sequence(100)) }));

        mocks
            .counter
            .expect_set()
            .withf(|relayer_id, addr, seq| {
                relayer_id == "relayer-1" && addr == TEST_PK && *seq == 101
            })
            .times(1)
            .returning(|_, _, _| Box::pin(async { Ok(()) }));

        let handler = make_stellar_tx_handler(relayer.clone(), mocks);

        let result = handler.sync_sequence_from_chain(&relayer.address).await;
        assert!(result.is_ok());
    }

    /// A provider failure surfaces as `UnexpectedError`.
    #[tokio::test]
    async fn test_sync_sequence_from_chain_provider_error() {
        let relayer = create_test_relayer();
        let mut mocks = default_test_mocks();

        mocks.provider.expect_get_account().times(1).returning(|_| {
            Box::pin(async { Err(ProviderError::Other("Account not found".to_string())) })
        });

        let handler = make_stellar_tx_handler(relayer.clone(), mocks);

        let result = handler.sync_sequence_from_chain(&relayer.address).await;
        assert!(result.is_err());
        match result.unwrap_err() {
            TransactionError::UnexpectedError(msg) => {
                assert!(msg.contains("Failed to fetch account from chain"));
            }
            _ => panic!("Expected UnexpectedError"),
        }
    }

    /// A counter-store failure surfaces as `UnexpectedError`.
    #[tokio::test]
    async fn test_sync_sequence_from_chain_counter_error() {
        let relayer = create_test_relayer();
        let mut mocks = default_test_mocks();

        mocks
            .provider
            .expect_get_account()
            .times(1)
            .returning(|_| Box::pin(async { Ok(account_entry_with_sequence(100)) }));

        mocks.counter.expect_set().times(1).returning(|_, _, _| {
            Box::pin(async {
                Err(RepositoryError::Unknown(
                    "Counter update failed".to_string(),
                ))
            })
        });

        let handler = make_stellar_tx_handler(relayer.clone(), mocks);

        let result = handler.sync_sequence_from_chain(&relayer.address).await;
        assert!(result.is_err());
        match result.unwrap_err() {
            TransactionError::UnexpectedError(msg) => {
                assert!(msg.contains("Failed to update sequence counter"));
            }
            _ => panic!("Expected UnexpectedError"),
        }
    }

    /// The policy flag is honoured when set; the default applies otherwise.
    #[test]
    fn test_concurrent_transactions_enabled() {
        // Explicitly enabled.
        let mut relayer = create_test_relayer();
        if let RelayerNetworkPolicy::Stellar(ref mut policy) = relayer.policies {
            policy.concurrent_transactions = Some(true);
        }
        let mocks = default_test_mocks();
        let handler = make_stellar_tx_handler(relayer, mocks);
        assert!(handler.concurrent_transactions_enabled());

        // Explicitly disabled.
        let mut relayer = create_test_relayer();
        if let RelayerNetworkPolicy::Stellar(ref mut policy) = relayer.policies {
            policy.concurrent_transactions = Some(false);
        }
        let mocks = default_test_mocks();
        let handler = make_stellar_tx_handler(relayer, mocks);
        assert!(!handler.concurrent_transactions_enabled());

        // Test relayer used as-is: expected to match the default.
        let relayer = create_test_relayer();
        let mocks = default_test_mocks();
        let handler = make_stellar_tx_handler(relayer, mocks);
        assert_eq!(
            handler.concurrent_transactions_enabled(),
            DEFAULT_STELLAR_CONCURRENT_TRANSACTIONS
        );
    }

    /// With concurrency enabled neither the repository nor the job producer is touched.
    #[tokio::test]
    async fn test_enqueue_next_pending_transaction_with_concurrency_enabled() {
        let mut relayer = create_test_relayer();
        if let RelayerNetworkPolicy::Stellar(ref mut policy) = relayer.policies {
            policy.concurrent_transactions = Some(true);
        }
        let mut mocks = default_test_mocks();

        mocks.tx_repo.expect_find_by_status().times(0);
        mocks
            .job_producer
            .expect_produce_transaction_request_job()
            .times(0);

        let handler = make_stellar_tx_handler(relayer, mocks);

        let result = handler
            .enqueue_next_pending_transaction("finished-tx")
            .await;
        assert!(result.is_ok());
    }

    /// Resetting a transaction sets it back to `Pending` and clears its Stellar
    /// signing data.
    #[tokio::test]
    async fn test_reset_transaction_for_retry() {
        let relayer = create_test_relayer();
        let mut mocks = default_test_mocks();

        // Start from a transaction that already carries signing data.
        let mut tx = create_test_transaction(&relayer.id);
        if let NetworkTransactionData::Stellar(ref mut data) = tx.network_data {
            data.sequence_number = Some(42);
            data.signatures.push(dummy_signature());
            data.hash = Some("test-hash".to_string());
            data.signed_envelope_xdr = Some("test-xdr".to_string());
        }

        mocks
            .tx_repo
            .expect_partial_update()
            .withf(|tx_id, upd| {
                tx_id == "tx-1"
                    && upd.status == Some(TransactionStatus::Pending)
                    && upd.sent_at.is_none()
                    && upd.confirmed_at.is_none()
            })
            .times(1)
            .returning(|id, upd| {
                let mut tx = create_test_transaction("relayer-1");
                tx.id = id;
                tx.status = upd.status.unwrap();
                if let Some(network_data) = upd.network_data {
                    tx.network_data = network_data;
                }
                Ok::<_, RepositoryError>(tx)
            });

        let handler = make_stellar_tx_handler(relayer, mocks);

        let result = handler.reset_transaction_for_retry(tx).await;
        assert!(result.is_ok());

        let reset_tx = result.unwrap();
        assert_eq!(reset_tx.status, TransactionStatus::Pending);

        if let NetworkTransactionData::Stellar(data) = &reset_tx.network_data {
            assert!(data.sequence_number.is_none());
            assert!(data.signatures.is_empty());
            assert!(data.hash.is_none());
            assert!(data.signed_envelope_xdr.is_none());
        } else {
            panic!("Expected Stellar transaction data");
        }
    }
}