diff --git a/src/servers/src/http.rs b/src/servers/src/http.rs index 44a713c734bf..167a9b638c6a 100644 --- a/src/servers/src/http.rs +++ b/src/servers/src/http.rs @@ -81,6 +81,7 @@ pub mod authorize; mod dashboard; pub mod dyn_log; pub mod event; +mod extractor; pub mod handler; pub mod header; pub mod influxdb; diff --git a/src/servers/src/http/event.rs b/src/servers/src/http/event.rs index 16d55a7a1a7c..d969b8bd16e5 100644 --- a/src/servers/src/http/event.rs +++ b/src/servers/src/http/event.rs @@ -47,13 +47,15 @@ use crate::error::{ DecodeOtlpRequestSnafu, Error, InvalidParameterSnafu, ParseJson5Snafu, ParseJsonSnafu, PipelineSnafu, Result, UnsupportedContentTypeSnafu, }; +use crate::http::extractor::TableName; use crate::http::result::greptime_manage_resp::GreptimedbManageResponse; use crate::http::result::greptime_result_v1::GreptimedbV1Response; use crate::http::HttpResponse; use crate::interceptor::{LogIngestInterceptor, LogIngestInterceptorRef}; use crate::metrics::{ METRIC_FAILURE_VALUE, METRIC_HTTP_LOGS_INGESTION_COUNTER, METRIC_HTTP_LOGS_INGESTION_ELAPSED, - METRIC_HTTP_LOGS_TRANSFORM_ELAPSED, METRIC_SUCCESS_VALUE, + METRIC_HTTP_LOGS_TRANSFORM_ELAPSED, METRIC_LOKI_LOGS_INGESTION_COUNTER, + METRIC_LOKI_LOGS_INGESTION_ELAPSED, METRIC_SUCCESS_VALUE, }; use crate::prom_store; use crate::query_handler::LogHandlerRef; @@ -365,11 +367,14 @@ pub async fn loki_ingest( State(log_state): State, Extension(mut ctx): Extension, TypedHeader(content_type): TypedHeader, + table_name: TableName, bytes: Bytes, ) -> Result { - // TODO(shuiyisong): should change channel to loki - ctx.set_channel(Channel::Http); + ctx.set_channel(Channel::Loki); let ctx = Arc::new(ctx); + let db = ctx.get_db_string(); + let db_str = db.as_str(); + let table_name = table_name.0.unwrap_or("loki_logs".to_string()); let exec_timer = std::time::Instant::now(); // decompress req @@ -480,8 +485,7 @@ pub async fn loki_ingest( }; let ins_req = RowInsertRequest { - // TODO(shuiyisong): table name - table_name: "test_table_name".to_string(), + table_name, rows: Some(rows), }; let ins_reqs = RowInsertRequests { @@ -491,23 +495,22 @@ pub async fn loki_ingest( let handler = log_state.log_handler; let output = handler.insert_logs(ins_reqs, ctx).await; - // TODO(shuiyisong): add metrics - // if let Ok(Output { - // data: OutputData::AffectedRows(rows), - // meta: _, - // }) = &output - // { - // METRIC_HTTP_LOGS_INGESTION_COUNTER - // .with_label_values(&[db.as_str()]) - // .inc_by(*rows as u64); - // METRIC_HTTP_LOGS_INGESTION_ELAPSED - // .with_label_values(&[db.as_str(), METRIC_SUCCESS_VALUE]) - // .observe(exec_timer.elapsed().as_secs_f64()); - // } else { - // METRIC_HTTP_LOGS_INGESTION_ELAPSED - // .with_label_values(&[db.as_str(), METRIC_FAILURE_VALUE]) - // .observe(exec_timer.elapsed().as_secs_f64()); - // } + if let Ok(Output { + data: OutputData::AffectedRows(rows), + meta: _, + }) = &output + { + METRIC_LOKI_LOGS_INGESTION_COUNTER + .with_label_values(&[db_str]) + .inc_by(*rows as u64); + METRIC_LOKI_LOGS_INGESTION_ELAPSED + .with_label_values(&[db_str, METRIC_SUCCESS_VALUE]) + .observe(exec_timer.elapsed().as_secs_f64()); + } else { + METRIC_LOKI_LOGS_INGESTION_ELAPSED + .with_label_values(&[db_str, METRIC_FAILURE_VALUE]) + .observe(exec_timer.elapsed().as_secs_f64()); + } let response = GreptimedbV1Response::from_output(vec![output]) .await diff --git a/src/servers/src/http/extractor.rs b/src/servers/src/http/extractor.rs new file mode 100644 index 000000000000..9634a8952019 --- /dev/null +++ b/src/servers/src/http/extractor.rs @@ -0,0 +1,60 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use core::str; +use std::result::Result as StdResult; + +use axum::async_trait; +use axum::extract::FromRequestParts; +use axum::http::request::Parts; +use axum::http::StatusCode; +use http::HeaderValue; + +use crate::http::header::constants::GREPTIME_LOG_TABLE_NAME_HEADER_NAME; + +pub struct TableName(pub Option); + +#[async_trait] +impl FromRequestParts for TableName +where + S: Send + Sync, +{ + type Rejection = (StatusCode, String); + + async fn from_request_parts(parts: &mut Parts, _state: &S) -> StdResult { + let table_name = parts.headers.get(GREPTIME_LOG_TABLE_NAME_HEADER_NAME); + + match table_name { + Some(name) => Ok(TableName(Some(pipeline_header_error( + name, + GREPTIME_LOG_TABLE_NAME_HEADER_NAME, + )?))), + None => Ok(TableName(None)), + } + } +} + +pub(crate) fn pipeline_header_error( + header: &HeaderValue, + key: &str, +) -> StdResult { + let header_utf8 = str::from_utf8(header.as_bytes()); + match header_utf8 { + Ok(s) => Ok(s.to_string()), + Err(_) => Err(( + StatusCode::BAD_REQUEST, + format!("`{}` header is not valid UTF-8 string type.", key), + )), + } +} diff --git a/src/servers/src/http/otlp.rs b/src/servers/src/http/otlp.rs index 6e5a583c0d62..dfb7fe7b4490 100644 --- a/src/servers/src/http/otlp.rs +++ b/src/servers/src/http/otlp.rs @@ -12,12 +12,10 @@ // See the License for the specific language governing permissions and // limitations under the License. -use core::str; use std::result::Result as StdResult; use std::sync::Arc; use axum::extract::{FromRequestParts, State}; -use axum::http::header::HeaderValue; use axum::http::request::Parts; use axum::http::{header, StatusCode}; use axum::response::IntoResponse; @@ -42,9 +40,9 @@ use snafu::prelude::*; use super::header::constants::GREPTIME_LOG_EXTRACT_KEYS_HEADER_NAME; use super::header::{write_cost_header_map, CONTENT_TYPE_PROTOBUF}; use crate::error::{self, PipelineSnafu, Result}; +use crate::http::extractor::{pipeline_header_error, TableName}; use crate::http::header::constants::{ GREPTIME_LOG_PIPELINE_NAME_HEADER_NAME, GREPTIME_LOG_PIPELINE_VERSION_HEADER_NAME, - GREPTIME_LOG_TABLE_NAME_HEADER_NAME, }; use crate::query_handler::OpenTelemetryProtocolHandlerRef; @@ -107,20 +105,6 @@ pub struct PipelineInfo { pub pipeline_version: Option, } -fn pipeline_header_error( - header: &HeaderValue, - key: &str, -) -> StdResult { - let header_utf8 = str::from_utf8(header.as_bytes()); - match header_utf8 { - Ok(s) => Ok(s.to_string()), - Err(_) => Err(( - StatusCode::BAD_REQUEST, - format!("`{}` header is not valid UTF-8 string type.", key), - )), - } -} - #[async_trait] impl FromRequestParts for PipelineInfo where @@ -157,31 +141,6 @@ where } } -pub struct TableInfo { - table_name: String, -} - -#[async_trait] -impl FromRequestParts for TableInfo -where - S: Send + Sync, -{ - type Rejection = (StatusCode, String); - - async fn from_request_parts(parts: &mut Parts, _state: &S) -> StdResult { - let table_name = parts.headers.get(GREPTIME_LOG_TABLE_NAME_HEADER_NAME); - - match table_name { - Some(name) => Ok(TableInfo { - table_name: pipeline_header_error(name, GREPTIME_LOG_TABLE_NAME_HEADER_NAME)?, - }), - None => Ok(TableInfo { - table_name: "opentelemetry_logs".to_string(), - }), - } - } -} - pub struct SelectInfoWrapper(SelectInfo); #[async_trait] @@ -215,10 +174,11 @@ pub async fn logs( State(handler): State, Extension(mut query_ctx): Extension, pipeline_info: PipelineInfo, - table_info: TableInfo, + tablename: TableName, SelectInfoWrapper(select_info): SelectInfoWrapper, bytes: Bytes, ) -> Result> { + let tablename = tablename.0.unwrap_or("opentelemetry_logs".to_string()); let db = query_ctx.get_db_string(); query_ctx.set_channel(Channel::Otlp); let query_ctx = Arc::new(query_ctx); @@ -245,7 +205,7 @@ pub async fn logs( }; handler - .logs(request, pipeline_way, table_info.table_name, query_ctx) + .logs(request, pipeline_way, tablename, query_ctx) .await .map(|o| OtlpResponse { resp_body: ExportLogsServiceResponse { diff --git a/src/servers/src/metrics.rs b/src/servers/src/metrics.rs index ead86f3ad88b..87ab38dc8215 100644 --- a/src/servers/src/metrics.rs +++ b/src/servers/src/metrics.rs @@ -161,6 +161,19 @@ lazy_static! { &[METRIC_DB_LABEL, METRIC_RESULT_LABEL] ) .unwrap(); + pub static ref METRIC_LOKI_LOGS_INGESTION_COUNTER: IntCounterVec = register_int_counter_vec!( + "greptime_servers_loki_logs_ingestion_counter", + "servers loki logs ingestion counter", + &[METRIC_DB_LABEL] + ) + .unwrap(); + pub static ref METRIC_LOKI_LOGS_INGESTION_ELAPSED: HistogramVec = + register_histogram_vec!( + "greptime_servers_loki_logs_ingestion_elapsed", + "servers loki logs ingestion elapsed", + &[METRIC_DB_LABEL, METRIC_RESULT_LABEL] + ) + .unwrap(); pub static ref METRIC_HTTP_LOGS_TRANSFORM_ELAPSED: HistogramVec = register_histogram_vec!( "greptime_servers_http_logs_transform_elapsed", diff --git a/src/session/src/context.rs b/src/session/src/context.rs index f85a8ceea313..490d77b3366e 100644 --- a/src/session/src/context.rs +++ b/src/session/src/context.rs @@ -355,6 +355,7 @@ pub enum Channel { Grpc = 6, Influx = 7, Opentsdb = 8, + Loki = 9, } impl From for Channel { @@ -368,6 +369,7 @@ impl From for Channel { 6 => Self::Grpc, 7 => Self::Influx, 8 => Self::Opentsdb, + 9 => Self::Loki, _ => Self::Unknown, } @@ -395,6 +397,7 @@ impl Display for Channel { Channel::Grpc => write!(f, "grpc"), Channel::Influx => write!(f, "influx"), Channel::Opentsdb => write!(f, "opentsdb"), + Channel::Loki => write!(f, "loki"), Channel::Unknown => write!(f, "unknown"), } }