Class: Collective::Worker
- Inherits:
-
Object
- Object
- Collective::Worker
- Includes:
- Utilities::Observeable
- Defined in:
- lib/collective/worker.rb
Overview
A Worker is a forked process which runs jobs.
Jobs are short lived and run repeatedly.
Instance Attribute Summary collapse
-
#job ⇒ Object
readonly
Returns the value of attribute job.
-
#name ⇒ Object
readonly
Returns the value of attribute name.
-
#policy ⇒ Object
readonly
Returns the value of attribute policy.
-
#registry ⇒ Object
readonly
Returns the value of attribute registry.
-
#state ⇒ Object
readonly
Returns the value of attribute state.
-
#storage ⇒ Object
readonly
Returns the value of attribute storage.
-
#worker_expire ⇒ Object
readonly
Returns the value of attribute worker_expire.
-
#worker_jobs ⇒ Object
readonly
Returns the value of attribute worker_jobs.
Class Method Summary collapse
-
.spawn(prototype_job, options = {}) ⇒ Object
forks a new process creates a new instance of the job class runs a loop which calls the job.
Instance Method Summary collapse
-
#initialize(prototype_job, options = {}) ⇒ Worker
constructor
A new instance of Worker.
-
#key ⇒ Object
the key is a constant string which uniquely identifies this worker WARNING this would be invalidated if we forked or set this before forking.
- #mq ⇒ Object
- #quit! ⇒ Object
- #run ⇒ Object
- #running? ⇒ Boolean
- #to_s ⇒ Object
Methods included from Utilities::Observeable
Constructor Details
#initialize(prototype_job, options = {}) ⇒ Worker
Returns a new instance of Worker.
46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 |
# File 'lib/collective/worker.rb', line 46 def initialize( prototype_job, = {} ) @policy = [:policy] || Collective::Policy.resolve @name = [:name] || policy.name || prototype_job.to_s @storage = policy.storage @registry = [:registry] || Collective::Registry.new( name, storage ) @job = Collective::Idler.new( resolve_job( prototype_job ), min_sleep: policy.worker_idle_min_sleep, max_sleep: policy.worker_idle_max_sleep ) # type checks policy.pool_min_workers registry.workers # post-fork processing storage.reconnect_after_fork registry.reconnect_after_fork # set up observers policy.observers.each do |observer| o = Collective::Utilities::ObserverBase.resolve(observer) add_observer(o) end # manage the registry via an observer add_observer( Collective::LifecycleObserver.new( key, registry ) ) end |
Instance Attribute Details
#job ⇒ Object (readonly)
Returns the value of attribute job.
34 35 36 |
# File 'lib/collective/worker.rb', line 34 def job @job end |
#name ⇒ Object (readonly)
Returns the value of attribute name.
35 36 37 |
# File 'lib/collective/worker.rb', line 35 def name @name end |
#policy ⇒ Object (readonly)
Returns the value of attribute policy.
36 37 38 |
# File 'lib/collective/worker.rb', line 36 def policy @policy end |
#registry ⇒ Object (readonly)
Returns the value of attribute registry.
37 38 39 |
# File 'lib/collective/worker.rb', line 37 def registry @registry end |
#state ⇒ Object (readonly)
Returns the value of attribute state.
38 39 40 |
# File 'lib/collective/worker.rb', line 38 def state @state end |
#storage ⇒ Object (readonly)
Returns the value of attribute storage.
39 40 41 |
# File 'lib/collective/worker.rb', line 39 def storage @storage end |
#worker_expire ⇒ Object (readonly)
Returns the value of attribute worker_expire.
40 41 42 |
# File 'lib/collective/worker.rb', line 40 def worker_expire @worker_expire end |
#worker_jobs ⇒ Object (readonly)
Returns the value of attribute worker_jobs.
41 42 43 |
# File 'lib/collective/worker.rb', line 41 def worker_jobs @worker_jobs end |
Class Method Details
.spawn(prototype_job, options = {}) ⇒ Object
forks a new process creates a new instance of the job class runs a loop which calls the job
17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 |
# File 'lib/collective/worker.rb', line 17 def self.spawn( prototype_job, = {} ) policy = [:policy] || Collective::Policy.resolve name = [:name] || policy.name || prototype_job.to_s storage = policy.storage registry = [:registry] || Collective::Registry.new( name, storage ) = { stdout: "/tmp/debug.log" } policy.before_fork Collective::Utilities::Process.fork_and_detach( ) do policy.after_fork # $0 = "$0 #{name}" worker = new( prototype_job, ) trap("TERM") { worker.quit! } worker.run end end |
Instance Method Details
#key ⇒ Object
the key is a constant string which uniquely identifies this worker WARNING this would be invalidated if we forked or set this before forking
102 103 104 |
# File 'lib/collective/worker.rb', line 102 def key @key ||= Collective::Key.new( name, Process.pid ) end |
#mq ⇒ Object
106 107 108 |
# File 'lib/collective/worker.rb', line 106 def mq @mq ||= Collective::Messager.new( storage, my_address: key ) end |
#quit! ⇒ Object
88 89 90 |
# File 'lib/collective/worker.rb', line 88 def quit!() @state = :quitting end |
#run ⇒ Object
71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 |
# File 'lib/collective/worker.rb', line 71 def run() @state = :running @worker_jobs = 0 @worker_expire = Time.now + policy.worker_max_lifetime context = { worker: self } with_start_and_stop do while running? do with_quitting_checks do with_heartbeat do job.call(context) end end end end end |
#running? ⇒ Boolean
92 93 94 |
# File 'lib/collective/worker.rb', line 92 def running? state == :running end |
#to_s ⇒ Object
96 97 98 |
# File 'lib/collective/worker.rb', line 96 def to_s %Q[Worker(#{key})] end |