Class: EventHub::Processor2

Inherits:
Object
  • Object
show all
Includes:
Helper
Defined in:
lib/eventhub/processor2.rb

Overview

Processor2 class

Constant Summary collapse

SIGNALS_FOR_TERMINATION =
[:INT, :TERM, :QUIT]
SIGNALS_FOR_RELOAD_CONFIG =
[:HUP]
ALL_SIGNALS =
SIGNALS_FOR_TERMINATION + SIGNALS_FOR_RELOAD_CONFIG

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from Helper

#bunny_connection_properties, #get_name_from_class, #now_stamp

Constructor Details

#initialize(args = {}) ⇒ Processor2

Returns a new instance of Processor2.



13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
# File 'lib/eventhub/processor2.rb', line 13

def initialize(args = {})
  # Set processor name
  EventHub::Configuration.name = args[:name] ||
                                 get_name_from_class(self)

  # Parse comand line options
  EventHub::Configuration.parse_options

  # Load configuration file
  EventHub::Configuration.load!(args)

  @command_queue = []

  @started_at = Time.now
  @statistics = EventHub::Statistics.new
end

Instance Attribute Details

#started_atObject (readonly)

Returns the value of attribute started_at.



11
12
13
# File 'lib/eventhub/processor2.rb', line 11

def started_at
  @started_at
end

#statisticsObject (readonly)

Returns the value of attribute statistics.



11
12
13
# File 'lib/eventhub/processor2.rb', line 11

def statistics
  @statistics
end

Instance Method Details

#after_stopObject



67
68
69
# File 'lib/eventhub/processor2.rb', line 67

def after_stop
  # can be implemented in derived class
end

#before_startObject



63
64
65
# File 'lib/eventhub/processor2.rb', line 63

def before_start
  # can be implemented in derived class
end

#handle_message(_message, _args = {}) ⇒ Object

get message as EventHub::Message class instance args contain :queue_name, :content_type, :priority, :delivery_tag



53
54
55
# File 'lib/eventhub/processor2.rb', line 53

def handle_message(_message, _args = {})
  raise 'need to be implemented in derived class'
end

#publish(args = {}) ⇒ Object

pass message: ‘{ “header”: … , “body”: { .. }}’ and optionally exchange_name: ‘your exchange name’



59
60
61
# File 'lib/eventhub/processor2.rb', line 59

def publish(args = {})
  Celluloid::Actor[:actor_listener].publish(args)
end

#startObject



30
31
32
33
34
35
36
37
38
39
40
# File 'lib/eventhub/processor2.rb', line 30

def start
  EventHub.logger.info("#{Configuration.name} (#{version}): has been started")

  before_start
  main_event_loop
  after_stop

  EventHub.logger.info("#{Configuration.name} (#{version}): has been stopped")
rescue => ex
  EventHub.logger.error("Unexpected error in Processor2.start: #{ex}")
end

#stopObject



42
43
44
45
# File 'lib/eventhub/processor2.rb', line 42

def stop
  # used by rspec
  @command_queue << :TERM
end

#versionObject



47
48
49
# File 'lib/eventhub/processor2.rb', line 47

def version
  EventHub::VERSION
end