Class: Celluloid::TaskThread
- Defined in:
- lib/celluloid/tasks/task_thread.rb
Overview
Tasks with a Thread backend
Instance Attribute Summary
Attributes inherited from Task
#chain_id, #guard_warnings, #meta, #status, #type
Instance Method Summary collapse
- #backtrace ⇒ Object
- #create ⇒ Object
- #deliver(value) ⇒ Object
-
#initialize(type, meta) ⇒ TaskThread
constructor
Run the given block within a task.
- #signal ⇒ Object
Methods inherited from Task
current, #exclusive, #exclusive?, #guard, #inspect, #resume, #running?, suspend, #suspend, #terminate
Constructor Details
#initialize(type, meta) ⇒ TaskThread
Run the given block within a task
5 6 7 8 9 10 11 12 |
# File 'lib/celluloid/tasks/task_thread.rb', line 5 def initialize(type, ) @resume_queue = Queue.new @exception_queue = Queue.new @yield_mutex = Mutex.new @yield_cond = ConditionVariable.new super end |
Instance Method Details
#backtrace ⇒ Object
49 50 51 |
# File 'lib/celluloid/tasks/task_thread.rb', line 49 def backtrace @thread.backtrace end |
#create ⇒ Object
14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 |
# File 'lib/celluloid/tasks/task_thread.rb', line 14 def create # TODO: move this to ActorSystem#get_thread (ThreadHandle inside InternalPool) @thread = ThreadHandle.new(Thread.current[:celluloid_actor_system], :task) do begin ex = @resume_queue.pop raise ex if ex.is_a?(Task::TerminatedError) yield rescue Exception => ex @exception_queue << ex ensure @yield_cond.signal end end end |
#deliver(value) ⇒ Object
35 36 37 38 39 40 41 42 43 44 45 46 47 |
# File 'lib/celluloid/tasks/task_thread.rb', line 35 def deliver(value) raise DeadTaskError, "cannot resume a dead task" unless @thread.alive? @yield_mutex.synchronize do @resume_queue.push(value) @yield_cond.wait(@yield_mutex) while @exception_queue.size > 0 raise @exception_queue.pop end end rescue ThreadError raise DeadTaskError, "cannot resume a dead task" end |
#signal ⇒ Object
30 31 32 33 |
# File 'lib/celluloid/tasks/task_thread.rb', line 30 def signal @yield_cond.signal @resume_queue.pop end |