diff --git a/.github/workflows/develop.yml b/.github/workflows/develop.yml index 5d9972dd4f4f..80a4f042c0f7 100644 --- a/.github/workflows/develop.yml +++ b/.github/workflows/develop.yml @@ -436,7 +436,7 @@ jobs: timeout-minutes: 60 strategy: matrix: - target: ["fuzz_migrate_mito_regions", "fuzz_failover_mito_regions", "fuzz_failover_metric_regions"] + target: ["fuzz_migrate_mito_regions", "fuzz_migrate_metric_regions", "fuzz_failover_mito_regions", "fuzz_failover_metric_regions"] mode: - name: "Remote WAL" minio: true @@ -449,6 +449,12 @@ jobs: minio: true kafka: false values: "with-minio.yaml" + - target: "fuzz_migrate_metric_regions" + mode: + name: "Local WAL" + minio: true + kafka: false + values: "with-minio.yaml" steps: - name: Remove unused software run: | diff --git a/tests-fuzz/Cargo.toml b/tests-fuzz/Cargo.toml index b0c33a1e28ff..cbac9df7133a 100644 --- a/tests-fuzz/Cargo.toml +++ b/tests-fuzz/Cargo.toml @@ -142,3 +142,10 @@ path = "targets/migration/fuzz_migrate_mito_regions.rs" test = false bench = false doc = false + +[[bin]] +name = "fuzz_migrate_metric_regions" +path = "targets/migration/fuzz_migrate_metric_regions.rs" +test = false +bench = false +doc = false diff --git a/tests-fuzz/targets/migration/fuzz_migrate_metric_regions.rs b/tests-fuzz/targets/migration/fuzz_migrate_metric_regions.rs new file mode 100644 index 000000000000..d4fa4d08fd5e --- /dev/null +++ b/tests-fuzz/targets/migration/fuzz_migrate_metric_regions.rs @@ -0,0 +1,475 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#![no_main] + +use std::collections::{BTreeMap, HashMap, HashSet}; +use std::sync::Arc; +use std::time::Duration; + +use arbitrary::{Arbitrary, Unstructured}; +use common_telemetry::info; +use libfuzzer_sys::fuzz_target; +use rand::{Rng, SeedableRng}; +use rand_chacha::{ChaCha20Rng, ChaChaRng}; +use snafu::{ensure, ResultExt}; +use sqlx::{Executor, MySql, Pool}; +use store_api::storage::RegionId; +use tests_fuzz::context::{TableContext, TableContextRef}; +use tests_fuzz::error::{self, Result}; +use tests_fuzz::fake::{ + merge_two_word_map_fn, random_capitalize_map, uppercase_and_keyword_backtick_map, + MappedGenerator, WordGenerator, +}; +use tests_fuzz::generator::create_expr::{ + CreateLogicalTableExprGeneratorBuilder, CreatePhysicalTableExprGeneratorBuilder, +}; +use tests_fuzz::generator::insert_expr::InsertExprGeneratorBuilder; +use tests_fuzz::generator::Generator; +use tests_fuzz::ir::{ + generate_random_timestamp_for_mysql, generate_random_value, CreateTableExpr, Ident, + InsertIntoExpr, +}; +use tests_fuzz::translator::mysql::create_expr::CreateTableExprTranslator; +use tests_fuzz::translator::mysql::insert_expr::InsertIntoExprTranslator; +use tests_fuzz::translator::DslTranslator; +use tests_fuzz::utils::cluster_info::{fetch_nodes, PEER_TYPE_DATANODE}; +use tests_fuzz::utils::migration::migrate_region; +use tests_fuzz::utils::partition::{fetch_partition, fetch_partitions, region_distribution}; +use tests_fuzz::utils::procedure::procedure_state; +use tests_fuzz::utils::wait::wait_condition_fn; +use tests_fuzz::utils::{ + compact_table, flush_memtable, get_gt_fuzz_input_max_rows, get_gt_fuzz_input_max_tables, + init_greptime_connections_via_env, Connections, +}; +use tests_fuzz::validator::row::count_values; + +struct FuzzContext { + greptime: Pool, +} + +impl FuzzContext { + async fn close(self) { + self.greptime.close().await; + } +} + +#[derive(Copy, Clone, Debug)] +struct FuzzInput { + seed: u64, + rows: usize, + tables: usize, +} + +impl Arbitrary<'_> for FuzzInput { + fn arbitrary(u: &mut Unstructured<'_>) -> arbitrary::Result { + let seed = u.int_in_range(u64::MIN..=u64::MAX)?; + let mut rng = ChaChaRng::seed_from_u64(seed); + let max_rows = get_gt_fuzz_input_max_rows(); + let rows = rng.gen_range(2..max_rows); + let max_tables = get_gt_fuzz_input_max_tables(); + let tables = rng.gen_range(1..max_tables); + + Ok(FuzzInput { rows, seed, tables }) + } +} + +fn generate_create_physical_table_expr(rng: &mut R) -> Result { + let physical_table_if_not_exists = rng.gen_bool(0.5); + let create_physical_table_expr = CreatePhysicalTableExprGeneratorBuilder::default() + .name_generator(Box::new(MappedGenerator::new( + WordGenerator, + merge_two_word_map_fn(random_capitalize_map, uppercase_and_keyword_backtick_map), + ))) + .if_not_exists(physical_table_if_not_exists) + .build() + .unwrap(); + create_physical_table_expr.generate(rng) +} + +async fn create_physical_table( + ctx: &FuzzContext, + rng: &mut R, +) -> Result { + // Create a physical table and a logical table on top of it + let create_physical_table_expr = generate_create_physical_table_expr(rng).unwrap(); + let translator = CreateTableExprTranslator; + let sql = translator.translate(&create_physical_table_expr)?; + let result = sqlx::query(&sql) + .execute(&ctx.greptime) + .await + .context(error::ExecuteQuerySnafu { sql: &sql })?; + info!("Create physical table: {sql}, result: {result:?}"); + + Ok(Arc::new(TableContext::from(&create_physical_table_expr))) +} + +fn generate_create_logical_table_expr( + physical_table_ctx: TableContextRef, + rng: &mut R, +) -> Result { + let labels = rng.gen_range(1..=5); + let logical_table_if_not_exists = rng.gen_bool(0.5); + + let create_logical_table_expr = CreateLogicalTableExprGeneratorBuilder::default() + .name_generator(Box::new(MappedGenerator::new( + WordGenerator, + merge_two_word_map_fn(random_capitalize_map, uppercase_and_keyword_backtick_map), + ))) + .physical_table_ctx(physical_table_ctx) + .labels(labels) + .if_not_exists(logical_table_if_not_exists) + .build() + .unwrap(); + create_logical_table_expr.generate(rng) +} + +fn generate_insert_expr( + rows: usize, + rng: &mut R, + table_ctx: TableContextRef, +) -> Result { + let insert_generator = InsertExprGeneratorBuilder::default() + .omit_column_list(false) + .table_ctx(table_ctx) + .rows(rows) + .value_generator(Box::new(generate_random_value)) + .ts_value_generator(Box::new(generate_random_timestamp_for_mysql)) + .build() + .unwrap(); + insert_generator.generate(rng) +} + +async fn insert_values( + rows: usize, + ctx: &FuzzContext, + rng: &mut R, + logical_table_ctx: TableContextRef, +) -> Result { + let insert_expr = generate_insert_expr(rows, rng, logical_table_ctx.clone())?; + let translator = InsertIntoExprTranslator; + let sql = translator.translate(&insert_expr)?; + let result = ctx + .greptime + // unprepared query, see + .execute(sql.as_str()) + .await + .context(error::ExecuteQuerySnafu { sql: &sql })?; + + ensure!( + result.rows_affected() == rows as u64, + error::AssertSnafu { + reason: format!( + "expected rows affected: {}, actual: {}", + rows, + result.rows_affected(), + ) + } + ); + + Ok(insert_expr) +} + +#[derive(Debug, Clone, Copy)] +struct Migration { + from_peer: u64, + to_peer: u64, + region_id: RegionId, +} + +async fn create_logical_table_and_insert_values( + ctx: &FuzzContext, + input: &FuzzInput, + rng: &mut ChaCha20Rng, + physical_table_ctx: &TableContextRef, + table_num: usize, + tables: &mut HashMap, +) -> Result<()> { + for _ in 0..table_num { + let translator = CreateTableExprTranslator; + let create_logical_table_expr = + generate_create_logical_table_expr(physical_table_ctx.clone(), rng).unwrap(); + if tables.contains_key(&create_logical_table_expr.table_name) { + // Ignores same name logical table. + continue; + } + let sql = translator.translate(&create_logical_table_expr)?; + let result = sqlx::query(&sql) + .execute(&ctx.greptime) + .await + .context(error::ExecuteQuerySnafu { sql: &sql })?; + info!("Create logical table: {sql}, result: {result:?}"); + let logical_table_ctx = Arc::new(TableContext::from(&create_logical_table_expr)); + + let insert_expr = insert_values(input.rows, ctx, rng, logical_table_ctx.clone()).await?; + if rng.gen_bool(0.1) { + flush_memtable(&ctx.greptime, &physical_table_ctx.name).await?; + } + if rng.gen_bool(0.1) { + compact_table(&ctx.greptime, &physical_table_ctx.name).await?; + } + + tables.insert( + logical_table_ctx.name.clone(), + (logical_table_ctx.clone(), insert_expr), + ); + } + + Ok(()) +} + +async fn execute_migration(ctx: FuzzContext, input: FuzzInput) -> Result<()> { + let mut rng = ChaCha20Rng::seed_from_u64(input.seed); + // Creates a physical table. + let physical_table_ctx = create_physical_table(&ctx, &mut rng).await?; + let mut tables = HashMap::with_capacity(input.tables); + + let table_num = (input.tables / 3).max(1); + + // Creates more logical tables and inserts values + create_logical_table_and_insert_values( + &ctx, + &input, + &mut rng, + &physical_table_ctx, + table_num, + &mut tables, + ) + .await?; + + // Fetches region distribution + let partitions = fetch_partitions(&ctx.greptime, physical_table_ctx.name.clone()).await?; + let num_partitions = partitions.len(); + let region_distribution = region_distribution(partitions); + info!("Region distribution: {region_distribution:?}"); + let datanodes = fetch_nodes(&ctx.greptime) + .await? + .into_iter() + .flat_map(|node| { + if node.peer_type == PEER_TYPE_DATANODE { + Some(node) + } else { + None + } + }) + .collect::>(); + info!("List datanodes: {:?}", datanodes); + + // Generates region migration task. + let mut migrations = Vec::with_capacity(num_partitions); + let mut new_distribution: BTreeMap> = BTreeMap::new(); + for (datanode_id, regions) in region_distribution { + let step = rng.gen_range(1..datanodes.len()); + for region in regions { + let to_peer = (datanode_id + step as u64) % datanodes.len() as u64; + new_distribution.entry(to_peer).or_default().insert(region); + migrations.push(Migration { + from_peer: datanode_id, + to_peer, + region_id: region, + }) + } + } + + let mut procedure_ids = Vec::with_capacity(migrations.len()); + // Triggers region migrations + for Migration { + from_peer, + to_peer, + region_id, + } in &migrations + { + let procedure_id = + migrate_region(&ctx.greptime, region_id.as_u64(), *from_peer, *to_peer, 120).await; + info!("Migrating region: {region_id} from {from_peer} to {to_peer}, procedure: {procedure_id}"); + procedure_ids.push(procedure_id); + } + info!("Excepted new region distribution: {new_distribution:?}"); + for (migration, procedure_id) in migrations.clone().into_iter().zip(procedure_ids) { + info!("Waits for migration: {migration:?}"); + let region_id = migration.region_id.as_u64(); + wait_condition_fn( + Duration::from_secs(120), + || { + let greptime = ctx.greptime.clone(); + let procedure_id = procedure_id.to_string(); + Box::pin(async move { + { + let output = procedure_state(&greptime, &procedure_id).await; + info!("Checking procedure: {procedure_id}, output: {output}"); + fetch_partition(&greptime, region_id).await.unwrap() + } + }) + }, + |partition| { + info!("Region: {region_id}, datanode: {}", partition.datanode_id); + partition.datanode_id == migration.to_peer + }, + Duration::from_secs(1), + ) + .await; + } + + // Validates value rows + info!("Validates num of rows"); + for (table_ctx, insert_expr) in tables.values() { + let sql = format!("select count(1) as count from {}", table_ctx.name); + let values = count_values(&ctx.greptime, &sql).await?; + let expected_rows = insert_expr.values_list.len() as u64; + assert_eq!( + values.count as u64, expected_rows, + "Expected rows: {}, got: {}, table: {}", + expected_rows, values.count, table_ctx.name + ); + } + + // Creates more logical tables and inserts values + create_logical_table_and_insert_values( + &ctx, + &input, + &mut rng, + &physical_table_ctx, + table_num, + &mut tables, + ) + .await?; + + // Validates value rows + info!("Validates num of rows"); + for (table_ctx, insert_expr) in tables.values() { + let sql = format!("select count(1) as count from {}", table_ctx.name); + let values = count_values(&ctx.greptime, &sql).await?; + let expected_rows = insert_expr.values_list.len() as u64; + assert_eq!( + values.count as u64, expected_rows, + "Expected rows: {}, got: {}, table: {}", + expected_rows, values.count, table_ctx.name + ); + } + + // Recovers region distribution + let migrations = migrations + .into_iter() + .map( + |Migration { + from_peer, + to_peer, + region_id, + }| Migration { + from_peer: to_peer, + to_peer: from_peer, + region_id, + }, + ) + .collect::>(); + + let mut procedure_ids = Vec::with_capacity(migrations.len()); + // Triggers region migrations + for Migration { + from_peer, + to_peer, + region_id, + } in &migrations + { + let procedure_id = + migrate_region(&ctx.greptime, region_id.as_u64(), *from_peer, *to_peer, 120).await; + info!("Migrating region: {region_id} from {from_peer} to {to_peer}, procedure: {procedure_id}"); + procedure_ids.push(procedure_id); + } + info!("Excepted new region distribution: {new_distribution:?}"); + for (migration, procedure_id) in migrations.into_iter().zip(procedure_ids) { + info!("Waits for migration: {migration:?}"); + let region_id = migration.region_id.as_u64(); + wait_condition_fn( + Duration::from_secs(120), + || { + let greptime = ctx.greptime.clone(); + let procedure_id = procedure_id.to_string(); + Box::pin(async move { + { + let output = procedure_state(&greptime, &procedure_id).await; + info!("Checking procedure: {procedure_id}, output: {output}"); + fetch_partition(&greptime, region_id).await.unwrap() + } + }) + }, + |partition| { + info!("Region: {region_id}, datanode: {}", partition.datanode_id); + partition.datanode_id == migration.to_peer + }, + Duration::from_secs(1), + ) + .await; + } + + // Creates more logical tables and inserts values + create_logical_table_and_insert_values( + &ctx, + &input, + &mut rng, + &physical_table_ctx, + table_num, + &mut tables, + ) + .await?; + + // Validates value rows + info!("Validates num of rows"); + for (table_ctx, insert_expr) in tables.values() { + let sql = format!("select count(1) as count from {}", table_ctx.name); + let values = count_values(&ctx.greptime, &sql).await?; + let expected_rows = insert_expr.values_list.len() as u64; + assert_eq!( + values.count as u64, expected_rows, + "Expected rows: {}, got: {}, table: {}", + expected_rows, values.count, table_ctx.name + ); + } + + // Clean up + for (table_ctx, _) in tables.values() { + let sql = format!("DROP TABLE {}", table_ctx.name); + let result = sqlx::query(&sql) + .execute(&ctx.greptime) + .await + .context(error::ExecuteQuerySnafu { sql })?; + info!("Drop table: {}\n\nResult: {result:?}\n\n", table_ctx.name); + } + + let sql = format!("DROP TABLE {}", physical_table_ctx.name); + let result = sqlx::query(&sql) + .execute(&ctx.greptime) + .await + .context(error::ExecuteQuerySnafu { sql })?; + info!( + "Drop table: {}\n\nResult: {result:?}\n\n", + physical_table_ctx.name + ); + + ctx.close().await; + Ok(()) +} + +fuzz_target!(|input: FuzzInput| { + common_telemetry::init_default_ut_logging(); + common_runtime::block_on_global(async { + let Connections { mysql } = init_greptime_connections_via_env().await; + let ctx = FuzzContext { + greptime: mysql.expect("mysql connection init must be succeed"), + }; + execute_migration(ctx, input) + .await + .unwrap_or_else(|err| panic!("fuzz test must be succeed: {err:?}")); + }) +});