Skip to content

npy2wig #78

Open
Open
@ClaudeHu

Description

@ClaudeHu

I create a new branch and put a function that transforms gtars.uniwig npy output into wig output. Should this be added to gtars.uniwig commands, or Python binding? I tested it locally with:

main.rs

use serde_json::Value;
use std::collections::HashMap;
use std::env;
use std::fs::{self, File};
use std::path::Path;
use std::io::Write;
use std::io::Read;
use byteorder::{LittleEndian, ReadBytesExt};
use ndarray::{Array, Ix1};
use ndarray_npy::read_npy;

/// Custom comparator for version sorting
fn version_sort(a: &String, b: &String) -> std::cmp::Ordering {
    use std::cmp::Ordering;

    let mut split_a = a.split(|c: char| !c.is_numeric()).filter_map(|s| s.parse::<usize>().ok());
    let mut split_b = b.split(|c: char| !c.is_numeric()).filter_map(|s| s.parse::<usize>().ok());

    loop {
        match (split_a.next(), split_b.next()) {
            (Some(x), Some(y)) => match x.cmp(&y) {
                Ordering::Equal => continue,
                ord => return ord,
            },
            (Some(_), None) => return Ordering::Greater,
            (None, Some(_)) => return Ordering::Less,
            (None, None) => return a.cmp(b), // Fallback to lexicographical if needed
        }
    }
}

fn read_u32_npy(npy_file_path: &Path) -> Result<Vec<u32>, Box<dyn std::error::Error>> {
    // Open the file
    let mut file = File::open(npy_file_path)?;

    // Read the entire file into a buffer
    let mut buffer = vec![];
    file.read_to_end(&mut buffer)?;

    // Skip the header
    let header_end = buffer
        .iter()
        .position(|&b| b == b'\n') // Find the end of the header
        .ok_or("Invalid NPY file: missing header newline")?
        + 1;

    let mut cursor = &buffer[header_end..]; // Skip to the data section
    let mut values = vec![];

    // Read remaining bytes as `u32` in little-endian order
    while let Ok(value) = cursor.read_u32::<LittleEndian>() {
        values.push(value);
    }

    Ok(values)
}



fn npy_to_wig(npy_header: &Path, wig_header: &Path) -> Result<(), Box<dyn std::error::Error>> {
    // Read the JSON file
    let input_file_path = npy_header.join("npy_meta.json");
    let json_data = fs::read_to_string(&input_file_path)?;

    // Deserialize JSON into a HashMap (unordered)
    let dictionary: HashMap<String, HashMap<String, i32>> = serde_json::from_str(&json_data)?;

    // Sort outer keys using version sorting
    let mut sorted_outer_keys: Vec<String> = dictionary.keys().cloned().collect();
    sorted_outer_keys.sort_by(version_sort);

    // Define the list of inner keys to include
    let inner_keys_filter = vec!["start", "core", "end"];
    let step_key = "stepsize";

    // Iterate through the list of inner keys
    for target_inner_key in &inner_keys_filter {
        println!("Preparing {} wiggle file", target_inner_key);
        // Construct the output file name
        let output_file_path = wig_header.join(format!("{}_{}.wig", wig_header.display(), target_inner_key));
        let mut output_file = File::create(&output_file_path)?;

        // Check this inner key across all sorted outer dictionaries
        for outer_key in &sorted_outer_keys {
            let inner_dict = dictionary.get(outer_key).unwrap();
            let mut value = *inner_dict.get(*target_inner_key).unwrap();
            if *target_inner_key == "start" || *target_inner_key == "core" {
                value += 1;
            }
            let step_value = inner_dict.get(step_key).unwrap();
            writeln!(
                output_file,
                "fixedStep chrom={} start={} step={}",
                outer_key, value, step_value
            )?;

            let npy_file_path = npy_header.join(format!("{}_{}.npy", outer_key, target_inner_key));
            let array = read_u32_npy(&npy_file_path)?;

            // Write the array values row by row
            for value in array.iter() {
                writeln!(output_file, "{}", value)?;
            }
        }
    }

    Ok(())
}

fn main() {
    let args: Vec<String> = env::args().collect();
    if args.len() < 3 {
        eprintln!("Usage: cargo run <npy_file_header> <wiggle_file_header>");
        std::process::exit(1);
    }

    let npy_header = Path::new(&args[1]);
    let wig_header = Path::new(&args[2]);

    if let Err(e) = npy_to_wig(npy_header, wig_header) {
        eprintln!("Error: {}", e);
        std::process::exit(1);
    }
}

Cargo.toml

[package]
name = "rust_npy_to_wig" # The name of your project
version = "0.1.0"
edition = "2021"

[dependencies]
serde = { version = "1.0", features = ["derive"] } # For serialization and deserialization
serde_json = "1.0" # For working with JSON
ndarray = "0.15"        # Add this for ndarray support
ndarray-npy = "0.7" 
byteorder = "1.4"

Metadata

Metadata

Assignees

Type

No type

Projects

No projects

Milestone

No milestone

Relationships

None yet

Development

No branches or pull requests

Issue actions