1use crate::{
6 config::ServerConfig,
7 constants::{
8 DEFAULT_CONCURRENCY_HEALTH_CHECK, DEFAULT_CONCURRENCY_NOTIFICATION,
9 DEFAULT_CONCURRENCY_SOLANA_SWAP, DEFAULT_CONCURRENCY_STATUS_CHECKER,
10 DEFAULT_CONCURRENCY_STATUS_CHECKER_EVM, DEFAULT_CONCURRENCY_STATUS_CHECKER_STELLAR,
11 DEFAULT_CONCURRENCY_TRANSACTION_REQUEST, DEFAULT_CONCURRENCY_TRANSACTION_SENDER,
12 WORKER_NOTIFICATION_SENDER_RETRIES, WORKER_RELAYER_HEALTH_CHECK_RETRIES,
13 WORKER_SOLANA_TOKEN_SWAP_REQUEST_RETRIES, WORKER_TRANSACTION_CLEANUP_RETRIES,
14 WORKER_TRANSACTION_REQUEST_RETRIES, WORKER_TRANSACTION_STATUS_CHECKER_RETRIES,
15 WORKER_TRANSACTION_SUBMIT_RETRIES,
16 },
17 jobs::{
18 notification_handler, relayer_health_check_handler, solana_token_swap_cron_handler,
19 solana_token_swap_request_handler, transaction_cleanup_handler,
20 transaction_request_handler, transaction_status_handler, transaction_submission_handler,
21 JobProducerTrait,
22 },
23 models::{
24 NetworkRepoModel, NotificationRepoModel, RelayerRepoModel, SignerRepoModel,
25 ThinDataAppState, TransactionRepoModel,
26 },
27 repositories::{
28 ApiKeyRepositoryTrait, NetworkRepository, PluginRepositoryTrait, RelayerRepository,
29 Repository, TransactionCounterTrait, TransactionRepository,
30 },
31};
32use apalis::prelude::*;
33
34use apalis::layers::retry::backoff::MakeBackoff;
35use apalis::layers::retry::{backoff::ExponentialBackoffMaker, RetryPolicy};
36use apalis::layers::ErrorHandlingLayer;
37
38pub use tower::util::rng::HasherRng;
40
41use apalis_cron::CronStream;
42use eyre::Result;
43use std::{str::FromStr, time::Duration};
44use tokio::signal::unix::SignalKind;
45use tracing::{debug, error, info};
46
// Worker names. Each one is handed to `WorkerBuilder::new` in `initialize_workers`, and all
// of them are also passed to `ServerConfig::get_worker_concurrency` as the lookup key.

// Transaction pipeline workers.
/// Worker that runs `transaction_request_handler`.
const TRANSACTION_REQUEST: &str = "transaction_request";
/// Worker that runs `transaction_submission_handler`.
const TRANSACTION_SENDER: &str = "transaction_sender";

// Status checkers: all three run `transaction_status_handler`, each on its own queue.
/// Status checker bound to `transaction_status_queue`.
const TRANSACTION_STATUS_CHECKER: &str = "transaction_status_checker";
/// Status checker bound to `transaction_status_queue_evm`.
const TRANSACTION_STATUS_CHECKER_EVM: &str = "transaction_status_checker_evm";
/// Status checker bound to `transaction_status_queue_stellar`.
const TRANSACTION_STATUS_CHECKER_STELLAR: &str = "transaction_status_checker_stellar";

// Auxiliary workers.
/// Worker that runs `notification_handler`.
const NOTIFICATION_SENDER: &str = "notification_sender";
/// Worker that runs `solana_token_swap_request_handler`.
const SOLANA_TOKEN_SWAP_REQUEST: &str = "solana_token_swap_request";
/// Cron-driven worker that runs `transaction_cleanup_handler`.
const TRANSACTION_CLEANUP: &str = "transaction_cleanup";
/// Worker that runs `relayer_health_check_handler`.
const RELAYER_HEALTH_CHECK: &str = "relayer_health_check";
58
59fn create_backoff(initial_ms: u64, max_ms: u64, jitter: f64) -> Result<ExponentialBackoffMaker> {
69 let maker = ExponentialBackoffMaker::new(
70 Duration::from_millis(initial_ms),
71 Duration::from_millis(max_ms),
72 jitter,
73 HasherRng::default(),
74 )?;
75
76 Ok(maker)
77}
78
79pub async fn initialize_workers<J, RR, TR, NR, NFR, SR, TCR, PR, AKR>(
80 app_state: ThinDataAppState<J, RR, TR, NR, NFR, SR, TCR, PR, AKR>,
81) -> Result<()>
82where
83 J: JobProducerTrait + Send + Sync + 'static,
84 RR: RelayerRepository + Repository<RelayerRepoModel, String> + Send + Sync + 'static,
85 TR: TransactionRepository + Repository<TransactionRepoModel, String> + Send + Sync + 'static,
86 NR: NetworkRepository + Repository<NetworkRepoModel, String> + Send + Sync + 'static,
87 NFR: Repository<NotificationRepoModel, String> + Send + Sync + 'static,
88 SR: Repository<SignerRepoModel, String> + Send + Sync + 'static,
89 TCR: TransactionCounterTrait + Send + Sync + 'static,
90 PR: PluginRepositoryTrait + Send + Sync + 'static,
91 AKR: ApiKeyRepositoryTrait + Send + Sync + 'static,
92{
93 let queue = app_state.job_producer.get_queue().await?;
94
95 let transaction_request_queue_worker = WorkerBuilder::new(TRANSACTION_REQUEST)
96 .layer(ErrorHandlingLayer::new())
97 .retry(
98 RetryPolicy::retries(WORKER_TRANSACTION_REQUEST_RETRIES)
99 .with_backoff(create_backoff(500, 5000, 0.99)?.make_backoff()),
100 )
101 .enable_tracing()
102 .catch_panic()
103 .concurrency(ServerConfig::get_worker_concurrency(
104 TRANSACTION_REQUEST,
105 DEFAULT_CONCURRENCY_TRANSACTION_REQUEST,
106 ))
107 .data(app_state.clone())
108 .backend(queue.transaction_request_queue.clone())
109 .build_fn(transaction_request_handler);
110
111 let transaction_submission_queue_worker = WorkerBuilder::new(TRANSACTION_SENDER)
112 .layer(ErrorHandlingLayer::new())
113 .enable_tracing()
114 .catch_panic()
115 .retry(
116 RetryPolicy::retries(WORKER_TRANSACTION_SUBMIT_RETRIES)
117 .with_backoff(create_backoff(500, 2000, 0.99)?.make_backoff()),
118 )
119 .concurrency(ServerConfig::get_worker_concurrency(
120 TRANSACTION_SENDER,
121 DEFAULT_CONCURRENCY_TRANSACTION_SENDER,
122 ))
123 .data(app_state.clone())
124 .backend(queue.transaction_submission_queue.clone())
125 .build_fn(transaction_submission_handler);
126
127 let transaction_status_queue_worker = WorkerBuilder::new(TRANSACTION_STATUS_CHECKER)
130 .layer(ErrorHandlingLayer::new())
131 .enable_tracing()
132 .catch_panic()
133 .retry(
134 RetryPolicy::retries(WORKER_TRANSACTION_STATUS_CHECKER_RETRIES)
135 .with_backoff(create_backoff(5000, 8000, 0.99)?.make_backoff()),
136 )
137 .concurrency(ServerConfig::get_worker_concurrency(
138 TRANSACTION_STATUS_CHECKER,
139 DEFAULT_CONCURRENCY_STATUS_CHECKER,
140 ))
141 .data(app_state.clone())
142 .backend(queue.transaction_status_queue.clone())
143 .build_fn(transaction_status_handler);
144
145 let transaction_status_queue_worker_evm = WorkerBuilder::new(TRANSACTION_STATUS_CHECKER_EVM)
148 .layer(ErrorHandlingLayer::new())
149 .enable_tracing()
150 .catch_panic()
151 .retry(
152 RetryPolicy::retries(WORKER_TRANSACTION_STATUS_CHECKER_RETRIES)
153 .with_backoff(create_backoff(8000, 12000, 0.99)?.make_backoff()),
154 )
155 .concurrency(ServerConfig::get_worker_concurrency(
156 TRANSACTION_STATUS_CHECKER_EVM,
157 DEFAULT_CONCURRENCY_STATUS_CHECKER_EVM,
158 ))
159 .data(app_state.clone())
160 .backend(queue.transaction_status_queue_evm.clone())
161 .build_fn(transaction_status_handler);
162
163 let transaction_status_queue_worker_stellar =
166 WorkerBuilder::new(TRANSACTION_STATUS_CHECKER_STELLAR)
167 .layer(ErrorHandlingLayer::new())
168 .enable_tracing()
169 .catch_panic()
170 .retry(
171 RetryPolicy::retries(WORKER_TRANSACTION_STATUS_CHECKER_RETRIES)
172 .with_backoff(create_backoff(2000, 3000, 0.99)?.make_backoff()),
173 )
174 .concurrency(ServerConfig::get_worker_concurrency(
175 TRANSACTION_STATUS_CHECKER_STELLAR,
176 DEFAULT_CONCURRENCY_STATUS_CHECKER_STELLAR,
177 ))
178 .data(app_state.clone())
179 .backend(queue.transaction_status_queue_stellar.clone())
180 .build_fn(transaction_status_handler);
181
182 let notification_queue_worker = WorkerBuilder::new(NOTIFICATION_SENDER)
183 .layer(ErrorHandlingLayer::new())
184 .enable_tracing()
185 .catch_panic()
186 .retry(
187 RetryPolicy::retries(WORKER_NOTIFICATION_SENDER_RETRIES)
188 .with_backoff(create_backoff(2000, 8000, 0.99)?.make_backoff()),
189 )
190 .concurrency(ServerConfig::get_worker_concurrency(
191 NOTIFICATION_SENDER,
192 DEFAULT_CONCURRENCY_NOTIFICATION,
193 ))
194 .data(app_state.clone())
195 .backend(queue.notification_queue.clone())
196 .build_fn(notification_handler);
197
198 let solana_token_swap_request_queue_worker = WorkerBuilder::new(SOLANA_TOKEN_SWAP_REQUEST)
199 .layer(ErrorHandlingLayer::new())
200 .enable_tracing()
201 .catch_panic()
202 .retry(
203 RetryPolicy::retries(WORKER_SOLANA_TOKEN_SWAP_REQUEST_RETRIES)
204 .with_backoff(create_backoff(5000, 20000, 0.99)?.make_backoff()),
205 )
206 .concurrency(ServerConfig::get_worker_concurrency(
207 SOLANA_TOKEN_SWAP_REQUEST,
208 DEFAULT_CONCURRENCY_SOLANA_SWAP,
209 ))
210 .data(app_state.clone())
211 .backend(queue.solana_token_swap_request_queue.clone())
212 .build_fn(solana_token_swap_request_handler);
213
214 let transaction_cleanup_queue_worker = WorkerBuilder::new(TRANSACTION_CLEANUP)
215 .layer(ErrorHandlingLayer::new())
216 .enable_tracing()
217 .catch_panic()
218 .retry(
219 RetryPolicy::retries(WORKER_TRANSACTION_CLEANUP_RETRIES)
220 .with_backoff(create_backoff(5000, 20000, 0.99)?.make_backoff()),
221 )
222 .concurrency(ServerConfig::get_worker_concurrency(TRANSACTION_CLEANUP, 1)) .data(app_state.clone())
224 .backend(CronStream::new(
225 apalis_cron::Schedule::from_str("0 */30 * * * *")?,
227 ))
228 .build_fn(transaction_cleanup_handler);
229
230 let relayer_health_check_worker = WorkerBuilder::new(RELAYER_HEALTH_CHECK)
231 .layer(ErrorHandlingLayer::new())
232 .enable_tracing()
233 .catch_panic()
234 .retry(
235 RetryPolicy::retries(WORKER_RELAYER_HEALTH_CHECK_RETRIES)
236 .with_backoff(create_backoff(2000, 10000, 0.99)?.make_backoff()),
237 )
238 .concurrency(ServerConfig::get_worker_concurrency(
239 RELAYER_HEALTH_CHECK,
240 DEFAULT_CONCURRENCY_HEALTH_CHECK,
241 ))
242 .data(app_state.clone())
243 .backend(queue.relayer_health_check_queue.clone())
244 .build_fn(relayer_health_check_handler);
245
246 let monitor = Monitor::new()
247 .register(transaction_request_queue_worker)
248 .register(transaction_submission_queue_worker)
249 .register(transaction_status_queue_worker)
250 .register(transaction_status_queue_worker_evm)
251 .register(transaction_status_queue_worker_stellar)
252 .register(notification_queue_worker)
253 .register(solana_token_swap_request_queue_worker)
254 .register(transaction_cleanup_queue_worker)
255 .register(relayer_health_check_worker)
256 .on_event(monitor_handle_event)
257 .shutdown_timeout(Duration::from_millis(5000));
258
259 let monitor_future = monitor.run_with_signal(async {
260 let mut sigint = tokio::signal::unix::signal(SignalKind::interrupt())
261 .expect("Failed to create SIGINT signal");
262 let mut sigterm = tokio::signal::unix::signal(SignalKind::terminate())
263 .expect("Failed to create SIGTERM signal");
264
265 debug!("Workers monitor started");
266
267 tokio::select! {
268 _ = sigint.recv() => debug!("Received SIGINT."),
269 _ = sigterm.recv() => debug!("Received SIGTERM."),
270 };
271
272 debug!("Workers monitor shutting down");
273
274 Ok(())
275 });
276 tokio::spawn(async move {
277 if let Err(e) = monitor_future.await {
278 error!(error = %e, "monitor error");
279 }
280 });
281 debug!("Workers monitor shutdown complete");
282 Ok(())
283}
284
285fn filter_relayers_for_swap(relayers: Vec<RelayerRepoModel>) -> Vec<RelayerRepoModel> {
291 relayers
292 .into_iter()
293 .filter(|relayer| {
294 let policy = relayer.policies.get_solana_policy();
295 let swap_config = match policy.get_swap_config() {
296 Some(config) => config,
297 None => {
298 debug!(relayer_id = %relayer.id, "No swap configuration specified; skipping");
299 return false;
300 }
301 };
302
303 if swap_config.cron_schedule.is_none() {
304 debug!(relayer_id = %relayer.id, "No cron schedule specified; skipping");
305 return false;
306 }
307 true
308 })
309 .collect()
310}
311
312pub async fn initialize_solana_swap_workers<J, RR, TR, NR, NFR, SR, TCR, PR, AKR>(
315 app_state: ThinDataAppState<J, RR, TR, NR, NFR, SR, TCR, PR, AKR>,
316) -> Result<()>
317where
318 J: JobProducerTrait + Send + Sync + 'static,
319 RR: RelayerRepository + Repository<RelayerRepoModel, String> + Send + Sync + 'static,
320 TR: TransactionRepository + Repository<TransactionRepoModel, String> + Send + Sync + 'static,
321 NR: NetworkRepository + Repository<NetworkRepoModel, String> + Send + Sync + 'static,
322 NFR: Repository<NotificationRepoModel, String> + Send + Sync + 'static,
323 SR: Repository<SignerRepoModel, String> + Send + Sync + 'static,
324 TCR: TransactionCounterTrait + Send + Sync + 'static,
325 PR: PluginRepositoryTrait + Send + Sync + 'static,
326 AKR: ApiKeyRepositoryTrait + Send + Sync + 'static,
327{
328 let active_relayers = app_state.relayer_repository.list_active().await?;
329 let solena_relayers_with_swap_enabled = filter_relayers_for_swap(active_relayers);
330
331 if solena_relayers_with_swap_enabled.is_empty() {
332 debug!("No solana relayers with swap enabled");
333 return Ok(());
334 }
335 info!(
336 "Found {} solana relayers with swap enabled",
337 solena_relayers_with_swap_enabled.len()
338 );
339
340 let mut workers = Vec::new();
341
342 let swap_backoff = create_backoff(2000, 5000, 0.99)?.make_backoff();
343
344 for relayer in solena_relayers_with_swap_enabled {
345 debug!(relayer = ?relayer, "found solana relayer with swap enabled");
346
347 let policy = relayer.policies.get_solana_policy();
348 let swap_config = match policy.get_swap_config() {
349 Some(config) => config,
350 None => {
351 debug!("No swap configuration specified; skipping validation.");
352 continue;
353 }
354 };
355
356 let calendar_schedule = match swap_config.cron_schedule {
357 Some(schedule) => apalis_cron::Schedule::from_str(&schedule).unwrap(),
358 None => {
359 debug!(relayer = ?relayer, "no swap cron schedule found for relayer");
360 continue;
361 }
362 };
363
364 let worker = WorkerBuilder::new(format!("solana-swap-schedule-{}", relayer.id.clone()))
366 .layer(ErrorHandlingLayer::new())
367 .enable_tracing()
368 .catch_panic()
369 .retry(
370 RetryPolicy::retries(WORKER_SOLANA_TOKEN_SWAP_REQUEST_RETRIES)
371 .with_backoff(swap_backoff.clone()),
372 )
373 .concurrency(1)
374 .data(relayer.id.clone())
375 .data(app_state.clone())
376 .backend(CronStream::new(calendar_schedule))
377 .build_fn(solana_token_swap_cron_handler);
378
379 workers.push(worker);
380 debug!(
381 "Created worker for solana relayer with swap enabled: {:?}",
382 relayer
383 );
384 }
385
386 let mut monitor = Monitor::new()
387 .on_event(monitor_handle_event)
388 .shutdown_timeout(Duration::from_millis(5000));
389
390 for worker in workers {
392 monitor = monitor.register(worker);
393 }
394
395 let monitor_future = monitor.run_with_signal(async {
396 let mut sigint = tokio::signal::unix::signal(SignalKind::interrupt())
397 .expect("Failed to create SIGINT signal");
398 let mut sigterm = tokio::signal::unix::signal(SignalKind::terminate())
399 .expect("Failed to create SIGTERM signal");
400
401 debug!("Solana Swap Monitor started");
402
403 tokio::select! {
404 _ = sigint.recv() => debug!("Received SIGINT."),
405 _ = sigterm.recv() => debug!("Received SIGTERM."),
406 };
407
408 debug!("Solana Swap Monitor shutting down");
409
410 Ok(())
411 });
412 tokio::spawn(async move {
413 if let Err(e) = monitor_future.await {
414 error!(error = %e, "monitor error");
415 }
416 });
417 Ok(())
418}
419
420fn monitor_handle_event(e: Worker<Event>) {
421 let worker_id = e.id();
422 match e.inner() {
423 Event::Engage(task_id) => {
424 debug!(worker_id = %worker_id, task_id = %task_id, "worker got a job");
425 }
426 Event::Error(e) => {
427 error!(worker_id = %worker_id, error = %e, "worker encountered an error");
428 }
429 Event::Exit => {
430 debug!(worker_id = %worker_id, "worker exited");
431 }
432 Event::Idle => {
433 debug!(worker_id = %worker_id, "worker is idle");
434 }
435 Event::Start => {
436 debug!(worker_id = %worker_id, "worker started");
437 }
438 Event::Stop => {
439 debug!(worker_id = %worker_id, "worker stopped");
440 }
441 _ => {}
442 }
443}
444
#[cfg(test)]
mod tests {
    use super::*;
    use crate::models::{
        NetworkType, RelayerEvmPolicy, RelayerNetworkPolicy, RelayerRepoModel, RelayerSolanaPolicy,
        RelayerSolanaSwapConfig,
    };

    /// Fixture: an unpaused EVM relayer on "sepolia" using the default EVM policy.
    fn create_test_evm_relayer(id: &str) -> RelayerRepoModel {
        let policies = RelayerNetworkPolicy::Evm(RelayerEvmPolicy::default());

        RelayerRepoModel {
            id: id.to_string(),
            name: format!("EVM Relayer {}", id),
            network: "sepolia".to_string(),
            paused: false,
            network_type: NetworkType::Evm,
            policies,
            signer_id: "test-signer".to_string(),
            address: "0x742d35Cc6634C0532925a3b8D8C2e48a73F6ba2E".to_string(),
            system_disabled: false,
            ..Default::default()
        }
    }

    /// Fixture: an unpaused Solana relayer whose policy always carries a swap configuration;
    /// only the cron schedule of that configuration varies between tests.
    fn create_test_solana_relayer_with_swap(
        id: &str,
        cron_schedule: Option<String>,
    ) -> RelayerRepoModel {
        let swap_config = RelayerSolanaSwapConfig {
            strategy: None,
            cron_schedule,
            min_balance_threshold: Some(5000000000),
            jupiter_swap_options: None,
        };
        let solana_policy = RelayerSolanaPolicy {
            min_balance: Some(1000000000),
            allowed_tokens: None,
            allowed_programs: None,
            max_signatures: None,
            max_tx_data_size: None,
            fee_payment_strategy: None,
            fee_margin_percentage: None,
            allowed_accounts: None,
            disallowed_accounts: None,
            max_allowed_fee_lamports: None,
            swap_config: Some(swap_config),
        };

        RelayerRepoModel {
            id: id.to_string(),
            name: format!("Solana Relayer {}", id),
            network: "mainnet-beta".to_string(),
            paused: false,
            network_type: NetworkType::Solana,
            policies: RelayerNetworkPolicy::Solana(solana_policy),
            signer_id: "test-signer".to_string(),
            address: "5zWma6gn4QxRfC6xZk6KfpXWXXgV3Xt6VzPpXMKCMYW5".to_string(),
            system_disabled: false,
            ..Default::default()
        }
    }

    /// Collects the ids of the given relayers, in order.
    fn relayer_ids(relayers: &[RelayerRepoModel]) -> Vec<&str> {
        relayers.iter().map(|r| r.id.as_str()).collect()
    }

    #[test]
    fn test_filter_relayers_for_swap_with_empty_list() {
        let filtered = filter_relayers_for_swap(Vec::new());

        assert!(
            filtered.is_empty(),
            "Should return empty list when no relayers provided"
        );
    }

    #[test]
    fn test_filter_relayers_for_swap_filters_non_solana() {
        let relayers: Vec<_> = ["evm-1", "evm-2"]
            .into_iter()
            .map(create_test_evm_relayer)
            .collect();

        let filtered = filter_relayers_for_swap(relayers);

        assert!(
            filtered.is_empty(),
            "Should filter out all non-Solana relayers"
        );
    }

    #[test]
    fn test_filter_relayers_for_swap_filters_no_cron_schedule() {
        let relayers: Vec<_> = ["solana-1", "solana-2"]
            .into_iter()
            .map(|id| create_test_solana_relayer_with_swap(id, None))
            .collect();

        let filtered = filter_relayers_for_swap(relayers);

        assert!(
            filtered.is_empty(),
            "Should filter out Solana relayers without cron schedule"
        );
    }

    #[test]
    fn test_filter_relayers_for_swap_includes_valid_relayers() {
        let first = create_test_solana_relayer_with_swap("solana-1", Some("0 0 * * * *".into()));
        let second =
            create_test_solana_relayer_with_swap("solana-2", Some("0 */2 * * * *".into()));

        let filtered = filter_relayers_for_swap(vec![first, second]);

        assert_eq!(
            filtered.len(),
            2,
            "Should include all Solana relayers with cron schedule"
        );
        // Input order must be preserved.
        assert_eq!(relayer_ids(&filtered), ["solana-1", "solana-2"]);
    }

    #[test]
    fn test_filter_relayers_for_swap_with_mixed_relayers() {
        let with_cron = |id: &str, cron: &str| {
            create_test_solana_relayer_with_swap(id, Some(cron.to_string()))
        };
        let relayers = vec![
            create_test_evm_relayer("evm-1"),
            create_test_solana_relayer_with_swap("solana-no-cron", None),
            with_cron("solana-with-cron-1", "0 0 * * * *"),
            create_test_evm_relayer("evm-2"),
            with_cron("solana-with-cron-2", "0 */3 * * * *"),
        ];

        let filtered = filter_relayers_for_swap(relayers);

        assert_eq!(
            filtered.len(),
            2,
            "Should only include Solana relayers with cron schedule"
        );

        let ids = relayer_ids(&filtered);
        assert!(
            ids.contains(&"solana-with-cron-1"),
            "Should include solana-with-cron-1"
        );
        assert!(
            ids.contains(&"solana-with-cron-2"),
            "Should include solana-with-cron-2"
        );
        assert!(!ids.contains(&"evm-1"), "Should not include EVM relayers");
        assert!(
            !ids.contains(&"solana-no-cron"),
            "Should not include Solana without cron"
        );
    }

    #[test]
    fn test_filter_relayers_for_swap_preserves_relayer_data() {
        let cron = "0 1 * * * *";
        let input = create_test_solana_relayer_with_swap("test-relayer", Some(cron.to_string()));

        let filtered = filter_relayers_for_swap(vec![input]);

        assert_eq!(filtered.len(), 1);

        let relayer = &filtered[0];
        assert_eq!(relayer.id, "test-relayer");
        assert_eq!(relayer.name, "Solana Relayer test-relayer");
        assert_eq!(relayer.network_type, NetworkType::Solana);

        // The swap configuration must come through the filter untouched.
        let swap_config = relayer
            .policies
            .get_solana_policy()
            .get_swap_config()
            .expect("Should have swap config");
        assert_eq!(swap_config.cron_schedule.as_deref(), Some(cron));
    }
}