Class: LogStash::Outputs::ClickHouse
- Inherits:
-
Base
- Object
- Base
- LogStash::Outputs::ClickHouse
- Includes:
- PluginMixins::HttpClient, Stud::Buffer
- Defined in:
- lib/logstash/outputs/clickhouse.rb
Instance Method Summary collapse
- #flush(events, close = false) ⇒ Object
- #generate_http_query(table_name) ⇒ Object
- #generate_table_name(event) ⇒ Object
- #mutate(src) ⇒ Object
- #print_plugin_info ⇒ Object
-
#receive(event) ⇒ Object
This module currently does not support parallel requests as that would circumvent the batching.
- #register ⇒ Object
Instance Method Details
#flush(events, close = false) ⇒ Object
124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 |
# File 'lib/logstash/outputs/clickhouse.rb', line 124 def flush(events, close = false) documents = {} events.each do |event| table_name = generate_table_name(event) documents[table_name] ||= "" documents[table_name] << LogStash::Json.dump(mutate(event.to_hash())) << "\n" end hosts = @http_hosts.clone documents.each do |table_name, document| make_request(document, hosts, generate_http_query(table_name), 1, 1, hosts.sample) end end |
#generate_http_query(table_name) ⇒ Object
96 97 98 99 |
# File 'lib/logstash/outputs/clickhouse.rb', line 96 def generate_http_query(table_name) params = { "query" => "INSERT INTO #{table_name} FORMAT JSONEachRow" }.merge(@extra_params) return "?#{URI.encode_www_form(params)}" end |
#generate_table_name(event) ⇒ Object
92 93 94 |
# File 'lib/logstash/outputs/clickhouse.rb', line 92 def generate_table_name(event) event.sprintf(@table_name) end |
#mutate(src) ⇒ Object
101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 |
# File 'lib/logstash/outputs/clickhouse.rb', line 101 def mutate(src) return src if @mutations.empty? res = {} @mutations.each_pair do |dstkey, source| case source when String scrkey = source next unless src.key?(scrkey) res[dstkey] = src[scrkey] when Array scrkey = source[0] next unless src.key?(scrkey) pattern = source[1] replace = source[2] res[dstkey] = src[scrkey].sub(Regexp.new(pattern), replace) end end res end |
#print_plugin_info ⇒ Object
50 51 52 53 54 55 56 57 58 59 60 61 62 |
# File 'lib/logstash/outputs/clickhouse.rb', line 50 def print_plugin_info() @@plugins = Gem::Specification.find_all { |spec| spec.name =~ /logstash-output-clickhouse/ } @plugin_name = @@plugins[0].name @plugin_version = @@plugins[0].version @logger.info("Running #{@plugin_name} version #{@plugin_version}") @logger.info("Initialized clickhouse with settings", :flush_size => @flush_size, :idle_flush_time => @idle_flush_time, :request_tokens => @pool_max, :http_hosts => @http_hosts, :headers => request_headers) end |
#receive(event) ⇒ Object
This module currently does not support parallel requests as that would circumvent the batching
88 89 90 |
# File 'lib/logstash/outputs/clickhouse.rb', line 88 def receive(event) buffer_receive(event) end |
#register ⇒ Object
64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 |
# File 'lib/logstash/outputs/clickhouse.rb', line 64 def register # Handle this deprecated option. TODO: remove the option #@ssl_certificate_validation = @verify_ssl if @verify_ssl # We count outstanding requests with this queue # This queue tracks the requests to create backpressure # When this queue is empty no new requests may be sent, # tokens must be added back by the client on success @request_tokens = SizedQueue.new(@pool_max) @pool_max.times { |t| @request_tokens << true } @requests = Array.new @table_name = table buffer_initialize( :max_items => @flush_size, :max_interval => @idle_flush_time, :logger => @logger, ) print_plugin_info() end |