openzeppelin_relayer/bootstrap/
initialize_workers.rs

1//! Worker initialization
2//!
3//! This module contains functions for initializing background workers,
4//! including job processors and other long-running tasks.
5use crate::{
6    config::ServerConfig,
7    constants::{
8        DEFAULT_CONCURRENCY_HEALTH_CHECK, DEFAULT_CONCURRENCY_NOTIFICATION,
9        DEFAULT_CONCURRENCY_SOLANA_SWAP, DEFAULT_CONCURRENCY_STATUS_CHECKER,
10        DEFAULT_CONCURRENCY_STATUS_CHECKER_EVM, DEFAULT_CONCURRENCY_STATUS_CHECKER_STELLAR,
11        DEFAULT_CONCURRENCY_TRANSACTION_REQUEST, DEFAULT_CONCURRENCY_TRANSACTION_SENDER,
12        WORKER_NOTIFICATION_SENDER_RETRIES, WORKER_RELAYER_HEALTH_CHECK_RETRIES,
13        WORKER_SOLANA_TOKEN_SWAP_REQUEST_RETRIES, WORKER_TRANSACTION_CLEANUP_RETRIES,
14        WORKER_TRANSACTION_REQUEST_RETRIES, WORKER_TRANSACTION_STATUS_CHECKER_RETRIES,
15        WORKER_TRANSACTION_SUBMIT_RETRIES,
16    },
17    jobs::{
18        notification_handler, relayer_health_check_handler, solana_token_swap_cron_handler,
19        solana_token_swap_request_handler, transaction_cleanup_handler,
20        transaction_request_handler, transaction_status_handler, transaction_submission_handler,
21        JobProducerTrait,
22    },
23    models::{
24        NetworkRepoModel, NotificationRepoModel, RelayerRepoModel, SignerRepoModel,
25        ThinDataAppState, TransactionRepoModel,
26    },
27    repositories::{
28        ApiKeyRepositoryTrait, NetworkRepository, PluginRepositoryTrait, RelayerRepository,
29        Repository, TransactionCounterTrait, TransactionRepository,
30    },
31};
32use apalis::prelude::*;
33
34use apalis::layers::retry::backoff::MakeBackoff;
35use apalis::layers::retry::{backoff::ExponentialBackoffMaker, RetryPolicy};
36use apalis::layers::ErrorHandlingLayer;
37
38/// Re-exports from [`tower::util`]
39pub use tower::util::rng::HasherRng;
40
41use apalis_cron::CronStream;
42use eyre::Result;
43use std::{str::FromStr, time::Duration};
44use tokio::signal::unix::SignalKind;
45use tracing::{debug, error, info};
46
47const TRANSACTION_REQUEST: &str = "transaction_request";
48const TRANSACTION_SENDER: &str = "transaction_sender";
49// Generic transaction status checker
50const TRANSACTION_STATUS_CHECKER: &str = "transaction_status_checker";
51// Network specific status checkers
52const TRANSACTION_STATUS_CHECKER_EVM: &str = "transaction_status_checker_evm";
53const TRANSACTION_STATUS_CHECKER_STELLAR: &str = "transaction_status_checker_stellar";
54const NOTIFICATION_SENDER: &str = "notification_sender";
55const SOLANA_TOKEN_SWAP_REQUEST: &str = "solana_token_swap_request";
56const TRANSACTION_CLEANUP: &str = "transaction_cleanup";
57const RELAYER_HEALTH_CHECK: &str = "relayer_health_check";
58
59/// Creates an exponential backoff with configurable parameters
60///
61/// # Arguments
62/// * `initial_ms` - Initial delay in milliseconds (e.g., 200)
63/// * `max_ms` - Maximum delay in milliseconds (e.g., 5000)
64/// * `jitter` - Jitter factor 0.0-1.0 (e.g., 0.99 for high jitter)
65///
66/// # Returns
67/// A configured backoff instance ready for use with RetryPolicy
68fn create_backoff(initial_ms: u64, max_ms: u64, jitter: f64) -> Result<ExponentialBackoffMaker> {
69    let maker = ExponentialBackoffMaker::new(
70        Duration::from_millis(initial_ms),
71        Duration::from_millis(max_ms),
72        jitter,
73        HasherRng::default(),
74    )?;
75
76    Ok(maker)
77}
78
79pub async fn initialize_workers<J, RR, TR, NR, NFR, SR, TCR, PR, AKR>(
80    app_state: ThinDataAppState<J, RR, TR, NR, NFR, SR, TCR, PR, AKR>,
81) -> Result<()>
82where
83    J: JobProducerTrait + Send + Sync + 'static,
84    RR: RelayerRepository + Repository<RelayerRepoModel, String> + Send + Sync + 'static,
85    TR: TransactionRepository + Repository<TransactionRepoModel, String> + Send + Sync + 'static,
86    NR: NetworkRepository + Repository<NetworkRepoModel, String> + Send + Sync + 'static,
87    NFR: Repository<NotificationRepoModel, String> + Send + Sync + 'static,
88    SR: Repository<SignerRepoModel, String> + Send + Sync + 'static,
89    TCR: TransactionCounterTrait + Send + Sync + 'static,
90    PR: PluginRepositoryTrait + Send + Sync + 'static,
91    AKR: ApiKeyRepositoryTrait + Send + Sync + 'static,
92{
93    let queue = app_state.job_producer.get_queue().await?;
94
95    let transaction_request_queue_worker = WorkerBuilder::new(TRANSACTION_REQUEST)
96        .layer(ErrorHandlingLayer::new())
97        .retry(
98            RetryPolicy::retries(WORKER_TRANSACTION_REQUEST_RETRIES)
99                .with_backoff(create_backoff(500, 5000, 0.99)?.make_backoff()),
100        )
101        .enable_tracing()
102        .catch_panic()
103        .concurrency(ServerConfig::get_worker_concurrency(
104            TRANSACTION_REQUEST,
105            DEFAULT_CONCURRENCY_TRANSACTION_REQUEST,
106        ))
107        .data(app_state.clone())
108        .backend(queue.transaction_request_queue.clone())
109        .build_fn(transaction_request_handler);
110
111    let transaction_submission_queue_worker = WorkerBuilder::new(TRANSACTION_SENDER)
112        .layer(ErrorHandlingLayer::new())
113        .enable_tracing()
114        .catch_panic()
115        .retry(
116            RetryPolicy::retries(WORKER_TRANSACTION_SUBMIT_RETRIES)
117                .with_backoff(create_backoff(500, 2000, 0.99)?.make_backoff()),
118        )
119        .concurrency(ServerConfig::get_worker_concurrency(
120            TRANSACTION_SENDER,
121            DEFAULT_CONCURRENCY_TRANSACTION_SENDER,
122        ))
123        .data(app_state.clone())
124        .backend(queue.transaction_submission_queue.clone())
125        .build_fn(transaction_submission_handler);
126
127    // Generic status checker
128    // Uses medium settings that work reasonably for most chains
129    let transaction_status_queue_worker = WorkerBuilder::new(TRANSACTION_STATUS_CHECKER)
130        .layer(ErrorHandlingLayer::new())
131        .enable_tracing()
132        .catch_panic()
133        .retry(
134            RetryPolicy::retries(WORKER_TRANSACTION_STATUS_CHECKER_RETRIES)
135                .with_backoff(create_backoff(5000, 8000, 0.99)?.make_backoff()),
136        )
137        .concurrency(ServerConfig::get_worker_concurrency(
138            TRANSACTION_STATUS_CHECKER,
139            DEFAULT_CONCURRENCY_STATUS_CHECKER,
140        ))
141        .data(app_state.clone())
142        .backend(queue.transaction_status_queue.clone())
143        .build_fn(transaction_status_handler);
144
145    // EVM status checker - slower retries to avoid premature resubmission
146    // EVM has longer block times (~12s) and needs time for resubmission logic
147    let transaction_status_queue_worker_evm = WorkerBuilder::new(TRANSACTION_STATUS_CHECKER_EVM)
148        .layer(ErrorHandlingLayer::new())
149        .enable_tracing()
150        .catch_panic()
151        .retry(
152            RetryPolicy::retries(WORKER_TRANSACTION_STATUS_CHECKER_RETRIES)
153                .with_backoff(create_backoff(8000, 12000, 0.99)?.make_backoff()),
154        )
155        .concurrency(ServerConfig::get_worker_concurrency(
156            TRANSACTION_STATUS_CHECKER_EVM,
157            DEFAULT_CONCURRENCY_STATUS_CHECKER_EVM,
158        ))
159        .data(app_state.clone())
160        .backend(queue.transaction_status_queue_evm.clone())
161        .build_fn(transaction_status_handler);
162
163    // Stellar status checker - fast retries for fast finality
164    // Stellar has sub-second finality, needs more frequent status checks
165    let transaction_status_queue_worker_stellar =
166        WorkerBuilder::new(TRANSACTION_STATUS_CHECKER_STELLAR)
167            .layer(ErrorHandlingLayer::new())
168            .enable_tracing()
169            .catch_panic()
170            .retry(
171                RetryPolicy::retries(WORKER_TRANSACTION_STATUS_CHECKER_RETRIES)
172                    .with_backoff(create_backoff(2000, 3000, 0.99)?.make_backoff()),
173            )
174            .concurrency(ServerConfig::get_worker_concurrency(
175                TRANSACTION_STATUS_CHECKER_STELLAR,
176                DEFAULT_CONCURRENCY_STATUS_CHECKER_STELLAR,
177            ))
178            .data(app_state.clone())
179            .backend(queue.transaction_status_queue_stellar.clone())
180            .build_fn(transaction_status_handler);
181
182    let notification_queue_worker = WorkerBuilder::new(NOTIFICATION_SENDER)
183        .layer(ErrorHandlingLayer::new())
184        .enable_tracing()
185        .catch_panic()
186        .retry(
187            RetryPolicy::retries(WORKER_NOTIFICATION_SENDER_RETRIES)
188                .with_backoff(create_backoff(2000, 8000, 0.99)?.make_backoff()),
189        )
190        .concurrency(ServerConfig::get_worker_concurrency(
191            NOTIFICATION_SENDER,
192            DEFAULT_CONCURRENCY_NOTIFICATION,
193        ))
194        .data(app_state.clone())
195        .backend(queue.notification_queue.clone())
196        .build_fn(notification_handler);
197
198    let solana_token_swap_request_queue_worker = WorkerBuilder::new(SOLANA_TOKEN_SWAP_REQUEST)
199        .layer(ErrorHandlingLayer::new())
200        .enable_tracing()
201        .catch_panic()
202        .retry(
203            RetryPolicy::retries(WORKER_SOLANA_TOKEN_SWAP_REQUEST_RETRIES)
204                .with_backoff(create_backoff(5000, 20000, 0.99)?.make_backoff()),
205        )
206        .concurrency(ServerConfig::get_worker_concurrency(
207            SOLANA_TOKEN_SWAP_REQUEST,
208            DEFAULT_CONCURRENCY_SOLANA_SWAP,
209        ))
210        .data(app_state.clone())
211        .backend(queue.solana_token_swap_request_queue.clone())
212        .build_fn(solana_token_swap_request_handler);
213
214    let transaction_cleanup_queue_worker = WorkerBuilder::new(TRANSACTION_CLEANUP)
215        .layer(ErrorHandlingLayer::new())
216        .enable_tracing()
217        .catch_panic()
218        .retry(
219            RetryPolicy::retries(WORKER_TRANSACTION_CLEANUP_RETRIES)
220                .with_backoff(create_backoff(5000, 20000, 0.99)?.make_backoff()),
221        )
222        .concurrency(ServerConfig::get_worker_concurrency(TRANSACTION_CLEANUP, 1)) // Default to 1 to avoid DB conflicts
223        .data(app_state.clone())
224        .backend(CronStream::new(
225            // every 30 minutes
226            apalis_cron::Schedule::from_str("0 */30 * * * *")?,
227        ))
228        .build_fn(transaction_cleanup_handler);
229
230    let relayer_health_check_worker = WorkerBuilder::new(RELAYER_HEALTH_CHECK)
231        .layer(ErrorHandlingLayer::new())
232        .enable_tracing()
233        .catch_panic()
234        .retry(
235            RetryPolicy::retries(WORKER_RELAYER_HEALTH_CHECK_RETRIES)
236                .with_backoff(create_backoff(2000, 10000, 0.99)?.make_backoff()),
237        )
238        .concurrency(ServerConfig::get_worker_concurrency(
239            RELAYER_HEALTH_CHECK,
240            DEFAULT_CONCURRENCY_HEALTH_CHECK,
241        ))
242        .data(app_state.clone())
243        .backend(queue.relayer_health_check_queue.clone())
244        .build_fn(relayer_health_check_handler);
245
246    let monitor = Monitor::new()
247        .register(transaction_request_queue_worker)
248        .register(transaction_submission_queue_worker)
249        .register(transaction_status_queue_worker)
250        .register(transaction_status_queue_worker_evm)
251        .register(transaction_status_queue_worker_stellar)
252        .register(notification_queue_worker)
253        .register(solana_token_swap_request_queue_worker)
254        .register(transaction_cleanup_queue_worker)
255        .register(relayer_health_check_worker)
256        .on_event(monitor_handle_event)
257        .shutdown_timeout(Duration::from_millis(5000));
258
259    let monitor_future = monitor.run_with_signal(async {
260        let mut sigint = tokio::signal::unix::signal(SignalKind::interrupt())
261            .expect("Failed to create SIGINT signal");
262        let mut sigterm = tokio::signal::unix::signal(SignalKind::terminate())
263            .expect("Failed to create SIGTERM signal");
264
265        debug!("Workers monitor started");
266
267        tokio::select! {
268            _ = sigint.recv() => debug!("Received SIGINT."),
269            _ = sigterm.recv() => debug!("Received SIGTERM."),
270        };
271
272        debug!("Workers monitor shutting down");
273
274        Ok(())
275    });
276    tokio::spawn(async move {
277        if let Err(e) = monitor_future.await {
278            error!(error = %e, "monitor error");
279        }
280    });
281    debug!("Workers monitor shutdown complete");
282    Ok(())
283}
284
285/// Filters relayers to find those eligible for swap workers
286/// Returns relayers that have:
287/// 1. Solana network type
288/// 2. Swap configuration
289/// 3. Cron schedule defined
290fn filter_relayers_for_swap(relayers: Vec<RelayerRepoModel>) -> Vec<RelayerRepoModel> {
291    relayers
292        .into_iter()
293        .filter(|relayer| {
294            let policy = relayer.policies.get_solana_policy();
295            let swap_config = match policy.get_swap_config() {
296                Some(config) => config,
297                None => {
298                    debug!(relayer_id = %relayer.id, "No swap configuration specified; skipping");
299                    return false;
300                }
301            };
302
303            if swap_config.cron_schedule.is_none() {
304                debug!(relayer_id = %relayer.id, "No cron schedule specified; skipping");
305                return false;
306            }
307            true
308        })
309        .collect()
310}
311
312/// Initializes the Solana swap workers
313/// This function creates and registers workers for Solana relayers that have swap enabled and cron schedule set.
314pub async fn initialize_solana_swap_workers<J, RR, TR, NR, NFR, SR, TCR, PR, AKR>(
315    app_state: ThinDataAppState<J, RR, TR, NR, NFR, SR, TCR, PR, AKR>,
316) -> Result<()>
317where
318    J: JobProducerTrait + Send + Sync + 'static,
319    RR: RelayerRepository + Repository<RelayerRepoModel, String> + Send + Sync + 'static,
320    TR: TransactionRepository + Repository<TransactionRepoModel, String> + Send + Sync + 'static,
321    NR: NetworkRepository + Repository<NetworkRepoModel, String> + Send + Sync + 'static,
322    NFR: Repository<NotificationRepoModel, String> + Send + Sync + 'static,
323    SR: Repository<SignerRepoModel, String> + Send + Sync + 'static,
324    TCR: TransactionCounterTrait + Send + Sync + 'static,
325    PR: PluginRepositoryTrait + Send + Sync + 'static,
326    AKR: ApiKeyRepositoryTrait + Send + Sync + 'static,
327{
328    let active_relayers = app_state.relayer_repository.list_active().await?;
329    let solena_relayers_with_swap_enabled = filter_relayers_for_swap(active_relayers);
330
331    if solena_relayers_with_swap_enabled.is_empty() {
332        debug!("No solana relayers with swap enabled");
333        return Ok(());
334    }
335    info!(
336        "Found {} solana relayers with swap enabled",
337        solena_relayers_with_swap_enabled.len()
338    );
339
340    let mut workers = Vec::new();
341
342    let swap_backoff = create_backoff(2000, 5000, 0.99)?.make_backoff();
343
344    for relayer in solena_relayers_with_swap_enabled {
345        debug!(relayer = ?relayer, "found solana relayer with swap enabled");
346
347        let policy = relayer.policies.get_solana_policy();
348        let swap_config = match policy.get_swap_config() {
349            Some(config) => config,
350            None => {
351                debug!("No swap configuration specified; skipping validation.");
352                continue;
353            }
354        };
355
356        let calendar_schedule = match swap_config.cron_schedule {
357            Some(schedule) => apalis_cron::Schedule::from_str(&schedule).unwrap(),
358            None => {
359                debug!(relayer = ?relayer, "no swap cron schedule found for relayer");
360                continue;
361            }
362        };
363
364        // Create worker and add to the workers vector
365        let worker = WorkerBuilder::new(format!("solana-swap-schedule-{}", relayer.id.clone()))
366            .layer(ErrorHandlingLayer::new())
367            .enable_tracing()
368            .catch_panic()
369            .retry(
370                RetryPolicy::retries(WORKER_SOLANA_TOKEN_SWAP_REQUEST_RETRIES)
371                    .with_backoff(swap_backoff.clone()),
372            )
373            .concurrency(1)
374            .data(relayer.id.clone())
375            .data(app_state.clone())
376            .backend(CronStream::new(calendar_schedule))
377            .build_fn(solana_token_swap_cron_handler);
378
379        workers.push(worker);
380        debug!(
381            "Created worker for solana relayer with swap enabled: {:?}",
382            relayer
383        );
384    }
385
386    let mut monitor = Monitor::new()
387        .on_event(monitor_handle_event)
388        .shutdown_timeout(Duration::from_millis(5000));
389
390    // Register all workers with the monitor
391    for worker in workers {
392        monitor = monitor.register(worker);
393    }
394
395    let monitor_future = monitor.run_with_signal(async {
396        let mut sigint = tokio::signal::unix::signal(SignalKind::interrupt())
397            .expect("Failed to create SIGINT signal");
398        let mut sigterm = tokio::signal::unix::signal(SignalKind::terminate())
399            .expect("Failed to create SIGTERM signal");
400
401        debug!("Solana Swap Monitor started");
402
403        tokio::select! {
404            _ = sigint.recv() => debug!("Received SIGINT."),
405            _ = sigterm.recv() => debug!("Received SIGTERM."),
406        };
407
408        debug!("Solana Swap Monitor shutting down");
409
410        Ok(())
411    });
412    tokio::spawn(async move {
413        if let Err(e) = monitor_future.await {
414            error!(error = %e, "monitor error");
415        }
416    });
417    Ok(())
418}
419
420fn monitor_handle_event(e: Worker<Event>) {
421    let worker_id = e.id();
422    match e.inner() {
423        Event::Engage(task_id) => {
424            debug!(worker_id = %worker_id, task_id = %task_id, "worker got a job");
425        }
426        Event::Error(e) => {
427            error!(worker_id = %worker_id, error = %e, "worker encountered an error");
428        }
429        Event::Exit => {
430            debug!(worker_id = %worker_id, "worker exited");
431        }
432        Event::Idle => {
433            debug!(worker_id = %worker_id, "worker is idle");
434        }
435        Event::Start => {
436            debug!(worker_id = %worker_id, "worker started");
437        }
438        Event::Stop => {
439            debug!(worker_id = %worker_id, "worker stopped");
440        }
441        _ => {}
442    }
443}
444
445#[cfg(test)]
446mod tests {
447    use super::*;
448    use crate::models::{
449        NetworkType, RelayerEvmPolicy, RelayerNetworkPolicy, RelayerRepoModel, RelayerSolanaPolicy,
450        RelayerSolanaSwapConfig,
451    };
452
453    fn create_test_evm_relayer(id: &str) -> RelayerRepoModel {
454        RelayerRepoModel {
455            id: id.to_string(),
456            name: format!("EVM Relayer {}", id),
457            network: "sepolia".to_string(),
458            paused: false,
459            network_type: NetworkType::Evm,
460            policies: RelayerNetworkPolicy::Evm(RelayerEvmPolicy::default()),
461            signer_id: "test-signer".to_string(),
462            address: "0x742d35Cc6634C0532925a3b8D8C2e48a73F6ba2E".to_string(),
463            system_disabled: false,
464            ..Default::default()
465        }
466    }
467
468    fn create_test_solana_relayer_with_swap(
469        id: &str,
470        cron_schedule: Option<String>,
471    ) -> RelayerRepoModel {
472        RelayerRepoModel {
473            id: id.to_string(),
474            name: format!("Solana Relayer {}", id),
475            network: "mainnet-beta".to_string(),
476            paused: false,
477            network_type: NetworkType::Solana,
478            policies: RelayerNetworkPolicy::Solana(RelayerSolanaPolicy {
479                min_balance: Some(1000000000),
480                allowed_tokens: None,
481                allowed_programs: None,
482                max_signatures: None,
483                max_tx_data_size: None,
484                fee_payment_strategy: None,
485                fee_margin_percentage: None,
486                allowed_accounts: None,
487                disallowed_accounts: None,
488                max_allowed_fee_lamports: None,
489                swap_config: Some(RelayerSolanaSwapConfig {
490                    strategy: None,
491                    cron_schedule,
492                    min_balance_threshold: Some(5000000000),
493                    jupiter_swap_options: None,
494                }),
495            }),
496            signer_id: "test-signer".to_string(),
497            address: "5zWma6gn4QxRfC6xZk6KfpXWXXgV3Xt6VzPpXMKCMYW5".to_string(),
498            system_disabled: false,
499            ..Default::default()
500        }
501    }
502
503    #[test]
504    fn test_filter_relayers_for_swap_with_empty_list() {
505        let relayers = vec![];
506        let filtered = filter_relayers_for_swap(relayers);
507
508        assert_eq!(
509            filtered.len(),
510            0,
511            "Should return empty list when no relayers provided"
512        );
513    }
514
515    #[test]
516    fn test_filter_relayers_for_swap_filters_non_solana() {
517        let relayers = vec![
518            create_test_evm_relayer("evm-1"),
519            create_test_evm_relayer("evm-2"),
520        ];
521
522        let filtered = filter_relayers_for_swap(relayers);
523
524        assert_eq!(
525            filtered.len(),
526            0,
527            "Should filter out all non-Solana relayers"
528        );
529    }
530
531    #[test]
532    fn test_filter_relayers_for_swap_filters_no_cron_schedule() {
533        let relayers = vec![
534            create_test_solana_relayer_with_swap("solana-1", None),
535            create_test_solana_relayer_with_swap("solana-2", None),
536        ];
537
538        let filtered = filter_relayers_for_swap(relayers);
539
540        assert_eq!(
541            filtered.len(),
542            0,
543            "Should filter out Solana relayers without cron schedule"
544        );
545    }
546
547    #[test]
548    fn test_filter_relayers_for_swap_includes_valid_relayers() {
549        let relayers = vec![
550            create_test_solana_relayer_with_swap("solana-1", Some("0 0 * * * *".to_string())),
551            create_test_solana_relayer_with_swap("solana-2", Some("0 */2 * * * *".to_string())),
552        ];
553
554        let filtered = filter_relayers_for_swap(relayers);
555
556        assert_eq!(
557            filtered.len(),
558            2,
559            "Should include all Solana relayers with cron schedule"
560        );
561        assert_eq!(filtered[0].id, "solana-1");
562        assert_eq!(filtered[1].id, "solana-2");
563    }
564
565    #[test]
566    fn test_filter_relayers_for_swap_with_mixed_relayers() {
567        let relayers = vec![
568            create_test_evm_relayer("evm-1"),
569            create_test_solana_relayer_with_swap("solana-no-cron", None),
570            create_test_solana_relayer_with_swap(
571                "solana-with-cron-1",
572                Some("0 0 * * * *".to_string()),
573            ),
574            create_test_evm_relayer("evm-2"),
575            create_test_solana_relayer_with_swap(
576                "solana-with-cron-2",
577                Some("0 */3 * * * *".to_string()),
578            ),
579        ];
580
581        let filtered = filter_relayers_for_swap(relayers);
582
583        assert_eq!(
584            filtered.len(),
585            2,
586            "Should only include Solana relayers with cron schedule"
587        );
588
589        // Verify the correct relayers were included
590        let ids: Vec<&str> = filtered.iter().map(|r| r.id.as_str()).collect();
591        assert!(
592            ids.contains(&"solana-with-cron-1"),
593            "Should include solana-with-cron-1"
594        );
595        assert!(
596            ids.contains(&"solana-with-cron-2"),
597            "Should include solana-with-cron-2"
598        );
599        assert!(!ids.contains(&"evm-1"), "Should not include EVM relayers");
600        assert!(
601            !ids.contains(&"solana-no-cron"),
602            "Should not include Solana without cron"
603        );
604    }
605
606    #[test]
607    fn test_filter_relayers_for_swap_preserves_relayer_data() {
608        let cron = "0 1 * * * *".to_string();
609        let relayers = vec![create_test_solana_relayer_with_swap(
610            "test-relayer",
611            Some(cron.clone()),
612        )];
613
614        let filtered = filter_relayers_for_swap(relayers);
615
616        assert_eq!(filtered.len(), 1);
617
618        let relayer = &filtered[0];
619        assert_eq!(relayer.id, "test-relayer");
620        assert_eq!(relayer.name, "Solana Relayer test-relayer");
621        assert_eq!(relayer.network_type, NetworkType::Solana);
622
623        // Verify swap config is preserved
624        let policy = relayer.policies.get_solana_policy();
625        let swap_config = policy.get_swap_config().expect("Should have swap config");
626        assert_eq!(swap_config.cron_schedule.as_ref(), Some(&cron));
627    }
628}