1use crate::{
7 constants::WORKER_DEFAULT_MAXIMUM_RETRIES,
8 domain::{get_network_relayer, Relayer},
9 jobs::{handle_result, Job, JobProducerTrait, RelayerHealthCheck},
10 models::{
11 produce_relayer_enabled_payload, DefaultAppState, DisabledReason, NetworkRepoModel,
12 NotificationRepoModel, RelayerRepoModel, SignerRepoModel, ThinDataAppState,
13 TransactionRepoModel,
14 },
15 repositories::{
16 ApiKeyRepositoryTrait, NetworkRepository, PluginRepositoryTrait, RelayerRepository,
17 Repository, TransactionCounterTrait, TransactionRepository,
18 },
19 utils::calculate_scheduled_timestamp,
20};
21use actix_web::web::ThinData;
22use apalis::prelude::{Attempt, Data, *};
23use eyre::Result;
24use std::time::Duration;
25use tracing::{debug, info, warn};
26
27pub async fn relayer_health_check_handler(
52 job: Job<RelayerHealthCheck>,
53 app_state: Data<ThinData<DefaultAppState>>,
54 attempt: Attempt,
55) -> Result<(), Error> {
56 relayer_health_check_handler_impl(job, app_state, attempt).await
57}
58
59#[allow(clippy::type_complexity)]
61async fn relayer_health_check_handler_impl<J, RR, TR, NR, NFR, SR, TCR, PR, AKR>(
62 job: Job<RelayerHealthCheck>,
63 app_state: Data<ThinDataAppState<J, RR, TR, NR, NFR, SR, TCR, PR, AKR>>,
64 attempt: Attempt,
65) -> Result<(), Error>
66where
67 J: JobProducerTrait + Send + Sync + 'static,
68 RR: RelayerRepository + Repository<RelayerRepoModel, String> + Send + Sync + 'static,
69 TR: TransactionRepository + Repository<TransactionRepoModel, String> + Send + Sync + 'static,
70 NR: NetworkRepository + Repository<NetworkRepoModel, String> + Send + Sync + 'static,
71 NFR: Repository<NotificationRepoModel, String> + Send + Sync + 'static,
72 SR: Repository<SignerRepoModel, String> + Send + Sync + 'static,
73 TCR: TransactionCounterTrait + Send + Sync + 'static,
74 PR: PluginRepositoryTrait + Send + Sync + 'static,
75 AKR: ApiKeyRepositoryTrait + Send + Sync + 'static,
76{
77 let result = check_and_reenable_relayer(job.data, &app_state).await;
78 handle_result(
79 result,
80 attempt,
81 "relayer_health_check",
82 WORKER_DEFAULT_MAXIMUM_RETRIES,
83 )
84}
85
86async fn check_and_reenable_relayer<J, RR, TR, NR, NFR, SR, TCR, PR, AKR>(
87 data: RelayerHealthCheck,
88 app_state: &ThinDataAppState<J, RR, TR, NR, NFR, SR, TCR, PR, AKR>,
89) -> Result<()>
90where
91 J: JobProducerTrait + Send + Sync + 'static,
92 RR: RelayerRepository + Repository<RelayerRepoModel, String> + Send + Sync + 'static,
93 TR: TransactionRepository + Repository<TransactionRepoModel, String> + Send + Sync + 'static,
94 NR: NetworkRepository + Repository<NetworkRepoModel, String> + Send + Sync + 'static,
95 NFR: Repository<NotificationRepoModel, String> + Send + Sync + 'static,
96 SR: Repository<SignerRepoModel, String> + Send + Sync + 'static,
97 TCR: TransactionCounterTrait + Send + Sync + 'static,
98 PR: PluginRepositoryTrait + Send + Sync + 'static,
99 AKR: ApiKeyRepositoryTrait + Send + Sync + 'static,
100{
101 let relayer_id = data.relayer_id.clone();
102
103 debug!(
104 relayer_id = %relayer_id,
105 retry_count = data.retry_count,
106 "Running health check on disabled relayer"
107 );
108
109 let relayer = app_state
111 .relayer_repository
112 .get_by_id(relayer_id.clone())
113 .await
114 .map_err(|e| eyre::eyre!("Failed to get relayer: {}", e))?;
115
116 if !relayer.system_disabled {
117 info!(
118 relayer_id = %relayer_id,
119 "Relayer is not disabled, skipping health check"
120 );
121 return Ok(());
122 }
123
124 let relayer_service = get_network_relayer(relayer_id.clone(), app_state)
126 .await
127 .map_err(|e| eyre::eyre!("Failed to get relayer: {}", e))?;
128
129 match relayer_service.check_health().await {
131 Ok(_) => {
132 info!(
134 relayer_id = %relayer_id,
135 retry_count = data.retry_count,
136 "Health checks passed, re-enabling relayer"
137 );
138
139 let enabled_relayer = app_state
141 .relayer_repository
142 .enable_relayer(relayer_id.clone())
143 .await
144 .map_err(|e| eyre::eyre!("Failed to enable relayer: {}", e))?;
145
146 if let Some(notification_id) = &enabled_relayer.notification_id {
148 app_state
149 .job_producer
150 .produce_send_notification_job(
151 produce_relayer_enabled_payload(
152 notification_id,
153 &enabled_relayer,
154 data.retry_count,
155 ),
156 None,
157 )
158 .await
159 .map_err(|e| eyre::eyre!("Failed to send notification: {}", e))?;
160
161 info!(
162 relayer_id = %relayer_id,
163 notification_id = %notification_id,
164 "Sent relayer recovery notification"
165 );
166 }
167
168 Ok(())
169 }
170 Err(failures) => {
171 let reason = DisabledReason::from_health_failures(failures).unwrap_or_else(|| {
173 DisabledReason::RpcValidationFailed("Unknown error".to_string())
174 });
175
176 warn!(
177 relayer_id = %relayer_id,
178 retry_count = data.retry_count,
179 reason = %reason,
180 "Health checks failed, scheduling retry"
181 );
182
183 let should_update = match &relayer.disabled_reason {
186 Some(old_reason) => !old_reason.same_variant(&reason),
187 None => true, };
189
190 if should_update {
191 debug!(
192 relayer_id = %relayer_id,
193 old_reason = ?relayer.disabled_reason,
194 new_reason = %reason,
195 "Disabled reason variant has changed, updating"
196 );
197
198 app_state
199 .relayer_repository
200 .disable_relayer(relayer_id.clone(), reason.clone())
201 .await
202 .map_err(|e| eyre::eyre!("Failed to update disabled reason: {}", e))?;
203 } else {
204 debug!(
205 relayer_id = %relayer_id,
206 reason = %reason,
207 "Disabled reason variant unchanged, skipping update"
208 );
209 }
210
211 let delay = calculate_backoff_delay(data.retry_count);
213
214 debug!(
215 relayer_id = %relayer_id,
216 next_retry = data.retry_count + 1,
217 delay_seconds = delay.as_secs(),
218 "Scheduling next health check attempt"
219 );
220
221 app_state
223 .job_producer
224 .produce_relayer_health_check_job(
225 RelayerHealthCheck::with_retry_count(relayer_id, data.retry_count + 1),
226 Some(calculate_scheduled_timestamp(delay.as_secs() as i64)),
227 )
228 .await
229 .map_err(|e| eyre::eyre!("Failed to schedule retry: {}", e))?;
230
231 Ok(())
232 }
233 }
234}
235
/// Returns how long to wait before the next health-check attempt.
///
/// The schedule is stepped and then capped:
///
/// | `retry_count` | delay |
/// |---------------|-------|
/// | 0             | 10 s  |
/// | 1             | 20 s  |
/// | 2             | 30 s  |
/// | 3             | 45 s  |
/// | 4 and above   | 60 s  |
fn calculate_backoff_delay(retry_count: u32) -> Duration {
    /// Delays, in seconds, for the first attempts (indexed by retry count).
    const STEP_SECS: [u64; 4] = [10, 20, 30, 45];
    /// Delay, in seconds, once the stepped schedule is exhausted.
    const CAP_SECS: u64 = 60;

    // Any retry count past the end of the table falls through to the cap.
    let secs = STEP_SECS
        .get(retry_count as usize)
        .copied()
        .unwrap_or(CAP_SECS);

    Duration::from_secs(secs)
}
254
#[cfg(test)]
mod tests {
    use super::*;
    use crate::models::{
        DisabledReason, NetworkType, RelayerEvmPolicy, RelayerNetworkPolicy, RelayerRepoModel,
    };

    /// The backoff schedule is 10s, 20s, 30s, 45s and then a flat 60s.
    #[test]
    fn test_calculate_backoff_delay() {
        assert_eq!(calculate_backoff_delay(0), Duration::from_secs(10));
        assert_eq!(calculate_backoff_delay(1), Duration::from_secs(20));
        assert_eq!(calculate_backoff_delay(2), Duration::from_secs(30));
        assert_eq!(calculate_backoff_delay(3), Duration::from_secs(45));
        assert_eq!(calculate_backoff_delay(4), Duration::from_secs(60));
        assert_eq!(calculate_backoff_delay(10), Duration::from_secs(60));
        assert_eq!(calculate_backoff_delay(100), Duration::from_secs(60));
    }

    /// `new` starts at retry 0; `with_retry_count` stores the given count.
    #[test]
    fn test_relayer_health_check_creation() {
        let health_check = RelayerHealthCheck::new("test-relayer".to_string());
        assert_eq!(health_check.relayer_id, "test-relayer");
        assert_eq!(health_check.retry_count, 0);

        let health_check_with_retry =
            RelayerHealthCheck::with_retry_count("test-relayer".to_string(), 3);
        assert_eq!(health_check_with_retry.relayer_id, "test-relayer");
        assert_eq!(health_check_with_retry.retry_count, 3);
    }

    /// Builds an EVM relayer fixture that is system-disabled with an
    /// `RpcValidationFailed` reason. Tests that need an enabled relayer clear
    /// `system_disabled` and `disabled_reason` on the returned value.
    fn create_disabled_relayer(id: &str) -> RelayerRepoModel {
        RelayerRepoModel {
            id: id.to_string(),
            name: format!("Relayer {}", id),
            network: "sepolia".to_string(),
            paused: false,
            network_type: NetworkType::Evm,
            policies: RelayerNetworkPolicy::Evm(RelayerEvmPolicy {
                gas_price_cap: None,
                whitelist_receivers: None,
                eip1559_pricing: Some(false),
                private_transactions: Some(false),
                min_balance: Some(0),
                gas_limit_estimation: Some(false),
            }),
            signer_id: "test-signer".to_string(),
            address: "0x742d35Cc6634C0532925a3b8D8C2e48a73F6ba2E".to_string(),
            notification_id: Some("test-notification".to_string()),
            system_disabled: true,
            disabled_reason: Some(DisabledReason::RpcValidationFailed(
                "RPC unavailable".to_string(),
            )),
            custom_rpc_urls: None,
        }
    }

    /// Payload construction and the capped delay for a high retry count.
    // Nothing is awaited here, so a plain synchronous test is sufficient.
    #[test]
    fn test_health_check_data_structure() {
        let health_check = RelayerHealthCheck::new("test-relayer".to_string());
        assert_eq!(health_check.relayer_id, "test-relayer");
        assert_eq!(health_check.retry_count, 0);

        let health_check_retry =
            RelayerHealthCheck::with_retry_count("test-relayer".to_string(), 5);
        assert_eq!(health_check_retry.retry_count, 5);

        let expected_delay = calculate_backoff_delay(5);
        assert_eq!(expected_delay, Duration::from_secs(60));
    }

    /// The handler returns `Ok` for a relayer that is already enabled.
    #[tokio::test]
    async fn test_relayer_health_check_handler_impl_exits_on_enabled() {
        use crate::jobs::MockJobProducerTrait;
        use crate::models::AppState;
        use crate::repositories::{
            ApiKeyRepositoryStorage, NetworkRepositoryStorage, NotificationRepositoryStorage,
            PluginRepositoryStorage, RelayerRepositoryStorage, Repository, SignerRepositoryStorage,
            TransactionCounterRepositoryStorage, TransactionRepositoryStorage,
        };
        use std::sync::Arc;

        // No expectations are configured on the mock producer: the early-exit
        // path is not supposed to enqueue any job.
        let mock_job_producer = MockJobProducerTrait::new();

        let relayer_repo = Arc::new(RelayerRepositoryStorage::new_in_memory());

        // Turn the disabled fixture into an enabled relayer.
        let mut relayer = create_disabled_relayer("test-handler-enabled");
        relayer.system_disabled = false;
        relayer.disabled_reason = None;
        relayer_repo.create(relayer).await.unwrap();

        let app_state = Data::new(actix_web::web::ThinData(AppState {
            relayer_repository: relayer_repo,
            transaction_repository: Arc::new(TransactionRepositoryStorage::new_in_memory()),
            signer_repository: Arc::new(SignerRepositoryStorage::new_in_memory()),
            notification_repository: Arc::new(NotificationRepositoryStorage::new_in_memory()),
            network_repository: Arc::new(NetworkRepositoryStorage::new_in_memory()),
            transaction_counter_store: Arc::new(
                TransactionCounterRepositoryStorage::new_in_memory(),
            ),
            job_producer: Arc::new(mock_job_producer),
            plugin_repository: Arc::new(PluginRepositoryStorage::new_in_memory()),
            api_key_repository: Arc::new(ApiKeyRepositoryStorage::new_in_memory()),
        }));

        let health_check = RelayerHealthCheck::new("test-handler-enabled".to_string());
        let job = Job::new(crate::jobs::JobType::RelayerHealthCheck, health_check);
        let attempt = Attempt::new_with_value(1);

        let result = relayer_health_check_handler_impl(job, app_state, attempt).await;

        assert!(result.is_ok());
    }

    /// Delays grow strictly up to the cap and then stay flat.
    // Nothing is awaited here, so a plain synchronous test is sufficient.
    #[test]
    fn test_relayer_health_check_backoff_progression() {
        let delays: Vec<Duration> = (0..6).map(calculate_backoff_delay).collect();

        assert_eq!(delays[0], Duration::from_secs(10));
        assert_eq!(delays[1], Duration::from_secs(20));
        assert_eq!(delays[2], Duration::from_secs(30));
        assert_eq!(delays[3], Duration::from_secs(45));
        assert_eq!(delays[4], Duration::from_secs(60));
        assert_eq!(delays[5], Duration::from_secs(60));

        // Adjacent pairs among the first five delays (retries 0..=4) must be
        // strictly increasing.
        for pair in delays[..5].windows(2) {
            assert!(pair[0] < pair[1], "Delay should increase with retry count");
        }

        assert_eq!(delays[4], delays[5], "Delay should cap at 60 seconds");
    }

    /// A stored relayer keeps its disabled flag and reason when read back.
    #[tokio::test]
    async fn test_disabled_reason_is_preserved() {
        use crate::repositories::RelayerRepositoryStorage;

        let repo = RelayerRepositoryStorage::new_in_memory();

        let relayer = create_disabled_relayer("test-relayer-2");
        let disabled_reason = relayer.disabled_reason.clone();

        repo.create(relayer).await.unwrap();

        let retrieved = repo.get_by_id("test-relayer-2".to_string()).await.unwrap();

        assert!(retrieved.system_disabled);
        assert_eq!(retrieved.disabled_reason, disabled_reason);

        if let Some(reason) = &retrieved.disabled_reason {
            let description = reason.description();
            assert!(description.contains("RPC"));
        }
    }

    /// An already-enabled relayer is left untouched by the check.
    #[tokio::test]
    async fn test_check_and_reenable_relayer_exits_early_if_not_disabled() {
        use crate::jobs::MockJobProducerTrait;
        use crate::models::AppState;
        use crate::repositories::{
            ApiKeyRepositoryStorage, NetworkRepositoryStorage, NotificationRepositoryStorage,
            PluginRepositoryStorage, RelayerRepositoryStorage, Repository, SignerRepositoryStorage,
            TransactionCounterRepositoryStorage, TransactionRepositoryStorage,
        };
        use std::sync::Arc;

        let relayer_repo = Arc::new(RelayerRepositoryStorage::new_in_memory());

        // Turn the disabled fixture into an enabled relayer.
        let mut relayer = create_disabled_relayer("test-check-enabled");
        relayer.system_disabled = false;
        relayer.disabled_reason = None;
        relayer_repo.create(relayer).await.unwrap();

        // No expectations are configured on the mock producer: the early-exit
        // path is not supposed to enqueue any job.
        let mock_job_producer = MockJobProducerTrait::new();

        let app_state = AppState {
            relayer_repository: relayer_repo.clone(),
            transaction_repository: Arc::new(TransactionRepositoryStorage::new_in_memory()),
            signer_repository: Arc::new(SignerRepositoryStorage::new_in_memory()),
            notification_repository: Arc::new(NotificationRepositoryStorage::new_in_memory()),
            network_repository: Arc::new(NetworkRepositoryStorage::new_in_memory()),
            transaction_counter_store: Arc::new(
                TransactionCounterRepositoryStorage::new_in_memory(),
            ),
            job_producer: Arc::new(mock_job_producer),
            plugin_repository: Arc::new(PluginRepositoryStorage::new_in_memory()),
            api_key_repository: Arc::new(ApiKeyRepositoryStorage::new_in_memory()),
        };

        let health_check = RelayerHealthCheck::new("test-check-enabled".to_string());

        let thin_app_state = actix_web::web::ThinData(app_state);

        let result = check_and_reenable_relayer(health_check, &thin_app_state).await;

        assert!(result.is_ok());

        // The stored relayer must still be enabled with no reason attached.
        let retrieved = relayer_repo
            .get_by_id("test-check-enabled".to_string())
            .await
            .unwrap();
        assert!(!retrieved.system_disabled);
        assert!(retrieved.disabled_reason.is_none());
    }

    /// `same_variant` ignores the message text but not the variant.
    // Nothing is awaited here, so a plain synchronous test is sufficient.
    #[test]
    fn test_check_and_reenable_variant_comparison() {
        let reason1 = DisabledReason::RpcValidationFailed("Error A".to_string());
        let reason2 = DisabledReason::RpcValidationFailed("Error B".to_string());
        assert!(reason1.same_variant(&reason2));

        let reason3 = DisabledReason::NonceSyncFailed("Error".to_string());
        assert!(!reason1.same_variant(&reason3));

        let multi1 = DisabledReason::Multiple(vec![
            DisabledReason::RpcValidationFailed("A".to_string()),
            DisabledReason::NonceSyncFailed("B".to_string()),
        ]);
        let multi2 = DisabledReason::Multiple(vec![
            DisabledReason::RpcValidationFailed("C".to_string()),
            DisabledReason::NonceSyncFailed("D".to_string()),
        ]);
        assert!(multi1.same_variant(&multi2));

        let multi3 = DisabledReason::Multiple(vec![
            DisabledReason::RpcValidationFailed("A".to_string()),
            DisabledReason::BalanceCheckFailed("B".to_string()),
        ]);
        assert!(!multi1.same_variant(&multi3));
    }

    /// Boundary values of the backoff schedule and its monotonic growth.
    // Nothing is awaited here, so a plain synchronous test is sufficient.
    #[test]
    fn test_backoff_delay_calculation_edge_cases() {
        let delay0 = calculate_backoff_delay(0);
        assert_eq!(delay0, Duration::from_secs(10));

        let delay_large = calculate_backoff_delay(100);
        assert_eq!(delay_large, Duration::from_secs(60));

        let mut prev_delay = Duration::from_secs(0);
        for retry in 0..10 {
            let delay = calculate_backoff_delay(retry);
            if delay < Duration::from_secs(60) {
                assert!(delay > prev_delay, "Retry {}: delay should increase", retry);
            } else {
                assert_eq!(
                    delay,
                    Duration::from_secs(60),
                    "Retry {}: should cap at 60s",
                    retry
                );
            }
            prev_delay = delay;
        }
    }

    /// Zero failures give `None`, one gives its matching variant, several give `Multiple`.
    // Nothing is awaited here, so a plain synchronous test is sufficient.
    #[test]
    fn test_disabled_reason_from_health_failures() {
        use crate::models::HealthCheckFailure;

        let empty_result = DisabledReason::from_health_failures(vec![]);
        assert!(empty_result.is_none());

        let single_failure = vec![HealthCheckFailure::RpcValidationFailed(
            "RPC down".to_string(),
        )];
        let single_result = DisabledReason::from_health_failures(single_failure);
        assert!(single_result.is_some());
        match single_result.unwrap() {
            DisabledReason::RpcValidationFailed(msg) => {
                assert_eq!(msg, "RPC down");
            }
            _ => panic!("Expected RpcValidationFailed variant"),
        }

        let multiple_failures = vec![
            HealthCheckFailure::RpcValidationFailed("RPC error".to_string()),
            HealthCheckFailure::NonceSyncFailed("Nonce error".to_string()),
        ];
        let multiple_result = DisabledReason::from_health_failures(multiple_failures);
        assert!(multiple_result.is_some());
        match multiple_result.unwrap() {
            DisabledReason::Multiple(reasons) => {
                assert_eq!(reasons.len(), 2);
                assert!(matches!(reasons[0], DisabledReason::RpcValidationFailed(_)));
                assert!(matches!(reasons[1], DisabledReason::NonceSyncFailed(_)));
            }
            _ => panic!("Expected Multiple variant"),
        }
    }

    /// Retry counts round-trip through the payload and never shorten the delay.
    // Nothing is awaited here, so a plain synchronous test is sufficient.
    #[test]
    fn test_relayer_health_check_retry_count_increments() {
        for retry_count in [0, 1, 2, 5, 10] {
            let health_check =
                RelayerHealthCheck::with_retry_count("test-relayer".to_string(), retry_count);

            assert_eq!(health_check.retry_count, retry_count);

            let next_health_check =
                RelayerHealthCheck::with_retry_count("test-relayer".to_string(), retry_count + 1);
            assert_eq!(next_health_check.retry_count, retry_count + 1);

            let current_delay = calculate_backoff_delay(retry_count);
            let next_delay = calculate_backoff_delay(retry_count + 1);

            if current_delay < Duration::from_secs(60) {
                assert!(next_delay >= current_delay);
            } else {
                assert_eq!(next_delay, Duration::from_secs(60));
            }
        }
    }

    /// Disabling then enabling a relayer through the repository round-trips its state.
    #[tokio::test]
    async fn test_repository_enable_disable_operations() {
        use crate::repositories::{RelayerRepositoryStorage, Repository};

        let repo = RelayerRepositoryStorage::new_in_memory();

        // Start from an enabled relayer.
        let mut relayer = create_disabled_relayer("test-enable-disable");
        relayer.system_disabled = false;
        relayer.disabled_reason = None;
        repo.create(relayer).await.unwrap();

        let reason = DisabledReason::RpcValidationFailed("Test error".to_string());
        let disabled = repo
            .disable_relayer("test-enable-disable".to_string(), reason.clone())
            .await
            .unwrap();

        assert!(disabled.system_disabled);
        assert_eq!(disabled.disabled_reason, Some(reason));

        let enabled = repo
            .enable_relayer("test-enable-disable".to_string())
            .await
            .unwrap();

        assert!(!enabled.system_disabled);
        assert!(enabled.disabled_reason.is_none());

        // The change must be visible on a fresh read, not only on the returned value.
        let retrieved = repo
            .get_by_id("test-enable-disable".to_string())
            .await
            .unwrap();
        assert!(!retrieved.system_disabled);
        assert!(retrieved.disabled_reason.is_none());
    }

    /// `safe_description` must not echo the detail text stored in a reason.
    // Nothing is awaited here, so a plain synchronous test is sufficient.
    #[test]
    fn test_disabled_reason_safe_description() {
        let reasons = [
            DisabledReason::NonceSyncFailed("Error with API key abc123".to_string()),
            DisabledReason::RpcValidationFailed(
                "RPC error: http://secret-rpc.com:8545".to_string(),
            ),
            DisabledReason::BalanceCheckFailed("Balance: 1.5 ETH at address 0x123...".to_string()),
        ];

        for reason in reasons {
            let safe_desc = reason.safe_description();

            // None of the sensitive fragments from any input may appear.
            assert!(!safe_desc.contains("abc123"));
            assert!(!safe_desc.contains("http://"));
            assert!(!safe_desc.contains("0x123"));
            assert!(!safe_desc.contains("1.5 ETH"));

            assert!(!safe_desc.is_empty());
        }

        let multiple = DisabledReason::Multiple(vec![
            DisabledReason::RpcValidationFailed("Secret RPC info".to_string()),
            DisabledReason::NonceSyncFailed("Secret nonce info".to_string()),
        ]);

        let safe_desc = multiple.safe_description();
        assert!(!safe_desc.contains("Secret"));
        assert!(safe_desc.contains("RPC endpoint validation failed"));
        assert!(safe_desc.contains("Nonce synchronization failed"));
    }
}