openzeppelin_relayer/jobs/handlers/
solana_swap_request_handler.rs1use actix_web::web::ThinData;
7use apalis::prelude::{Attempt, Data, *};
8use eyre::Result;
9use tracing::{debug, info, instrument};
10
11use crate::{
12 constants::WORKER_SOLANA_TOKEN_SWAP_REQUEST_RETRIES,
13 domain::{create_solana_relayer, get_relayer_by_id, SolanaRelayerDexTrait},
14 jobs::{handle_result, Job, SolanaTokenSwapRequest},
15 models::DefaultAppState,
16 observability::request_id::set_request_id,
17 repositories::Repository,
18};
19
20#[instrument(
29 level = "debug",
30 skip(job, context),
31 fields(
32 request_id = ?job.request_id,
33 job_id = %job.message_id,
34 job_type = %job.job_type.to_string(),
35 attempt = %attempt.current(),
36 relayer_id = %job.data.relayer_id,
37 )
38)]
39pub async fn solana_token_swap_request_handler(
40 job: Job<SolanaTokenSwapRequest>,
41 context: Data<ThinData<DefaultAppState>>,
42 attempt: Attempt,
43) -> Result<(), Error> {
44 if let Some(request_id) = job.request_id.clone() {
45 set_request_id(request_id);
46 }
47
48 debug!(relayer_id = %job.data.relayer_id, "handling solana token swap request");
49
50 let result = handle_request(job.data, context).await;
51
52 handle_result(
53 result,
54 attempt,
55 "SolanaTokenSwapRequest",
56 WORKER_SOLANA_TOKEN_SWAP_REQUEST_RETRIES,
57 )
58}
59
60#[derive(Default, Debug, Clone)]
61pub struct CronReminder();
62
63#[instrument(
65 level = "info",
66 skip(_job, data, relayer_id),
67 fields(
68 job_type = "solana_token_swap_cron",
69 attempt = %attempt.current(),
70 ),
71 err
72)]
73pub async fn solana_token_swap_cron_handler(
74 _job: CronReminder,
75 relayer_id: Data<String>,
76 data: Data<ThinData<DefaultAppState>>,
77 attempt: Attempt,
78) -> Result<(), Error> {
79 info!(
80 relayer_id = %*relayer_id,
81 "handling solana token swap cron request"
82 );
83
84 let result = handle_request(
85 SolanaTokenSwapRequest {
86 relayer_id: relayer_id.to_string(),
87 },
88 data,
89 )
90 .await;
91
92 handle_result(
93 result,
94 attempt,
95 "SolanaTokenSwapRequest",
96 WORKER_SOLANA_TOKEN_SWAP_REQUEST_RETRIES,
97 )
98}
99
100async fn handle_request(
101 request: SolanaTokenSwapRequest,
102 context: Data<ThinData<DefaultAppState>>,
103) -> Result<()> {
104 debug!(relayer_id = %request.relayer_id, "processing solana token swap");
105
106 let relayer_model = get_relayer_by_id(request.relayer_id.clone(), &context).await?;
107 let signer_model = context
108 .signer_repository
109 .get_by_id(relayer_model.signer_id.clone())
110 .await?;
111
112 let relayer = create_solana_relayer(
113 relayer_model,
114 signer_model,
115 context.relayer_repository(),
116 context.network_repository(),
117 context.transaction_repository(),
118 context.job_producer(),
119 )
120 .await?;
121
122 relayer
123 .handle_token_swap_request(request.relayer_id.clone())
124 .await
125 .map_err(|e| eyre::eyre!("Failed to handle solana token swap request: {}", e))?;
126
127 Ok(())
128}
129
130#[cfg(test)]
131mod tests {}