openzeppelin_relayer/jobs/handlers/
solana_swap_request_handler.rs

//! Solana swap request handling worker implementation.
//!
//! This module implements the Solana token swap request handling worker, which processes
//! token swap request jobs from the queue as well as cron-triggered swap runs.
5
6use actix_web::web::ThinData;
7use apalis::prelude::{Attempt, Data, *};
8use eyre::Result;
9use tracing::{debug, info, instrument};
10
11use crate::{
12    constants::WORKER_SOLANA_TOKEN_SWAP_REQUEST_RETRIES,
13    domain::{create_solana_relayer, get_relayer_by_id, SolanaRelayerDexTrait},
14    jobs::{handle_result, Job, SolanaTokenSwapRequest},
15    models::DefaultAppState,
16    observability::request_id::set_request_id,
17    repositories::Repository,
18};
19
20/// Handles incoming swap jobs from the queue.
21///
22/// # Arguments
23/// * `job` - The notification job containing recipient and message details
24/// * `context` - Application state containing notification services
25///
26/// # Returns
27/// * `Result<(), Error>` - Success or failure of notification processing
28#[instrument(
29    level = "debug",
30    skip(job, context),
31    fields(
32        request_id = ?job.request_id,
33        job_id = %job.message_id,
34        job_type = %job.job_type.to_string(),
35        attempt = %attempt.current(),
36        relayer_id = %job.data.relayer_id,
37    )
38)]
39pub async fn solana_token_swap_request_handler(
40    job: Job<SolanaTokenSwapRequest>,
41    context: Data<ThinData<DefaultAppState>>,
42    attempt: Attempt,
43) -> Result<(), Error> {
44    if let Some(request_id) = job.request_id.clone() {
45        set_request_id(request_id);
46    }
47
48    debug!(relayer_id = %job.data.relayer_id, "handling solana token swap request");
49
50    let result = handle_request(job.data, context).await;
51
52    handle_result(
53        result,
54        attempt,
55        "SolanaTokenSwapRequest",
56        WORKER_SOLANA_TOKEN_SWAP_REQUEST_RETRIES,
57    )
58}
59
/// Payload-free job type received by `solana_token_swap_cron_handler`.
///
/// The handler ignores the value itself; the relayer id it works on is taken
/// from the `Data<String>` argument instead.
#[derive(Debug, Clone, Default)]
pub struct CronReminder();
62
63/// Handles incoming swap jobs from the cron queue.
64#[instrument(
65    level = "info",
66    skip(_job, data, relayer_id),
67    fields(
68        job_type = "solana_token_swap_cron",
69        attempt = %attempt.current(),
70    ),
71    err
72)]
73pub async fn solana_token_swap_cron_handler(
74    _job: CronReminder,
75    relayer_id: Data<String>,
76    data: Data<ThinData<DefaultAppState>>,
77    attempt: Attempt,
78) -> Result<(), Error> {
79    info!(
80        relayer_id = %*relayer_id,
81        "handling solana token swap cron request"
82    );
83
84    let result = handle_request(
85        SolanaTokenSwapRequest {
86            relayer_id: relayer_id.to_string(),
87        },
88        data,
89    )
90    .await;
91
92    handle_result(
93        result,
94        attempt,
95        "SolanaTokenSwapRequest",
96        WORKER_SOLANA_TOKEN_SWAP_REQUEST_RETRIES,
97    )
98}
99
100async fn handle_request(
101    request: SolanaTokenSwapRequest,
102    context: Data<ThinData<DefaultAppState>>,
103) -> Result<()> {
104    debug!(relayer_id = %request.relayer_id, "processing solana token swap");
105
106    let relayer_model = get_relayer_by_id(request.relayer_id.clone(), &context).await?;
107    let signer_model = context
108        .signer_repository
109        .get_by_id(relayer_model.signer_id.clone())
110        .await?;
111
112    let relayer = create_solana_relayer(
113        relayer_model,
114        signer_model,
115        context.relayer_repository(),
116        context.network_repository(),
117        context.transaction_repository(),
118        context.job_producer(),
119    )
120    .await?;
121
122    relayer
123        .handle_token_swap_request(request.relayer_id.clone())
124        .await
125        .map_err(|e| eyre::eyre!("Failed to handle solana token swap request: {}", e))?;
126
127    Ok(())
128}
129
// Placeholder test module: no unit tests are defined for these handlers yet.
130#[cfg(test)]
131mod tests {}