Class: DataCollector::Input::Rpc

Inherits:
Generic
  • Object
show all
Defined in:
lib/data_collector/input/rpc.rb

Instance Method Summary collapse

Methods inherited from Generic

#on_message, #paused?, #stopped?

Constructor Details

#initialize(uri, options = {}) ⇒ Rpc

Returns a new instance of Rpc.



6
7
8
# File 'lib/data_collector/input/rpc.rb', line 6

def initialize(uri, options = {})
  super
end

Instance Method Details

#pauseObject



27
28
29
30
31
# File 'lib/data_collector/input/rpc.rb', line 27

def pause
  raise "PAUSE not implemented."
rescue StandardError => e
  DataCollector::Core.error(e.message)
end

#run(should_block = false, &block) ⇒ Object



33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
# File 'lib/data_collector/input/rpc.rb', line 33

def run(should_block = false, &block)
    @listener.subscribe(@bunny_queue) do |payload|
      payload = JSON.parse(payload)
      response = BunnyBurrow::Server.create_response
      response[:data] = handle_on_message(@input, @output, payload)

      response
    end
    @running = true

    if should_block
      while running?
        yield block if block_given?
        @listener.wait
      end
    else
      yield block if block_given?
    end
rescue StandardError => e
  DataCollector::Core.error(e.message)
end

#running?Boolean

Returns:

  • (Boolean)


10
11
12
13
14
# File 'lib/data_collector/input/rpc.rb', line 10

def running?
  @running
rescue StandardError => e
  DataCollector::Core.error(e.message)
end

#stopObject



16
17
18
19
20
21
22
23
24
25
# File 'lib/data_collector/input/rpc.rb', line 16

def stop
  if running?
    @listener.shutdown
    @running = false
  end
rescue Bunny::ConnectionAlreadyClosed => e
  DataCollector::Core.log(e.message)
rescue StandardError => e
  DataCollector::Core.error(e.message)
end