Class: Bumbleworks::Worker

Inherits:
Ruote::Worker
  • Object
show all
Defined in:
lib/bumbleworks/worker.rb,
lib/bumbleworks/worker/info.rb,
lib/bumbleworks/worker/proxy.rb

Defined Under Namespace

Classes: Info, Proxy, WorkerStateNotChanged

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(*args, &block) ⇒ Worker

Returns a new instance of Worker.



99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
# File 'lib/bumbleworks/worker.rb', line 99

def initialize(*args, &block)
  super
  @pid = Process.pid
  @id = SecureRandom.uuid
  @launched_at = Time.now

  @ip = Ruote.local_ip
  @hostname = Socket.gethostname
  @system = `uname -a`.strip rescue nil

  if @info
    @info = Info.new(self)
    save_info
  end
end

Instance Attribute Details

#hostnameObject (readonly)

Returns the value of attribute hostname.



7
8
9
# File 'lib/bumbleworks/worker.rb', line 7

def hostname
  @hostname
end

#idObject (readonly)

Returns the value of attribute id.



7
8
9
# File 'lib/bumbleworks/worker.rb', line 7

def id
  @id
end

#ipObject (readonly)

Returns the value of attribute ip.



7
8
9
# File 'lib/bumbleworks/worker.rb', line 7

def ip
  @ip
end

#launched_atObject (readonly)

Returns the value of attribute launched_at.



7
8
9
# File 'lib/bumbleworks/worker.rb', line 7

def launched_at
  @launched_at
end

#pidObject (readonly)

Returns the value of attribute pid.



7
8
9
# File 'lib/bumbleworks/worker.rb', line 7

def pid
  @pid
end

#systemObject (readonly)

Returns the value of attribute system.



7
8
9
# File 'lib/bumbleworks/worker.rb', line 7

def system
  @system
end

Class Method Details

.active_worker_statesObject



31
32
33
34
35
36
37
38
39
# File 'lib/bumbleworks/worker.rb', line 31

def active_worker_states
  info.inject({}) { |hsh, info|
    id, state = info.id, info.state
    unless info.in_stopped_state?
      hsh[id] = state
    end
    hsh
  }
end

.change_worker_state(new_state, options = {}) ⇒ Object



41
42
43
44
45
46
47
48
49
50
51
# File 'lib/bumbleworks/worker.rb', line 41

def change_worker_state(new_state, options = {})
  with_worker_state_enabled do
    Bumbleworks.dashboard.worker_state = new_state
    Bumbleworks::Support.wait_until(options) do
      active_worker_states.values.all? { |state| state == new_state }
    end
  end
  return true
rescue Bumbleworks::Support::WaitTimeout
  raise WorkerStateNotChanged, "Worker states: #{active_worker_states.inspect}"
end

.control_documentObject



82
83
84
85
86
87
88
# File 'lib/bumbleworks/worker.rb', line 82

def control_document
  doc = Bumbleworks.dashboard.storage.get('variables', 'worker_control') || {}
  doc['type'] = 'variables'
  doc['_id'] = 'worker_control'
  doc['workers'] ||= {}
  doc
end

.infoObject



10
11
12
# File 'lib/bumbleworks/worker.rb', line 10

def info
  Bumbleworks::Worker::Info || {}
end

.info_documentObject



90
91
92
93
94
95
96
# File 'lib/bumbleworks/worker.rb', line 90

def info_document
  doc = Bumbleworks.dashboard.storage.get('variables', 'workers') || {}
  doc['type'] = 'variables'
  doc['_id'] = 'workers'
  doc['workers'] ||= {}
  doc
end

.pause_all(options = {}) ⇒ Object



23
24
25
# File 'lib/bumbleworks/worker.rb', line 23

def pause_all(options = {})
  change_worker_state('paused', options)
end

.refresh_worker_info(options = {}) ⇒ Object



53
54
55
56
57
58
59
60
61
# File 'lib/bumbleworks/worker.rb', line 53

def refresh_worker_info(options = {})
  with_worker_state_enabled do
    info.each do |worker_info|
      if !worker_info.in_stopped_state? && worker_info.stalling?
        worker_info.record_new_state("stalled")
      end
    end
  end
end

.shutdown_all(options = {}) ⇒ Object



14
15
16
17
18
19
20
21
# File 'lib/bumbleworks/worker.rb', line 14

def shutdown_all(options = {})
  # First, send all running workers a message to stop
  change_worker_state('stopped', options)
ensure
  # Now ensure that future started workers will be started
  # in "running" mode instead of automatically stopped
  change_worker_state('running', options)
end

.toggle_worker_state_enabled(switch) ⇒ Object



63
64
65
66
67
68
# File 'lib/bumbleworks/worker.rb', line 63

def toggle_worker_state_enabled(switch)
  unless [true, false].include?(switch)
    raise ArgumentError, "Must call with true or false"
  end
  Bumbleworks.dashboard.context['worker_state_enabled'] = switch
end

.unpause_all(options = {}) ⇒ Object



27
28
29
# File 'lib/bumbleworks/worker.rb', line 27

def unpause_all(options = {})
  change_worker_state('running', options)
end

.with_worker_state_enabledObject



74
75
76
77
78
79
80
# File 'lib/bumbleworks/worker.rb', line 74

def with_worker_state_enabled
  was_already_enabled = worker_state_enabled?
  toggle_worker_state_enabled(true) unless was_already_enabled
  yield
ensure
  toggle_worker_state_enabled(false) unless was_already_enabled
end

.worker_state_enabled?Boolean

Returns:

  • (Boolean)


70
71
72
# File 'lib/bumbleworks/worker.rb', line 70

def worker_state_enabled?
  !!Bumbleworks.dashboard.context['worker_state_enabled']
end

Instance Method Details

#class_nameObject



115
116
117
# File 'lib/bumbleworks/worker.rb', line 115

def class_name
  self.class.to_s
end

#desired_stateObject



132
133
134
135
136
137
# File 'lib/bumbleworks/worker.rb', line 132

def desired_state
  control_hash = worker_control_variable ||
    @storage.get("variables", "worker") ||
    { "state" => "running" }
  control_hash["state"]
end

#determine_stateObject



139
140
141
142
143
144
145
146
# File 'lib/bumbleworks/worker.rb', line 139

def determine_state
  @state_mutex.synchronize do
    if @state != "stopped" && @context["worker_state_enabled"]
      @state = desired_state
      save_info
    end
  end
end

#infoObject



148
149
150
# File 'lib/bumbleworks/worker.rb', line 148

def info
  self.class.info[id]
end

#save_infoObject



119
120
121
# File 'lib/bumbleworks/worker.rb', line 119

def save_info
  @info.save if @info
end

#shutdownObject



123
124
125
126
# File 'lib/bumbleworks/worker.rb', line 123

def shutdown
  super
  save_info
end

#worker_control_variableObject



128
129
130
# File 'lib/bumbleworks/worker.rb', line 128

def worker_control_variable
  self.class.control_document["workers"][id]
end