Module: LogStash::Util

Defined in:
lib/logstash/util.rb,
lib/logstash/namespace.rb,
lib/logstash/util/decorators.rb,
lib/logstash/util/plugin_version.rb,
lib/logstash/util/defaults_printer.rb,
lib/logstash/util/wrapped_synchronous_queue.rb,
lib/logstash/util/worker_threads_default_printer.rb

Defined Under Namespace

Modules: Decorators, FileTools, JavaVersion, SocketPeer, UnicodeTrimmer Classes: Charset, DefaultsPrinter, Password, PluginVersion, WorkerThreadsDefaultPrinter, WrappedSynchronousQueue

Constant Summary collapse

UNAME =
case RbConfig::CONFIG["host_os"]
  when /^linux/; "linux"
  else; RbConfig::CONFIG["host_os"]
end
PR_SET_NAME =
15

Class Method Summary collapse

Class Method Details

.deep_clone(o) ⇒ Object



187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
# File 'lib/logstash/util.rb', line 187

def self.deep_clone(o)
  case o
  when Hash
    o.inject({}) {|h, (k,v)| h[k] = deep_clone(v); h }
  when Array
    o.map {|v| deep_clone(v) }
  when Fixnum, Symbol, IO, TrueClass, FalseClass, NilClass
    o
  when LogStash::Codecs::Base
    o.clone
  when String
    o.clone #need to keep internal state e.g. frozen
  else
    Marshal.load(Marshal.dump(o))
  end
end

.get_thread_id(thread) ⇒ Object



31
32
33
34
35
36
37
# File 'lib/logstash/util.rb', line 31

def self.get_thread_id(thread)
  if RUBY_ENGINE == "jruby"
    JRuby.reference(thread).native_thread.id
  else
    raise Exception.new("Native thread IDs aren't supported outside of JRuby")
  end
end

.hash_merge(dst, src) ⇒ Object

Merge hash ‘src’ into ‘dst’ nondestructively

Duplicate keys will become array values

src, dst[“foo”

]



67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
# File 'lib/logstash/util.rb', line 67

def self.hash_merge(dst, src)
  src.each do |name, svalue|
    if dst.include?(name)
      dvalue = dst[name]
      if dvalue.is_a?(Hash) && svalue.is_a?(Hash)
        dvalue = hash_merge(dvalue, svalue)
      elsif svalue.is_a?(Array)
        if dvalue.is_a?(Array)
          # merge arrays without duplicates.
          dvalue |= svalue
        else
          dvalue = [dvalue] | svalue
        end
      else
        if dvalue.is_a?(Array)
          dvalue << svalue unless dvalue.include?(svalue)
        else
          dvalue = [dvalue, svalue] unless dvalue == svalue
        end
      end

      dst[name] = dvalue
    else
      # dst doesn't have this key, just set it.
      dst[name] = svalue
    end
  end

  return dst
end

.hash_merge_many(*hashes) ⇒ Object

def self.hash_merge



135
136
137
138
139
140
141
# File 'lib/logstash/util.rb', line 135

def self.hash_merge_many(*hashes)
  dst = {}
  hashes.each do |hash|
    hash_merge_with_dups(dst, hash)
  end
  return dst
end

.hash_merge_with_dups(dst, src) ⇒ Object

Merge hash ‘src’ into ‘dst’ nondestructively

Duplicate keys will become array values Arrays merged will simply be appended.

src, dst[“foo”

]



104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
# File 'lib/logstash/util.rb', line 104

def self.hash_merge_with_dups(dst, src)
  src.each do |name, svalue|
    if dst.include?(name)
      dvalue = dst[name]
      if dvalue.is_a?(Hash) && svalue.is_a?(Hash)
        dvalue = hash_merge(dvalue, svalue)
      elsif svalue.is_a?(Array)
        if dvalue.is_a?(Array)
          # merge arrays without duplicates.
          dvalue += svalue
        else
          dvalue = [dvalue] + svalue
        end
      else
        if dvalue.is_a?(Array)
          dvalue << svalue unless dvalue.include?(svalue)
        else
          dvalue = [dvalue, svalue] unless dvalue == svalue
        end
      end

      dst[name] = dvalue
    else
      # dst doesn't have this key, just set it.
      dst[name] = svalue
    end
  end

  return dst
end

.normalize(o) ⇒ Object

identity function, pure Ruby object don’t need normalization.



157
158
159
160
161
162
163
164
165
166
# File 'lib/logstash/util.rb', line 157

def self.normalize(o)
  case o
  when Java::JavaUtil::LinkedHashMap
    o.inject({}){|r, (k, v)| r[k] = normalize(v); r}
  when Java::JavaUtil::ArrayList
    o.map{|i| normalize(i)}
  else
    o
  end
end

.set_thread_name(name) ⇒ Object



12
13
14
15
16
17
18
19
20
21
22
23
24
25
# File 'lib/logstash/util.rb', line 12

def self.set_thread_name(name)
  if RUBY_ENGINE == "jruby"
    # Keep java and ruby thread names in sync.
    Java::java.lang.Thread.currentThread.setName(name)
  end
  Thread.current[:name] = name

  if UNAME == "linux"
    require "logstash/util/prctl"
    # prctl PR_SET_NAME allows up to 16 bytes for a process name
    # since MRI 1.9, JRuby, and Rubinius use system threads for this.
    LibC.prctl(PR_SET_NAME, name[0..16], 0, 0, 0)
  end
end

.set_thread_plugin(plugin) ⇒ Object

def set_thread_name



27
28
29
# File 'lib/logstash/util.rb', line 27

def self.set_thread_plugin(plugin)
  Thread.current[:plugin] = plugin
end

.stringify_symbols(o) ⇒ Object



174
175
176
177
178
179
180
181
182
183
184
185
# File 'lib/logstash/util.rb', line 174

def self.stringify_symbols(o)
  case o
  when Hash
    o.inject({}){|r, (k, v)| r[k.is_a?(Symbol) ? k.to_s : k] = stringify_symbols(v); r}
  when Array
    o.map{|i| stringify_symbols(i)}
  when Symbol
    o.to_s
  else
    o
  end
end

.thread_info(thread) ⇒ Object



39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
# File 'lib/logstash/util.rb', line 39

def self.thread_info(thread)
  backtrace = thread.backtrace.map do |line|
    line.gsub(LogStash::Environment::LOGSTASH_HOME, "[...]")
  end

  blocked_on = case backtrace.first
               when /in `push'/ then "blocked_on_push"
               when /(?:pipeline|base).*pop/ then "waiting_for_events"
               else nil
               end

  {
    "thread_id" => get_thread_id(thread),
    "name" => thread[:name],
    "plugin" => (thread[:plugin] ? thread[:plugin].debug_info : nil),
    "backtrace" => backtrace,
    "blocked_on" => blocked_on,
    "status" => thread.status,
    "current_call" => backtrace.first
  }
end