Skip to content

Commit

Permalink
add databrickssecretscope crd, works on qa tenant
Browse files Browse the repository at this point in the history
  • Loading branch information
mach-kernel committed Jul 15, 2024
1 parent 72e71cc commit 0907816
Show file tree
Hide file tree
Showing 9 changed files with 252 additions and 93 deletions.
4 changes: 4 additions & 0 deletions databricks-kube/src/crdgen.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,4 +20,8 @@ fn main() {
"---\n{}\n",
to_string(&crate::crds::repo::Repo::crd()).unwrap()
);
print!(
"---\n{}\n",
to_string(&crate::crds::databricks_secret_scope::DatabricksSecretScope::crd()).unwrap()
);
}
135 changes: 135 additions & 0 deletions databricks-kube/src/crds/databricks_secret_scope.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
use std::pin::Pin;
use std::{sync::Arc, time::SystemTime};

use crate::context::Context;
use async_stream::{stream, try_stream};
use databricks_rust_secrets::apis::secret_api;
use databricks_rust_secrets::models::{
WorkspaceCreateScope, WorkspaceDeleteScope, WorkspaceListScopesResponse, WorkspaceSecretScope,
};
use futures::{future, Stream, StreamExt, TryStreamExt};
use kube::{core::object::HasSpec, CustomResource};

use schemars::JsonSchema;
use serde::{Deserialize, Serialize};

use crate::traits::rest_config::RestConfig;
use crate::{error::DatabricksKubeError, traits::remote_api_resource::RemoteAPIResource};

#[derive(Clone, CustomResource, Debug, Default, Deserialize, PartialEq, Serialize, JsonSchema)]
#[kube(
group = "com.dstancu.databricks",
version = "v1",
kind = "DatabricksSecretScope",
derive = "Default",
namespaced
)]
pub struct DatabricksSecretScopeSpec {
pub scope: WorkspaceSecretScope,
}

// API -> CRD
impl From<WorkspaceSecretScope> for DatabricksSecretScope {
fn from(scope: WorkspaceSecretScope) -> Self {
let k8s_name = scope.name.clone().unwrap_or(format!(
"noname-{}",
SystemTime::now()
.duration_since(SystemTime::UNIX_EPOCH)
.unwrap()
.as_secs()
));

Self::new(&k8s_name, DatabricksSecretScopeSpec { scope })
}
}

// CRD -> API
impl From<DatabricksSecretScope> for WorkspaceSecretScope {
fn from(value: DatabricksSecretScope) -> Self {
value.spec().scope.clone()
}
}

impl RemoteAPIResource<WorkspaceSecretScope> for DatabricksSecretScope {
fn remote_list_all(
context: Arc<Context>,
) -> Pin<Box<dyn Stream<Item = Result<WorkspaceSecretScope, DatabricksKubeError>> + Send>> {
try_stream! {
let config = WorkspaceListScopesResponse::get_rest_config(context.clone()).await.unwrap();

if let WorkspaceListScopesResponse {
scopes
} = secret_api::list_scopes(&config).await? {
if let Some(scopes) = scopes {
for scope in scopes {
yield scope;
}
}
}
}
.boxed()
}

fn remote_get(
&self,
context: Arc<Context>,
) -> Pin<Box<dyn Stream<Item = Result<WorkspaceSecretScope, DatabricksKubeError>> + Send>> {
let scope_name = self.spec().scope.name.clone().unwrap();

stream! {
let remote = Self::remote_list_all(context)
.try_filter(move |rs| future::ready(rs.clone().name.map(|rsn| rsn == scope_name.clone()).unwrap_or(false)))
.next()
.await;

yield remote.unwrap_or(Err(DatabricksKubeError::IDUnsetError))
}
.boxed()
}

fn remote_create(
&self,
context: Arc<Context>,
) -> Pin<Box<dyn Stream<Item = Result<Self, DatabricksKubeError>> + Send + '_>> {
let scope = self.spec().scope.clone();

try_stream! {
let config = WorkspaceSecretScope::get_rest_config(context.clone()).await.unwrap();

// Endpoint has no response value
secret_api::create_scope(&config, Some(WorkspaceCreateScope {
scope: scope.name.unwrap(),
..Default::default()
})).await?;

yield self.clone()
}
.boxed()
}

fn remote_update(
&self,
_: Arc<Context>,
) -> Pin<Box<dyn Stream<Item = Result<Self, DatabricksKubeError>> + Send + '_>> {
log::warn!("No change made! Secret scope resources are not updatable. Delete and recreate your resource.");
try_stream! { yield self.clone() }.boxed()
}

fn remote_delete(
&self,
context: Arc<Context>,
) -> Pin<Box<dyn Stream<Item = Result<(), DatabricksKubeError>> + Send + '_>> {
let scope = self.spec().scope.clone();

try_stream! {
let config = WorkspaceSecretScope::get_rest_config(context.clone()).await.unwrap();
secret_api::delete_scope(
&config,
Some(WorkspaceDeleteScope { scope: scope.name.unwrap() })
).await?;

yield ();
}
.boxed()
}
}
12 changes: 7 additions & 5 deletions databricks-kube/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,13 @@ macro_rules! openapi_error_glue {
status,
content,
entity,
}) => Self::APIError(OpenAPIError::ResponseError(SerializableResponseContent {
status,
content,
entity: entity.and_then(|e| to_value(e).ok()),
})),
}) => {
Self::APIError(OpenAPIError::ResponseError(SerializableResponseContent {
status,
content,
entity: entity.and_then(|e| to_value(e).ok()),
}))
}
$error::Io(e) => Self::APIError(OpenAPIError::Io(e)),
$error::Serde(e) => Self::APIError(OpenAPIError::Serde(e)),
$error::Reqwest(e) => Self::APIError(OpenAPIError::Reqwest(e)),
Expand Down
16 changes: 15 additions & 1 deletion databricks-kube/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use std::{collections::BTreeMap, hash::Hash, sync::Arc, time::Duration};
use databricks_kube::{
context::Context,
crds::databricks_job::DatabricksJob,
crds::databricks_secret_scope::DatabricksSecretScope,
crds::git_credential::GitCredential,
crds::repo::Repo,
error::DatabricksKubeError,
Expand Down Expand Up @@ -73,7 +74,8 @@ async fn main() -> Result<(), DatabricksKubeError> {

ensure_crd("databricksjobs.com.dstancu.databricks", crd_api.clone()).await?;
ensure_crd("gitcredentials.com.dstancu.databricks", crd_api.clone()).await?;
ensure_crd("repos.com.dstancu.databricks", crd_api).await?;
ensure_crd("repos.com.dstancu.databricks", crd_api.clone()).await?;
ensure_crd("databrickssecretscopes.com.dstancu.databricks", crd_api).await?;
ensure_configmap(cm_api.clone()).await?;

let configmap_store = watch_configmap(cm_api.clone()).await?;
Expand All @@ -95,6 +97,7 @@ async fn main() -> Result<(), DatabricksKubeError> {

let git_credential_controller = GitCredential::controller(ctx.clone());
let repo_controller = Repo::controller(ctx.clone());
let secret_scope_controller = DatabricksSecretScope::controller(ctx.clone());

Toplevel::new()
.start(
Expand Down Expand Up @@ -137,6 +140,17 @@ async fn main() -> Result<(), DatabricksKubeError> {
})
},
)
.start(
"secret_scope_controller",
|_: SubsystemHandle<DatabricksKubeError>| {
secret_scope_controller
.for_each(log_controller_event)
.map(|_| {
let res: Result<(), DatabricksKubeError> = Ok(());
res
})
},
)
.catch_signals()
.handle_shutdown_requests(Duration::from_secs(1))
.await
Expand Down
113 changes: 39 additions & 74 deletions databricks-kube/src/traits/rest_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,12 @@ use databricks_rust_repos::{
apis::configuration::Configuration as RepoClientConfig, models::GetRepoResponse as Repo,
};

use databricks_rust_secrets::{
apis::configuration::Configuration as SecretClientConfig,
models::WorkspaceGetSecretResponse as Secret, models::WorkspaceListScopesResponse as Scopes,
models::WorkspaceSecretScope as Scope,
};

use futures::FutureExt;
use tokio::time::interval;

Expand All @@ -21,84 +27,43 @@ pub trait RestConfig<TConfigType> {
) -> Pin<Box<dyn futures::Future<Output = Option<TConfigType>> + std::marker::Send>>;
}

impl RestConfig<JobClientConfig> for Job {
fn get_rest_config(
context: Arc<Context>,
) -> Pin<Box<dyn futures::Future<Output = Option<JobClientConfig>> + std::marker::Send>> {
let mut wait = interval(Duration::from_secs(15));
#[macro_export]
macro_rules! openapi_config_glue {
($client_config:ident, $dto:ident) => {
impl RestConfig<$client_config> for $dto {
fn get_rest_config(
context: Arc<Context>,
) -> Pin<Box<dyn futures::Future<Output = Option<$client_config>> + std::marker::Send>>
{
let mut wait = interval(Duration::from_secs(15));

async move {
while let None = context.get_api_secret() {
wait.tick().await;
log::info!("Waiting for REST credentials...");
}
async move {
while let None = context.get_api_secret() {
wait.tick().await;
log::info!("Waiting for REST credentials...");
}

let api_secret = context.get_api_secret()?;
let (url, token) = (
api_secret.databricks_url.unwrap().to_string(),
api_secret.access_token.unwrap().to_string(),
);
Some(JobClientConfig {
base_path: url,
bearer_access_token: Some(token),
..JobClientConfig::default()
})
}
.boxed()
}
}

impl RestConfig<GitCredentialClientConfig> for GitCredential {
fn get_rest_config(
context: Arc<Context>,
) -> Pin<Box<dyn futures::Future<Output = Option<GitCredentialClientConfig>> + std::marker::Send>>
{
let mut wait = interval(Duration::from_secs(15));
let api_secret = context.get_api_secret()?;
let (url, token) = (
api_secret.databricks_url.unwrap().to_string(),
api_secret.access_token.unwrap().to_string(),
);

async move {
while let None = context.get_api_secret() {
wait.tick().await;
log::info!("Waiting for REST credentials...");
Some($client_config {
base_path: url,
bearer_access_token: Some(token),
..$client_config::default()
})
}
.boxed()
}

let api_secret = context.get_api_secret()?;
let (url, token) = (
api_secret.databricks_url.unwrap().to_string(),
api_secret.access_token.unwrap().to_string(),
);
Some(GitCredentialClientConfig {
base_path: format!("{}/2.0", url),
bearer_access_token: Some(token),
..GitCredentialClientConfig::default()
})
}
.boxed()
}
};
}

impl RestConfig<RepoClientConfig> for Repo {
fn get_rest_config(
context: Arc<Context>,
) -> Pin<Box<dyn futures::Future<Output = Option<RepoClientConfig>> + std::marker::Send>> {
let mut wait = interval(Duration::from_secs(15));

async move {
while let None = context.get_api_secret() {
wait.tick().await;
log::info!("Waiting for REST credentials...");
}

let api_secret = context.get_api_secret()?;
let (url, token) = (
api_secret.databricks_url.unwrap().to_string(),
api_secret.access_token.unwrap().to_string(),
);
Some(RepoClientConfig {
base_path: format!("{}/2.0", url),
bearer_access_token: Some(token),
..RepoClientConfig::default()
})
}
.boxed()
}
}
openapi_config_glue!(JobClientConfig, Job);
openapi_config_glue!(GitCredentialClientConfig, GitCredential);
openapi_config_glue!(RepoClientConfig, Repo);
openapi_config_glue!(SecretClientConfig, Secret);
openapi_config_glue!(SecretClientConfig, Scope);
openapi_config_glue!(SecretClientConfig, Scopes);
1 change: 0 additions & 1 deletion databricks-kube/tests/common/mock_k8s.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
#![allow(dead_code)]

use std::time::{self, SystemTime};

use crate::common::fake_resource::FakeResource;

Expand Down
Loading

0 comments on commit 0907816

Please sign in to comment.