Class: Hope::Engine
Instance Attribute Summary collapse
-
#provider ⇒ Object
readonly
Provider API.
-
#uri ⇒ Object
readonly
Returns the value of attribute uri.
Class Method Summary collapse
Instance Method Summary collapse
- #add_epl(epl, name = nil) ⇒ Object
- #add_event_type(event_type) ⇒ Object
- #add_pattern(pattern, name = nil) ⇒ Object
-
#admin ⇒ Object
Admin API.
- #destroy ⇒ Object
- #destroyed? ⇒ Boolean
-
#initialize(uri = nil, config_file = ) ⇒ Engine
constructor
A new instance of Engine.
- #on_readable(socket, messages) ⇒ Object
- #register_source(src_name) ⇒ Object
- #reset ⇒ Object
-
#runtime ⇒ Object
Runtime API.
- #sendEvent(e, type = nil) ⇒ Object
-
#serializable_hash ⇒ Object
Misc.
- #start ⇒ Object
- #statement(stmt_name) ⇒ Object
- #statement_names ⇒ Object
- #statements ⇒ Object
- #stop ⇒ Object
- #subscribe(source_name) ⇒ Object
- #to_json ⇒ Object
- #unsubscribe(source_name) ⇒ Object
Constructor Details
#initialize(uri = nil, config_file = ) ⇒ Engine
Returns a new instance of Engine.
30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 |
# File 'lib/hope/engine.rb', line 30 def initialize uri=nil, config_file=Hope.config['engines_cfg'] puts "Init engine #{uri} with config: #{config_file}" @uri = uri || "default" Hope.register_engine(self) @configuration = Configuration.new if config_file if File.exists?(config_file) @configuration.configure(config_file) else puts "I cant find this config file: #{config_file}" end end provider @sub = Hope.ctx.connect ZMQ::SUB, "ipc://hope", self if EM.reactor_running? @received = 0 @subscriptions = [] @registered_sources = {} @registered_types = {} end |
Instance Attribute Details
#provider ⇒ Object (readonly)
Provider API
92 93 94 |
# File 'lib/hope/engine.rb', line 92 def provider @provider end |
#uri ⇒ Object (readonly)
Returns the value of attribute uri.
20 21 22 |
# File 'lib/hope/engine.rb', line 20 def uri @uri end |
Class Method Details
.all ⇒ Object
26 27 28 |
# File 'lib/hope/engine.rb', line 26 def self.all EPServiceProviderManager.getProviderURIs.to_a end |
Instance Method Details
#add_epl(epl, name = nil) ⇒ Object
137 138 139 140 |
# File 'lib/hope/engine.rb', line 137 def add_epl epl, name=nil name = nil if name.blank? Hope::Statement.new admin.createEPL(epl, name) end |
#add_event_type(event_type) ⇒ Object
147 148 149 150 151 |
# File 'lib/hope/engine.rb', line 147 def add_event_type event_type return if @registered_types[event_type.name] @registered_types[event_type.name] = event_type.schema add_epl(event_type.schema) end |
#add_pattern(pattern, name = nil) ⇒ Object
142 143 144 145 |
# File 'lib/hope/engine.rb', line 142 def add_pattern pattern, name=nil name = nil if name.blank? Hope::Statement.new admin.createPattern(pattern, name) end |
#admin ⇒ Object
Admin API
120 121 122 |
# File 'lib/hope/engine.rb', line 120 def admin provider.getEPAdministrator end |
#destroy ⇒ Object
100 101 102 103 104 105 |
# File 'lib/hope/engine.rb', line 100 def destroy unless provider.destroyed? provider.destroy Hope.unregister_engine(self) end end |
#destroyed? ⇒ Boolean
107 108 109 |
# File 'lib/hope/engine.rb', line 107 def destroyed? provider.destroyed? end |
#on_readable(socket, messages) ⇒ Object
51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 |
# File 'lib/hope/engine.rb', line 51 def on_readable(socket, ) puts "Received event from #{socket}" @received += 1 src_name, msg = .map(&:copy_out_string) if src = self.register_source(src_name) evts = src.parse(msg) Array(evts).map do |e| puts ">> sendEvent: #{e.inspect}" self.sendEvent(e) end else puts "Error: SOURCE #{src_name}, not registered !" end end |
#register_source(src_name) ⇒ Object
67 68 69 70 71 72 73 74 75 76 77 |
# File 'lib/hope/engine.rb', line 67 def register_source src_name return @registered_sources[src_name] if @registered_sources[src_name] src = Hope::Source.sources[src_name] return false if src.nil? src.class.event_types.each do |event_type| # puts "Adding eventType to engine #{self.uri}: #{event_type.name}:\n #{event_type.properties.inspect}" self.add_event_type(event_type) end @registered_sources[src] = src src end |
#reset ⇒ Object
111 112 113 114 115 116 117 |
# File 'lib/hope/engine.rb', line 111 def reset # @subscriptions.each { |sub| self.unsubscribe(sub) } statements.map do |st| st.remove_all_listeners st.destroy end end |
#runtime ⇒ Object
Runtime API
162 163 164 |
# File 'lib/hope/engine.rb', line 162 def runtime provider.getEPRuntime end |
#sendEvent(e, type = nil) ⇒ Object
166 167 168 169 170 171 172 |
# File 'lib/hope/engine.rb', line 166 def sendEvent(e, type=nil) if type runtime.sendEvent(e, type) else runtime.sendEvent(e) end end |
#serializable_hash ⇒ Object
Misc
175 176 177 178 179 180 181 182 |
# File 'lib/hope/engine.rb', line 175 def serializable_hash { :id => uri, :received => @received, :statements => statements.map(&:serializable_hash), :subscriptions => @subscriptions } end |
#start ⇒ Object
157 158 159 |
# File 'lib/hope/engine.rb', line 157 def start admin.startAllStatements end |
#statement(stmt_name) ⇒ Object
132 133 134 135 |
# File 'lib/hope/engine.rb', line 132 def statement stmt_name s = admin.getStatement stmt_name Statement.new(s) unless s.nil? end |
#statement_names ⇒ Object
124 125 126 |
# File 'lib/hope/engine.rb', line 124 def statement_names admin.getStatementNames.to_a end |
#statements ⇒ Object
128 129 130 |
# File 'lib/hope/engine.rb', line 128 def statements statement_names.map { |n| Hope::Statement.new(admin.getStatement(n)) } end |
#stop ⇒ Object
153 154 155 |
# File 'lib/hope/engine.rb', line 153 def stop admin.stopAllStatements end |
#subscribe(source_name) ⇒ Object
79 80 81 82 83 84 |
# File 'lib/hope/engine.rb', line 79 def subscribe source_name return true if @subscriptions.include?(source_name) @sub.subscribe source_name @subscriptions << source_name register_source(source_name) end |
#to_json ⇒ Object
184 185 186 |
# File 'lib/hope/engine.rb', line 184 def to_json serializable_hash.to_json end |
#unsubscribe(source_name) ⇒ Object
86 87 88 89 |
# File 'lib/hope/engine.rb', line 86 def unsubscribe source_name @subscriptions - [source_name] @sub.unsubscribe source_name end |