delta_base_sdk/rpc.rs
1//! # RPC Communication with the delta Base Layer
2//!
3//! This module provides a client for communicating with delta base layer through
4//! RPC (Remote Procedure Call). It enables applications to query network state, submit
5//! transactions, retrieve vault data, and subscribe to network events.
6//!
7//! ## Overview
8//!
9//! The primary interface is the [`BaseRpcClient`] which handles connection
10//! management, request serialization, and response parsing. The client provides
11//! methods for:
12//!
13//! - [Submitting transactions to the network](BaseRpcClient::submit_transaction);
14//! - Retrieving vault data ([`get_vault`](BaseRpcClient::get_vault),
15//! [`get_base_vault`](BaseRpcClient::get_base_vault),
16//! [`get_vaults`](BaseRpcClient::get_vaults));
17//! - [Checking transaction status](BaseRpcClient::get_transaction_status); or
18//! - [Subscribing to network events](BaseRpcClient::stream_base_layer_events).
19//!
20//! All RPC operations use the [`tonic`] library for gRPC communication, with automatic
21//! serialization and deserialization between protocol buffer and native Rust types.
22//!
23//! ## Usage
24//!
25//! To use the RPC client, you need a running delta base layer at some known URL.
26//!
27//! ## Example
28//!
29//! ```rust,no_run
30//! use delta_base_sdk::{
31//! core::Shard,
32//! crypto::{ed25519, Hash256, IntoSignatureEnum},
33//! rpc::{BaseRpcClient, RpcError},
34//! vaults::{Address, ReadableNativeBalance, Vault},
35//! };
36//! # use std::error::Error;
37//! # use std::str::FromStr;
38//! # use tokio_stream::StreamExt;
39//!
40//! async fn rpc_example() -> Result<(), Box<dyn Error>> {
41//! // Connect to base layer
42//! let client = BaseRpcClient::new("http://localhost:50051").await?;
43//!
44//! // Get a vault
45//! let pubkey = ed25519::PrivKey::generate().pub_key();
46//! let vault = client.get_vault(Address::new(pubkey.owner(), 1)).await?;
47//! println!("Vault balance: {}", vault.balance());
48//!
49//! // Check a transaction status
50//! // In a real app, this would be a valid signature
51//! let signature = ed25519::Signature::from_str("<MY SIGNATURE>")?
52//! .into_signature_enum(pubkey);
53//! let status = client.get_transaction_status(signature.hash_sha256()).await?;
54//! println!("Transaction status: {:?}", status);
55//!
56//! // Query current epoch
57//! let epoch = client.get_epoch().await?;
58//! println!("Current epoch: {}", epoch);
59//!
60//! // Subscribe to network events (requires tokio to run)
61//! let mut events = client.stream_base_layer_events(0).await?;
62//!
63//! // Process the first 5 events
64//! let mut count = 0;
65//! while let Some(event) = events.next().await {
66//! if let Ok(event) = event {
67//! println!("Received event: {:?}", event);
68//! count += 1;
69//! if count >= 5 {
70//! break;
71//! }
72//! }
73//! }
74//! Ok(())
75//! }
76//! ```
77
78use crate::{
79 core::Shard,
80 crypto::HashDigest,
81 events::BaseLayerEvent,
82 sdl::StateDiffList,
83 transactions::{
84 types::TransactionType,
85 SignedTransaction,
86 TransactionStatus,
87 TypeData,
88 },
89};
90use http::{
91 uri::{
92 self,
93 InvalidUri,
94 Scheme,
95 },
96 Uri,
97};
98use primitives::{
99 domain_agreement::DomainAgreement,
100 type_aliases::{
101 Epoch,
102 TxId,
103 },
104 vault::{
105 base::Vault as BaseVault,
106 sharded::Vault as ShardedVault,
107 Address,
108 OwnerId,
109 },
110};
111use proto_types::{
112 error::HashSnafu,
113 sdl::SdlStatus,
114 services::{
115 GetBaseVaultRequest,
116 GetDomainAgreementRequest,
117 GetDomainOwnerIdsRequest,
118 GetEpochRequest,
119 GetEpochResponse,
120 GetSdlDataRequest,
121 GetShardedVaultsRequest,
122 GetTransactionStatusRequest,
123 StreamBaseLayerEventRequest,
124 ValidatorServiceClient,
125 },
126};
127use snafu::{
128 ResultExt,
129 Snafu,
130};
131use std::collections::HashMap;
132use tokio_stream::{
133 Stream,
134 StreamExt,
135};
136use tonic::{
137 transport::Channel,
138 Request,
139};
140
141/// Errors that can occur when using the [`BaseRpcClient`]
142#[derive(Debug, Snafu)]
143pub enum RpcError {
144 /// Failed to establish a connection to base layer
145 ///
146 /// This error occurs when the client cannot connect to the specified
147 /// base layer URL, typically due to network issues or because
148 /// the node is not running.
149 #[snafu(display("Could not connect: {source}"))]
150 Connection {
151 /// The underlying transport error
152 source: tonic::transport::Error,
153 },
154
155 /// The RPC request was rejected by base layer
156 ///
157 /// This error occurs when the request is received by the base layer
158 /// but cannot be processed, such as when requesting a non-existent
159 /// vault or submitting an invalid transaction.
160 #[snafu(display("Request failed with status: {status}"))]
161 Request {
162 /// The status returned by the base layer
163 #[snafu(source(from(tonic::Status, Box::new)))]
164 status: Box<tonic::Status>,
165 },
166
167 /// Failed to convert between protocol buffer and native format
168 ///
169 /// This error occurs when there's a problem converting between the
170 /// wire format (protocol buffers) and the native Rust types, typically
171 /// due to missing or malformed data.
172 #[snafu(display("Could not convert type: {source}"))]
173 IntoProto {
174 /// The underlying conversion error
175 source: proto_types::error::ProtoError,
176 },
177
178 /// Failed to parse base layer URL
179 ///
180 /// This error occurs when the URL provided to connect to the base layer is
181 /// malformed or cannot be parsed.
182 #[snafu(display("Could not parse URL: {source}"))]
183 InvalidUrl {
184 /// The underlying HTTP error
185 source: http::Error,
186 },
187}
188
189impl RpcError {
190 /// Returns true if the underlying error is a gRPC not found status.
191 pub fn is_not_found(&self) -> bool {
192 matches!(self, Self::Request { status } if status.code() == tonic::Code::NotFound)
193 }
194}
195
196/// Client for communicating with the delta base layer through RPC
197///
198/// [BaseRpcClient] provides a high-level interface for interacting with
199/// delta base layers. It handles connection management, request formatting,
200/// and response parsing for all base layer operations.
201///
202/// # Example
203///
204/// ```rust,no_run
205/// use delta_base_sdk::rpc::BaseRpcClient;
206///
207/// async fn connect() -> Result<(), Box<dyn std::error::Error>> {
208/// // Connect to local base layer provider
209/// let client = BaseRpcClient::new("http://localhost:50051").await?;
210///
211/// // Connect to a remote node with HTTPS
212/// let secure_client = BaseRpcClient::new("https://baselayer.example.com").await?;
213///
214/// // Connect with just the host (HTTPS will be added automatically)
215/// let auto_scheme_client = BaseRpcClient::new("baselayer.example.com").await?;
216///
217/// Ok(())
218/// }
219/// ```
220#[derive(Debug, Clone)]
221pub struct BaseRpcClient {
222 // The inner client is cloned in all calls but this is cheap and avoids mutable self refs:
223 // https://docs.rs/tonic/latest/tonic/transport/struct.Channel.html#multiplexing-requests
224 inner: ValidatorServiceClient<Channel>,
225}
226
227impl BaseRpcClient {
228 /// Creates a new client by connecting to the specified base layer provider URL
229 ///
230 /// This method establishes a connection to delta base layer at the specified
231 /// URL. If the URL does not include a scheme (http:// or https://), HTTPS will be
232 /// used automatically.
233 ///
234 /// # Parameters
235 ///
236 /// * `url` - URL of base layer provider to connect to, either as a string or a URI
237 ///
238 /// # Errors
239 ///
240 /// Returns [RpcError] if:
241 /// - The URL cannot be parsed; or
242 /// - The connection cannot be established
243 pub async fn new(url: impl TryInto<Uri, Error = InvalidUri> + Send) -> Result<Self, RpcError> {
244 let uri = add_missing_scheme(url)?;
245 let inner = ValidatorServiceClient::connect(uri.to_string())
246 .await
247 .context(ConnectionSnafu)?;
248 Ok(Self { inner })
249 }
250
251 /// Submits a signed transaction to the base layer network
252 ///
253 /// This method sends a transaction to base layer for processing.
254 /// The transaction must be properly signed with a valid signature from
255 /// the transaction signer's private key.
256 ///
257 /// # Parameters
258 ///
259 /// * `tx` - Signed transaction to submit
260 ///
261 /// # Errors
262 ///
263 /// Returns [RpcError] if:
264 /// - The connection to the base layer provider fails;
265 /// - The base layer provider rejects the transaction; or
266 /// - There's a protocol conversion error.
267 pub async fn submit_transaction<T>(&self, tx: SignedTransaction<T>) -> Result<(), RpcError>
268 where
269 T: TransactionType + Into<TypeData> + Send,
270 {
271 self.inner
272 .clone()
273 .submit_transaction(Request::new(tx.into()))
274 .await
275 .context(RequestSnafu)?;
276 Ok(())
277 }
278
279 /// Gets the base vault for an owner in the base layer shard
280 ///
281 /// This is a convenience method that calls [`get_vault`](BaseRpcClient::get_vault)
282 /// with the base layer shard (0).
283 ///
284 /// # Parameters
285 ///
286 /// * `owner` - The ID of the vault owner
287 ///
288 /// # Errors
289 ///
290 /// Returns [RpcError] if:
291 /// - The vault doesn't exist;
292 /// - The connection to the base layer provider fails; or
293 /// - There's a protocol conversion error.
294 pub async fn get_base_vault(&self, owner: OwnerId) -> Result<BaseVault, RpcError> {
295 let proto_vault = self
296 .inner
297 .clone()
298 .get_base_vault(GetBaseVaultRequest {
299 owner: owner.into(),
300 })
301 .await
302 .context(RequestSnafu)?
303 .into_inner();
304
305 Ok(proto_vault.into())
306 }
307
308 /// Retrieves a domain vault data for a specific address
309 ///
310 /// # Parameters
311 ///
312 /// * `address` - of the vault to fetch
313 ///
314 /// # Errors
315 ///
316 /// Returns [RpcError] if:
317 /// - The vault under the given address doesn't exist;
318 /// - The connection to the base layer provider fails; or
319 /// - There's a protocol conversion error.
320 ///
321 /// # Example
322 ///
323 /// ```rust,no_run
324 /// use delta_base_sdk::{
325 /// crypto::ed25519,
326 /// rpc::BaseRpcClient,
327 /// vaults::{Address, ReadableNativeBalance},
328 /// };
329 ///
330 /// async fn get_vault_example(client: &BaseRpcClient) -> Result<(), Box<dyn std::error::Error>> {
331 /// let address = Address::new(ed25519::PubKey::generate().owner(), 1);
332 /// let vault = client.get_vault(address).await?;
333 /// println!("Vault data: {:?}", vault);
334 /// println!("Vault balance: {}", vault.balance());
335 /// Ok(())
336 /// }
337 /// ```
338 pub async fn get_vault(&self, address: Address) -> Result<ShardedVault, RpcError> {
339 let mut vaults = self.get_vaults([address]).await?;
340 vaults.remove(&address).ok_or_else(|| RpcError::Request {
341 status: Box::new(tonic::Status::not_found("Vault not found")),
342 })
343 }
344
345 /// Retrieves multiple sharded vaults by address
346 ///
347 /// This method fetches multiple vaults in a single request, up to a maximum of 100.
348 ///
349 /// # Parameters
350 ///
351 /// * `addresses` - An iterator of addresses to retrieve
352 ///
353 /// # Errors
354 ///
355 /// Returns [RpcError] if:
356 /// - Any of the vaults don't exist;
357 /// - The connection to the base layer provider fails; or
358 /// - There's a protocol conversion error.
359 pub async fn get_vaults(
360 &self,
361 addresses: impl IntoIterator<Item = Address>,
362 ) -> Result<HashMap<Address, ShardedVault>, RpcError> {
363 // Collect the iterator first since we'll need to consume it multiple times
364 let addresses: Vec<Address> = addresses.into_iter().collect();
365 if addresses.is_empty() {
366 return Ok(HashMap::new())
367 }
368
369 let addresses_proto: Vec<_> = addresses.iter().map(|address| (*address).into()).collect();
370
371 // Fetch vaults from server
372 let vaults = self
373 .inner
374 .clone()
375 .get_sharded_vaults(GetShardedVaultsRequest {
376 addresses: addresses_proto,
377 })
378 .await
379 .context(RequestSnafu)?
380 .into_inner()
381 .vaults;
382
383 // Reconstruct addresses matching server response order
384 addresses
385 .into_iter()
386 .zip(vaults)
387 .map(|(address, vault_proto)| {
388 vault_proto
389 .try_into()
390 .context(IntoProtoSnafu)
391 .map(|vault| (address, vault))
392 })
393 .collect()
394 }
395
396 /// Returns all the [OwnerId] that have a vault registered for the given shard on the base layer
397 pub async fn get_domain_owner_ids(&self, shard: Shard) -> Result<Vec<OwnerId>, RpcError> {
398 let response = self
399 .inner
400 .clone()
401 .get_domain_owner_ids(GetDomainOwnerIdsRequest { shard })
402 .await
403 .context(RequestSnafu)?
404 .into_inner();
405 response
406 .owner_ids
407 .into_iter()
408 .map(TryInto::try_into)
409 .collect::<Result<Vec<OwnerId>, _>>()
410 .context(HashSnafu)
411 .context(IntoProtoSnafu)
412 }
413
414 /// Gets the Domain Agreement for the provided shard.
415 ///
416 /// # Parameters
417 /// * `shard` - the shard to query the agreement for
418 ///
419 /// # Errors
420 ///
421 /// Returns [RpcError] if:
422 /// - There's no active agreement;
423 /// - The connection to the base layer RPC fails; or
424 /// - There's a protocol conversion error.
425 pub async fn get_domain_agreement(&self, shard: Shard) -> Result<DomainAgreement, RpcError> {
426 self.inner
427 .clone()
428 .get_domain_agreement(GetDomainAgreementRequest { shard })
429 .await
430 .context(RequestSnafu)?
431 .into_inner()
432 .try_into()
433 .context(IntoProtoSnafu)
434 }
435
436 /// Retrieves the current status of a transaction by its signature
437 ///
438 /// This method allows tracking the progress of a transaction through the
439 /// delta network. After submitting a transaction, you can use this method
440 /// to check if it has been processed, confirmed, or rejected.
441 ///
442 /// # Parameters
443 ///
444 /// * `tx_id` - [TxId] of the transaction to query
445 ///
446 /// # Errors
447 ///
448 /// Returns [RpcError] if:
449 /// - The connection to the base layer provider fails; or
450 /// - The signature is invalid or unknown to the network.
451 ///
452 /// # Note
453 ///
454 /// Transaction signatures are unique identifiers generated when signing a
455 /// transaction. Retain the signature after submitting a transaction if you
456 /// want to check its status later.
457 pub async fn get_transaction_status(&self, tx_id: TxId) -> Result<TransactionStatus, RpcError> {
458 let res = self
459 .inner
460 .clone()
461 .get_transaction_status(GetTransactionStatusRequest {
462 tx_id: tx_id.into(),
463 })
464 .await
465 .context(RequestSnafu)?;
466 res.into_inner().try_into().context(IntoProtoSnafu)
467 }
468
469 /// Retrieves the current epoch in delta
470 ///
471 /// This method returns the current epoch number. Epochs are protocol-defined
472 /// periods in the delta protocol during which the base layer provider set
473 /// remains constant.
474 ///
475 /// # Errors
476 ///
477 /// Returns [RpcError] if:
478 /// - The connection to the base layer provider fails
479 /// - The base layer provider cannot provide epoch information
480 ///
481 /// # Example
482 ///
483 /// ```rust,no_run
484 /// use delta_base_sdk::rpc::BaseRpcClient;
485 ///
486 /// async fn get_network_info() -> Result<(), Box<dyn std::error::Error>> {
487 /// let client = BaseRpcClient::new("http://localhost:50051").await?;
488 ///
489 /// // Get information about the current epoch
490 /// let epoch = client.get_epoch().await?;
491 ///
492 /// println!("Current epoch: {}", epoch);
493 ///
494 /// Ok(())
495 /// }
496 /// ```
497 pub async fn get_epoch(&self) -> Result<Epoch, RpcError> {
498 let res: GetEpochResponse = self
499 .inner
500 .clone()
501 .get_epoch(GetEpochRequest {})
502 .await
503 .context(RequestSnafu)?
504 .into_inner();
505 Ok(res.epoch)
506 }
507
508 /// Retrieves a [State Diff List (SDL)](StateDiffList) by its hash identifier
509 ///
510 /// A State Diff List (SDL) is a batch of state changes in the delta protocol.
511 /// This method allows retrieving the complete contents of an SDL by its hash,
512 /// enabling applications to inspect state changes or verify transaction results.
513 ///
514 /// # Parameters
515 ///
516 /// * `hash` - [Cryptographic hash](HashDigest) that uniquely identifies the SDL
517 ///
518 /// # Errors
519 ///
520 /// Returns [RpcError] if:
521 /// - The SDL with the specified hash doesn't exist;
522 /// - The connection to the base layer provider fails; or
523 /// - There's a protocol conversion error.
524 pub async fn get_sdl(&self, hash: HashDigest) -> Result<StateDiffList, RpcError> {
525 let res = self
526 .inner
527 .clone()
528 .get_sdl_data(GetSdlDataRequest {
529 sdl_hash: hash.into(),
530 })
531 .await
532 .context(RequestSnafu)?;
533 res.into_inner().try_into().context(IntoProtoSnafu)
534 }
535
536 /// Retrieves the [status](SdlStatus) of a [State Diff List (SDL)](StateDiffList).
537 ///
538 /// # Parameters
539 ///
540 /// * `hash` - [Cryptographic hash](HashDigest) that uniquely identifies the SDL
541 ///
542 /// # Errors
543 ///
544 /// Returns [RpcError] if:
545 /// - The SDL with the specified hash doesn't exist;
546 /// - The connection to the base layer provider fails; or
547 /// - There's a protocol conversion error.
548 pub async fn get_sdl_status(&self, hash: HashDigest) -> Result<SdlStatus, RpcError> {
549 let res = self
550 .inner
551 .clone()
552 .get_sdl_status(GetSdlDataRequest {
553 sdl_hash: hash.into(),
554 })
555 .await
556 .context(RequestSnafu)?;
557
558 res.into_inner().try_into().context(IntoProtoSnafu)
559 }
560
561 /// Subscribes to a stream of real-time events from the delta network
562 ///
563 /// This method establishes a streaming connection to receive events from the
564 /// base layer as they occur. Events include epoch transitions, vault migrations,
565 /// and other network state changes. The events are filtered by shard.
566 ///
567 /// # Parameters
568 ///
569 /// * `shard` - [Shard] number to receive events for
570 ///
571 /// # Errors
572 ///
573 /// Returns [RpcError] if:
574 /// - The connection to the base layer provider fails;
575 /// - The streaming request is rejected; or
576 /// - Individual stream items may contain errors if there are issues during streaming.
577 ///
578 /// # Example
579 ///
580 /// ```rust,no_run
581 /// use delta_base_sdk::{
582 /// events::BaseLayerEvent,
583 /// rpc::BaseRpcClient,
584 /// };
585 /// use tokio_stream::StreamExt;
586 ///
587 /// async fn monitor_events() -> Result<(), Box<dyn std::error::Error>> {
588 /// let client = BaseRpcClient::new("http://localhost:50051").await?;
589 ///
590 /// // Subscribe to base layer events for shard 0 (the base layer shard)
591 /// let mut event_stream = client.stream_base_layer_events(0).await?;
592 ///
593 /// println!("Listening for events...");
594 ///
595 /// // Process events as they arrive
596 /// while let Some(event_result) = event_stream.next().await {
597 /// match event_result {
598 /// Ok(event) => {
599 /// match event {
600 /// BaseLayerEvent::NewEpoch(epoch) => {
601 /// println!("New epoch started: {}", epoch);
602 /// },
603 /// BaseLayerEvent::SdlUpdate(update) => {
604 /// println!("SDL update received: {:?}", update.hash);
605 /// },
606 /// BaseLayerEvent::VaultEmigrated(address) => {
607 /// println!("Vault emigrated: {address:?}");
608 /// },
609 /// BaseLayerEvent::VaultImmigrated(address) => {
610 /// println!("Vault immigrated: {address:?}");
611 /// },
612 /// }
613 /// },
614 /// Err(e) => {
615 /// eprintln!("Error in event stream: {e}");
616 /// // Decide whether to break or continue based on the error
617 /// }
618 /// }
619 /// }
620 ///
621 /// Ok(())
622 /// }
623 /// ```
624 ///
625 /// # Note
626 ///
627 /// The returned stream implements the `Stream` trait from the `tokio_stream` crate.
628 /// You need to bring the `StreamExt` trait into scope to use methods like `next()`.
629 ///
630 /// This is a long-lived connection that will continue to deliver events until
631 /// the stream is dropped or the connection is closed. It's suitable for building
632 /// reactive applications that need to respond to network events in real-time.
633 pub async fn stream_base_layer_events(
634 &self,
635 shard: Shard,
636 ) -> Result<impl Stream<Item = Result<BaseLayerEvent, RpcError>> + Send, RpcError> {
637 let res = self
638 .inner
639 .clone()
640 .stream_base_layer_events(StreamBaseLayerEventRequest { shard })
641 .await
642 .context(RequestSnafu)?;
643
644 let stream = res.into_inner().map(|r| {
645 r.context(RequestSnafu)
646 .and_then(|u| u.try_into().context(IntoProtoSnafu))
647 });
648
649 Ok(stream)
650 }
651}
652
653/// The default URI scheme to use when none is provided
654const DEFAULT_SCHEME: Scheme = Scheme::HTTPS;
655
656/// Adds the HTTPS scheme to a URL if it doesn't already have a scheme
657///
658/// This utility function ensures that all URLs have a scheme, adding HTTPS
659/// as the default if none is specified. This allows users to provide just
660/// the host and port (e.g., "localhost:50051") without having to specify
661/// the scheme.
662///
663/// # Parameters
664///
665/// * `url` - URL to process, which may or may not have a scheme
666///
667/// # Errors
668///
669/// Returns [RpcError] if:
670/// - The URL cannot be parsed as a valid URI
671/// - The modified URI cannot be built
672#[allow(clippy::result_large_err)]
673fn add_missing_scheme(url: impl TryInto<Uri, Error = InvalidUri>) -> Result<Uri, RpcError> {
674 let uri: Uri = url
675 .try_into()
676 .map_err(|e| e.into())
677 .context(InvalidUrlSnafu)?;
678
679 if uri.scheme().is_some() {
680 Ok(uri)
681 } else {
682 let path = uri
683 .path_and_query()
684 .map(|p| p.as_str())
685 .unwrap_or("/")
686 .to_string();
687 Into::<uri::Builder>::into(uri)
688 .scheme(DEFAULT_SCHEME)
689 .path_and_query(path)
690 .build()
691 .context(InvalidUrlSnafu)
692 }
693}
694
695#[cfg(test)]
696mod test {
697 use super::*;
698
699 #[test]
700 fn test_add_missing_scheme() {
701 // leaves existing scheme
702 assert_eq!(
703 "http://localhost:5005/",
704 add_missing_scheme("http://localhost:5005")
705 .unwrap()
706 .to_string()
707 );
708 assert_eq!(
709 "http://localhost:5005/path/yes",
710 add_missing_scheme("http://localhost:5005/path/yes")
711 .unwrap()
712 .to_string()
713 );
714 assert_eq!(
715 "yolo://net.delta.net/",
716 add_missing_scheme("yolo://net.delta.net")
717 .unwrap()
718 .to_string()
719 );
720
721 // adds scheme if missing
722 assert_eq!(
723 "https://net.delta.net/",
724 add_missing_scheme("net.delta.net").unwrap().to_string()
725 );
726 assert_eq!(
727 "https://localhost:12345/",
728 add_missing_scheme("localhost:12345").unwrap().to_string()
729 );
730 }
731}