Skip to content

Commit e1bcd3c

Browse files
committed
Merge branch 'gw-no-status'
2 parents 0ff57b2 + 4e2c811 commit e1bcd3c

File tree

4 files changed

+102
-91
lines changed

4 files changed

+102
-91
lines changed

gateway/rpc/proto/gateway_rpc.proto

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -131,12 +131,8 @@ message GatewayState {
131131
service Gateway {
132132
// Register a new proxied CVM.
133133
rpc RegisterCvm(RegisterCvmRequest) returns (RegisterCvmResponse) {}
134-
// Get the status of the gateway.
135-
rpc Status(google.protobuf.Empty) returns (StatusResponse) {}
136134
// List all ACME account URIs and the public key history of the certificates for Content Addressable HTTPS.
137135
rpc AcmeInfo(google.protobuf.Empty) returns (AcmeInfoResponse) {}
138-
// Find Proxied HostInfo by instance ID
139-
rpc GetInfo(GetInfoRequest) returns (GetInfoResponse) {}
140136
// Summary API for inspect.
141137
rpc GetMeta(google.protobuf.Empty) returns (GetMetaResponse);
142138
// Merge state from other Gateway instances.
@@ -149,6 +145,10 @@ message RenewCertResponse {
149145
}
150146

151147
service Admin {
148+
// Get the status of the gateway.
149+
rpc Status(google.protobuf.Empty) returns (StatusResponse) {}
150+
// Find Proxied HostInfo by instance ID
151+
rpc GetInfo(GetInfoRequest) returns (GetInfoResponse) {}
152152
// Exit the Gateway process.
153153
rpc Exit(google.protobuf.Empty) returns (google.protobuf.Empty) {}
154154
// Renew the proxy TLS certificate if certbot is enabled

gateway/src/admin_service.rs

Lines changed: 81 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,58 @@
1+
use std::sync::atomic::Ordering;
2+
13
use anyhow::{Context, Result};
24
use dstack_gateway_rpc::{
35
admin_server::{AdminRpc, AdminServer},
4-
RenewCertResponse,
6+
GetInfoRequest, GetInfoResponse, HostInfo, RenewCertResponse, StatusResponse,
57
};
68
use ra_rpc::{CallContext, RpcCall};
79

8-
use crate::main_service::Proxy;
10+
use crate::{
11+
main_service::{encode_ts, Proxy},
12+
proxy::NUM_CONNECTIONS,
13+
};
914

1015
pub struct AdminRpcHandler {
1116
state: Proxy,
1217
}
1318

19+
impl AdminRpcHandler {
20+
pub(crate) async fn status(self) -> Result<StatusResponse> {
21+
let mut state = self.state.lock();
22+
state.refresh_state()?;
23+
let base_domain = &state.config.proxy.base_domain;
24+
let hosts = state
25+
.state
26+
.instances
27+
.values()
28+
.map(|instance| HostInfo {
29+
instance_id: instance.id.clone(),
30+
ip: instance.ip.to_string(),
31+
app_id: instance.app_id.clone(),
32+
base_domain: base_domain.clone(),
33+
port: state.config.proxy.listen_port as u32,
34+
latest_handshake: encode_ts(instance.last_seen),
35+
num_connections: instance.num_connections(),
36+
})
37+
.collect::<Vec<_>>();
38+
let nodes = state
39+
.state
40+
.nodes
41+
.values()
42+
.cloned()
43+
.map(Into::into)
44+
.collect::<Vec<_>>();
45+
Ok(StatusResponse {
46+
url: state.config.sync.my_url.clone(),
47+
id: state.config.id(),
48+
bootnode_url: state.config.sync.bootnode.clone(),
49+
nodes,
50+
hosts,
51+
num_connections: NUM_CONNECTIONS.load(Ordering::Relaxed),
52+
})
53+
}
54+
}
55+
1456
impl AdminRpc for AdminRpcHandler {
1557
async fn exit(self) -> Result<()> {
1658
self.state.lock().exit();
@@ -27,6 +69,43 @@ impl AdminRpc for AdminRpcHandler {
2769
bot.set_caa().await?;
2870
Ok(())
2971
}
72+
73+
async fn status(self) -> Result<StatusResponse> {
74+
self.status().await
75+
}
76+
77+
async fn get_info(self, request: GetInfoRequest) -> Result<GetInfoResponse> {
78+
let state = self.state.lock();
79+
let base_domain = &state.config.proxy.base_domain;
80+
let handshakes = state.latest_handshakes(None)?;
81+
82+
if let Some(instance) = state.state.instances.get(&request.id) {
83+
let host_info = HostInfo {
84+
instance_id: instance.id.clone(),
85+
ip: instance.ip.to_string(),
86+
app_id: instance.app_id.clone(),
87+
base_domain: base_domain.clone(),
88+
port: state.config.proxy.listen_port as u32,
89+
latest_handshake: {
90+
let (ts, _) = handshakes
91+
.get(&instance.public_key)
92+
.copied()
93+
.unwrap_or_default();
94+
ts
95+
},
96+
num_connections: instance.num_connections(),
97+
};
98+
Ok(GetInfoResponse {
99+
found: true,
100+
info: Some(host_info),
101+
})
102+
} else {
103+
Ok(GetInfoResponse {
104+
found: false,
105+
info: None,
106+
})
107+
}
108+
}
30109
}
31110

32111
impl RpcCall<Proxy> for AdminRpcHandler {

gateway/src/main_service.rs

Lines changed: 15 additions & 84 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
use std::{
22
collections::{BTreeMap, BTreeSet},
33
net::Ipv4Addr,
4-
sync::{atomic::Ordering, Arc, Mutex, MutexGuard, Weak},
4+
sync::{Arc, Mutex, MutexGuard, Weak},
55
time::{Duration, Instant, SystemTime, UNIX_EPOCH},
66
};
77

@@ -11,9 +11,8 @@ use certbot::{CertBot, WorkDir};
1111
use cmd_lib::run_cmd as cmd;
1212
use dstack_gateway_rpc::{
1313
gateway_server::{GatewayRpc, GatewayServer},
14-
AcmeInfoResponse, GatewayState, GetInfoRequest, GetInfoResponse, GetMetaResponse,
15-
GuestAgentConfig, HostInfo as PbHostInfo, RegisterCvmRequest, RegisterCvmResponse,
16-
StatusResponse, WireGuardConfig, WireGuardPeer,
14+
AcmeInfoResponse, GatewayState, GetMetaResponse, GuestAgentConfig, RegisterCvmRequest,
15+
RegisterCvmResponse, WireGuardConfig, WireGuardPeer,
1716
};
1817
use fs_err as fs;
1918
use ra_rpc::{CallContext, RpcCall, VerifiedAttestation};
@@ -29,7 +28,7 @@ use tracing::{debug, error, info, warn};
2928
use crate::{
3029
config::Config,
3130
models::{InstanceInfo, WgConf},
32-
proxy::{AddressGroup, AddressInfo, NUM_CONNECTIONS},
31+
proxy::{AddressGroup, AddressInfo},
3332
};
3433

3534
mod sync_client;
@@ -55,18 +54,18 @@ pub(crate) struct GatewayNodeInfo {
5554
}
5655

5756
#[derive(Debug, Serialize, Deserialize, Default)]
58-
struct ProxyStateMut {
59-
nodes: BTreeMap<String, GatewayNodeInfo>,
60-
apps: BTreeMap<String, BTreeSet<String>>,
61-
instances: BTreeMap<String, InstanceInfo>,
62-
allocated_addresses: BTreeSet<Ipv4Addr>,
57+
pub(crate) struct ProxyStateMut {
58+
pub(crate) nodes: BTreeMap<String, GatewayNodeInfo>,
59+
pub(crate) apps: BTreeMap<String, BTreeSet<String>>,
60+
pub(crate) instances: BTreeMap<String, InstanceInfo>,
61+
pub(crate) allocated_addresses: BTreeSet<Ipv4Addr>,
6362
#[serde(skip)]
64-
top_n: BTreeMap<String, (AddressGroup, Instant)>,
63+
pub(crate) top_n: BTreeMap<String, (AddressGroup, Instant)>,
6564
}
6665

6766
pub(crate) struct ProxyState {
68-
config: Arc<Config>,
69-
state: ProxyStateMut,
67+
pub(crate) config: Arc<Config>,
68+
pub(crate) state: ProxyStateMut,
7069
}
7170

7271
impl Proxy {
@@ -373,7 +372,7 @@ impl ProxyState {
373372
/// Get latest handshakes
374373
///
375374
/// Return a map of public key to (timestamp, elapsed)
376-
fn latest_handshakes(
375+
pub(crate) fn latest_handshakes(
377376
&self,
378377
stale_timeout: Option<Duration>,
379378
) -> Result<BTreeMap<String, (u64, Duration)>> {
@@ -559,7 +558,7 @@ impl ProxyState {
559558
)
560559
}
561560

562-
fn refresh_state(&mut self) -> Result<()> {
561+
pub(crate) fn refresh_state(&mut self) -> Result<()> {
563562
let handshakes = self.latest_handshakes(None)?;
564563
for instance in self.state.instances.values_mut() {
565564
let Some((ts, _)) = handshakes.get(&instance.public_key).copied() else {
@@ -580,7 +579,7 @@ fn decode_ts(ts: u64) -> SystemTime {
580579
.unwrap_or(UNIX_EPOCH)
581580
}
582581

583-
fn encode_ts(ts: SystemTime) -> u64 {
582+
pub(crate) fn encode_ts(ts: SystemTime) -> u64 {
584583
ts.duration_since(UNIX_EPOCH).unwrap_or_default().as_secs()
585584
}
586585

@@ -654,74 +653,6 @@ impl GatewayRpc for RpcHandler {
654653
Ok(response)
655654
}
656655

657-
async fn status(self) -> Result<StatusResponse> {
658-
let mut state = self.state.lock();
659-
state.refresh_state()?;
660-
let base_domain = &state.config.proxy.base_domain;
661-
let hosts = state
662-
.state
663-
.instances
664-
.values()
665-
.map(|instance| PbHostInfo {
666-
instance_id: instance.id.clone(),
667-
ip: instance.ip.to_string(),
668-
app_id: instance.app_id.clone(),
669-
base_domain: base_domain.clone(),
670-
port: state.config.proxy.listen_port as u32,
671-
latest_handshake: encode_ts(instance.last_seen),
672-
num_connections: instance.num_connections(),
673-
})
674-
.collect::<Vec<_>>();
675-
let nodes = state
676-
.state
677-
.nodes
678-
.values()
679-
.cloned()
680-
.map(Into::into)
681-
.collect::<Vec<_>>();
682-
Ok(StatusResponse {
683-
url: state.config.sync.my_url.clone(),
684-
id: state.config.id(),
685-
bootnode_url: state.config.sync.bootnode.clone(),
686-
nodes,
687-
hosts,
688-
num_connections: NUM_CONNECTIONS.load(Ordering::Relaxed),
689-
})
690-
}
691-
692-
async fn get_info(self, request: GetInfoRequest) -> Result<GetInfoResponse> {
693-
let state = self.state.lock();
694-
let base_domain = &state.config.proxy.base_domain;
695-
let handshakes = state.latest_handshakes(None)?;
696-
697-
if let Some(instance) = state.state.instances.get(&request.id) {
698-
let host_info = PbHostInfo {
699-
instance_id: instance.id.clone(),
700-
ip: instance.ip.to_string(),
701-
app_id: instance.app_id.clone(),
702-
base_domain: base_domain.clone(),
703-
port: state.config.proxy.listen_port as u32,
704-
latest_handshake: {
705-
let (ts, _) = handshakes
706-
.get(&instance.public_key)
707-
.copied()
708-
.unwrap_or_default();
709-
ts
710-
},
711-
num_connections: instance.num_connections(),
712-
};
713-
Ok(GetInfoResponse {
714-
found: true,
715-
info: Some(host_info),
716-
})
717-
} else {
718-
Ok(GetInfoResponse {
719-
found: false,
720-
info: None,
721-
})
722-
}
723-
}
724-
725656
async fn acme_info(self) -> Result<AcmeInfoResponse> {
726657
let state = self.state.lock();
727658
let workdir = WorkDir::new(&state.config.certbot.workdir);

gateway/src/web_routes/route_index.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
use crate::{
2+
admin_service::AdminRpcHandler,
23
main_service::{Proxy, RpcHandler},
34
models::Dashboard,
45
};
@@ -11,7 +12,7 @@ use rocket::{response::content::RawHtml as Html, State};
1112
pub async fn index(state: &State<Proxy>) -> anyhow::Result<Html<String>> {
1213
let context = CallContext::builder().state(&**state).build();
1314
let rpc_handler =
14-
RpcHandler::construct(context.clone()).context("Failed to construct RpcHandler")?;
15+
AdminRpcHandler::construct(context.clone()).context("Failed to construct RpcHandler")?;
1516
let status = rpc_handler.status().await.context("Failed to get status")?;
1617
let rpc_handler = RpcHandler::construct(context).context("Failed to construct RpcHandler")?;
1718
let acme_info = rpc_handler

0 commit comments

Comments
 (0)