Module: Hope
- Includes:
- Java
- Defined in:
- lib/hope.rb,
lib/hope/engine.rb,
lib/hope/server.rb,
lib/hope/source.rb,
lib/hope/version.rb,
lib/hope/statement.rb,
lib/hope/event_type.rb,
lib/hope/server/app.rb,
lib/hope/source/sub.rb,
lib/hope/source/base.rb,
lib/hope/listener/base.rb,
lib/hope/source/twitter.rb,
lib/hope/server/resources/engine.rb,
lib/hope/server/resources/source.rb,
lib/hope/server/resources/statement.rb
Defined Under Namespace
Modules: Listener, Server, Source
Classes: Engine, EventType, Statement
Constant Summary
collapse
- VERSION =
'0.1.0'.freeze
Class Method Summary
collapse
Class Method Details
41
42
43
|
# File 'lib/hope.rb', line 41
def self.config
@config ||= {}
end
|
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
|
# File 'lib/hope.rb', line 45
def self.configure config_file, &block
@config = YAML::load_file(config_file)
Hope::Source
@config["sources"].each do |sn, src_opts|
begin
src_type = src_opts.delete("type")
src_klass = src_type.constantize
source = src_type.constantize.new(sn, src_opts)
rescue => err
puts "Error creating source #{src_type}: #{err}"
raise err
end
end unless config["sources"].nil?
if @config["reload_config"].to_i > 0
EM::PeriodicTimer.new(@config["reload_config"].to_i) do
new_config_hash = Digest::MD5.hexdigest(open(config_file).read)
self.load_engines(YAML::load_file(config_file)['engines']) if new_config_hash != @config_hash
@config_hash = new_config_hash
end
else
self.load_engines @config['engines'] unless @config['engines'].nil?
end
yield if block_given?
end
|
33
34
35
|
# File 'lib/hope.rb', line 33
def self.ctx
@ctx ||= EM::ZeroMQ::Context.new(1)
end
|
3
4
5
|
# File 'lib/hope/engine.rb', line 3
def self.engines
@engines ||= {}
end
|
.load_engines(engines_config) ⇒ Object
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
|
# File 'lib/hope.rb', line 73
def self.load_engines engines_config
engines_config.each do |uri, econf|
if e = Hope::Engine.get(uri)
e.stop
e.reset
else
puts "Creating engine with config: #{@config['engines_cfg']}"
e = Hope::Engine.new(uri, @config['engines_cfg'])
econf['subscriptions'].each do |sub|
e.subscribe(sub)
end if econf['subscriptions']
end
econf['statements'].each do |sn, st|
s = e.add_epl st['epl'], sn
st['listeners'].each do |listener_name, listener_opts|
listener_klass = listener_opts.delete('class').constantize
listener_args = listener_opts.delete('args')
s.add_listener listener_klass.new("#{listener_name}-#{rand(1000000)}", *listener_args)
end if st['listeners']
end if econf['statements']
e.start
end unless engines_config.nil?
end
|
37
38
39
|
# File 'lib/hope.rb', line 37
def self.pub
@pub ||= ctx.bind ZMQ::PUB, 'ipc://hope'
end
|
.register_engine(eng) ⇒ Object
7
8
9
|
# File 'lib/hope/engine.rb', line 7
def self.register_engine eng
self.engines[eng.uri] = eng
end
|
.unregister_engine(eng) ⇒ Object
11
12
13
|
# File 'lib/hope/engine.rb', line 11
def self.unregister_engine eng
self.engines.delete eng.uri
end
|