Class: Nestene::Actor::Core

Inherits:
Object
  • Object
show all
Includes:
Celluloid, Celluloid::Notifications
Defined in:
lib/nestene/actor/core.rb

Instance Method Summary collapse

Constructor Details

#initialize(storage) ⇒ Core

Returns a new instance of Core.



10
11
12
13
14
# File 'lib/nestene/actor/core.rb', line 10

# Sets up the core's bookkeeping around the given storage backend.
#
# Registers :notify_waiters as the handler for the 'state_update' topic and
# starts with an empty table of pending execution futures (keyed by auton id,
# see how #wait_for_execution_result / #notify_waiters use it).
#
# @param storage [Object] backend kept in @storage; other methods of this
#   class call #list, #load and #store on it
def initialize(storage)
  subscribe('state_update', :notify_waiters)
  @storage = storage
  @execution_futures = Hash.new
end

Instance Method Details

#auton_names ⇒ Object



16
17
18
# File 'lib/nestene/actor/core.rb', line 16

# Returns whatever the storage backend's #list yields, unmodified
# (presumably the names/ids of the stored autons — confirm against the backend).
#
# @return [Object] result of @storage.list
def auton_names
  backend = @storage
  backend.list
end

#cancel_delayed_step(auton_id, step_id) ⇒ Object



60
61
62
63
64
# File 'lib/nestene/actor/core.rb', line 60

# Removes a delayed step from an auton's queue.
#
# Looks up the storage actor registered as "storage:<auton_id>" and, inside its
# #update block, asks the state's queue to drop the delayed method.
#
# @param auton_id [String] id used to build the storage actor's registry name
# @param step_id [Object] identifier passed to queue.remove_delayed_method
# @return [Object] whatever the storage actor's #update returns
def cancel_delayed_step(auton_id, step_id)
  Celluloid::Actor["storage:%s" % auton_id].update do |state|
    # The block still evaluates to remove_delayed_method's result; the unused
    # local that used to capture it has been dropped.
    state.queue.remove_delayed_method(step_id)
  end
end

#create_auton(type, auton_id = SecureRandom.uuid) ⇒ Object



28
29
30
31
32
33
34
35
# File 'lib/nestene/actor/core.rb', line 28

# Creates a storage actor for a new auton, registers it and initialises it.
#
# The actor is registered under "storage:<auton_id>" before #create is called
# on it, so a failing #create leaves the registration in place.
#
# @param type [Object] passed straight to AutonStorage#create
# @param auton_id [String] id for the new auton; defaults to a random UUID
# @return [String] the auton id
#
# NOTE(review): every exception (including non-StandardError ones) is forwarded
# through `abort`; this breadth is kept as-is because callers may rely on it.
def create_auton(type, auton_id = SecureRandom.uuid)
  registry_name = "storage:%s" % auton_id
  auton_storage = AutonStorage.new(auton_id, @storage)
  Celluloid::Actor[registry_name] = auton_storage
  auton_storage.create(type)
  auton_id
rescue Exception => error
  abort error
end

#get_credentials ⇒ Object



20
21
22
# File 'lib/nestene/actor/core.rb', line 20

# Loads the value stored under the '__credentials__' key.
#
# @return [Object] the stored value, or a new empty Hash when the backend
#   returns nil/false for that key
def get_credentials
  stored = @storage.load('__credentials__')
  return {} unless stored

  stored
end

#get_state(auton_id) ⇒ Object



37
38
39
40
41
# File 'lib/nestene/actor/core.rb', line 37

# Fetches the current state from the auton's storage actor.
#
# @param auton_id [String] id used to build the registry name "storage:<auton_id>"
# @return [Object] whatever the storage actor's #get returns
#
# Any exception (for example the NoMethodError raised when no actor is
# registered under that name) is handed to `abort` rather than raised directly.
def get_state(auton_id)
  storage_actor = Celluloid::Actor["storage:%s" % auton_id]
  storage_actor.get
rescue Exception => error
  abort error
end

#notify_waiters(topic, auton_id, state) ⇒ Object



140
141
142
143
144
145
# File 'lib/nestene/actor/core.rb', line 140

# Handler for 'state_update' notifications (registered in #initialize).
#
# When a truthy state is delivered, the future pending for that auton (if any)
# is removed from @execution_futures and signalled with a fresh SelfValue.
#
# @param topic [Object] notification topic; not used by this method
# @param auton_id [Object] key into @execution_futures
# @param state [Object, nil] updated state; nothing happens when it is nil/false
# @return [Object, nil] result of Future#signal, or nil when nothing was signalled
def notify_waiters(topic, auton_id, state)
  return unless state

  pending = @execution_futures.delete(auton_id)
  pending.signal(SelfValue.new) if pending
end

#repeat_step(auton_id, method_uuid) ⇒ Object



81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
# File 'lib/nestene/actor/core.rb', line 81

# Re-queues a copy of an already executed step at the front of the queue.
#
# The copy takes the name and parameters of the executed step identified by
# +method_uuid+, gets a freshly generated uuid, is unshifted onto
# queue.to_execute, and the queue's failed flag is cleared.
#
# @param auton_id [String] id used to build the registry name "storage:<auton_id>"
# @param method_uuid [Object] uuid of the step to look up in queue.executed
# @return [String] uuid assigned to the newly queued step
# @raise [ArgumentError] when no executed step has the given uuid (previously
#   this surfaced as a NoMethodError on nil); the queue is left untouched
def repeat_step(auton_id, method_uuid)
  step_id = SecureRandom.uuid
  Celluloid::Actor["storage:%s" % auton_id].update do |state|
    original_step = state.queue.executed.find { |m| m.uuid == method_uuid }
    unless original_step
      raise ArgumentError,
            "no executed step #{method_uuid.inspect} for auton #{auton_id.inspect}"
    end

    # Named `repeated` rather than `method` to avoid shadowing Kernel#method.
    repeated = ScheduledMethod.new
    repeated.name = original_step.name
    repeated.parameters = original_step.parameters
    repeated.uuid = step_id
    state.queue.to_execute.unshift(repeated)
    state.queue.failed = false
  end
  step_id
end

#resume(auton_id) ⇒ Object



97
98
99
100
101
# File 'lib/nestene/actor/core.rb', line 97

# Clears the failed flag on the auton's queue.
#
# @param auton_id [String] id used to build the registry name "storage:<auton_id>"
# @return [Object] whatever the storage actor's #update returns
def resume(auton_id)
  storage_actor = Celluloid::Actor["storage:%s" % auton_id]
  storage_actor.update { |state| state.queue.failed = false }
end

#schedule_delayed_step(auton_id, delay, name, parameters = []) ⇒ Object



66
67
68
69
70
71
# File 'lib/nestene/actor/core.rb', line 66

# Adds a one-off delayed method to the auton's queue.
#
# @param auton_id [String] id used to build the registry name "storage:<auton_id>"
# @param delay [Object] forwarded as the third argument of queue.add_delayed_method
# @param name [Object] name of the method to schedule
# @param parameters [Array] arguments for the scheduled method
# @return [Object] whatever the storage actor's #update returns; the block
#   itself evaluates to the uuid of the newly added delayed entry
def schedule_delayed_step(auton_id, delay, name, parameters = [])
  storage_actor = Celluloid::Actor["storage:%s" % auton_id]
  storage_actor.update do |state|
    state.queue.add_delayed_method(name, parameters, delay).uuid
  end
end

#schedule_repeating_delayed_step(auton_id, every, delay, name, parameters = []) ⇒ Object



74
75
76
77
78
79
# File 'lib/nestene/actor/core.rb', line 74

# Adds a delayed method with a repeat interval to the auton's queue.
#
# @param auton_id [String] id used to build the registry name "storage:<auton_id>"
# @param every [Object] forwarded as the fourth argument of queue.add_delayed_method
# @param delay [Object] forwarded as the third argument of queue.add_delayed_method
# @param name [Object] name of the method to schedule
# @param parameters [Array] arguments for the scheduled method
# @return [Object] whatever the storage actor's #update returns; the block
#   itself evaluates to the uuid of the newly added delayed entry
def schedule_repeating_delayed_step(auton_id, every, delay, name, parameters = [])
  storage_actor = Celluloid::Actor["storage:%s" % auton_id]
  storage_actor.update do |state|
    state.queue.add_delayed_method(name, parameters, delay, every).uuid
  end
end

#schedule_step(auton_id, step_name, parameters = [], callback_auton_id = nil, callback_method = nil) ⇒ Object



43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
# File 'lib/nestene/actor/core.rb', line 43

# Appends a new step to the end of the auton's to_execute queue.
#
# A callback is attached only when both +callback_auton_id+ and
# +callback_method+ are given.
#
# @param auton_id [String] id used to build the registry name "storage:<auton_id>"
# @param step_name [Object] name of the method to schedule
# @param parameters [Array] arguments for the scheduled method
# @param callback_auton_id [Object, nil] auton id stored on the callback
# @param callback_method [Object, nil] method name stored on the callback
# @return [String] freshly generated uuid assigned to the step
def schedule_step(auton_id, step_name, parameters = [], callback_auton_id = nil, callback_method = nil)
  step_id = SecureRandom.uuid
  storage_actor = Celluloid::Actor["storage:%s" % auton_id]

  storage_actor.update do |state|
    step = ScheduledMethod.new
    step.name = step_name
    step.parameters = parameters
    step.uuid = step_id

    wants_callback = callback_auton_id && callback_method
    if wants_callback
      step.callback = Callback.new
      step.callback.tap do |cb|
        cb.name = callback_method
        cb.auton_id = callback_auton_id
      end
    end

    state.queue.to_execute << step
  end

  step_id
end

#set_credentials(credentials) ⇒ Object



24
25
26
# File 'lib/nestene/actor/core.rb', line 24

# Stores the given credentials under the '__credentials__' key.
#
# @param credentials [Object] value handed to the storage backend as-is
# @return [Object] whatever @storage.store returns
def set_credentials(credentials)
  backend = @storage
  backend.store('__credentials__', credentials)
end

#wait_for_execution_result(auton_id, method_uuid) ⇒ Object



104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
# File 'lib/nestene/actor/core.rb', line 104

# Blocks until the step identified by +method_uuid+ appears in the auton's
# executed list, then returns its result or aborts with its error.
#
# Each round checks the current state and, if the step is not there yet,
# registers (or reuses) the future stored for this auton in
# @execution_futures, then waits on it. #notify_waiters removes and signals
# that future on a state update, so a *fresh* future must be obtained on every
# round; waiting again on an already signalled future would return immediately
# and turn the loop into a busy poll of the storage actor.
#
# The check and the registration happen inside one `exclusive` block —
# presumably so that no notification can be handled between reading the state
# and registering the future (confirm against Celluloid's exclusive semantics).
#
# @param auton_id [String] id used to build the registry name "storage:<auton_id>"
# @param method_uuid [Object] uuid of the step to wait for
# @return [Object] the executed step's result
# @note when the executed step carries an error, that error is passed to `abort`
def wait_for_execution_result(auton_id, method_uuid)
  executed = nil

  until executed
    future = nil

    exclusive do
      state = Celluloid::Actor["storage:%s" % auton_id].get
      # A nil state is treated as "not executed yet" on every round, not only
      # on the first lookup.
      executed = state.queue.executed.find { |m| m.uuid == method_uuid } if state
      # Only register a waiter when there is actually something to wait for.
      future = (@execution_futures[auton_id] ||= Celluloid::Future.new) unless executed
    end

    # The signalled value itself carries no information; it is only a wake-up.
    future.value unless executed
  end

  if executed.error
    abort executed.error
  else
    executed.result
  end
end