Module: Hope

Includes:
Java
Defined in:
lib/hope.rb,
lib/hope/engine.rb,
lib/hope/server.rb,
lib/hope/source.rb,
lib/hope/version.rb,
lib/hope/statement.rb,
lib/hope/event_type.rb,
lib/hope/server/app.rb,
lib/hope/source/sub.rb,
lib/hope/source/base.rb,
lib/hope/listener/base.rb,
lib/hope/source/twitter.rb,
lib/hope/server/resources/engine.rb,
lib/hope/server/resources/source.rb,
lib/hope/server/resources/statement.rb

Defined Under Namespace

Modules: Listener, Server, Source
Classes: Engine, EventType, Statement

Constant Summary collapse

VERSION =
'0.1.0'.freeze

Class Method Summary collapse

Class Method Details

.config ⇒ Object



41
42
43
# File 'lib/hope.rb', line 41

# Returns the module-level configuration hash.
#
# An empty hash is created and remembered on first access; afterwards the
# same object is handed back on every call.
def self.config
  return @config if @config

  @config = {}
end

.configure(config_file, &block) ⇒ Object



45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
# File 'lib/hope.rb', line 45

# Loads the YAML configuration at +config_file+ and boots Hope from it.
#
# In order:
# 1. Every entry under the "sources" key is instantiated as
#    <tt>type.constantize.new(name, opts)</tt>, where +type+ is the entry's
#    "type" value and +opts+ is the rest of the entry.
# 2. If "reload_config" converts to a positive integer, a periodic timer with
#    that interval re-reads the file and reloads its "engines" section
#    whenever the file's MD5 digest differs from the previously seen one.
#    Otherwise the "engines" section (if present) is loaded once, right away.
# 3. The optional block is yielded to, with no arguments.
#
# @param config_file [String] path to the YAML configuration file
# @yield invoked once after sources and engines have been set up
# @raise [StandardError] whatever building a source raised; a diagnostic
#   line is printed before the error is re-raised
def self.configure(config_file, &block)
  # An empty YAML document parses to a falsy value; fall back to an empty
  # hash so every lookup below is safe.
  @config = YAML.load_file(config_file) || {}
  # Bare constant reference kept from the original; presumably it forces the
  # Source namespace to load before source classes are resolved - TODO confirm.
  Hope::Source

  # Add Sources
  sources = @config["sources"]
  sources.each do |name, src_opts|
    begin
      # "type" is removed from the options (and therefore from @config) so the
      # constructor only receives the source-specific settings.
      src_type = src_opts.delete("type")
      src_type.constantize.new(name, src_opts)
    rescue => err
      puts "Error creating source #{src_type}: #{err}"
      raise
    end
  end unless sources.nil?

  reload_interval = @config["reload_config"].to_i
  if reload_interval > 0
    # @config_hash is not assigned in this method before the timer starts, so
    # unless it is set elsewhere the first tick sees a mismatch and performs
    # the initial engine load.
    EM::PeriodicTimer.new(reload_interval) do
      # File.read closes the file itself; the previous open(...).read left a
      # handle open on every tick until garbage collection.
      new_config_hash = Digest::MD5.hexdigest(File.read(config_file))
      load_engines(YAML.load_file(config_file)['engines']) if new_config_hash != @config_hash
      @config_hash = new_config_hash
    end
  else
    load_engines(@config['engines']) unless @config['engines'].nil?
  end

  yield if block_given?
end

.ctx ⇒ Object



33
34
35
# File 'lib/hope.rb', line 33

# Returns the shared EM::ZeroMQ::Context.
#
# The context is built with an argument of 1 the first time it is requested
# (presumably the I/O thread count - confirm against em-zeromq) and the same
# instance is reused afterwards.
def self.ctx
  return @ctx if @ctx

  @ctx = EM::ZeroMQ::Context.new(1)
end

.engines ⇒ Object



3
4
5
# File 'lib/hope/engine.rb', line 3

# Returns the hash that serves as the engine registry (register_engine stores
# engines in it under their +uri+).
#
# The hash starts out empty on first access and the same object is returned
# on every later call.
def self.engines
  return @engines if @engines

  @engines = {}
end

.load_engines(engines_config) ⇒ Object



73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
# File 'lib/hope.rb', line 73

# Creates or refreshes one engine per entry of +engines_config+, installs its
# statements and listeners, and starts it.
#
# For each +uri+ => +econf+ pair:
# * an engine already returned by Hope::Engine.get(uri) is stopped and reset;
# * otherwise a new Hope::Engine is built with the "engines_cfg" settings
#   from the loaded configuration and subscribed to every entry of
#   econf["subscriptions"];
# * every econf["statements"] entry is added with +add_epl+, and each of its
#   "listeners" is instantiated from its "class" name with its "args" and
#   attached to the statement;
# * the engine is started.
#
# The configuration is only read, never modified, so the same hash can be
# passed again later (for example to re-apply an unchanged configuration).
#
# @param engines_config [Hash, nil] engine URI => engine settings; nil is a
#   no-op
# @return [Hash, nil] +engines_config+ itself, or nil when it was nil
def self.load_engines(engines_config)
  return if engines_config.nil?

  # Tolerate being called before any configuration was loaded.
  engines_cfg = (@config || {})['engines_cfg']

  engines_config.each do |uri, econf|
    # An engine declared with no settings at all parses to nil.
    econf ||= {}

    e = Hope::Engine.get(uri)
    if e
      e.stop
      e.reset
    else
      # Create engine
      puts "Creating engine with config: #{engines_cfg}"
      e = Hope::Engine.new(uri, engines_cfg)
      # Subscribe to named sources
      (econf['subscriptions'] || []).each { |sub| e.subscribe(sub) }
    end

    # Add and configure Statements
    (econf['statements'] || {}).each do |sn, st|
      s = e.add_epl(st['epl'], sn)
      (st['listeners'] || {}).each do |listener_name, listener_opts|
        # Read, do not delete: removing these keys made a second pass over
        # the same configuration fail on a missing "class".
        listener_klass = listener_opts['class'].constantize
        listener_args = listener_opts['args']
        # A random suffix is appended to each listener name.
        s.add_listener(listener_klass.new("#{listener_name}-#{rand(1000000)}", *listener_args))
      end
    end
    e.start
  end
end

.pub ⇒ Object



37
38
39
# File 'lib/hope.rb', line 37

# Returns the memoized result of binding a ZMQ::PUB socket to 'ipc://hope'
# on the shared context; the bind happens only on the first call.
def self.pub
  return @pub if @pub

  @pub = ctx.bind(ZMQ::PUB, 'ipc://hope')
end

.register_engine(eng) ⇒ Object



7
8
9
# File 'lib/hope/engine.rb', line 7

# Records +eng+ in the engine registry under its +uri+, replacing any entry
# already stored for that URI.
#
# @param eng [#uri] the engine to register
# @return [Object] +eng+
def self.register_engine(eng)
  engines.store(eng.uri, eng)
end

.unregister_engine(eng) ⇒ Object



11
12
13
# File 'lib/hope/engine.rb', line 11

# Removes the registry entry stored under +eng+'s +uri+.
#
# @param eng [#uri] the engine to forget
# @return [Object, nil] the entry that was removed, or nil if the URI was
#   not registered
def self.unregister_engine(eng)
  key = eng.uri
  engines.delete(key)
end