openzeppelin_relayer/domain/transaction/stellar/
submit.rs1use chrono::Utc;
6use tracing::{info, warn};
7
8use super::{is_final_state, utils::is_bad_sequence_error, StellarRelayerTransaction};
9use crate::{
10 constants::STELLAR_BAD_SEQUENCE_RETRY_DELAY_SECONDS,
11 jobs::JobProducerTrait,
12 models::{
13 NetworkTransactionData, RelayerRepoModel, TransactionError, TransactionRepoModel,
14 TransactionStatus, TransactionUpdateRequest,
15 },
16 repositories::{Repository, TransactionCounterTrait, TransactionRepository},
17 services::{provider::StellarProviderTrait, signer::Signer},
18 utils::calculate_scheduled_timestamp,
19};
20
21impl<R, T, J, S, P, C> StellarRelayerTransaction<R, T, J, S, P, C>
22where
23 R: Repository<RelayerRepoModel, String> + Send + Sync,
24 T: TransactionRepository + Send + Sync,
25 J: JobProducerTrait + Send + Sync,
26 S: Signer + Send + Sync,
27 P: StellarProviderTrait + Send + Sync,
28 C: TransactionCounterTrait + Send + Sync,
29{
30 pub async fn submit_transaction_impl(
33 &self,
34 tx: TransactionRepoModel,
35 ) -> Result<TransactionRepoModel, TransactionError> {
36 info!(tx_id = %tx.id, status = ?tx.status, "submitting stellar transaction");
37
38 if is_final_state(&tx.status) {
40 warn!(
41 tx_id = %tx.id,
42 status = ?tx.status,
43 "transaction already in final state, skipping submission"
44 );
45 return Ok(tx);
46 }
47
48 match self.submit_core(tx.clone()).await {
50 Ok(submitted_tx) => Ok(submitted_tx),
51 Err(error) => {
52 self.handle_submit_failure(tx, error).await
54 }
55 }
56 }
57
58 async fn submit_core(
60 &self,
61 tx: TransactionRepoModel,
62 ) -> Result<TransactionRepoModel, TransactionError> {
63 let stellar_data = tx.network_data.get_stellar_transaction_data()?;
64 let tx_envelope = stellar_data
65 .get_envelope_for_submission()
66 .map_err(TransactionError::from)?;
67
68 let hash = self
69 .provider()
70 .send_transaction(&tx_envelope)
71 .await
72 .map_err(TransactionError::from)?;
73
74 let tx_hash_hex = hex::encode(hash.as_slice());
75 let updated_stellar_data = stellar_data.with_hash(tx_hash_hex.clone());
76
77 let mut hashes = tx.hashes.clone();
78 hashes.push(tx_hash_hex);
79
80 let update_req = TransactionUpdateRequest {
81 status: Some(TransactionStatus::Submitted),
82 sent_at: Some(Utc::now().to_rfc3339()),
83 network_data: Some(NetworkTransactionData::Stellar(updated_stellar_data)),
84 hashes: Some(hashes),
85 ..Default::default()
86 };
87
88 let updated_tx = self
89 .transaction_repository()
90 .partial_update(tx.id.clone(), update_req)
91 .await?;
92
93 self.send_transaction_update_notification(&updated_tx).await;
95
96 Ok(updated_tx)
97 }
98
99 async fn handle_submit_failure(
102 &self,
103 tx: TransactionRepoModel,
104 error: TransactionError,
105 ) -> Result<TransactionRepoModel, TransactionError> {
106 let error_reason = format!("Submission failed: {error}");
107 let tx_id = tx.id.clone();
108 warn!(reason = %error_reason, "transaction submission failed");
109
110 if is_bad_sequence_error(&error_reason) {
111 if let Ok(stellar_data) = tx.network_data.get_stellar_transaction_data() {
113 info!("syncing sequence from chain after bad sequence error");
114 match self
115 .sync_sequence_from_chain(&stellar_data.source_account)
116 .await
117 {
118 Ok(()) => {
119 info!("successfully synced sequence from chain");
120 }
121 Err(sync_error) => {
122 warn!(error = %sync_error, "failed to sync sequence from chain");
123 }
124 }
125 }
126
127 info!("bad sequence error detected, resetting and re-enqueueing");
129
130 match self.reset_transaction_for_retry(tx.clone()).await {
132 Ok(reset_tx) => {
133 if let Err(e) = self
135 .send_transaction_request_job(
136 &reset_tx,
137 Some(calculate_scheduled_timestamp(
138 STELLAR_BAD_SEQUENCE_RETRY_DELAY_SECONDS,
139 )),
140 )
141 .await
142 {
143 warn!(error = %e, "failed to re-enqueue transaction after reset");
144 } else {
145 info!("transaction reset and re-enqueued for retry through pipeline");
146 }
147
148 return Ok(reset_tx);
150 }
151 Err(reset_error) => {
152 warn!(error = %reset_error, "failed to reset transaction for retry");
153 }
155 }
156 }
157
158 let update_request = TransactionUpdateRequest {
161 status: Some(TransactionStatus::Failed),
162 status_reason: Some(error_reason.clone()),
163 ..Default::default()
164 };
165 let _failed_tx = match self
166 .finalize_transaction_state(tx_id.clone(), update_request)
167 .await
168 {
169 Ok(updated_tx) => updated_tx,
170 Err(finalize_error) => {
171 warn!(error = %finalize_error, "failed to mark transaction as failed, continuing with lane cleanup");
172 tx
173 }
174 };
175
176 if let Err(enqueue_error) = self.enqueue_next_pending_transaction(&tx_id).await {
178 warn!(error = %enqueue_error, "failed to enqueue next pending transaction after submission failure");
179 }
180
181 info!(error = %error_reason, "transaction submission failure handled");
182
183 Err(error)
184 }
185
186 pub async fn resubmit_transaction_impl(
188 &self,
189 tx: TransactionRepoModel,
190 ) -> Result<TransactionRepoModel, TransactionError> {
191 self.submit_transaction_impl(tx).await
192 }
193}
194
195#[cfg(test)]
196mod tests {
197 use super::*;
198 use soroban_rs::xdr::{Hash, WriteXdr};
199
200 use crate::domain::transaction::stellar::test_helpers::*;
201
202 mod submit_transaction_tests {
203 use crate::{models::RepositoryError, services::provider::ProviderError};
204
205 use super::*;
206
207 #[tokio::test]
208 async fn submit_transaction_happy_path() {
209 let relayer = create_test_relayer();
210 let mut mocks = default_test_mocks();
211
212 mocks
214 .provider
215 .expect_send_transaction()
216 .returning(|_| Box::pin(async { Ok(Hash([1u8; 32])) }));
217
218 mocks
220 .tx_repo
221 .expect_partial_update()
222 .withf(|_, upd| upd.status == Some(TransactionStatus::Submitted))
223 .returning(|id, upd| {
224 let mut tx = create_test_transaction("relayer-1");
225 tx.id = id;
226 tx.status = upd.status.unwrap();
227 Ok::<_, RepositoryError>(tx)
228 });
229
230 mocks
232 .job_producer
233 .expect_produce_send_notification_job()
234 .times(1)
235 .returning(|_, _| Box::pin(async { Ok(()) }));
236
237 let handler = make_stellar_tx_handler(relayer.clone(), mocks);
238
239 let mut tx = create_test_transaction(&relayer.id);
240 if let NetworkTransactionData::Stellar(ref mut d) = tx.network_data {
241 d.signatures.push(dummy_signature());
242 }
243
244 let res = handler.submit_transaction_impl(tx).await.unwrap();
245 assert_eq!(res.status, TransactionStatus::Submitted);
246 }
247
248 #[tokio::test]
249 async fn submit_transaction_provider_error_marks_failed() {
250 let relayer = create_test_relayer();
251 let mut mocks = default_test_mocks();
252
253 mocks.provider.expect_send_transaction().returning(|_| {
255 Box::pin(async { Err(ProviderError::Other("Network error".to_string())) })
256 });
257
258 mocks
260 .tx_repo
261 .expect_partial_update()
262 .withf(|_, upd| upd.status == Some(TransactionStatus::Failed))
263 .returning(|id, upd| {
264 let mut tx = create_test_transaction("relayer-1");
265 tx.id = id;
266 tx.status = upd.status.unwrap();
267 Ok::<_, RepositoryError>(tx)
268 });
269
270 mocks
272 .job_producer
273 .expect_produce_send_notification_job()
274 .times(1)
275 .returning(|_, _| Box::pin(async { Ok(()) }));
276
277 mocks
279 .tx_repo
280 .expect_find_by_status()
281 .returning(|_, _| Ok(vec![])); let handler = make_stellar_tx_handler(relayer.clone(), mocks);
284 let mut tx = create_test_transaction(&relayer.id);
285 if let NetworkTransactionData::Stellar(ref mut data) = tx.network_data {
286 data.signatures.push(dummy_signature());
287 data.sequence_number = Some(42); }
289
290 let res = handler.submit_transaction_impl(tx).await;
291
292 assert!(res.is_err());
294 matches!(res.unwrap_err(), TransactionError::UnexpectedError(_));
295 }
296
297 #[tokio::test]
298 async fn submit_transaction_repository_error_marks_failed() {
299 let relayer = create_test_relayer();
300 let mut mocks = default_test_mocks();
301
302 mocks
304 .provider
305 .expect_send_transaction()
306 .returning(|_| Box::pin(async { Ok(Hash([1u8; 32])) }));
307
308 mocks
310 .tx_repo
311 .expect_partial_update()
312 .withf(|_, upd| upd.status == Some(TransactionStatus::Submitted))
313 .returning(|_, _| Err(RepositoryError::Unknown("Database error".to_string())));
314
315 mocks
317 .tx_repo
318 .expect_partial_update()
319 .withf(|_, upd| upd.status == Some(TransactionStatus::Failed))
320 .returning(|id, upd| {
321 let mut tx = create_test_transaction("relayer-1");
322 tx.id = id;
323 tx.status = upd.status.unwrap();
324 Ok::<_, RepositoryError>(tx)
325 });
326
327 mocks
329 .job_producer
330 .expect_produce_send_notification_job()
331 .times(1)
332 .returning(|_, _| Box::pin(async { Ok(()) }));
333
334 mocks
336 .tx_repo
337 .expect_find_by_status()
338 .returning(|_, _| Ok(vec![])); let handler = make_stellar_tx_handler(relayer.clone(), mocks);
341 let mut tx = create_test_transaction(&relayer.id);
342 if let NetworkTransactionData::Stellar(ref mut data) = tx.network_data {
343 data.signatures.push(dummy_signature());
344 data.sequence_number = Some(42); }
346
347 let res = handler.submit_transaction_impl(tx).await;
348
349 assert!(res.is_err());
351 }
352
353 #[tokio::test]
354 async fn submit_transaction_uses_signed_envelope_xdr() {
355 let relayer = create_test_relayer();
356 let mut mocks = default_test_mocks();
357
358 let mut tx = create_test_transaction(&relayer.id);
360 if let NetworkTransactionData::Stellar(ref mut data) = tx.network_data {
361 data.signatures.push(dummy_signature());
362 let envelope = data.get_envelope_for_submission().unwrap();
364 let xdr = envelope
365 .to_xdr_base64(soroban_rs::xdr::Limits::none())
366 .unwrap();
367 data.signed_envelope_xdr = Some(xdr);
368 }
369
370 mocks
372 .provider
373 .expect_send_transaction()
374 .returning(|_| Box::pin(async { Ok(Hash([2u8; 32])) }));
375
376 mocks
378 .tx_repo
379 .expect_partial_update()
380 .withf(|_, upd| upd.status == Some(TransactionStatus::Submitted))
381 .returning(|id, upd| {
382 let mut tx = create_test_transaction("relayer-1");
383 tx.id = id;
384 tx.status = upd.status.unwrap();
385 Ok::<_, RepositoryError>(tx)
386 });
387
388 mocks
390 .job_producer
391 .expect_produce_send_notification_job()
392 .times(1)
393 .returning(|_, _| Box::pin(async { Ok(()) }));
394
395 let handler = make_stellar_tx_handler(relayer.clone(), mocks);
396 let res = handler.submit_transaction_impl(tx).await.unwrap();
397
398 assert_eq!(res.status, TransactionStatus::Submitted);
399 }
400
401 #[tokio::test]
402 async fn resubmit_transaction_delegates_to_submit() {
403 let relayer = create_test_relayer();
404 let mut mocks = default_test_mocks();
405
406 mocks
408 .provider
409 .expect_send_transaction()
410 .returning(|_| Box::pin(async { Ok(Hash([1u8; 32])) }));
411
412 mocks
414 .tx_repo
415 .expect_partial_update()
416 .withf(|_, upd| upd.status == Some(TransactionStatus::Submitted))
417 .returning(|id, upd| {
418 let mut tx = create_test_transaction("relayer-1");
419 tx.id = id;
420 tx.status = upd.status.unwrap();
421 Ok::<_, RepositoryError>(tx)
422 });
423
424 mocks
426 .job_producer
427 .expect_produce_send_notification_job()
428 .times(1)
429 .returning(|_, _| Box::pin(async { Ok(()) }));
430
431 let handler = make_stellar_tx_handler(relayer.clone(), mocks);
432
433 let mut tx = create_test_transaction(&relayer.id);
434 if let NetworkTransactionData::Stellar(ref mut d) = tx.network_data {
435 d.signatures.push(dummy_signature());
436 }
437
438 let res = handler.resubmit_transaction_impl(tx).await.unwrap();
439 assert_eq!(res.status, TransactionStatus::Submitted);
440 }
441
442 #[tokio::test]
443 async fn submit_transaction_failure_enqueues_next_transaction() {
444 let relayer = create_test_relayer();
445 let mut mocks = default_test_mocks();
446
447 mocks.provider.expect_send_transaction().returning(|_| {
449 Box::pin(async { Err(ProviderError::Other("Network error".to_string())) })
450 });
451
452 mocks
456 .tx_repo
457 .expect_partial_update()
458 .withf(|_, upd| upd.status == Some(TransactionStatus::Failed))
459 .returning(|id, upd| {
460 let mut tx = create_test_transaction("relayer-1");
461 tx.id = id;
462 tx.status = upd.status.unwrap();
463 Ok::<_, RepositoryError>(tx)
464 });
465
466 mocks
468 .job_producer
469 .expect_produce_send_notification_job()
470 .times(1)
471 .returning(|_, _| Box::pin(async { Ok(()) }));
472
473 let mut pending_tx = create_test_transaction(&relayer.id);
475 pending_tx.id = "next-pending-tx".to_string();
476 pending_tx.status = TransactionStatus::Pending;
477 let captured_pending_tx = pending_tx.clone();
478 mocks
479 .tx_repo
480 .expect_find_by_status()
481 .with(
482 mockall::predicate::eq(relayer.id.clone()),
483 mockall::predicate::eq(vec![TransactionStatus::Pending]),
484 )
485 .times(1)
486 .returning(move |_, _| Ok(vec![captured_pending_tx.clone()]));
487
488 mocks
490 .job_producer
491 .expect_produce_transaction_request_job()
492 .withf(move |job, _delay| job.transaction_id == "next-pending-tx")
493 .times(1)
494 .returning(|_, _| Box::pin(async { Ok(()) }));
495
496 let handler = make_stellar_tx_handler(relayer.clone(), mocks);
497 let mut tx = create_test_transaction(&relayer.id);
498 if let NetworkTransactionData::Stellar(ref mut data) = tx.network_data {
499 data.signatures.push(dummy_signature());
500 data.sequence_number = Some(42); }
502
503 let res = handler.submit_transaction_impl(tx).await;
504
505 assert!(res.is_err());
507 matches!(res.unwrap_err(), TransactionError::UnexpectedError(_));
508 }
509
510 #[tokio::test]
511 async fn test_submit_bad_sequence_resets_and_retries() {
512 let relayer = create_test_relayer();
513 let mut mocks = default_test_mocks();
514
515 mocks.provider.expect_send_transaction().returning(|_| {
517 Box::pin(async {
518 Err(ProviderError::Other(
519 "transaction submission failed: TxBadSeq".to_string(),
520 ))
521 })
522 });
523
524 mocks.provider.expect_get_account().times(1).returning(|_| {
526 Box::pin(async {
527 use soroban_rs::xdr::{
528 AccountEntry, AccountEntryExt, AccountId, PublicKey, SequenceNumber,
529 String32, Thresholds, Uint256,
530 };
531 use stellar_strkey::ed25519;
532
533 let pk = ed25519::PublicKey::from_string(TEST_PK).unwrap();
534 let account_id = AccountId(PublicKey::PublicKeyTypeEd25519(Uint256(pk.0)));
535
536 Ok(AccountEntry {
537 account_id,
538 balance: 1000000,
539 seq_num: SequenceNumber(100),
540 num_sub_entries: 0,
541 inflation_dest: None,
542 flags: 0,
543 home_domain: String32::default(),
544 thresholds: Thresholds([1, 1, 1, 1]),
545 signers: Default::default(),
546 ext: AccountEntryExt::V0,
547 })
548 })
549 });
550
551 mocks
553 .counter
554 .expect_set()
555 .times(1)
556 .returning(|_, _, _| Box::pin(async { Ok(()) }));
557
558 mocks
560 .tx_repo
561 .expect_partial_update()
562 .withf(|_, upd| upd.status == Some(TransactionStatus::Pending))
563 .times(1)
564 .returning(|id, upd| {
565 let mut tx = create_test_transaction("relayer-1");
566 tx.id = id;
567 tx.status = upd.status.unwrap();
568 if let Some(network_data) = upd.network_data {
569 tx.network_data = network_data;
570 }
571 Ok::<_, RepositoryError>(tx)
572 });
573
574 mocks
576 .job_producer
577 .expect_produce_transaction_request_job()
578 .times(1)
579 .returning(|_, _| Box::pin(async { Ok(()) }));
580
581 let handler = make_stellar_tx_handler(relayer.clone(), mocks);
582 let mut tx = create_test_transaction(&relayer.id);
583 if let NetworkTransactionData::Stellar(ref mut data) = tx.network_data {
584 data.signatures.push(dummy_signature());
585 data.sequence_number = Some(42);
586 }
587
588 let result = handler.submit_transaction_impl(tx).await;
589
590 assert!(result.is_ok());
592 let reset_tx = result.unwrap();
593 assert_eq!(reset_tx.status, TransactionStatus::Pending);
594
595 if let NetworkTransactionData::Stellar(data) = &reset_tx.network_data {
597 assert!(data.sequence_number.is_none());
598 assert!(data.signatures.is_empty());
599 assert!(data.hash.is_none());
600 assert!(data.signed_envelope_xdr.is_none());
601 } else {
602 panic!("Expected Stellar transaction data");
603 }
604 }
605 }
606}