Class: Bumbleworks::Worker
- Inherits:
-
Ruote::Worker
- Object
- Ruote::Worker
- Bumbleworks::Worker
show all
- Defined in:
- lib/bumbleworks/worker.rb,
lib/bumbleworks/worker/info.rb,
lib/bumbleworks/worker/proxy.rb
Defined Under Namespace
Classes: Info, Proxy, WorkerStateNotChanged
Instance Attribute Summary collapse
Class Method Summary
collapse
Instance Method Summary
collapse
Constructor Details
#initialize(*args, &block) ⇒ Worker
Returns a new instance of Worker.
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
|
# File 'lib/bumbleworks/worker.rb', line 99
# Runs the parent constructor, then records identifying details about the
# current process and host on the new worker.
#
# @param args [Array] forwarded unchanged to the parent constructor
# @param block [Proc] forwarded unchanged to the parent constructor
def initialize(*args, &block)
  super

  # Process-level identity.
  @pid = Process.pid
  @id = SecureRandom.uuid
  @launched_at = Time.now

  # Host-level identity.
  @ip = Ruote.local_ip
  @hostname = Socket.gethostname

  # Best effort: any StandardError raised while shelling out leaves
  # @system as nil instead of aborting construction.
  @system =
    begin
      `uname -a`.strip
    rescue StandardError
      nil
    end

  # @info is not assigned anywhere above, so a truthy value here was
  # presumably left by the parent constructor; only then is it replaced
  # with this class's Info and saved.
  return unless @info

  @info = Info.new(self)
  save_info
end
|
Instance Attribute Details
#hostname ⇒ Object
Returns the value of attribute hostname.
7
8
9
|
# File 'lib/bumbleworks/worker.rb', line 7
# Reader for @hostname, which #initialize sets from Socket.gethostname.
#
# @return [Object] the stored hostname, or nil if it was never set
def hostname
  @hostname
end
|
#id ⇒ Object
Returns the value of attribute id.
7
8
9
|
# File 'lib/bumbleworks/worker.rb', line 7
# Reader for @id, which #initialize sets from SecureRandom.uuid.
#
# @return [Object] the stored identifier, or nil if it was never set
def id
  @id
end
|
#ip ⇒ Object
Returns the value of attribute ip.
7
8
9
|
# File 'lib/bumbleworks/worker.rb', line 7
# Reader for @ip, which #initialize sets from Ruote.local_ip.
#
# @return [Object] the stored address, or nil if it was never set
def ip
  @ip
end
|
#launched_at ⇒ Object
Returns the value of attribute launched_at.
7
8
9
|
# File 'lib/bumbleworks/worker.rb', line 7
# Reader for @launched_at, which #initialize sets from Time.now.
#
# @return [Object] the stored launch time, or nil if it was never set
def launched_at
  @launched_at
end
|
#pid ⇒ Object
Returns the value of attribute pid.
7
8
9
|
# File 'lib/bumbleworks/worker.rb', line 7
# Reader for @pid, which #initialize sets from Process.pid.
#
# @return [Object] the stored process id, or nil if it was never set
def pid
  @pid
end
|
#system ⇒ Object
Returns the value of attribute system.
7
8
9
|
# File 'lib/bumbleworks/worker.rb', line 7
# Reader for @system, which #initialize fills with the stripped output of
# `uname -a` (or nil when that command raised).
#
# @return [Object] the stored system description, or nil
def system
  @system
end
|
Class Method Details
.active_worker_states ⇒ Object
31
32
33
34
35
36
37
38
39
|
# File 'lib/bumbleworks/worker.rb', line 31
# Collects the state of every worker entry that is not in a stopped state.
#
# @return [Hash] worker id => state for each entry yielded by +info+ whose
#   +in_stopped_state?+ is falsy
def active_worker_states
  info.each_with_object({}) do |worker_info, states|
    # id and state are read before the stopped check.
    worker_id = worker_info.id
    worker_state = worker_info.state
    states[worker_id] = worker_state unless worker_info.in_stopped_state?
  end
end
|
.change_worker_state(new_state, options = {}) ⇒ Object
41
42
43
44
45
46
47
48
49
50
51
|
# File 'lib/bumbleworks/worker.rb', line 41
# Sets the dashboard's worker state and waits until every active worker
# reports that state.
#
# @param new_state [Object] value assigned to the dashboard's worker_state
#   and compared against each active worker's state
# @param options [Hash] passed through to Bumbleworks::Support.wait_until
# @return [true] once all active workers report +new_state+
# @raise [WorkerStateNotChanged] when the wait raises
#   Bumbleworks::Support::WaitTimeout; the message lists the active worker
#   states at that moment
def change_worker_state(new_state, options = {})
  with_worker_state_enabled do
    Bumbleworks.dashboard.worker_state = new_state
    Bumbleworks::Support.wait_until(options) do
      reported = active_worker_states.values
      reported.all? { |current| current == new_state }
    end
  end
  true
rescue Bumbleworks::Support::WaitTimeout
  raise WorkerStateNotChanged, "Worker states: #{active_worker_states.inspect}"
end
|
.control_document ⇒ Object
82
83
84
85
86
87
88
|
# File 'lib/bumbleworks/worker.rb', line 82
# Fetches the 'worker_control' variables document from the dashboard's
# storage, starting from an empty hash when none is stored, and fills in
# its bookkeeping keys.
#
# @return [Hash] the fetched (or new) document with 'type' and '_id'
#   overwritten and 'workers' defaulted to an empty hash
def control_document
  stored = Bumbleworks.dashboard.storage.get('variables', 'worker_control')
  (stored || {}).tap do |doc|
    doc['type'] = 'variables'
    doc['_id'] = 'worker_control'
    doc['workers'] ||= {}
  end
end
|
.info_document ⇒ Object
90
91
92
93
94
95
96
|
# File 'lib/bumbleworks/worker.rb', line 90
# Fetches the 'workers' variables document from the dashboard's storage,
# starting from an empty hash when none is stored, and fills in its
# bookkeeping keys.
#
# @return [Hash] the fetched (or new) document with 'type' and '_id'
#   overwritten and 'workers' defaulted to an empty hash
def info_document
  stored = Bumbleworks.dashboard.storage.get('variables', 'workers')
  (stored || {}).tap do |doc|
    doc['type'] = 'variables'
    doc['_id'] = 'workers'
    doc['workers'] ||= {}
  end
end
|
.pause_all(options = {}) ⇒ Object
23
24
25
|
# File 'lib/bumbleworks/worker.rb', line 23
# Requests the 'paused' state for all workers.
#
# @param options [Hash] passed through to change_worker_state
# @return [Object] whatever change_worker_state returns
def pause_all(options = {})
  change_worker_state('paused', options)
end
|
.refresh_worker_info(options = {}) ⇒ Object
53
54
55
56
57
58
59
60
61
|
# File 'lib/bumbleworks/worker.rb', line 53
# With worker state enabled, records the "stalled" state on every worker
# entry that is not in a stopped state and reports itself as stalling.
#
# @param options [Hash] accepted but not read by this method
# @return [Object] the value with_worker_state_enabled returns for the block
def refresh_worker_info(options = {})
  with_worker_state_enabled do
    info.each do |worker_info|
      # Stopped entries are skipped without asking whether they stall.
      next if worker_info.in_stopped_state?
      next unless worker_info.stalling?

      worker_info.record_new_state("stalled")
    end
  end
end
|
.shutdown_all(options = {}) ⇒ Object
14
15
16
17
18
19
20
21
|
# File 'lib/bumbleworks/worker.rb', line 14
# Requests the 'stopped' state for all workers and then, whether or not
# that request succeeded, requests the 'running' state.
#
# NOTE(review): the trailing 'running' request presumably keeps workers
# started later from stopping immediately; confirm against the dashboard's
# worker_state semantics.
#
# @param options [Hash] passed through to both change_worker_state calls
# @return [Object] the result of the 'stopped' request (the ensure clause's
#   value is discarded)
def shutdown_all(options = {})
  change_worker_state('stopped', options)
ensure
  # Runs even when the 'stopped' request raised.
  change_worker_state('running', options)
end
|
.toggle_worker_state_enabled(switch) ⇒ Object
63
64
65
66
67
68
|
# File 'lib/bumbleworks/worker.rb', line 63
# Stores the given boolean under 'worker_state_enabled' in the dashboard
# context.
#
# @param switch [Boolean] must be exactly true or false
# @return [Boolean] the value that was stored
# @raise [ArgumentError] when +switch+ is neither true nor false
def toggle_worker_state_enabled(switch)
  case switch
  when true, false
    Bumbleworks.dashboard.context['worker_state_enabled'] = switch
  else
    raise ArgumentError, "Must call with true or false"
  end
end
|
.unpause_all(options = {}) ⇒ Object
27
28
29
|
# File 'lib/bumbleworks/worker.rb', line 27
# Requests the 'running' state for all workers.
#
# @param options [Hash] passed through to change_worker_state
# @return [Object] whatever change_worker_state returns
def unpause_all(options = {})
  change_worker_state('running', options)
end
|
.with_worker_state_enabled ⇒ Object
74
75
76
77
78
79
80
|
# File 'lib/bumbleworks/worker.rb', line 74
# Yields with the 'worker_state_enabled' flag switched on, switching it
# back off afterwards only if this call was the one that turned it on.
#
# The current flag is read before entering the protected region. If that
# read raises, the error propagates as-is and the flag is left alone,
# rather than being forced to false by the cleanup step.
#
# @yield runs with worker state enabled
# @return [Object] the block's return value
def with_worker_state_enabled
  was_already_enabled = worker_state_enabled?
  # Already on: nothing to switch and nothing to restore.
  return yield if was_already_enabled

  begin
    toggle_worker_state_enabled(true)
    yield
  ensure
    # Restore even when the block (or the enabling toggle) raised.
    toggle_worker_state_enabled(false)
  end
end
|
.worker_state_enabled? ⇒ Boolean
70
71
72
|
# File 'lib/bumbleworks/worker.rb', line 70
# Reports whether 'worker_state_enabled' in the dashboard context is truthy.
#
# @return [Boolean] strictly true or false
def worker_state_enabled?
  flag = Bumbleworks.dashboard.context['worker_state_enabled']
  flag ? true : false
end
|
Instance Method Details
#class_name ⇒ Object
115
116
117
|
# File 'lib/bumbleworks/worker.rb', line 115
# Name of the receiver's class, as produced by Class#to_s.
#
# @return [String]
def class_name
  self.class.to_s
end
|
#desired_state ⇒ Object
132
133
134
135
136
137
|
# File 'lib/bumbleworks/worker.rb', line 132
# Looks up the state this worker should be in. Sources are consulted in
# order and the first truthy one wins:
#   1. worker_control_variable
#   2. the "worker" variables document in @storage
#   3. a built-in default of "running"
#
# @return [Object] the "state" entry of the chosen hash (nil if that hash
#   has no "state" key)
def desired_state
  control_hash = worker_control_variable
  control_hash ||= @storage.get("variables", "worker")
  control_hash ||= { "state" => "running" }
  control_hash["state"]
end
|
#determine_state ⇒ Object
139
140
141
142
143
144
145
146
|
# File 'lib/bumbleworks/worker.rb', line 139
# While holding @state_mutex, adopts desired_state as the current state
# and saves info. Does nothing when the worker is already "stopped" or
# when "worker_state_enabled" is falsy in @context.
#
# @return [Object, nil] the result of save_info when the state was
#   updated, otherwise nil
def determine_state
  @state_mutex.synchronize do
    next if @state == "stopped"
    next unless @context["worker_state_enabled"]

    @state = desired_state
    save_info
  end
end
|
#info ⇒ Object
148
149
150
|
# File 'lib/bumbleworks/worker.rb', line 148
# Looks up this worker's entry in the class-level +info+ by its id.
#
# @return [Object] whatever the class-level +info+ returns for this id
def info
  registry = self.class.info
  registry[id]
end
|
#save_info ⇒ Object
119
120
121
|
# File 'lib/bumbleworks/worker.rb', line 119
# Saves @info when one is present; does nothing otherwise.
#
# @return [Object, nil] the result of @info.save, or nil when @info is falsy
def save_info
  return unless @info

  @info.save
end
|
#shutdown ⇒ Object
123
124
125
126
|
# File 'lib/bumbleworks/worker.rb', line 123
# Runs the parent's shutdown, then saves this worker's info.
#
# @return [Object] the result of save_info
def shutdown
  super
  save_info
end
|
#worker_control_variable ⇒ Object
128
129
130
|
# File 'lib/bumbleworks/worker.rb', line 128
# Fetches this worker's entry from the "workers" section of the class-level
# control document.
#
# @return [Object, nil] the entry stored under this worker's id, or nil
#   when there is none
def worker_control_variable
  workers = self.class.control_document["workers"]
  workers[id]
end
|