Class: LogStash::Outputs::ClickHouse

Inherits:
Base
  • Object
show all
Includes:
PluginMixins::HttpClient, Stud::Buffer
Defined in:
lib/logstash/outputs/clickhouse.rb

Instance Method Summary collapse

Instance Method Details

#flush(events, close = false) ⇒ Object



124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
# File 'lib/logstash/outputs/clickhouse.rb', line 124

# Sends a batch of buffered events, one request per destination table.
#
# Each event is rendered as one JSON line (after passing through #mutate) and
# appended to the payload of the table returned by #generate_table_name.
#
# events - enumerable of events; each must respond to `to_hash`.
# close  - accepted as part of the flush signature; not used by this method.
#
# Returns the Hash of table name => newline-delimited JSON payload.
def flush(events, close = false)
  payloads = events.each_with_object({}) do |event, by_table|
    target = generate_table_name(event)
    by_table[target] ||= ""
    json_line = LogStash::Json.dump(mutate(event.to_hash))
    by_table[target] << json_line << "\n"
  end

  # Work on a copy so the configured host list itself is never handed out.
  candidates = @http_hosts.clone

  payloads.each do |target, body|
    query = generate_http_query(target)
    # NOTE(review): the two literal 1s are presumably initial retry/backoff
    # counters -- confirm against make_request, which is not visible here.
    make_request(body, candidates, query, 1, 1, candidates.sample)
  end
end

#generate_http_query(table_name) ⇒ Object



96
97
98
99
# File 'lib/logstash/outputs/clickhouse.rb', line 96

# Builds the URL query string for inserting rows into +table_name+.
#
# The INSERT statement is sent as the "query" parameter. @extra_params is
# merged on top of it, so an extra parameter also named "query" would
# replace the generated statement.
#
# Returns a String starting with "?" followed by form-encoded parameters.
def generate_http_query(table_name)
  insert_statement = "INSERT INTO #{table_name} FORMAT JSONEachRow"
  query_params = { "query" => insert_statement }.merge(@extra_params)
  "?" + URI.encode_www_form(query_params)
end

#generate_table_name(event) ⇒ Object



92
93
94
# File 'lib/logstash/outputs/clickhouse.rb', line 92

# Resolves the destination table for +event+ by passing the configured
# table template (@table_name) to the event's own `sprintf`.
#
# Returns whatever `event.sprintf` returns for that template.
def generate_table_name(event)
  template = @table_name
  event.sprintf(template)
end

#mutate(src) ⇒ Object



101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
# File 'lib/logstash/outputs/clickhouse.rb', line 101

# Projects an event hash onto the fields described by @mutations.
#
# @mutations maps a destination key to a rule:
#   * String                        - copy the value of that source key.
#   * [source_key, pattern, replace] - copy the value after replacing the
#     first match of +pattern+ (compiled with Regexp.new) by +replace+.
# Rules whose source key is absent from +src+ are skipped, as are rules of
# any other type.
#
# Values that do not respond to `sub` (numbers, nil, booleans, ...) are
# copied unchanged by a regex rule, so one non-string field cannot raise
# and abort the batch being flushed.
#
# src - Hash of event fields.
#
# Returns +src+ itself when no mutations are configured, otherwise a new
# Hash holding only the destination keys that could be produced.
def mutate(src)
  return src if @mutations.empty?

  res = {}
  @mutations.each_pair do |dstkey, source|
    case source
    when String
      res[dstkey] = src[source] if src.key?(source)
    when Array
      srckey, pattern, replace = source
      next unless src.key?(srckey)

      value = src[srckey]
      res[dstkey] =
        if value.respond_to?(:sub)
          value.sub(Regexp.new(pattern), replace)
        else
          value
        end
    end
  end
  res
end


50
51
52
53
54
55
56
57
58
59
60
61
62
# File 'lib/logstash/outputs/clickhouse.rb', line 50

# Logs the plugin name/version and the settings this output starts with.
#
# The gem specification is looked up by name. If no matching specification
# is found, placeholder values are logged instead of raising on a nil
# specification, so the lookup can never abort the caller.
#
# Sets @plugin_name and @plugin_version as a side effect.
def print_plugin_info
  spec = Gem::Specification.find_all { |candidate| candidate.name =~ /logstash-output-clickhouse/ }.first
  @plugin_name = spec ? spec.name : "logstash-output-clickhouse"
  @plugin_version = spec ? spec.version : "unknown"
  @logger.info("Running #{@plugin_name} version #{@plugin_version}")

  @logger.info("Initialized clickhouse with settings",
               :flush_size => @flush_size,
               :idle_flush_time => @idle_flush_time,
               :request_tokens => @pool_max,
               :http_hosts => @http_hosts,
               :headers => request_headers)
end

#receive(event) ⇒ Object

This module currently does not support parallel requests, as that would circumvent the batching.



88
89
90
# File 'lib/logstash/outputs/clickhouse.rb', line 88

# Accepts a single event and hands it to buffer_receive; nothing is sent
# from here directly. Parallel requests are not supported, as that would
# circumvent the batching.
#
# event - the event to enqueue.
def receive(event)
  buffer_receive(event)
end

#register ⇒ Object



64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
# File 'lib/logstash/outputs/clickhouse.rb', line 64

# Prepares the output: request-token pool, table template, event buffer,
# and finally the startup log lines.
def register
  # Deprecated option, kept disabled. TODO: remove the option
  #@ssl_certificate_validation = @verify_ssl if @verify_ssl

  # Outstanding requests are limited through this token queue, which
  # provides backpressure: with no token left no new request may be sent,
  # and the client must put its token back on success.
  @request_tokens = SizedQueue.new(@pool_max)
  @pool_max.times { @request_tokens.push(true) }
  @requests = []

  @table_name = table

  buffer_initialize(
    max_items: @flush_size,
    max_interval: @idle_flush_time,
    logger: @logger
  )

  print_plugin_info
end