delta_domain_sdk/
runner.rs

1//! # Runner
2//!
3//! Handle to launch the domain runtime — the event loop that processes events
4//! and executes domain operations.
5//!
6//! See [`Runner`] for how to start the domain.
7
8#[cfg(all(not(test), feature = "admin-api"))]
9use crate::admin_api::AdminApi;
10use crate::{
11    base_layer::BaseLayer,
12    storage::Database,
13};
14use base_sdk::{
15    core::Shard,
16    vaults::{
17        OwnerId,
18        Vault,
19    },
20};
21use domain_runtime::storage::ColumnFamilies;
22use proving_clients::Proving;
23use service_tasks::{
24    service::{
25        status::ServiceStatus,
26        Service,
27        ServiceError,
28    },
29    service_runner::ServiceRunner,
30};
31use snafu::{
32    ResultExt,
33    Snafu,
34};
35#[cfg(feature = "admin-api")]
36use std::net::SocketAddr;
37use std::{
38    collections::HashMap,
39    num::NonZero,
40};
41use storage::{
42    database::StorageError,
43    key_value::KeyValueStorageWithColumnFamilies,
44};
45
46/// Errors occurring in the [Runner]
47#[derive(Debug, Snafu)]
48pub enum Error {
49    /// Error while running service task
50    #[snafu(display("Failure while running the domain service task: {source}"))]
51    ServiceRun {
52        /// The underlying service error
53        source: ServiceError,
54    },
55
56    /// Error starting the admin API
57    #[snafu(display("Failure while starting the admin API: {source}"))]
58    AdminApi {
59        /// The underlying error
60        source: std::io::Error,
61    },
62}
63
64/// Handle to launch the domain runtime.
65///
66/// The domain runtime is the event loop that processes events and executes
67/// operations. Without launching it, actions performed by the
68/// [`DomainClient`](crate::client::DomainClient) cannot be executed, and
69/// reading state from [`Views`](crate::views::Views) is not guaranteed to be
70/// correct until initialization completes.
71#[derive(Debug)]
72pub struct Runner<P, S>
73where
74    P: Proving,
75    S: KeyValueStorageWithColumnFamilies<
76            ColumnFamilyIdentifier = ColumnFamilies,
77            Error = StorageError,
78        > + Send
79        + Sync
80        + 'static,
81{
82    service: ServiceRunner<domain_runtime::Task<BaseLayer, P, Database<S>, BaseLayer>>,
83
84    #[cfg(all(not(test), feature = "admin-api"))]
85    admin_api: AdminApi<Database<S>>,
86}
87
88impl<P, S> Runner<P, S>
89where
90    P: Proving,
91    S: KeyValueStorageWithColumnFamilies<
92            ColumnFamilyIdentifier = ColumnFamilies,
93            Error = StorageError,
94        > + Send
95        + Sync
96        + 'static,
97{
98    // To ease the feature magic below
99    #[allow(unused_variables, clippy::redundant_clone)]
100    pub(crate) fn new(
101        shard: NonZero<Shard>,
102        prover: P,
103        db: Database<S>,
104        base_layer: BaseLayer,
105        seed_vaults: HashMap<OwnerId, Vault>,
106        #[cfg(feature = "admin-api")] admin_api_address: SocketAddr,
107    ) -> Self {
108        let inputs = domain_runtime::task::Inputs {
109            shard,
110            prover,
111            seed_vaults,
112            base_layer_port: base_layer.clone(),
113            vaults_fetcher: base_layer,
114            db: db.clone(),
115        };
116        let service = ServiceRunner::new(inputs);
117
118        #[cfg(all(not(test), feature = "admin-api"))]
119        let admin_api = AdminApi::new(admin_api_address, service.endpoint(), db);
120
121        Self {
122            service,
123            #[cfg(all(not(test), feature = "admin-api"))]
124            admin_api,
125        }
126    }
127
128    pub(crate) fn endpoint(&self) -> domain_runtime::Endpoint {
129        self.service.endpoint()
130    }
131
132    /// Launch the domain runtime until completion.
133    ///
134    /// This method runs the domain runtime's event loop and does not return
135    /// until shutdown or error. Callers typically `tokio::spawn` this for more
136    /// control over the task.
137    ///
138    /// Note that initialization happens asynchronously after this method
139    /// starts; [`Views`](crate::views::Views) may not return correct data until
140    /// initialization completes. Use
141    /// [`run_in_background`](Self::run_in_background) if you need to wait for
142    /// initialization.
143    ///
144    /// If the `admin-api` feature is enabled, also launches the Admin API.
145    pub async fn run(self) -> Result<(), Error> {
146        #[cfg(all(not(test), feature = "admin-api"))]
147        self.admin_api.run().await.context(AdminApiSnafu)?;
148
149        self.service.run().await.context(ServiceRunSnafu)?;
150
151        Ok(())
152    }
153
154    /// Launch the domain runtime in the background.
155    ///
156    /// Spawns the domain runtime's event loop and returns once initialization
157    /// is complete. This is the recommended approach for tests and when you
158    /// need to use [`Views`](crate::views::Views) immediately after starting
159    /// the domain.
160    ///
161    /// Unlike [`run`](Self::run), callers don't have control over the spawned
162    /// task.
163    ///
164    /// If the `admin-api` feature is enabled, also launches the Admin API.
165    pub async fn run_in_background(&self) -> Result<(), Error> {
166        #[cfg(all(not(test), feature = "admin-api"))]
167        self.admin_api.clone().run().await.context(AdminApiSnafu)?;
168
169        let status = self
170            .service
171            .run_in_background()
172            .await
173            .context(ServiceRunSnafu)?;
174
175        if !matches!(status, ServiceStatus::Running) {
176            return Err(ServiceError::IncompatibleStatus { status }).context(ServiceRunSnafu)?;
177        }
178        Ok(())
179    }
180}