Skip to content
Closed
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 43 additions & 21 deletions src/market/directory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,12 @@ use crate::{
},
};

use crate::error::NetError;
use serde::{Deserialize, Serialize};
use std::{
collections::HashSet,
fs::{File, OpenOptions},
io::{BufRead, BufReader, Write},
io::{BufRead, BufReader, BufWriter, Read, Write},
net::{Ipv4Addr, TcpListener, TcpStream},
path::{Path, PathBuf},
sync::{
Expand All @@ -25,8 +27,6 @@ use std::{
time::Duration,
};

use crate::error::NetError;

/// Represents errors that can occur during directory server operations.
#[derive(Debug)]
pub enum DirectoryServerError {
Expand Down Expand Up @@ -314,31 +314,53 @@ pub fn start_directory_server(directory: Arc<DirectoryServer>) -> Result<(), Dir
Ok(())
}

// The stream should have read and write timeout set.
// TODO: Use serde encoded data instead of string.
// Structured requests and responses using serde.
#[derive(Serialize, Deserialize, Debug)]
#[serde(tag = "type")]
enum Request {
Post { address: String },
Get,
}

#[derive(Serialize, Deserialize, Debug)]
struct Response {
addresses: Vec<String>,
}

fn handle_client(
stream: &mut TcpStream,
directory: &Arc<DirectoryServer>,
) -> Result<(), DirectoryServerError> {
stream.set_read_timeout(Some(Duration::from_secs(5)))?;
stream.set_write_timeout(Some(Duration::from_secs(5)))?;

let reader_stream = stream.try_clone()?;
let writer_stream = stream.try_clone()?;
let mut reader = BufReader::new(reader_stream);
let mut request_line = String::new();

reader.read_line(&mut request_line)?;
if request_line.starts_with("POST") {
let addr: String = request_line.replace("POST ", "").trim().to_string();
directory.addresses.write()?.insert(addr.clone());
log::info!("Got new maker address: {}", addr);
} else if request_line.starts_with("GET") {
log::info!("Taker pinged the directory server");
let response = directory
.addresses
.read()?
.iter()
.fold(String::new(), |acc, addr| acc + addr + "\n");
stream.write_all(response.as_bytes())?;
stream.flush()?;
let mut writer = BufWriter::new(writer_stream);

let mut buffer = String::new();
reader.read_to_string(&mut buffer)?;

let request: Request = serde_json::from_str(&buffer).unwrap();

match request {
Request::Post { address } => {
directory.addresses.write()?.insert(address.clone());
log::info!("Got new maker address: {}", address);
}
Request::Get => {
log::info!("Taker pinged the directory server");

let addresses = directory.addresses.read()?.iter().cloned().collect();
let response = Response { addresses };

let response_json = serde_json::to_string(&response).unwrap();
writer.write_all(response_json.as_bytes())?;
writer.flush()?;
}
}

Ok(())
}
#[cfg(test)]
Expand Down