openzeppelin_relayer/jobs/
queue.rs

1//! Queue management module for job processing.
2//!
3//! This module provides Redis-backed queue implementation for handling different types of jobs:
4//! - Transaction requests
5//! - Transaction submissions
6//! - Transaction status checks
7//! - Notifications
8//! - Solana swap requests
9//! - Relayer health checks
10use std::{env, sync::Arc};
11
12use apalis_redis::{Config, ConnectionManager, RedisStorage};
13use color_eyre::{eyre, Result};
14use serde::{Deserialize, Serialize};
15use tokio::time::{timeout, Duration};
16use tracing::error;
17
18use crate::config::ServerConfig;
19
20use super::{
21    Job, NotificationSend, RelayerHealthCheck, SolanaTokenSwapRequest, TransactionRequest,
22    TransactionSend, TransactionStatusCheck,
23};
24
25#[derive(Clone, Debug)]
26pub struct Queue {
27    pub transaction_request_queue: RedisStorage<Job<TransactionRequest>>,
28    pub transaction_submission_queue: RedisStorage<Job<TransactionSend>>,
29    /// Default/fallback status queue for backward compatibility, Solana, and future networks
30    pub transaction_status_queue: RedisStorage<Job<TransactionStatusCheck>>,
31    /// EVM-specific status queue with slower retries
32    pub transaction_status_queue_evm: RedisStorage<Job<TransactionStatusCheck>>,
33    /// Stellar-specific status queue with fast retries
34    pub transaction_status_queue_stellar: RedisStorage<Job<TransactionStatusCheck>>,
35    pub notification_queue: RedisStorage<Job<NotificationSend>>,
36    pub solana_token_swap_request_queue: RedisStorage<Job<SolanaTokenSwapRequest>>,
37    pub relayer_health_check_queue: RedisStorage<Job<RelayerHealthCheck>>,
38}
39
40impl Queue {
41    async fn storage<T: Serialize + for<'de> Deserialize<'de>>(
42        namespace: &str,
43        shared: Arc<ConnectionManager>,
44    ) -> Result<RedisStorage<T>> {
45        let config = Config::default()
46            .set_namespace(namespace)
47            .set_enqueue_scheduled(Duration::from_secs(1)); // Sets the polling interval for scheduled jobs from default 30 seconds
48
49        Ok(RedisStorage::new_with_config((*shared).clone(), config))
50    }
51
52    pub async fn setup() -> Result<Self> {
53        let config = ServerConfig::from_env();
54        let redis_url = config.redis_url.clone();
55        let redis_connection_timeout_ms = config.redis_connection_timeout_ms;
56        let conn = match timeout(Duration::from_millis(redis_connection_timeout_ms), apalis_redis::connect(redis_url.clone())).await {
57            Ok(result) => result.map_err(|e| {
58                error!(redis_url = %redis_url, error = %e, "failed to connect to redis");
59                eyre::eyre!("Failed to connect to Redis. Please ensure Redis is running and accessible at {}. Error: {}", redis_url, e)
60            })?,
61            Err(_) => {
62                error!(redis_url = %redis_url, "timeout connecting to redis");
63                return Err(eyre::eyre!("Timed out after {} milliseconds while connecting to Redis at {}", redis_connection_timeout_ms, redis_url));
64            }
65        };
66
67        let shared = Arc::new(conn);
68        // use REDIS_KEY_PREFIX only if set, otherwise do not use it
69        let redis_key_prefix = env::var("REDIS_KEY_PREFIX")
70            .ok()
71            .filter(|v| !v.is_empty())
72            .map(|value| format!("{value}:queue:"))
73            .unwrap_or_default();
74        Ok(Self {
75            transaction_request_queue: Self::storage(
76                &format!("{redis_key_prefix}transaction_request_queue"),
77                shared.clone(),
78            )
79            .await?,
80            transaction_submission_queue: Self::storage(
81                &format!("{redis_key_prefix}transaction_submission_queue"),
82                shared.clone(),
83            )
84            .await?,
85            transaction_status_queue: Self::storage(
86                &format!("{redis_key_prefix}transaction_status_queue"),
87                shared.clone(),
88            )
89            .await?,
90            transaction_status_queue_evm: Self::storage(
91                &format!("{redis_key_prefix}transaction_status_queue_evm"),
92                shared.clone(),
93            )
94            .await?,
95            transaction_status_queue_stellar: Self::storage(
96                &format!("{redis_key_prefix}transaction_status_queue_stellar"),
97                shared.clone(),
98            )
99            .await?,
100            notification_queue: Self::storage(
101                &format!("{redis_key_prefix}notification_queue"),
102                shared.clone(),
103            )
104            .await?,
105            solana_token_swap_request_queue: Self::storage(
106                &format!("{redis_key_prefix}solana_token_swap_request_queue"),
107                shared.clone(),
108            )
109            .await?,
110            relayer_health_check_queue: Self::storage(
111                &format!("{redis_key_prefix}relayer_health_check_queue"),
112                shared.clone(),
113            )
114            .await?,
115        })
116    }
117}
118
#[cfg(test)]
mod tests {
    use super::*;

    /// A namespace set on a `Config` is reported back unchanged by `get_namespace`.
    ///
    /// Only the config builder is exercised; no Redis connection is opened.
    #[tokio::test]
    async fn test_queue_storage_configuration() {
        let expected = "test_namespace";
        let built = Config::default().set_namespace(expected);

        assert_eq!(built.get_namespace(), expected);
    }

    /// Stand-in for `Queue` that records only the namespace string of each queue.
    #[derive(Clone, Debug)]
    struct MockQueue {
        pub namespace_transaction_request: String,
        pub namespace_transaction_submission: String,
        pub namespace_transaction_status: String,
        pub namespace_transaction_status_evm: String,
        pub namespace_transaction_status_stellar: String,
        pub namespace_notification: String,
        pub namespace_solana_token_swap_request_queue: String,
        pub namespace_relayer_health_check_queue: String,
    }

    impl MockQueue {
        /// Creates the mock with the unprefixed namespace of every queue.
        fn new() -> Self {
            Self {
                namespace_transaction_request: String::from("transaction_request_queue"),
                namespace_transaction_submission: String::from("transaction_submission_queue"),
                namespace_transaction_status: String::from("transaction_status_queue"),
                namespace_transaction_status_evm: String::from("transaction_status_queue_evm"),
                namespace_transaction_status_stellar: String::from(
                    "transaction_status_queue_stellar",
                ),
                namespace_notification: String::from("notification_queue"),
                namespace_solana_token_swap_request_queue: String::from(
                    "solana_token_swap_request_queue",
                ),
                namespace_relayer_health_check_queue: String::from("relayer_health_check_queue"),
            }
        }
    }

    /// Every namespace recorded by the mock matches the expected queue name.
    #[test]
    fn test_queue_namespaces() {
        let MockQueue {
            namespace_transaction_request,
            namespace_transaction_submission,
            namespace_transaction_status,
            namespace_transaction_status_evm,
            namespace_transaction_status_stellar,
            namespace_notification,
            namespace_solana_token_swap_request_queue,
            namespace_relayer_health_check_queue,
        } = MockQueue::new();

        // (actual, expected) pairs, checked in the order the fields are declared.
        let cases = [
            (namespace_transaction_request, "transaction_request_queue"),
            (
                namespace_transaction_submission,
                "transaction_submission_queue",
            ),
            (namespace_transaction_status, "transaction_status_queue"),
            (
                namespace_transaction_status_evm,
                "transaction_status_queue_evm",
            ),
            (
                namespace_transaction_status_stellar,
                "transaction_status_queue_stellar",
            ),
            (namespace_notification, "notification_queue"),
            (
                namespace_solana_token_swap_request_queue,
                "solana_token_swap_request_queue",
            ),
            (
                namespace_relayer_health_check_queue,
                "relayer_health_check_queue",
            ),
        ];

        for (actual, expected) in cases {
            assert_eq!(actual, expected);
        }
    }
}