Class: Faktory::Processor

Inherits:
Object
  • Object
show all
Includes:
Util
Defined in:
lib/faktory/processor.rb

Overview

The Processor is a standalone thread which:

  1. fetches a job

  2. executes the job

a. instantiate the Worker
b. run the middleware chain
c. call #perform

A Processor can exit due to shutdown (processor_stopped) or due to an error during job execution (processor_died).

If an error occurs during job execution, the Processor asks the Manager to create a new Processor to replace it, and then exits.

Constant Summary collapse

@@busy_lock =
Mutex.new
@@busy_count =
0

Constants included from Util

Util::EXPIRY

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Methods included from Util

#fire_event, #hostname, #identity, #logger, #process_nonce, #safe_thread, #server, #watchdog

Methods included from ExceptionHandler

#handle_exception

Constructor Details

#initialize(mgr) ⇒ Processor

Returns a new instance of Processor.



35
36
37
38
39
40
41
42
43
# File 'lib/faktory/processor.rb', line 35

# Sets up a Processor that belongs to the given manager.
#
# The manager's options supply the :reloader callable and, optionally, a
# :job_logger class (Faktory::JobLogger is used when none is given). The
# same options are handed to a new Faktory::Fetcher.
#
# @param mgr [Object] the owning manager; must respond to #options
def initialize(mgr)
  options = mgr.options

  # Lifecycle state: no thread started, not asked to stop, no outage recorded.
  @thread = nil
  @done = false
  @down = false

  @mgr = mgr
  @reloader = options[:reloader]

  job_logger_class = options[:job_logger] || Faktory::JobLogger
  @logging = job_logger_class.new
  @fetcher = Faktory::Fetcher.new(options)
end

Instance Attribute Details

#job ⇒ Object (readonly)

Returns the value of attribute job.



27
28
29
# File 'lib/faktory/processor.rb', line 27

# Returns the value of @job. #process_one assigns it from `work.job` just
# before processing a unit of work; nothing in this class resets it
# afterwards, so it reflects the most recently started job.
def job
  @job
end

#thread ⇒ Object (readonly)

Returns the value of attribute thread.



26
27
28
# File 'lib/faktory/processor.rb', line 26

# Returns the value of @thread: nil until #start has been called, and
# afterwards whatever `safe_thread` returned for this Processor.
def thread
  @thread
end

Class Method Details

.busy_count ⇒ Object



31
32
33
# File 'lib/faktory/processor.rb', line 31

# Returns the class-wide busy counter. #process_one increments it before
# processing a unit of work and decrements it afterwards, both under
# @@busy_lock; this read itself does not take the lock.
def self.busy_count
  @@busy_count
end

Instance Method Details

#constantize(str) ⇒ Object



166
167
168
169
170
171
172
173
# File 'lib/faktory/processor.rb', line 166

# Resolves a "::"-separated constant path (e.g. "Foo::Bar" or "::Foo::Bar")
# to the constant it names, walking down from Object one segment at a time.
#
# A segment that is not defined on the current scope is passed to that
# scope's const_missing, so autoloaders get a chance to load it; without
# one, const_missing raises NameError.
#
# @param str [String] the constant path
# @return [Object] the resolved constant (Object itself for an empty path)
def constantize(str)
  segments = str.split("::")
  # A leading "::" produces an empty first segment; discard it.
  segments.shift if segments.first == ""

  scope = Object
  segments.each do |segment|
    scope =
      if scope.const_defined?(segment)
        scope.const_get(segment)
      else
        scope.const_missing(segment)
      end
  end
  scope
end

#dispatch(payload) ⇒ Object



123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
# File 'lib/faktory/processor.rb', line 123

# Builds the worker object for a job payload and yields it to the caller,
# inside the logging context, the job logger and the reloader.
#
# @param payload [Hash] job data; reads the "jobtype", "jid" and "custom" keys
# @yieldparam worker [Object] a new instance of the payload's job class with
#   jid and _custom assigned
# @return [Object] whatever the wrapping calls return
def dispatch(payload)
  Faktory::Logging.with_job_hash_context(payload) do
    @logging.call(payload) do
      # Rails 5 needs a Reloader around code execution. Looking up the
      # worker class and instantiating it must happen inside the Reloader,
      # which takes care of code loading, db connection management, etc.
      # In Rails terms this block is one "unit of work".
      @reloader.call do
        worker = constantize(payload["jobtype"]).new
        worker.jid = payload["jid"]
        worker._custom = payload["custom"]
        yield worker
      end
    end
  end
end

#fetch ⇒ Object



99
100
101
102
103
104
105
106
107
108
109
# File 'lib/faktory/processor.rb', line 99

# Asks the fetcher for the next unit of work.
#
# If an outage had been recorded in @down, the first successful fetch logs
# how long it lasted and clears the marker.
#
# @return [Object, nil] the fetched work; nil when Faktory::Shutdown is
#   raised, or the result of #handle_fetch_exception for any StandardError
def fetch
  fetched = @fetcher.retrieve_work
  if @down
    logger.info { "Faktory is online, #{Time.now - @down} sec downtime" }
    @down = nil
  end
  fetched
rescue Faktory::Shutdown
  # Shutdown while fetching is not an error; there is just nothing to return.
  nil
rescue => ex
  handle_fetch_exception(ex)
end

#handle_fetch_exception(ex) ⇒ Object



111
112
113
114
115
116
117
118
119
120
121
# File 'lib/faktory/processor.rb', line 111

# Records and reports a failure to fetch work, then pauses briefly.
#
# Only the first failure of an outage is logged: @down is set to the time
# the outage began and suppresses further logging until #fetch clears it
# after a successful fetch.
#
# @param ex [Exception] the error raised while fetching
# @return [nil] always, so the caller sees "no work"
def handle_fetch_exception(ex)
  unless @down
    @down = Time.now
    logger.error("Error fetching job: #{ex}")
    # Exception#backtrace is nil for an exception that was never raised;
    # Array() turns that into an empty list instead of a NoMethodError.
    Array(ex.backtrace).each { |frame| logger.error(frame) }
  end
  # Back off for a second before the caller tries again.
  sleep(1)
  nil
end

#kill(wait = false) ⇒ Object



51
52
53
54
55
56
57
58
59
60
61
# File 'lib/faktory/processor.rb', line 51

# Marks the Processor as done and interrupts its thread by raising
# Faktory::Shutdown inside it.
#
# #terminate deliberately does not wait for the thread, because there is no
# telling how long the running job will take. #kill is the follow-up to call
# once the shutdown timeout has elapsed.
#
# @param wait [Boolean] when true, wait on the thread via Thread#value
# @return [Object, nil] the thread's value when waiting, otherwise nil
def kill(wait = false)
  @done = true
  worker_thread = @thread
  return unless worker_thread

  worker_thread.raise ::Faktory::Shutdown
  worker_thread.value if wait
end

#process(work) ⇒ Object



141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
# File 'lib/faktory/processor.rb', line 141

# Runs one unit of work: dispatches the payload, invokes the worker
# middleware around the worker's #perform, and reports the outcome.
#
# @param work [Object] unit of work responding to #job, #acknowledge and #fail
# @return [Object] the result of work.acknowledge, or of work.fail when the
#   job was interrupted by Faktory::Shutdown
# @raise [Exception] any other exception from the job, re-raised after it
#   has been handled and the work has been failed
def process(work)
  payload = work.job
  begin
    dispatch(payload) do |worker|
      Faktory.worker_middleware.invoke(worker, payload) { worker.perform(*payload["args"]) }
    end
    work.acknowledge
  rescue Faktory::Shutdown => shutdown
    # The job was force-killed because it overran the timeout. Failing it
    # releases any server-side locks so it can be restarted right away.
    work.fail(shutdown)
  rescue Exception => error # rubocop:disable Lint/RescueException
    handle_exception(error, {context: "Job raised exception", job: work.job})
    work.fail(error)
    raise error
  end
end

#process_one ⇒ Object



80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
# File 'lib/faktory/processor.rb', line 80

# Fetches one unit of work and processes it, keeping the class-wide busy
# counter accurate for the duration. Sleeps for a second when there is
# nothing to do.
def process_one
  unit = fetch
  return sleep(1) unless unit

  @@busy_lock.synchronize { @@busy_count += 1 }
  begin
    @job = unit.job
    process(unit)
  ensure
    # Runs even when the job raises, so the counter cannot drift.
    @@busy_lock.synchronize { @@busy_count -= 1 }
  end
end

#run ⇒ Object



69
70
71
72
73
74
75
76
77
78
# File 'lib/faktory/processor.rb', line 69

# Main loop of the Processor: keeps processing work until @done is set,
# then tells the manager how the loop ended.
#
# A normal exit and a Faktory::Shutdown are both reported through
# processor_stopped; any other exception is reported through
# processor_died together with the exception.
def run
  process_one until @done
  @mgr.processor_stopped(self)
rescue Faktory::Shutdown
  @mgr.processor_stopped(self)
rescue Exception => error # rubocop:disable Lint/RescueException
  @mgr.processor_died(self, error)
end

#start ⇒ Object



63
64
65
# File 'lib/faktory/processor.rb', line 63

# Starts the Processor by handing #run to safe_thread under the name
# "processor". Calling it again returns the already stored thread and does
# not start another one.
#
# @return [Object] the value stored in @thread
def start
  return @thread if @thread

  @thread = safe_thread("processor", &method(:run))
end

#terminate(wait = false) ⇒ Object



45
46
47
48
49
# File 'lib/faktory/processor.rb', line 45

# Asks the Processor to stop by setting @done, so the #run loop ends once
# the current #process_one call returns. The thread is not interrupted.
#
# @param wait [Boolean] when true and a thread exists, wait on it via
#   Thread#value
# @return [Object, nil] the thread's value when waiting, otherwise nil
def terminate(wait = false)
  @done = true
  @thread.value if @thread && wait
end

#thread_identity ⇒ Object



162
163
164
# File 'lib/faktory/processor.rb', line 162

# Returns a short identifier: the object_id of the calling thread in base 36.
# It is computed on the first call and cached in @str, so later calls return
# the same string whichever thread makes them.
#
# @return [String]
def thread_identity
  return @str if @str

  @str = Thread.current.object_id.to_s(36)
end