openzeppelin_relayer/domain/transaction/stellar/
submit.rs

1//! This module contains the submission-related functionality for Stellar transactions.
2//! It includes methods for submitting transactions with robust error handling,
3//! ensuring proper transaction state management on failure.
4
5use chrono::Utc;
6use tracing::{info, warn};
7
8use super::{is_final_state, utils::is_bad_sequence_error, StellarRelayerTransaction};
9use crate::{
10    constants::STELLAR_BAD_SEQUENCE_RETRY_DELAY_SECONDS,
11    jobs::JobProducerTrait,
12    models::{
13        NetworkTransactionData, RelayerRepoModel, TransactionError, TransactionRepoModel,
14        TransactionStatus, TransactionUpdateRequest,
15    },
16    repositories::{Repository, TransactionCounterTrait, TransactionRepository},
17    services::{provider::StellarProviderTrait, signer::Signer},
18    utils::calculate_scheduled_timestamp,
19};
20
21impl<R, T, J, S, P, C> StellarRelayerTransaction<R, T, J, S, P, C>
22where
23    R: Repository<RelayerRepoModel, String> + Send + Sync,
24    T: TransactionRepository + Send + Sync,
25    J: JobProducerTrait + Send + Sync,
26    S: Signer + Send + Sync,
27    P: StellarProviderTrait + Send + Sync,
28    C: TransactionCounterTrait + Send + Sync,
29{
30    /// Main submission method with robust error handling.
31    /// Unlike prepare, submit doesn't claim lanes but still needs proper error handling.
32    pub async fn submit_transaction_impl(
33        &self,
34        tx: TransactionRepoModel,
35    ) -> Result<TransactionRepoModel, TransactionError> {
36        info!(tx_id = %tx.id, status = ?tx.status, "submitting stellar transaction");
37
38        // Defensive check: if transaction is in a final state or unexpected state, don't retry
39        if is_final_state(&tx.status) {
40            warn!(
41                tx_id = %tx.id,
42                status = ?tx.status,
43                "transaction already in final state, skipping submission"
44            );
45            return Ok(tx);
46        }
47
48        // Call core submission logic with error handling
49        match self.submit_core(tx.clone()).await {
50            Ok(submitted_tx) => Ok(submitted_tx),
51            Err(error) => {
52                // Handle submission failure - mark as failed and send notification
53                self.handle_submit_failure(tx, error).await
54            }
55        }
56    }
57
58    /// Core submission logic - pure business logic without error handling concerns.
59    async fn submit_core(
60        &self,
61        tx: TransactionRepoModel,
62    ) -> Result<TransactionRepoModel, TransactionError> {
63        let stellar_data = tx.network_data.get_stellar_transaction_data()?;
64        let tx_envelope = stellar_data
65            .get_envelope_for_submission()
66            .map_err(TransactionError::from)?;
67
68        let hash = self
69            .provider()
70            .send_transaction(&tx_envelope)
71            .await
72            .map_err(TransactionError::from)?;
73
74        let tx_hash_hex = hex::encode(hash.as_slice());
75        let updated_stellar_data = stellar_data.with_hash(tx_hash_hex.clone());
76
77        let mut hashes = tx.hashes.clone();
78        hashes.push(tx_hash_hex);
79
80        let update_req = TransactionUpdateRequest {
81            status: Some(TransactionStatus::Submitted),
82            sent_at: Some(Utc::now().to_rfc3339()),
83            network_data: Some(NetworkTransactionData::Stellar(updated_stellar_data)),
84            hashes: Some(hashes),
85            ..Default::default()
86        };
87
88        let updated_tx = self
89            .transaction_repository()
90            .partial_update(tx.id.clone(), update_req)
91            .await?;
92
93        // Send notification
94        self.send_transaction_update_notification(&updated_tx).await;
95
96        Ok(updated_tx)
97    }
98
99    /// Handles submission failures with comprehensive cleanup and error reporting.
100    /// For bad sequence errors, resets the transaction and re-enqueues it for retry.
101    async fn handle_submit_failure(
102        &self,
103        tx: TransactionRepoModel,
104        error: TransactionError,
105    ) -> Result<TransactionRepoModel, TransactionError> {
106        let error_reason = format!("Submission failed: {error}");
107        let tx_id = tx.id.clone();
108        warn!(reason = %error_reason, "transaction submission failed");
109
110        if is_bad_sequence_error(&error_reason) {
111            // For bad sequence errors, sync sequence from chain first
112            if let Ok(stellar_data) = tx.network_data.get_stellar_transaction_data() {
113                info!("syncing sequence from chain after bad sequence error");
114                match self
115                    .sync_sequence_from_chain(&stellar_data.source_account)
116                    .await
117                {
118                    Ok(()) => {
119                        info!("successfully synced sequence from chain");
120                    }
121                    Err(sync_error) => {
122                        warn!(error = %sync_error, "failed to sync sequence from chain");
123                    }
124                }
125            }
126
127            // Reset the transaction and re-enqueue it
128            info!("bad sequence error detected, resetting and re-enqueueing");
129
130            // Reset the transaction to pending state
131            match self.reset_transaction_for_retry(tx.clone()).await {
132                Ok(reset_tx) => {
133                    // Re-enqueue the transaction to go through the pipeline again
134                    if let Err(e) = self
135                        .send_transaction_request_job(
136                            &reset_tx,
137                            Some(calculate_scheduled_timestamp(
138                                STELLAR_BAD_SEQUENCE_RETRY_DELAY_SECONDS,
139                            )),
140                        )
141                        .await
142                    {
143                        warn!(error = %e, "failed to re-enqueue transaction after reset");
144                    } else {
145                        info!("transaction reset and re-enqueued for retry through pipeline");
146                    }
147
148                    // Return success since we're handling the retry
149                    return Ok(reset_tx);
150                }
151                Err(reset_error) => {
152                    warn!(error = %reset_error, "failed to reset transaction for retry");
153                    // Fall through to normal failure handling
154                }
155            }
156        }
157
158        // For non-bad-sequence errors or if reset failed, mark as failed
159        // Step 1: Mark transaction as Failed with detailed reason
160        let update_request = TransactionUpdateRequest {
161            status: Some(TransactionStatus::Failed),
162            status_reason: Some(error_reason.clone()),
163            ..Default::default()
164        };
165        let _failed_tx = match self
166            .finalize_transaction_state(tx_id.clone(), update_request)
167            .await
168        {
169            Ok(updated_tx) => updated_tx,
170            Err(finalize_error) => {
171                warn!(error = %finalize_error, "failed to mark transaction as failed, continuing with lane cleanup");
172                tx
173            }
174        };
175
176        // Attempt to enqueue next pending transaction or release lane
177        if let Err(enqueue_error) = self.enqueue_next_pending_transaction(&tx_id).await {
178            warn!(error = %enqueue_error, "failed to enqueue next pending transaction after submission failure");
179        }
180
181        info!(error = %error_reason, "transaction submission failure handled");
182
183        Err(error)
184    }
185
186    /// Resubmit transaction - delegates to submit_transaction_impl
187    pub async fn resubmit_transaction_impl(
188        &self,
189        tx: TransactionRepoModel,
190    ) -> Result<TransactionRepoModel, TransactionError> {
191        self.submit_transaction_impl(tx).await
192    }
193}
194
195#[cfg(test)]
196mod tests {
197    use super::*;
198    use soroban_rs::xdr::{Hash, WriteXdr};
199
200    use crate::domain::transaction::stellar::test_helpers::*;
201
202    mod submit_transaction_tests {
203        use crate::{models::RepositoryError, services::provider::ProviderError};
204
205        use super::*;
206
207        #[tokio::test]
208        async fn submit_transaction_happy_path() {
209            let relayer = create_test_relayer();
210            let mut mocks = default_test_mocks();
211
212            // provider gives a hash
213            mocks
214                .provider
215                .expect_send_transaction()
216                .returning(|_| Box::pin(async { Ok(Hash([1u8; 32])) }));
217
218            // expect partial update to Submitted
219            mocks
220                .tx_repo
221                .expect_partial_update()
222                .withf(|_, upd| upd.status == Some(TransactionStatus::Submitted))
223                .returning(|id, upd| {
224                    let mut tx = create_test_transaction("relayer-1");
225                    tx.id = id;
226                    tx.status = upd.status.unwrap();
227                    Ok::<_, RepositoryError>(tx)
228                });
229
230            // Expect notification
231            mocks
232                .job_producer
233                .expect_produce_send_notification_job()
234                .times(1)
235                .returning(|_, _| Box::pin(async { Ok(()) }));
236
237            let handler = make_stellar_tx_handler(relayer.clone(), mocks);
238
239            let mut tx = create_test_transaction(&relayer.id);
240            if let NetworkTransactionData::Stellar(ref mut d) = tx.network_data {
241                d.signatures.push(dummy_signature());
242            }
243
244            let res = handler.submit_transaction_impl(tx).await.unwrap();
245            assert_eq!(res.status, TransactionStatus::Submitted);
246        }
247
248        #[tokio::test]
249        async fn submit_transaction_provider_error_marks_failed() {
250            let relayer = create_test_relayer();
251            let mut mocks = default_test_mocks();
252
253            // Provider fails with non-bad-sequence error
254            mocks.provider.expect_send_transaction().returning(|_| {
255                Box::pin(async { Err(ProviderError::Other("Network error".to_string())) })
256            });
257
258            // Mock finalize_transaction_state for failure handling
259            mocks
260                .tx_repo
261                .expect_partial_update()
262                .withf(|_, upd| upd.status == Some(TransactionStatus::Failed))
263                .returning(|id, upd| {
264                    let mut tx = create_test_transaction("relayer-1");
265                    tx.id = id;
266                    tx.status = upd.status.unwrap();
267                    Ok::<_, RepositoryError>(tx)
268                });
269
270            // Mock notification for failed transaction
271            mocks
272                .job_producer
273                .expect_produce_send_notification_job()
274                .times(1)
275                .returning(|_, _| Box::pin(async { Ok(()) }));
276
277            // Mock find_by_status for enqueue_next_pending_transaction
278            mocks
279                .tx_repo
280                .expect_find_by_status()
281                .returning(|_, _| Ok(vec![])); // No pending transactions
282
283            let handler = make_stellar_tx_handler(relayer.clone(), mocks);
284            let mut tx = create_test_transaction(&relayer.id);
285            if let NetworkTransactionData::Stellar(ref mut data) = tx.network_data {
286                data.signatures.push(dummy_signature());
287                data.sequence_number = Some(42); // Set sequence number
288            }
289
290            let res = handler.submit_transaction_impl(tx).await;
291
292            // Should return error but transaction should be marked as failed
293            assert!(res.is_err());
294            matches!(res.unwrap_err(), TransactionError::UnexpectedError(_));
295        }
296
297        #[tokio::test]
298        async fn submit_transaction_repository_error_marks_failed() {
299            let relayer = create_test_relayer();
300            let mut mocks = default_test_mocks();
301
302            // Provider succeeds
303            mocks
304                .provider
305                .expect_send_transaction()
306                .returning(|_| Box::pin(async { Ok(Hash([1u8; 32])) }));
307
308            // Repository fails on first update (submission)
309            mocks
310                .tx_repo
311                .expect_partial_update()
312                .withf(|_, upd| upd.status == Some(TransactionStatus::Submitted))
313                .returning(|_, _| Err(RepositoryError::Unknown("Database error".to_string())));
314
315            // Mock finalize_transaction_state for failure handling
316            mocks
317                .tx_repo
318                .expect_partial_update()
319                .withf(|_, upd| upd.status == Some(TransactionStatus::Failed))
320                .returning(|id, upd| {
321                    let mut tx = create_test_transaction("relayer-1");
322                    tx.id = id;
323                    tx.status = upd.status.unwrap();
324                    Ok::<_, RepositoryError>(tx)
325                });
326
327            // Mock notification for failed transaction
328            mocks
329                .job_producer
330                .expect_produce_send_notification_job()
331                .times(1)
332                .returning(|_, _| Box::pin(async { Ok(()) }));
333
334            // Mock find_by_status for enqueue_next_pending_transaction
335            mocks
336                .tx_repo
337                .expect_find_by_status()
338                .returning(|_, _| Ok(vec![])); // No pending transactions
339
340            let handler = make_stellar_tx_handler(relayer.clone(), mocks);
341            let mut tx = create_test_transaction(&relayer.id);
342            if let NetworkTransactionData::Stellar(ref mut data) = tx.network_data {
343                data.signatures.push(dummy_signature());
344                data.sequence_number = Some(42); // Set sequence number
345            }
346
347            let res = handler.submit_transaction_impl(tx).await;
348
349            // Should return error but transaction should be marked as failed
350            assert!(res.is_err());
351        }
352
353        #[tokio::test]
354        async fn submit_transaction_uses_signed_envelope_xdr() {
355            let relayer = create_test_relayer();
356            let mut mocks = default_test_mocks();
357
358            // Create a transaction with signed_envelope_xdr set
359            let mut tx = create_test_transaction(&relayer.id);
360            if let NetworkTransactionData::Stellar(ref mut data) = tx.network_data {
361                data.signatures.push(dummy_signature());
362                // Build and store the signed envelope XDR
363                let envelope = data.get_envelope_for_submission().unwrap();
364                let xdr = envelope
365                    .to_xdr_base64(soroban_rs::xdr::Limits::none())
366                    .unwrap();
367                data.signed_envelope_xdr = Some(xdr);
368            }
369
370            // Provider should receive the envelope decoded from signed_envelope_xdr
371            mocks
372                .provider
373                .expect_send_transaction()
374                .returning(|_| Box::pin(async { Ok(Hash([2u8; 32])) }));
375
376            // Update to Submitted
377            mocks
378                .tx_repo
379                .expect_partial_update()
380                .withf(|_, upd| upd.status == Some(TransactionStatus::Submitted))
381                .returning(|id, upd| {
382                    let mut tx = create_test_transaction("relayer-1");
383                    tx.id = id;
384                    tx.status = upd.status.unwrap();
385                    Ok::<_, RepositoryError>(tx)
386                });
387
388            // Expect notification
389            mocks
390                .job_producer
391                .expect_produce_send_notification_job()
392                .times(1)
393                .returning(|_, _| Box::pin(async { Ok(()) }));
394
395            let handler = make_stellar_tx_handler(relayer.clone(), mocks);
396            let res = handler.submit_transaction_impl(tx).await.unwrap();
397
398            assert_eq!(res.status, TransactionStatus::Submitted);
399        }
400
401        #[tokio::test]
402        async fn resubmit_transaction_delegates_to_submit() {
403            let relayer = create_test_relayer();
404            let mut mocks = default_test_mocks();
405
406            // provider gives a hash
407            mocks
408                .provider
409                .expect_send_transaction()
410                .returning(|_| Box::pin(async { Ok(Hash([1u8; 32])) }));
411
412            // expect partial update to Submitted
413            mocks
414                .tx_repo
415                .expect_partial_update()
416                .withf(|_, upd| upd.status == Some(TransactionStatus::Submitted))
417                .returning(|id, upd| {
418                    let mut tx = create_test_transaction("relayer-1");
419                    tx.id = id;
420                    tx.status = upd.status.unwrap();
421                    Ok::<_, RepositoryError>(tx)
422                });
423
424            // Expect notification
425            mocks
426                .job_producer
427                .expect_produce_send_notification_job()
428                .times(1)
429                .returning(|_, _| Box::pin(async { Ok(()) }));
430
431            let handler = make_stellar_tx_handler(relayer.clone(), mocks);
432
433            let mut tx = create_test_transaction(&relayer.id);
434            if let NetworkTransactionData::Stellar(ref mut d) = tx.network_data {
435                d.signatures.push(dummy_signature());
436            }
437
438            let res = handler.resubmit_transaction_impl(tx).await.unwrap();
439            assert_eq!(res.status, TransactionStatus::Submitted);
440        }
441
442        #[tokio::test]
443        async fn submit_transaction_failure_enqueues_next_transaction() {
444            let relayer = create_test_relayer();
445            let mut mocks = default_test_mocks();
446
447            // Provider fails with non-bad-sequence error
448            mocks.provider.expect_send_transaction().returning(|_| {
449                Box::pin(async { Err(ProviderError::Other("Network error".to_string())) })
450            });
451
452            // No sync expected for non-bad-sequence errors
453
454            // Mock finalize_transaction_state for failure handling
455            mocks
456                .tx_repo
457                .expect_partial_update()
458                .withf(|_, upd| upd.status == Some(TransactionStatus::Failed))
459                .returning(|id, upd| {
460                    let mut tx = create_test_transaction("relayer-1");
461                    tx.id = id;
462                    tx.status = upd.status.unwrap();
463                    Ok::<_, RepositoryError>(tx)
464                });
465
466            // Mock notification for failed transaction
467            mocks
468                .job_producer
469                .expect_produce_send_notification_job()
470                .times(1)
471                .returning(|_, _| Box::pin(async { Ok(()) }));
472
473            // Mock find_by_status to return a pending transaction
474            let mut pending_tx = create_test_transaction(&relayer.id);
475            pending_tx.id = "next-pending-tx".to_string();
476            pending_tx.status = TransactionStatus::Pending;
477            let captured_pending_tx = pending_tx.clone();
478            mocks
479                .tx_repo
480                .expect_find_by_status()
481                .with(
482                    mockall::predicate::eq(relayer.id.clone()),
483                    mockall::predicate::eq(vec![TransactionStatus::Pending]),
484                )
485                .times(1)
486                .returning(move |_, _| Ok(vec![captured_pending_tx.clone()]));
487
488            // Mock produce_transaction_request_job for the next pending transaction
489            mocks
490                .job_producer
491                .expect_produce_transaction_request_job()
492                .withf(move |job, _delay| job.transaction_id == "next-pending-tx")
493                .times(1)
494                .returning(|_, _| Box::pin(async { Ok(()) }));
495
496            let handler = make_stellar_tx_handler(relayer.clone(), mocks);
497            let mut tx = create_test_transaction(&relayer.id);
498            if let NetworkTransactionData::Stellar(ref mut data) = tx.network_data {
499                data.signatures.push(dummy_signature());
500                data.sequence_number = Some(42); // Set sequence number
501            }
502
503            let res = handler.submit_transaction_impl(tx).await;
504
505            // Should return error but next transaction should be enqueued
506            assert!(res.is_err());
507            matches!(res.unwrap_err(), TransactionError::UnexpectedError(_));
508        }
509
510        #[tokio::test]
511        async fn test_submit_bad_sequence_resets_and_retries() {
512            let relayer = create_test_relayer();
513            let mut mocks = default_test_mocks();
514
515            // Mock provider to return bad sequence error
516            mocks.provider.expect_send_transaction().returning(|_| {
517                Box::pin(async {
518                    Err(ProviderError::Other(
519                        "transaction submission failed: TxBadSeq".to_string(),
520                    ))
521                })
522            });
523
524            // Mock get_account for sync_sequence_from_chain
525            mocks.provider.expect_get_account().times(1).returning(|_| {
526                Box::pin(async {
527                    use soroban_rs::xdr::{
528                        AccountEntry, AccountEntryExt, AccountId, PublicKey, SequenceNumber,
529                        String32, Thresholds, Uint256,
530                    };
531                    use stellar_strkey::ed25519;
532
533                    let pk = ed25519::PublicKey::from_string(TEST_PK).unwrap();
534                    let account_id = AccountId(PublicKey::PublicKeyTypeEd25519(Uint256(pk.0)));
535
536                    Ok(AccountEntry {
537                        account_id,
538                        balance: 1000000,
539                        seq_num: SequenceNumber(100),
540                        num_sub_entries: 0,
541                        inflation_dest: None,
542                        flags: 0,
543                        home_domain: String32::default(),
544                        thresholds: Thresholds([1, 1, 1, 1]),
545                        signers: Default::default(),
546                        ext: AccountEntryExt::V0,
547                    })
548                })
549            });
550
551            // Mock counter set for sync_sequence_from_chain
552            mocks
553                .counter
554                .expect_set()
555                .times(1)
556                .returning(|_, _, _| Box::pin(async { Ok(()) }));
557
558            // Mock partial_update for reset_transaction_for_retry - should reset to Pending
559            mocks
560                .tx_repo
561                .expect_partial_update()
562                .withf(|_, upd| upd.status == Some(TransactionStatus::Pending))
563                .times(1)
564                .returning(|id, upd| {
565                    let mut tx = create_test_transaction("relayer-1");
566                    tx.id = id;
567                    tx.status = upd.status.unwrap();
568                    if let Some(network_data) = upd.network_data {
569                        tx.network_data = network_data;
570                    }
571                    Ok::<_, RepositoryError>(tx)
572                });
573
574            // Mock produce_transaction_request_job for re-enqueue
575            mocks
576                .job_producer
577                .expect_produce_transaction_request_job()
578                .times(1)
579                .returning(|_, _| Box::pin(async { Ok(()) }));
580
581            let handler = make_stellar_tx_handler(relayer.clone(), mocks);
582            let mut tx = create_test_transaction(&relayer.id);
583            if let NetworkTransactionData::Stellar(ref mut data) = tx.network_data {
584                data.signatures.push(dummy_signature());
585                data.sequence_number = Some(42);
586            }
587
588            let result = handler.submit_transaction_impl(tx).await;
589
590            // Should return Ok since we're handling the retry
591            assert!(result.is_ok());
592            let reset_tx = result.unwrap();
593            assert_eq!(reset_tx.status, TransactionStatus::Pending);
594
595            // Verify stellar data was reset
596            if let NetworkTransactionData::Stellar(data) = &reset_tx.network_data {
597                assert!(data.sequence_number.is_none());
598                assert!(data.signatures.is_empty());
599                assert!(data.hash.is_none());
600                assert!(data.signed_envelope_xdr.is_none());
601            } else {
602                panic!("Expected Stellar transaction data");
603            }
604        }
605    }
606}