Class: Norikra::Engine
- Inherits:
-
Object
- Object
- Norikra::Engine
- Defined in:
- lib/norikra/engine.rb
Instance Attribute Summary collapse
-
#output_pool ⇒ Object
readonly
Returns the value of attribute output_pool.
-
#queries ⇒ Object
readonly
Returns the value of attribute queries.
-
#targets ⇒ Object
readonly
Returns the value of attribute targets.
-
#typedef_manager ⇒ Object
readonly
Returns the value of attribute typedef_manager.
Instance Method Summary collapse
- #camelize(sym) ⇒ Object
- #close(target_name) ⇒ Object
- #configuration(opts) ⇒ Object
- #deregister(query_name) ⇒ Object
-
#initialize(output_pool, typedef_manager, opts = {}) ⇒ Engine
constructor
A new instance of Engine.
- #load(plugin_klass) ⇒ Object
- #memory_statistics ⇒ Object
- #modify(target_name, auto_field) ⇒ Object
- #open(target_name, fields = nil, auto_field = true) ⇒ Object
- #register(query) ⇒ Object
- #reserve(target_name, field, type) ⇒ Object
- #send(target_name, events) ⇒ Object
- #start ⇒ Object
- #statistics ⇒ Object
- #stop ⇒ Object
- #uptime ⇒ Object
Constructor Details
#initialize(output_pool, typedef_manager, opts = {}) ⇒ Engine
Returns a new instance of Engine.
21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 |
# File 'lib/norikra/engine.rb', line 21 def initialize(output_pool, typedef_manager, opts={}) @statistics = { started: Time.now, events: { input: 0, processed: 0, output: 0, }, } @output_pool = output_pool @typedef_manager = typedef_manager conf = configuration(opts) @service = com.espertech.esper.client.EPServiceProviderManager.getDefaultProvider(conf) @config = @service.getEPAdministrator.getConfiguration @mutex = Mutex.new # fieldsets already registered into @runtime @registered_fieldsets = {} # {target => {fieldset_summary => Fieldset} @targets = [] @queries = [] @waiting_queries = [] end |
Instance Attribute Details
#output_pool ⇒ Object (readonly)
Returns the value of attribute output_pool.
19 20 21 |
# File 'lib/norikra/engine.rb', line 19 def output_pool @output_pool end |
#queries ⇒ Object (readonly)
Returns the value of attribute queries.
19 20 21 |
# File 'lib/norikra/engine.rb', line 19 def queries @queries end |
#targets ⇒ Object (readonly)
Returns the value of attribute targets.
19 20 21 |
# File 'lib/norikra/engine.rb', line 19 def targets @targets end |
#typedef_manager ⇒ Object (readonly)
Returns the value of attribute typedef_manager.
19 20 21 |
# File 'lib/norikra/engine.rb', line 19 def typedef_manager @typedef_manager end |
Instance Method Details
#camelize(sym) ⇒ Object
92 93 94 |
# File 'lib/norikra/engine.rb', line 92 def camelize(sym) sym.to_s.split(/_/).map(&:capitalize).join end |
#close(target_name) ⇒ Object
144 145 146 147 148 149 150 151 152 153 |
# File 'lib/norikra/engine.rb', line 144 def close(target_name) info "closing target", :target => target_name targets = @targets.select{|t| t.name == target_name} return false if targets.size != 1 target = targets.first @queries.select{|q| q.targets.include?(target.name)}.each do |query| deregister_query(query) end close_target(target) end |
#configuration(opts) ⇒ Object
96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 |
# File 'lib/norikra/engine.rb', line 96 def configuration(opts) conf = com.espertech.esper.client.Configuration.new defaults = conf.getEngineDefaults if opts[:thread] t = opts[:thread] threading = defaults.getThreading [:inbound, :outbound, :route_exec, :timer_exec].each do |sym| next unless t[sym] && t[sym][:threads] && t[sym][:threads] > 0 threads = t[sym][:threads].to_i capacity = t[sym][:capacity].to_i info "Engine #{sym} thread pool enabling", :threads => threads, :capacity => (capacity == 0 ? 'default' : capacity) cam = camelize(sym) threading.send("setThreadPool#{cam}".to_sym, true) threading.send("setThreadPool#{cam}NumThreads".to_sym, threads) if t[sym][:capacity] && t[sym][:capacity] > 0 threading.send("setThreadPool#{cam}Capacity".to_sym, capacity) end end end conf end |
#deregister(query_name) ⇒ Object
180 181 182 183 184 185 186 |
# File 'lib/norikra/engine.rb', line 180 def deregister(query_name) info "de-registering query", :name => query_name queries = @queries.select{|q| q.name == query_name } return nil unless queries.size == 1 # just ignore for 'not found' deregister_query(queries.first) end |
#load(plugin_klass) ⇒ Object
240 241 242 |
# File 'lib/norikra/engine.rb', line 240 def load(plugin_klass) load_udf(plugin_klass) end |
#memory_statistics ⇒ Object
68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 |
# File 'lib/norikra/engine.rb', line 68 def memory_statistics mb = 1024 * 1024 memoryBean = Java::JavaLangManagement::ManagementFactory.getMemoryMXBean() usage = memoryBean.getHeapMemoryUsage() total = usage.getMax() / mb committed = usage.getCommitted() / mb committed_percent = (committed.to_f / total * 1000).floor / 10.0 used = usage.getUsed() / mb used_percent = (used.to_f / total * 1000).floor / 10.0 heap = { max: total, committed: committed, committed_percent: committed_percent, used: used, used_percent: used_percent } usage = memoryBean.getNonHeapMemoryUsage() total = usage.getMax() / mb committed = usage.getCommitted() / mb committed_percent = (committed.to_f / total * 1000).floor / 10.0 used = usage.getUsed() / mb used_percent = (used.to_f / total * 1000).floor / 10.0 non_heap = { max: total, committed: committed, committed_percent: committed_percent, used: used, used_percent: used_percent } { heap: heap, nonheap: non_heap } end |
#modify(target_name, auto_field) ⇒ Object
155 156 157 158 159 160 161 162 163 |
# File 'lib/norikra/engine.rb', line 155 def modify(target_name, auto_field) info "modify target", :target => target_name, :auto_field => auto_field targets = @targets.select{|t| t.name == target_name} if targets.size != 1 raise Norikra::ArgumentError, "target name '#{target_name}' not found" end target = targets.first target.auto_field = auto_field end |
#open(target_name, fields = nil, auto_field = true) ⇒ Object
134 135 136 137 138 139 140 141 142 |
# File 'lib/norikra/engine.rb', line 134 def open(target_name, fields=nil, auto_field=true) # fields nil || [] => lazy # fields {'fieldname' => 'type'} : type 'string', 'boolean', 'int', 'long', 'float', 'double' info "opening target", :target => target_name, :fields => fields, :auto_field => auto_field raise Norikra::ArgumentError, "invalid target name" unless Norikra::Target.valid?(target_name) target = Norikra::Target.new(target_name, fields, auto_field) return false if @targets.include?(target) open_target(target) end |
#register(query) ⇒ Object
169 170 171 172 173 174 175 176 177 178 |
# File 'lib/norikra/engine.rb', line 169 def register(query) info "registering query", :name => query.name, :targets => query.targets, :expression => query.expression raise Norikra::ClientError, "query name '#{query.name}' already exists" if @queries.select{|q| q.name == query.name }.size > 0 raise Norikra::ClientError, "query '#{query.name}' is invalid query for Norikra" if query.invalid? query.targets.each do |target_name| open(target_name) unless @targets.any?{|t| t.name == target_name} end register_query(query) end |
#reserve(target_name, field, type) ⇒ Object
165 166 167 |
# File 'lib/norikra/engine.rb', line 165 def reserve(target_name, field, type) @typedef_manager.reserve(target_name, field, type) end |
#send(target_name, events) ⇒ Object
188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 |
# File 'lib/norikra/engine.rb', line 188 def send(target_name, events) trace "send messages", :target => target_name, :events => events @statistics[:events][:input] += events.size unless @targets.any?{|t| t.name == target_name} # discard events for target not registered trace "messages skipped for non-opened target", :target => target_name return end return if events.size < 1 target = @targets.select{|t| t.name == target_name}.first if @typedef_manager.lazy?(target.name) info "opening lazy target", :target => target debug "generating base fieldset from event", :target => target.name, :event => events.first base_fieldset = @typedef_manager.generate_base_fieldset(target.name, events.first) debug "registering base fieldset", :target => target.name, :base => base_fieldset register_base_fieldset(target.name, base_fieldset) info "target successfully opened with fieldset", :target => target, :base => base_fieldset end registered_data_fieldset = @registered_fieldsets[target_name][:data] strict_refer = (not target.auto_field?) events.each do |event| fieldset = @typedef_manager.refer(target_name, event, strict_refer) unless registered_data_fieldset[fieldset.summary] # register waiting queries including this fieldset, and this fieldset itself debug "registering unknown fieldset", :target => target_name, :fieldset => fieldset register_fieldset(target_name, fieldset) debug "successfully registered" # fieldset should be refined, when waiting_queries rewrite inheritance structure and data fieldset be renewed. fieldset = @typedef_manager.refer(target_name, event, strict_refer) end trace "calling sendEvent with bound fieldset (w/ valid event_type_name)", :target => target_name, :event => event trace "This is assert for valid event_type_name", :event_type_name => fieldset.event_type_name formed = fieldset.format(event) trace "sendEvent", :data => formed @runtime.sendEvent(formed.to_java, fieldset.event_type_name) end target.update! @statistics[:events][:processed] += events.size nil end |
#start ⇒ Object
122 123 124 125 126 |
# File 'lib/norikra/engine.rb', line 122 def start debug "norikra engine starting: creating esper runtime" @runtime = @service.getEPRuntime debug "norikra engine started" end |
#statistics ⇒ Object
45 46 47 48 49 50 51 52 53 54 55 56 57 |
# File 'lib/norikra/engine.rb', line 45 def statistics s = @statistics { started: s[:started].rfc2822, uptime: self.uptime, memory: self.memory_statistics, input_events: s[:events][:input], processed_events: s[:events][:processed], output_events: s[:events][:output], queries: @queries.size, targets: @targets.size, } end |
#stop ⇒ Object
128 129 130 131 132 |
# File 'lib/norikra/engine.rb', line 128 def stop debug "stopping norikra engine: stop all statements on esper" @service.getEPAdministrator.stopAllStatements debug "norikra engine stopped" end |
#uptime ⇒ Object
59 60 61 62 63 64 65 66 |
# File 'lib/norikra/engine.rb', line 59 def uptime # up 239 days, 20:40 seconds = (Time.now - @statistics[:started]).to_i days = seconds / (24*60*60) hours = (seconds - days * (24*60*60)) / (60*60) minutes = (seconds - days * (24*60*60) - hours * (60*60)) / 60 "#{days} days, #{sprintf("%02d", hours)}:#{sprintf("%02d", minutes)}" end |