delta_domain_sdk/
runner.rs1#[cfg(all(not(test), feature = "admin-api"))]
9use crate::admin_api::AdminApi;
10use crate::{
11 base_layer::BaseLayer,
12 storage::Database,
13};
14use base_sdk::vaults::{
15 OwnerId,
16 Vault,
17};
18use domain_runtime::{
19 config::RuntimeConfig,
20 storage::ColumnFamilies,
21};
22use proving_clients::Proving;
23use service_tasks::{
24 service::{
25 status::ServiceStatus,
26 Service,
27 ServiceError,
28 },
29 service_runner::ServiceRunner,
30};
31use snafu::{
32 ResultExt,
33 Snafu,
34};
35use std::{
36 collections::HashMap,
37 net::SocketAddr,
38};
39use storage::{
40 database::StorageError,
41 key_value::KeyValueStorage,
42};
43
44#[derive(Debug, Snafu)]
46pub enum Error {
47 #[snafu(display("Failure while running the domain service task: {source}"))]
49 ServiceRun {
50 source: ServiceError,
52 },
53
54 #[snafu(display("Failure while starting the admin API: {source}"))]
56 AdminApi {
57 source: std::io::Error,
59 },
60}
61
62#[derive(Debug)]
70pub struct Runner<S>
71where
72 S: KeyValueStorage<ColumnFamilyIdentifier = ColumnFamilies, Error = StorageError>
73 + Send
74 + Sync
75 + 'static,
76{
77 service: ServiceRunner<domain_runtime::Task<BaseLayer, Database<S>, BaseLayer>>,
78
79 #[cfg(all(not(test), feature = "admin-api"))]
80 admin_api: AdminApi<Database<S>>,
81}
82
83impl<S> Runner<S>
84where
85 S: KeyValueStorage<ColumnFamilyIdentifier = ColumnFamilies, Error = StorageError>
86 + Send
87 + Sync
88 + 'static,
89{
90 #[allow(unused_variables, clippy::redundant_clone)]
92 pub(crate) fn new(
93 config: RuntimeConfig,
94 prover: Box<dyn Proving>,
95 db: Database<S>,
96 base_layer: BaseLayer,
97 seed_vaults: HashMap<OwnerId, Vault>,
98 admin_api_address: SocketAddr,
99 ) -> Self {
100 let inputs = domain_runtime::task::Inputs {
101 config,
102 prover,
103 seed_vaults,
104 base_layer_port: base_layer.clone(),
105 vaults_fetcher: base_layer,
106 db: db.clone(),
107 };
108 let service = ServiceRunner::new(inputs);
109
110 #[cfg(all(not(test), feature = "admin-api"))]
111 let admin_api = AdminApi::new(admin_api_address, service.endpoint(), db);
112
113 Self {
114 service,
115 #[cfg(all(not(test), feature = "admin-api"))]
116 admin_api,
117 }
118 }
119
120 pub(crate) fn endpoint(&self) -> domain_runtime::Endpoint {
121 self.service.endpoint()
122 }
123
124 pub async fn run(self) -> Result<(), Error> {
138 #[cfg(all(not(test), feature = "admin-api"))]
139 self.admin_api.run().await.context(AdminApiSnafu)?;
140
141 self.service.run().await.context(ServiceRunSnafu)?;
142
143 Ok(())
144 }
145
146 pub async fn run_in_background(&self) -> Result<(), Error> {
158 #[cfg(all(not(test), feature = "admin-api"))]
159 self.admin_api.clone().run().await.context(AdminApiSnafu)?;
160
161 let status = self
162 .service
163 .run_in_background()
164 .await
165 .context(ServiceRunSnafu)?;
166
167 if !matches!(status, ServiceStatus::Running) {
168 return Err(ServiceError::IncompatibleStatus { status }).context(ServiceRunSnafu)?;
169 }
170 Ok(())
171 }
172}