Class: Nestene::Actor::Core
- Inherits:
-
Object
- Object
- Nestene::Actor::Core
- Includes:
- Celluloid, Celluloid::Notifications
- Defined in:
- lib/nestene/actor/core.rb
Instance Method Summary collapse
- #auton_names ⇒ Object
- #cancel_delayed_step(auton_id, step_id) ⇒ Object
- #create_auton(type, auton_id = SecureRandom.uuid) ⇒ Object
- #get_credentials ⇒ Object
- #get_state(auton_id) ⇒ Object
-
#initialize(storage) ⇒ Core
constructor
A new instance of Core.
- #notify_waiters(topic, auton_id, state) ⇒ Object
- #repeat_step(auton_id, method_uuid) ⇒ Object
- #resume(auton_id) ⇒ Object
- #schedule_delayed_step(auton_id, delay, name, parameters = []) ⇒ Object
- #schedule_repeating_delayed_step(auton_id, every, delay, name, parameters = []) ⇒ Object
- #schedule_step(auton_id, step_name, parameters = [], callback_auton_id = nil, callback_method = nil) ⇒ Object
- #set_credentials(credentials) ⇒ Object
- #wait_for_execution_result(auton_id, method_uuid) ⇒ Object
Constructor Details
#initialize(storage) ⇒ Core
Returns a new instance of Core.
10 11 12 13 14 |
# File 'lib/nestene/actor/core.rb', line 10 def initialize(storage) subscribe('state_update', :notify_waiters) @execution_futures={} @storage = storage end |
Instance Method Details
#auton_names ⇒ Object
16 17 18 |
# File 'lib/nestene/actor/core.rb', line 16 def auton_names @storage.list end |
#cancel_delayed_step(auton_id, step_id) ⇒ Object
60 61 62 63 64 |
# File 'lib/nestene/actor/core.rb', line 60 def cancel_delayed_step auton_id, step_id Celluloid::Actor["storage:%s" % auton_id].update do |state| delayed = state.queue.remove_delayed_method step_id end end |
#create_auton(type, auton_id = SecureRandom.uuid) ⇒ Object
28 29 30 31 32 33 34 35 |
# File 'lib/nestene/actor/core.rb', line 28 def create_auton type, auton_id=SecureRandom.uuid storage = AutonStorage.new(auton_id, @storage) Celluloid::Actor["storage:%s" % auton_id] = storage storage.create(type) auton_id rescue Exception => e abort e end |
#get_credentials ⇒ Object
20 21 22 |
# File 'lib/nestene/actor/core.rb', line 20 def get_credentials @storage.load('__credentials__') || {} end |
#get_state(auton_id) ⇒ Object
37 38 39 40 41 |
# File 'lib/nestene/actor/core.rb', line 37 def get_state auton_id Celluloid::Actor["storage:%s" % auton_id].get rescue Exception => e abort e end |
#notify_waiters(topic, auton_id, state) ⇒ Object
140 141 142 143 144 145 |
# File 'lib/nestene/actor/core.rb', line 140 def notify_waiters topic, auton_id, state if state future = @execution_futures.delete(auton_id) future.signal(SelfValue.new) if future end end |
#repeat_step(auton_id, method_uuid) ⇒ Object
81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 |
# File 'lib/nestene/actor/core.rb', line 81 def repeat_step auton_id, method_uuid step_id = SecureRandom.uuid Celluloid::Actor["storage:%s" % auton_id].update do |state| original_step = state.queue.executed.find{|m| m.uuid == method_uuid} method = ScheduledMethod.new method.name = original_step.name method.parameters = original_step.parameters method.uuid = step_id state.queue.to_execute.unshift method state.queue.failed = false end step_id end |
#resume(auton_id) ⇒ Object
97 98 99 100 101 |
# File 'lib/nestene/actor/core.rb', line 97 def resume auton_id Celluloid::Actor["storage:%s" % auton_id].update do |state| state.queue.failed = false end end |
#schedule_delayed_step(auton_id, delay, name, parameters = []) ⇒ Object
66 67 68 69 70 71 |
# File 'lib/nestene/actor/core.rb', line 66 def schedule_delayed_step auton_id, delay , name, parameters=[] Celluloid::Actor["storage:%s" % auton_id].update do |state| delayed = state.queue.add_delayed_method name, parameters, delay delayed.uuid end end |
#schedule_repeating_delayed_step(auton_id, every, delay, name, parameters = []) ⇒ Object
74 75 76 77 78 79 |
# File 'lib/nestene/actor/core.rb', line 74 def schedule_repeating_delayed_step auton_id, every, delay, name, parameters=[] Celluloid::Actor["storage:%s" % auton_id].update do |state| delayed = state.queue.add_delayed_method name, parameters, delay, every delayed.uuid end end |
#schedule_step(auton_id, step_name, parameters = [], callback_auton_id = nil, callback_method = nil) ⇒ Object
43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 |
# File 'lib/nestene/actor/core.rb', line 43 def schedule_step auton_id, step_name, parameters=[], callback_auton_id=nil, callback_method=nil step_id = SecureRandom.uuid Celluloid::Actor["storage:%s" % auton_id].update do |state| method = ScheduledMethod.new method.name = step_name method.parameters = parameters method.uuid = step_id if callback_auton_id && callback_method method.callback = Callback.new method.callback.name = callback_method method.callback.auton_id = callback_auton_id end state.queue.to_execute << method end step_id end |
#set_credentials(credentials) ⇒ Object
24 25 26 |
# File 'lib/nestene/actor/core.rb', line 24 def set_credentials credentials @storage.store('__credentials__',credentials) end |
#wait_for_execution_result(auton_id, method_uuid) ⇒ Object
104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 |
# File 'lib/nestene/actor/core.rb', line 104 def wait_for_execution_result auton_id, method_uuid future = nil executed = nil exclusive do state = Celluloid::Actor["storage:%s" % auton_id].get unless @execution_futures.has_key?(auton_id) @execution_futures[auton_id] = Celluloid::Future.new end future = @execution_futures[auton_id] executed = state ? state.queue.executed.find{|m| m.uuid == method_uuid} : nil end until executed executed = future.value state = Celluloid::Actor["storage:%s" % auton_id].get executed = state.queue.executed.find{|m| m.uuid == method_uuid} end if executed.error abort executed.error else return executed.result end end |