Class: Emque::Consuming::Runner

Inherits:
Object
  • Object
show all
Includes:
Helpers
Defined in:
lib/emque/consuming/runner.rb

Class Attribute Summary collapse

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(options = {}) ⇒ Runner

Returns a new instance of Runner.



20
21
22
23
24
25
26
27
28
29
30
# File 'lib/emque/consuming/runner.rb', line 20

# Builds a runner and publishes it as the class-level instance.
#
# The steps run in a fixed order: create the control and status objects,
# store and apply the options, initialize the application logger, expose
# this object through the class-level +instance+ writer, then resolve the
# pidfile path and wrap it in a Pidfile object.
#
# @param options [Hash] runner options; +:pidfile+ overrides the default
#   pidfile path. Other keys are presumably consumed by +apply_options+
#   (not visible here — confirm).
def initialize(options = {})
  self.control = Emque::Consuming::Control.new
  self.options = options
  self.receivers = []
  self.status = Emque::Consuming::Status.new
  apply_options
  Emque::Consuming.application.initialize_logger
  # Make this runner reachable through the class-level accessor.
  self.class.instance = self
  # default_pidfile is evaluated even when :pidfile is supplied, because it
  # is passed as a positional default rather than as a block.
  self.pidfile = options.fetch(:pidfile, default_pidfile)
  self.pid = Emque::Consuming::Pidfile.new(pidfile)
end

Class Attribute Details

.instance ⇒ Object

Returns the value of attribute instance.



15
16
17
# File 'lib/emque/consuming/runner.rb', line 15

# Class-level reader.
#
# @return [Object, nil] whatever is held in @instance; #initialize assigns
#   the runner under construction through the matching writer, so this is
#   presumably the most recently created runner (nil before any exists).
def instance
  @instance
end

Instance Attribute Details

#control ⇒ Object

Returns the value of attribute control.



18
19
20
# File 'lib/emque/consuming/runner.rb', line 18

# Reader for the runner's control object.
#
# @return [Object] the value held in @control; #initialize assigns an
#   Emque::Consuming::Control through the matching writer.
def control
  @control
end

#pidfile ⇒ Object

Returns the value of attribute pidfile.



18
19
20
# File 'lib/emque/consuming/runner.rb', line 18

# Reader for the pidfile path.
#
# @return [Object] the value held in @pidfile; #initialize assigns
#   options[:pidfile], falling back to default_pidfile when the key is absent.
def pidfile
  @pidfile
end

#status ⇒ Object

Returns the value of attribute status.



18
19
20
# File 'lib/emque/consuming/runner.rb', line 18

# Reader for the runner's status object.
#
# @return [Object] the value held in @status; #initialize assigns an
#   Emque::Consuming::Status through the matching writer.
def status
  @status
end

Instance Method Details

#app ⇒ Object



32
33
34
# File 'lib/emque/consuming/runner.rb', line 32

# Delegates to the next +app+ implementation in the ancestor chain.
#
# NOTE(review): the class includes Helpers, so +super+ presumably resolves
# to Helpers#app; redefining it here looks intended to fix the method's
# visibility on Runner — confirm against the Helpers module.
def app
  super
end

#console ⇒ Object



36
37
38
39
# File 'lib/emque/consuming/runner.rb', line 36

# Opens an interactive Pry session.
#
# The require sits inside the method, so pry is loaded at call time rather
# than being needed whenever this file is loaded.
#
# @raise [LoadError] if the pry library cannot be required
def console
  require "pry"
  Pry.start
end

#http? ⇒ Boolean

Returns:

  • (Boolean)


41
42
43
# File 'lib/emque/consuming/runner.rb', line 41

# Reports whether the configured +status+ setting equals +:on+.
#
# @return [Boolean] true only when +config.status+ is +:on+
def http?
  status_setting = config.status
  status_setting == :on
end

#phased_restart ⇒ Object



45
46
47
# File 'lib/emque/consuming/runner.rb', line 45

# Restarts the receivers one at a time, in order.
#
# Each receiver is stopped first and is started again only when its +stop+
# call returned a truthy value.
#
# @return [Object] the receivers collection, as returned by +each+
def phased_restart
  receivers.each do |receiver|
    receiver.start if receiver.stop
  end
end

#restart ⇒ Object



49
50
51
# File 'lib/emque/consuming/runner.rb', line 49

# Stops the runner and, when +stop+ reports a truthy result, starts it again.
#
# @return [Object] the falsy value returned by +stop+ when it short-circuits,
#   otherwise whatever +start+ returns
def restart
  stop_result = stop
  return stop_result unless stop_result

  start
end

#restart_application ⇒ Object



53
54
55
# File 'lib/emque/consuming/runner.rb', line 53

# Asks the first registered receiver to restart.
#
# Does nothing when no receivers are registered (for example before #start
# has populated them), instead of raising NoMethodError on nil.
#
# @return [Object, nil] the result of the receiver's +restart+, or nil when
#   there is no receiver
def restart_application
  receiver = receivers.first
  receiver.restart if receiver
end

#sock? ⇒ Boolean

Returns:

  • (Boolean)


57
58
59
# File 'lib/emque/consuming/runner.rb', line 57

# Always answers true.
#
# NOTE(review): presumably signals that the control socket is always
# enabled for the runner — confirm against callers.
#
# @return [Boolean] true
def sock?
  true
end

#start ⇒ Object



61
62
63
64
65
66
67
68
69
70
71
72
# File 'lib/emque/consuming/runner.rb', line 61

# Boots the runner and blocks until the keep-alive thread ends.
#
# Sequence: bail out if another instance is running, optionally daemonize,
# write the pidfile, spawn the keep-alive thread, set the process title,
# build and start the receivers, then join the keep-alive thread. An
# Interrupt raised anywhere in that sequence is turned into a call to #stop.
#
# @return [Object] the result of joining the keep-alive thread, or the
#   result of #stop when interrupted
def start
  begin
    exit_if_already_running!
    if daemonize?
      daemonize!
    end
    write_pidfile!
    # Idle thread whose only job is to give the caller something to join on.
    @persist = Thread.new do
      loop { sleep(1) }
    end
    set_process_title
    setup_receivers
    receivers.each { |receiver| receiver.start }
    persist.join
  rescue Interrupt
    stop
  end
end

#stop(timeout: 5) ⇒ Object



74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
# File 'lib/emque/consuming/runner.rb', line 74

# Stops the runner.
#
# When this process owns a keep-alive thread (+persist+ is set), the
# receivers are stopped in-process under a watchdog: if they have not
# finished within +timeout+ seconds the watchdog logs an error and ends the
# keep-alive thread. Otherwise a +:stop+ command is sent through the
# Transmitter to +config.socket_path+ (presumably to a runner living in
# another process — confirm).
#
# @param timeout [Numeric] seconds to wait for the receivers before the
#   keep-alive thread is forcibly ended
# @return [Object] the ended keep-alive thread, nil if it was already dead,
#   or the Transmitter result in the remote case
def stop(timeout: 5)
  if persist
    # Pin the thread being shut down: +persist+ may point at a new thread
    # later (e.g. after a restart) and the watchdog must never touch that one.
    keep_alive = persist
    watchdog = Thread.new do
      sleep timeout
      logger.error("Timeout Exceeded. Forcing Shutdown.")
      keep_alive.exit if keep_alive.alive?
    end
    receivers.each(&:stop)
    # A finished watchdog means the timeout already fired. Otherwise cancel
    # it so it cannot log a bogus timeout or kill anything later.
    forced = !watchdog.alive?
    watchdog.kill
    logger.info("Graceful shutdown successful.") unless forced
    logger.info("#{config.app_name.capitalize} stopped.")
    keep_alive.exit if keep_alive.alive?
  else
    Emque::Consuming::Transmitter.send(
      :command => :stop,
      :socket_path => config.socket_path
    )
  end
end