1use crate::{
6 jobs::JobProducerTrait,
7 models::{
8 ApiError, ApiResponse, NetworkRepoModel, NotificationRepoModel, PaginationMeta,
9 PaginationQuery, PluginCallRequest, PluginModel, RelayerRepoModel, SignerRepoModel,
10 ThinDataAppState, TransactionRepoModel,
11 },
12 repositories::{
13 ApiKeyRepositoryTrait, NetworkRepository, PluginRepositoryTrait, RelayerRepository,
14 Repository, TransactionCounterTrait, TransactionRepository,
15 },
16 services::plugins::{
17 PluginCallResponse, PluginCallResult, PluginHandlerResponse, PluginRunner, PluginService,
18 PluginServiceTrait,
19 },
20};
21use actix_web::{http::StatusCode, HttpResponse};
22use eyre::Result;
23use std::sync::Arc;
24
25pub async fn call_plugin<J, RR, TR, NR, NFR, SR, TCR, PR, AKR>(
37 plugin_id: String,
38 plugin_call_request: PluginCallRequest,
39 state: ThinDataAppState<J, RR, TR, NR, NFR, SR, TCR, PR, AKR>,
40) -> Result<HttpResponse, ApiError>
41where
42 J: JobProducerTrait + Send + Sync + 'static,
43 RR: RelayerRepository + Repository<RelayerRepoModel, String> + Send + Sync + 'static,
44 TR: TransactionRepository + Repository<TransactionRepoModel, String> + Send + Sync + 'static,
45 NR: NetworkRepository + Repository<NetworkRepoModel, String> + Send + Sync + 'static,
46 NFR: Repository<NotificationRepoModel, String> + Send + Sync + 'static,
47 SR: Repository<SignerRepoModel, String> + Send + Sync + 'static,
48 TCR: TransactionCounterTrait + Send + Sync + 'static,
49 PR: PluginRepositoryTrait + Send + Sync + 'static,
50 AKR: ApiKeyRepositoryTrait + Send + Sync + 'static,
51{
52 let plugin = state
53 .plugin_repository
54 .get_by_id(&plugin_id)
55 .await?
56 .ok_or_else(|| ApiError::NotFound(format!("Plugin with id {plugin_id} not found")))?;
57
58 let plugin_runner = PluginRunner;
59 let plugin_service = PluginService::new(plugin_runner);
60 let result = plugin_service
61 .call_plugin(plugin, plugin_call_request, Arc::new(state))
62 .await;
63
64 match result {
65 PluginCallResult::Success(plugin_result) => {
66 let PluginCallResponse { result, metadata } = plugin_result;
67
68 let mut response = ApiResponse::success(result);
69 response.metadata = metadata;
70 Ok(HttpResponse::Ok().json(response))
71 }
72 PluginCallResult::Handler(handler) => {
73 let PluginHandlerResponse {
74 status,
75 message,
76 error,
77 metadata,
78 } = handler;
79
80 let log_count = metadata
81 .as_ref()
82 .and_then(|meta| meta.logs.as_ref().map(|logs| logs.len()))
83 .unwrap_or(0);
84 let trace_count = metadata
85 .as_ref()
86 .and_then(|meta| meta.traces.as_ref().map(|traces| traces.len()))
87 .unwrap_or(0);
88
89 let http_status =
90 StatusCode::from_u16(status).unwrap_or(StatusCode::INTERNAL_SERVER_ERROR);
91
92 tracing::debug!(
94 status,
95 message = %message,
96 code = ?error.code.as_ref(),
97 details = ?error.details.as_ref(),
98 log_count,
99 trace_count,
100 "Plugin handler error"
101 );
102
103 let mut response = ApiResponse::new(Some(error), Some(message.clone()), None);
104 response.metadata = metadata;
105 Ok(HttpResponse::build(http_status).json(response))
106 }
107 PluginCallResult::Fatal(error) => {
108 tracing::error!("Plugin error: {:?}", error);
109 Ok(HttpResponse::InternalServerError()
110 .json(ApiResponse::<String>::error("Internal server error")))
111 }
112 }
113}
114
115pub async fn list_plugins<J, RR, TR, NR, NFR, SR, TCR, PR, AKR>(
128 query: PaginationQuery,
129 state: ThinDataAppState<J, RR, TR, NR, NFR, SR, TCR, PR, AKR>,
130) -> Result<HttpResponse, ApiError>
131where
132 J: JobProducerTrait + Send + Sync + 'static,
133 RR: RelayerRepository + Repository<RelayerRepoModel, String> + Send + Sync + 'static,
134 TR: TransactionRepository + Repository<TransactionRepoModel, String> + Send + Sync + 'static,
135 NR: NetworkRepository + Repository<NetworkRepoModel, String> + Send + Sync + 'static,
136 NFR: Repository<NotificationRepoModel, String> + Send + Sync + 'static,
137 SR: Repository<SignerRepoModel, String> + Send + Sync + 'static,
138 TCR: TransactionCounterTrait + Send + Sync + 'static,
139 PR: PluginRepositoryTrait + Send + Sync + 'static,
140 AKR: ApiKeyRepositoryTrait + Send + Sync + 'static,
141{
142 let plugins = state.plugin_repository.list_paginated(query).await?;
143
144 let plugin_items: Vec<PluginModel> = plugins.items.into_iter().collect();
145
146 Ok(HttpResponse::Ok().json(ApiResponse::paginated(
147 plugin_items,
148 PaginationMeta {
149 total_items: plugins.total,
150 current_page: plugins.page,
151 per_page: plugins.per_page,
152 },
153 )))
154}
155
156#[cfg(test)]
157mod tests {
158 use std::time::Duration;
159
160 use super::*;
161 use actix_web::web;
162
163 use crate::{
164 constants::DEFAULT_PLUGIN_TIMEOUT_SECONDS, models::PluginModel,
165 utils::mocks::mockutils::create_mock_app_state,
166 };
167
168 #[actix_web::test]
169 async fn test_call_plugin_execution_failure() {
170 let plugin = PluginModel {
172 id: "test-plugin".to_string(),
173 path: "test-path".to_string(),
174 timeout: Duration::from_secs(DEFAULT_PLUGIN_TIMEOUT_SECONDS),
175 emit_logs: false,
176 emit_traces: false,
177 };
178 let app_state =
179 create_mock_app_state(None, None, None, None, Some(vec![plugin]), None).await;
180 let plugin_call_request = PluginCallRequest {
181 params: serde_json::json!({"key":"value"}),
182 };
183 let response = call_plugin(
184 "test-plugin".to_string(),
185 plugin_call_request,
186 web::ThinData(app_state),
187 )
188 .await;
189 assert!(response.is_ok());
190 let http_response = response.unwrap();
191 assert_eq!(http_response.status(), StatusCode::INTERNAL_SERVER_ERROR);
193 }
194
195 #[actix_web::test]
196 async fn test_call_plugin_not_found() {
197 let app_state = create_mock_app_state(None, None, None, None, None, None).await;
199 let plugin_call_request = PluginCallRequest {
200 params: serde_json::json!({"key":"value"}),
201 };
202 let response = call_plugin(
203 "non-existent".to_string(),
204 plugin_call_request,
205 web::ThinData(app_state),
206 )
207 .await;
208 assert!(response.is_err());
209 match response.unwrap_err() {
210 ApiError::NotFound(msg) => assert!(msg.contains("non-existent")),
211 _ => panic!("Expected NotFound error"),
212 }
213 }
214
215 #[actix_web::test]
216 async fn test_call_plugin_with_logs_and_traces_enabled() {
217 let plugin = PluginModel {
219 id: "test-plugin-logs".to_string(),
220 path: "test-path".to_string(),
221 timeout: Duration::from_secs(DEFAULT_PLUGIN_TIMEOUT_SECONDS),
222 emit_logs: true,
223 emit_traces: true,
224 };
225 let app_state =
226 create_mock_app_state(None, None, None, None, Some(vec![plugin]), None).await;
227 let plugin_call_request = PluginCallRequest {
228 params: serde_json::json!({}),
229 };
230 let response = call_plugin(
231 "test-plugin-logs".to_string(),
232 plugin_call_request,
233 web::ThinData(app_state),
234 )
235 .await;
236 assert!(response.is_ok());
237 }
238
239 #[actix_web::test]
240 async fn test_list_plugins() {
241 let plugin1 = PluginModel {
243 id: "plugin1".to_string(),
244 path: "path1".to_string(),
245 timeout: Duration::from_secs(DEFAULT_PLUGIN_TIMEOUT_SECONDS),
246 emit_logs: false,
247 emit_traces: false,
248 };
249 let plugin2 = PluginModel {
250 id: "plugin2".to_string(),
251 path: "path2".to_string(),
252 timeout: Duration::from_secs(DEFAULT_PLUGIN_TIMEOUT_SECONDS),
253 emit_logs: true,
254 emit_traces: true,
255 };
256 let app_state =
257 create_mock_app_state(None, None, None, None, Some(vec![plugin1, plugin2]), None).await;
258
259 let query = PaginationQuery {
260 page: 1,
261 per_page: 10,
262 };
263
264 let response = list_plugins(query, web::ThinData(app_state)).await;
265 assert!(response.is_ok());
266 let http_response = response.unwrap();
267 assert_eq!(http_response.status(), StatusCode::OK);
268 }
269
270 #[actix_web::test]
271 async fn test_list_plugins_empty() {
272 let app_state = create_mock_app_state(None, None, None, None, None, None).await;
274
275 let query = PaginationQuery {
276 page: 1,
277 per_page: 10,
278 };
279
280 let response = list_plugins(query, web::ThinData(app_state)).await;
281 assert!(response.is_ok());
282 let http_response = response.unwrap();
283 assert_eq!(http_response.status(), StatusCode::OK);
284 }
285}