use rand::Rng;
use std::future::Future;
use std::time::Duration;

use super::rpc_selector::RpcSelector;
use crate::config::ServerConfig;
use crate::constants::RETRY_JITTER_PERCENT;

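/// Computes the delay before a retry attempt: exponential backoff
/// (`base_delay_ms * 2^attempt`, capped at `max_delay_ms`) with random
/// jitter applied in either direction by `apply_jitter`.
///
/// Returns a zero duration when either delay parameter is zero, which
/// callers can use to disable delays entirely (e.g. in tests).
///
/// ```ignore
/// // Illustrative sketch only: jitter makes the exact value non-deterministic.
/// // With attempt = 2, base 100ms and cap 10s, the un-jittered backoff is
/// // 100 * 2^2 = 400ms; the returned value is then jittered by up to
/// // RETRY_JITTER_PERCENT in either direction.
/// let delay = calculate_retry_delay(2, 100, 10_000);
/// ```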
pub fn calculate_retry_delay(attempt: u8, base_delay_ms: u64, max_delay_ms: u64) -> Duration {
    if base_delay_ms == 0 || max_delay_ms == 0 {
        return Duration::from_millis(0);
    }

    let exp_backoff = if attempt > 63 {
        // 2^attempt no longer fits in a u64; go straight to the cap.
        max_delay_ms
    } else {
        let multiplier = 1u64.checked_shl(attempt as u32).unwrap_or(u64::MAX);
        base_delay_ms.saturating_mul(multiplier)
    };

    let delay_ms = exp_backoff.min(max_delay_ms);

    apply_jitter(delay_ms)
}

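/// Applies random jitter of up to `RETRY_JITTER_PERCENT` of `delay_ms`,
/// added or subtracted with equal probability, so that concurrent callers
/// do not retry in lock-step.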
fn apply_jitter(delay_ms: u64) -> Duration {
    if delay_ms == 0 {
        return Duration::from_millis(0);
    }

    let jitter_range = (delay_ms as f64 * RETRY_JITTER_PERCENT).floor() as u64;

    if jitter_range == 0 {
        return Duration::from_millis(delay_ms);
    }

    let mut rng = rand::rng();
    let jitter_value = rng.random_range(0..=jitter_range);

    let final_delay = if rng.random_bool(0.5) {
        delay_ms.saturating_add(jitter_value)
    } else {
        delay_ms.saturating_sub(jitter_value)
    };

    Duration::from_millis(final_delay)
}

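/// Internal classification of a failed provider call, used to decide whether
/// the caller should fail over to another provider.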
#[derive(Debug)]
enum InternalRetryError<E> {
    NonRetriable(E),
    RetriesExhausted(E),
}

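/// Configuration for retry and failover behaviour of RPC calls.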
#[derive(Debug, Clone)]
pub struct RetryConfig {
    /// Maximum number of attempts against a single provider (0 and 1 both mean a single attempt).
    pub max_retries: u8,
    /// Maximum number of provider switches after failures (capped at provider count minus one).
    pub max_failovers: u8,
    /// Base delay in milliseconds for exponential backoff.
    pub base_delay_ms: u64,
    /// Upper bound in milliseconds for any single backoff delay.
    pub max_delay_ms: u64,
}

impl RetryConfig {
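    /// Creates a new `RetryConfig`.
    ///
    /// Panics if exactly one of `base_delay_ms` / `max_delay_ms` is zero, or if
    /// `max_delay_ms` is smaller than `base_delay_ms` when both are non-zero.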
    pub fn new(max_retries: u8, max_failovers: u8, base_delay_ms: u64, max_delay_ms: u64) -> Self {
        if (base_delay_ms == 0) != (max_delay_ms == 0) {
            panic!(
                "Delay values must be consistent: both zero (no delays) or both non-zero. Got base_delay_ms={base_delay_ms}, max_delay_ms={max_delay_ms}"
            );
        }

        if base_delay_ms > 0 && max_delay_ms > 0 && max_delay_ms < base_delay_ms {
            panic!(
                "max_delay_ms ({max_delay_ms}) must be >= base_delay_ms ({base_delay_ms}) when both are non-zero"
            );
        }

        Self {
            max_retries,
            max_failovers,
            base_delay_ms,
            max_delay_ms,
        }
    }

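    /// Builds a `RetryConfig` from the provider retry settings of the server
    /// configuration loaded from environment variables.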
    pub fn from_env() -> Self {
        let config = ServerConfig::from_env();
        Self::new(
            config.provider_max_retries,
            config.provider_max_failovers,
            config.provider_retry_base_delay_ms,
            config.provider_retry_max_delay_ms,
        )
    }
}

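/// Executes an RPC operation with per-provider retries and failover across the
/// providers managed by `selector`.
///
/// Each selected provider is attempted up to `config.max_retries` times (with
/// jittered backoff) while `is_retriable_error` returns true. When retries are
/// exhausted, or when a non-retriable error for which
/// `should_mark_provider_failed` returns true occurs, the provider may be
/// marked as failed and the next one is tried, up to `max_failovers` switches.
/// The last remaining provider is never marked as failed.
///
/// ```ignore
/// // Illustrative sketch only: `selector`, `MyError`, `make_provider` and
/// // `get_balance` are assumed to exist in the caller's code.
/// let result = retry_rpc_call(
///     &selector,
///     "get_balance",
///     |e: &MyError| e.is_transient(),      // is_retriable_error
///     |e: &MyError| e.is_provider_fault(), // should_mark_provider_failed
///     |url| make_provider(url),            // provider_initializer
///     |provider| async move { get_balance(&provider).await },
///     None, // fall back to RetryConfig::from_env()
/// )
/// .await;
/// ```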
pub async fn retry_rpc_call<P, T, E, F, Fut, I>(
    selector: &RpcSelector,
    operation_name: &str,
    is_retriable_error: impl Fn(&E) -> bool,
    should_mark_provider_failed: impl Fn(&E) -> bool,
    provider_initializer: I,
    operation: F,
    config: Option<RetryConfig>,
) -> Result<T, E>
where
    P: Clone,
    E: std::fmt::Display + From<String>,
    F: Fn(P) -> Fut,
    Fut: Future<Output = Result<T, E>>,
    I: Fn(&str) -> Result<P, E>,
{
    let config = config.unwrap_or_else(RetryConfig::from_env);
    let total_providers = selector.provider_count();
    let max_failovers = std::cmp::min(config.max_failovers as usize, total_providers - 1);
    let mut failover_count = 0;
    let mut total_attempts = 0;
    let mut last_error = None;

    tracing::debug!(
        operation_name = %operation_name,
        max_retries = %config.max_retries,
        max_failovers = %max_failovers,
        total_providers = %total_providers,
        "starting rpc call"
    );

    while failover_count <= max_failovers && selector.available_provider_count() > 0 {
        // Select and initialize the next provider; failing to do so counts as a failover.
        let (provider, provider_url) =
            match get_provider(selector, operation_name, &provider_initializer) {
                Ok((provider, url)) => (provider, url),
                Err(e) => {
                    last_error = Some(e);
                    failover_count += 1;

                    if failover_count > max_failovers || selector.available_provider_count() == 0 {
                        break;
                    }

                    selector.mark_current_as_failed();
                    continue;
                }
            };

        tracing::debug!(
            provider_url = %provider_url,
            operation_name = %operation_name,
            "selected provider"
        );

        match try_with_retries(
            &provider,
            &provider_url,
            operation_name,
            &operation,
            &is_retriable_error,
            &config,
            &mut total_attempts,
        )
        .await
        {
            Ok(result) => {
                tracing::debug!(
                    operation_name = %operation_name,
                    provider_url = %provider_url,
                    total_attempts = %total_attempts,
                    "rpc call succeeded"
                );
                return Ok(result);
            }
            Err(internal_err) => {
                match internal_err {
                    InternalRetryError::NonRetriable(original_err) => {
                        // Non-retriable errors are returned immediately; the provider is only
                        // marked as failed if the caller asks for it and it is not the last one.
                        if should_mark_provider_failed(&original_err)
                            && selector.available_provider_count() > 1
                        {
                            tracing::warn!(
                                error = %original_err,
                                provider_url = %provider_url,
                                operation_name = %operation_name,
                                "non-retriable error should mark provider as failed, marking as failed and switching to next provider"
                            );
                            selector.mark_current_as_failed();
                        }
                        return Err(original_err);
                    }
                    InternalRetryError::RetriesExhausted(original_err) => {
                        last_error = Some(original_err);

                        // Never mark the last available provider as failed.
                        if selector.available_provider_count() > 1 {
                            tracing::warn!(
                                max_retries = %config.max_retries,
                                provider_url = %provider_url,
                                operation_name = %operation_name,
                                error = %last_error.as_ref().unwrap(),
                                failover_count = %(failover_count + 1),
                                max_failovers = %max_failovers,
                                "all retry attempts failed, marking as failed and switching to next provider"
                            );
                            selector.mark_current_as_failed();
                            failover_count += 1;
                        } else {
                            tracing::warn!(
                                max_retries = %config.max_retries,
                                provider_url = %provider_url,
                                operation_name = %operation_name,
                                error = %last_error.as_ref().unwrap(),
                                "all retry attempts failed, this is the last available provider, not marking as failed"
                            );
                            break;
                        }
                    }
                }
            }
        }
    }

    match &last_error {
        Some(e) => {
            tracing::error!(
                operation_name = %operation_name,
                total_attempts = %total_attempts,
                failover_count = %failover_count,
                error = %e,
                "rpc call failed after attempts across providers"
            );
        }
        None => {
            tracing::error!(
                operation_name = %operation_name,
                total_attempts = %total_attempts,
                failover_count = %failover_count,
                "rpc call failed after attempts across providers with no error details"
            );
        }
    }

    let error_message = match &last_error {
        Some(e) => format!(
            "RPC call '{operation_name}' failed after {total_attempts} total attempts across {failover_count} providers: {e}"
        ),
        None => format!(
            "RPC call '{operation_name}' failed after {total_attempts} total attempts across {failover_count} providers with no error details"
        )
    };

    Err(last_error.unwrap_or_else(|| E::from(error_message)))
}

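/// Resolves the currently selected provider URL from the selector and runs
/// `provider_initializer` on it, logging and propagating any failure.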
fn get_provider<P, E, I>(
    selector: &RpcSelector,
    operation_name: &str,
    provider_initializer: &I,
) -> Result<(P, String), E>
where
    E: std::fmt::Display + From<String>,
    I: Fn(&str) -> Result<P, E>,
{
    let provider_url = selector
        .get_client(|url| Ok::<_, eyre::Report>(url.to_string()))
        .map_err(|e| {
            let err_msg = format!("Failed to get provider URL for {operation_name}: {e}");
            tracing::warn!(operation_name = %operation_name, error = %e, "failed to get provider url");
            E::from(err_msg)
        })?;

    let provider = provider_initializer(&provider_url).map_err(|e| {
        tracing::warn!(
            provider_url = %provider_url,
            operation_name = %operation_name,
            error = %e,
            "failed to initialize provider"
        );
        e
    })?;

    Ok((provider, provider_url))
}

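/// Runs `operation` against a single provider, retrying retriable errors up to
/// `config.max_retries` attempts with jittered backoff between attempts.
///
/// Returns `InternalRetryError::NonRetriable` for errors the caller should not
/// retry on this provider, and `InternalRetryError::RetriesExhausted` when all
/// attempts have been used up.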
async fn try_with_retries<P, T, E, F, Fut>(
    provider: &P,
    provider_url: &str,
    operation_name: &str,
    operation: &F,
    is_retriable_error: &impl Fn(&E) -> bool,
    config: &RetryConfig,
    total_attempts: &mut usize,
) -> Result<T, InternalRetryError<E>>
where
    P: Clone,
    E: std::fmt::Display + From<String>,
    F: Fn(P) -> Fut,
    Fut: Future<Output = Result<T, E>>,
{
    // With 0 or 1 allowed attempts there is nothing to retry: run once and
    // treat any failure as non-retriable on this provider.
    if config.max_retries <= 1 {
        *total_attempts += 1;
        return operation(provider.clone())
            .await
            .map_err(InternalRetryError::NonRetriable);
    }

    for current_attempt_idx in 0..config.max_retries {
        *total_attempts += 1;

        match operation(provider.clone()).await {
            Ok(result) => {
                tracing::debug!(
                    operation_name = %operation_name,
                    provider_url = %provider_url,
                    attempt = %(current_attempt_idx + 1),
                    max_retries = %config.max_retries,
                    total_attempts = %*total_attempts,
                    "rpc call succeeded"
                );
                return Ok(result);
            }
            Err(e) => {
                let is_retriable = is_retriable_error(&e);
                let is_last_attempt = current_attempt_idx + 1 >= config.max_retries;

                tracing::warn!(
                    operation_name = %operation_name,
                    provider_url = %provider_url,
                    attempt = %(current_attempt_idx + 1),
                    max_retries = %config.max_retries,
                    error = %e,
                    retriable = %is_retriable,
                    "rpc call failed"
                );

                if !is_retriable {
                    return Err(InternalRetryError::NonRetriable(e));
                }

                if is_last_attempt {
                    tracing::warn!(
                        max_retries = %config.max_retries,
                        operation_name = %operation_name,
                        provider_url = %provider_url,
                        error = %e,
                        "all retries exhausted"
                    );
                    return Err(InternalRetryError::RetriesExhausted(e));
                }

                let delay = calculate_retry_delay(
                    current_attempt_idx + 1,
                    config.base_delay_ms,
                    config.max_delay_ms,
                );

                tracing::debug!(
                    operation_name = %operation_name,
                    provider_url = %provider_url,
                    delay = ?delay,
                    next_attempt = %(current_attempt_idx + 2),
                    max_retries = %config.max_retries,
                    "retrying rpc call after delay"
                );
                tokio::time::sleep(delay).await;
            }
        }
    }

    unreachable!(
        "Loop should have returned if max_retries > 1; max_retries=0 or 1 case is handled above."
    );
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::models::RpcConfig;
    use lazy_static::lazy_static;
    use std::cmp::Ordering;
    use std::env;
    use std::sync::atomic::{AtomicU8, Ordering as AtomicOrdering};
    use std::sync::Arc;
    use std::sync::Mutex;

    lazy_static! {
        static ref RETRY_TEST_ENV_MUTEX: Mutex<()> = Mutex::new(());
    }

    #[derive(Debug, Clone)]
    struct TestError(String);

    impl std::fmt::Display for TestError {
        fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
            write!(f, "TestError: {}", self.0)
        }
    }

    impl From<String> for TestError {
        fn from(msg: String) -> Self {
            TestError(msg)
        }
    }

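    /// Test helper that records the previous values of environment variables it
    /// sets and restores them on drop.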
    struct EnvGuard {
        keys: Vec<String>,
        old_values: Vec<Option<String>>,
    }

    impl EnvGuard {
        fn new() -> Self {
            Self {
                keys: Vec::new(),
                old_values: Vec::new(),
            }
        }

        fn set(&mut self, key: &str, value: &str) {
            let old_value = env::var(key).ok();
            self.keys.push(key.to_string());
            self.old_values.push(old_value);
            env::set_var(key, value);
        }
    }

    impl Drop for EnvGuard {
        fn drop(&mut self) {
            for i in 0..self.keys.len() {
                match &self.old_values[i] {
                    Some(value) => env::set_var(&self.keys[i], value),
                    None => env::remove_var(&self.keys[i]),
                }
            }
        }
    }

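    /// Sets the environment variables used by the retry configuration (plus the
    /// other server settings required to load it) to small test values; the
    /// returned guard restores the previous values when dropped.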
    fn setup_test_env() -> EnvGuard {
        let mut guard = EnvGuard::new();
        guard.set("API_KEY", "fake-api-key-for-tests-01234567890123456789");
        guard.set("PROVIDER_MAX_RETRIES", "2");
        guard.set("PROVIDER_MAX_FAILOVERS", "1");
        guard.set("PROVIDER_RETRY_BASE_DELAY_MS", "1");
        guard.set("PROVIDER_RETRY_MAX_DELAY_MS", "5");
        guard.set("REDIS_URL", "redis://localhost:6379");
        guard.set(
            "RELAYER_PRIVATE_KEY",
            "0x1234567890123456789012345678901234567890123456789012345678901234",
        );
        guard
    }

    #[test]
    fn test_calculate_retry_delay() {
        let base_delay_ms = 10;
        let max_delay_ms = 10000;

        let expected_backoffs = [
            10,  // attempt 0
            20,  // attempt 1
            40,  // attempt 2
            80,  // attempt 3
            160, // attempt 4
            320, // attempt 5
        ];

        for (i, expected) in expected_backoffs.iter().enumerate() {
            let attempt = i as u8;
            let delay = calculate_retry_delay(attempt, base_delay_ms, max_delay_ms);

            let min_expected = (*expected as f64 * (1.0 - RETRY_JITTER_PERCENT)).floor() as u128;
            let max_expected = (*expected as f64 * (1.0 + RETRY_JITTER_PERCENT)).ceil() as u128;

            assert!(
                (min_expected..=max_expected).contains(&delay.as_millis()),
                "Delay {} outside expected range {}..={}",
                delay.as_millis(),
                min_expected,
                max_expected
            );
        }

        // Backoff is capped at max_delay_ms.
        let base_delay_ms = 100;
        let max_delay_ms = 1000;
        let delay = calculate_retry_delay(4, base_delay_ms, max_delay_ms);
        let min_expected = (max_delay_ms as f64 * (1.0 - RETRY_JITTER_PERCENT)).floor() as u128;
        let max_expected = (max_delay_ms as f64 * (1.0 + RETRY_JITTER_PERCENT)).ceil() as u128;
        assert!(
            (min_expected..=max_expected).contains(&delay.as_millis()),
            "Delay {} outside expected range {}..={}",
            delay.as_millis(),
            min_expected,
            max_expected
        );

        // Zero delay parameters disable delays entirely.
        assert_eq!(calculate_retry_delay(5, 0, 1000).as_millis(), 0);
        assert_eq!(calculate_retry_delay(5, 100, 0).as_millis(), 0);
        assert_eq!(calculate_retry_delay(5, 0, 0).as_millis(), 0);

        // Even the largest attempt number stays within the jittered cap.
        let max_delay_ms = 10_000;
        let delay = calculate_retry_delay(u8::MAX, 1, max_delay_ms);
        assert!(
            delay.as_millis()
                <= (max_delay_ms as f64 * (1.0 + RETRY_JITTER_PERCENT)).ceil() as u128
        );
    }

    #[test]
    fn test_apply_jitter() {
        let base_delay = 1000;
        let jittered = apply_jitter(base_delay);

        let min_expected = (base_delay as f64 * (1.0 - RETRY_JITTER_PERCENT)).floor() as u64;
        let max_expected = (base_delay as f64 * (1.0 + RETRY_JITTER_PERCENT)).ceil() as u64;

        assert!(
            (min_expected as u128..=max_expected as u128).contains(&jittered.as_millis()),
            "Jittered value {} outside expected range {}..={}",
            jittered.as_millis(),
            min_expected,
            max_expected
        );

        assert_eq!(apply_jitter(0).as_millis(), 0);

        for delay in 1..5 {
            let jittered = apply_jitter(delay);
            let jitter_range = (delay as f64 * RETRY_JITTER_PERCENT).floor() as u64;

            if jitter_range == 0 {
                assert_eq!(jittered.as_millis(), delay as u128);
            } else {
                let min_expected = delay.saturating_sub(jitter_range);
                let max_expected = delay.saturating_add(jitter_range);
                assert!(
                    (min_expected as u128..=max_expected as u128).contains(&jittered.as_millis()),
                    "Jittered value {} outside expected range {}..={}",
                    jittered.as_millis(),
                    min_expected,
                    max_expected
                );
            }
        }

        let base_delay = 10000;
        let iterations = 200;
        let mut additions = 0;
        let mut subtractions = 0;

        for _ in 0..iterations {
            let jittered = apply_jitter(base_delay);
            let j_millis = jittered.as_millis();
            let b_delay = base_delay as u128;

            match j_millis.cmp(&b_delay) {
                Ordering::Greater => {
                    additions += 1;
                }
                Ordering::Less => {
                    subtractions += 1;
                }
                Ordering::Equal => {}
            }
        }

        assert!(additions > 0, "No additions were observed");
        assert!(subtractions > 0, "No subtractions were observed");
    }

    #[test]
    fn test_retry_config() {
        let config = RetryConfig::new(5, 2, 100, 5000);
        assert_eq!(config.max_retries, 5);
        assert_eq!(config.max_failovers, 2);
        assert_eq!(config.base_delay_ms, 100);
        assert_eq!(config.max_delay_ms, 5000);
    }

    #[test]
    fn test_retry_config_from_env() {
        let _lock = RETRY_TEST_ENV_MUTEX
            .lock()
            .unwrap_or_else(|e| e.into_inner());
        let mut guard = setup_test_env();
        guard.set("REDIS_URL", "redis://localhost:6379");
        guard.set(
            "RELAYER_PRIVATE_KEY",
            "0x1234567890123456789012345678901234567890123456789012345678901234",
        );

        let config = RetryConfig::from_env();
        assert_eq!(config.max_retries, 2);
        assert_eq!(config.max_failovers, 1);
        assert_eq!(config.base_delay_ms, 1);
        assert_eq!(config.max_delay_ms, 5);
    }

    #[test]
    fn test_calculate_retry_delay_edge_cases() {
        // Attempt 0 uses the base delay (with jitter).
        let delay = calculate_retry_delay(0, 100, 1000);
        let min_expected = (100.0 * (1.0 - RETRY_JITTER_PERCENT)).floor() as u128;
        let max_expected = (100.0 * (1.0 + RETRY_JITTER_PERCENT)).ceil() as u128;
        assert!(
            (min_expected..=max_expected).contains(&delay.as_millis()),
            "Delay {} outside expected range {}..={}",
            delay.as_millis(),
            min_expected,
            max_expected
        );

        // A cap equal to the base delay keeps the delay at that value.
        let delay = calculate_retry_delay(5, 100, 100);
        let min_expected = (100.0 * (1.0 - RETRY_JITTER_PERCENT)).floor() as u128;
        let max_expected = (100.0 * (1.0 + RETRY_JITTER_PERCENT)).ceil() as u128;
        assert!(
            (min_expected..=max_expected).contains(&delay.as_millis()),
            "Delay {} outside expected range {}..={}",
            delay.as_millis(),
            min_expected,
            max_expected
        );

        // Large attempt numbers with a huge cap still produce a positive delay.
        let delay = calculate_retry_delay(60, 1000, u64::MAX);
        assert!(delay.as_millis() > 0);

        // Tiny delays whose jitter range rounds down to zero are returned unchanged.
        let delay = calculate_retry_delay(1, 1, 1);
        assert_eq!(delay.as_millis(), 1);
    }

    #[test]
    fn test_retry_config_validation() {
        // All of these configurations are valid and must not panic.
        let _config = RetryConfig::new(3, 1, 100, 1000);
        let _config = RetryConfig::new(3, 1, 0, 0); // both delays zero: delays disabled
        let _config = RetryConfig::new(3, 1, 100, 100); // equal delays are allowed
        let _config = RetryConfig::new(0, 0, 1, 1); // zero retries/failovers are allowed
        let _config = RetryConfig::new(255, 255, 1, 1000); // maximum counts are allowed
    }

    #[test]
    #[should_panic(
        expected = "max_delay_ms (50) must be >= base_delay_ms (100) when both are non-zero"
    )]
    fn test_retry_config_validation_panic_delay_ordering() {
        let _config = RetryConfig::new(3, 1, 100, 50);
    }

    #[test]
    #[should_panic(
        expected = "Delay values must be consistent: both zero (no delays) or both non-zero"
    )]
    fn test_retry_config_validation_panic_inconsistent_delays_base_zero() {
        let _config = RetryConfig::new(3, 1, 0, 1000);
    }

    #[test]
    #[should_panic(
        expected = "Delay values must be consistent: both zero (no delays) or both non-zero"
    )]
    fn test_retry_config_validation_panic_inconsistent_delays_max_zero() {
        let _config = RetryConfig::new(3, 1, 100, 0);
    }

    #[test]
    fn test_get_provider() {
        let _guard = setup_test_env();

        let configs = vec![
            RpcConfig::new("http://localhost:8545".to_string()),
            RpcConfig::new("http://localhost:8546".to_string()),
        ];
        let selector = RpcSelector::new(configs).expect("Failed to create selector");

        let initializer =
            |url: &str| -> Result<String, TestError> { Ok(format!("provider-{}", url)) };

        let result = get_provider(&selector, "test_operation", &initializer);
        assert!(result.is_ok());
        let (provider, url) = result.unwrap();
        assert_eq!(url, "http://localhost:8545");
        assert_eq!(provider, "provider-http://localhost:8545");

        let initializer = |_: &str| -> Result<String, TestError> {
            Err(TestError("Failed to initialize".to_string()))
        };

        let result = get_provider(&selector, "test_operation", &initializer);
        assert!(result.is_err());
        let err = result.unwrap_err();
        assert!(format!("{}", err).contains("Failed to initialize"));
    }

    #[tokio::test]
    async fn test_try_with_retries() {
        let provider = "test_provider".to_string();
        let provider_url = "http://localhost:8545";
        let mut total_attempts = 0;
        let config = RetryConfig::new(3, 1, 5, 10);

        // Immediate success: a single attempt is made.
        let operation = |p: String| async move {
            assert_eq!(p, "test_provider");
            Ok::<_, TestError>(42)
        };

        let result = try_with_retries(
            &provider,
            provider_url,
            "test_operation",
            &operation,
            &|_| false,
            &config,
            &mut total_attempts,
        )
        .await;

        assert!(result.is_ok());
        assert_eq!(result.unwrap(), 42);
        assert_eq!(total_attempts, 1);

        // Retriable errors: fails twice, then succeeds on the third attempt.
        let attempts = Arc::new(AtomicU8::new(0));
        let attempts_clone = attempts.clone();
        let operation = move |_: String| {
            let attempts = attempts_clone.clone();
            async move {
                let current = attempts.fetch_add(1, AtomicOrdering::SeqCst);
                if current < 2 {
                    Err(TestError("Retriable error".to_string()))
                } else {
                    Ok(42)
                }
            }
        };

        let mut total_attempts = 0;
        let result = try_with_retries(
            &provider,
            provider_url,
            "test_operation",
            &operation,
            &|_| true,
            &config,
            &mut total_attempts,
        )
        .await;

        assert!(result.is_ok());
        assert_eq!(result.unwrap(), 42);
        assert_eq!(total_attempts, 3);

        // Non-retriable error: returned after a single attempt.
        let operation = |_: String| async { Err(TestError("Non-retriable error".to_string())) };

        let mut total_attempts = 0;
        let result: Result<i32, InternalRetryError<TestError>> = try_with_retries(
            &provider,
            provider_url,
            "test_operation",
            &operation,
            &|_| false,
            &config,
            &mut total_attempts,
        )
        .await;

        assert!(result.is_err());
        assert_eq!(total_attempts, 1);
        let err = result.unwrap_err();
        assert!(matches!(err, InternalRetryError::NonRetriable(_)));

        // Retriable error on every attempt: retries are exhausted.
        let operation = |_: String| async { Err(TestError("Always fails".to_string())) };

        let mut total_attempts = 0;
        let result: Result<i32, InternalRetryError<TestError>> = try_with_retries(
            &provider,
            provider_url,
            "test_operation",
            &operation,
            &|_| true,
            &config,
            &mut total_attempts,
        )
        .await;

        assert!(result.is_err());
        assert_eq!(total_attempts, 3); // all max_retries attempts were used
        let error = result.unwrap_err();
        assert!(matches!(error, InternalRetryError::RetriesExhausted(_)));
    }

    #[tokio::test]
    async fn test_try_with_retries_max_retries_zero() {
        let provider = "test_provider".to_string();
        let provider_url = "http://localhost:8545";
        let mut total_attempts = 0;
        let config = RetryConfig::new(0, 1, 5, 10);

        let operation = |_p: String| async move { Ok::<_, TestError>(42) };

        let result = try_with_retries(
            &provider,
            provider_url,
            "test_operation",
            &operation,
            &|_| false,
            &config,
            &mut total_attempts,
        )
        .await;

        assert!(result.is_ok());
        assert_eq!(result.unwrap(), 42);

        let operation = |_: String| async { Err(TestError("Always fails".to_string())) };

        let mut total_attempts = 0;
        let result: Result<i32, InternalRetryError<TestError>> = try_with_retries(
            &provider,
            provider_url,
            "test_operation",
            &operation,
            &|_| true,
            &config,
            &mut total_attempts,
        )
        .await;

        assert!(result.is_err());
        let error = result.unwrap_err();
        // With max_retries == 0 the single attempt is treated as non-retriable.
        assert!(matches!(error, InternalRetryError::NonRetriable(_)));
    }

    #[tokio::test]
    async fn test_try_with_retries_max_retries_one() {
        let provider = "test_provider".to_string();
        let provider_url = "http://localhost:8545";
        let mut total_attempts = 0;
        let config = RetryConfig::new(1, 1, 5, 10);

        let operation = |p: String| async move {
            assert_eq!(p, "test_provider");
            Ok::<_, TestError>(42)
        };

        let result = try_with_retries(
            &provider,
            provider_url,
            "test_operation",
            &operation,
            &|_| false,
            &config,
            &mut total_attempts,
        )
        .await;

        assert!(result.is_ok());
        assert_eq!(result.unwrap(), 42);

        let operation = |_: String| async { Err(TestError("Always fails".to_string())) };

        let mut total_attempts = 0;
        let result: Result<i32, InternalRetryError<TestError>> = try_with_retries(
            &provider,
            provider_url,
            "test_operation",
            &operation,
            &|_| true,
            &config,
            &mut total_attempts,
        )
        .await;

        assert!(result.is_err());
        let error = result.unwrap_err();
        // With max_retries == 1 the single attempt is treated as non-retriable.
        assert!(matches!(error, InternalRetryError::NonRetriable(_)));
    }

    #[tokio::test]
    async fn test_non_retriable_error_does_not_mark_provider_failed() {
        let _guard = setup_test_env();

        let configs = vec![
            RpcConfig::new("http://localhost:8545".to_string()),
            RpcConfig::new("http://localhost:8546".to_string()),
        ];
        let selector = RpcSelector::new(configs).expect("Failed to create selector");

        let provider_initializer = |url: &str| -> Result<String, TestError> { Ok(url.to_string()) };

        let operation =
            |_provider: String| async move { Err(TestError("Non-retriable error".to_string())) };

        let config = RetryConfig::new(3, 1, 0, 0);

        let initial_available_count = selector.available_provider_count();

        let result: Result<i32, TestError> = retry_rpc_call(
            &selector,
            "test_operation",
            |_| false, // is_retriable_error: never retry
            |_| false, // should_mark_provider_failed: never mark failed
            provider_initializer,
            operation,
            Some(config),
        )
        .await;

        assert!(result.is_err());

        let final_available_count = selector.available_provider_count();
        assert_eq!(
            initial_available_count, final_available_count,
            "Provider count should remain the same for non-retriable errors"
        );
    }

    #[tokio::test]
    async fn test_retriable_error_marks_provider_failed_after_retries_exhausted() {
        let _guard = setup_test_env();

        let configs = vec![
            RpcConfig::new("http://localhost:8545".to_string()),
            RpcConfig::new("http://localhost:8546".to_string()),
        ];
        let selector = RpcSelector::new(configs).expect("Failed to create selector");

        let provider_initializer = |url: &str| -> Result<String, TestError> { Ok(url.to_string()) };

        let operation = |_provider: String| async { Err(TestError("Retriable error".to_string())) };

        let config = RetryConfig::new(2, 1, 0, 0); // zero delays keep the test fast

        let initial_available_count = selector.available_provider_count();

        let result: Result<i32, TestError> = retry_rpc_call(
            &selector,
            "test_operation",
            |_| true, // is_retriable_error: always retry
            |_| true, // should_mark_provider_failed: always mark failed
            provider_initializer,
            operation,
            Some(config),
        )
        .await;

        assert!(result.is_err());

        let final_available_count = selector.available_provider_count();
        assert!(
            final_available_count < initial_available_count,
            "At least one provider should be marked as failed after retriable errors exhaust retries"
        );
    }

    #[tokio::test]
    async fn test_retry_rpc_call_success() {
        let _guard = setup_test_env();

        let configs = vec![
            RpcConfig::new("http://localhost:8545".to_string()),
            RpcConfig::new("http://localhost:8546".to_string()),
        ];
        let selector = RpcSelector::new(configs).expect("Failed to create selector");

        let attempts = Arc::new(AtomicU8::new(0));
        let attempts_clone = attempts.clone();

        let provider_initializer =
            |_url: &str| -> Result<String, TestError> { Ok("mock_provider".to_string()) };

        let operation = move |_provider: String| {
            let attempts = attempts_clone.clone();
            async move {
                attempts.fetch_add(1, AtomicOrdering::SeqCst);
                Ok::<_, TestError>(42)
            }
        };

        let config = RetryConfig::new(1, 1, 0, 0);

        let result = retry_rpc_call(
            &selector,
            "test_operation",
            |_| false, // is_retriable_error
            |_| false, // should_mark_provider_failed
            provider_initializer,
            operation,
            Some(config),
        )
        .await;

        assert!(result.is_ok(), "Expected OK result but got: {:?}", result);
        assert_eq!(result.unwrap(), 42);
        assert_eq!(attempts.load(AtomicOrdering::SeqCst), 1); // exactly one attempt was made
    }

    #[tokio::test]
    async fn test_retry_rpc_call_with_provider_failover() {
        let _guard = setup_test_env();

        let configs = vec![
            RpcConfig::new("http://localhost:8545".to_string()),
            RpcConfig::new("http://localhost:8546".to_string()),
        ];
        let selector = RpcSelector::new(configs).expect("Failed to create selector");

        let current_provider = Arc::new(Mutex::new(String::new()));
        let current_provider_clone = current_provider.clone();

        let provider_initializer = move |url: &str| -> Result<String, TestError> {
            let mut provider = current_provider_clone.lock().unwrap();
            *provider = url.to_string();
            Ok(url.to_string())
        };

        let operation = move |provider: String| async move {
            if provider.contains("8545") {
                Err(TestError("First provider error".to_string()))
            } else {
                Ok(42)
            }
        };

        let config = RetryConfig::new(2, 1, 0, 0); // zero delays keep the test fast

        let result = retry_rpc_call(
            &selector,
            "test_operation",
            |_| true, // is_retriable_error
            |_| true, // should_mark_provider_failed
            provider_initializer,
            operation,
            Some(config),
        )
        .await;

        assert!(result.is_ok(), "Expected OK result but got: {:?}", result);
        assert_eq!(result.unwrap(), 42);

        let final_provider = current_provider.lock().unwrap().clone();
        assert!(
            final_provider.contains("8546"),
            "Wrong provider selected: {}",
            final_provider
        );
    }

    #[tokio::test]
    async fn test_retry_rpc_call_all_providers_fail() {
        let _guard = setup_test_env();

        let configs = vec![
            RpcConfig::new("http://localhost:8545".to_string()),
            RpcConfig::new("http://localhost:8546".to_string()),
        ];
        let selector = RpcSelector::new(configs).expect("Failed to create selector");

        let provider_initializer =
            |_: &str| -> Result<String, TestError> { Ok("mock_provider".to_string()) };

        let operation = |_: String| async { Err(TestError("Always fails".to_string())) };

        let config = RetryConfig::new(2, 1, 0, 0); // zero delays keep the test fast

        let result: Result<i32, TestError> = retry_rpc_call(
            &selector,
            "test_operation",
            |_| true,  // is_retriable_error
            |_| false, // should_mark_provider_failed
            provider_initializer,
            operation,
            Some(config),
        )
        .await;

        assert!(result.is_err(), "Expected an error but got: {:?}", result);
    }

    #[tokio::test]
    async fn test_retry_rpc_call_with_default_config() {
        let (_guard, selector) = {
            let _lock = RETRY_TEST_ENV_MUTEX
                .lock()
                .unwrap_or_else(|e| e.into_inner());
            let guard = setup_test_env();

            let configs = vec![RpcConfig::new("http://localhost:8545".to_string())];
            let selector = RpcSelector::new(configs).expect("Failed to create selector");
            (guard, selector)
        };

        let provider_initializer =
            |_url: &str| -> Result<String, TestError> { Ok("mock_provider".to_string()) };

        let operation = |_provider: String| async move { Ok::<_, TestError>(42) };

        let result = retry_rpc_call(
            &selector,
            "test_operation",
            |_| false,
            |_| false,
            provider_initializer,
            operation,
            None, // no explicit config: fall back to RetryConfig::from_env()
        )
        .await;

        assert!(result.is_ok());
        assert_eq!(result.unwrap(), 42);
    }

    #[tokio::test]
    async fn test_retry_rpc_call_provider_initialization_failures() {
        let _guard = setup_test_env();

        let configs = vec![
            RpcConfig::new("http://localhost:8545".to_string()),
            RpcConfig::new("http://localhost:8546".to_string()),
        ];
        let selector = RpcSelector::new(configs).expect("Failed to create selector");

        let attempt_count = Arc::new(AtomicU8::new(0));
        let attempt_count_clone = attempt_count.clone();

        let provider_initializer = move |url: &str| -> Result<String, TestError> {
            let count = attempt_count_clone.fetch_add(1, AtomicOrdering::SeqCst);
            if count == 0 && url.contains("8545") {
                Err(TestError("First provider init failed".to_string()))
            } else {
                Ok(url.to_string())
            }
        };

        let operation = |_provider: String| async move { Ok::<_, TestError>(42) };

        let config = RetryConfig::new(2, 1, 0, 0);

        let result = retry_rpc_call(
            &selector,
            "test_operation",
            |_| true,
            |_| false,
            provider_initializer,
            operation,
            Some(config),
        )
        .await;

        assert!(result.is_ok());
        assert_eq!(result.unwrap(), 42);
        assert!(attempt_count.load(AtomicOrdering::SeqCst) >= 2); // the initializer ran again after the first failure
    }

    #[test]
    fn test_get_provider_selector_errors() {
        let _guard = setup_test_env();

        let configs = vec![RpcConfig::new("http://localhost:8545".to_string())];
        let selector = RpcSelector::new(configs).expect("Failed to create selector");

        // Mark the only provider as failed so that provider selection errors out.
        let _ = selector.get_current_url().unwrap();
        selector.mark_current_as_failed();

        let provider_initializer =
            |url: &str| -> Result<String, TestError> { Ok(format!("provider-{}", url)) };

        let result = get_provider(&selector, "test_operation", &provider_initializer);
        assert!(result.is_err());
    }

    #[tokio::test]
    async fn test_last_provider_never_marked_as_failed() {
        let _guard = setup_test_env();

        let configs = vec![RpcConfig::new("http://localhost:8545".to_string())];
        let selector = RpcSelector::new(configs).expect("Failed to create selector");

        let provider_initializer = |url: &str| -> Result<String, TestError> { Ok(url.to_string()) };

        let operation = |_provider: String| async { Err(TestError("Always fails".to_string())) };

        let config = RetryConfig::new(2, 1, 0, 0); // zero delays keep the test fast

        let initial_available_count = selector.available_provider_count();
        assert_eq!(initial_available_count, 1);

        let result: Result<i32, TestError> = retry_rpc_call(
            &selector,
            "test_operation",
            |_| true, // is_retriable_error: always retry
            |_| true, // should_mark_provider_failed: always mark failed
            provider_initializer,
            operation,
            Some(config),
        )
        .await;

        assert!(result.is_err());

        let final_available_count = selector.available_provider_count();
        assert_eq!(
            final_available_count, initial_available_count,
            "Last provider should never be marked as failed"
        );
        assert_eq!(
            final_available_count, 1,
            "Should still have 1 provider available"
        );
    }

    #[tokio::test]
    async fn test_last_provider_behavior_with_multiple_providers() {
        let _guard = setup_test_env();

        let configs = vec![
            RpcConfig::new("http://localhost:8545".to_string()),
            RpcConfig::new("http://localhost:8546".to_string()),
            RpcConfig::new("http://localhost:8547".to_string()),
        ];
        let selector = RpcSelector::new(configs).expect("Failed to create selector");

        let provider_initializer = |url: &str| -> Result<String, TestError> { Ok(url.to_string()) };

        let operation = |_provider: String| async { Err(TestError("Always fails".to_string())) };

        let config = RetryConfig::new(2, 2, 0, 0); // zero delays keep the test fast

        let initial_available_count = selector.available_provider_count();
        assert_eq!(initial_available_count, 3);

        let result: Result<i32, TestError> = retry_rpc_call(
            &selector,
            "test_operation",
            |_| true, // is_retriable_error: always retry
            |_| true, // should_mark_provider_failed: always mark failed
            provider_initializer,
            operation,
            Some(config),
        )
        .await;

        assert!(result.is_err());

        let final_available_count = selector.available_provider_count();
        assert_eq!(
            final_available_count, 1,
            "Should have exactly 1 provider left (the last one should not be marked as failed)"
        );
    }

    #[tokio::test]
    async fn test_non_retriable_error_should_mark_provider_failed() {
        let _guard = setup_test_env();

        let configs = vec![
            RpcConfig::new("http://localhost:8545".to_string()),
            RpcConfig::new("http://localhost:8546".to_string()),
        ];
        let selector = RpcSelector::new(configs).expect("Failed to create selector");

        let provider_initializer = |url: &str| -> Result<String, TestError> { Ok(url.to_string()) };

        let operation = |_provider: String| async move {
            Err(TestError("Critical non-retriable error".to_string()))
        };

        let config = RetryConfig::new(3, 1, 0, 0);

        let initial_available_count = selector.available_provider_count();
        assert_eq!(initial_available_count, 2);

        let result: Result<i32, TestError> = retry_rpc_call(
            &selector,
            "test_operation",
            |_| false, // is_retriable_error: never retry
            |e| e.0.contains("Critical"), // should_mark_provider_failed
            provider_initializer,
            operation,
            Some(config),
        )
        .await;

        assert!(result.is_err());

        let final_available_count = selector.available_provider_count();
        assert_eq!(
            final_available_count, 1,
            "Provider should be marked as failed when should_mark_provider_failed returns true for non-retriable error"
        );
    }

    #[tokio::test]
    async fn test_non_retriable_error_should_not_mark_provider_failed() {
        let _guard = setup_test_env();

        let configs = vec![
            RpcConfig::new("http://localhost:8545".to_string()),
            RpcConfig::new("http://localhost:8546".to_string()),
        ];
        let selector = RpcSelector::new(configs).expect("Failed to create selector");

        let provider_initializer = |url: &str| -> Result<String, TestError> { Ok(url.to_string()) };

        let operation = |_provider: String| async move {
            Err(TestError("Minor non-retriable error".to_string()))
        };

        let config = RetryConfig::new(3, 1, 0, 0);

        let initial_available_count = selector.available_provider_count();
        assert_eq!(initial_available_count, 2);

        let result: Result<i32, TestError> = retry_rpc_call(
            &selector,
            "test_operation",
            |_| false, // is_retriable_error: never retry
            |e| e.0.contains("Critical"), // should_mark_provider_failed
            provider_initializer,
            operation,
            Some(config),
        )
        .await;

        assert!(result.is_err());

        let final_available_count = selector.available_provider_count();
        assert_eq!(
            final_available_count, initial_available_count,
            "Provider should NOT be marked as failed when should_mark_provider_failed returns false for non-retriable error"
        );
    }

    #[tokio::test]
    async fn test_retriable_error_ignores_should_mark_provider_failed() {
        let _guard = setup_test_env();

        let configs = vec![
            RpcConfig::new("http://localhost:8545".to_string()),
            RpcConfig::new("http://localhost:8546".to_string()),
        ];
        let selector = RpcSelector::new(configs).expect("Failed to create selector");

        let provider_initializer = |url: &str| -> Result<String, TestError> { Ok(url.to_string()) };

        let operation =
            |_provider: String| async { Err(TestError("Retriable network error".to_string())) };

        let config = RetryConfig::new(2, 1, 0, 0); // zero delays keep the test fast

        let initial_available_count = selector.available_provider_count();
        assert_eq!(initial_available_count, 2);

        let result: Result<i32, TestError> = retry_rpc_call(
            &selector,
            "test_operation",
            |_| true,  // is_retriable_error: always retry
            |_| false, // should_mark_provider_failed: never mark failed
            provider_initializer,
            operation,
            Some(config),
        )
        .await;

        assert!(result.is_err());

        let final_available_count = selector.available_provider_count();
        assert!(
            final_available_count < initial_available_count,
            "Provider should be marked as failed when retriable errors exhaust retries, regardless of should_mark_provider_failed"
        );
    }

    #[tokio::test]
    async fn test_mixed_error_scenarios_with_different_marking_behavior() {
        let _guard = setup_test_env();

        let configs = vec![
            RpcConfig::new("http://localhost:8545".to_string()),
            RpcConfig::new("http://localhost:8546".to_string()),
        ];
        let selector = RpcSelector::new(configs).expect("Failed to create selector");

        let provider_initializer = |url: &str| -> Result<String, TestError> { Ok(url.to_string()) };

        let operation =
            |_provider: String| async move { Err(TestError("Critical network error".to_string())) };

        let config = RetryConfig::new(1, 1, 0, 0);
        let initial_count = selector.available_provider_count();

        let result: Result<i32, TestError> = retry_rpc_call(
            &selector,
            "test_operation",
            |_| false, // is_retriable_error: never retry
            |e| e.0.contains("Critical"), // should_mark_provider_failed
            provider_initializer,
            operation,
            Some(config.clone()),
        )
        .await;

        assert!(result.is_err());
        let after_critical_count = selector.available_provider_count();
        assert_eq!(
            after_critical_count,
            initial_count - 1,
            "Critical error should mark provider as failed"
        );

        let operation =
            |_provider: String| async move { Err(TestError("Minor validation error".to_string())) };

        let result: Result<i32, TestError> = retry_rpc_call(
            &selector,
            "test_operation",
            |_| false, // is_retriable_error: never retry
            |e| e.0.contains("Critical"), // should_mark_provider_failed
            provider_initializer,
            operation,
            Some(config),
        )
        .await;

        assert!(result.is_err());
        let final_count = selector.available_provider_count();
        assert_eq!(
            final_count, after_critical_count,
            "Minor error should NOT mark provider as failed"
        );
    }

    #[tokio::test]
    async fn test_should_mark_provider_failed_respects_last_provider_protection() {
        let _guard = setup_test_env();

        let configs = vec![RpcConfig::new("http://localhost:8545".to_string())];
        let selector = RpcSelector::new(configs).expect("Failed to create selector");

        let provider_initializer = |url: &str| -> Result<String, TestError> { Ok(url.to_string()) };

        let operation =
            |_provider: String| async move { Err(TestError("Critical network error".to_string())) };

        let config = RetryConfig::new(1, 1, 0, 0);

        let initial_available_count = selector.available_provider_count();
        assert_eq!(initial_available_count, 1);

        let result: Result<i32, TestError> = retry_rpc_call(
            &selector,
            "test_operation",
            |_| false, // is_retriable_error: never retry
            |e| e.0.contains("Critical"), // should_mark_provider_failed
            provider_initializer,
            operation,
            Some(config),
        )
        .await;

        assert!(result.is_err());

        let final_available_count = selector.available_provider_count();
        assert_eq!(
            final_available_count, initial_available_count,
            "Last provider should never be marked as failed, regardless of should_mark_provider_failed"
        );
        assert_eq!(
            final_available_count, 1,
            "Should still have 1 provider available"
        );
    }

    #[tokio::test]
    async fn test_should_mark_provider_failed_with_multiple_providers_last_protection() {
        let _guard = setup_test_env();

        let configs = vec![
            RpcConfig::new("http://localhost:8545".to_string()),
            RpcConfig::new("http://localhost:8546".to_string()),
        ];
        let selector = RpcSelector::new(configs).expect("Failed to create selector");

        let attempt_count = Arc::new(AtomicU8::new(0));
        let attempt_count_clone = attempt_count.clone();

        let provider_initializer = |url: &str| -> Result<String, TestError> { Ok(url.to_string()) };

        let operation = move |_provider: String| {
            let attempt_count = attempt_count_clone.clone();
            async move {
                let count = attempt_count.fetch_add(1, AtomicOrdering::SeqCst);
                Err(TestError(format!("Critical error #{}", count)))
            }
        };

        let config = RetryConfig::new(1, 1, 0, 0); // zero delays keep the test fast

        let initial_available_count = selector.available_provider_count();
        assert_eq!(initial_available_count, 2);

        let result: Result<i32, TestError> = retry_rpc_call(
            &selector,
            "test_operation",
            |_| false, // is_retriable_error: never retry
            |e| e.0.contains("Critical"), // should_mark_provider_failed
            provider_initializer,
            operation,
            Some(config),
        )
        .await;

        assert!(result.is_err());

        let final_available_count = selector.available_provider_count();
        assert_eq!(
            final_available_count, 1,
            "First provider should be marked as failed, but last provider should be protected"
        );
    }
}