From 9cac101ba0e238105842eb4fcb3c5c05a79ac304 Mon Sep 17 00:00:00 2001 From: shuiyisong Date: Thu, 7 Nov 2024 19:45:13 +0800 Subject: [PATCH] test: loki write --- Cargo.lock | 1 + src/servers/src/http/event.rs | 5 ++- tests-integration/Cargo.toml | 1 + tests-integration/tests/http.rs | 68 +++++++++++++++++++++++++++++++++ 4 files changed, 73 insertions(+), 2 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 26669d9d5093..d94a44f99ad1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -12364,6 +12364,7 @@ dependencies = [ "futures", "futures-util", "itertools 0.10.5", + "loki-api", "meta-client", "meta-srv", "moka", diff --git a/src/servers/src/http/event.rs b/src/servers/src/http/event.rs index 29f1d2cf2dc9..b5d4f5fea848 100644 --- a/src/servers/src/http/event.rs +++ b/src/servers/src/http/event.rs @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::collections::HashMap; +use std::collections::{BTreeMap, HashMap}; use std::result::Result as StdResult; use std::sync::Arc; use std::time::Instant; @@ -415,7 +415,8 @@ pub async fn loki_ingest( // encoding: https://github.com/grafana/alloy/blob/be34410b9e841cc0c37c153f9550d9086a304bca/internal/component/common/loki/client/batch.go#L114-L145 // use very dirty hack to parse labels let labels = stream.labels.replace("=", ":"); - let labels: HashMap = json5::from_str(&labels).context(ParseJson5Snafu)?; + // use btreemap to keep order + let labels: BTreeMap = json5::from_str(&labels).context(ParseJson5Snafu)?; // process entries for entry in stream.entries { diff --git a/tests-integration/Cargo.toml b/tests-integration/Cargo.toml index 38c3c01557fa..973ef9b8f58e 100644 --- a/tests-integration/Cargo.toml +++ b/tests-integration/Cargo.toml @@ -45,6 +45,7 @@ flow.workspace = true frontend = { workspace = true, features = ["testing"] } futures.workspace = true futures-util.workspace = true +loki-api = "0.1" meta-client.workspace = true meta-srv = { workspace = true, features = ["mock"] } moka.workspace = true diff --git a/tests-integration/tests/http.rs b/tests-integration/tests/http.rs index 55dc253d8951..4f291e111474 100644 --- a/tests-integration/tests/http.rs +++ b/tests-integration/tests/http.rs @@ -14,6 +14,7 @@ use std::collections::BTreeMap; use std::io::Write; +use std::str::FromStr; use api::prom_store::remote::WriteRequest; use auth::user_provider_from_option; @@ -21,6 +22,8 @@ use axum::http::{HeaderName, HeaderValue, StatusCode}; use common_error::status_code::StatusCode as ErrorCode; use flate2::write::GzEncoder; use flate2::Compression; +use loki_api::logproto::{EntryAdapter, PushRequest, StreamAdapter}; +use loki_api::prost_types::Timestamp; use opentelemetry_proto::tonic::collector::logs::v1::ExportLogsServiceRequest; use opentelemetry_proto::tonic::collector::metrics::v1::ExportMetricsServiceRequest; use opentelemetry_proto::tonic::collector::trace::v1::ExportTraceServiceRequest; @@ -28,6 +31,7 @@ use opentelemetry_proto::tonic::metrics::v1::ResourceMetrics; use prost::Message; use serde_json::{json, Value}; use servers::http::handler::HealthResponse; +use servers::http::header::constants::GREPTIME_LOG_TABLE_NAME_HEADER_NAME; use servers::http::header::{GREPTIME_DB_HEADER_NAME, GREPTIME_TIMEZONE_HEADER_NAME}; use servers::http::prometheus::{PrometheusJsonResponse, PrometheusResponse}; use servers::http::result::error_result::ErrorResponse; @@ -92,6 +96,7 @@ macro_rules! http_tests { test_otlp_metrics, test_otlp_traces, test_otlp_logs, + test_loki_logs, ); )* }; @@ -1690,6 +1695,69 @@ pub async fn test_otlp_logs(store_type: StorageType) { guard.remove_all().await; } +pub async fn test_loki_logs(store_type: StorageType) { + common_telemetry::init_default_ut_logging(); + let (app, mut guard) = setup_test_http_app_with_frontend(store_type, "test_loke_logs").await; + + let client = TestClient::new(app); + + // init loki request + let req: PushRequest = PushRequest { + streams: vec![StreamAdapter { + labels: "{service=\"test\",source=\"integration\"}".to_string(), + entries: vec![EntryAdapter { + timestamp: Some(Timestamp::from_str("2024-11-07T10:53:50").unwrap()), + line: "this is a log message".to_string(), + }], + hash: rand::random(), + }], + }; + let encode = req.encode_to_vec(); + let body = prom_store::snappy_compress(&encode).unwrap(); + + // write to loki + let res = send_req( + &client, + vec![ + ( + HeaderName::from_static("content-type"), + HeaderValue::from_static("application/x-protobuf"), + ), + ( + HeaderName::from_static(GREPTIME_LOG_TABLE_NAME_HEADER_NAME), + HeaderValue::from_static("loki_table_name"), + ), + ], + "/v1/events/loki/api/v1/push", + body, + false, + ) + .await; + assert_eq!(StatusCode::OK, res.status()); + + // test schema + let expected = "[[\"loki_table_name\",\"CREATE TABLE IF NOT EXISTS \\\"loki_table_name\\\" (\\n \\\"timestamp\\\" TIMESTAMP(9) NOT NULL,\\n \\\"line\\\" STRING NULL,\\n \\\"service\\\" STRING NULL,\\n \\\"source\\\" STRING NULL,\\n TIME INDEX (\\\"timestamp\\\"),\\n PRIMARY KEY (\\\"service\\\", \\\"source\\\")\\n)\\n\\nENGINE=mito\\nWITH(\\n append_mode = 'true'\\n)\"]]"; + validate_data( + "loki_schema", + &client, + "show create table loki_table_name;", + expected, + ) + .await; + + // test content + let expected = r#"[[1730976830000000000,"this is a log message","test","integration"]]"#; + validate_data( + "loki_content", + &client, + "select * from loki_table_name;", + expected, + ) + .await; + + guard.remove_all().await; +} + async fn validate_data(test_name: &str, client: &TestClient, sql: &str, expected: &str) { let res = client .get(format!("/v1/sql?sql={sql}").as_str())