chore: finish todo in loki write
shuiyisong committed Nov 5, 2024
1 parent bee41c5 commit 699434c
Showing 6 changed files with 106 additions and 66 deletions.
1 change: 1 addition & 0 deletions src/servers/src/http.rs
@@ -81,6 +81,7 @@ pub mod authorize;
mod dashboard;
pub mod dyn_log;
pub mod event;
mod extractor;
pub mod handler;
pub mod header;
pub mod influxdb;
47 changes: 25 additions & 22 deletions src/servers/src/http/event.rs
@@ -47,13 +47,15 @@ use crate::error::{
DecodeOtlpRequestSnafu, Error, InvalidParameterSnafu, ParseJson5Snafu, ParseJsonSnafu,
PipelineSnafu, Result, UnsupportedContentTypeSnafu,
};
use crate::http::extractor::TableName;
use crate::http::result::greptime_manage_resp::GreptimedbManageResponse;
use crate::http::result::greptime_result_v1::GreptimedbV1Response;
use crate::http::HttpResponse;
use crate::interceptor::{LogIngestInterceptor, LogIngestInterceptorRef};
use crate::metrics::{
METRIC_FAILURE_VALUE, METRIC_HTTP_LOGS_INGESTION_COUNTER, METRIC_HTTP_LOGS_INGESTION_ELAPSED,
METRIC_HTTP_LOGS_TRANSFORM_ELAPSED, METRIC_SUCCESS_VALUE,
METRIC_HTTP_LOGS_TRANSFORM_ELAPSED, METRIC_LOKI_LOGS_INGESTION_COUNTER,
METRIC_LOKI_LOGS_INGESTION_ELAPSED, METRIC_SUCCESS_VALUE,
};
use crate::prom_store;
use crate::query_handler::LogHandlerRef;
@@ -365,11 +367,14 @@ pub async fn loki_ingest(
State(log_state): State<LogState>,
Extension(mut ctx): Extension<QueryContext>,
TypedHeader(content_type): TypedHeader<ContentType>,
table_name: TableName,
bytes: Bytes,
) -> Result<HttpResponse> {
// TODO(shuiyisong): should change channel to loki
ctx.set_channel(Channel::Http);
ctx.set_channel(Channel::Loki);
let ctx = Arc::new(ctx);
let db = ctx.get_db_string();
let db_str = db.as_str();
let table_name = table_name.0.unwrap_or("loki_logs".to_string());
let exec_timer = std::time::Instant::now();

// decompress req
@@ -480,8 +485,7 @@
};

let ins_req = RowInsertRequest {
// TODO(shuiyisong): table name
table_name: "test_table_name".to_string(),
table_name,
rows: Some(rows),
};
let ins_reqs = RowInsertRequests {
@@ -491,23 +495,22 @@
let handler = log_state.log_handler;
let output = handler.insert_logs(ins_reqs, ctx).await;

// TODO(shuiyisong): add metrics
// if let Ok(Output {
// data: OutputData::AffectedRows(rows),
// meta: _,
// }) = &output
// {
// METRIC_HTTP_LOGS_INGESTION_COUNTER
// .with_label_values(&[db.as_str()])
// .inc_by(*rows as u64);
// METRIC_HTTP_LOGS_INGESTION_ELAPSED
// .with_label_values(&[db.as_str(), METRIC_SUCCESS_VALUE])
// .observe(exec_timer.elapsed().as_secs_f64());
// } else {
// METRIC_HTTP_LOGS_INGESTION_ELAPSED
// .with_label_values(&[db.as_str(), METRIC_FAILURE_VALUE])
// .observe(exec_timer.elapsed().as_secs_f64());
// }
if let Ok(Output {
data: OutputData::AffectedRows(rows),
meta: _,
}) = &output
{
METRIC_LOKI_LOGS_INGESTION_COUNTER
.with_label_values(&[db_str])
.inc_by(*rows as u64);
METRIC_LOKI_LOGS_INGESTION_ELAPSED
.with_label_values(&[db_str, METRIC_SUCCESS_VALUE])
.observe(exec_timer.elapsed().as_secs_f64());
} else {
METRIC_LOKI_LOGS_INGESTION_ELAPSED
.with_label_values(&[db_str, METRIC_FAILURE_VALUE])
.observe(exec_timer.elapsed().as_secs_f64());
}

let response = GreptimedbV1Response::from_output(vec![output])
.await
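The previously commented-out metrics block is now live: on a successful insert the handler counts affected rows and records latency under a success label, otherwise only the failure latency is recorded. Below is a minimal standalone sketch of that pattern using the prometheus crate; the metric names, the `db`/`result` label names, and the `insert_logs` stand-in are illustrative, not the actual identifiers from this repository.

```rust
// Standalone sketch of the instrumentation pattern enabled above: count rows on
// success and record elapsed time labelled by outcome.
// Assumed Cargo deps: prometheus = "0.13", lazy_static = "1"
use lazy_static::lazy_static;
use prometheus::{register_histogram_vec, register_int_counter_vec, HistogramVec, IntCounterVec};

lazy_static! {
    static ref INGEST_COUNTER: IntCounterVec =
        register_int_counter_vec!("demo_ingestion_counter", "rows ingested", &["db"]).unwrap();
    static ref INGEST_ELAPSED: HistogramVec =
        register_histogram_vec!("demo_ingestion_elapsed", "ingestion latency", &["db", "result"])
            .unwrap();
}

// Stand-in for the real insert call, which yields the number of affected rows on success.
fn insert_logs(rows: u64) -> Result<u64, String> {
    if rows == 0 {
        Err("empty request".to_string())
    } else {
        Ok(rows)
    }
}

fn main() {
    let db = "public";
    let timer = std::time::Instant::now();
    match insert_logs(3) {
        Ok(rows) => {
            // Success: bump the counter by affected rows and record elapsed time.
            INGEST_COUNTER.with_label_values(&[db]).inc_by(rows);
            INGEST_ELAPSED
                .with_label_values(&[db, "success"])
                .observe(timer.elapsed().as_secs_f64());
        }
        Err(_) => {
            // Failure: only the latency is recorded, under a failure label.
            INGEST_ELAPSED
                .with_label_values(&[db, "failure"])
                .observe(timer.elapsed().as_secs_f64());
        }
    }
}
```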
60 changes: 60 additions & 0 deletions src/servers/src/http/extractor.rs
@@ -0,0 +1,60 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use core::str;
use std::result::Result as StdResult;

use axum::async_trait;
use axum::extract::FromRequestParts;
use axum::http::request::Parts;
use axum::http::StatusCode;
use http::HeaderValue;

use crate::http::header::constants::GREPTIME_LOG_TABLE_NAME_HEADER_NAME;

pub struct TableName(pub Option<String>);

#[async_trait]
impl<S> FromRequestParts<S> for TableName
where
S: Send + Sync,
{
type Rejection = (StatusCode, String);

async fn from_request_parts(parts: &mut Parts, _state: &S) -> StdResult<Self, Self::Rejection> {
let table_name = parts.headers.get(GREPTIME_LOG_TABLE_NAME_HEADER_NAME);

match table_name {
Some(name) => Ok(TableName(Some(pipeline_header_error(
name,
GREPTIME_LOG_TABLE_NAME_HEADER_NAME,
)?))),
None => Ok(TableName(None)),
}
}
}

pub(crate) fn pipeline_header_error(
header: &HeaderValue,
key: &str,
) -> StdResult<String, (http::StatusCode, String)> {
let header_utf8 = str::from_utf8(header.as_bytes());
match header_utf8 {
Ok(s) => Ok(s.to_string()),
Err(_) => Err((
StatusCode::BAD_REQUEST,
format!("`{}` header is not valid UTF-8 string type.", key),
)),
}
}
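The new extractor reads the table name from a Greptime request header and rejects values that are not valid UTF-8. A standalone sketch of that validation is below; the literal header name is a placeholder, since the value of `GREPTIME_LOG_TABLE_NAME_HEADER_NAME` is not shown in this diff.

```rust
// Standalone sketch of the check done by the extractor above:
// a header value must be valid UTF-8 before it can become a table name.
// Assumed Cargo dep: http = "1"
use core::str;
use http::{HeaderMap, HeaderValue, StatusCode};

// Mirrors pipeline_header_error: HeaderValue -> String, or a 400 status plus message.
fn header_to_string(value: &HeaderValue, key: &str) -> Result<String, (StatusCode, String)> {
    str::from_utf8(value.as_bytes())
        .map(|s| s.to_string())
        .map_err(|_| {
            (
                StatusCode::BAD_REQUEST,
                format!("`{}` header is not valid UTF-8 string type.", key),
            )
        })
}

fn main() {
    // The real header name lives in GREPTIME_LOG_TABLE_NAME_HEADER_NAME;
    // the literal below is only a placeholder for this demo.
    let key = "x-greptime-log-table-name";

    let mut headers = HeaderMap::new();
    headers.insert(key, HeaderValue::from_static("my_loki_table"));
    assert_eq!(
        header_to_string(headers.get(key).unwrap(), key).unwrap(),
        "my_loki_table"
    );

    // Header values may carry opaque non-UTF-8 bytes; those are rejected.
    let bad = HeaderValue::from_bytes(&[0xff, 0xfe]).unwrap();
    assert_eq!(
        header_to_string(&bad, key).unwrap_err().0,
        StatusCode::BAD_REQUEST
    );
}
```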
48 changes: 4 additions & 44 deletions src/servers/src/http/otlp.rs
@@ -12,12 +12,10 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use core::str;
use std::result::Result as StdResult;
use std::sync::Arc;

use axum::extract::{FromRequestParts, State};
use axum::http::header::HeaderValue;
use axum::http::request::Parts;
use axum::http::{header, StatusCode};
use axum::response::IntoResponse;
@@ -42,9 +40,9 @@ use snafu::prelude::*;
use super::header::constants::GREPTIME_LOG_EXTRACT_KEYS_HEADER_NAME;
use super::header::{write_cost_header_map, CONTENT_TYPE_PROTOBUF};
use crate::error::{self, PipelineSnafu, Result};
use crate::http::extractor::{pipeline_header_error, TableName};
use crate::http::header::constants::{
GREPTIME_LOG_PIPELINE_NAME_HEADER_NAME, GREPTIME_LOG_PIPELINE_VERSION_HEADER_NAME,
GREPTIME_LOG_TABLE_NAME_HEADER_NAME,
};
use crate::query_handler::OpenTelemetryProtocolHandlerRef;

@@ -107,20 +105,6 @@ pub struct PipelineInfo {
pub pipeline_version: Option<String>,
}

fn pipeline_header_error(
header: &HeaderValue,
key: &str,
) -> StdResult<String, (http::StatusCode, String)> {
let header_utf8 = str::from_utf8(header.as_bytes());
match header_utf8 {
Ok(s) => Ok(s.to_string()),
Err(_) => Err((
StatusCode::BAD_REQUEST,
format!("`{}` header is not valid UTF-8 string type.", key),
)),
}
}

#[async_trait]
impl<S> FromRequestParts<S> for PipelineInfo
where
@@ -157,31 +141,6 @@ where
}
}

pub struct TableInfo {
table_name: String,
}

#[async_trait]
impl<S> FromRequestParts<S> for TableInfo
where
S: Send + Sync,
{
type Rejection = (StatusCode, String);

async fn from_request_parts(parts: &mut Parts, _state: &S) -> StdResult<Self, Self::Rejection> {
let table_name = parts.headers.get(GREPTIME_LOG_TABLE_NAME_HEADER_NAME);

match table_name {
Some(name) => Ok(TableInfo {
table_name: pipeline_header_error(name, GREPTIME_LOG_TABLE_NAME_HEADER_NAME)?,
}),
None => Ok(TableInfo {
table_name: "opentelemetry_logs".to_string(),
}),
}
}
}

pub struct SelectInfoWrapper(SelectInfo);

#[async_trait]
@@ -215,10 +174,11 @@ pub async fn logs(
State(handler): State<OpenTelemetryProtocolHandlerRef>,
Extension(mut query_ctx): Extension<QueryContext>,
pipeline_info: PipelineInfo,
table_info: TableInfo,
tablename: TableName,
SelectInfoWrapper(select_info): SelectInfoWrapper,
bytes: Bytes,
) -> Result<OtlpResponse<ExportLogsServiceResponse>> {
let tablename = tablename.0.unwrap_or("opentelemetry_logs".to_string());
let db = query_ctx.get_db_string();
query_ctx.set_channel(Channel::Otlp);
let query_ctx = Arc::new(query_ctx);
@@ -245,7 +205,7 @@
};

handler
.logs(request, pipeline_way, table_info.table_name, query_ctx)
.logs(request, pipeline_way, tablename, query_ctx)
.await
.map(|o| OtlpResponse {
resp_body: ExportLogsServiceResponse {
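With `TableInfo` gone, both the Loki and OTLP handlers share the `TableName` extractor and differ only in their fallback table. A tiny sketch of that fallback logic follows; the `resolve_table_name` helper is hypothetical and not part of the codebase.

```rust
// Hypothetical helper mirroring how each endpoint resolves its target table:
// the header value if present and valid, otherwise a per-endpoint default.
fn resolve_table_name(from_header: Option<String>, default_name: &str) -> String {
    from_header.unwrap_or_else(|| default_name.to_string())
}

fn main() {
    // Loki ingest falls back to "loki_logs" ...
    assert_eq!(resolve_table_name(None, "loki_logs"), "loki_logs");
    // ... the OTLP logs endpoint to "opentelemetry_logs" ...
    assert_eq!(
        resolve_table_name(None, "opentelemetry_logs"),
        "opentelemetry_logs"
    );
    // ... and an explicit header wins in both cases.
    assert_eq!(
        resolve_table_name(Some("my_table".to_string()), "loki_logs"),
        "my_table"
    );
}
```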
13 changes: 13 additions & 0 deletions src/servers/src/metrics.rs
@@ -161,6 +161,19 @@ lazy_static! {
&[METRIC_DB_LABEL, METRIC_RESULT_LABEL]
)
.unwrap();
pub static ref METRIC_LOKI_LOGS_INGESTION_COUNTER: IntCounterVec = register_int_counter_vec!(
"greptime_servers_loki_logs_ingestion_counter",
"servers loki logs ingestion counter",
&[METRIC_DB_LABEL]
)
.unwrap();
pub static ref METRIC_LOKI_LOGS_INGESTION_ELAPSED: HistogramVec =
register_histogram_vec!(
"greptime_servers_loki_logs_ingestion_elapsed",
"servers loki logs ingestion elapsed",
&[METRIC_DB_LABEL, METRIC_RESULT_LABEL]
)
.unwrap();
pub static ref METRIC_HTTP_LOGS_TRANSFORM_ELAPSED: HistogramVec =
register_histogram_vec!(
"greptime_servers_http_logs_transform_elapsed",
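For reference, here is a hedged sketch of what the two new series look like when gathered and encoded in the Prometheus text format; the metric names mirror the registrations above, while the `db`/`result` label names and the sample values are assumptions made for the demo.

```rust
// Sketch: register metrics shaped like the new Loki ones, record a sample,
// and print the Prometheus text exposition format.
// Assumed Cargo deps: prometheus = "0.13", lazy_static = "1"
use lazy_static::lazy_static;
use prometheus::{
    register_histogram_vec, register_int_counter_vec, Encoder, HistogramVec, IntCounterVec,
    TextEncoder,
};

lazy_static! {
    static ref LOKI_COUNTER: IntCounterVec = register_int_counter_vec!(
        "greptime_servers_loki_logs_ingestion_counter",
        "servers loki logs ingestion counter",
        &["db"]
    )
    .unwrap();
    static ref LOKI_ELAPSED: HistogramVec = register_histogram_vec!(
        "greptime_servers_loki_logs_ingestion_elapsed",
        "servers loki logs ingestion elapsed",
        &["db", "result"]
    )
    .unwrap();
}

fn main() {
    // Record one fake ingestion: 42 rows, 12 ms, for database "public".
    LOKI_COUNTER.with_label_values(&["public"]).inc_by(42);
    LOKI_ELAPSED
        .with_label_values(&["public", "success"])
        .observe(0.012);

    // Encode everything registered so far, e.g.:
    // greptime_servers_loki_logs_ingestion_counter{db="public"} 42
    let mut buf = Vec::new();
    TextEncoder::new()
        .encode(&prometheus::gather(), &mut buf)
        .unwrap();
    print!("{}", String::from_utf8(buf).unwrap());
}
```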
3 changes: 3 additions & 0 deletions src/session/src/context.rs
@@ -355,6 +355,7 @@ pub enum Channel {
Grpc = 6,
Influx = 7,
Opentsdb = 8,
Loki = 9,
}

impl From<u32> for Channel {
@@ -368,6 +369,7 @@ impl From<u32> for Channel {
6 => Self::Grpc,
7 => Self::Influx,
8 => Self::Opentsdb,
9 => Self::Loki,

_ => Self::Unknown,
}
@@ -395,6 +397,7 @@ impl Display for Channel {
Channel::Grpc => write!(f, "grpc"),
Channel::Influx => write!(f, "influx"),
Channel::Opentsdb => write!(f, "opentsdb"),
Channel::Loki => write!(f, "loki"),
Channel::Unknown => write!(f, "unknown"),
}
}
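To illustrate the `Channel` change, here is a trimmed stand-in that keeps only a couple of variants and shows the new `Loki` arm round-tripping from its numeric value to its display name; it is not the real enum.

```rust
use std::fmt::{self, Display};

// Trimmed stand-in for session::context::Channel, keeping only the variants
// needed to show the new Loki arm.
#[derive(Debug, PartialEq)]
enum Channel {
    Unknown,
    Grpc = 6,
    Loki = 9,
}

impl From<u32> for Channel {
    fn from(value: u32) -> Self {
        match value {
            6 => Self::Grpc,
            9 => Self::Loki,
            _ => Self::Unknown,
        }
    }
}

impl Display for Channel {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Channel::Grpc => write!(f, "grpc"),
            Channel::Loki => write!(f, "loki"),
            Channel::Unknown => write!(f, "unknown"),
        }
    }
}

fn main() {
    // 9 now maps to the Loki channel and renders as "loki" in logs and metrics.
    assert_eq!(Channel::from(9), Channel::Loki);
    assert_eq!(Channel::from(9).to_string(), "loki");
}
```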
