Skip to content

Failure to subscribe 50,000 variables at once #122

@Manas-Suri

Description

@Manas-Suri

Hello everyone,

I am grateful for this repository to help us with asynchronously running OPC UA on Rust. I am facing a big issue at the moment; I want to subscribe to 50,000 variables at once. This client contains 30,000 variables that are updated every 1 second and 20,000 variables that are updated every 5 seconds. As this implementation does not allow me to have all the variables at once, I am trying to divide all the variables into chunks of 5000 variables.
As I am subscribing to the variables, these chunks get updated by the time I reach the next chunk to get the values. For example, I am fetching the 5,000-10,000 variables. The publisher updates the values, and then instead of subscribing to 10,000-15,000 values, this subscriber again fetches the 0-5,000 values. Due to this, I am not able to reach the fetching of all 50,000 values.

Things to note:

  1. I can not change the publisher rate. It needs to be fixed and is of 30,000 variables updated every 1 second and another 20,000 values updating every 5 seconds.
  2. I am running the whole OPC UA server on TCP.
  3. I am using the x86 architecture to run the server and client locally on my system.

Can you suggest to me or provide me a way to solve this issue? I am grateful for your help.

Thanking You

I have changed async-opcua/samples/simple-client/src/main.rs to have this functionality.
Code:

// OPCUA for Rust
// SPDX-License-Identifier: MPL-2.0
// Copyright (C) 2017-2024 Adam Lock

//! This simple OPC UA client will do the following:
//!
//! 1. Create a client configuration
//! 2. Connect to an endpoint specified by the url with security None
//! 3. Subscribe to values and loop forever printing out their values
use std::{sync::Arc, time::Duration};
use std::sync::{Mutex,MutexGuard};
use std::collections::HashSet;
use std::sync::RwLock;
use log::warn;
use opcua::{
    client::{ClientBuilder, DataChangeCallback, IdentityToken, MonitoredItem, Session},
    crypto::SecurityPolicy,
    types::{
        DataValue, MessageSecurityMode, MonitoredItemCreateRequest, NodeId, StatusCode,
        TimestampsToReturn, UserTokenPolicy,
    },
};

const OPCUA_SUBSCRIBER_100: usize = 29999; 
const OPCUA_SUBSCRIBER_500: usize = 19999; 
const DEFAULT_URL: &str = "opc.tcp://lapt-owl-011.salz:4840";
struct Args {
    help: bool,
    url: String,
}

impl Args {
    pub fn parse_args() -> Result<Args, Box<dyn std::error::Error>> {
        let mut args = pico_args::Arguments::from_env();
        Ok(Args {
            help: args.contains(["-h", "--help"]),
            url: args
                .opt_value_from_str("--url")?
                .unwrap_or_else(|| String::from(DEFAULT_URL)),
        })
    }

    pub fn usage() {
        println!(
            r#"Simple Client
Usage:
  -h, --help   Show help
  --url [url]  Url to connect to (default: {})"#,
            DEFAULT_URL
        );
    }
}

#[tokio::main]
async fn main() -> Result<(), ()> {
    // Read command line arguments
    let args = Args::parse_args().map_err(|_| Args::usage())?;
    if args.help {
        Args::usage();
        return Ok(());
    }
    // Optional - enable OPC UA logging
    env_logger::init();

    // Make the client configuration
    let mut client = ClientBuilder::new()
        .application_name("Simple Client")
        .application_uri("urn:SimpleClient")
        .product_uri("urn:SimpleClient")
        .trust_server_certs(true)
        .create_sample_keypair(true)
        .session_retry_limit(3)
        .max_array_length(9000)
        .max_message_size(16*1024*1024) // 16 MB
        .max_chunk_count(0)
        .client()
        .unwrap();

    let (session, event_loop) = client
        .connect_to_matching_endpoint(
            (
                args.url.as_ref(),
                SecurityPolicy::None.to_str(),
                MessageSecurityMode::None,
                UserTokenPolicy::anonymous(),
            ),
            IdentityToken::Anonymous,
        )
        .await
        .unwrap();
    let handle = event_loop.spawn();
    session.wait_for_connection().await;
    if let Err(result) = subscribe_to_variables(session.clone(), 2).await {
        println!(
            "ERROR: Got an error while subscribing to variables - {}",
            result
        );
        let _ = session.disconnect().await;
    }

    // It's a good idea to intercept ctrl-c and gracefully shut down the client
    // since servers will keep sessions alive for some time after a sudden disconnect.
    // This way, the session will be properly closed.
    let session_c = session.clone();
    tokio::task::spawn(async move {
        if let Err(e) = tokio::signal::ctrl_c().await {
            warn!("Failed to register CTRL-C handler: {e}");
            return;
        }
        let _ = session_c.disconnect().await;
    });

    handle.await.unwrap();
    
    Ok(())
}

// async fn subscribe_to_variables(session: Arc<Session>, ns: u16,output_val: Arc<Mutex<u16>>) -> Result<(), StatusCode> {
async fn subscribe_to_variables(session: Arc<Session>, ns: u16) -> Result<(), StatusCode> {
    // Creates a subscription with a data change callback
    let subscriber_count = Arc::new(Mutex::new(0));
    let received_nodes = Arc::new(RwLock::new(HashSet::new()));
    let subscription_id = session
        .create_subscription(
            Duration::from_secs(1),
            10,
            30,
            0,
            0,
            true,
            DataChangeCallback::new({ let subscriber_count1 = Arc::clone(&subscriber_count);
                let received_nodes1 = Arc::clone(&received_nodes);   
                move|dv, item| {
                    // println!("Data change callback triggered.");
                    let mut count = subscriber_count1.lock().unwrap();
                    *count += 1; // Increment the count
                    let mut seen = received_nodes1.write().unwrap();
                    seen.insert(item.item_to_monitor().node_id.clone());

                    if seen.len() == OPCUA_SUBSCRIBER_500.try_into().unwrap() {
                        seen.clear();
                        println!("Previous count: {}", *count);
                        *count = 0;
                    }
                    print_value(&dv, item)

                        
                }
            }),
        )
        .await?;
    

    println!("Created a subscription with id = {}", subscription_id);

    // let total_items = OPCUA_SUBSCRIBER_100;
    // let batch_size = 5198;
    let batch_size = 4000;
    let total_items = OPCUA_SUBSCRIBER_500;
    
        
        // // For store500_var
        for chunk_start in (0..=total_items).step_by(batch_size) {
            let chunk_end = (chunk_start + batch_size).min(total_items + 1);
            let mut items_batch: Vec<MonitoredItemCreateRequest> = Vec::new();
            // Create items for store100_var
            for v in chunk_start..chunk_end {
                if v < OPCUA_SUBSCRIBER_100 {
                    let var_name = format!("store100_var{:06}", v);
                    items_batch.push(NodeId::new(ns, var_name).into());
                }
            }
            // Create items for store500_var
            for v in chunk_start..chunk_end {
                if v < OPCUA_SUBSCRIBER_500 {
                    let var_name = format!("store500_var{:06}", v);
                    items_batch.push(NodeId::new(ns, var_name).into());
                }
            }
            if let Err(e) = session
                .create_monitored_items(subscription_id, TimestampsToReturn::Both, items_batch)
                .await
                {
                    println!(
                        "Error creating monitored items from {} to {}: {}",
                        chunk_start,
                        chunk_end - 1,
                        e
                    );
                    // return Err(e); // or continue, depending on tolerance
                    continue;
                }
        }
    Ok(())
}

fn print_value(data_value: &DataValue, item: &MonitoredItem) {
    let node_id = &item.item_to_monitor().node_id;
    if let Some(ref value) = data_value.value {
        println!("Item \"{}\", Value = {:?}", node_id, value);
    } else {
        println!(
            "Item \"{}\", Value not found, error: {}",
            node_id,
            data_value.status.as_ref().unwrap()
        );
    }
}

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions