Skip to content

Commit f5044bf

Browse files
committed
Rework topics + syncOp trait
1 parent 33ab69f commit f5044bf

19 files changed

Lines changed: 373 additions & 264 deletions

File tree

dev/dev.sql

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -224,3 +224,10 @@ ALTER COLUMN days TYPE SMALLINT USING days::SMALLINT;
224224

225225
ALTER TABLE subscriptions
226226
ALTER COLUMN referred_by TYPE VARCHAR(13);
227+
228+
===
229+
230+
ALTER TABLE connections
231+
ALTER COLUMN created_at TYPE timestamptz USING created_at AT TIME ZONE 'UTC',
232+
ALTER COLUMN modified_at TYPE timestamptz USING modified_at AT TIME ZONE 'UTC',
233+
ALTER COLUMN expired_at TYPE timestamptz USING expired_at AT TIME ZONE 'UTC';

dev/h2.yaml

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
listen: :443
2+
3+
acme:
4+
domains:
5+
- api.example.org
6+
email: admin@example.com
7+
8+
auth:
9+
type: http
10+
http:
11+
url: https://api.example.org/auth
12+
13+
masquerade:
14+
type: proxy
15+
proxy:
16+
url: https://www.microsoft.com/
17+
rewriteHost: true

src/bin/agent/core/http.rs

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,8 @@ use super::Agent;
1919
pub struct ConnTypeParam {
2020
pub proto: Tag,
2121
pub last_update: Option<u64>,
22-
pub env: String,
22+
pub env: Option<String>,
23+
pub topic: uuid::Uuid,
2324
}
2425

2526
#[derive(Clone, Debug, Deserialize, Serialize)]
@@ -69,12 +70,14 @@ where
6970
.clone()
7071
};
7172

73+
let id = node.uuid;
7274
let env = node.env;
7375

7476
let conn_type_param = ConnTypeParam {
7577
proto,
7678
last_update,
77-
env,
79+
env: Some(env),
80+
topic: id,
7881
};
7982

8083
let mut endpoint_url = Url::parse(&endpoint)?;
@@ -95,13 +98,18 @@ where
9598
let status = res.status();
9699
let body = res.text().await?;
97100
if status.is_success() {
98-
log::debug!("Connections Request Accepted: {:?}", status);
101+
log::debug!("Connections Request Accepted for {}: {} ", proto, status);
99102
Ok(())
100103
} else {
101-
log::error!("Connections Request failed: {} - {}", status, body);
104+
log::error!(
105+
"Connections Request failed for {}: {} - {}",
106+
proto,
107+
status,
108+
body
109+
);
102110
Err(PonyError::Custom(format!(
103-
"Connections Request failed: {} - {}",
104-
status, body
111+
"Connections Request failed for {}: {} - {}",
112+
proto, status, body
105113
)))
106114
}
107115
}

src/bin/agent/core/service.rs

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -100,20 +100,20 @@ pub async fn run(settings: AgentSettings) -> Result<()> {
100100
Ok(cfg) => {
101101
if let Err(e) = cfg.validate() {
102102
log::error!("Hysteria2 config validation failed: {}", e);
103-
None
103+
panic!("Hysteria2 config: {}", e);
104104
} else {
105105
match H2Settings::try_from(cfg) {
106106
Ok(settings) => Some(settings),
107107
Err(e) => {
108108
log::error!("Hysteria2 validation error: {}", e);
109-
None
109+
panic!("Hysteria2 config: {}", e);
110110
}
111111
}
112112
}
113113
}
114114
Err(e) => {
115115
log::error!("Failed to load Hysteria2 config: {}", e);
116-
None
116+
panic!("Hysteria2 config: {}", e);
117117
}
118118
}
119119
} else {
@@ -228,7 +228,8 @@ pub async fn run(settings: AgentSettings) -> Result<()> {
228228
let tags: Vec<_> = node
229229
.inbounds
230230
.keys()
231-
.filter(|k| !matches!(k, Tag::Hysteria2))
231+
.filter(|k| !matches!(k, Tag::Hysteria2)) // Hysteria2 uses external auth provider
232+
.filter(|k| !matches!(k, Tag::Mtproto)) // Mtproto doesn't support auth provider
232233
.collect();
233234

234235
for tag in tags {
@@ -322,7 +323,7 @@ pub async fn run(settings: AgentSettings) -> Result<()> {
322323
tasks.push(xray_stats_task);
323324
}
324325

325-
if settings.agent.stat_enabled {
326+
if settings.agent.stat_enabled && settings.wg.enabled {
326327
log::info!("Running WG Stat Task");
327328
let wg_stats_task = tokio::spawn({
328329
let agent = Arc::new(agent.clone());
@@ -343,7 +344,7 @@ pub async fn run(settings: AgentSettings) -> Result<()> {
343344
tasks.push(wg_stats_task);
344345
}
345346

346-
if settings.xray.enabled {
347+
if settings.xray.enabled && settings.agent.local {
347348
if let Some(xray_handler_client) = xray_handler_client {
348349
let conn_task = tokio::spawn({
349350
let conn_id = uuid::Uuid::new_v4();
@@ -406,7 +407,7 @@ pub async fn run(settings: AgentSettings) -> Result<()> {
406407
}
407408
}
408409

409-
if settings.wg.enabled {
410+
if settings.wg.enabled && settings.agent.local {
410411
let conn_task = tokio::spawn({
411412
let agent = agent.clone();
412413
let settings = settings.clone();

src/bin/api/core/http/filters.rs

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
use warp::Filter;
22

3-
use pony::zmq::publisher::Publisher as ZmqPublisher;
43
use pony::Connection;
54
use pony::ConnectionApiOp;
65
use pony::ConnectionBaseOp;
@@ -29,13 +28,6 @@ pub fn with_ch(
2928
warp::any().map(move || ch.clone())
3029
}
3130

32-
/// Provides zmq publisher filter
33-
pub fn publisher(
34-
publisher: ZmqPublisher,
35-
) -> impl Filter<Extract = (ZmqPublisher,), Error = std::convert::Infallible> + Clone {
36-
warp::any().map(move || publisher.clone())
37-
}
38-
3931
pub fn with_param_string(
4032
param: String,
4133
) -> impl Filter<Extract = (String,), Error = std::convert::Infallible> + Clone {

src/bin/api/core/http/handlers/connection.rs

Lines changed: 51 additions & 70 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@ use pony::http::MyRejection;
1111
use pony::http::ResponseMessage;
1212
use pony::memory::tag::ProtoTag;
1313
use pony::utils;
14-
use pony::zmq::publisher::Publisher as ZmqPublisher;
1514
use pony::Connection;
1615
use pony::ConnectionApiOp;
1716
use pony::ConnectionBaseOp;
@@ -34,8 +33,7 @@ use super::super::request::ConnCreateRequest;
3433
/// Handler get connection
3534
// GET /connections
3635
pub async fn get_connections_handler<N, C, S>(
37-
conn_req: ConnTypeParam,
38-
publisher: ZmqPublisher,
36+
req: ConnTypeParam,
3937
memory: MemSync<N, C, S>,
4038
) -> Result<impl warp::Reply, warp::Rejection>
4139
where
@@ -58,55 +56,58 @@ where
5856
+ std::convert::From<pony::Subscription>,
5957
{
6058
let mem = memory.memory.read().await;
61-
let proto = conn_req.proto;
62-
let env = conn_req.env;
63-
64-
let last_update = conn_req.last_update;
59+
let proto = req.proto;
60+
let topic = req.topic;
61+
let last_update = req.last_update;
62+
let env = req.env;
6563

6664
let connections_to_send: Vec<_> = mem
6765
.connections
6866
.iter()
6967
.filter(|(_, conn)| {
70-
if conn.get_deleted() {
71-
return false;
72-
}
73-
74-
if conn.get_proto().proto() != proto {
75-
return false;
76-
}
77-
78-
if let Some(ts) = last_update {
79-
conn.get_modified_at().and_utc().timestamp() as u64 >= ts
80-
} else {
81-
true
82-
}
68+
!conn.get_deleted()
69+
&& conn.get_proto().proto() == proto
70+
&& env.as_ref().is_none_or(|e| conn.get_env() == *e)
71+
&& last_update.is_none_or(|ts| conn.get_modified_at().timestamp() as u64 >= ts)
8372
})
8473
.collect();
8574

86-
log::debug!(
87-
"Sending {} {:?} connections to auth",
88-
connections_to_send.len(),
89-
proto
90-
);
91-
92-
let messages: Vec<_> = connections_to_send
93-
.iter()
94-
.map(|(conn_id, conn)| conn.as_create_message(conn_id))
95-
.collect();
96-
97-
let bytes = to_bytes::<_, 1024>(&messages).map_err(|e| {
98-
log::error!("Serialization error: {}", e);
99-
warp::reject::custom(MyRejection(Box::new(e)))
100-
})?;
101-
102-
publisher
103-
.send_binary(&env, bytes.as_ref())
104-
.await
105-
.map_err(|e| {
106-
log::error!("Publish error: {}", e);
75+
if !connections_to_send.is_empty() {
76+
log::debug!(
77+
"Sending {} {:?} connections for env {:?} to topic {} ",
78+
connections_to_send.len(),
79+
proto,
80+
env,
81+
topic
82+
);
83+
84+
let messages: Vec<_> = connections_to_send
85+
.iter()
86+
.map(|(conn_id, conn)| conn.as_create_message(conn_id))
87+
.collect();
88+
89+
let bytes = to_bytes::<_, 1024>(&messages).map_err(|e| {
90+
log::error!("Serialization error: {}", e);
10791
warp::reject::custom(MyRejection(Box::new(e)))
10892
})?;
10993

94+
memory
95+
.publisher
96+
.send_binary(&topic, bytes.as_ref())
97+
.await
98+
.map_err(|e| {
99+
log::error!("Publish error: {}", e);
100+
warp::reject::custom(MyRejection(Box::new(e)))
101+
})?;
102+
} else {
103+
log::debug!(
104+
"No message {} to send for env {:?} to topic {}",
105+
proto,
106+
env,
107+
topic
108+
);
109+
}
110+
110111
let resp = ResponseMessage::<Option<IdResponse>> {
111112
status: StatusCode::OK.as_u16(),
112113
message: "Ok".to_string(),
@@ -122,7 +123,6 @@ where
122123
// POST /connection
123124
pub async fn create_connection_handler<N, C, S>(
124125
conn_req: ConnCreateRequest,
125-
publisher: ZmqPublisher,
126126
memory: MemSync<N, C, S>,
127127
) -> Result<impl warp::Reply, warp::Rejection>
128128
where
@@ -342,7 +342,7 @@ where
342342
conn.get_env()
343343
};
344344

345-
let _ = publisher.send_binary(&topic, bytes.as_ref()).await;
345+
let _ = memory.publisher.send_binary(&topic, bytes.as_ref()).await;
346346

347347
Ok(http::success_response(
348348
format!("Connection {} has been created", id),
@@ -376,7 +376,7 @@ where
376376
// DELETE /connection?id=
377377
pub async fn delete_connection_handler<N, C, S>(
378378
conn_param: ConnQueryParam,
379-
publisher: ZmqPublisher,
379+
380380
memory: MemSync<N, C, S>,
381381
) -> Result<impl warp::Reply, warp::Rejection>
382382
where
@@ -413,36 +413,17 @@ where
413413

414414
if conn.get_deleted() {
415415
return Ok(http::not_found(&format!(
416-
"Connection {} is deleted",
416+
"Connection {} already is deleted",
417417
conn_id
418418
)));
419419
}
420420

421-
match SyncOp::delete_connection(&memory, &conn_id).await {
422-
Ok(StorageOperationStatus::Ok(id)) => {
423-
let msg = conn.as_delete_message(&conn_id);
424-
425-
let bytes = match rkyv::to_bytes::<_, 1024>(&msg) {
426-
Ok(b) => b,
427-
Err(e) => {
428-
return Ok(http::internal_error(&format!("Serialization error: {}", e)));
429-
}
430-
};
431-
432-
if let Some(node_id) = conn.get_wireguard_node_id() {
433-
let _ = publisher
434-
.send_binary(&node_id.to_string(), bytes.as_ref())
435-
.await;
436-
} else {
437-
let _ = publisher.send_binary(&conn.get_env(), bytes.as_ref()).await;
438-
}
439-
440-
Ok(http::success_response(
441-
format!("Connection {} has been deleted", id),
442-
Some(id),
443-
http::Instance::Connection(conn.clone().into()),
444-
))
445-
}
421+
match SyncOp::delete_connection(&memory, &conn_id, &conn).await {
422+
Ok(StorageOperationStatus::Ok(id)) => Ok(http::success_response(
423+
format!("Connection {} has been deleted", id),
424+
Some(id),
425+
http::Instance::Connection(conn.clone().into()),
426+
)),
446427

447428
Ok(StorageOperationStatus::NotFound(id)) => {
448429
Ok(http::not_found(&format!("Connection {} not found", id)))

0 commit comments

Comments
 (0)