openzeppelin_relayer/jobs/
queue.rs1use std::{env, sync::Arc};
11
12use apalis_redis::{Config, ConnectionManager, RedisStorage};
13use color_eyre::{eyre, Result};
14use serde::{Deserialize, Serialize};
15use tokio::time::{timeout, Duration};
16use tracing::error;
17
18use crate::config::ServerConfig;
19
20use super::{
21 Job, NotificationSend, RelayerHealthCheck, SolanaTokenSwapRequest, TransactionRequest,
22 TransactionSend, TransactionStatusCheck,
23};
24
25#[derive(Clone, Debug)]
26pub struct Queue {
27 pub transaction_request_queue: RedisStorage<Job<TransactionRequest>>,
28 pub transaction_submission_queue: RedisStorage<Job<TransactionSend>>,
29 pub transaction_status_queue: RedisStorage<Job<TransactionStatusCheck>>,
31 pub transaction_status_queue_evm: RedisStorage<Job<TransactionStatusCheck>>,
33 pub transaction_status_queue_stellar: RedisStorage<Job<TransactionStatusCheck>>,
35 pub notification_queue: RedisStorage<Job<NotificationSend>>,
36 pub solana_token_swap_request_queue: RedisStorage<Job<SolanaTokenSwapRequest>>,
37 pub relayer_health_check_queue: RedisStorage<Job<RelayerHealthCheck>>,
38}
39
40impl Queue {
41 async fn storage<T: Serialize + for<'de> Deserialize<'de>>(
42 namespace: &str,
43 shared: Arc<ConnectionManager>,
44 ) -> Result<RedisStorage<T>> {
45 let config = Config::default()
46 .set_namespace(namespace)
47 .set_enqueue_scheduled(Duration::from_secs(1)); Ok(RedisStorage::new_with_config((*shared).clone(), config))
50 }
51
52 pub async fn setup() -> Result<Self> {
53 let config = ServerConfig::from_env();
54 let redis_url = config.redis_url.clone();
55 let redis_connection_timeout_ms = config.redis_connection_timeout_ms;
56 let conn = match timeout(Duration::from_millis(redis_connection_timeout_ms), apalis_redis::connect(redis_url.clone())).await {
57 Ok(result) => result.map_err(|e| {
58 error!(redis_url = %redis_url, error = %e, "failed to connect to redis");
59 eyre::eyre!("Failed to connect to Redis. Please ensure Redis is running and accessible at {}. Error: {}", redis_url, e)
60 })?,
61 Err(_) => {
62 error!(redis_url = %redis_url, "timeout connecting to redis");
63 return Err(eyre::eyre!("Timed out after {} milliseconds while connecting to Redis at {}", redis_connection_timeout_ms, redis_url));
64 }
65 };
66
67 let shared = Arc::new(conn);
68 let redis_key_prefix = env::var("REDIS_KEY_PREFIX")
70 .ok()
71 .filter(|v| !v.is_empty())
72 .map(|value| format!("{value}:queue:"))
73 .unwrap_or_default();
74 Ok(Self {
75 transaction_request_queue: Self::storage(
76 &format!("{redis_key_prefix}transaction_request_queue"),
77 shared.clone(),
78 )
79 .await?,
80 transaction_submission_queue: Self::storage(
81 &format!("{redis_key_prefix}transaction_submission_queue"),
82 shared.clone(),
83 )
84 .await?,
85 transaction_status_queue: Self::storage(
86 &format!("{redis_key_prefix}transaction_status_queue"),
87 shared.clone(),
88 )
89 .await?,
90 transaction_status_queue_evm: Self::storage(
91 &format!("{redis_key_prefix}transaction_status_queue_evm"),
92 shared.clone(),
93 )
94 .await?,
95 transaction_status_queue_stellar: Self::storage(
96 &format!("{redis_key_prefix}transaction_status_queue_stellar"),
97 shared.clone(),
98 )
99 .await?,
100 notification_queue: Self::storage(
101 &format!("{redis_key_prefix}notification_queue"),
102 shared.clone(),
103 )
104 .await?,
105 solana_token_swap_request_queue: Self::storage(
106 &format!("{redis_key_prefix}solana_token_swap_request_queue"),
107 shared.clone(),
108 )
109 .await?,
110 relayer_health_check_queue: Self::storage(
111 &format!("{redis_key_prefix}relayer_health_check_queue"),
112 shared.clone(),
113 )
114 .await?,
115 })
116 }
117}
118
119#[cfg(test)]
120mod tests {
121 use super::*;
122
123 #[tokio::test]
124 async fn test_queue_storage_configuration() {
125 let namespace = "test_namespace";
127 let config = Config::default().set_namespace(namespace);
128
129 assert_eq!(config.get_namespace(), namespace);
130 }
131
132 #[derive(Clone, Debug)]
134 struct MockQueue {
135 pub namespace_transaction_request: String,
136 pub namespace_transaction_submission: String,
137 pub namespace_transaction_status: String,
138 pub namespace_transaction_status_evm: String,
139 pub namespace_transaction_status_stellar: String,
140 pub namespace_notification: String,
141 pub namespace_solana_token_swap_request_queue: String,
142 pub namespace_relayer_health_check_queue: String,
143 }
144
145 impl MockQueue {
146 fn new() -> Self {
147 Self {
148 namespace_transaction_request: "transaction_request_queue".to_string(),
149 namespace_transaction_submission: "transaction_submission_queue".to_string(),
150 namespace_transaction_status: "transaction_status_queue".to_string(),
151 namespace_transaction_status_evm: "transaction_status_queue_evm".to_string(),
152 namespace_transaction_status_stellar: "transaction_status_queue_stellar"
153 .to_string(),
154 namespace_notification: "notification_queue".to_string(),
155 namespace_solana_token_swap_request_queue: "solana_token_swap_request_queue"
156 .to_string(),
157 namespace_relayer_health_check_queue: "relayer_health_check_queue".to_string(),
158 }
159 }
160 }
161
162 #[test]
163 fn test_queue_namespaces() {
164 let mock_queue = MockQueue::new();
165
166 assert_eq!(
167 mock_queue.namespace_transaction_request,
168 "transaction_request_queue"
169 );
170 assert_eq!(
171 mock_queue.namespace_transaction_submission,
172 "transaction_submission_queue"
173 );
174 assert_eq!(
175 mock_queue.namespace_transaction_status,
176 "transaction_status_queue"
177 );
178 assert_eq!(
179 mock_queue.namespace_transaction_status_evm,
180 "transaction_status_queue_evm"
181 );
182 assert_eq!(
183 mock_queue.namespace_transaction_status_stellar,
184 "transaction_status_queue_stellar"
185 );
186 assert_eq!(mock_queue.namespace_notification, "notification_queue");
187 assert_eq!(
188 mock_queue.namespace_solana_token_swap_request_queue,
189 "solana_token_swap_request_queue"
190 );
191 assert_eq!(
192 mock_queue.namespace_relayer_health_check_queue,
193 "relayer_health_check_queue"
194 );
195 }
196}