Class: Empathy::EM::Thread

Inherits:
Object show all
Defined in:
lib/empathy/em/thread.rb

Overview

Acts like a ::Thread using Fibers and EventMachine

Thread methods not implemented by Empathy

  • .exclusive - not implemented
  • #critical - not implemented
  • #set_trace_func - not implemented
  • #safe_level - not implemented
  • #priority - not implemented

Direct Known Subclasses

ProxyThread

Constant Summary collapse

@@em_threads =
{}

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(*args, &block) ⇒ Thread

Like ::Thread#initialize



160
161
162
163
164
# File 'lib/empathy/em/thread.rb', line 160

def initialize(*args,&block)

  ::Kernel.raise ThreadError, "no block" unless block_given?
  initialize_fiber(*args,&block)
end

Instance Attribute Details

#fiberFiber (readonly)

Returns The underlying fiber.

Returns:

  • (Fiber)

    The underlying fiber.



71
72
73
# File 'lib/empathy/em/thread.rb', line 71

def fiber
  @fiber
end

Class Method Details

.currentThread

Like ::Thread::current. Get the currently running EM::Thread, eg to access thread local variables

Returns:

  • (Thread)

    representing the current Fiber



90
91
92
# File 'lib/empathy/em/thread.rb', line 90

def self.current
  @@em_threads[Fiber.current] || ProxyThread.new(Fiber.current)
end

.exitThread

Like ::Thread.exit

Returns:



125
126
127
# File 'lib/empathy/em/thread.rb', line 125

def self.exit
  current.exit
end

.kill(thread) ⇒ Thread

Like ::Thread.kill

Returns:



131
132
133
# File 'lib/empathy/em/thread.rb', line 131

def self.kill(thread)
  thread.exit
end

.listArray<Thread>

Like ::Thread::list. Return an array of all EM::Threads that are alive.

Returns:



76
77
78
# File 'lib/empathy/em/thread.rb', line 76

def self.list
  @@em_threads.values.select { |s| s.alive? }
end

.mainThread

Like ::Thread.main

Returns:



83
84
85
# File 'lib/empathy/em/thread.rb', line 83

def self.main
  @@main
end

.passnil

Like ::Thread::pass. The fiber is paused and resumed on the next_tick of EM's event loop

Returns:

  • (nil)


116
117
118
119
120
121
# File 'lib/empathy/em/thread.rb', line 116

def self.pass
  em_thread = current
  ::EM.next_tick{ em_thread.__send__(:wake_resume) }
  em_thread.__send__(:yield_sleep)
  nil
end

.start(*args, &block) ⇒ Thread

Like ::Thread.start

Returns:



145
146
147
148
149
150
151
152
153
154
155
156
157
# File 'lib/empathy/em/thread.rb', line 145

def self.start(*args,&block)
  ::Kernel.raise ArgumentError, "no block" unless block_given?
  c = if self != Thread
        Class.new(self) do
          def initialize(*args,&block)
            initialize_fiber(*args,&block)
          end
        end
      else
        self
      end
  c.new(*args,&block)
end

.stopvoid

This method returns an undefined value.

Like ::Thread::stop. Sleep forever (until woken)



108
109
110
# File 'lib/empathy/em/thread.rb', line 108

def self.stop
  Kernel.sleep()
end

.yield(*args) ⇒ Object

Alias for Fiber::yield Equivalent to a thread being blocked on IO

WARNING: Be very careful about using #yield with the other thread like methods Specifically it is important to ensure user calls to #resume don't conflict with the resumes that are setup via EM.timer or EM.next_tick as a result of #::sleep or #::pass



101
102
103
# File 'lib/empathy/em/thread.rb', line 101

def self.yield(*args)
  Fiber.yield(*args)
end

Instance Method Details

#[](name) ⇒ Object?

Access to "fiber local" variables, akin to "thread local" variables.

Parameters:

  • name (Symbol)

Returns:



274
275
276
277
# File 'lib/empathy/em/thread.rb', line 274

def [](name)
  ::Kernel.raise TypeError, "name #{name} must convert to_sym" unless name and name.respond_to?(:to_sym)
  @locals[name.to_sym]
end

#[]=(name, value) ⇒ Object

Access to "fiber local" variables, akin to "thread local" variables.



280
281
282
283
# File 'lib/empathy/em/thread.rb', line 280

def []=(name, value)
  ::Kernel.raise TypeError, "name #{name} must convert to_sym" unless name and name.respond_to?(:to_sym)
  @locals[name.to_sym] = value
end

#alive?true, false

Like ::Thread#alive? or Fiber#alive?

Returns:

  • (true, false)


184
185
186
# File 'lib/empathy/em/thread.rb', line 184

def alive?
  fiber.alive?
end

#ensure_hook(key, &block) ⇒ void

This method returns an undefined value.

Do something when the fiber completes.



307
308
309
310
311
312
313
# File 'lib/empathy/em/thread.rb', line 307

def ensure_hook(key,&block)
  if block_given? then
    @ensure_hooks[key] = block
  else
    @ensure_hooks.delete(key)
  end
end

#exitnil, Thread Also known as: kill, terminate

Like ::Thread#exit. Signals thread to wakeup and die

Returns:



225
226
227
228
229
230
231
232
# File 'lib/empathy/em/thread.rb', line 225

def exit
  case @status
  when :sleep
    wake_resume(:exit)
  when :run
    throw :exit
  end
end

#inspectString

Like ::Thread#inspect

Returns:

  • (String)


301
302
303
# File 'lib/empathy/em/thread.rb', line 301

def inspect
  "#<Empathy::EM::Thread:0x%s %s %s" % [object_id, @fiber == Fiber.current ? "run" : "yielded", status || "dead" ]
end

#join(limit = nil) ⇒ nil, Thread

Like ::Thread#join.

Parameters:

  • limit (Numeric) (defaults to: nil)

    seconds to wait for thread to expire

Returns:

  • (nil, Thread)

    nil if timeout expires, otherwise this Thread



169
170
171
172
173
# File 'lib/empathy/em/thread.rb', line 169

def join(limit = nil)
  @mutex.synchronize { @join_cond.wait(@mutex,limit) } if alive?
  ::Kernel.raise @exception if @exception
  if alive? then nil else self end
end

#key?(name) ⇒ true, false

Like ::Thread#key? Is there a "fiber local" variable defined called +name+

Parameters:

  • name (Symbol)

Returns:

  • (true, false)


288
289
290
291
# File 'lib/empathy/em/thread.rb', line 288

def key?(name)
  ::Kernel.raise TypeError, "name #{name} must convert to_sym" unless name and name.respond_to?(:to_sym)
  @locals.has_key?(name.to_sym)
end

#keysArray<Symbol>

Like ::Thread#keys The set of "em_thread local" variable keys

Returns:

  • (Array<Symbol>)


295
296
297
# File 'lib/empathy/em/thread.rb', line 295

def keys()
  @locals.keys
end

#raisevoid #raise(string) ⇒ void #raise(exception, string = nil, array = caller()) ⇒ void

This method returns an undefined value.

Like ::Thread#raise, raise an exception on a sleeping Thread

Overloads:

  • #raisevoid

    Raises:

    • RuntimeError

  • #raise(string) ⇒ void

    Parameters:

    • string (String)

    Raises:

    • RuntimeError

  • #raise(exception, string = nil, array = caller()) ⇒ void

    Parameters:

    • exception (Class, String, Object)
    • string (String) (defaults to: nil)

      exception message

    • array (Array<String>) (defaults to: caller())

      caller information

    Raises:

    • Exception



257
258
259
260
261
262
263
264
265
266
# File 'lib/empathy/em/thread.rb', line 257

def raise(*args)
  args << RuntimeError if args.empty?
  if fiber == Fiber.current
    ::Kernel.raise(*args)
  elsif status
    wake_resume(:raise,*args)
  else
    #dead em_thread, do nothing
  end
end

#resume(*args) ⇒ Object

Like Fiber#resume. Refer to warnings on #::yield



176
177
178
179
180
# File 'lib/empathy/em/thread.rb', line 176

def resume(*args)
  #TODO  should only allow if @status is :run, which really means
  # blocked by a call to Yield
  fiber.resume(*args)
end

#statusString, ...

Like ::Thread#status

Returns:

  • (String)
  • (false)
  • (nil)


199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
# File 'lib/empathy/em/thread.rb', line 199

def status
  case @status
  when :run
    #TODO - if not the current fiber
    # we can only be in this state due to a yield on the
    # underlying fiber, which means we are actually in sleep
    # or we're a ProxyThread that is dead and not yet
    # cleaned up
    "run"
  when :sleep
    "sleep"
  when :dead, :killed
    false
  when :exception
    nil
  end
end

#stop?false, true

Like ::Thread#stop?

Returns:

  • (false)

    if called on the current fiber

  • (true)

    otherwise



191
192
193
# File 'lib/empathy/em/thread.rb', line 191

def stop?
  Fiber.current != fiber
end

#valueObject

Like ::Thread#value. Implicitly calls #join.



218
219
220
# File 'lib/empathy/em/thread.rb', line 218

def value
  join and @value
end

#wakeupThread Also known as: run

Like ::Thread#wakeup Wakes a sleeping Thread

Returns:



239
240
241
242
243
# File 'lib/empathy/em/thread.rb', line 239

def wakeup
  ::Kernel.raise ThreadError, "dead em_thread" unless status
  wake_resume()
  self
end