Class: QueueingRabbit::Worker

Inherits:
Object
  • Object
show all
Includes:
Logging
Defined in:
lib/queueing_rabbit/worker.rb

Defined Under Namespace

Classes: WorkerError

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(*jobs) ⇒ Worker

Returns a new instance of Worker.



12
13
14
15
16
17
18
19
20
21
22
# File 'lib/queueing_rabbit/worker.rb', line 12

# Builds a worker for the given jobs.
#
# Every argument is converted with #to_s and stripped of surrounding
# whitespace before being stored through the jobs writer. The message
# registry, its lock and the channel list start out empty.
#
# @param jobs [Array<#to_s>] jobs this worker should run
def initialize(*jobs)
  @channels = []
  @messages = {}
  @messages_lock = Monitor.new

  self.jobs = jobs.map(&:to_s).map(&:strip)

  # Helpers defined elsewhere in the class; judging by their names they
  # prepare stdio and resolve the job names -- confirm against their source.
  sync_stdio
  validate_jobs
  constantize_jobs
end

Instance Attribute Details

#jobs ⇒ Object

Returns the value of attribute jobs.



10
11
12
# File 'lib/queueing_rabbit/worker.rb', line 10

# Reader for the jobs attribute.
#
# @return [Object] the current value of @jobs; #initialize assigns it the
#   stripped string form of each constructor argument (later helpers such as
#   constantize_jobs may replace it -- not visible here, confirm)
def jobs
  @jobs
end

Instance Method Details

#checked_messages_count ⇒ Object



24
25
26
27
28
# File 'lib/queueing_rabbit/worker.rb', line 24

# Number of delivery tags currently registered in @messages.
#
# The registry is read while holding @messages_lock.
#
# @return [Integer]
def checked_messages_count
  @messages_lock.synchronize { @messages.size }
end

#checkin_message(delivery_tag) ⇒ Object



30
31
32
33
34
35
36
# File 'lib/queueing_rabbit/worker.rb', line 30

# Registers a delivery tag in @messages while the worker is running.
#
# Nothing is recorded (and nil is returned) when @working is falsy.
#
# @param delivery_tag [Object] key under which the message is tracked
# @return [true, nil] true when the tag was stored, nil otherwise
def checkin_message(delivery_tag)
  @messages_lock.synchronize { @messages.store(delivery_tag, true) } if @working
end

#checkout_message(delivery_tag) ⇒ Object



38
39
40
41
42
# File 'lib/queueing_rabbit/worker.rb', line 38

# Removes a delivery tag from @messages while holding @messages_lock.
#
# @param delivery_tag [Object] key previously passed to #checkin_message
# @return [Object, nil] the value that was stored under the tag, or nil if
#   the tag was not registered
def checkout_message(delivery_tag)
  @messages_lock.synchronize { @messages.delete(delivery_tag) }
end

#invoke_job(job, payload, metadata) ⇒ Object



109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
# File 'lib/queueing_rabbit/worker.rb', line 109

# Runs a single job with the given payload and metadata.
#
# Dispatch rules:
# * if +job+ itself responds to +perform+, call job.perform(payload, metadata);
# * otherwise, if +job+ is QueueingRabbit::AbstractJob or a subclass of it,
#   instantiate it with (payload, metadata) and call +perform+ on the instance;
# * otherwise log an error and do nothing.
#
# Any StandardError raised along the way is not propagated: it is reported
# through the :consumer_error event and written to the log.
#
# @param job [Object] job class (or any object responding to +perform+)
# @param payload [Object] message payload handed to the job
# @param metadata [Object] message metadata handed to the job
# @return [Object] result of the job, or of the last logging call on the
#   error paths
def invoke_job(job, payload, metadata)
  info "performing job #{job}"

  if job.respond_to?(:perform)
    job.perform(payload, metadata)
  elsif job <= QueueingRabbit::AbstractJob
    job.new(payload, metadata).perform
  else
    error "don't know how to perform job #{job}"
  end
rescue => e
  QueueingRabbit.trigger_event(:consumer_error, e)
  error "unexpected error #{e.class} occured: #{e.message}"
  debug e
end

#pid ⇒ Object



89
90
91
# File 'lib/queueing_rabbit/worker.rb', line 89

# Process id of the Ruby process this worker runs in.
#
# @return [Integer] the value of Process.pid
def pid
  Process.pid
end

#pidfile_exists? ⇒ Boolean

Returns:

  • (Boolean)


85
86
87
# File 'lib/queueing_rabbit/worker.rb', line 85

# Tells whether a pidfile path has been configured and the file is present.
#
# Uses File.exist? because the File.exists? alias was deprecated and then
# removed in Ruby 3.2.
#
# @return [Boolean, nil] nil when no pidfile path has been set, otherwise
#   whether a file exists at @pidfile
def pidfile_exists?
  @pidfile && File.exist?(@pidfile)
end

#read_pidfile ⇒ Object



81
82
83
# File 'lib/queueing_rabbit/worker.rb', line 81

# Reads the pidfile and converts its content to an Integer with String#to_i.
#
# @return [Integer, nil] the parsed content, or nil when #pidfile_exists?
#   is falsy
def read_pidfile
  return unless pidfile_exists?

  File.read(@pidfile).to_i
end

#remove_pidfile ⇒ Object



77
78
79
# File 'lib/queueing_rabbit/worker.rb', line 77

# Deletes the pidfile from disk if it is present.
#
# @return [Integer, nil] the result of File.delete, or nil when
#   #pidfile_exists? is falsy
def remove_pidfile
  return unless pidfile_exists?

  File.delete(@pidfile)
end

#stop(connection = QueueingRabbit.connection) ⇒ Object



97
98
99
100
101
102
103
104
105
106
107
# File 'lib/queueing_rabbit/worker.rb', line 97

# Requests a shutdown of the worker through the connection's event loop.
#
# All work happens inside the block handed to connection.next_tick: the
# worker is flagged as no longer working, close_channels is invoked, and the
# block given to it closes the connection. The block given to
# connection.close logs the shutdown and removes the pidfile.
#
# NOTE(review): the exact moment each nested block runs depends on
# next_tick, close_channels and connection.close, none of which are visible
# here -- presumably each fires once the previous step has completed; confirm.
#
# @param connection [#next_tick, #close] connection to shut down; defaults
#   to QueueingRabbit.connection
# @return [Object] whatever connection.next_tick returns
def stop(connection = QueueingRabbit.connection)
  connection.next_tick do
    # Set before the channels are closed; #checkin_message ignores new
    # messages once this flag is false.
    @working = false
    close_channels do
      connection.close do
        info "gracefully shutting down the worker #{self}"
        remove_pidfile
      end
    end
  end
end

#to_s ⇒ Object



93
94
95
# File 'lib/queueing_rabbit/worker.rb', line 93

# Human-readable description of the worker: its process id followed by the
# comma-separated list of its jobs.
#
# @return [String] e.g. "PID=123, JOBS=FirstJob,SecondJob"
def to_s
  worker_pid = pid
  job_list = jobs.join(',')
  "PID=#{worker_pid}, JOBS=#{job_list}"
end

#use_pidfile(filename) ⇒ Object



71
72
73
74
75
# File 'lib/queueing_rabbit/worker.rb', line 71

# Configures the pidfile path and writes the current pid into it.
#
# The path is remembered in @pidfile, cleanup_pidfile (defined elsewhere) is
# called, and the file is then opened in 'w' mode -- created or truncated --
# and the value of #pid is appended to it.
#
# @param filename [String] path of the pidfile
# @return [File] the (already closed) file object returned by the block
def use_pidfile(filename)
  @pidfile = filename
  cleanup_pidfile

  File.open(@pidfile, 'w') do |file|
    file << pid
  end
end

#work ⇒ Object



48
49
50
51
52
53
54
55
56
57
58
59
# File 'lib/queueing_rabbit/worker.rb', line 48

# Starts consuming for every configured job, unless already working.
#
# Resets the channel list, marks the worker as working, fires the
# :worker_ready event, calls run_job for each job with the current
# QueueingRabbit connection and finally fires :consuming_started.
#
# @return [Object, nil] nil when the worker was already working, otherwise
#   the result of the last trigger_event call
def work
  return if working?

  @channels = []
  @working = true

  QueueingRabbit.trigger_event(:worker_ready)

  jobs.each do |job|
    run_job(QueueingRabbit.connection, job)
  end

  QueueingRabbit.trigger_event(:consuming_started)
end

#work! ⇒ Object



61
62
63
64
65
66
67
68
69
# File 'lib/queueing_rabbit/worker.rb', line 61

# Starts the worker inside QueueingRabbit's worker loop.
#
# Does nothing when the worker is already working. Otherwise it calls
# trap_signals, logs the start and hands a block that calls #work to
# QueueingRabbit.begin_worker_loop.
#
# @return [Object, nil] nil when already working, otherwise whatever
#   begin_worker_loop returns
def work!
  return if working?

  trap_signals
  info "starting a new queueing_rabbit worker #{self}"

  QueueingRabbit.begin_worker_loop do
    work
  end
end

#working? ⇒ Boolean

Returns:

  • (Boolean)


44
45
46
# File 'lib/queueing_rabbit/worker.rb', line 44

# Tells whether the worker is currently consuming.
#
# The flag is set to true by #work and back to false by #stop.
#
# @return [Boolean, nil] the raw value of @working; the visible constructor
#   code does not assign it, so it is presumably nil until #work or #stop
#   has run -- confirm against the helpers called from #initialize
def working?
  @working
end