Class: Actuator::Job

Inherits:
Object
  • Object
show all
Defined in:
lib/actuator/job.rb

Overview

TODO: Implement Job class in C++ extension so that we can do profiling and extra safety checks with minimal overhead

Constant Summary collapse

@@total =
0

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Instance Attribute Details

#blockObject

Returns the value of attribute block.



83
84
85
# File 'lib/actuator/job.rb', line 83

def block
  @block
end

#fiberObject

Returns the value of attribute fiber.



83
84
85
# File 'lib/actuator/job.rb', line 83

def fiber
  @fiber
end

#idObject

Returns the value of attribute id.



83
84
85
# File 'lib/actuator/job.rb', line 83

def id
  @id
end

#is_yieldedObject

Returns the value of attribute is_yielded.



83
84
85
# File 'lib/actuator/job.rb', line 83

def is_yielded
  @is_yielded
end

#joined_onObject

Returns the value of attribute joined_on.



83
84
85
# File 'lib/actuator/job.rb', line 83

def joined_on
  @joined_on
end

#mutex_asleepObject

Returns the value of attribute mutex_asleep.



83
84
85
# File 'lib/actuator/job.rb', line 83

def mutex_asleep
  @mutex_asleep
end

#resumed_atObject

Returns the value of attribute resumed_at.



83
84
85
# File 'lib/actuator/job.rb', line 83

def resumed_at
  @resumed_at
end

#resumed_callerObject

Returns the value of attribute resumed_caller.



83
84
85
# File 'lib/actuator/job.rb', line 83

def resumed_caller
  @resumed_caller
end

#sleep_timerObject

Returns the value of attribute sleep_timer.



83
84
85
# File 'lib/actuator/job.rb', line 83

def sleep_timer
  @sleep_timer
end

#system_thread_localsObject

Returns the value of attribute system_thread_locals.



83
84
85
# File 'lib/actuator/job.rb', line 83

def system_thread_locals
  @system_thread_locals
end

#thread_localsObject

Returns the value of attribute thread_locals.



83
84
85
# File 'lib/actuator/job.rb', line 83

def thread_locals
  @thread_locals
end

#thread_variables=(value) ⇒ Object (writeonly)

Sets the attribute thread_variables

Parameters:

  • value

    the value to set the attribute thread_variables to.



84
85
86
# File 'lib/actuator/job.rb', line 84

def thread_variables=(value)
  @thread_variables = value
end

#time_warning_extraObject

Returns the value of attribute time_warning_extra.



83
84
85
# File 'lib/actuator/job.rb', line 83

def time_warning_extra
  @time_warning_extra
end

#time_warning_nameObject

Returns the value of attribute time_warning_name.



83
84
85
# File 'lib/actuator/job.rb', line 83

def time_warning_name
  @time_warning_name
end

#time_warning_started_atObject

Returns the value of attribute time_warning_started_at.



83
84
85
# File 'lib/actuator/job.rb', line 83

def time_warning_started_at
  @time_warning_started_at
end

#whoisObject

Returns the value of attribute whois.



83
84
85
# File 'lib/actuator/job.rb', line 83

def whois
  @whois
end

Class Method Details

.currentObject

def active

@@active_jobs

end



17
18
19
# File 'lib/actuator/job.rb', line 17

def current
  Fiber.current.job
end

.sleep(seconds) ⇒ Object



60
61
62
63
64
65
66
67
68
# File 'lib/actuator/job.rb', line 60

def sleep(seconds)
  job = Job.current
  job.sleep_timer = Timer.in(seconds) do
    job.fiber.resume true
  end
  Job.yield
ensure
  job.sleep_timer = nil
end

.wait(jobs, timeout = nil) ⇒ Object



70
71
72
73
74
75
76
77
78
79
80
# File 'lib/actuator/job.rb', line 70

def wait(jobs, timeout=nil)
  job = Job.current
  jobs << job
  if timeout
    job.sleep_timer = Timer.in(timeout) do
      job.sleep_timer = nil
      job.fiber.resume true
    end
  end
  Job.yield
end

.yieldObject



21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
# File 'lib/actuator/job.rb', line 21

def yield
  job = Job.current
  # if job.time_warning_started_at
  #  duration = Actuator.now - job.time_warning_started_at
  #  job.time_warning_extra ||= 0.0
  #  job.time_warning_extra += duration
  #  Log.puts "[time_warning] suspended: #{job.time_warning_name} (#{(job.time_warning_extra * 1000).round(3)})" if job.time_warning_extra > 0.0025
  # end
  # if resumed_at = job.resumed_at
  #  delta = Actuator.now - resumed_at
  #  if delta > 0.0025
  #    Log.puts "Warning: Job #{job.id} yielding after #{(delta * 1000).round(3)} ms\nJob execution started from #{job.resumed_caller.join "\n"}"
  #  else
  #    #Log.puts "Job #{job.id} yielding - #{(delta * 1000).round(3)} ms"
  #  end
  # else
  #  #Log.puts "Job #{job.id} yielding"
  # end
  job.is_yielded = true
  value = Fiber.yield
  job.is_yielded = false
  if job.ended?
    # job.time_warning_name = nil
    # job.time_warning_started_at = nil
    # job.time_warning_extra = nil
    raise JobKilled
  else
    # now = job.resumed_at = Actuator.now
    # #job.resumed_caller = caller
    # if job.time_warning_started_at
    #  Log.puts "[time_warning] resumed: #{job.time_warning_name} (#{(job.time_warning_extra * 1000).round(3)})" if job.time_warning_extra > 0.0025
    #  job.time_warning_started_at = now
    # else
    #  #Log.puts "Job #{job.id} resumed"
    # end
  end
  value
end

Instance Method Details

#alive?Boolean

Returns:

  • (Boolean)


112
113
114
# File 'lib/actuator/job.rb', line 112

def alive?
  !@has_ended
end

#asleep?Boolean

Returns:

  • (Boolean)


108
109
110
# File 'lib/actuator/job.rb', line 108

def asleep?
  @sleep_timer || @mutex_asleep
end

#ended?Boolean

Returns:

  • (Boolean)


116
117
118
# File 'lib/actuator/job.rb', line 116

def ended?
  @has_ended
end

#job_endedObject



92
93
94
95
96
97
98
99
100
101
102
# File 'lib/actuator/job.rb', line 92

def job_ended
  @has_ended = true
  @resumed_at = nil
  # @resumed_caller = nil
  # @@active_jobs.delete self
  if defined? @joined_jobs
    joined_jobs = @joined_jobs
    @joined_jobs = nil
    joined_jobs.each(&:resume)
  end
end

#job_startedObject



86
87
88
89
90
# File 'lib/actuator/job.rb', line 86

def job_started
  #raise "[Job #@id] job_started called multiple times" if @id
  @id = @@total += 1
  # @@active_jobs << self
end

#joinObject



147
148
149
150
151
152
153
154
155
156
157
158
159
160
# File 'lib/actuator/job.rb', line 147

def join
  return if @has_ended
  fiber = Fiber.current
  job = fiber.job
  begin
    job.joined_on = self
    (@joined_jobs ||= []) << fiber
    Job.yield
    raise "Job#join - resumed before job #@id ended" unless @has_ended
  ensure
    job.joined_on = nil
    @joined_jobs.delete fiber if @joined_jobs
  end
end

#killObject

Raises:



162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
# File 'lib/actuator/job.rb', line 162

def kill
  return if @has_ended
  raise JobKilled if Job.current == self
  if @sleep_timer
    # puts "Kill requested by job #{Job.current.id} (asleep)"
    @sleep_timer.destroy
    @sleep_timer = nil
  elsif @mutex_asleep
    # puts "Kill requested by job #{Job.current.id} (mutex)"
    @mutex_asleep = nil
  elsif @is_yielded
    # puts "Kill requested by job #{Job.current.id} (yielded)"
  else
    raise "[Job #{Job.current.id}] Fiber#kill called on job #{@id} which is #{state}"
  end
  @has_ended = true
  @fiber.resume
end

#puts(msg) ⇒ Object



205
206
207
# File 'lib/actuator/job.rb', line 205

def puts(msg)
  Log.puts "[Job #@id] #{msg}"
end

#scheduleObject



128
129
130
131
132
133
134
135
136
137
# File 'lib/actuator/job.rb', line 128

def schedule
  return if @is_scheduled
  @is_scheduled = true
  @sleep_timer.destroy if @sleep_timer
  @sleep_timer = Timer.in(0) do
    @is_scheduled = false
    @sleep_timer = nil
    @fiber.resume
  end
end

#sleep(seconds) ⇒ Object



120
121
122
123
124
125
126
# File 'lib/actuator/job.rb', line 120

def sleep(seconds)
  @sleep_timer = Timer.in(seconds) do
    @sleep_timer = nil
    @fiber.resume true
  end
  Job.yield
end

#stateObject



193
194
195
196
197
198
199
200
201
202
203
# File 'lib/actuator/job.rb', line 193

def state
  if @has_ended;        'ended'
  elsif @sleep_timer;   'asleep'
  elsif @mutex_asleep;  'mutex'
  elsif @joined_on;     "joined on job #{@joined_on.id}"
  elsif @is_yielded;    'yielded'
  elsif !@fiber;        'missing fiber'
  elsif @fiber.alive?;  'alive'
  else                  'dead fiber'
  end
end

#thread_variable_get(name) ⇒ Object



181
182
183
# File 'lib/actuator/job.rb', line 181

def thread_variable_get(name)
  @thread_variables[name] if @thread_variables
end

#thread_variable_set(name, value) ⇒ Object



185
186
187
188
189
190
191
# File 'lib/actuator/job.rb', line 185

def thread_variable_set(name, value)
  if @thread_variables
    @thread_variables[name] = value
  else
    @thread_variables = { name => value }
  end
end

#time_warning(name = nil) ⇒ Object



209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
# File 'lib/actuator/job.rb', line 209

def time_warning(name=nil)
  raise "Job#time_warning called for job #@id while it is suspended" if yielded?
  if @time_warning_started_at
    duration = Actuator.now - @time_warning_started_at
    @time_warning_extra ||= 0.0
    @time_warning_extra += duration
    (@time_warning_stack ||= []) << [@time_warning_name, @time_warning_extra]
  end
  @time_warning_extra = 0
  if name
    @time_warning_name = name
    @time_warning_started_at = Actuator.now
  else
    @time_warning_name = nil
    @time_warning_started_at = nil
  end
  duration
end

#time_warning!(name = nil) ⇒ Object



228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
# File 'lib/actuator/job.rb', line 228

def time_warning!(name=nil)
  raise "Job#time_warning! called for job #@id while it is suspended" if yielded?
  if @time_warning_started_at
    now = Actuator.now
    duration = now - @time_warning_started_at + @time_warning_extra
    if duration > 0.0025
      Log.warn "Time warning: #@time_warning_name took #{(duration * 1000).round(3)} ms"
    end
  end
  @time_warning_extra = 0
  if name
    @time_warning_name = name
    @time_warning_started_at = Actuator.now
  else
    if @time_warning_stack && (outstanding = @time_warning_stack.shift)
      @time_warning_name = outstanding[0]
      @time_warning_extra = outstanding[1]
      @time_warning_started_at = Actuator.now
    else
      @time_warning_name = nil
      @time_warning_started_at = nil
    end
  end
  duration
end

#wake!Object



139
140
141
142
143
144
145
# File 'lib/actuator/job.rb', line 139

def wake!
  unless timer = @sleep_timer
    raise "Tried to wake up a job which is not asleep"
  end
  @sleep_timer = nil
  timer.fire!
end

#yielded?Boolean

Returns:

  • (Boolean)


104
105
106
# File 'lib/actuator/job.rb', line 104

def yielded?
  @is_yielded
end