Class: Pione::Agent::TaskWorker
- Inherits:
-
TupleSpaceClient
- Object
- PioneObject
- BasicAgent
- TupleSpaceClient
- Pione::Agent::TaskWorker
- Defined in:
- lib/pione/agent/task-worker.rb
Overview
TaskWorker is an agent to process tasks
Constant Summary
Constants included from Log::MessageLog
Log::MessageLog::MESSAGE_QUEUE
Instance Attribute Summary collapse
-
#execution_thread ⇒ Object
readonly
Returns the value of attribute execution_thread.
-
#once ⇒ Object
the agent will be killed at task completion if true.
-
#tuple_space ⇒ Object
readonly
instance methods.
Attributes inherited from BasicAgent
Instance Method Summary collapse
-
#get_environment ⇒ Object
Get a environment object from tuple space.
-
#initialize(tuple_space, features, env = nil) ⇒ TaskWorker
constructor
A new instance of TaskWorker.
-
#make_engine(task) ⇒ Object
Make an engine from the task.
-
#spawn_child_task_worker(task) ⇒ Object
Spawn child task worker.
-
#transit_to_connection_error(e) ⇒ Object
Report the connection error.
-
#transit_to_execute_task(task) ⇒ Object
Execute the task.
-
#transit_to_finalize_task(task) ⇒ Object
Finalize the task.
-
#transit_to_init ⇒ Object
transitions.
-
#transit_to_init_task(task) ⇒ Object
Initialize the task.
-
#transit_to_take_task ⇒ Object
Take a task and turn it to foreground.
Methods inherited from TupleSpaceClient
#bye, #call_transition_method, #hello, #transit_to_terminate
Methods included from Log::MessageLog
#debug_message, #debug_message_begin, #debug_message_end, debug_mode, debug_mode=, debug_mode?, message, quiet_mode, quiet_mode=, quiet_mode?, #show, #user_message, #user_message_begin, #user_message_end
Methods included from TupleSpaceClientOperation
#base_location, #bye, #finalize, #hello, #notify_exception, #read, #take
Methods included from TupleSpace::TupleSpaceInterface
#process_log, #processing_error, #set_tuple_space, tuple_space_operation, #tuple_space_server, #with_process_log
Methods inherited from BasicAgent
agent_type, inherited, set_agent_type, #start, #start!, #states, #terminate, #terminated?, #transit, #wait_until, #wait_until_after, #wait_until_before, #wait_until_terminated
Methods included from StateTransitionSingletonMethod
#chain, #define_exception_handler, #define_transition, #exception_handler, #start, #transition_chain
Constructor Details
#initialize(tuple_space, features, env = nil) ⇒ TaskWorker
Returns a new instance of TaskWorker.
26 27 28 29 30 31 |
# File 'lib/pione/agent/task-worker.rb', line 26 def initialize(tuple_space, features, env=nil) super(tuple_space) @tuple_space = tuple_space @features = features @env = env || get_environment end |
Instance Attribute Details
#execution_thread ⇒ Object (readonly)
Returns the value of attribute execution_thread.
12 13 14 |
# File 'lib/pione/agent/task-worker.rb', line 12 def execution_thread @execution_thread end |
#once ⇒ Object
the agent will be killed at task completion if true
13 14 15 |
# File 'lib/pione/agent/task-worker.rb', line 13 def once @once end |
#tuple_space ⇒ Object (readonly)
instance methods
11 12 13 |
# File 'lib/pione/agent/task-worker.rb', line 11 def tuple_space @tuple_space end |
Instance Method Details
#get_environment ⇒ Object
Get a environment object from tuple space.
127 128 129 130 131 132 133 |
# File 'lib/pione/agent/task-worker.rb', line 127 def get_environment if env = read!(TupleSpace::EnvTuple.new) env.obj else raise TupleSpaceError.new("the tuple space is invalid because \"env\" tuple not found.") end end |
#make_engine(task) ⇒ Object
Make an engine from the task.
136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 |
# File 'lib/pione/agent/task-worker.rb', line 136 def make_engine(task) param = { :tuple_space => @tuple_space, :env => @env, :package_id => task.package_id, :rule_name => task.rule_name, :inputs => task.inputs, :param_set => task.param_set, :domain_id => task.domain_id, :caller_id => task.caller_id, :request_from => @request_from, :session_id => @session_id, :client_ui => @client_ui } RuleEngine.make(param) end |
#spawn_child_task_worker(task) ⇒ Object
Spawn child task worker. This method repeats to create a child agent while rule execution thread is alive.
156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 |
# File 'lib/pione/agent/task-worker.rb', line 156 def spawn_child_task_worker(task) child_agent = nil foreground = TupleSpace::ForegroundTuple.new(task.domain_id, task.digest) # child worker loop while @execution_thread.alive? do if @execution_thread.status == "sleep" if child_agent.nil? or child_agent.terminated? # when there isn't active child agent child_agent = self.class.new(tuple_space_server, @features, @env) child_agent.once = true # make log record record = Log::CreateChildTaskWorkerProcessRecord.new.tap do |x| x.parent = uuid x.child = child_agent.uuid end # spawn child agent with logging with_process_log(record) do # turn background take!(foreground) # start child agent child_agent.start end # wait until the child agent completes the task child_agent.wait_until_terminated(nil) end else sleep 0.1 # FIXME : rewrite this sleep by more sophisticated way end end # turn foreground write(foreground) unless read!(foreground) end |
#transit_to_connection_error(e) ⇒ Object
Report the connection error.
118 119 120 |
# File 'lib/pione/agent/task-worker.rb', line 118 def transit_to_connection_error(e) Log::SystemLog.warn("task worker agent was disconnected from tuple space unexpectedly, goes to termination.") end |
#transit_to_execute_task(task) ⇒ Object
Execute the task.
90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 |
# File 'lib/pione/agent/task-worker.rb', line 90 def transit_to_execute_task(task) # setup rule engine engine = make_engine(task) # start the engine @execution_thread = Thread.new do engine.handle || terminate end # spawn child task worker if flow if engine.rule_definition.rule_type == "flow" spawn_child_task_worker(task) end # wait until the engine ends @execution_thread.join # go next transition return task end |
#transit_to_finalize_task(task) ⇒ Object
Finalize the task. This method will turn working flag off and background.
112 113 114 115 |
# File 'lib/pione/agent/task-worker.rb', line 112 def transit_to_finalize_task(task) take!(TupleSpace::WorkingTuple.new(task.domain_id, task.digest)) take!(TupleSpace::ForegroundTuple.new(task.domain_id, task.digest)) end |
#transit_to_init ⇒ Object
transitions
57 58 59 60 61 62 |
# File 'lib/pione/agent/task-worker.rb', line 57 def transit_to_init @request_from = @tuple_space.attribute("request_from") @session_id = @tuple_space.attribute("session_id") @client_ui = @tuple_space.attribute("client_ui") super end |
#transit_to_init_task(task) ⇒ Object
Initialize the task.
70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 |
# File 'lib/pione/agent/task-worker.rb', line 70 def transit_to_init_task(task) # make flag tuples working = TupleSpace::WorkingTuple.new(task.domain_id, task.digest) foreground = TupleSpace::ForegroundTuple.new(task.domain_id, task.digest) if read!(working) # the task is working already, so we will dicard the task raise Restart.new else # turn foreground flag on write(working) write(foreground) # go next transition return task end rescue Rinda::RedundantTupleError raise Restart.new end |
#transit_to_take_task ⇒ Object
Take a task and turn it to foreground.
65 66 67 |
# File 'lib/pione/agent/task-worker.rb', line 65 def transit_to_take_task return take(TupleSpace::TaskTuple.new(features: @features)) end |