Skip to content

Commit b3420a9

Browse files
committed
Continued designing language server <-> daemon communication and interfaces
1 parent c16470f commit b3420a9

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

41 files changed

+606
-211
lines changed

Cargo.lock

Lines changed: 33 additions & 5 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,11 +3,12 @@ resolver = "3"
33
members = [
44
"src/adept",
55
"src/daemon",
6-
"src/daemon_scheduler",
6+
"src/daemon_init",
77
"src/language_server",
88
"src/lock_file",
99
"src/idle_tracker",
1010
"src/lsp_connection",
11+
"src/lsp_message",
1112
]
1213

1314
[workspace.dependencies]

src/adept/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ version = "0.1.0"
44
edition = "2024"
55

66
[dependencies]
7-
daemon = { version = "0.1.0", path = "../daemon" }
7+
daemon_init = { version = "0.1.0", path = "../daemon_init" }
88
language_server = { version = "0.1.0", path = "../language_server" }
99
smol.workspace = true
1010

src/adept/src/main.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ fn main() -> ExitCode {
55

66
match args.peek().map(String::as_str) {
77
Some("-h" | "--help") | None => show_help(),
8-
Some("--daemon") => daemon::start(),
8+
Some("--daemon") => daemon_init::start(),
99
Some("--language-server") => language_server::start(),
1010
_ => show_help(),
1111
}

src/daemon/Cargo.toml

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,9 @@ edition = "2024"
55

66
[dependencies]
77
lock_file = { version = "0.1.0", path = "../lock_file" }
8-
daemon_scheduler = { version = "0.1.0", path = "../daemon_scheduler" }
9-
smol.workspace = true
8+
lsp_message = { version = "0.1.0", path = "../lsp_message" }
109
thiserror.workspace = true
10+
derive_more.workspace = true
11+
fern.workspace = true
12+
humantime.workspace = true
13+
log.workspace = true

src/daemon/src/connection.rs

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
use lsp_message::LspMessage;
2+
use std::io::BufReader;
3+
#[cfg(target_family = "unix")]
4+
use std::os::unix::net::UnixStream;
5+
#[cfg(target_family = "unix")]
6+
use std::{io, sync::Mutex};
7+
8+
pub struct Connection {
9+
#[cfg(target_family = "unix")]
10+
pub stream: Mutex<UnixStream>,
11+
}
12+
13+
impl Connection {
14+
pub fn wait_for_message(&self) -> io::Result<Option<LspMessage>> {
15+
let stream = self.stream.lock().unwrap();
16+
LspMessage::read(&mut BufReader::new(&*stream))
17+
}
18+
19+
pub fn send(&self, message: LspMessage) -> io::Result<()> {
20+
let stream = self.stream.lock().unwrap();
21+
message.write(&mut &*stream)
22+
}
23+
}

src/daemon/src/daemon.rs

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
use crate::Queue;
2+
use lsp_message::LspMessage;
3+
#[cfg(target_family = "unix")]
4+
use std::os::unix::net::UnixListener;
5+
use std::{
6+
io::{self, BufReader},
7+
os::unix::net::UnixStream,
8+
sync::atomic::{AtomicBool, Ordering},
9+
};
10+
11+
pub struct Daemon {
12+
#[cfg(target_family = "unix")]
13+
pub listener: UnixListener,
14+
15+
pub should_exit: AtomicBool,
16+
pub queue: Queue,
17+
}
18+
19+
impl Daemon {
20+
#[cfg(target_family = "unix")]
21+
pub fn new(listener: UnixListener) -> Self {
22+
Self {
23+
listener,
24+
should_exit: false.into(),
25+
queue: Queue::default(),
26+
}
27+
}
28+
29+
pub fn wait_for_message(&self, client_stream: &UnixStream) -> io::Result<Option<LspMessage>> {
30+
LspMessage::read(&mut BufReader::new(client_stream))
31+
}
32+
33+
pub fn send(message: LspMessage, mut client_stream: &UnixStream) -> io::Result<()> {
34+
message.write(&mut client_stream)
35+
}
36+
37+
pub fn intend_exit(&self) {
38+
self.should_exit.store(true, Ordering::SeqCst);
39+
}
40+
41+
pub fn should_exit(&self) -> bool {
42+
self.should_exit.load(Ordering::SeqCst)
43+
}
44+
}

src/daemon/src/lib.rs

Lines changed: 66 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -1,71 +1,84 @@
1-
mod error;
1+
mod connection;
2+
mod daemon;
3+
mod logger;
4+
mod queue;
25

3-
pub use error::*;
4-
use smol::{Timer, net::TcpStream};
6+
pub use crate::{connection::Connection, daemon::Daemon};
7+
use lsp_message::LspMessage;
8+
pub use queue::*;
59
use std::{
6-
fs::remove_file,
7-
io,
8-
process::{Command, ExitCode},
10+
io::{self, BufReader, ErrorKind},
11+
os::unix::net::{SocketAddr, UnixStream},
12+
sync::Arc,
913
time::Duration,
1014
};
1115

12-
pub fn start() -> ExitCode {
13-
match try_become() {
14-
Ok(()) => ExitCode::SUCCESS,
15-
Err(error) => {
16-
eprintln!("{}", error);
17-
ExitCode::FAILURE
18-
}
16+
pub fn main_loop(daemon: Daemon) -> io::Result<()> {
17+
let daemon = Arc::new(daemon);
18+
19+
if logger::setup().is_err() {
20+
eprintln!("Failed to create daemon log file");
1921
}
20-
}
2122

22-
/// Become the daemon process
23-
pub fn try_become() -> io::Result<()> {
24-
let cwd = std::env::current_dir().expect("Failed to get current directory");
25-
let filepath = cwd.join("adeptd.lock");
23+
daemon
24+
.listener
25+
.set_nonblocking(true)
26+
.expect("Failed to set to non-blocking");
2627

27-
let Some(lock) = lock_file::acquire(&filepath)? else {
28-
eprintln!("Daemon already running.");
29-
return Ok(());
30-
};
28+
// Executor thread
29+
std::thread::spawn(|| {});
3130

32-
eprintln!("Starting daemon...");
33-
daemon_scheduler::main()?;
31+
// Accept clients
32+
#[cfg(target_family = "unix")]
33+
loop {
34+
match daemon.listener.accept() {
35+
Ok((stream, address)) => {
36+
let daemon = Arc::clone(&daemon);
37+
std::thread::spawn(move || handle_client(daemon, stream, address));
38+
}
39+
Err(error) => {
40+
if let io::ErrorKind::WouldBlock = error.kind() {
41+
// No clients ready to connect to us yet
42+
} else {
43+
log::error!("Failed to accept client: {:?}", error);
44+
}
45+
}
46+
}
3447

35-
eprintln!("Daemon shutting down...");
36-
drop(lock);
48+
if daemon.should_exit() {
49+
return Ok(());
50+
}
3751

38-
remove_file(&filepath)
52+
std::thread::sleep(Duration::from_millis(50));
53+
}
3954
}
4055

41-
/// Tries to connect to the daemon process. If the daemon process
42-
/// is not running yet, this function attempts to launch it.
43-
pub async fn connect() -> Result<TcpStream, StartError> {
44-
if let Ok(connection) = TcpStream::connect("127.0.0.1:6000").await {
45-
eprintln!("Connected to existing daemon.");
46-
return Ok(connection);
47-
}
56+
fn handle_client(_daemon: Arc<Daemon>, stream: UnixStream, address: SocketAddr) {
57+
log::info!("Accepted client {:?} {:?}", stream, address);
58+
std::thread::sleep(Duration::from_millis(50));
4859

49-
spawn()?;
60+
stream.set_nonblocking(false).unwrap();
61+
stream
62+
.set_read_timeout(Some(Duration::from_millis(50)))
63+
.unwrap();
64+
let reader = &mut BufReader::new(&stream);
5065

51-
for _ in 0..10 {
52-
if let Ok(connection) = TcpStream::connect("127.0.0.1:6000").await {
53-
return Ok(connection);
66+
loop {
67+
match LspMessage::read(reader) {
68+
Ok(None) => {
69+
log::info!("Shutting down connection to client");
70+
break;
71+
}
72+
Ok(message) => {
73+
log::info!("Got message {:?}", message);
74+
}
75+
Err(error) => {
76+
if let ErrorKind::WouldBlock = error.kind() {
77+
// Nothing to do
78+
} else {
79+
log::error!("Error receiving message from client - {:?}", error);
80+
}
81+
}
5482
}
55-
Timer::after(Duration::from_millis(20)).await;
5683
}
57-
58-
Err(StartError::FailedToStart)
59-
}
60-
61-
pub fn spawn() -> std::io::Result<()> {
62-
let exe = std::env::current_exe()?;
63-
64-
// WARNING: SECURITY: This could lead to privilege escalation
65-
// to the level the compiler is running at if an attacker
66-
// overwrites the current executable.
67-
// TL;DR - Don't let the compiler executable be changed
68-
// by less privileged users.
69-
Command::new(exe).arg("--daemon").spawn()?;
70-
Ok(())
7184
}

src/daemon/src/logger.rs

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
use std::{fs::File, time::SystemTime};
2+
3+
pub(crate) fn setup() -> Result<(), fern::InitError> {
4+
fern::Dispatch::new()
5+
.format(|out, message, record| {
6+
out.finish(format_args!(
7+
"[{} {} {}] {}",
8+
humantime::format_rfc3339_seconds(SystemTime::now()),
9+
record.level(),
10+
record.target(),
11+
message
12+
))
13+
})
14+
.level(log::LevelFilter::Debug)
15+
.chain(std::io::stderr())
16+
.chain(File::create("adeptd.log")?)
17+
.apply()?;
18+
Ok(())
19+
}
20+

src/daemon/src/queue.rs

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
use std::{collections::VecDeque, sync::Mutex};
2+
3+
// TODO
4+
#[derive(Default)]
5+
pub struct PendingQuery;
6+
7+
#[derive(Default)]
8+
pub struct Queue {
9+
// We can add priorities for this later,
10+
// but even just running everything at the same time/duration step value
11+
// is probably fully sufficient.
12+
_queue: Mutex<VecDeque<PendingQuery>>,
13+
}
14+
15+
impl Queue {
16+
pub fn next(&self) -> Option<PendingQuery> {
17+
todo!()
18+
}
19+
}

0 commit comments

Comments
 (0)