Class: Norikra::Engine

Inherits:
Object
  • Object
show all
Defined in:
lib/norikra/engine.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(output_pool, typedef_manager, opts = {}) ⇒ Engine

Returns a new instance of Engine.



21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
# File 'lib/norikra/engine.rb', line 21

def initialize(output_pool, typedef_manager, opts={})
  @statistics = {
    started: Time.now,
    events: { input: 0, processed: 0, output: 0, },
  }

  @output_pool = output_pool
  @typedef_manager = typedef_manager

  conf = configuration(opts)
  @service = com.espertech.esper.client.EPServiceProviderManager.getDefaultProvider(conf)
  @config = @service.getEPAdministrator.getConfiguration

  @mutex = Mutex.new

  # fieldsets already registered into @runtime
  @registered_fieldsets = {} # {target => {fieldset_summary => Fieldset}

  @targets = []
  @queries = []

  @waiting_queries = []
end

Instance Attribute Details

#output_poolObject (readonly)

Returns the value of attribute output_pool.



19
20
21
# File 'lib/norikra/engine.rb', line 19

def output_pool
  @output_pool
end

#queriesObject (readonly)

Returns the value of attribute queries.



19
20
21
# File 'lib/norikra/engine.rb', line 19

def queries
  @queries
end

#targetsObject (readonly)

Returns the value of attribute targets.



19
20
21
# File 'lib/norikra/engine.rb', line 19

def targets
  @targets
end

#typedef_managerObject (readonly)

Returns the value of attribute typedef_manager.



19
20
21
# File 'lib/norikra/engine.rb', line 19

def typedef_manager
  @typedef_manager
end

Instance Method Details

#camelize(sym) ⇒ Object



92
93
94
# File 'lib/norikra/engine.rb', line 92

def camelize(sym)
  sym.to_s.split(/_/).map(&:capitalize).join
end

#close(target_name) ⇒ Object



144
145
146
147
148
149
150
151
152
153
# File 'lib/norikra/engine.rb', line 144

def close(target_name)
  info "closing target", :target => target_name
  targets = @targets.select{|t| t.name == target_name}
  return false if targets.size != 1
  target = targets.first
  @queries.select{|q| q.targets.include?(target.name)}.each do |query|
    deregister_query(query)
  end
  close_target(target)
end

#configuration(opts) ⇒ Object



96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
# File 'lib/norikra/engine.rb', line 96

def configuration(opts)
  conf = com.espertech.esper.client.Configuration.new
  defaults = conf.getEngineDefaults

  if opts[:thread]
    t = opts[:thread]
    threading = defaults.getThreading
    [:inbound, :outbound, :route_exec, :timer_exec].each do |sym|
      next unless t[sym] && t[sym][:threads] && t[sym][:threads] > 0

      threads = t[sym][:threads].to_i
      capacity = t[sym][:capacity].to_i
      info "Engine #{sym} thread pool enabling", :threads => threads, :capacity => (capacity == 0 ? 'default' : capacity)

      cam = camelize(sym)
      threading.send("setThreadPool#{cam}".to_sym, true)
      threading.send("setThreadPool#{cam}NumThreads".to_sym, threads)
      if t[sym][:capacity] && t[sym][:capacity] > 0
        threading.send("setThreadPool#{cam}Capacity".to_sym, capacity)
      end
    end
  end

  conf
end

#deregister(query_name) ⇒ Object



180
181
182
183
184
185
186
# File 'lib/norikra/engine.rb', line 180

def deregister(query_name)
  info "de-registering query", :name => query_name
  queries = @queries.select{|q| q.name == query_name }
  return nil unless queries.size == 1 # just ignore for 'not found'

  deregister_query(queries.first)
end

#load(plugin_klass) ⇒ Object



240
241
242
# File 'lib/norikra/engine.rb', line 240

def load(plugin_klass)
  load_udf(plugin_klass)
end

#memory_statisticsObject



68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
# File 'lib/norikra/engine.rb', line 68

def memory_statistics
  mb = 1024 * 1024

  memoryBean = Java::JavaLangManagement::ManagementFactory.getMemoryMXBean()

  usage = memoryBean.getHeapMemoryUsage()
  total = usage.getMax() / mb
  committed = usage.getCommitted() / mb
  committed_percent = (committed.to_f / total * 1000).floor / 10.0
  used = usage.getUsed() / mb
  used_percent = (used.to_f / total * 1000).floor / 10.0
  heap = { max: total, committed: committed, committed_percent: committed_percent, used: used, used_percent: used_percent }

  usage = memoryBean.getNonHeapMemoryUsage()
  total = usage.getMax() / mb
  committed = usage.getCommitted() / mb
  committed_percent = (committed.to_f / total * 1000).floor / 10.0
  used = usage.getUsed() / mb
  used_percent = (used.to_f / total * 1000).floor / 10.0
  non_heap = { max: total, committed: committed, committed_percent: committed_percent, used: used, used_percent: used_percent }

  { heap: heap, nonheap: non_heap }
end

#modify(target_name, auto_field) ⇒ Object



155
156
157
158
159
160
161
162
163
# File 'lib/norikra/engine.rb', line 155

def modify(target_name, auto_field)
  info "modify target", :target => target_name, :auto_field => auto_field
  targets = @targets.select{|t| t.name == target_name}
  if targets.size != 1
    raise Norikra::ArgumentError, "target name '#{target_name}' not found"
  end
  target = targets.first
  target.auto_field = auto_field
end

#open(target_name, fields = nil, auto_field = true) ⇒ Object



134
135
136
137
138
139
140
141
142
# File 'lib/norikra/engine.rb', line 134

def open(target_name, fields=nil, auto_field=true)
  # fields nil || [] => lazy
  # fields {'fieldname' => 'type'} : type 'string', 'boolean', 'int', 'long', 'float', 'double'
  info "opening target", :target => target_name, :fields => fields, :auto_field => auto_field
  raise Norikra::ArgumentError, "invalid target name" unless Norikra::Target.valid?(target_name)
  target = Norikra::Target.new(target_name, fields, auto_field)
  return false if @targets.include?(target)
  open_target(target)
end

#register(query) ⇒ Object



169
170
171
172
173
174
175
176
177
178
# File 'lib/norikra/engine.rb', line 169

def register(query)
  info "registering query", :name => query.name, :targets => query.targets, :expression => query.expression
  raise Norikra::ClientError, "query name '#{query.name}' already exists" if @queries.select{|q| q.name == query.name }.size > 0
  raise Norikra::ClientError, "query '#{query.name}' is invalid query for Norikra" if query.invalid?

  query.targets.each do |target_name|
    open(target_name) unless @targets.any?{|t| t.name == target_name}
  end
  register_query(query)
end

#reserve(target_name, field, type) ⇒ Object



165
166
167
# File 'lib/norikra/engine.rb', line 165

def reserve(target_name, field, type)
  @typedef_manager.reserve(target_name, field, type)
end

#send(target_name, events) ⇒ Object



188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
# File 'lib/norikra/engine.rb', line 188

def send(target_name, events)
  trace "send messages", :target => target_name, :events => events

  @statistics[:events][:input] += events.size

  unless @targets.any?{|t| t.name == target_name} # discard events for target not registered
    trace "messages skipped for non-opened target", :target => target_name
    return
  end
  return if events.size < 1

  target = @targets.select{|t| t.name == target_name}.first

  if @typedef_manager.lazy?(target.name)
    info "opening lazy target", :target => target
    debug "generating base fieldset from event", :target => target.name, :event => events.first
    base_fieldset = @typedef_manager.generate_base_fieldset(target.name, events.first)

    debug "registering base fieldset", :target => target.name, :base => base_fieldset
    register_base_fieldset(target.name, base_fieldset)

    info "target successfully opened with fieldset", :target => target, :base => base_fieldset
  end

  registered_data_fieldset = @registered_fieldsets[target_name][:data]

  strict_refer = (not target.auto_field?)

  events.each do |event|
    fieldset = @typedef_manager.refer(target_name, event, strict_refer)

    unless registered_data_fieldset[fieldset.summary]
      # register waiting queries including this fieldset, and this fieldset itself
      debug "registering unknown fieldset", :target => target_name, :fieldset => fieldset
      register_fieldset(target_name, fieldset)
      debug "successfully registered"

      # fieldset should be refined, when waiting_queries rewrite inheritance structure and data fieldset be renewed.
      fieldset = @typedef_manager.refer(target_name, event, strict_refer)
    end

    trace "calling sendEvent with bound fieldset (w/ valid event_type_name)", :target => target_name, :event => event
    trace "This is assert for valid event_type_name", :event_type_name => fieldset.event_type_name
    formed = fieldset.format(event)
    trace "sendEvent", :data => formed
    @runtime.sendEvent(formed.to_java, fieldset.event_type_name)
  end
  target.update!
  @statistics[:events][:processed] += events.size
  nil
end

#startObject



122
123
124
125
126
# File 'lib/norikra/engine.rb', line 122

def start
  debug "norikra engine starting: creating esper runtime"
  @runtime = @service.getEPRuntime
  debug "norikra engine started"
end

#statisticsObject



45
46
47
48
49
50
51
52
53
54
55
56
57
# File 'lib/norikra/engine.rb', line 45

def statistics
  s = @statistics
  {
    started: s[:started].rfc2822,
    uptime: self.uptime,
    memory: self.memory_statistics,
    input_events: s[:events][:input],
    processed_events: s[:events][:processed],
    output_events: s[:events][:output],
    queries: @queries.size,
    targets: @targets.size,
  }
end

#stopObject



128
129
130
131
132
# File 'lib/norikra/engine.rb', line 128

def stop
  debug "stopping norikra engine: stop all statements on esper"
  @service.getEPAdministrator.stopAllStatements
  debug "norikra engine stopped"
end

#uptimeObject



59
60
61
62
63
64
65
66
# File 'lib/norikra/engine.rb', line 59

def uptime
  # up 239 days, 20:40
  seconds = (Time.now - @statistics[:started]).to_i
  days = seconds / (24*60*60)
  hours = (seconds - days * (24*60*60)) / (60*60)
  minutes = (seconds - days * (24*60*60) - hours * (60*60)) / 60
  "#{days} days, #{sprintf("%02d", hours)}:#{sprintf("%02d", minutes)}"
end