Skip to content

Commit

Permalink
Move common analytics functions out of analytics command
Browse files Browse the repository at this point in the history
  • Loading branch information
Westwooo committed Nov 21, 2024
1 parent a148576 commit 17f8752
Show file tree
Hide file tree
Showing 10 changed files with 149 additions and 143 deletions.
139 changes: 3 additions & 136 deletions src/cli/analytics.rs
Original file line number Diff line number Diff line change
@@ -1,28 +1,19 @@
use crate::cli::error::{
analytics_error, client_error_to_shell_error, deserialize_error, malformed_response_error,
unexpected_status_code_error, AnalyticsErrorReason,
};
use crate::cli::analytics_common::send_analytics_query;
use crate::cli::util::{
cluster_identifiers_from, convert_json_value_to_nu_value, convert_row_to_nu_value,
duration_to_golang_string, get_active_cluster,
cluster_identifiers_from, convert_json_value_to_nu_value, get_active_cluster,
};
use crate::client::http_handler::HttpStreamResponse;
use crate::client::AnalyticsQueryRequest;
use crate::state::State;
use crate::RemoteCluster;
use futures::StreamExt;
use log::debug;
use nu_engine::command_prelude::Call;
use nu_engine::CallExt;
use nu_protocol::engine::{Command, EngineState, Stack};
use nu_protocol::{
Category, ListStream, PipelineData, ShellError, Signals, Signature, Span, SyntaxShape, Value,
Category, ListStream, PipelineData, ShellError, Signature, Span, SyntaxShape, Value,
};
use std::ops::Add;
use std::str::from_utf8;
use std::sync::{Arc, Mutex};
use tokio::runtime::Runtime;
use tokio::time::Instant;
use tokio_stream::StreamMap;
use utilities::json_row_stream::JsonRowStream;
use utilities::raw_json_row_streamer::RawJsonRowStreamer;
Expand Down Expand Up @@ -184,127 +175,3 @@ fn run(
signals,
)))
}

pub fn send_analytics_query(
active_cluster: &RemoteCluster,
scope: impl Into<Option<(String, String)>>,
statement: impl Into<String>,
signals: Signals,
span: Span,
rt: Arc<Runtime>,
) -> Result<HttpStreamResponse, ShellError> {
let response = active_cluster
.cluster()
.http_client()
.analytics_query_request(
AnalyticsQueryRequest::Execute {
statement: statement.into(),
scope: scope.into(),
timeout: duration_to_golang_string(active_cluster.timeouts().analytics_timeout()),
},
Instant::now().add(active_cluster.timeouts().analytics_timeout()),
signals.clone(),
rt.clone(),
)
.map_err(|e| client_error_to_shell_error(e, span))?;

if response.status() != 200 {
return Err(unexpected_status_code_error(
response.status(),
response.content()?,
span,
));
}

Ok(response)
}

pub fn read_analytics_response(
identifier: String,
response: HttpStreamResponse,
span: Span,
with_meta: bool,
could_contain_mutations: bool,
) -> Result<Vec<Value>, ShellError> {
let content = response.content()?;

let content: serde_json::Value =
serde_json::from_str(&content).map_err(|e| deserialize_error(e.to_string(), span))?;

let mut results: Vec<Value> = vec![];
if with_meta {
let converted = &mut convert_row_to_nu_value(&content, span, identifier)?;
results.append(converted);
return Ok(results);
}

if let Some(content_errors) = content.get("errors") {
return if let Some(arr) = content_errors.as_array() {
if arr.len() == 1 {
let e = match arr.first() {
Some(e) => e,
None => {
return Err(malformed_response_error(
"analytics errors present but empty",
content_errors.to_string(),
span,
))
}
};
let code = e.get("code").map(|c| c.as_i64().unwrap_or_default());
let reason = match code {
Some(c) => AnalyticsErrorReason::from(c),
None => AnalyticsErrorReason::UnknownError,
};
let msg = match e.get("msg") {
Some(msg) => msg.to_string(),
None => "".to_string(),
};
Err(analytics_error(reason, code, msg, span))
} else {
let messages = arr
.iter()
.map(|e| e.to_string())
.collect::<Vec<String>>()
.join(",");

Err(analytics_error(
AnalyticsErrorReason::MultiErrors,
None,
messages,
span,
))
}
} else {
Err(malformed_response_error(
"analytics errors not an array",
content_errors.to_string(),
span,
))
};
} else if let Some(content_results) = content.get("results") {
if let Some(arr) = content_results.as_array() {
for result in arr {
results.append(&mut convert_row_to_nu_value(
result,
span,
identifier.clone(),
)?)
}
} else {
return Err(malformed_response_error(
"analytics rows not an array",
content_results.to_string(),
span,
));
}
} else if !could_contain_mutations {
return Err(malformed_response_error(
"analytics toplevel result not an object",
content.to_string(),
span,
));
}

Ok(results)
}
3 changes: 2 additions & 1 deletion src/cli/analytics_buckets.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::cli::analytics::{read_analytics_response, send_analytics_query};
use crate::cli::analytics_common::send_analytics_query;

use crate::cli::util::{cluster_identifiers_from, get_active_cluster};
use crate::state::State;
Expand All @@ -11,6 +11,7 @@ use nu_protocol::{
Category, IntoPipelineData, PipelineData, ShellError, Signature, SyntaxShape, Value,
};

use crate::cli::analytics_common::read_analytics_response;
use std::sync::{Arc, Mutex};
use tokio::runtime::Runtime;

Expand Down
137 changes: 137 additions & 0 deletions src/cli/analytics_common.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
use crate::cli::util::{convert_row_to_nu_value, duration_to_golang_string};
use crate::cli::{
analytics_error, client_error_to_shell_error, deserialize_error, malformed_response_error,
unexpected_status_code_error, AnalyticsErrorReason,
};
use crate::client::http_handler::HttpStreamResponse;
use crate::client::AnalyticsQueryRequest;
use crate::remote_cluster::RemoteCluster;
use nu_protocol::{ShellError, Signals, Span, Value};
use std::ops::Add;
use std::sync::Arc;
use tokio::runtime::Runtime;
use tokio::time::Instant;

pub fn send_analytics_query(
active_cluster: &RemoteCluster,
scope: impl Into<Option<(String, String)>>,
statement: impl Into<String>,
signals: Signals,
span: Span,
rt: Arc<Runtime>,
) -> Result<HttpStreamResponse, ShellError> {
let response = active_cluster
.cluster()
.http_client()
.analytics_query_request(
AnalyticsQueryRequest::Execute {
statement: statement.into(),
scope: scope.into(),
timeout: duration_to_golang_string(active_cluster.timeouts().analytics_timeout()),
},
Instant::now().add(active_cluster.timeouts().analytics_timeout()),
signals.clone(),
rt.clone(),
)
.map_err(|e| client_error_to_shell_error(e, span))?;

if response.status() != 200 {
return Err(unexpected_status_code_error(
response.status(),
response.content()?,
span,
));
}

Ok(response)
}

pub fn read_analytics_response(
identifier: String,
response: HttpStreamResponse,
span: Span,
with_meta: bool,
could_contain_mutations: bool,
) -> Result<Vec<Value>, ShellError> {
let content = response.content()?;

let content: serde_json::Value =
serde_json::from_str(&content).map_err(|e| deserialize_error(e.to_string(), span))?;

let mut results: Vec<Value> = vec![];
if with_meta {
let converted = &mut convert_row_to_nu_value(&content, span, identifier)?;
results.append(converted);
return Ok(results);
}

if let Some(content_errors) = content.get("errors") {
return if let Some(arr) = content_errors.as_array() {
if arr.len() == 1 {
let e = match arr.first() {
Some(e) => e,
None => {
return Err(malformed_response_error(
"analytics errors present but empty",
content_errors.to_string(),
span,
))
}
};
let code = e.get("code").map(|c| c.as_i64().unwrap_or_default());
let reason = match code {
Some(c) => AnalyticsErrorReason::from(c),
None => AnalyticsErrorReason::UnknownError,
};
let msg = match e.get("msg") {
Some(msg) => msg.to_string(),
None => "".to_string(),
};
Err(analytics_error(reason, code, msg, span))
} else {
let messages = arr
.iter()
.map(|e| e.to_string())
.collect::<Vec<String>>()
.join(",");

Err(analytics_error(
AnalyticsErrorReason::MultiErrors,
None,
messages,
span,
))
}
} else {
Err(malformed_response_error(
"analytics errors not an array",
content_errors.to_string(),
span,
))
};
} else if let Some(content_results) = content.get("results") {
if let Some(arr) = content_results.as_array() {
for result in arr {
results.append(&mut convert_row_to_nu_value(
result,
span,
identifier.clone(),
)?)
}
} else {
return Err(malformed_response_error(
"analytics rows not an array",
content_results.to_string(),
span,
));
}
} else if !could_contain_mutations {
return Err(malformed_response_error(
"analytics toplevel result not an object",
content.to_string(),
span,
));
}

Ok(results)
}
2 changes: 1 addition & 1 deletion src/cli/analytics_datasets.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::cli::analytics::{read_analytics_response, send_analytics_query};
use crate::cli::analytics_common::{read_analytics_response, send_analytics_query};
use crate::cli::util::{cluster_identifiers_from, get_active_cluster};
use crate::state::State;
use log::debug;
Expand Down
2 changes: 1 addition & 1 deletion src/cli/analytics_dataverses.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::cli::analytics::{read_analytics_response, send_analytics_query};
use crate::cli::analytics_common::{read_analytics_response, send_analytics_query};
use crate::cli::util::{cluster_identifiers_from, get_active_cluster};
use crate::state::State;
use log::debug;
Expand Down
2 changes: 1 addition & 1 deletion src/cli/analytics_indexes.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::cli::analytics::{read_analytics_response, send_analytics_query};
use crate::cli::analytics_common::{read_analytics_response, send_analytics_query};
use crate::cli::util::{cluster_identifiers_from, get_active_cluster};
use crate::state::State;
use log::debug;
Expand Down
2 changes: 1 addition & 1 deletion src/cli/analytics_links.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::cli::analytics::{read_analytics_response, send_analytics_query};
use crate::cli::analytics_common::{read_analytics_response, send_analytics_query};
use crate::cli::util::{cluster_identifiers_from, get_active_cluster};
use crate::state::State;
use log::debug;
Expand Down
2 changes: 1 addition & 1 deletion src/cli/columnar_databases.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::cli::analytics::{read_analytics_response, send_analytics_query};
use crate::cli::analytics_common::{read_analytics_response, send_analytics_query};
use crate::cli::generic_error;
use crate::cli::util::{cluster_identifiers_from, get_active_cluster};
use crate::state::State;
Expand Down
2 changes: 1 addition & 1 deletion src/cli/columnar_query.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::cli::analytics::{read_analytics_response, send_analytics_query};
use crate::cli::analytics_common::{read_analytics_response, send_analytics_query};
use crate::cli::util::{cluster_identifiers_from, get_active_cluster};
use crate::state::State;
use log::debug;
Expand Down
1 change: 1 addition & 0 deletions src/cli/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
mod allow_ip;
mod analytics;
mod analytics_buckets;
mod analytics_common;
mod analytics_datasets;
mod analytics_dataverses;
mod analytics_indexes;
Expand Down

0 comments on commit 17f8752

Please sign in to comment.