delta_base_sdk/
rpc.rs

1//! # RPC Communication with the delta Base Layer
2//!
//! This module provides a client for communicating with the delta base layer through
4//! RPC (Remote Procedure Call). It enables applications to query network state, submit
5//! transactions, retrieve vault data, and subscribe to network events.
6//!
7//! ## Overview
8//!
9//! The primary interface is the [`BaseRpcClient`] which handles connection
10//! management, request serialization, and response parsing. The client provides
11//! methods for:
12//!
13//! - [Submitting transactions to the network](BaseRpcClient::submit_transaction);
14//! - Retrieving vault data ([`get_vault`](BaseRpcClient::get_vault),
15//!   [`get_base_vault`](BaseRpcClient::get_base_vault),
16//!   [`get_vaults`](BaseRpcClient::get_vaults));
17//! - [Checking transaction status](BaseRpcClient::get_transaction_status); or
18//! - [Subscribing to network events](BaseRpcClient::stream_base_layer_events).
19//!
20//! All RPC operations use the [`tonic`] library for gRPC communication, with automatic
21//! serialization and deserialization between protocol buffer and native Rust types.
22//!
23//! ## Usage
24//!
25//! To use the RPC client, you need a running delta base layer at some known URL.
26//!
27//! ## Example
28//!
29//! ```rust,no_run
30//! use delta_base_sdk::{
31//!     core::Shard,
32//!     crypto::{ed25519, Hash256, IntoSignatureEnum},
33//!     rpc::{BaseRpcClient, RpcError},
34//!     vaults::{Address, ReadableNativeBalance, Vault},
35//! };
36//! # use std::error::Error;
37//! # use std::str::FromStr;
38//! # use tokio_stream::StreamExt;
39//!
40//! async fn rpc_example() -> Result<(), Box<dyn Error>> {
41//!     // Connect to base layer
42//!     let client = BaseRpcClient::new("http://localhost:50051").await?;
43//!
44//!     // Get a vault
45//!     let pub_key = ed25519::PrivKey::generate().pub_key();
46//!     let vault = client.get_vault(Address::new(pub_key.owner(), 1)).await?;
47//!     println!("Vault balance: {}", vault.balance());
48//!
49//!     // Check a transaction status
50//!     // In a real app, this would be a valid signature
51//!     let signature = ed25519::Signature::from_str("<MY SIGNATURE>")?
52//!         .into_signature_enum(pub_key);
53//!     let status = client.get_transaction_status(signature.hash_sha256()).await?;
54//!     println!("Transaction status: {:?}", status);
55//!
56//!     // Query current epoch
57//!     let epoch = client.get_epoch().await?;
58//!     println!("Current epoch: {}", epoch);
59//!
60//!     // Subscribe to network events (requires tokio to run)
61//!     let mut events = client.stream_base_layer_events(0).await?;
62//!
63//!     // Process the first 5 events
64//!     let mut count = 0;
65//!     while let Some(event) = events.next().await {
66//!         if let Ok(event) = event {
67//!             println!("Received event: {:?}", event);
68//!             count += 1;
69//!             if count >= 5 {
70//!                 break;
71//!             }
72//!         }
73//!     }
74//!     Ok(())
75//! }
76//! ```
77
78use crate::{
79    core::Shard,
80    events::BaseLayerEvent,
81    sdl::StateDiffList,
82    transactions::{
83        types::TransactionType,
84        SignedTransaction,
85        TransactionStatus,
86        TypeData,
87    },
88};
89use http::{
90    uri::{
91        self,
92        InvalidUri,
93        Scheme,
94    },
95    Uri,
96};
97use primitives::{
98    constants::BASE_LAYER_SHARD,
99    domain_agreement::DomainAgreement,
100    type_aliases::{
101        Epoch,
102        SdlId,
103        TxId,
104    },
105    vault::{
106        base::Vault as BaseVault,
107        sharded::Vault as ShardedVault,
108        Address,
109        OwnerId,
110    },
111};
112use proto_types::{
113    error::HashSnafu,
114    sdl::SdlStatus,
115    services::{
116        GetBaseVaultRequest,
117        GetDomainAgreementRequest,
118        GetDomainOwnerIdsRequest,
119        GetEpochRequest,
120        GetEpochResponse,
121        GetSdlDataRequest,
122        GetShardedVaultsRequest,
123        GetTransactionStatusRequest,
124        StreamBaseLayerEventRequest,
125        ValidatorServiceClient,
126    },
127};
128use snafu::{
129    ResultExt,
130    Snafu,
131};
132use std::collections::HashMap;
133use tokio_stream::{
134    Stream,
135    StreamExt,
136};
137use tonic::{
138    transport::Channel,
139    Request,
140};
141use tonic_types::StatusExt;
142
/// Upper bound, in bytes, on a single gRPC message the client is willing to decode
/// (and therefore to receive): 10 MiB, written as `10 * 2^20`.
const MAX_DECODING_MESSAGE_SIZE: usize = 10 << 20;
145
/// Errors that can occur when using the [`BaseRpcClient`]
///
/// Each variant wraps the lower-level error it was built from, so the original
/// cause stays reachable through the `snafu`-generated error chain.
#[derive(Debug, Snafu)]
pub enum RpcError {
    /// Failed to establish a connection to the base layer
    ///
    /// This error occurs when the client cannot connect to the specified
    /// base layer URL, typically due to network issues or because
    /// the node is not running. In this module it is only produced while
    /// connecting in [`BaseRpcClient::new`].
    #[snafu(display("Could not connect: {source}"))]
    Connection {
        /// The underlying transport error
        source: tonic::transport::Error,
    },

    /// The RPC request was rejected by the base layer
    ///
    /// This error occurs when the request is received by the base layer
    /// but cannot be processed, such as when requesting a non-existent
    /// vault or submitting an invalid transaction.
    ///
    /// The client also builds this variant locally, with a `NotFound` status:
    /// [`BaseRpcClient::get_vault`] does so when the response contains no vault for
    /// the requested address, and [`BaseRpcClient::get_vaults`] does so when asked
    /// for an address in the base shard.
    #[snafu(display("Request failed with status: {status}"))]
    Request {
        /// The status returned by the base layer
        ///
        /// Stored boxed; a plain [`tonic::Status`] is boxed automatically when this
        /// variant is created through its context selector (presumably to keep
        /// `RpcError` itself small).
        #[snafu(source(from(tonic::Status, Box::new)))]
        status: Box<tonic::Status>,
    },

    /// Failed to convert between protocol buffer and native format
    ///
    /// This error occurs when there's a problem converting between the
    /// wire format (protocol buffers) and the native Rust types, typically
    /// due to missing or malformed data. Despite the variant name, the client
    /// uses it for responses converted *from* protocol buffers into native types.
    #[snafu(display("Could not convert type: {source}"))]
    IntoProto {
        /// The underlying conversion error
        source: proto_types::error::ProtoError,
    },

    /// Failed to parse base layer URL
    ///
    /// This error occurs when the URL provided to connect to the base layer is
    /// malformed or cannot be parsed, or when the URI cannot be rebuilt after the
    /// default scheme has been added to it.
    #[snafu(display("Could not parse URL: {source}"))]
    InvalidUrl {
        /// The underlying HTTP error
        source: http::Error,
    },
}
193
194impl RpcError {
195    /// Returns true if the underlying error is a gRPC not found status.
196    pub fn is_not_found(&self) -> bool {
197        matches!(self, Self::Request { status } if status.code() == tonic::Code::NotFound)
198    }
199
200    /// Returns true if the underlying error is an RPC epoch mismatch
201    pub fn is_epoch_mismatch(&self) -> bool {
202        matches!(self, Self::Request { status } if status.get_details_bad_request().is_some_and(|bad_req| {
203            bad_req
204                .field_violations
205                .iter()
206                .any(|fv| fv.field == "epoch")
207        }))
208    }
209}
210
/// Client for communicating with the delta base layer through RPC
///
/// [BaseRpcClient] provides a high-level interface for interacting with
/// the delta base layer. It handles connection management, request formatting,
/// and response parsing for all base layer operations.
///
/// The client is [`Clone`]; all request methods take `&self`, so one client can be
/// shared or cloned freely.
///
/// # Example
///
/// ```rust,no_run
/// use delta_base_sdk::rpc::BaseRpcClient;
///
/// async fn connect() -> Result<(), Box<dyn std::error::Error>> {
///     // Connect to local base layer provider
///     let client = BaseRpcClient::new("http://localhost:50051").await?;
///
///     // Connect to a remote node with HTTPS
///     let secure_client = BaseRpcClient::new("https://baselayer.example.com").await?;
///
///     // Connect with just the host (HTTPS will be added automatically)
///     let auto_scheme_client = BaseRpcClient::new("baselayer.example.com").await?;
///
///     Ok(())
/// }
/// ```
#[derive(Debug, Clone)]
pub struct BaseRpcClient {
    // The generated gRPC client. Its methods need `&mut self`, so every call in this
    // module works on a fresh clone of it; this is cheap and avoids mutable self refs:
    // https://docs.rs/tonic/latest/tonic/transport/struct.Channel.html#multiplexing-requests
    inner: ValidatorServiceClient<Channel>,
}
241
242impl BaseRpcClient {
243    /// Creates a new client by connecting to the specified base layer provider URL
244    ///
245    /// This method establishes a connection to delta base layer at the specified
246    /// URL. If the URL does not include a scheme (http:// or https://), HTTPS will be
247    /// used automatically.
248    ///
249    /// # Parameters
250    ///
251    /// * `url` - URL of base layer provider to connect to, either as a string or a URI
252    ///
253    /// # Errors
254    ///
255    /// Returns [RpcError] if:
256    /// - The URL cannot be parsed; or
257    /// - The connection cannot be established
258    pub async fn new(url: impl TryInto<Uri, Error = InvalidUri> + Send) -> Result<Self, RpcError> {
259        let uri = add_missing_scheme(url)?;
260        let inner = ValidatorServiceClient::connect(uri.to_string())
261            .await
262            .context(ConnectionSnafu)?
263            .max_decoding_message_size(MAX_DECODING_MESSAGE_SIZE);
264        Ok(Self { inner })
265    }
266
267    /// Submits a signed transaction to the base layer network
268    ///
269    /// This method sends a transaction to base layer for processing.
270    /// The transaction must be properly signed with a valid signature from
271    /// the transaction signer's private key.
272    ///
273    /// # Parameters
274    ///
275    /// * `tx` - Signed transaction to submit
276    ///
277    /// # Errors
278    ///
279    /// Returns [RpcError] if:
280    /// - The connection to the base layer provider fails;
281    /// - The base layer provider rejects the transaction; or
282    /// - There's a protocol conversion error.
283    pub async fn submit_transaction<T>(&self, tx: SignedTransaction<T>) -> Result<(), RpcError>
284    where
285        T: TransactionType + Into<TypeData> + Send,
286    {
287        self.inner
288            .clone()
289            .submit_transaction(Request::new(tx.into()))
290            .await
291            .context(RequestSnafu)?;
292        Ok(())
293    }
294
295    /// Gets the base vault for an owner in the base shard (0).
296    ///
297    /// This only fetches vaults from the base shard. Use
298    /// [`get_vault`](BaseRpcClient::get_vault) otherwise.
299    ///
300    /// # Parameters
301    ///
302    /// * `owner` - The ID of the vault owner
303    ///
304    /// # Errors
305    ///
306    /// - The vault doesn't exist;
307    /// - The connection to the base layer provider fails; or
308    /// - There's a protocol conversion error.
309    pub async fn get_base_vault(&self, owner: OwnerId) -> Result<BaseVault, RpcError> {
310        let proto_vault = self
311            .inner
312            .clone()
313            .get_base_vault(GetBaseVaultRequest {
314                owner: owner.into(),
315            })
316            .await
317            .context(RequestSnafu)?
318            .into_inner();
319
320        proto_vault.try_into().context(IntoProtoSnafu)
321    }
322
323    /// Retrieves a sharded vault for a specific address
324    ///
325    /// # Parameters
326    ///
327    /// * `address` - of the vault to fetch
328    ///
329    /// # Errors
330    ///
331    /// - The vault under the given address doesn't exist;
332    /// - `address.shard=0` i.e. a base vault was requested (use [Self::get_base_vault] instead);
333    /// - The connection to the base layer provider fails; or
334    /// - There's a protocol conversion error.
335    ///
336    /// # Example
337    ///
338    /// ```rust,no_run
339    /// use delta_base_sdk::{
340    ///     crypto::ed25519,
341    ///     rpc::BaseRpcClient,
342    ///     vaults::{Address, ReadableNativeBalance},
343    /// };
344    ///
345    /// async fn get_vault_example(client: &BaseRpcClient) -> Result<(), Box<dyn std::error::Error>> {
346    ///     let address = Address::new(ed25519::PubKey::generate().owner(), 1);
347    ///     let vault = client.get_vault(address).await?;
348    ///     println!("Vault data: {:?}", vault);
349    ///     println!("Vault balance: {}", vault.balance());
350    ///     Ok(())
351    /// }
352    /// ```
353    pub async fn get_vault(&self, address: Address) -> Result<ShardedVault, RpcError> {
354        let mut vaults = self.get_vaults([address]).await?;
355        vaults.remove(&address).ok_or_else(|| RpcError::Request {
356            status: Box::new(tonic::Status::not_found("Vault not found")),
357        })
358    }
359
360    /// Retrieves multiple sharded vaults by address
361    ///
362    /// This method fetches multiple vaults in a single request, up to a maximum of 100.
363    ///
364    /// # Parameters
365    ///
366    /// * `addresses` - An iterator of addresses to retrieve
367    ///
368    /// # Errors
369    ///
370    /// - Any of the vaults don't exist;
371    /// - Any `address.shard=0` i.e. a base vault was requested (use [Self::get_base_vault] instead);
372    /// - The connection to the base layer provider fails; or
373    /// - There's a protocol conversion error.
374    pub async fn get_vaults(
375        &self,
376        addresses: impl IntoIterator<Item = Address>,
377    ) -> Result<HashMap<Address, ShardedVault>, RpcError> {
378        // Collect the iterator first since we'll need to consume it multiple times
379        let addresses: Vec<Address> = addresses.into_iter().collect();
380        if addresses.is_empty() {
381            return Ok(HashMap::new())
382        }
383        // TODO(#1535) this should become obsolete with a non-zero address type
384        if addresses
385            .iter()
386            .any(|addr| addr.shard() == BASE_LAYER_SHARD)
387        {
388            return Err(RpcError::Request {
389                status: Box::new(tonic::Status::not_found(
390                    "Cannot get base vaults (shard = 0)",
391                )),
392            })
393        }
394
395        let addresses_proto: Vec<_> = addresses.iter().map(|address| (*address).into()).collect();
396
397        // Fetch vaults from server
398        let vaults = self
399            .inner
400            .clone()
401            .get_sharded_vaults(GetShardedVaultsRequest {
402                addresses: addresses_proto,
403            })
404            .await
405            .context(RequestSnafu)?
406            .into_inner()
407            .vaults;
408
409        // Reconstruct addresses matching server response order
410        addresses
411            .into_iter()
412            .zip(vaults)
413            .map(|(address, vault_proto)| {
414                vault_proto
415                    .try_into()
416                    .context(IntoProtoSnafu)
417                    .map(|vault| (address, vault))
418            })
419            .collect()
420    }
421
422    /// Returns all the [OwnerId] that have a vault registered for the given shard on the base layer
423    pub async fn get_domain_owner_ids(&self, shard: Shard) -> Result<Vec<OwnerId>, RpcError> {
424        let response = self
425            .inner
426            .clone()
427            .get_domain_owner_ids(GetDomainOwnerIdsRequest { shard })
428            .await
429            .context(RequestSnafu)?
430            .into_inner();
431        response
432            .owner_ids
433            .into_iter()
434            .map(TryInto::try_into)
435            .collect::<Result<Vec<OwnerId>, _>>()
436            .context(HashSnafu)
437            .context(IntoProtoSnafu)
438    }
439
440    /// Gets the Domain Agreement for the provided shard.
441    ///
442    /// # Parameters
443    /// * `shard` - the shard to query the agreement for
444    ///
445    /// # Errors
446    ///
447    /// Returns [RpcError] if:
448    /// - There's no active agreement;
449    /// - The connection to the base layer RPC fails; or
450    /// - There's a protocol conversion error.
451    pub async fn get_domain_agreement(&self, shard: Shard) -> Result<DomainAgreement, RpcError> {
452        self.inner
453            .clone()
454            .get_domain_agreement(GetDomainAgreementRequest { shard })
455            .await
456            .context(RequestSnafu)?
457            .into_inner()
458            .try_into()
459            .context(IntoProtoSnafu)
460    }
461
462    /// Retrieves the current status of a transaction by its signature
463    ///
464    /// This method allows tracking the progress of a transaction through the
465    /// delta network. After submitting a transaction, you can use this method
466    /// to check if it has been processed, confirmed, or rejected.
467    ///
468    /// # Parameters
469    ///
470    /// * `tx_id` - [TxId] of the transaction to query
471    ///
472    /// # Errors
473    ///
474    /// Returns [RpcError] if:
475    /// - The connection to the base layer provider fails; or
476    /// - The signature is invalid or unknown to the network.
477    ///
478    /// # Note
479    ///
480    /// Transaction signatures are unique identifiers generated when signing a
481    /// transaction. Retain the signature after submitting a transaction if you
482    /// want to check its status later.
483    pub async fn get_transaction_status(&self, tx_id: TxId) -> Result<TransactionStatus, RpcError> {
484        let res = self
485            .inner
486            .clone()
487            .get_transaction_status(GetTransactionStatusRequest {
488                tx_id: tx_id.into(),
489            })
490            .await
491            .context(RequestSnafu)?;
492        res.into_inner().try_into().context(IntoProtoSnafu)
493    }
494
495    /// Retrieves the current epoch in delta
496    ///
497    /// This method returns the current epoch number. Epochs are protocol-defined
498    /// periods in the delta protocol during which the base layer provider set
499    /// remains constant.
500    ///
501    /// # Errors
502    ///
503    /// Returns [RpcError] if:
504    /// - The connection to the base layer provider fails
505    /// - The base layer provider cannot provide epoch information
506    ///
507    /// # Example
508    ///
509    /// ```rust,no_run
510    /// use delta_base_sdk::rpc::BaseRpcClient;
511    ///
512    /// async fn get_network_info() -> Result<(), Box<dyn std::error::Error>> {
513    ///     let client = BaseRpcClient::new("http://localhost:50051").await?;
514    ///
515    ///     // Get information about the current epoch
516    ///     let epoch = client.get_epoch().await?;
517    ///
518    ///     println!("Current epoch: {}", epoch);
519    ///
520    ///     Ok(())
521    /// }
522    /// ```
523    pub async fn get_epoch(&self) -> Result<Epoch, RpcError> {
524        let res: GetEpochResponse = self
525            .inner
526            .clone()
527            .get_epoch(GetEpochRequest {})
528            .await
529            .context(RequestSnafu)?
530            .into_inner();
531        Ok(res.epoch)
532    }
533
534    /// Retrieves a [State Diff List (SDL)](StateDiffList) by its identifier
535    ///
536    /// A State Diff List (SDL) is a batch of state changes in the delta protocol.
537    /// This method allows retrieving the complete contents of an SDL by its ID,
538    /// enabling applications to inspect state changes or verify transaction results.
539    ///
540    /// # Parameters
541    ///
542    /// * `sdl_id` - ID of the SDL
543    ///
544    /// # Errors
545    ///
546    /// Returns [RpcError] if:
547    /// - The SDL with the specified ID doesn't exist;
548    /// - The connection to the base layer provider fails; or
549    /// - There's a protocol conversion error.
550    pub async fn get_sdl(&self, sdl_id: SdlId) -> Result<StateDiffList, RpcError> {
551        let res = self
552            .inner
553            .clone()
554            .get_sdl_data(GetSdlDataRequest {
555                sdl_id: sdl_id.into(),
556            })
557            .await
558            .context(RequestSnafu)?;
559        res.into_inner().try_into().context(IntoProtoSnafu)
560    }
561
562    /// Retrieves the [status](SdlStatus) of a [State Diff List (SDL)](StateDiffList).
563    ///
564    /// # Parameters
565    ///
566    /// * `sdl_id` - ID of the SDL
567    ///
568    /// # Errors
569    ///
570    /// Returns [RpcError] if:
571    /// - The SDL with the specified ID doesn't exist;
572    /// - The connection to the base layer provider fails; or
573    /// - There's a protocol conversion error.
574    pub async fn get_sdl_status(&self, sdl_id: SdlId) -> Result<SdlStatus, RpcError> {
575        let res = self
576            .inner
577            .clone()
578            .get_sdl_status(GetSdlDataRequest {
579                sdl_id: sdl_id.into(),
580            })
581            .await
582            .context(RequestSnafu)?;
583
584        res.into_inner().try_into().context(IntoProtoSnafu)
585    }
586
587    /// Subscribes to a stream of real-time events from the delta network
588    ///
589    /// This method establishes a streaming connection to receive events from the
590    /// base layer as they occur. Events include epoch transitions, vault migrations,
591    /// and other network state changes. The events are filtered by shard.
592    ///
593    /// # Parameters
594    ///
595    /// * `shard` - [Shard] number to receive events for
596    ///
597    /// # Errors
598    ///
599    /// Returns [RpcError] if:
600    /// - The connection to the base layer provider fails;
601    /// - The streaming request is rejected; or
602    /// - Individual stream items may contain errors if there are issues during streaming.
603    ///
604    /// # Example
605    ///
606    /// ```rust,no_run
607    /// use delta_base_sdk::{
608    ///     events::BaseLayerEvent,
609    ///     rpc::BaseRpcClient,
610    /// };
611    /// use tokio_stream::StreamExt;
612    ///
613    /// async fn monitor_events() -> Result<(), Box<dyn std::error::Error>> {
614    ///     let client = BaseRpcClient::new("http://localhost:50051").await?;
615    ///
616    ///     // Subscribe to base layer events for shard 0 (the base layer shard)
617    ///     let mut event_stream = client.stream_base_layer_events(0).await?;
618    ///
619    ///     println!("Listening for events...");
620    ///
621    ///     // Process events as they arrive
622    ///     while let Some(event_result) = event_stream.next().await {
623    ///         match event_result {
624    ///             Ok(event) => {
625    ///                 match event {
626    ///                     BaseLayerEvent::NewEpoch(epoch) => {
627    ///                         println!("New epoch started: {}", epoch);
628    ///                     },
629    ///                     BaseLayerEvent::SdlUpdate(update) => {
630    ///                         println!("SDL update received: {:?}", update.id);
631    ///                     },
632    ///                     BaseLayerEvent::VaultEmigrated(address) => {
633    ///                         println!("Vault emigrated: {address:?}");
634    ///                     },
635    ///                     BaseLayerEvent::VaultImmigrated(address) => {
636    ///                         println!("Vault immigrated: {address:?}");
637    ///                     },
638    ///                     BaseLayerEvent::CreditFromBase(address, amount) => {
639    ///                         println!("Local vault {address:?} received credit from base");
640    ///                     }
641    ///                     BaseLayerEvent::TransactionFailed{ tx_id, failure_info} => {
642    ///                         println!("Transaction failed: {tx_id:?}, {failure_info:?}");
643    ///                     }
644    ///                 }
645    ///             },
646    ///             Err(e) => {
647    ///                 eprintln!("Error in event stream: {e}");
648    ///                 // Decide whether to break or continue based on the error
649    ///             }
650    ///         }
651    ///     }
652    ///
653    ///     Ok(())
654    /// }
655    /// ```
656    ///
657    /// # Note
658    ///
659    /// The returned stream implements the `Stream` trait from the `tokio_stream` crate.
660    /// You need to bring the `StreamExt` trait into scope to use methods like `next()`.
661    ///
662    /// This is a long-lived connection that will continue to deliver events until
663    /// the stream is dropped or the connection is closed. It's suitable for building
664    /// reactive applications that need to respond to network events in real-time.
665    pub async fn stream_base_layer_events(
666        &self,
667        shard: Shard,
668    ) -> Result<impl Stream<Item = Result<BaseLayerEvent, RpcError>> + Send, RpcError> {
669        let res = self
670            .inner
671            .clone()
672            .stream_base_layer_events(StreamBaseLayerEventRequest { shard })
673            .await
674            .context(RequestSnafu)?;
675
676        let stream = res.into_inner().map(|r| {
677            r.context(RequestSnafu)
678                .and_then(|u| u.try_into().context(IntoProtoSnafu))
679        });
680
681        Ok(stream)
682    }
683}
684
/// The default URI scheme to use when none is provided
///
/// Applied by `add_missing_scheme` to URLs that come without a scheme.
const DEFAULT_SCHEME: Scheme = Scheme::HTTPS;
687
688/// Adds the HTTPS scheme to a URL if it doesn't already have a scheme
689///
690/// This utility function ensures that all URLs have a scheme, adding HTTPS
691/// as the default if none is specified. This allows users to provide just
692/// the host and port (e.g., "localhost:50051") without having to specify
693/// the scheme.
694///
695/// # Parameters
696///
697/// * `url` - URL to process, which may or may not have a scheme
698///
699/// # Errors
700///
701/// Returns [RpcError] if:
702/// - The URL cannot be parsed as a valid URI
703/// - The modified URI cannot be built
704#[allow(clippy::result_large_err)]
705fn add_missing_scheme(url: impl TryInto<Uri, Error = InvalidUri>) -> Result<Uri, RpcError> {
706    let uri: Uri = url
707        .try_into()
708        .map_err(|e| e.into())
709        .context(InvalidUrlSnafu)?;
710
711    if uri.scheme().is_some() {
712        Ok(uri)
713    } else {
714        let path = uri
715            .path_and_query()
716            .map(|p| p.as_str())
717            .unwrap_or("/")
718            .to_string();
719        Into::<uri::Builder>::into(uri)
720            .scheme(DEFAULT_SCHEME)
721            .path_and_query(path)
722            .build()
723            .context(InvalidUrlSnafu)
724    }
725}
726
#[cfg(test)]
mod test {
    use super::*;

    /// Normalizes `input` with `add_missing_scheme` and renders the result as a string.
    fn normalized(input: &str) -> String {
        add_missing_scheme(input).unwrap().to_string()
    }

    #[test]
    fn test_add_missing_scheme() {
        // (input, expected) pairs
        let cases = [
            // leaves existing scheme
            ("http://localhost:5005", "http://localhost:5005/"),
            (
                "http://localhost:5005/path/yes",
                "http://localhost:5005/path/yes",
            ),
            ("yolo://net.delta.net", "yolo://net.delta.net/"),
            // adds scheme if missing
            ("net.delta.net", "https://net.delta.net/"),
            ("localhost:12345", "https://localhost:12345/"),
        ];

        for (input, expected) in cases {
            assert_eq!(expected, normalized(input));
        }
    }
}