Skip to content

Commit

Permalink
WIP unsuccessful attempt upgrading Axum in CARL's lib.rs.
Browse files Browse the repository at this point in the history
A relevant example in Axum has not yet been updated, so might not yet be possible:
https://github.com/tokio-rs/axum/blob/main/examples/rest-grpc-multiplex/src/main.rs

Used alternative example: tokio-rs/axum#2736 (comment)
But lifetime issues prevent it from compiling.
  • Loading branch information
mbfm committed Nov 25, 2024
1 parent 40c4d86 commit 900fd17
Show file tree
Hide file tree
Showing 17 changed files with 465 additions and 460 deletions.
447 changes: 189 additions & 258 deletions Cargo.lock

Large diffs are not rendered by default.

27 changes: 16 additions & 11 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,8 @@ anyhow = "1.0.79"
assert_fs = "1.1.1"
async-trait = "0.1.77"
axum = "0.7.5"
axum-server = "0.6.0"
axum-server-dual-protocol = "0.6.0"
axum-server = "0.7.1"
axum-server-dual-protocol = "0.7.0"
backon = { version = "1.2.0", default-features = false }
base64 = "0.22.1"
brotli = "7.0.0"
Expand All @@ -77,11 +77,15 @@ fs-err = "3.0.0"
fs_extra = "1.3.0"
futures = "0.3.30"
glob = "0.3.1"
gloo-net = { version = "0.5.0" }
gloo-net = { version = "0.6.0" }
gloo-timers = { version = "0.3.0" }
googletest = { version = "0.12.0" }
home = "0.5.5"
http = "1.1.0"
http-body = "1.0.1"
http-body-util = "0.1.2"
hyper = "1.4.1"
hyper-util = "0.1.6"
indicatif = "0.17.7"
indoc = "2.0.4"
jsonwebtoken = "9.2.0"
Expand All @@ -97,13 +101,14 @@ nix = "0.29.0"
oauth2 = { version = "5.0.0-alpha.4", default-features = false }
openidconnect = { version = "4.0.0-alpha.2", default-features = false }
openssl-sys = { version = "0.9.102", features = ["vendored"] }
opentelemetry = "0.23.0"
opentelemetry-appender-tracing = "0.4.0"
opentelemetry-otlp = "0.16.0"
opentelemetry_sdk = "0.23.0"
opentelemetry-semantic-conventions = "0.15.0"
opentelemetry = "0.24.0"
opentelemetry-appender-tracing = "0.5.0"
opentelemetry-otlp = "0.17.0"
opentelemetry_sdk = "0.24.1"
opentelemetry-semantic-conventions = "0.16.0"
pem = { version = "3.0.3", features = ["serde"] }
phf = { version = "0.11", features = ["macros"] }
pin-project = "1.1.7"
ping-rs = { version = "0.1.2" }
pq-sys = { version = "0.6.1", features = ["bundled"] }
predicates = "3.0.4"
Expand Down Expand Up @@ -142,12 +147,12 @@ toml_edit = "0.22.15"
tonic = { version = "0.12.0", default-features = false }
tonic-build = { version = "0.12.0", default-features = false }
tonic-web = "0.12.0"
tonic-web-wasm-client = { version = "0.5.1" }
tonic-async-interceptor = { version = "0.11.0" }
tonic-web-wasm-client = { version = "0.6.0" }
tonic-async-interceptor = { version = "0.12.0" }
tower = "0.4.13"
tower-http = { version = "0.5.2", features = ["cors", "fs"] }
tracing = { version = "0.1.40" }
tracing-opentelemetry = "0.24.0"
tracing-opentelemetry = "0.25.0"
tracing-subscriber = { version = "0.3.18", default-features = false }
tracing-web = { version = "0.1.3" }
url = "2.5.0"
Expand Down
4 changes: 4 additions & 0 deletions opendut-carl/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,17 @@ flate2 = { workspace = true }
futures = { workspace = true }
googletest = { workspace = true }
http = { workspace = true }
http-body = { workspace = true }
http-body-util = { workspace = true }
hyper = { workspace = true }
indoc = { workspace = true }
jsonwebtoken = { workspace = true}
openidconnect = { workspace = true }
openssl-sys = { workspace = true }
opentelemetry = { workspace = true }
opentelemetry_sdk = { workspace = true }
pem = { workspace = true, features = ["serde"]}
pin-project = { workspace = true }
pq-sys = { workspace = true }
reqwest = { workspace = true, features = ["json"] }
serde = { workspace = true, features = ["derive"] }
Expand Down
35 changes: 11 additions & 24 deletions opendut-carl/src/http/router/cleo.rs
Original file line number Diff line number Diff line change
@@ -1,52 +1,39 @@
use axum::body::StreamBody;
use axum::body::Body;
use axum::extract::{Path, State};
use axum::response::IntoResponse;
use axum_server_dual_protocol::tokio_util::io::ReaderStream;
use http::{header, StatusCode};
use crate::{CarlInstallDirectory};
use http::Request;
use tower_http::services::ServeFile;

use crate::CarlInstallDirectory;
use crate::util::{CLEO_IDENTIFIER, CleoArch};

pub async fn download_cleo(
Path(architecture): Path<CleoArch>,
State(carl_install_directory): State<CarlInstallDirectory>,
) -> impl IntoResponse {
let cleo_file_name = format!("{}-{}.tar.gz", &architecture.distribution_name(), crate::app_info::CRATE_VERSION);
let cleo_dir = carl_install_directory.path.join(CLEO_IDENTIFIER).join(&cleo_file_name);
let cleo_file_path = carl_install_directory.path.join(CLEO_IDENTIFIER).join(&cleo_file_name);

let file = match tokio::fs::File::open(cleo_dir).await {
Ok(file) => { file }
Err(_) => { return StatusCode::NOT_FOUND.into_response(); }
};

let stream = ReaderStream::new(file);
let body = StreamBody::new(stream);
let content_disposition = format!("attachment; filename=\"{}\"", cleo_file_name);
let headers = [
(header::CONTENT_TYPE, "application/gzip"),
(
header::CONTENT_DISPOSITION,
content_disposition.as_str(),
),
];
(headers, body).into_response()
let request = Request::new(Body::empty());
ServeFile::new(cleo_file_path).try_call(request).await.unwrap()
}

#[cfg(test)]
mod test {
use std::fs;
use std::fs::File;

use assert_fs::fixture::PathChild;
use assert_fs::TempDir;
use axum::extract::{Path, State};
use axum::response::IntoResponse;
use googletest::assert_that;
use googletest::matchers::eq;
use http::header;
use crate::CarlInstallDirectory;

use crate::util::{CLEO_IDENTIFIER, CleoArch};
use crate::CarlInstallDirectory;
use crate::router::cleo::download_cleo;
use crate::util::{CLEO_IDENTIFIER, CleoArch};

#[tokio::test()]
async fn download_cleo_succeeds() -> anyhow::Result<()> {
Expand All @@ -70,4 +57,4 @@ mod test {

Ok(())
}
}
}
28 changes: 7 additions & 21 deletions opendut-carl/src/http/router/edgar.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
use axum::body::StreamBody;
use axum::body::{Body};
use axum::extract::{Path, State};
use axum::response::IntoResponse;
use axum_server_dual_protocol::tokio_util::io::ReaderStream;
use http::{header, StatusCode};
use http::Request;
use tower_http::services::ServeFile;
use crate::http::state::CarlInstallDirectory;
use crate::util::{EDGAR_IDENTIFIER, EdgarArch};

Expand All @@ -12,24 +12,10 @@ pub async fn download_edgar(
) -> impl IntoResponse {

let file_name = format!("{}-{}.tar.gz", &architecture.distribution_name(), crate::app_info::CRATE_VERSION);
let edgar_dir = carl_install_directory.path.join(EDGAR_IDENTIFIER).join(&file_name);
let edgar_path = carl_install_directory.path.join(EDGAR_IDENTIFIER).join(&file_name);

let file = match tokio::fs::File::open(edgar_dir).await {
Ok(file) => { file }
Err(_) => { return StatusCode::NOT_FOUND.into_response(); }
};

let stream = ReaderStream::new(file);
let body = StreamBody::new(stream);
let content_disposition = format!("attachment; filename=\"{}\"", file_name);
let headers = [
(header::CONTENT_TYPE, "application/gzip"),
(
header::CONTENT_DISPOSITION,
content_disposition.as_str(),
),
];
(headers, body).into_response()
let request = Request::new(Body::empty());
ServeFile::new(edgar_path).try_call(request).await.unwrap()
}

#[cfg(test)]
Expand Down Expand Up @@ -70,4 +56,4 @@ mod test {

Ok(())
}
}
}
145 changes: 42 additions & 103 deletions opendut-carl/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,20 +7,15 @@ use std::sync::Arc;
use anyhow::{anyhow, Context};
use axum::routing::get;
use axum_server::tls_rustls::RustlsConfig;
use axum_server_dual_protocol::ServerExt;
use futures::future::BoxFuture;
use futures::TryFutureExt;
use ::http::{header::CONTENT_TYPE, Request};
use pem::Pem;
use tonic::transport::Server;
use tonic_async_interceptor::async_interceptor;
use tower::{make::Shared, steer::Steer, BoxError, ServiceExt};
use tower::ServiceExt;
use tower_http::services::{ServeDir, ServeFile};
use tracing::{debug, info, warn};
use tracing::{debug, info};
use uuid::Uuid;

use opendut_auth::confidential::pem::PemFromConfig;
use opendut_auth::registration::client::{RegistrationClient, RegistrationClientRef};
use opendut_auth::registration::client::RegistrationClient;
use opendut_auth::registration::resources::ResourceHomeUrl;
use opendut_util::settings::LoadedConfig;
use opendut_util::telemetry::logging::LoggingConfig;
Expand All @@ -30,15 +25,15 @@ use util::in_memory_cache::CustomInMemoryCache;

use crate::auth::grpc_auth_layer::GrpcAuthenticationLayer;
use crate::auth::json_web_key::JwkCacheValue;
use crate::cluster::manager::{ClusterManager, ClusterManagerOptions, ClusterManagerRef};
use crate::cluster::manager::{ClusterManager, ClusterManagerOptions};
use crate::grpc::{ClusterManagerFacade, MetadataProviderFacade, PeerManagerFacade, PeerMessagingBrokerFacade};
use crate::http::router;
use crate::http::state::{CarlInstallDirectory, HttpState, LeaConfig, LeaIdentityProviderConfig};
use crate::peer::broker::{PeerMessagingBroker, PeerMessagingBrokerOptions, PeerMessagingBrokerRef};
use crate::multiplex_service::GrpcMultiplexLayer;
use crate::peer::broker::{PeerMessagingBroker, PeerMessagingBrokerOptions};
use crate::provisioning::cleo_script::CleoScript;
use crate::resources::manager::{ResourcesManager, ResourcesManagerRef};
use crate::resources::manager::ResourcesManager;
use crate::resources::storage::PersistenceOptions;
use crate::vpn::Vpn;

pub mod grpc;
pub mod util;
Expand All @@ -55,6 +50,7 @@ mod vpn;
mod http;
mod provisioning;
mod auth;
mod multiplex_service;

#[tracing::instrument]
pub async fn create_with_telemetry(settings_override: config::Config) -> anyhow::Result<()> {
Expand Down Expand Up @@ -148,40 +144,10 @@ pub async fn create(settings: LoadedConfig) -> anyhow::Result<()> {
}
};

info!("Server listening at {address}...");
spawn_server(
address,
tls_config,
resources_manager,
cluster_manager,
peer_messaging_broker,
vpn,
carl_url,
settings.config,
ca_certificate,
oidc_registration_client,
grpc_auth_layer,
).await.unwrap();

Ok(())
}
//TODO remove
let ca = ca_certificate;
let settings = settings.config;

/// Isolation in function returning BoxFuture needed due to this: https://github.com/rust-lang/rust/issues/102211#issuecomment-1397600424
#[allow(clippy::too_many_arguments)]
#[tracing::instrument(skip_all, level="TRACE")]
fn spawn_server(
address: SocketAddr,
tls_config: TlsConfig,
resources_manager: ResourcesManagerRef,
cluster_manager: ClusterManagerRef,
peer_messaging_broker: PeerMessagingBrokerRef,
vpn: Vpn,
carl_url: ResourceHomeUrl,
settings: config::Config,
ca: Pem,
oidc_registration_client: Option<RegistrationClientRef>,
grpc_auth_layer: GrpcAuthenticationLayer,
) -> BoxFuture<'static, anyhow::Result<()>> {
let oidc_enabled = settings.get_bool("network.oidc.enabled").unwrap_or(false);

let cluster_manager_facade = ClusterManagerFacade::new(Arc::clone(&cluster_manager), Arc::clone(&resources_manager));
Expand All @@ -196,19 +162,6 @@ fn spawn_server(
);
let peer_messaging_broker_facade = PeerMessagingBrokerFacade::new(Arc::clone(&peer_messaging_broker));

let grpc = Server::builder()
.layer(async_interceptor(move |request| {
Clone::clone(&grpc_auth_layer).auth_interceptor(request)
}))
.accept_http1(true) //gRPC-web uses HTTP1
.add_service(cluster_manager_facade.into_grpc_service())
.add_service(metadata_provider_facade.into_grpc_service())
.add_service(peer_manager_facade.into_grpc_service())
.add_service(peer_messaging_broker_facade.into_grpc_service())
.into_service()
.map_response(|response| response.map(axum::body::boxed))
.boxed_clone();

let lea_dir = project::make_path_absolute(settings.get_string("serve.ui.directory")
.expect("Failed to find configuration for `serve.ui.directory`."))
.expect("Failure while making path absolute.");
Expand Down Expand Up @@ -258,52 +211,38 @@ fn spawn_server(
}

let http = axum::Router::new()
.fallback_service(
axum::Router::new()
.nest_service(
"/api/licenses",
ServeDir::new(&licenses_dir)
.fallback(ServeFile::new(licenses_dir.join("index.json")))
)
.route("/api/cleo/:architecture/download", get(router::cleo::download_cleo))
.route("/api/edgar/:architecture/download", get(router::edgar::download_edgar))
.route("/api/lea/config", get(router::lea_config))
.nest_service(
"/",
ServeDir::new(&lea_dir)
.fallback(ServeFile::new(lea_index_html))
)
.with_state(app_state)
.nest_service(
"/api/licenses",
ServeDir::new(&licenses_dir)
.fallback(ServeFile::new(licenses_dir.join("index.json")))
)
.map_err(BoxError::from)
.boxed_clone();

let http_grpc = Steer::new(vec![grpc, http], |request: &Request<_>, _services: &[_]| {
request.headers()
.get(CONTENT_TYPE)
.map(|content_type| {
let content_type = content_type.as_bytes();

if content_type.starts_with(b"application/grpc") {
0
} else {
1
}
}).unwrap_or(1)
});

match tls_config {
TlsConfig::Enabled(tls_config) => {
Box::pin(axum_server_dual_protocol::bind_dual_protocol(address, tls_config)
.set_upgrade(true) //http -> https
.serve(Shared::new(http_grpc))
.map_err(|cause| anyhow!(cause)))
}
TlsConfig::Disabled => {
// Disable TLS in case a load balancer with TLS termination is present
Box::pin(axum_server::bind(address).serve(Shared::new(http_grpc)).map_err(From::from))
}
}
.route("/api/cleo/:architecture/download", get(router::cleo::download_cleo))
.route("/api/edgar/:architecture/download", get(router::edgar::download_edgar))
.route("/api/lea/config", get(router::lea_config))
.nest_service(
"/",
ServeDir::new(&lea_dir)
.fallback(ServeFile::new(lea_index_html))
)
.with_state(app_state)
.into_service()
.map_response(|response| response.map(tonic::body::boxed));

let grpc = tonic::transport::Server::builder()
.layer(async_interceptor(move |request| {
Clone::clone(&grpc_auth_layer).auth_interceptor(request)
}))
.accept_http1(true) //gRPC-web uses HTTP1
.layer(GrpcMultiplexLayer::new(http))
.add_service(cluster_manager_facade.into_grpc_service())
.add_service(metadata_provider_facade.into_grpc_service())
.add_service(peer_manager_facade.into_grpc_service())
.add_service(peer_messaging_broker_facade.into_grpc_service());

info!("Server listening at {address}...");
grpc.serve(address).await?; //TODO tls_config?

Ok(())
}

enum TlsConfig {
Expand Down
Loading

0 comments on commit 900fd17

Please sign in to comment.