Class: Pione::Agent::BasicAgent

Inherits:
PioneObject
  • Object
show all
Extended by:
StateTransitionSingletonMethod
Includes:
DRbUndumped
Defined in:
lib/pione/agent/basic-agent.rb

Overview

BasicAgent is the superclass of all PIONE agents.

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Methods included from StateTransitionSingletonMethod

chain, define_exception_handler, define_transition, exception_handler, transition_chain

Constructor Details

#initialize ⇒ BasicAgent

Returns a new instance of BasicAgent.



115
116
117
118
119
120
121
122
123
124
125
# File 'lib/pione/agent/basic-agent.rb', line 115

# Build an agent that has no chain threads yet.
#
# Prepares the thread group for transition chain threads and, for each of
# wait_until_before and wait_until_after, one mutex plus a table of condition
# variables keyed by transition. A table entry is created the first time a
# transition is looked up.
def initialize
  @chain_threads = ThreadGroup.new

  # state used by wait_until_after
  @__wait_until_after_cv__ = Hash.new do |table, transition|
    table[transition] = ConditionVariable.new
  end
  @__wait_until_after_mutex__ = Mutex.new

  # state used by wait_until_before
  @__wait_until_before_cv__ = Hash.new do |table, transition|
    table[transition] = ConditionVariable.new
  end
  @__wait_until_before_mutex__ = Mutex.new
end

Instance Attribute Details

#chain_threads ⇒ Object (readonly)

transition chain thread group



113
114
115
# File 'lib/pione/agent/basic-agent.rb', line 113

# Reader for the transition chain thread group.
#
# @return [ThreadGroup] the group stored in @chain_threads
def chain_threads
  return @chain_threads
end

Class Method Details

.agent_type ⇒ Object

Return the agent type.



104
105
106
# File 'lib/pione/agent/basic-agent.rb', line 104

# Return the agent type.
#
# @return [Object] the value last stored by set_agent_type, or nil if none
def self.agent_type
  return @agent_type
end

.inherited(subclass) ⇒ Object

class methods



91
92
93
94
95
# File 'lib/pione/agent/basic-agent.rb', line 91

# Subclass hook: hand the new subclass its own shallow copies of this class's
# transition table, transition chain, and exception handler table.
#
# @param subclass [Class] the class that is inheriting from the receiver
def self.inherited(subclass)
  class_state = {
    :@transitions => @transitions,
    :@transition_chain => @transition_chain,
    :@exception_handler => @exception_handler,
  }

  # copy in declaration order; the last copy made is the return value
  class_state.map { |ivar, value| subclass.instance_variable_set(ivar, value.clone) }.last
end

.set_agent_type(agent_type, klass = nil) ⇒ Object

Set the agent type.



98
99
100
101
# File 'lib/pione/agent/basic-agent.rb', line 98

# Set the agent type.
#
# @param agent_type [Object] value later returned by agent_type
# @param klass [Object, nil] when truthy, it is also passed to Agent.set_agent
def self.set_agent_type(agent_type, klass=nil)
  @agent_type = agent_type
  return nil unless klass

  Agent.set_agent(klass)
end

Instance Method Details

#start ⇒ Object

Start agent activity.



128
129
130
131
132
133
134
135
136
137
138
139
140
141
# File 'lib/pione/agent/basic-agent.rb', line 128

# Start agent activity.
#
# Records the calling thread as the owner, launches the first chain thread
# from the :init transition, and encloses the chain thread group.
#
# @return [self]
# @raise [TerminationError] when the chain thread group already lists a thread
def start
  raise TerminationError.new(self, states) unless @chain_threads.list.empty?

  # remember the thread that started the agent
  @__owner_thread__ = Thread.current

  # launch the first chain thread, then close the group
  first_chain_thread = start_running(:init, [], AgentState.new, true)
  @chain_threads.add(first_chain_thread)
  @chain_threads.enclose

  self
end

#states ⇒ Object

Return agent states.



187
188
189
# File 'lib/pione/agent/basic-agent.rb', line 187

# Return agent states.
#
# @return [Array] the :agent_state thread-local of each thread currently
#   listed in the chain thread group
def states
  listed_threads = @chain_threads.list
  listed_threads.collect { |chain_thread| chain_thread[:agent_state] }
end

#terminate ⇒ Object

Terminate the agent activity.



228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
# File 'lib/pione/agent/basic-agent.rb', line 228

# Terminate the agent activity.
#
# The work runs in a helper thread that the caller joins, so this method
# blocks until the helper has finished. The helper kills and joins every
# chain thread except itself, then fires the :terminate transition using the
# agent state of the last chain thread it visited (or a fresh AgentState when
# there was none). DRb connection errors raised while firing the transition
# are logged as warnings instead of being raised.
def terminate
  # agent state of the last visited chain thread; shared with the helper thread
  state = nil

  Thread.new {
    # kill all chain threads
    @chain_threads.list.each do |thread|
      state = thread[:agent_state] # save last state
      # never kill/join the helper thread itself
      unless thread == Thread.current
        thread.kill
        thread.join
      end
    end

    # fire "terminate" transition
    begin
      # transit reads the thread-local state, so seed it on the helper thread first
      Thread.current[:agent_state] = state || AgentState.new
      transit(:terminate, [])
    rescue DRb::DRbConnError, DRbPatch::ReplyReaderError => e
      Log::Debug.warn("raised a connection error when we terminated", self, e)
    end
  }.join
end

#terminated? ⇒ Boolean

Return true if the agent has been terminated.

Returns:

  • (Boolean)


252
253
254
# File 'lib/pione/agent/basic-agent.rb', line 252

# Return true if the agent has been terminated.
#
# The agent counts as terminated when the chain thread group lists no thread
# and has been enclosed.
#
# @return [Boolean]
def terminated?
  no_chain_thread_left = @chain_threads.list.empty?
  no_chain_thread_left && @chain_threads.enclosed?
end

#transit(transition, transition_inputs) ⇒ Object

Fire the transition with inputs.



144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
# File 'lib/pione/agent/basic-agent.rb', line 144

# Fire the transition with inputs.
#
# Steps, in order: wake threads blocked in wait_until_before for this
# transition, mark the transition as current in the thread-local
# :agent_state, call the transition method, mark the transition as previous,
# and wake threads blocked in wait_until_after.
#
# When a StandardError is raised and get_exception_handler returns a handler
# transition for it, that transition is fired with the error as its only
# input. Without a handler, the error is raised in the owner thread (the one
# recorded by start) if it is still alive; otherwise it is raised in the
# current thread.
#
# @param transition [Symbol] transition to fire
# @param transition_inputs [Array] inputs passed to the transition method
# @return [Array] a pair of the fired transition and its result wrapped in an
#   array (nil becomes [], a non-array value becomes a one-element array)
def transit(transition, transition_inputs)
  # wake up threads that wait by wait_until_before method
  if @__wait_until_before_cv__.has_key?(transition)
    @__wait_until_before_mutex__.synchronize do
      @__wait_until_before_cv__[transition].broadcast
    end
  end

  # mark current transition
  Thread.current[:agent_state] =
    AgentState.new(previous: Thread.current[:agent_state].previous, current: transition)

  # call transition
  result = call_transition_method(transition, transition_inputs)
  result = result.nil? ? [] : result
  result = result.is_a?(Array) ? result : [result]

  # unmark current transition and mark previous transition
  Thread.current[:agent_state] = AgentState.new(previous: transition, current: nil)

  # wake up threads that wait by wait_until_after method
  if @__wait_until_after_cv__.has_key?(transition)
    @__wait_until_after_mutex__.synchronize do
      @__wait_until_after_cv__[transition].broadcast
    end
  end

  return transition, result
rescue StandardError => e
  # error handling
  if error_transition = get_exception_handler(e)
    # a handler that is not a transition name cannot be fired; re-raise as is
    raise unless error_transition.is_a?(Symbol)
    return transit(error_transition, [e])
  else
    # The owner thread is stored in @__owner_thread__. The misspelled
    # @__owner_thread is never assigned, so testing it disabled this branch.
    if @__owner_thread__ and @__owner_thread__.alive?
      @__owner_thread__.raise e
    else
      raise e
    end
  end
end

#wait_until(transition, sec = 10) ⇒ Object



202
203
204
205
206
# File 'lib/pione/agent/basic-agent.rb', line 202

# Wait for the transition unless a chain thread is already in it.
#
# If some chain thread's agent state reports the transition as its current
# one, return immediately; otherwise delegate to wait_until_before.
#
# @param transition [Symbol] transition to wait for
# @param sec [Numeric] timeout in seconds handed to wait_until_before
def wait_until(transition, sec=10)
  already_in_transition = @chain_threads.list.any? do |chain_thread|
    agent_state = chain_thread[:agent_state]
    agent_state and agent_state.current == transition
  end

  wait_until_before(transition, sec) unless already_in_transition
end

#wait_until_after(transition, sec = 10) ⇒ Object

Sleep until after the agent fires the transition.



209
210
211
212
213
214
215
216
217
# File 'lib/pione/agent/basic-agent.rb', line 209

# Sleep until after the agent fires the transition.
#
# Blocks on the condition variable registered for the transition until
# transit broadcasts on it, or until the timeout expires.
#
# @param transition [Symbol] transition to wait for
# @param sec [Numeric] maximum number of seconds to wait
# @raise [TimeoutError] when no broadcast arrived within sec seconds; it
#   carries the agent, the current chain thread states, and sec
def wait_until_after(transition, sec=10)
  # The bare Kernel#timeout was deprecated and later removed from Ruby's
  # timeout library; Timeout.timeout is the supported entry point.
  Timeout.timeout(sec) do
    @__wait_until_after_mutex__.synchronize do
      @__wait_until_after_cv__[transition].wait(@__wait_until_after_mutex__)
    end
  end
rescue Timeout::Error
  raise TimeoutError.new(self, @chain_threads.list.map{|th| th[:agent_state]}, sec)
end

#wait_until_before(transition, sec = 10) ⇒ Object

Sleep until before the agent fires the transition.



192
193
194
195
196
197
198
199
200
# File 'lib/pione/agent/basic-agent.rb', line 192

# Sleep until before the agent fires the transition.
#
# Blocks on the condition variable registered for the transition until
# transit broadcasts on it (which it does before running the transition), or
# until the timeout expires.
#
# @param transition [Symbol] transition to wait for
# @param sec [Numeric] maximum number of seconds to wait
# @raise [TimeoutError] when no broadcast arrived within sec seconds; it
#   carries the agent, the current chain thread states, and sec
def wait_until_before(transition, sec=10)
  # The bare Kernel#timeout was deprecated and later removed from Ruby's
  # timeout library; Timeout.timeout is the supported entry point.
  Timeout.timeout(sec) do
    @__wait_until_before_mutex__.synchronize do
      @__wait_until_before_cv__[transition].wait(@__wait_until_before_mutex__)
    end
  end
rescue Timeout::Error
  raise TimeoutError.new(self, @chain_threads.list.map{|th| th[:agent_state]}, sec)
end

#wait_until_terminated(sec = 10) ⇒ Object

Sleep caller thread until the agent is terminated.



220
221
222
223
224
225
# File 'lib/pione/agent/basic-agent.rb', line 220

# Sleep caller thread until the agent is terminated.
#
# Returns immediately when the agent is already terminated. Otherwise waits
# until after the :terminate transition and then joins every thread still
# listed in the chain thread group.
#
# @param sec [Numeric] timeout in seconds handed to wait_until_after
def wait_until_terminated(sec=10)
  return nil if terminated?

  wait_until_after(:terminate, sec)
  @chain_threads.list.each(&:join)
end