Streams Direct SDK (Rust)

Streams Direct SDKs

Choose the SDK version that matches your needs.

The Data Streams SDK for Rust provides a client library for interacting with Chainlink Data Streams. It offers both point-in-time data retrieval and real-time data streaming capabilities with built-in fault tolerance.

Requirements

  • Rust 1.70 or later
  • Valid Data Streams API credentials

Features

  • REST API Client: Fetch point-in-time data from Data Streams
  • WebSocket Client: Stream real-time data with automatic reconnection
  • Report Decoding: Built-in support for decoding and validating multiple report formats (V1, V2, V3, V4)
  • High Availability: WebSocket connection management with failover support
  • Tracing Support: Optional logging via the tracing crate
  • Async/Await: Built on Tokio for efficient async operations

Installation

Add the SDK to your project by including it in your Cargo.toml:

[dependencies]
data-streams-sdk = { git = "https://github.com/smartcontractkit/data-streams-sdk.git", subdir = "rust/crates/sdk" }
data-streams-report = { git = "https://github.com/smartcontractkit/data-streams-sdk.git", subdir = "rust/crates/report" }

Feature Flags

The SDK provides several feature flags to customize its functionality:

  • "rest": Enables the REST API client (enabled by default)
  • "websocket": Enables the WebSocket client for real-time streaming (enabled by default)
  • "tracing": Enables logging with the tracing crate (optional)
  • "full": Enables all features (default)

Report Types

The Rust SDK supports multiple report formats, including V3 (Crypto) and V4 (RWA). For the complete list of fields and their detailed descriptions, refer to the dedicated schema pages:

Below are basic code snippets for decoding these reports with the SDK:

V3 Reports (Crypto Streams)

use data_streams_report::report::v3::ReportDataV3;

// After you get 'report_blob' (for example, from 'decode_full_report' or a contract call):
let report_data = ReportDataV3::decode(&report_blob)?;

// Access whichever fields you need:
println!("Benchmark Price: {}", report_data.benchmark_price);
println!("Bid: {}", report_data.bid);
println!("Ask: {}", report_data.ask);

// ... etc.

For more details on every field in V3 (Crypto) reports, see the V3 report schema page.

V4 Reports (RWA Streams)

use data_streams_report::report::v4::ReportDataV4;

let report_data = ReportDataV4::decode(&report_blob)?;

// Example usage:
println!("Price: {}", report_data.price);
println!("Market Status: {}", report_data.market_status);
// ... etc.

For more details on every field in V4 (RWA) reports, see the V4 report schema page.

Authentication

The SDK uses HMAC authentication for all API requests. Configure your credentials:

use data_streams_sdk::config::Config;
use std::env;

let api_key = env::var("API_KEY").expect("API_KEY must be set");
let user_secret = env::var("USER_SECRET").expect("USER_SECRET must be set");

let config = Config::new(
    api_key,
    user_secret,
    "https://api.testnet-dataengine.chain.link".to_owned(),
    "wss://ws.testnet-dataengine.chain.link".to_owned(),
)
.build()?;

Security best practices:

  • Store credentials in environment variables
  • Avoid hardcoding credentials in source code
  • Use separate credentials for development and production
  • Rotate credentials periodically

WebSocket Features

High Availability Mode

use data_streams_sdk::config::{Config, WebSocketHighAvailability};

let ws_urls = "wss://ws1.dataengine.chain.link,wss://ws2.dataengine.chain.link";
let config = Config::new(api_key, api_secret, rest_url, ws_urls)
    // Enable WebSocket HA mode
    .with_ws_ha(WebSocketHighAvailability::Enabled)
    // Increase the max reconnection attempts (optional, default is 5)
    .with_ws_max_reconnect(10)
    .build()?;
  • Multiple WebSocket endpoints
  • Automatic failover on connection loss
  • Parallel connections to reduce gaps in data

Connection Management

The SDK allows you to:

  • Set ws_max_reconnect: The maximum number of reconnection attempts (default: 5)
  • Enable HA for multiple WebSocket endpoints
  • Optional insecure_skip_verify for TLS

Example:

use data_streams_sdk::config::{Config, InsecureSkipVerify, WebSocketHighAvailability};

let ws_urls = "wss://ws.testnet-dataengine.chain.link";

let config = Config::new(api_key, api_secret, rest_url, ws_urls)
    .with_ws_ha(WebSocketHighAvailability::Enabled)
    .with_ws_max_reconnect(5)
    .with_insecure_skip_verify(InsecureSkipVerify::Enabled)
    .build()?;

// Create and initialize the stream
let mut stream = Stream::new(&config, feed_ids).await?;
stream.listen().await?;

Error Handling

The SDK defines distinct error types:

  • ClientError for REST-based issues (e.g., HTTP request failures)
  • StreamError for WebSocket streaming issues
  • HmacError for authentication/HMAC generation problems

Example:

use data_streams_sdk::client::Client;
use data_streams_sdk::client::ClientError;

match client.get_latest_report(feed_id).await {
    Ok(report_response) => {
        println!("Report: {:?}", report_response.report);
    }
    Err(ClientError::ApiError(e)) => {
        eprintln!("Server returned an error: {}", e);
    }
    Err(e) => {
        eprintln!("Some other request error: {}", e);
    }
}

Examples

Step-by-Step Guides

More Examples

The SDK repository includes additional examples for common use cases:

  • Fetching a single report, bulk reports, or paginated reports
  • Compressing report data
  • Simple WebSocket streaming
  • Multiple WebSocket endpoints (HA)

Performance Considerations

  • Reuse the same Client instance whenever possible
  • Gracefully close WebSocket Stream objects when not in use (stream.close().await?)
  • Monitor memory usage if you expect large volumes of data
  • Use timeouts or retry logic in your own application where needed

Configuration Reference

Below is a detailed guide to the available builder methods for the ConfigBuilder. Each method is optional but can help you tailor the SDK’s behavior for your specific needs.

pub struct ConfigBuilder {
    // Enables High Availability (HA) for WebSocket connections
    pub fn with_ws_ha(mut self, WebSocketHighAvailability) -> Self;

    // Sets the max number of WebSocket reconnection attempts before giving up
    pub fn with_ws_max_reconnect(mut self, usize) -> Self;

    // Allows skipping TLS certificate verification (use with caution in production)
    pub fn with_insecure_skip_verify(mut self, InsecureSkipVerify) -> Self;

    // Provides a way to inspect HTTP responses for logging or debugging (REST calls only)
    pub fn with_inspect_http_response(mut self, fn(&Response)) -> Self;

    // Finalizes and validates the config
    pub fn build(self) -> Result<Config, ConfigError>;
}

with_ws_ha(WebSocketHighAvailability)

  • Purpose: Enables or disables HA mode for WebSocket streaming.

  • Values:

    • WebSocketHighAvailability::Enabled: Maintains multiple WebSocket connections to different endpoints simultaneously.
    • WebSocketHighAvailability::Disabled: Maintains a single connection.
  • Default: Disabled.

  • Example Use Case:

    // When you want uninterrupted streaming even if one server goes down:
    let config = Config::new(api_key, api_secret, rest_url, "wss://ws1,...,wss://wsN")
        .with_ws_ha(WebSocketHighAvailability::Enabled)
        .build()?;
    

with_ws_max_reconnect(usize)

  • Purpose: Sets the maximum number of reconnection attempts before the SDK stops trying.

  • Default: 5.

  • Parameter: usize indicates how many times you want the SDK to attempt reconnecting after a connection failure.

  • Example Use Case:

    // If you want to be very persistent:
    .with_ws_max_reconnect(20)
    

with_insecure_skip_verify(InsecureSkipVerify)

  • Purpose: When set, it allows skipping TLS certificate verification.

  • Values:

    • InsecureSkipVerify::Enabled: Skips verification of certificates — useful for local dev or self-signed certs.
    • InsecureSkipVerify::Disabled: Normal, secure certificate handling (recommended for production).
  • Default: Disabled.

  • Example Use Case:

    // Useful in development when using self-signed certs or containers
    .with_insecure_skip_verify(InsecureSkipVerify::Enabled)
    

with_inspect_http_response(fn(&Response))

  • Purpose: Allows you to provide a callback function that inspects any HTTP Response received by the REST client.

  • Default: None. (No inspection)

  • Example:

    // Log every HTTP response's status code
    .with_inspect_http_response(|response| {
        println!("Received HTTP status: {}", response.status());
    })
    

What's next

Get the latest Chainlink content straight to your inbox.