Feature/make mapping optional (#62)
* Make JSON field mapping optional
* Update and complete tests
ag-ramachandran authored Dec 12, 2023
1 parent 4865615 commit 4830770
Showing 8 changed files with 121 additions and 80 deletions.
1 change: 1 addition & 0 deletions .gitignore
@@ -59,3 +59,4 @@ gradle/wrapper/gradle-wrapper.properties
.vscode/settings.json
rspec.xml
e2e/output_file.txt
logs.txt
12 changes: 12 additions & 0 deletions CHANGELOG.md
@@ -1,5 +1,17 @@
# Changelog


# 2.0.3

- Make JSON mapping optional


# 2.0.2

- Bugfix for the scenario where the plugin uses managed identity: instead of providing the managed identity name as an empty value in the config, it can now be skipped completely


# 2.0.0

- Use version 5.0.2 of the Java SDK, retrieved from Maven with Bundler. Supports Logstash 8.6 and up
4 changes: 3 additions & 1 deletion README.md
@@ -62,7 +62,7 @@ More information about configuring Logstash can be found in the [logstash config
| **managed_identity**| Managed Identity to authenticate. For user-based managed ID, use the Client ID GUID. For system-based, use the value `system`. The ID needs to have 'ingest' privileges on the cluster. | Optional|
| **database**| Database name to place events | Required |
| **table** | Target table name to place events | Required |
| **json_mapping** | Maps each attribute from incoming event JSON strings to the appropriate column in the table. Note that this must be in JSON format, as this is the interface between Logstash and Kusto | Required |
| **json_mapping** | Maps each attribute from incoming event JSON strings to the appropriate column in the table. Note that this must be in JSON format, as this is the interface between Logstash and Kusto | Optional |
| **recovery** | If set to true (default), plugin will attempt to resend pre-existing temp files found in the path upon startup | |
| **delete_temp_files** | Determines if temp files will be deleted after a successful upload (true is default; set false for debug purposes only)| |
| **flush_interval** | The time (in seconds) for flushing writes to temporary files. Default is 2 seconds, 0 will flush on every event. Increase this value to reduce IO calls but keep in mind that events in the buffer will be lost in case of abrupt failure.| |
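
With `json_mapping` now optional, the `kusto` output block can omit it entirely; a minimal sketch (the ingest URL, credentials, database, and table values are placeholders):

```
output {
  kusto {
    path => "/tmp/kusto_%{+YYYY-MM-dd-HH-mm}.txt"
    ingest_url => "https://ingest-<cluster>.kusto.windows.net/"
    app_id => "<application id>"
    app_key => "<application key>"
    app_tenant => "<tenant id>"
    database => "mydatabase"
    table => "mytable"
    # no json_mapping: columns are resolved by the JSON attribute names
  }
}
```

When `json_mapping` is omitted, the attribute names in the event JSON are matched against the target table's column names.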
@@ -81,6 +81,8 @@ export LS_JAVA_OPTS="-Dhttp.proxyHost=1.2.34 -Dhttp.proxyPort=8989 -Dhttps.prox

| Version | Release Date | Notes |
| --- | --- | --- |
| 2.0.3 | 2023-12-12 | - Make the JSON mapping field optional. If not provided, Logstash output JSON attribute names will be used for column resolution |
| 2.0.2 | 2023-11-28 | - Bugfix for the scenario where the plugin uses managed identity. Instead of providing the managed identity name as empty in the config, it can be skipped completely |
| 2.0.0 | 2023-09-19 | - Upgrade to the latest Java SDK version [5.0.2](https://github.com/Azure/azure-kusto-java/releases/tag/v5.0.2). Tests have been performed on **__Logstash 8.5__** and up (Does not work with 6.x or 7.x versions of Logstash - For these versions use 1.x.x versions of logstash-output-kusto gem) - Fixes CVE's in common-text & outdated Jackson libraries |
| 1.0.6 | 2022-11-29 | - Upgrade to the latest Java SDK [3.2.1](https://github.com/Azure/azure-kusto-java/releases/tag/v3.2.1) version. Tests have been performed on Logstash 6.x and up.|

10 changes: 5 additions & 5 deletions build.gradle
@@ -28,12 +28,12 @@ dependencies {
implementation 'com.azure:azure-storage-blob:12.23.0'
implementation 'com.azure:azure-storage-common:12.22.0'
implementation 'com.azure:azure-storage-internal-avro:12.8.0'
implementation 'com.fasterxml.jackson.dataformat:jackson-dataformat-xml:2.14.2'
implementation 'com.fasterxml.jackson.dataformat:jackson-dataformat-xml:2.14.3'
implementation 'com.azure:azure-storage-queue:12.18.0'
implementation 'com.fasterxml.jackson.core:jackson-annotations:2.14.2'
implementation 'com.fasterxml.jackson.core:jackson-core:2.14.2'
implementation 'com.fasterxml.jackson.core:jackson-databind:2.14.2'
implementation 'com.fasterxml.jackson.datatype:jackson-datatype-jsr310:2.14.2'
implementation 'com.fasterxml.jackson.core:jackson-annotations:2.14.3'
implementation 'com.fasterxml.jackson.core:jackson-core:2.14.3'
implementation 'com.fasterxml.jackson.core:jackson-databind:2.14.3'
implementation 'com.fasterxml.jackson.datatype:jackson-datatype-jsr310:2.14.3'
implementation 'com.fasterxml.woodstox:woodstox-core:6.5.0'
implementation 'com.github.stephenc.jcip:jcip-annotations:1.0-1'
implementation 'com.microsoft.azure.kusto:kusto-data:5.0.2'
152 changes: 86 additions & 66 deletions e2e/e2e.rb
@@ -15,45 +15,66 @@ def initialize
@engine_url = ENV["ENGINE_URL"]
@ingest_url = ENV["INGEST_URL"]
@app_id = ENV["APP_ID"]
@app_kay = ENV['APP_KEY']
@app_key = ENV['APP_KEY']
@tenant_id = ENV['TENANT_ID']
@database = ENV['TEST_DATABASE']
@table = "RubyE2E#{Time.now.getutc.to_i}"
@table_with_mapping = "RubyE2E#{Time.now.getutc.to_i}"
@table_without_mapping = "RubyE2ENoMapping#{Time.now.getutc.to_i}"
@mapping_name = "test_mapping"
@csv_file = "dataset.csv"

@logstash_config = %{
input {
file { path => "#{@input_file}"}
}
filter {
csv { columns => [#{@csv_columns}]}
}
output {
file { path => "#{@output_file}"}
stdout { codec => rubydebug }
kusto {
path => "tmp%{+YYYY-MM-dd-HH-mm}.txt"
ingest_url => "#{@ingest_url}"
app_id => "#{@app_id}"
app_key => "#{@app_kay}"
app_tenant => "#{@tenant_id}"
database => "#{@database}"
table => "#{@table}"
json_mapping => "#{@mapping_name}"
input {
file { path => "#{@input_file}"}
}
filter {
csv { columns => [#{@csv_columns}]}
}
output {
file { path => "#{@output_file}"}
stdout { codec => rubydebug }
kusto {
path => "tmp%{+YYYY-MM-dd-HH-mm}.txt"
ingest_url => "#{@ingest_url}"
app_id => "#{@app_id}"
app_key => "#{@app_key}"
app_tenant => "#{@tenant_id}"
database => "#{@database}"
table => "#{@table_with_mapping}"
json_mapping => "#{@mapping_name}"
}
kusto {
path => "nomaptmp%{+YYYY-MM-dd-HH-mm}.txt"
ingest_url => "#{@ingest_url}"
app_id => "#{@app_id}"
app_key => "#{@app_key}"
app_tenant => "#{@tenant_id}"
database => "#{@database}"
table => "#{@table_without_mapping}"
}
}
}
}
end

def create_table_and_mapping
puts "Creating table #{@table}"
@query_client.execute(@database, ".drop table #{@table} ifexists")
sleep(1)
@query_client.execute(@database, ".create table #{@table} #{@columns}")
@query_client.execute(@database, ".alter table #{@table} policy ingestionbatching @'{\"MaximumBatchingTimeSpan\":\"00:00:10\", \"MaximumNumberOfItems\": 1, \"MaximumRawDataSizeMB\": 100}'
")
@query_client.execute(@database, ".create table #{@table} ingestion json mapping '#{@mapping_name}' '#{File.read("dataset_mapping.json")}'")
Array[@table_with_mapping, @table_without_mapping].each { |tableop|
puts "Creating table #{tableop}"
@query_client.execute(@database, ".drop table #{tableop} ifexists")
sleep(1)
@query_client.execute(@database, ".create table #{tableop} #{@columns}")
@query_client.execute(@database, ".alter table #{tableop} policy ingestionbatching @'{\"MaximumBatchingTimeSpan\":\"00:00:10\", \"MaximumNumberOfItems\": 1, \"MaximumRawDataSizeMB\": 100}'")
}
# Mapping only for one table
@query_client.execute(@database, ".create table #{@table_with_mapping} ingestion json mapping '#{@mapping_name}' '#{File.read("dataset_mapping.json")}'")
end


def drop_and_cleanup
Array[@table_with_mapping, @table_without_mapping].each { |tableop|
puts "Dropping table #{tableop}"
@query_client.execute(@database, ".drop table #{tableop} ifexists")
sleep(1)
}
end

def run_logstash
Expand All @@ -76,54 +97,53 @@ def run_logstash
def assert_data
max_timeout = 10
csv_data = CSV.read(@csv_file)

(0...max_timeout).each do |_|
begin
sleep(5)
query = @query_client.execute(@database, "#{@table} | sort by rownumber asc")
result = query.getPrimaryResults()
raise "Wrong count - expected #{csv_data.length}, got #{result.count()}" unless result.count() == csv_data.length
rescue Exception => e
puts "Error: #{e}"
end
(0...csv_data.length).each do |i|
result.next()
puts "Item #{i}"
(0...@column_count).each do |j|
csv_item = csv_data[i][j]
result_item = result.getObject(j) == nil ? "null" : result.getString(j)

#special cases for data that is different in csv vs kusto
if j == 4 #kusto boolean field
csv_item = csv_item.to_s == "1" ? "true" : "false"
elsif j == 12 # date formatting
csv_item = csv_item.sub(".0000000", "")
elsif j == 15 # numbers as text
result_item = i.to_s
elsif j == 17 #null
next
Array[@table_with_mapping, @table_without_mapping].each { |tableop|
puts "Validating results for table #{tableop}"
(0...max_timeout).each do |_|
begin
sleep(5)
query = @query_client.execute(@database, "#{tableop} | sort by rownumber asc")
result = query.getPrimaryResults()
raise "Wrong count - expected #{csv_data.length}, got #{result.count()} in table #{tableop}" unless result.count() == csv_data.length
rescue Exception => e
puts "Error: #{e}"
end
(0...csv_data.length).each do |i|
result.next()
puts "Item #{i}"
(0...@column_count).each do |j|
csv_item = csv_data[i][j]
result_item = result.getObject(j) == nil ? "null" : result.getString(j)
#special cases for data that is different in csv vs kusto
if j == 4 #kusto boolean field
csv_item = csv_item.to_s == "1" ? "true" : "false"
elsif j == 12 # date formatting
csv_item = csv_item.sub(".0000000", "")
elsif j == 15 # numbers as text
result_item = i.to_s
elsif j == 17 #null
next
end
puts " csv[#{j}] = #{csv_item}"
puts " result[#{j}] = #{result_item}"
raise "Result Doesn't match csv in table #{tableop}" unless csv_item == result_item
end
puts " csv[#{j}] = #{csv_item}"
puts " result[#{j}] = #{result_item}"
raise "Result Doesn't match csv" unless csv_item == result_item
puts ""
end
puts ""
return
end
return

end
raise "Failed after timeouts"

raise "Failed after timeouts"
}
end

def start
@query_client = $kusto_java.data.ClientFactory.createClient($kusto_java.data.auth.ConnectionStringBuilder::createWithAadApplicationCredentials(@engine_url, @app_id,
@app_kay, @tenant_id))
@app_key, @tenant_id))
create_table_and_mapping
run_logstash
assert_data
end

drop_and_cleanup
end
end

E2E::new().start
8 changes: 5 additions & 3 deletions lib/logstash/outputs/kusto.rb
@@ -88,7 +88,8 @@ class LogStash::Outputs::Kusto < LogStash::Outputs::Base
config :table, validate: :string, required: true
# Mapping name - Used by Kusto to map each attribute from incoming event JSON strings to the appropriate column in the table.
# Note that this must be in JSON format, as this is the interface between Logstash and Kusto
config :json_mapping, validate: :string, required: true
# Make this optional as name resolution in the JSON mapping can be done based on attribute names in the incoming event JSON strings
config :json_mapping, validate: :string, default: nil

# Mapping name - deprecated, use json_mapping
config :mapping, validate: :string, deprecated: true
@@ -126,11 +127,12 @@ def register
@io_mutex = Mutex.new

final_mapping = json_mapping
if final_mapping.empty?
if final_mapping.nil? || final_mapping.empty?
final_mapping = mapping
end

# TODO: add id to the tmp path to support multiple outputs of the same type
# TODO: add id to the tmp path to support multiple outputs of the same type.
# TODO: Fix final_mapping when dynamic routing is supported
# add fields from the meta that will note the destination of the events in the file
@path = if dynamic_event_routing
File.expand_path("#{path}.%{[@metadata][database]}.%{[@metadata][table]}.%{[@metadata][final_mapping]}")
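
The fallback in `register` above — prefer `json_mapping`, fall back to the deprecated `mapping` option, and treat nil/empty as "no mapping reference" — can be sketched as a standalone helper (`resolve_mapping` is a hypothetical name; the plugin does this inline):

```ruby
# Hypothetical helper mirroring the fallback in register:
# use json_mapping when set, otherwise the deprecated mapping
# option, and normalize nil/empty strings to nil.
def resolve_mapping(json_mapping, deprecated_mapping)
  final = json_mapping
  final = deprecated_mapping if final.nil? || final.empty?
  (final.nil? || final.empty?) ? nil : final
end
```

Normalizing to nil lets downstream code test a single condition instead of checking both nil and empty-string cases.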
12 changes: 8 additions & 4 deletions lib/logstash/outputs/kusto/ingestor.rb
@@ -47,7 +47,6 @@ def initialize(ingest_url, app_id, app_key, app_tenant, managed_identity_id, dat
else
kusto_java.data.auth.ConnectionStringBuilder.createWithAadApplicationCredentials(ingest_url, app_id, app_key.value, app_tenant)
end

#
@logger.debug(Gem.loaded_specs.to_s)
# Unfortunately there's no way to avoid using the gem/plugin name directly...
@@ -69,10 +68,15 @@ end
end

@ingestion_properties = kusto_java.ingest.IngestionProperties.new(database, table)
@ingestion_properties.setIngestionMapping(json_mapping, kusto_java.ingest.IngestionMapping::IngestionMappingKind::JSON)
@ingestion_properties.setDataFormat(kusto_java.ingest.IngestionProperties::DataFormat::JSON)
is_mapping_ref_provided = !(json_mapping.nil? || json_mapping.empty?)
if is_mapping_ref_provided
@logger.debug('Using mapping reference.', json_mapping)
@ingestion_properties.setIngestionMapping(json_mapping, kusto_java.ingest.IngestionMapping::IngestionMappingKind::JSON)
@ingestion_properties.setDataFormat(kusto_java.ingest.IngestionProperties::DataFormat::JSON)
else
@logger.debug('No mapping reference provided. Columns will be mapped by names in the logstash output')
end
@delete_local = delete_local

@logger.debug('Kusto resources are ready.')
end

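
The branch above only attaches a mapping reference to the ingestion properties when one was actually supplied; the guard can be sketched as a predicate (hypothetical name, not part of the plugin's API):

```ruby
# A mapping reference counts as provided only when it is
# neither nil nor an empty string.
def mapping_ref_provided?(json_mapping)
  !(json_mapping.nil? || json_mapping.empty?)
end
```

In the provided case the ingestor sets both the JSON ingestion mapping and the JSON data format; otherwise it leaves the properties untouched and Kusto maps columns by name.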
2 changes: 1 addition & 1 deletion version
@@ -1 +1 @@
2.0.2
2.0.3
