delta_base_sdk/
rpc.rs

//! # RPC Communication with the delta Base Layer
//!
//! This module provides a client for communicating with the delta base layer through
//! RPC (Remote Procedure Call). It enables applications to query network state, submit
//! transactions, retrieve vault data, and subscribe to network events.
6//!
7//! ## Overview
8//!
9//! The primary interface is the [`BaseRpcClient`] which handles connection
10//! management, request serialization, and response parsing. The client provides
11//! methods for:
12//!
13//! - [Submitting transactions to the network](BaseRpcClient::submit_transaction);
14//! - Retrieving vault data ([`get_vault`](BaseRpcClient::get_vault),
15//!   [`get_base_vault`](BaseRpcClient::get_base_vault),
16//!   [`get_vaults`](BaseRpcClient::get_vaults));
17//! - [Checking transaction status](BaseRpcClient::get_transaction_status); or
18//! - [Subscribing to network events](BaseRpcClient::stream_base_layer_events).
19//!
20//! All RPC operations use the [`tonic`] library for gRPC communication, with automatic
21//! serialization and deserialization between protocol buffer and native Rust types.
22//!
23//! ## Usage
24//!
25//! To use the RPC client, you need a running delta base layer at some known URL.
26//!
27//! ## Example
28//!
29//! ```rust,no_run
30//! use delta_base_sdk::{
31//!     core::Shard,
32//!     crypto::{ed25519, Hash256, IntoSignatureEnum},
33//!     rpc::{BaseRpcClient, RpcError},
34//!     vaults::{Address, ReadableNativeBalance, Vault},
35//! };
36//! # use std::error::Error;
37//! # use std::str::FromStr;
38//! # use tokio_stream::StreamExt;
39//!
40//! async fn rpc_example() -> Result<(), Box<dyn Error>> {
41//!     // Connect to base layer
42//!     let client = BaseRpcClient::new("http://localhost:50051").await?;
43//!
44//!     // Get a vault
45//!     let pubkey = ed25519::PrivKey::generate().pub_key();
46//!     let vault = client.get_vault(Address::new(pubkey.owner(), 1)).await?;
47//!     println!("Vault balance: {}", vault.balance());
48//!
49//!     // Check a transaction status
50//!     // In a real app, this would be a valid signature
51//!     let signature = ed25519::Signature::from_str("<MY SIGNATURE>")?
52//!         .into_signature_enum(pubkey);
53//!     let status = client.get_transaction_status(signature.hash_sha256()).await?;
54//!     println!("Transaction status: {:?}", status);
55//!
56//!     // Query current epoch
57//!     let epoch = client.get_epoch().await?;
58//!     println!("Current epoch: {}", epoch);
59//!
60//!     // Subscribe to network events (requires tokio to run)
61//!     let mut events = client.stream_base_layer_events(0).await?;
62//!
63//!     // Process the first 5 events
64//!     let mut count = 0;
65//!     while let Some(event) = events.next().await {
66//!         if let Ok(event) = event {
67//!             println!("Received event: {:?}", event);
68//!             count += 1;
69//!             if count >= 5 {
70//!                 break;
71//!             }
72//!         }
73//!     }
74//!     Ok(())
75//! }
76//! ```
77
78use crate::{
79    core::Shard,
80    crypto::HashDigest,
81    events::BaseLayerEvent,
82    sdl::StateDiffList,
83    transactions::{
84        types::TransactionType,
85        SignedTransaction,
86        TransactionStatus,
87        TypeData,
88    },
89};
90use http::{
91    uri::{
92        self,
93        InvalidUri,
94        Scheme,
95    },
96    Uri,
97};
98use primitives::{
99    domain_agreement::DomainAgreement,
100    type_aliases::{
101        Epoch,
102        TxId,
103    },
104    vault::{
105        base::Vault as BaseVault,
106        sharded::Vault as ShardedVault,
107        Address,
108        OwnerId,
109    },
110};
111use proto_types::{
112    error::HashSnafu,
113    sdl::SdlStatus,
114    services::{
115        GetBaseVaultRequest,
116        GetDomainAgreementRequest,
117        GetDomainOwnerIdsRequest,
118        GetEpochRequest,
119        GetEpochResponse,
120        GetSdlDataRequest,
121        GetShardedVaultsRequest,
122        GetTransactionStatusRequest,
123        StreamBaseLayerEventRequest,
124        ValidatorServiceClient,
125    },
126};
127use snafu::{
128    ResultExt,
129    Snafu,
130};
131use std::collections::HashMap;
132use tokio_stream::{
133    Stream,
134    StreamExt,
135};
136use tonic::{
137    transport::Channel,
138    Request,
139};
140
141/// Errors that can occur when using the [`BaseRpcClient`]
142#[derive(Debug, Snafu)]
143pub enum RpcError {
144    /// Failed to establish a connection to base layer
145    ///
146    /// This error occurs when the client cannot connect to the specified
147    /// base layer URL, typically due to network issues or because
148    /// the node is not running.
149    #[snafu(display("Could not connect: {source}"))]
150    Connection {
151        /// The underlying transport error
152        source: tonic::transport::Error,
153    },
154
155    /// The RPC request was rejected by base layer
156    ///
157    /// This error occurs when the request is received by the base layer
158    /// but cannot be processed, such as when requesting a non-existent
159    /// vault or submitting an invalid transaction.
160    #[snafu(display("Request failed with status: {status}"))]
161    Request {
162        /// The status returned by the base layer
163        #[snafu(source(from(tonic::Status, Box::new)))]
164        status: Box<tonic::Status>,
165    },
166
167    /// Failed to convert between protocol buffer and native format
168    ///
169    /// This error occurs when there's a problem converting between the
170    /// wire format (protocol buffers) and the native Rust types, typically
171    /// due to missing or malformed data.
172    #[snafu(display("Could not convert type: {source}"))]
173    IntoProto {
174        /// The underlying conversion error
175        source: proto_types::error::ProtoError,
176    },
177
178    /// Failed to parse base layer URL
179    ///
180    /// This error occurs when the URL provided to connect to the base layer is
181    /// malformed or cannot be parsed.
182    #[snafu(display("Could not parse URL: {source}"))]
183    InvalidUrl {
184        /// The underlying HTTP error
185        source: http::Error,
186    },
187}
188
189impl RpcError {
190    /// Returns true if the underlying error is a gRPC not found status.
191    pub fn is_not_found(&self) -> bool {
192        matches!(self, Self::Request { status } if status.code() == tonic::Code::NotFound)
193    }
194}
195
196/// Client for communicating with the delta base layer through RPC
197///
198/// [BaseRpcClient] provides a high-level interface for interacting with
199/// delta base layers. It handles connection management, request formatting,
200/// and response parsing for all base layer operations.
201///
202/// # Example
203///
204/// ```rust,no_run
205/// use delta_base_sdk::rpc::BaseRpcClient;
206///
207/// async fn connect() -> Result<(), Box<dyn std::error::Error>> {
208///     // Connect to local base layer provider
209///     let client = BaseRpcClient::new("http://localhost:50051").await?;
210///
211///     // Connect to a remote node with HTTPS
212///     let secure_client = BaseRpcClient::new("https://baselayer.example.com").await?;
213///
214///     // Connect with just the host (HTTPS will be added automatically)
215///     let auto_scheme_client = BaseRpcClient::new("baselayer.example.com").await?;
216///
217///     Ok(())
218/// }
219/// ```
220#[derive(Debug, Clone)]
221pub struct BaseRpcClient {
222    // The inner client is cloned in all calls but this is cheap and avoids mutable self refs:
223    // https://docs.rs/tonic/latest/tonic/transport/struct.Channel.html#multiplexing-requests
224    inner: ValidatorServiceClient<Channel>,
225}
226
227impl BaseRpcClient {
228    /// Creates a new client by connecting to the specified base layer provider URL
229    ///
230    /// This method establishes a connection to delta base layer at the specified
231    /// URL. If the URL does not include a scheme (http:// or https://), HTTPS will be
232    /// used automatically.
233    ///
234    /// # Parameters
235    ///
236    /// * `url` - URL of base layer provider to connect to, either as a string or a URI
237    ///
238    /// # Errors
239    ///
240    /// Returns [RpcError] if:
241    /// - The URL cannot be parsed; or
242    /// - The connection cannot be established
243    pub async fn new(url: impl TryInto<Uri, Error = InvalidUri> + Send) -> Result<Self, RpcError> {
244        let uri = add_missing_scheme(url)?;
245        let inner = ValidatorServiceClient::connect(uri.to_string())
246            .await
247            .context(ConnectionSnafu)?;
248        Ok(Self { inner })
249    }
250
251    /// Submits a signed transaction to the base layer network
252    ///
253    /// This method sends a transaction to base layer for processing.
254    /// The transaction must be properly signed with a valid signature from
255    /// the transaction signer's private key.
256    ///
257    /// # Parameters
258    ///
259    /// * `tx` - Signed transaction to submit
260    ///
261    /// # Errors
262    ///
263    /// Returns [RpcError] if:
264    /// - The connection to the base layer provider fails;
265    /// - The base layer provider rejects the transaction; or
266    /// - There's a protocol conversion error.
267    pub async fn submit_transaction<T>(&self, tx: SignedTransaction<T>) -> Result<(), RpcError>
268    where
269        T: TransactionType + Into<TypeData> + Send,
270    {
271        self.inner
272            .clone()
273            .submit_transaction(Request::new(tx.into()))
274            .await
275            .context(RequestSnafu)?;
276        Ok(())
277    }
278
279    /// Gets the base vault for an owner in the base layer shard
280    ///
281    /// This is a convenience method that calls [`get_vault`](BaseRpcClient::get_vault)
282    /// with the base layer shard (0).
283    ///
284    /// # Parameters
285    ///
286    /// * `owner` - The ID of the vault owner
287    ///
288    /// # Errors
289    ///
290    /// Returns [RpcError] if:
291    /// - The vault doesn't exist;
292    /// - The connection to the base layer provider fails; or
293    /// - There's a protocol conversion error.
294    pub async fn get_base_vault(&self, owner: OwnerId) -> Result<BaseVault, RpcError> {
295        let proto_vault = self
296            .inner
297            .clone()
298            .get_base_vault(GetBaseVaultRequest {
299                owner: owner.into(),
300            })
301            .await
302            .context(RequestSnafu)?
303            .into_inner();
304
305        Ok(proto_vault.into())
306    }
307
308    /// Retrieves a domain vault data for a specific address
309    ///
310    /// # Parameters
311    ///
312    /// * `address` - of the vault to fetch
313    ///
314    /// # Errors
315    ///
316    /// Returns [RpcError] if:
317    /// - The vault under the given address doesn't exist;
318    /// - The connection to the base layer provider fails; or
319    /// - There's a protocol conversion error.
320    ///
321    /// # Example
322    ///
323    /// ```rust,no_run
324    /// use delta_base_sdk::{
325    ///     crypto::ed25519,
326    ///     rpc::BaseRpcClient,
327    ///     vaults::{Address, ReadableNativeBalance},
328    /// };
329    ///
330    /// async fn get_vault_example(client: &BaseRpcClient) -> Result<(), Box<dyn std::error::Error>> {
331    ///     let address = Address::new(ed25519::PubKey::generate().owner(), 1);
332    ///     let vault = client.get_vault(address).await?;
333    ///     println!("Vault data: {:?}", vault);
334    ///     println!("Vault balance: {}", vault.balance());
335    ///     Ok(())
336    /// }
337    /// ```
338    pub async fn get_vault(&self, address: Address) -> Result<ShardedVault, RpcError> {
339        let mut vaults = self.get_vaults([address]).await?;
340        vaults.remove(&address).ok_or_else(|| RpcError::Request {
341            status: Box::new(tonic::Status::not_found("Vault not found")),
342        })
343    }
344
345    /// Retrieves multiple sharded vaults by address
346    ///
347    /// This method fetches multiple vaults in a single request, up to a maximum of 100.
348    ///
349    /// # Parameters
350    ///
351    /// * `addresses` - An iterator of addresses to retrieve
352    ///
353    /// # Errors
354    ///
355    /// Returns [RpcError] if:
356    /// - Any of the vaults don't exist;
357    /// - The connection to the base layer provider fails; or
358    /// - There's a protocol conversion error.
359    pub async fn get_vaults(
360        &self,
361        addresses: impl IntoIterator<Item = Address>,
362    ) -> Result<HashMap<Address, ShardedVault>, RpcError> {
363        // Collect the iterator first since we'll need to consume it multiple times
364        let addresses: Vec<Address> = addresses.into_iter().collect();
365        if addresses.is_empty() {
366            return Ok(HashMap::new())
367        }
368
369        let addresses_proto: Vec<_> = addresses.iter().map(|address| (*address).into()).collect();
370
371        // Fetch vaults from server
372        let vaults = self
373            .inner
374            .clone()
375            .get_sharded_vaults(GetShardedVaultsRequest {
376                addresses: addresses_proto,
377            })
378            .await
379            .context(RequestSnafu)?
380            .into_inner()
381            .vaults;
382
383        // Reconstruct addresses matching server response order
384        addresses
385            .into_iter()
386            .zip(vaults)
387            .map(|(address, vault_proto)| {
388                vault_proto
389                    .try_into()
390                    .context(IntoProtoSnafu)
391                    .map(|vault| (address, vault))
392            })
393            .collect()
394    }
395
396    /// Returns all the [OwnerId] that have a vault registered for the given shard on the base layer
397    pub async fn get_domain_owner_ids(&self, shard: Shard) -> Result<Vec<OwnerId>, RpcError> {
398        let response = self
399            .inner
400            .clone()
401            .get_domain_owner_ids(GetDomainOwnerIdsRequest { shard })
402            .await
403            .context(RequestSnafu)?
404            .into_inner();
405        response
406            .owner_ids
407            .into_iter()
408            .map(TryInto::try_into)
409            .collect::<Result<Vec<OwnerId>, _>>()
410            .context(HashSnafu)
411            .context(IntoProtoSnafu)
412    }
413
414    /// Gets the Domain Agreement for the provided shard.
415    ///
416    /// # Parameters
417    /// * `shard` - the shard to query the agreement for
418    ///
419    /// # Errors
420    ///
421    /// Returns [RpcError] if:
422    /// - There's no active agreement;
423    /// - The connection to the base layer RPC fails; or
424    /// - There's a protocol conversion error.
425    pub async fn get_domain_agreement(&self, shard: Shard) -> Result<DomainAgreement, RpcError> {
426        self.inner
427            .clone()
428            .get_domain_agreement(GetDomainAgreementRequest { shard })
429            .await
430            .context(RequestSnafu)?
431            .into_inner()
432            .try_into()
433            .context(IntoProtoSnafu)
434    }
435
436    /// Retrieves the current status of a transaction by its signature
437    ///
438    /// This method allows tracking the progress of a transaction through the
439    /// delta network. After submitting a transaction, you can use this method
440    /// to check if it has been processed, confirmed, or rejected.
441    ///
442    /// # Parameters
443    ///
444    /// * `tx_id` - [TxId] of the transaction to query
445    ///
446    /// # Errors
447    ///
448    /// Returns [RpcError] if:
449    /// - The connection to the base layer provider fails; or
450    /// - The signature is invalid or unknown to the network.
451    ///
452    /// # Note
453    ///
454    /// Transaction signatures are unique identifiers generated when signing a
455    /// transaction. Retain the signature after submitting a transaction if you
456    /// want to check its status later.
457    pub async fn get_transaction_status(&self, tx_id: TxId) -> Result<TransactionStatus, RpcError> {
458        let res = self
459            .inner
460            .clone()
461            .get_transaction_status(GetTransactionStatusRequest {
462                tx_id: tx_id.into(),
463            })
464            .await
465            .context(RequestSnafu)?;
466        res.into_inner().try_into().context(IntoProtoSnafu)
467    }
468
469    /// Retrieves the current epoch in delta
470    ///
471    /// This method returns the current epoch number. Epochs are protocol-defined
472    /// periods in the delta protocol during which the base layer provider set
473    /// remains constant.
474    ///
475    /// # Errors
476    ///
477    /// Returns [RpcError] if:
478    /// - The connection to the base layer provider fails
479    /// - The base layer provider cannot provide epoch information
480    ///
481    /// # Example
482    ///
483    /// ```rust,no_run
484    /// use delta_base_sdk::rpc::BaseRpcClient;
485    ///
486    /// async fn get_network_info() -> Result<(), Box<dyn std::error::Error>> {
487    ///     let client = BaseRpcClient::new("http://localhost:50051").await?;
488    ///
489    ///     // Get information about the current epoch
490    ///     let epoch = client.get_epoch().await?;
491    ///
492    ///     println!("Current epoch: {}", epoch);
493    ///
494    ///     Ok(())
495    /// }
496    /// ```
497    pub async fn get_epoch(&self) -> Result<Epoch, RpcError> {
498        let res: GetEpochResponse = self
499            .inner
500            .clone()
501            .get_epoch(GetEpochRequest {})
502            .await
503            .context(RequestSnafu)?
504            .into_inner();
505        Ok(res.epoch)
506    }
507
508    /// Retrieves a [State Diff List (SDL)](StateDiffList) by its hash identifier
509    ///
510    /// A State Diff List (SDL) is a batch of state changes in the delta protocol.
511    /// This method allows retrieving the complete contents of an SDL by its hash,
512    /// enabling applications to inspect state changes or verify transaction results.
513    ///
514    /// # Parameters
515    ///
516    /// * `hash` - [Cryptographic hash](HashDigest) that uniquely identifies the SDL
517    ///
518    /// # Errors
519    ///
520    /// Returns [RpcError] if:
521    /// - The SDL with the specified hash doesn't exist;
522    /// - The connection to the base layer provider fails; or
523    /// - There's a protocol conversion error.
524    pub async fn get_sdl(&self, hash: HashDigest) -> Result<StateDiffList, RpcError> {
525        let res = self
526            .inner
527            .clone()
528            .get_sdl_data(GetSdlDataRequest {
529                sdl_hash: hash.into(),
530            })
531            .await
532            .context(RequestSnafu)?;
533        res.into_inner().try_into().context(IntoProtoSnafu)
534    }
535
536    /// Retrieves the [status](SdlStatus) of a [State Diff List (SDL)](StateDiffList).
537    ///
538    /// # Parameters
539    ///
540    /// * `hash` - [Cryptographic hash](HashDigest) that uniquely identifies the SDL
541    ///
542    /// # Errors
543    ///
544    /// Returns [RpcError] if:
545    /// - The SDL with the specified hash doesn't exist;
546    /// - The connection to the base layer provider fails; or
547    /// - There's a protocol conversion error.
548    pub async fn get_sdl_status(&self, hash: HashDigest) -> Result<SdlStatus, RpcError> {
549        let res = self
550            .inner
551            .clone()
552            .get_sdl_status(GetSdlDataRequest {
553                sdl_hash: hash.into(),
554            })
555            .await
556            .context(RequestSnafu)?;
557
558        res.into_inner().try_into().context(IntoProtoSnafu)
559    }
560
561    /// Subscribes to a stream of real-time events from the delta network
562    ///
563    /// This method establishes a streaming connection to receive events from the
564    /// base layer as they occur. Events include epoch transitions, vault migrations,
565    /// and other network state changes. The events are filtered by shard.
566    ///
567    /// # Parameters
568    ///
569    /// * `shard` - [Shard] number to receive events for
570    ///
571    /// # Errors
572    ///
573    /// Returns [RpcError] if:
574    /// - The connection to the base layer provider fails;
575    /// - The streaming request is rejected; or
576    /// - Individual stream items may contain errors if there are issues during streaming.
577    ///
578    /// # Example
579    ///
580    /// ```rust,no_run
581    /// use delta_base_sdk::{
582    ///     events::BaseLayerEvent,
583    ///     rpc::BaseRpcClient,
584    /// };
585    /// use tokio_stream::StreamExt;
586    ///
587    /// async fn monitor_events() -> Result<(), Box<dyn std::error::Error>> {
588    ///     let client = BaseRpcClient::new("http://localhost:50051").await?;
589    ///
590    ///     // Subscribe to base layer events for shard 0 (the base layer shard)
591    ///     let mut event_stream = client.stream_base_layer_events(0).await?;
592    ///
593    ///     println!("Listening for events...");
594    ///
595    ///     // Process events as they arrive
596    ///     while let Some(event_result) = event_stream.next().await {
597    ///         match event_result {
598    ///             Ok(event) => {
599    ///                 match event {
600    ///                     BaseLayerEvent::NewEpoch(epoch) => {
601    ///                         println!("New epoch started: {}", epoch);
602    ///                     },
603    ///                     BaseLayerEvent::SdlUpdate(update) => {
604    ///                         println!("SDL update received: {:?}", update.hash);
605    ///                     },
606    ///                     BaseLayerEvent::VaultEmigrated(address) => {
607    ///                         println!("Vault emigrated: {address:?}");
608    ///                     },
609    ///                     BaseLayerEvent::VaultImmigrated(address) => {
610    ///                         println!("Vault immigrated: {address:?}");
611    ///                     },
612    ///                 }
613    ///             },
614    ///             Err(e) => {
615    ///                 eprintln!("Error in event stream: {e}");
616    ///                 // Decide whether to break or continue based on the error
617    ///             }
618    ///         }
619    ///     }
620    ///
621    ///     Ok(())
622    /// }
623    /// ```
624    ///
625    /// # Note
626    ///
627    /// The returned stream implements the `Stream` trait from the `tokio_stream` crate.
628    /// You need to bring the `StreamExt` trait into scope to use methods like `next()`.
629    ///
630    /// This is a long-lived connection that will continue to deliver events until
631    /// the stream is dropped or the connection is closed. It's suitable for building
632    /// reactive applications that need to respond to network events in real-time.
633    pub async fn stream_base_layer_events(
634        &self,
635        shard: Shard,
636    ) -> Result<impl Stream<Item = Result<BaseLayerEvent, RpcError>> + Send, RpcError> {
637        let res = self
638            .inner
639            .clone()
640            .stream_base_layer_events(StreamBaseLayerEventRequest { shard })
641            .await
642            .context(RequestSnafu)?;
643
644        let stream = res.into_inner().map(|r| {
645            r.context(RequestSnafu)
646                .and_then(|u| u.try_into().context(IntoProtoSnafu))
647        });
648
649        Ok(stream)
650    }
651}
652
653/// The default URI scheme to use when none is provided
654const DEFAULT_SCHEME: Scheme = Scheme::HTTPS;
655
656/// Adds the HTTPS scheme to a URL if it doesn't already have a scheme
657///
658/// This utility function ensures that all URLs have a scheme, adding HTTPS
659/// as the default if none is specified. This allows users to provide just
660/// the host and port (e.g., "localhost:50051") without having to specify
661/// the scheme.
662///
663/// # Parameters
664///
665/// * `url` - URL to process, which may or may not have a scheme
666///
667/// # Errors
668///
669/// Returns [RpcError] if:
670/// - The URL cannot be parsed as a valid URI
671/// - The modified URI cannot be built
672#[allow(clippy::result_large_err)]
673fn add_missing_scheme(url: impl TryInto<Uri, Error = InvalidUri>) -> Result<Uri, RpcError> {
674    let uri: Uri = url
675        .try_into()
676        .map_err(|e| e.into())
677        .context(InvalidUrlSnafu)?;
678
679    if uri.scheme().is_some() {
680        Ok(uri)
681    } else {
682        let path = uri
683            .path_and_query()
684            .map(|p| p.as_str())
685            .unwrap_or("/")
686            .to_string();
687        Into::<uri::Builder>::into(uri)
688            .scheme(DEFAULT_SCHEME)
689            .path_and_query(path)
690            .build()
691            .context(InvalidUrlSnafu)
692    }
693}
694
#[cfg(test)]
mod test {
    use super::*;

    /// Runs `add_missing_scheme` on `input` and renders the resulting URI as a string.
    fn normalized(input: &str) -> String {
        add_missing_scheme(input).unwrap().to_string()
    }

    #[test]
    fn test_add_missing_scheme() {
        // leaves existing scheme
        let with_scheme = [
            ("http://localhost:5005", "http://localhost:5005/"),
            (
                "http://localhost:5005/path/yes",
                "http://localhost:5005/path/yes",
            ),
            ("yolo://net.delta.net", "yolo://net.delta.net/"),
        ];
        for (input, expected) in with_scheme {
            assert_eq!(expected, normalized(input));
        }

        // adds scheme if missing
        let without_scheme = [
            ("net.delta.net", "https://net.delta.net/"),
            ("localhost:12345", "https://localhost:12345/"),
        ];
        for (input, expected) in without_scheme {
            assert_eq!(expected, normalized(input));
        }
    }
}