No support for multiple fluentd workers #68

Open
DanielShor opened this issue Jul 10, 2020 · 0 comments
Hi,

I need to use multiple fluentd workers, but when I try to enable them it looks like the mdsd plugin doesn't support a multi-worker configuration.

fluentd conf:

    <system>
      workers 4
    </system>
    
    <source>
      @type forward
      port 24224
      bind 0.0.0.0
    </source>
    <filter kubernetes.**>
      @type prometheus
      <metric>
        name fluentd_input_status_num_records_total
        type counter
        desc The total number of incoming records
        <labels>
          tag ${tag}
          hostname ${hostname}
        </labels>
      </metric>
    </filter>
    
    <filter kubernetes.**>
      @type record_modifier
      <record>
          # NOTE: This Ruby code written in ${} has to be written in one line!   
        _temp_ ${ if record.has_key?("log"); begin; parsed = JSON.parse(record["log"]); parsed.each do |key, value|; record[key] = value; end; rescue; record["message"] = record["log"]; end; end; nil }
        _ine_ ${ if record.has_key?("LOG"); begin; record["LOG"].each do |key, value|; record[key] = value; end; rescue; record["message"] = record["LOG"]; end; end; nil }
        _kube_ ${ if record.has_key?("kubernetes"); begin; kube = record["kubernetes"]; kube.each do |key, value|; record[key] = value; end; rescue; end; end; nil}
        _build_id_ ${ begin; record["build_id"] = record["container_image"].split(':')[1].split('-')[0] if record.has_key?("container_image"); rescue; end; nil} 
        _service_ ${ if record.has_key?("labels"); record["service_name"] = record["labels"]["app_kubernetes_io/name"] if record["labels"].has_key?("app_kubernetes_io/name") end; nil;}
        _node_ ${ record["node"] = record["host"] if record.has_key?('host'); nil}
      </record>
      remove_keys _temp_, _kube_, _service_, _build_id_, _node_, host, log, namespace_id, pod_id, container_image, container_image_id, namespace_labels, master_url, docker, kubernetes, _ine_
    </filter>
    
    <match kubernetes.**>
      @type rewrite_tag_filter
      <rule>
        key     SOURCE
        pattern ^(OmsTelemetryLog)$
        tag     custom.$1
      </rule>
      <rule>
        key     SOURCE
        pattern ^(OmsTelemetryLog)$
        tag     custom.${tag}
        invert  true
      </rule>
    </match>
    
    <filter custom.**>
      @type record_modifier
      remove_keys  DATA, TAG, SOURCE, stream, kubernetes, source1, LOG
    </filter>
    
    <match custom.**>
      @type copy
      <store>
        @type mdsd
        @log_level info
        # Full path to mdsd dynamic json socket file
        djsonsocket /var/run/mdsd/default_djson.socket
        # Max time in milliseconds to wait for mdsd acknowledge response. If 0, no wait.
        acktimeoutms 5000
        # An array of regex patterns for mdsd source name unification purpose.
        # The passed will be matched against each regex, and if there's a match, the matched substring will be used as the resulting mdsd source name.
        mdsd_tag_regex_patterns ["kubernetes", "OmsTelemetryLog"]
        # Convert hash value to valid json.
        convert_hash_to_json true
        <buffer>
          @type file
          path /var/log/td-agent/buffer/kubernetes_out_mdsd.buffer
          chunk_limit_size 8m
          queued_chunks_limit_size 2048
          flush_interval 15s
          flush_thread_count 1
          flush_at_shutdown true
          retry_max_times 10
          retry_timeout 10m
        </buffer>
      </store>
      <store>
        @type prometheus
        <metric>
          name fluentd_output_status_num_records_total
          type counter
          desc The total number of outgoing records
          <labels>
            tag ${tag}
            hostname ${hostname}
          </labels>
        </metric>
      </store>
    </match>

When starting fluentd I get the following output (before it crashes):

    2020-07-10 07:45:18 +0000 [info]: starting fluentd-1.9.2 pid=7 ruby="2.4.9"
    2020-07-10 07:45:18 +0000 [info]: spawn command to main:  cmdline=["/opt/td-agent/embedded/bin/ruby", "-Eascii-8bit:ascii-8bit", "/opt/td-agent/embedded/bin/fluentd", "-c", "/etc/fluentd/fluentd.conf", "--log", "/dev/stdout", "--under-supervisor"]
    2020-07-10 07:45:18 +0000 [info]: adding filter pattern="kubernetes.**" type="prometheus"
    2020-07-10 07:45:19 +0000 [info]: adding filter pattern="kubernetes.**" type="record_modifier"
    2020-07-10 07:45:19 +0000 [info]: #1 adding rewrite_tag_filter rule: SOURCE [#<Fluent::PluginHelper::RecordAccessor::Accessor:0x00000000027aa9e8 @keys="SOURCE">, /^(OmsTelemetryLog)$/, "", "custom.$1"]
    2020-07-10 07:45:19 +0000 [info]: #1 adding rewrite_tag_filter rule: SOURCE [#<Fluent::PluginHelper::RecordAccessor::Accessor:0x00000000027b5e60 @keys="SOURCE">, /^(OmsTelemetryLog)$/, "!", "custom.${tag}"]
    2020-07-10 07:45:19 +0000 [info]: adding match pattern="kubernetes.**" type="rewrite_tag_filter"
    2020-07-10 07:45:19 +0000 [info]: #0 adding rewrite_tag_filter rule: SOURCE [#<Fluent::PluginHelper::RecordAccessor::Accessor:0x0000000001f98908 @keys="SOURCE">, /^(OmsTelemetryLog)$/, "", "custom.$1"]
    2020-07-10 07:45:19 +0000 [info]: #0 adding rewrite_tag_filter rule: SOURCE [#<Fluent::PluginHelper::RecordAccessor::Accessor:0x0000000001f9e650 @keys="SOURCE">, /^(OmsTelemetryLog)$/, "!", "custom.${tag}"]
    2020-07-10 07:45:19 +0000 [info]: adding filter pattern="custom.**" type="record_modifier"
    2020-07-10 07:45:19 +0000 [error]: #1 config error file="/etc/fluentd/fluentd.conf" error_class=Fluent::ConfigError error="Plugin 'mdsd' does not support multi workers configuration (Fluent::OutputMdsd)"
    2020-07-10 07:45:19 +0000 [error]: fluent/log.rb:362:error: config error file="/etc/fluentd/fluentd.conf" error_class=Fluent::ConfigError error="Plugin 'mdsd' does not support multi workers configuration (Fluent::OutputMdsd)"
    2020-07-10 07:45:19 +0000 [info]: adding match pattern="custom.**" type="copy"
    2020-07-10 07:45:19 +0000 [info]: Worker 1 finished unexpectedly with status 2
    2020-07-10 07:45:19 +0000 [info]: Received graceful stop
    2020-07-10 07:45:19 +0000 [info]: #3 adding rewrite_tag_filter rule: SOURCE [#<Fluent::PluginHelper::RecordAccessor::Accessor:0x0000000000fcd528 @keys="SOURCE">, /^(OmsTelemetryLog)$/, "", "custom.$1"]
    2020-07-10 07:45:19 +0000 [info]: #3 adding rewrite_tag_filter rule: SOURCE [#<Fluent::PluginHelper::RecordAccessor::Accessor:0x0000000000fce5e0 @keys="SOURCE">, /^(OmsTelemetryLog)$/, "!", "custom.${tag}"]
    2020-07-10 07:45:19 +0000 [error]: #0 config error file="/etc/fluentd/fluentd.conf" error_class=Fluent::ConfigError error="Plugin 'mdsd' does not support multi workers configuration (Fluent::OutputMdsd)"
    2020-07-10 07:45:19 +0000 [error]: fluent/log.rb:362:error: config error file="/etc/fluentd/fluentd.conf" error_class=Fluent::ConfigError error="Plugin 'mdsd' does not support multi workers configuration (Fluent::OutputMdsd)"
    2020-07-10 07:45:19 +0000 [info]: Worker 0 finished with status 2
    2020-07-10 07:45:19 +0000 [info]: Received graceful stop
    2020-07-10 07:45:19 +0000 [info]: Received graceful stop
    2020-07-10 07:45:19 +0000 [info]: #2 adding rewrite_tag_filter rule: SOURCE [#<Fluent::PluginHelper::RecordAccessor::Accessor:0x00000000011548b0 @keys="SOURCE">, /^(OmsTelemetryLog)$/, "", "custom.$1"]
    2020-07-10 07:45:19 +0000 [info]: #2 adding rewrite_tag_filter rule: SOURCE [#<Fluent::PluginHelper::RecordAccessor::Accessor:0x0000000001163108 @keys="SOURCE">, /^(OmsTelemetryLog)$/, "!", "custom.${tag}"]
    2020-07-10 07:45:19 +0000 [error]: #3 config error file="/etc/fluentd/fluentd.conf" error_class=Fluent::ConfigError error="Plugin 'mdsd' does not support multi workers configuration (Fluent::OutputMdsd)"
    2020-07-10 07:45:19 +0000 [error]: fluent/log.rb:362:error: config error file="/etc/fluentd/fluentd.conf" error_class=Fluent::ConfigError error="Plugin 'mdsd' does not support multi workers configuration (Fluent::OutputMdsd)"
    2020-07-10 07:45:19 +0000 [info]: Received graceful stop
    2020-07-10 07:45:19 +0000 [info]: Received graceful stop
    2020-07-10 07:45:19 +0000 [info]: Worker 3 finished with status 2
    2020-07-10 07:45:19 +0000 [info]: Received graceful stop
    2020-07-10 07:45:19 +0000 [error]: #2 config error file="/etc/fluentd/fluentd.conf" error_class=Fluent::ConfigError error="Plugin 'mdsd' does not support multi workers configuration (Fluent::OutputMdsd)"
    2020-07-10 07:45:19 +0000 [error]: fluent/log.rb:362:error: config error file="/etc/fluentd/fluentd.conf" error_class=Fluent::ConfigError error="Plugin 'mdsd' does not support multi workers configuration (Fluent::OutputMdsd)"
    2020-07-10 07:45:19 +0000 [info]: Received graceful stop
    2020-07-10 07:45:19 +0000 [info]: Received graceful stop
    2020-07-10 07:45:19 +0000 [info]: Worker 2 finished with status 2
    2020-07-10 07:45:19 +0000 [info]: Received graceful stop
    2020-07-10 07:45:19 +0000 [info]: Received graceful stop
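For context on the error above: fluentd refuses to run an output under multiple workers unless the plugin declares that it is multi-worker safe by overriding `multi_workers_ready?` to return `true`; the default is `false`, and out_mdsd apparently doesn't override it. The snippet below is a simplified, self-contained illustration of that startup check (the class names `OutputPlugin`, `MdsdLikeOutput`, and `check_workers` are hypothetical stand-ins, not the actual fluentd or out_mdsd source):

```ruby
# Simplified sketch of the check fluentd's supervisor performs at startup.
# Not the real fluentd code -- just the shape of the logic.

class ConfigError < StandardError; end

class OutputPlugin
  # Default behavior: plugins are single-worker-only unless they opt in
  # by overriding this method to return true.
  def multi_workers_ready?
    false
  end
end

class MdsdLikeOutput < OutputPlugin
  # No override here, so this plugin inherits the single-worker default.
end

# Raise a config error when workers > 1 and the plugin hasn't opted in,
# mirroring the "does not support multi workers configuration" message.
def check_workers(plugin, workers)
  if workers > 1 && !plugin.multi_workers_ready?
    raise ConfigError,
          "Plugin '#{plugin.class}' does not support multi workers configuration"
  end
  :ok
end

check_workers(MdsdLikeOutput.new, 1)   # passes
begin
  check_workers(MdsdLikeOutput.new, 4) # fails, as in the log above
rescue ConfigError => e
  puts e.message
end
```

So adding multi-worker support would at minimum mean overriding `multi_workers_ready?` in the plugin's output class, plus making sure concurrent writes to the mdsd socket are actually safe.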

Is it possible to run with multiple workers while using the mdsd plugin? Have I missed something in my configuration? If it isn't possible, why not? Could this capability be added? (I'd love to help, given some direction :) )
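In the meantime, one workaround worth trying (assuming the plugin itself stays single-worker): fluentd's `<worker N>` directive pins the enclosed sections to a single worker, so the mdsd output could run only on worker 0 while the other filters and outputs still use all four workers. A sketch, wrapping the existing `<match custom.**>` block (abbreviated here with `...` for the unchanged contents):

    <worker 0>
      <match custom.**>
        @type copy
        <store>
          @type mdsd
          ...
        </store>
        ...
      </match>
    </worker>

I haven't verified this against out_mdsd specifically, but it is the mechanism fluentd documents for plugins that don't support multi-worker mode.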

Thanks in advance
