Changed expected datatype of various options to number:
`auto_commit_interval_ms`, `connections_max_idle_ms`, `fetch_max_bytes`, `fetch_max_wait_ms`,
`fetch_min_bytes`, `heartbeat_interval_ms`, `max_partition_fetch_bytes`, `max_poll_interval_ms`,
`max_poll_records`, `metadata_max_age_ms`, `receive_buffer_bytes`, `reconnect_backoff_ms`,
`request_timeout_ms`, `retry_backoff_ms`, `send_buffer_bytes`, `session_timeout_ms`
arenard committed Feb 24, 2019
1 parent 4adfec6 commit 4493364
Showing 4 changed files with 58 additions and 50 deletions.
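For pipelines using this input, the practical effect of this commit is that the listed options are now written as bare numbers instead of quoted strings. A hypothetical before/after sketch of a pipeline config (broker address and topic are placeholders, not from this commit):

```logstash
# Before (plugin < 10.0.0): numeric settings passed as quoted strings
input {
  kafka {
    bootstrap_servers       => "localhost:9092"
    topics                  => ["logstash"]
    session_timeout_ms      => "10000"
    max_poll_records        => "500"
    auto_commit_interval_ms => "5000"
  }
}

# After (plugin >= 10.0.0): the same settings as bare numbers
input {
  kafka {
    bootstrap_servers       => "localhost:9092"
    topics                  => ["logstash"]
    session_timeout_ms      => 10000
    max_poll_records        => 500
    auto_commit_interval_ms => 5000
  }
}
```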
8 changes: 8 additions & 0 deletions CHANGELOG.md
@@ -1,3 +1,11 @@
## 10.0.0
- Changed expected datatype of various options to number:
`auto_commit_interval_ms`, `connections_max_idle_ms`, `fetch_max_bytes`, `fetch_max_wait_ms`,
`fetch_min_bytes`, `heartbeat_interval_ms`, `max_partition_fetch_bytes`, `max_poll_interval_ms`,
`max_poll_records`, `metadata_max_age_ms`, `receive_buffer_bytes`, `reconnect_backoff_ms`,
`request_timeout_ms`, `retry_backoff_ms`, `send_buffer_bytes`, `session_timeout_ms`
[#309](https://github.com/logstash-plugins/logstash-input-kafka/pull/309)

## 9.0.0
- Removed obsolete `ssl` option

66 changes: 33 additions & 33 deletions docs/index.asciidoc
@@ -77,39 +77,39 @@ https://kafka.apache.org/documentation for more details.
[cols="<,<,<",options="header",]
|=======================================================================
|Setting |Input type|Required
| <<plugins-{type}s-{plugin}-auto_commit_interval_ms>> |<<string,string>>|No
| <<plugins-{type}s-{plugin}-auto_commit_interval_ms>> |<<number,number>>|No
| <<plugins-{type}s-{plugin}-auto_offset_reset>> |<<string,string>>|No
| <<plugins-{type}s-{plugin}-bootstrap_servers>> |<<string,string>>|No
| <<plugins-{type}s-{plugin}-check_crcs>> |<<string,string>>|No
| <<plugins-{type}s-{plugin}-client_id>> |<<string,string>>|No
| <<plugins-{type}s-{plugin}-connections_max_idle_ms>> |<<string,string>>|No
| <<plugins-{type}s-{plugin}-connections_max_idle_ms>> |<<number,number>>|No
| <<plugins-{type}s-{plugin}-consumer_threads>> |<<number,number>>|No
| <<plugins-{type}s-{plugin}-decorate_events>> |<<boolean,boolean>>|No
| <<plugins-{type}s-{plugin}-enable_auto_commit>> |<<string,string>>|No
| <<plugins-{type}s-{plugin}-exclude_internal_topics>> |<<string,string>>|No
| <<plugins-{type}s-{plugin}-fetch_max_bytes>> |<<string,string>>|No
| <<plugins-{type}s-{plugin}-fetch_max_wait_ms>> |<<string,string>>|No
| <<plugins-{type}s-{plugin}-fetch_min_bytes>> |<<string,string>>|No
| <<plugins-{type}s-{plugin}-fetch_max_bytes>> |<<number,number>>|No
| <<plugins-{type}s-{plugin}-fetch_max_wait_ms>> |<<number,number>>|No
| <<plugins-{type}s-{plugin}-fetch_min_bytes>> |<<number,number>>|No
| <<plugins-{type}s-{plugin}-group_id>> |<<string,string>>|No
| <<plugins-{type}s-{plugin}-heartbeat_interval_ms>> |<<string,string>>|No
| <<plugins-{type}s-{plugin}-heartbeat_interval_ms>> |<<number,number>>|No
| <<plugins-{type}s-{plugin}-jaas_path>> |a valid filesystem path|No
| <<plugins-{type}s-{plugin}-kerberos_config>> |a valid filesystem path|No
| <<plugins-{type}s-{plugin}-key_deserializer_class>> |<<string,string>>|No
| <<plugins-{type}s-{plugin}-max_partition_fetch_bytes>> |<<string,string>>|No
| <<plugins-{type}s-{plugin}-max_poll_interval_ms>> |<<string,string>>|No
| <<plugins-{type}s-{plugin}-max_poll_records>> |<<string,string>>|No
| <<plugins-{type}s-{plugin}-metadata_max_age_ms>> |<<string,string>>|No
| <<plugins-{type}s-{plugin}-max_partition_fetch_bytes>> |<<number,number>>|No
| <<plugins-{type}s-{plugin}-max_poll_interval_ms>> |<<number,number>>|No
| <<plugins-{type}s-{plugin}-max_poll_records>> |<<number,number>>|No
| <<plugins-{type}s-{plugin}-metadata_max_age_ms>> |<<number,number>>|No
| <<plugins-{type}s-{plugin}-partition_assignment_strategy>> |<<string,string>>|No
| <<plugins-{type}s-{plugin}-poll_timeout_ms>> |<<number,number>>|No
| <<plugins-{type}s-{plugin}-receive_buffer_bytes>> |<<string,string>>|No
| <<plugins-{type}s-{plugin}-reconnect_backoff_ms>> |<<string,string>>|No
| <<plugins-{type}s-{plugin}-request_timeout_ms>> |<<string,string>>|No
| <<plugins-{type}s-{plugin}-retry_backoff_ms>> |<<string,string>>|No
| <<plugins-{type}s-{plugin}-receive_buffer_bytes>> |<<number,number>>|No
| <<plugins-{type}s-{plugin}-reconnect_backoff_ms>> |<<number,number>>|No
| <<plugins-{type}s-{plugin}-request_timeout_ms>> |<<number,number>>|No
| <<plugins-{type}s-{plugin}-retry_backoff_ms>> |<<number,number>>|No
| <<plugins-{type}s-{plugin}-sasl_kerberos_service_name>> |<<string,string>>|No
| <<plugins-{type}s-{plugin}-sasl_mechanism>> |<<string,string>>|No
| <<plugins-{type}s-{plugin}-security_protocol>> |<<string,string>>, one of `["PLAINTEXT", "SSL", "SASL_PLAINTEXT", "SASL_SSL"]`|No
| <<plugins-{type}s-{plugin}-send_buffer_bytes>> |<<string,string>>|No
| <<plugins-{type}s-{plugin}-session_timeout_ms>> |<<string,string>>|No
| <<plugins-{type}s-{plugin}-send_buffer_bytes>> |<<number,number>>|No
| <<plugins-{type}s-{plugin}-session_timeout_ms>> |<<number,number>>|No
| <<plugins-{type}s-{plugin}-ssl_endpoint_identification_algorithm>> |<<string,string>>|No
| <<plugins-{type}s-{plugin}-ssl_key_password>> |<<password,password>>|No
| <<plugins-{type}s-{plugin}-ssl_keystore_location>> |a valid filesystem path|No
@@ -131,8 +131,8 @@ input plugins.
[id="plugins-{type}s-{plugin}-auto_commit_interval_ms"]
===== `auto_commit_interval_ms`

* Value type is <<string,string>>
* Default value is `"5000"`
* Value type is <<number,number>>
* Default value is `5000`

The frequency in milliseconds that the consumer offsets are committed to Kafka.

@@ -184,7 +184,7 @@ a logical application name to be included.
[id="plugins-{type}s-{plugin}-connections_max_idle_ms"]
===== `connections_max_idle_ms`

* Value type is <<string,string>>
* Value type is <<number,number>>
* There is no default value for this setting.

Close idle connections after the number of milliseconds specified by this config.
@@ -235,7 +235,7 @@ If set to true the only way to receive records from an internal topic is subscri
[id="plugins-{type}s-{plugin}-fetch_max_bytes"]
===== `fetch_max_bytes`

* Value type is <<string,string>>
* Value type is <<number,number>>
* There is no default value for this setting.

The maximum amount of data the server should return for a fetch request. This is not an
@@ -245,7 +245,7 @@ than this value, the message will still be returned to ensure that the consumer
[id="plugins-{type}s-{plugin}-fetch_max_wait_ms"]
===== `fetch_max_wait_ms`

* Value type is <<string,string>>
* Value type is <<number,number>>
* There is no default value for this setting.

The maximum amount of time the server will block before answering the fetch request if
@@ -255,7 +255,7 @@ should be less than or equal to the timeout used in `poll_timeout_ms`
[id="plugins-{type}s-{plugin}-fetch_min_bytes"]
===== `fetch_min_bytes`

* Value type is <<string,string>>
* Value type is <<number,number>>
* There is no default value for this setting.

The minimum amount of data the server should return for a fetch request. If insufficient
@@ -275,7 +275,7 @@ Logstash instances with the same `group_id`
[id="plugins-{type}s-{plugin}-heartbeat_interval_ms"]
===== `heartbeat_interval_ms`

* Value type is <<string,string>>
* Value type is <<number,number>>
* There is no default value for this setting.

The expected time between heartbeats to the consumer coordinator. Heartbeats are used to ensure
@@ -326,7 +326,7 @@ Java Class used to deserialize the record's key
[id="plugins-{type}s-{plugin}-max_partition_fetch_bytes"]
===== `max_partition_fetch_bytes`

* Value type is <<string,string>>
* Value type is <<number,number>>
* There is no default value for this setting.

The maximum amount of data per-partition the server will return. The maximum total memory used for a
@@ -338,7 +338,7 @@ to fetch a large message on a certain partition.
[id="plugins-{type}s-{plugin}-max_poll_interval_ms"]
===== `max_poll_interval_ms`

* Value type is <<string,string>>
* Value type is <<number,number>>
* There is no default value for this setting.

The maximum delay between invocations of poll() when using consumer group management. This places
@@ -350,15 +350,15 @@ The value of the configuration `request_timeout_ms` must always be larger than m
[id="plugins-{type}s-{plugin}-max_poll_records"]
===== `max_poll_records`

* Value type is <<string,string>>
* Value type is <<number,number>>
* There is no default value for this setting.

The maximum number of records returned in a single call to poll().

[id="plugins-{type}s-{plugin}-metadata_max_age_ms"]
===== `metadata_max_age_ms`

* Value type is <<string,string>>
* Value type is <<number,number>>
* There is no default value for this setting.

The period of time in milliseconds after which we force a refresh of metadata even if
@@ -386,15 +386,15 @@ Time kafka consumer will wait to receive new messages from topics
[id="plugins-{type}s-{plugin}-receive_buffer_bytes"]
===== `receive_buffer_bytes`

* Value type is <<string,string>>
* Value type is <<number,number>>
* There is no default value for this setting.

The size of the TCP receive buffer (SO_RCVBUF) to use when reading data.

[id="plugins-{type}s-{plugin}-reconnect_backoff_ms"]
===== `reconnect_backoff_ms`

* Value type is <<string,string>>
* Value type is <<number,number>>
* There is no default value for this setting.

The amount of time to wait before attempting to reconnect to a given host.
@@ -404,7 +404,7 @@ This backoff applies to all requests sent by the consumer to the broker.
[id="plugins-{type}s-{plugin}-request_timeout_ms"]
===== `request_timeout_ms`

* Value type is <<string,string>>
* Value type is <<number,number>>
* There is no default value for this setting.

The configuration controls the maximum amount of time the client will wait
@@ -415,7 +415,7 @@ retries are exhausted.
[id="plugins-{type}s-{plugin}-retry_backoff_ms"]
===== `retry_backoff_ms`

* Value type is <<string,string>>
* Value type is <<number,number>>
* There is no default value for this setting.

The amount of time to wait before attempting to retry a failed fetch request
@@ -451,15 +451,15 @@ Security protocol to use, which can be either of PLAINTEXT,SSL,SASL_PLAINTEXT,SA
[id="plugins-{type}s-{plugin}-send_buffer_bytes"]
===== `send_buffer_bytes`

* Value type is <<string,string>>
* Value type is <<number,number>>
* There is no default value for this setting.

The size of the TCP send buffer (SO_SNDBUF) to use when sending data

[id="plugins-{type}s-{plugin}-session_timeout_ms"]
===== `session_timeout_ms`

* Value type is <<string,string>>
* Value type is <<number,number>>
* There is no default value for this setting.

The timeout after which, if the `poll_timeout_ms` is not invoked, the consumer is marked dead
32 changes: 16 additions & 16 deletions lib/logstash/inputs/kafka.rb
@@ -53,7 +53,7 @@ class LogStash::Inputs::Kafka < LogStash::Inputs::Base
default :codec, 'plain'

# The frequency in milliseconds that the consumer offsets are committed to Kafka.
config :auto_commit_interval_ms, :validate => :string, :default => "5000"
config :auto_commit_interval_ms, :validate => :number, :default => 5000
# What to do when there is no initial offset in Kafka or if an offset is out of range:
#
# * earliest: automatically reset the offset to the earliest offset
@@ -76,7 +76,7 @@ class LogStash::Inputs::Kafka < LogStash::Inputs::Base
# a logical application name to be included.
config :client_id, :validate => :string, :default => "logstash"
# Close idle connections after the number of milliseconds specified by this config.
config :connections_max_idle_ms, :validate => :string
config :connections_max_idle_ms, :validate => :number
# Ideally you should have as many threads as the number of partitions for a perfect
# balance — more threads than partitions means that some threads will be idle
config :consumer_threads, :validate => :number, :default => 1
@@ -90,15 +90,15 @@ class LogStash::Inputs::Kafka < LogStash::Inputs::Base
# The maximum amount of data the server should return for a fetch request. This is not an
# absolute maximum, if the first message in the first non-empty partition of the fetch is larger
# than this value, the message will still be returned to ensure that the consumer can make progress.
config :fetch_max_bytes, :validate => :string
config :fetch_max_bytes, :validate => :number
# The maximum amount of time the server will block before answering the fetch request if
# there isn't sufficient data to immediately satisfy `fetch_min_bytes`. This
# should be less than or equal to the timeout used in `poll_timeout_ms`
config :fetch_max_wait_ms, :validate => :string
config :fetch_max_wait_ms, :validate => :number
# The minimum amount of data the server should return for a fetch request. If insufficient
# data is available the request will wait for that much data to accumulate
# before answering the request.
config :fetch_min_bytes, :validate => :string
config :fetch_min_bytes, :validate => :number
# The identifier of the group this consumer belongs to. Consumer group is a single logical subscriber
# that happens to be made up of multiple processors. Messages in a topic will be distributed to all
# Logstash instances with the same `group_id`
@@ -108,48 +108,48 @@ class LogStash::Inputs::Kafka < LogStash::Inputs::Base
# consumers join or leave the group. The value must be set lower than
# `session.timeout.ms`, but typically should be set no higher than 1/3 of that value.
# It can be adjusted even lower to control the expected time for normal rebalances.
config :heartbeat_interval_ms, :validate => :string
config :heartbeat_interval_ms, :validate => :number
# Java Class used to deserialize the record's key
config :key_deserializer_class, :validate => :string, :default => "org.apache.kafka.common.serialization.StringDeserializer"
# The maximum delay between invocations of poll() when using consumer group management. This places
# an upper bound on the amount of time that the consumer can be idle before fetching more records.
# If poll() is not called before expiration of this timeout, then the consumer is considered failed and
# the group will rebalance in order to reassign the partitions to another member.
# The value of the configuration `request_timeout_ms` must always be larger than max_poll_interval_ms
config :max_poll_interval_ms, :validate => :string
config :max_poll_interval_ms, :validate => :number
# The maximum amount of data per-partition the server will return. The maximum total memory used for a
# request will be <code>#partitions * max.partition.fetch.bytes</code>. This size must be at least
# as large as the maximum message size the server allows or else it is possible for the producer to
# send messages larger than the consumer can fetch. If that happens, the consumer can get stuck trying
# to fetch a large message on a certain partition.
config :max_partition_fetch_bytes, :validate => :string
config :max_partition_fetch_bytes, :validate => :number
# The maximum number of records returned in a single call to poll().
config :max_poll_records, :validate => :string
config :max_poll_records, :validate => :number
# The period of time in milliseconds after which we force a refresh of metadata even if
# we haven't seen any partition leadership changes to proactively discover any new brokers or partitions
config :metadata_max_age_ms, :validate => :string
config :metadata_max_age_ms, :validate => :number
# The class name of the partition assignment strategy that the client will use to distribute
# partition ownership amongst consumer instances
config :partition_assignment_strategy, :validate => :string
# The size of the TCP receive buffer (SO_RCVBUF) to use when reading data.
config :receive_buffer_bytes, :validate => :string
config :receive_buffer_bytes, :validate => :number
# The amount of time to wait before attempting to reconnect to a given host.
# This avoids repeatedly connecting to a host in a tight loop.
# This backoff applies to all requests sent by the consumer to the broker.
config :reconnect_backoff_ms, :validate => :string
config :reconnect_backoff_ms, :validate => :number
# The configuration controls the maximum amount of time the client will wait
# for the response of a request. If the response is not received before the timeout
# elapses the client will resend the request if necessary or fail the request if
# retries are exhausted.
config :request_timeout_ms, :validate => :string
config :request_timeout_ms, :validate => :number
# The amount of time to wait before attempting to retry a failed fetch request
# to a given topic partition. This avoids repeated fetching-and-failing in a tight loop.
config :retry_backoff_ms, :validate => :string
config :retry_backoff_ms, :validate => :number
# The size of the TCP send buffer (SO_SNDBUF) to use when sending data
config :send_buffer_bytes, :validate => :string
config :send_buffer_bytes, :validate => :number
# The timeout after which, if the `poll_timeout_ms` is not invoked, the consumer is marked dead
# and a rebalance operation is triggered for the group identified by `group_id`
config :session_timeout_ms, :validate => :string
config :session_timeout_ms, :validate => :number
# Java Class used to deserialize the record's value
config :value_deserializer_class, :validate => :string, :default => "org.apache.kafka.common.serialization.StringDeserializer"
# A list of topics to subscribe to, defaults to ["logstash"].
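The `:validate => :number` declarations in the diff above hand type checking off to LogStash's config machinery. As a rough illustration only (the real validation lives in `LogStash::Config::Mixin`, not in this snippet), the difference from the old `:validate => :string` form is that a number-typed option takes numeric literals directly, while arbitrary non-numeric strings are rejected:

```ruby
# Simplified, hypothetical sketch of a :number-style option check.
# This is NOT the actual LogStash validator; it only illustrates the
# string -> number datatype change described in this commit.
def coerce_number(value)
  case value
  when Numeric
    value                      # bare numbers (the new 10.0.0 form) pass through
  when String
    begin
      Integer(value)           # "5000" -> 5000, like the old quoted form
    rescue ArgumentError
      begin
        Float(value)           # "2.5" -> 2.5
      rescue ArgumentError
        raise ArgumentError, "#{value.inspect} is not a valid number"
      end
    end
  else
    raise ArgumentError, "#{value.inspect} is not a valid number"
  end
end

coerce_number(5000)      # new bare-number form
coerce_number("5000")    # old quoted form still coerces in this sketch
```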
2 changes: 1 addition & 1 deletion logstash-input-kafka.gemspec
@@ -1,6 +1,6 @@
Gem::Specification.new do |s|
s.name = 'logstash-input-kafka'
s.version = '9.0.0'
s.version = '10.0.0'
s.licenses = ['Apache-2.0']
s.summary = "Reads events from a Kafka topic"
s.description = "This gem is a Logstash plugin required to be installed on top of the Logstash core pipeline using $LS_HOME/bin/logstash-plugin install gemname. This gem is not a stand-alone program"
