Class: Collective::Worker

Inherits:
Object
  • Object
show all
Includes:
Utilities::Observeable
Defined in:
lib/collective/worker.rb

Overview

A Worker is a forked process which runs jobs.

Jobs are short lived and run repeatedly.

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Methods included from Utilities::Observeable

#add_observer, #notify

Constructor Details

#initialize(prototype_job, options = {}) ⇒ Worker

Returns a new instance of Worker.

Parameters:

  • options (:name) (defaults to: {})

    is optional

  • options (:policy) (defaults to: {})

    is optional

  • options (:registry) (defaults to: {})

    is optional



46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
# File 'lib/collective/worker.rb', line 46

# Builds a worker around +prototype_job+.
#
# @param prototype_job the job prototype; handed to +resolve_job+, and
#   stringified as the last-resort worker name
# @param options [Hash] optional overrides
# @option options :policy   used in place of Collective::Policy.resolve
# @option options :name     used in place of policy.name / prototype_job.to_s
# @option options :registry used in place of a freshly built Collective::Registry
def initialize(prototype_job, options = {})
  @policy   = options[:policy]
  @policy ||= Collective::Policy.resolve
  @name     = options[:name] || policy.name || prototype_job.to_s
  @storage  = policy.storage
  @registry = options[:registry]
  @registry ||= Collective::Registry.new(name, storage)

  # The resolved job is wrapped in an Idler configured from the policy's
  # idle-sleep bounds.
  resolved_job = resolve_job(prototype_job)
  @job = Collective::Idler.new(
    resolved_job,
    min_sleep: policy.worker_idle_min_sleep,
    max_sleep: policy.worker_idle_max_sleep
  )

  # Type checks: call one method on each collaborator; the results are discarded.
  policy.pool_min_workers
  registry.workers

  # Post-fork processing: storage first, then the registry.
  storage.reconnect_after_fork
  registry.reconnect_after_fork

  # Resolve and attach each observer listed by the policy, one at a time.
  policy.observers.each { |spec| add_observer(Collective::Utilities::ObserverBase.resolve(spec)) }

  # The registry itself is managed through an observer keyed to this worker.
  add_observer(Collective::LifecycleObserver.new(key, registry))
end

Instance Attribute Details

#job ⇒ Object (readonly)

Returns the value of attribute job.



34
35
36
# File 'lib/collective/worker.rb', line 34

# Read-only accessor.
#
# @return the current value of @job (nil when it has not been assigned)
def job = @job

#name ⇒ Object (readonly)

Returns the value of attribute name.



35
36
37
# File 'lib/collective/worker.rb', line 35

# Read-only accessor.
#
# @return the current value of @name (nil when it has not been assigned)
def name = @name

#policy ⇒ Object (readonly)

Returns the value of attribute policy.



36
37
38
# File 'lib/collective/worker.rb', line 36

# Read-only accessor.
#
# @return the current value of @policy (nil when it has not been assigned)
def policy = @policy

#registry ⇒ Object (readonly)

Returns the value of attribute registry.



37
38
39
# File 'lib/collective/worker.rb', line 37

# Read-only accessor.
#
# @return the current value of @registry (nil when it has not been assigned)
def registry = @registry

#state ⇒ Object (readonly)

Returns the value of attribute state.



38
39
40
# File 'lib/collective/worker.rb', line 38

# Read-only accessor.
#
# @return the current value of @state (nil when it has not been assigned)
def state = @state

#storage ⇒ Object (readonly)

Returns the value of attribute storage.



39
40
41
# File 'lib/collective/worker.rb', line 39

# Read-only accessor.
#
# @return the current value of @storage (nil when it has not been assigned)
def storage = @storage

#worker_expire ⇒ Object (readonly)

Returns the value of attribute worker_expire.



40
41
42
# File 'lib/collective/worker.rb', line 40

# Read-only accessor.
#
# @return the current value of @worker_expire (nil when it has not been assigned)
def worker_expire = @worker_expire

#worker_jobs ⇒ Object (readonly)

Returns the value of attribute worker_jobs.



41
42
43
# File 'lib/collective/worker.rb', line 41

# Read-only accessor.
#
# @return the current value of @worker_jobs (nil when it has not been assigned)
def worker_jobs = @worker_jobs

Class Method Details

.spawn(prototype_job, options = {}) ⇒ Object

Forks a new process, creates a new instance of the job class, and runs a loop which calls the job.



17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
# File 'lib/collective/worker.rb', line 17

# Forks a detached child process that builds a worker for +prototype_job+
# and runs it.
#
# @param prototype_job the job prototype, forwarded unchanged to +new+
# @param options [Hash] forwarded unchanged to +new+; :policy, :name and
#   :registry are also read here before forking
# @return whatever Collective::Utilities::Process.fork_and_detach returns
def self.spawn(prototype_job, options = {})
  fork_policy = options[:policy] || Collective::Policy.resolve

  # The next three values are not used again in this method; they are still
  # evaluated here, before the fork.
  worker_name = options[:name] || fork_policy.name || prototype_job.to_s
  store       = fork_policy.storage
  _registry   = options[:registry] || Collective::Registry.new(worker_name, store)

  fork_options = { stdout: "/tmp/debug.log" }
  fork_policy.before_fork
  Collective::Utilities::Process.fork_and_detach(fork_options) do
    fork_policy.after_fork
    # $0 = "$0 #{worker_name}"
    worker = new(prototype_job, options)
    # TERM only flags the worker as quitting.
    Signal.trap("TERM") { worker.quit! }
    worker.run
  end
end

Instance Method Details

#key ⇒ Object

The key is a constant string which uniquely identifies this worker. WARNING: this would be invalidated if we forked, or if we set this before forking.



102
103
104
# File 'lib/collective/worker.rb', line 102

# Memoized identifier for this worker, built from +name+ and the current
# process id.
#
# The pid is read on the first call only, so a value cached before a fork
# would carry the parent's pid.
#
# @return [Collective::Key]
def key
  return @key if @key

  @key = Collective::Key.new(name, Process.pid)
end

#mq ⇒ Object



106
107
108
# File 'lib/collective/worker.rb', line 106

# Memoized messager for this worker, created on first use from +storage+
# with this worker's +key+ as its own address.
#
# @return [Collective::Messager]
def mq
  return @mq if @mq

  @mq = Collective::Messager.new(storage, my_address: key)
end

#quit! ⇒ Object



88
89
90
# File 'lib/collective/worker.rb', line 88

# Flags the worker as quitting by setting @state to :quitting.
#
# @return [Symbol] :quitting
def quit!
  @state = :quitting
end

#run ⇒ Object



71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
# File 'lib/collective/worker.rb', line 71

# Main loop: marks the worker as running, resets the job counter, computes
# the expiry time from the policy's maximum lifetime, then calls the job
# repeatedly for as long as +running?+ holds.
#
# Each job call is wrapped in +with_quitting_checks+ and +with_heartbeat+,
# and the whole loop is wrapped in +with_start_and_stop+.
#
# @return whatever +with_start_and_stop+ returns
def run
  @state         = :running
  @worker_jobs   = 0
  @worker_expire = Time.now + policy.worker_max_lifetime

  # The same context hash is handed to every job invocation.
  job_context = { worker: self }
  with_start_and_stop do
    while running?
      with_quitting_checks { with_heartbeat { job.call(job_context) } }
    end
  end
end

#running? ⇒ Boolean

Returns:

  • (Boolean)


92
93
94
# File 'lib/collective/worker.rb', line 92

# Whether the worker is currently in the :running state.
#
# @return [Boolean] true only when +state+ equals :running
def running? = state == :running

#to_s ⇒ Object



96
97
98
# File 'lib/collective/worker.rb', line 96

# Human-readable label for this worker: its +key+ wrapped in "Worker(...)".
#
# @return [String]
def to_s
  "Worker(#{key})"
end