From 205d6d94af669e679885501591cc9ca333e579a4 Mon Sep 17 00:00:00 2001 From: arenard Date: Sun, 24 Feb 2019 18:53:07 +0100 Subject: [PATCH] Changed expected datatype of various options to number: `auto_commit_interval_ms`, `connections_max_idle_ms`, `fetch_max_bytes`, `fetch_max_wait_ms`, `fetch_min_bytes`, `heartbeat_interval_ms`, `max_partition_fetch_bytes`, `max_poll_interval_ms`, `max_poll_records`, `metadata_max_age_ms`, `receive_buffer_bytes`, `reconnect_backoff_ms`, `request_timeout_ms`, `retry_backoff_ms`, `send_buffer_bytes`, `session_timeout_ms` --- CHANGELOG.md | 8 +++++ docs/index.asciidoc | 66 ++++++++++++++++++------------------ lib/logstash/inputs/kafka.rb | 56 +++++++++++++++--------------- logstash-input-kafka.gemspec | 2 +- 4 files changed, 70 insertions(+), 62 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index ac971f2..55b63b1 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,11 @@ +## 10.0.0 + - Changed expected datatype of various options to number: + `auto_commit_interval_ms`, `connections_max_idle_ms`, `fetch_max_bytes`, `fetch_max_wait_ms`, + `fetch_min_bytes`, `heartbeat_interval_ms`, `max_partition_fetch_bytes`, `max_poll_interval_ms`, + `max_poll_records`, `metadata_max_age_ms`, `receive_buffer_bytes`, `reconnect_backoff_ms`, + `request_timeout_ms`, `retry_backoff_ms`, `send_buffer_bytes`, `session_timeout_ms` + [#309](https://github.com/logstash-plugins/logstash-input-kafka/pull/309) + ## 9.0.0 - Removed obsolete `ssl` option diff --git a/docs/index.asciidoc b/docs/index.asciidoc index be86047..d9479a5 100644 --- a/docs/index.asciidoc +++ b/docs/index.asciidoc @@ -77,39 +77,39 @@ https://kafka.apache.org/documentation for more details. [cols="<,<,<",options="header",] |======================================================================= |Setting |Input type|Required -| <> |<>|No +| <> |<>|No | <> |<>|No | <> |<>|No | <> |<>|No | <> |<>|No -| <> |<>|No +| <> |<>|No | <> |<>|No | <> |<>|No | <> |<>|No | <> |<>|No -| <> |<>|No -| <> |<>|No -| <> |<>|No +| <> |<>|No +| <> |<>|No +| <> |<>|No | <> |<>|No -| <> |<>|No +| <> |<>|No | <> |a valid filesystem path|No | <> |a valid filesystem path|No | <> |<>|No -| <> |<>|No -| <> |<>|No -| <> |<>|No -| <> |<>|No +| <> |<>|No +| <> |<>|No +| <> |<>|No +| <> |<>|No | <> |<>|No | <> |<>|No -| <> |<>|No -| <> |<>|No -| <> |<>|No -| <> |<>|No +| <> |<>|No +| <> |<>|No +| <> |<>|No +| <> |<>|No | <> |<>|No | <> |<>|No | <> |<>, one of `["PLAINTEXT", "SSL", "SASL_PLAINTEXT", "SASL_SSL"]`|No -| <> |<>|No -| <> |<>|No +| <> |<>|No +| <> |<>|No | <> |<>|No | <> |<>|No | <> |a valid filesystem path|No @@ -131,8 +131,8 @@ input plugins. [id="plugins-{type}s-{plugin}-auto_commit_interval_ms"] ===== `auto_commit_interval_ms` - * Value type is <> - * Default value is `"5000"` + * Value type is <> + * Default value is `5000` The frequency in milliseconds that the consumer offsets are committed to Kafka. @@ -184,7 +184,7 @@ a logical application name to be included. [id="plugins-{type}s-{plugin}-connections_max_idle_ms"] ===== `connections_max_idle_ms` - * Value type is <> + * Value type is <> * There is no default value for this setting. Close idle connections after the number of milliseconds specified by this config. @@ -235,7 +235,7 @@ If set to true the only way to receive records from an internal topic is subscri [id="plugins-{type}s-{plugin}-fetch_max_bytes"] ===== `fetch_max_bytes` - * Value type is <> + * Value type is <> * There is no default value for this setting. 
The maximum amount of data the server should return for a fetch request. This is not an @@ -245,7 +245,7 @@ than this value, the message will still be returned to ensure that the consumer [id="plugins-{type}s-{plugin}-fetch_max_wait_ms"] ===== `fetch_max_wait_ms` - * Value type is <> + * Value type is <> * There is no default value for this setting. The maximum amount of time the server will block before answering the fetch request if @@ -255,7 +255,7 @@ should be less than or equal to the timeout used in `poll_timeout_ms` [id="plugins-{type}s-{plugin}-fetch_min_bytes"] ===== `fetch_min_bytes` - * Value type is <> + * Value type is <> * There is no default value for this setting. The minimum amount of data the server should return for a fetch request. If insufficient @@ -275,7 +275,7 @@ Logstash instances with the same `group_id` [id="plugins-{type}s-{plugin}-heartbeat_interval_ms"] ===== `heartbeat_interval_ms` - * Value type is <> + * Value type is <> * There is no default value for this setting. The expected time between heartbeats to the consumer coordinator. Heartbeats are used to ensure @@ -326,7 +326,7 @@ Java Class used to deserialize the record's key [id="plugins-{type}s-{plugin}-max_partition_fetch_bytes"] ===== `max_partition_fetch_bytes` - * Value type is <> + * Value type is <> * There is no default value for this setting. The maximum amount of data per-partition the server will return. The maximum total memory used for a @@ -338,7 +338,7 @@ to fetch a large message on a certain partition. [id="plugins-{type}s-{plugin}-max_poll_interval_ms"] ===== `max_poll_interval_ms` - * Value type is <> + * Value type is <> * There is no default value for this setting. The maximum delay between invocations of poll() when using consumer group management. This places @@ -350,7 +350,7 @@ The value of the configuration `request_timeout_ms` must always be larger than m [id="plugins-{type}s-{plugin}-max_poll_records"] ===== `max_poll_records` - * Value type is <> + * Value type is <> * There is no default value for this setting. The maximum number of records returned in a single call to poll(). @@ -358,7 +358,7 @@ The maximum number of records returned in a single call to poll(). [id="plugins-{type}s-{plugin}-metadata_max_age_ms"] ===== `metadata_max_age_ms` - * Value type is <> + * Value type is <> * There is no default value for this setting. The period of time in milliseconds after which we force a refresh of metadata even if @@ -386,7 +386,7 @@ Time kafka consumer will wait to receive new messages from topics [id="plugins-{type}s-{plugin}-receive_buffer_bytes"] ===== `receive_buffer_bytes` - * Value type is <> + * Value type is <> * There is no default value for this setting. The size of the TCP receive buffer (SO_RCVBUF) to use when reading data. @@ -394,7 +394,7 @@ The size of the TCP receive buffer (SO_RCVBUF) to use when reading data. [id="plugins-{type}s-{plugin}-reconnect_backoff_ms"] ===== `reconnect_backoff_ms` - * Value type is <> + * Value type is <> * There is no default value for this setting. The amount of time to wait before attempting to reconnect to a given host. @@ -404,7 +404,7 @@ This backoff applies to all requests sent by the consumer to the broker. [id="plugins-{type}s-{plugin}-request_timeout_ms"] ===== `request_timeout_ms` - * Value type is <> + * Value type is <> * There is no default value for this setting. The configuration controls the maximum amount of time the client will wait @@ -415,7 +415,7 @@ retries are exhausted. 
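Since these settings now validate as numbers, they are given as bare numeric literals in the pipeline configuration rather than quoted strings. A minimal sketch of a `kafka` input using a few of the retyped options follows; the broker address and the specific values shown are illustrative only:

----
input {
  kafka {
    bootstrap_servers       => "localhost:9092"
    topics                  => ["logstash"]
    # numeric options are now written as numbers, e.g. 5000 rather than "5000"
    auto_commit_interval_ms => 5000
    fetch_max_wait_ms       => 500
    max_poll_records        => 500
    session_timeout_ms      => 10000
  }
}
----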
[id="plugins-{type}s-{plugin}-retry_backoff_ms"] ===== `retry_backoff_ms` - * Value type is <> + * Value type is <> * There is no default value for this setting. The amount of time to wait before attempting to retry a failed fetch request @@ -451,7 +451,7 @@ Security protocol to use, which can be either of PLAINTEXT,SSL,SASL_PLAINTEXT,SA [id="plugins-{type}s-{plugin}-send_buffer_bytes"] ===== `send_buffer_bytes` - * Value type is <> + * Value type is <> * There is no default value for this setting. The size of the TCP send buffer (SO_SNDBUF) to use when sending data @@ -459,7 +459,7 @@ The size of the TCP send buffer (SO_SNDBUF) to use when sending data [id="plugins-{type}s-{plugin}-session_timeout_ms"] ===== `session_timeout_ms` - * Value type is <> + * Value type is <> * There is no default value for this setting. The timeout after which, if the `poll_timeout_ms` is not invoked, the consumer is marked dead diff --git a/lib/logstash/inputs/kafka.rb b/lib/logstash/inputs/kafka.rb index 4ce411e..858e173 100644 --- a/lib/logstash/inputs/kafka.rb +++ b/lib/logstash/inputs/kafka.rb @@ -53,7 +53,7 @@ class LogStash::Inputs::Kafka < LogStash::Inputs::Base default :codec, 'plain' # The frequency in milliseconds that the consumer offsets are committed to Kafka. - config :auto_commit_interval_ms, :validate => :string, :default => "5000" + config :auto_commit_interval_ms, :validate => :number, :default => 5000 # What to do when there is no initial offset in Kafka or if an offset is out of range: # # * earliest: automatically reset the offset to the earliest offset @@ -76,7 +76,7 @@ class LogStash::Inputs::Kafka < LogStash::Inputs::Base # a logical application name to be included. config :client_id, :validate => :string, :default => "logstash" # Close idle connections after the number of milliseconds specified by this config. - config :connections_max_idle_ms, :validate => :string + config :connections_max_idle_ms, :validate => :number # Ideally you should have as many threads as the number of partitions for a perfect # balance — more threads than partitions means that some threads will be idle config :consumer_threads, :validate => :number, :default => 1 @@ -90,15 +90,15 @@ class LogStash::Inputs::Kafka < LogStash::Inputs::Base # The maximum amount of data the server should return for a fetch request. This is not an # absolute maximum, if the first message in the first non-empty partition of the fetch is larger # than this value, the message will still be returned to ensure that the consumer can make progress. - config :fetch_max_bytes, :validate => :string + config :fetch_max_bytes, :validate => :number # The maximum amount of time the server will block before answering the fetch request if # there isn't sufficient data to immediately satisfy `fetch_min_bytes`. This # should be less than or equal to the timeout used in `poll_timeout_ms` - config :fetch_max_wait_ms, :validate => :string + config :fetch_max_wait_ms, :validate => :number # The minimum amount of data the server should return for a fetch request. If insufficient # data is available the request will wait for that much data to accumulate # before answering the request. - config :fetch_min_bytes, :validate => :string + config :fetch_min_bytes, :validate => :number # The identifier of the group this consumer belongs to. Consumer group is a single logical subscriber # that happens to be made up of multiple processors. 
Messages in a topic will be distributed to all # Logstash instances with the same `group_id` @@ -108,7 +108,7 @@ class LogStash::Inputs::Kafka < LogStash::Inputs::Base # consumers join or leave the group. The value must be set lower than # `session.timeout.ms`, but typically should be set no higher than 1/3 of that value. # It can be adjusted even lower to control the expected time for normal rebalances. - config :heartbeat_interval_ms, :validate => :string + config :heartbeat_interval_ms, :validate => :number # Java Class used to deserialize the record's key config :key_deserializer_class, :validate => :string, :default => "org.apache.kafka.common.serialization.StringDeserializer" # The maximum delay between invocations of poll() when using consumer group management. This places @@ -116,40 +116,40 @@ class LogStash::Inputs::Kafka < LogStash::Inputs::Base # If poll() is not called before expiration of this timeout, then the consumer is considered failed and # the group will rebalance in order to reassign the partitions to another member. # The value of the configuration `request_timeout_ms` must always be larger than max_poll_interval_ms - config :max_poll_interval_ms, :validate => :string + config :max_poll_interval_ms, :validate => :number # The maximum amount of data per-partition the server will return. The maximum total memory used for a # request will be #partitions * max.partition.fetch.bytes. This size must be at least # as large as the maximum message size the server allows or else it is possible for the producer to # send messages larger than the consumer can fetch. If that happens, the consumer can get stuck trying # to fetch a large message on a certain partition. - config :max_partition_fetch_bytes, :validate => :string + config :max_partition_fetch_bytes, :validate => :number # The maximum number of records returned in a single call to poll(). - config :max_poll_records, :validate => :string + config :max_poll_records, :validate => :number # The period of time in milliseconds after which we force a refresh of metadata even if # we haven't seen any partition leadership changes to proactively discover any new brokers or partitions - config :metadata_max_age_ms, :validate => :string + config :metadata_max_age_ms, :validate => :number # The class name of the partition assignment strategy that the client will use to distribute # partition ownership amongst consumer instances config :partition_assignment_strategy, :validate => :string # The size of the TCP receive buffer (SO_RCVBUF) to use when reading data. - config :receive_buffer_bytes, :validate => :string + config :receive_buffer_bytes, :validate => :number # The amount of time to wait before attempting to reconnect to a given host. # This avoids repeatedly connecting to a host in a tight loop. # This backoff applies to all requests sent by the consumer to the broker. - config :reconnect_backoff_ms, :validate => :string + config :reconnect_backoff_ms, :validate => :number # The configuration controls the maximum amount of time the client will wait # for the response of a request. If the response is not received before the timeout # elapses the client will resend the request if necessary or fail the request if # retries are exhausted. - config :request_timeout_ms, :validate => :string + config :request_timeout_ms, :validate => :number # The amount of time to wait before attempting to retry a failed fetch request # to a given topic partition. This avoids repeated fetching-and-failing in a tight loop. 
- config :retry_backoff_ms, :validate => :string + config :retry_backoff_ms, :validate => :number # The size of the TCP send buffer (SO_SNDBUF) to use when sending data - config :send_buffer_bytes, :validate => :string + config :send_buffer_bytes, :validate => :number # The timeout after which, if the `poll_timeout_ms` is not invoked, the consumer is marked dead # and a rebalance operation is triggered for the group identified by `group_id` - config :session_timeout_ms, :validate => :string + config :session_timeout_ms, :validate => :number # Java Class used to deserialize the record's value config :value_deserializer_class, :validate => :string, :default => "org.apache.kafka.common.serialization.StringDeserializer" # A list of topics to subscribe to, defaults to ["logstash"]. @@ -285,7 +285,7 @@ def create_consumer(client_id) props = java.util.Properties.new kafka = org.apache.kafka.clients.consumer.ConsumerConfig - props.put(kafka::AUTO_COMMIT_INTERVAL_MS_CONFIG, auto_commit_interval_ms) + props.put(kafka::AUTO_COMMIT_INTERVAL_MS_CONFIG, auto_commit_interval_ms.to_i) props.put(kafka::AUTO_OFFSET_RESET_CONFIG, auto_offset_reset) unless auto_offset_reset.nil? props.put(kafka::BOOTSTRAP_SERVERS_CONFIG, bootstrap_servers) props.put(kafka::CHECK_CRCS_CONFIG, check_crcs) unless check_crcs.nil? @@ -293,23 +293,23 @@ def create_consumer(client_id) props.put(kafka::CONNECTIONS_MAX_IDLE_MS_CONFIG, connections_max_idle_ms) unless connections_max_idle_ms.nil? props.put(kafka::ENABLE_AUTO_COMMIT_CONFIG, enable_auto_commit) props.put(kafka::EXCLUDE_INTERNAL_TOPICS_CONFIG, exclude_internal_topics) unless exclude_internal_topics.nil? - props.put(kafka::FETCH_MAX_BYTES_CONFIG, fetch_max_bytes) unless fetch_max_bytes.nil? - props.put(kafka::FETCH_MAX_WAIT_MS_CONFIG, fetch_max_wait_ms) unless fetch_max_wait_ms.nil? - props.put(kafka::FETCH_MIN_BYTES_CONFIG, fetch_min_bytes) unless fetch_min_bytes.nil? + props.put(kafka::FETCH_MAX_BYTES_CONFIG, fetch_max_bytes.to_i) unless fetch_max_bytes.nil? + props.put(kafka::FETCH_MAX_WAIT_MS_CONFIG, fetch_max_wait_ms.to_i) unless fetch_max_wait_ms.nil? + props.put(kafka::FETCH_MIN_BYTES_CONFIG, fetch_min_bytes.to_i) unless fetch_min_bytes.nil? props.put(kafka::GROUP_ID_CONFIG, group_id) - props.put(kafka::HEARTBEAT_INTERVAL_MS_CONFIG, heartbeat_interval_ms) unless heartbeat_interval_ms.nil? + props.put(kafka::HEARTBEAT_INTERVAL_MS_CONFIG, heartbeat_interval_ms.to_i) unless heartbeat_interval_ms.nil? props.put(kafka::KEY_DESERIALIZER_CLASS_CONFIG, key_deserializer_class) - props.put(kafka::MAX_PARTITION_FETCH_BYTES_CONFIG, max_partition_fetch_bytes) unless max_partition_fetch_bytes.nil? - props.put(kafka::MAX_POLL_RECORDS_CONFIG, max_poll_records) unless max_poll_records.nil? - props.put(kafka::MAX_POLL_INTERVAL_MS_CONFIG, max_poll_interval_ms) unless max_poll_interval_ms.nil? + props.put(kafka::MAX_PARTITION_FETCH_BYTES_CONFIG, max_partition_fetch_bytes.to_i) unless max_partition_fetch_bytes.nil? + props.put(kafka::MAX_POLL_RECORDS_CONFIG, max_poll_records.to_i) unless max_poll_records.nil? + props.put(kafka::MAX_POLL_INTERVAL_MS_CONFIG, max_poll_interval_ms.to_i) unless max_poll_interval_ms.nil? props.put(kafka::METADATA_MAX_AGE_CONFIG, metadata_max_age_ms) unless metadata_max_age_ms.nil? props.put(kafka::PARTITION_ASSIGNMENT_STRATEGY_CONFIG, partition_assignment_strategy) unless partition_assignment_strategy.nil? - props.put(kafka::RECEIVE_BUFFER_CONFIG, receive_buffer_bytes) unless receive_buffer_bytes.nil? 
+ props.put(kafka::RECEIVE_BUFFER_CONFIG, receive_buffer_bytes.to_i) unless receive_buffer_bytes.nil? props.put(kafka::RECONNECT_BACKOFF_MS_CONFIG, reconnect_backoff_ms) unless reconnect_backoff_ms.nil? - props.put(kafka::REQUEST_TIMEOUT_MS_CONFIG, request_timeout_ms) unless request_timeout_ms.nil? + props.put(kafka::REQUEST_TIMEOUT_MS_CONFIG, request_timeout_ms.to_i) unless request_timeout_ms.nil? props.put(kafka::RETRY_BACKOFF_MS_CONFIG, retry_backoff_ms) unless retry_backoff_ms.nil? - props.put(kafka::SEND_BUFFER_CONFIG, send_buffer_bytes) unless send_buffer_bytes.nil? - props.put(kafka::SESSION_TIMEOUT_MS_CONFIG, session_timeout_ms) unless session_timeout_ms.nil? + props.put(kafka::SEND_BUFFER_CONFIG, send_buffer_bytes.to_i) unless send_buffer_bytes.nil? + props.put(kafka::SESSION_TIMEOUT_MS_CONFIG, session_timeout_ms.to_i) unless session_timeout_ms.nil? props.put(kafka::VALUE_DESERIALIZER_CLASS_CONFIG, value_deserializer_class) props.put("security.protocol", security_protocol) unless security_protocol.nil? diff --git a/logstash-input-kafka.gemspec b/logstash-input-kafka.gemspec index 8434d4b..7a0b59d 100644 --- a/logstash-input-kafka.gemspec +++ b/logstash-input-kafka.gemspec @@ -1,6 +1,6 @@ Gem::Specification.new do |s| s.name = 'logstash-input-kafka' - s.version = '9.0.0' + s.version = '10.0.0' s.licenses = ['Apache-2.0'] s.summary = "Reads events from a Kafka topic" s.description = "This gem is a Logstash plugin required to be installed on top of the Logstash core pipeline using $LS_HOME/bin/logstash-plugin install gemname. This gem is not a stand-alone program"
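Logstash's `:number` validation accepts floating-point values as well as integers, so the patch normalizes each retyped option with `.to_i` before handing it to the Kafka consumer, which expects whole milliseconds and bytes for these settings. A minimal JRuby sketch of that conversion step, mirroring `create_consumer` and assuming the Kafka client jars are already on the classpath (the sample value is arbitrary):

```ruby
require 'java'

# Build the same kind of property map that create_consumer fills in.
props = java.util.Properties.new
kafka = org.apache.kafka.clients.consumer.ConsumerConfig

# With :number validation a user could set `auto_commit_interval_ms => 2500.0`.
auto_commit_interval_ms = 2500.0

# .to_i truncates the value to an integer before it reaches the Kafka client.
props.put(kafka::AUTO_COMMIT_INTERVAL_MS_CONFIG, auto_commit_interval_ms.to_i)

puts props.get(kafka::AUTO_COMMIT_INTERVAL_MS_CONFIG)  # => 2500
```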