delta_domain_sdk/
runner.rs1#[cfg(all(not(test), feature = "admin-api"))]
9use crate::admin_api::AdminApi;
10use crate::{
11 base_layer::BaseLayer,
12 storage::Database,
13};
14use base_sdk::{
15 core::Shard,
16 vaults::{
17 OwnerId,
18 Vault,
19 },
20};
21use domain_runtime::storage::ColumnFamilies;
22use proving_clients::Proving;
23use service_tasks::{
24 service::{
25 status::ServiceStatus,
26 Service,
27 ServiceError,
28 },
29 service_runner::ServiceRunner,
30};
31use snafu::{
32 ResultExt,
33 Snafu,
34};
35#[cfg(feature = "admin-api")]
36use std::net::SocketAddr;
37use std::{
38 collections::HashMap,
39 num::NonZero,
40};
41use storage::{
42 database::StorageError,
43 key_value::KeyValueStorageWithColumnFamilies,
44};
45
46#[derive(Debug, Snafu)]
48pub enum Error {
49 #[snafu(display("Failure while running the domain service task: {source}"))]
51 ServiceRun {
52 source: ServiceError,
54 },
55
56 #[snafu(display("Failure while starting the admin API: {source}"))]
58 AdminApi {
59 source: std::io::Error,
61 },
62}
63
64#[derive(Debug)]
72pub struct Runner<P, S>
73where
74 P: Proving,
75 S: KeyValueStorageWithColumnFamilies<
76 ColumnFamilyIdentifier = ColumnFamilies,
77 Error = StorageError,
78 > + Send
79 + Sync
80 + 'static,
81{
82 service: ServiceRunner<domain_runtime::Task<BaseLayer, P, Database<S>, BaseLayer>>,
83
84 #[cfg(all(not(test), feature = "admin-api"))]
85 admin_api: AdminApi<Database<S>>,
86}
87
88impl<P, S> Runner<P, S>
89where
90 P: Proving,
91 S: KeyValueStorageWithColumnFamilies<
92 ColumnFamilyIdentifier = ColumnFamilies,
93 Error = StorageError,
94 > + Send
95 + Sync
96 + 'static,
97{
98 #[allow(unused_variables, clippy::redundant_clone)]
100 pub(crate) fn new(
101 shard: NonZero<Shard>,
102 prover: P,
103 db: Database<S>,
104 base_layer: BaseLayer,
105 seed_vaults: HashMap<OwnerId, Vault>,
106 #[cfg(feature = "admin-api")] admin_api_address: SocketAddr,
107 ) -> Self {
108 let inputs = domain_runtime::task::Inputs {
109 shard,
110 prover,
111 seed_vaults,
112 base_layer_port: base_layer.clone(),
113 vaults_fetcher: base_layer,
114 db: db.clone(),
115 };
116 let service = ServiceRunner::new(inputs);
117
118 #[cfg(all(not(test), feature = "admin-api"))]
119 let admin_api = AdminApi::new(admin_api_address, service.endpoint(), db);
120
121 Self {
122 service,
123 #[cfg(all(not(test), feature = "admin-api"))]
124 admin_api,
125 }
126 }
127
128 pub(crate) fn endpoint(&self) -> domain_runtime::Endpoint {
129 self.service.endpoint()
130 }
131
132 pub async fn run(self) -> Result<(), Error> {
146 #[cfg(all(not(test), feature = "admin-api"))]
147 self.admin_api.run().await.context(AdminApiSnafu)?;
148
149 self.service.run().await.context(ServiceRunSnafu)?;
150
151 Ok(())
152 }
153
154 pub async fn run_in_background(&self) -> Result<(), Error> {
166 #[cfg(all(not(test), feature = "admin-api"))]
167 self.admin_api.clone().run().await.context(AdminApiSnafu)?;
168
169 let status = self
170 .service
171 .run_in_background()
172 .await
173 .context(ServiceRunSnafu)?;
174
175 if !matches!(status, ServiceStatus::Running) {
176 return Err(ServiceError::IncompatibleStatus { status }).context(ServiceRunSnafu)?;
177 }
178 Ok(())
179 }
180}