Class: Pione::Agent::TaskWorker

Inherits:
TupleSpaceClient show all
Defined in:
lib/pione/agent/task-worker.rb

Overview

TaskWorker is an agent that processes tasks.

Instance Attribute Summary collapse

Attributes inherited from BasicAgent

#chain_threads

Instance Method Summary collapse

Methods inherited from TupleSpaceClient

#bye, #call_transition_method, #hello, #transit_to_init, #transit_to_terminate

Methods included from TupleSpaceClientOperation

#base_location, #bye, #finalize, #hello, #notify_exception, #read, #take

Methods included from TupleSpace::TupleSpaceInterface

#process_log, #processing_error, #read!, #set_tuple_space, #take!, #take_all!, tuple_space_operation, #tuple_space_server, #with_process_log

Methods inherited from BasicAgent

agent_type, inherited, set_agent_type, #start, #states, #terminate, #terminated?, #transit, #wait_until, #wait_until_after, #wait_until_before, #wait_until_terminated

Methods included from StateTransitionSingletonMethod

#chain, #define_exception_handler, #define_transition, #exception_handler, #start, #transition_chain

Constructor Details

#initialize(tuple_space, features, env = nil) ⇒ TaskWorker

Returns a new instance of TaskWorker.



15
16
17
18
19
20
# File 'lib/pione/agent/task-worker.rb', line 15

# Create a task worker bound to a tuple space.
#
# @param tuple_space the tuple space this agent works against; also handed
#   to the parent constructor
# @param features the features used when taking tasks
# @param env optional environment; when nil or false, the environment is
#   looked up with #get_environment
def initialize(tuple_space, features, env=nil)
  super(tuple_space)
  @features = features
  @tuple_space = tuple_space
  # use the caller's environment if there is one, otherwise look it up
  @env = env
  @env ||= get_environment
end

Instance Attribute Details

#execution_threadObject (readonly)

Returns the value of attribute execution_thread.



12
13
14
# File 'lib/pione/agent/task-worker.rb', line 12

# Reader for the thread stored in @execution_thread.
#
# @return the current value of @execution_thread (nil until it is assigned)
def execution_thread
  return @execution_thread
end

#onceObject

the agent will be killed at task completion if true



13
14
15
# File 'lib/pione/agent/task-worker.rb', line 13

# Reader for the "once" flag; when true, the agent will be killed at task
# completion.
#
# @return the current value of @once (nil until it is assigned)
def once
  return @once
end

#tuple_spaceObject (readonly)

instance methods



11
12
13
# File 'lib/pione/agent/task-worker.rb', line 11

# Reader for the tuple space stored in @tuple_space.
#
# @return the current value of @tuple_space (nil until it is assigned)
def tuple_space
  return @tuple_space
end

Instance Method Details

#get_environmentObject

Get an environment object from the tuple space.



114
115
116
117
118
119
120
# File 'lib/pione/agent/task-worker.rb', line 114

# Fetch the environment object held by the "env" tuple.
#
# @return the +obj+ of the env tuple found by read!
# @raise [TupleSpaceError] when read! finds no env tuple
def get_environment
  env_tuple = read!(TupleSpace::EnvTuple.new)

  # without an env tuple the tuple space cannot be used at all
  unless env_tuple
    raise TupleSpaceError.new("the tuple space is invalid because \"env\" tuple not found.")
  end

  env_tuple.obj
end

#make_engine(task) ⇒ Object

Make an engine from the task.



123
124
125
126
127
128
129
130
131
132
133
134
# File 'lib/pione/agent/task-worker.rb', line 123

# Build a rule engine for the task via RuleEngine.make.
#
# @param task the task supplying package id, rule name, inputs, parameter
#   set, domain id and caller id
# @return whatever RuleEngine.make returns
def make_engine(task)
  # task-derived arguments, in the order RuleEngine.make receives them
  task_args = [
    task.package_id,
    task.rule_name,
    task.inputs,
    task.param_set,
    task.domain_id,
    task.caller_id
  ]

  RuleEngine.make(@tuple_space, @env, *task_args)
end

#spawn_child_task_worker(task) ⇒ Object

Spawn a child task worker. This method repeatedly creates a child agent for as long as the rule execution thread is alive.



138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
# File 'lib/pione/agent/task-worker.rb', line 138

# Keep a child task worker running for as long as the rule execution thread
# (@execution_thread) is alive.
#
# Each time the execution thread is found in the "sleep" status and there is
# no live child agent, a new agent of the same class is created with
# +once+ set to true, the foreground tuple of the task is taken, the child is
# started, and this method blocks until that child terminates. When the loop
# ends, the foreground tuple is written back unless one is already present.
#
# @param task the task whose domain id and digest identify the foreground tuple
def spawn_child_task_worker(task)
  child_agent = nil
  foreground = TupleSpace::ForegroundTuple.new(task.domain_id, task.digest)

  # child worker loop: poll the execution thread until it finishes
  while @execution_thread.alive? do
    # NOTE(review): a sleeping execution thread is presumably waiting on
    # results of other tasks -- confirm against the rule engine
    if @execution_thread.status == "sleep"
      if child_agent.nil? or child_agent.terminated?
        # there is no active child agent, so create one that shares this
        # agent's tuple space server, features and environment
        child_agent = self.class.new(tuple_space_server, @features, @env)
        child_agent.once = true

        # make a log record linking this agent (parent) to the child
        record = Log::CreateChildTaskWorkerProcessRecord.new.tap do |x|
          x.parent = uuid
          x.child = child_agent.uuid
        end

        # spawn the child agent with logging
        with_process_log(record) do
          # turn background: remove this task's foreground tuple
          take!(foreground)
          # start the child agent
          child_agent.start
        end

        # wait until the child agent completes the task
        child_agent.wait_until_terminated(nil)
      end
    else
      # the execution thread is not sleeping; pause briefly before polling again
      sleep 0.1 # FIXME : rewrite this sleep by more sophisticated way
    end
  end

  # turn foreground: restore the foreground tuple if it is not present
  write(foreground) unless read!(foreground)
end

#transit_to_connection_error(e) ⇒ Object

Report the connection error.



105
106
107
# File 'lib/pione/agent/task-worker.rb', line 105

# Log a warning that the agent lost its tuple space connection.
#
# @param e the connection error; it is not inspected here
# @return whatever Log::SystemLog.warn returns
def transit_to_connection_error(e)
  message = "task worker agent was disconnected from tuple space unexpectedly, goes to termination."
  Log::SystemLog.warn(message)
end

#transit_to_execute_task(task) ⇒ Object

Execute the task.



72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
# File 'lib/pione/agent/task-worker.rb', line 72

# Run the rule engine for the task and block until it has finished.
#
# The engine is handled in a separate thread kept in @execution_thread. For
# rules whose type is "flow", child task workers are spawned while that
# thread runs.
#
# @param task the task to execute
# @return the same task, passed on to the next transition
def transit_to_execute_task(task)
  rule_engine = make_engine(task)

  # handle the rule in its own thread so this agent stays free meanwhile
  @execution_thread = Thread.new do
    begin
      rule_engine.handle
    rescue RuleEngine::ActionError, Lang::MethodNotFound => error
      # publish the failure as a "terminate" command, then stop this agent
      status = System::Status.error(error)
      write(TupleSpace::CommandTuple.new("terminate", [status]))
      terminate
    end
  end

  # only flow rules get child task workers
  spawn_child_task_worker(task) if rule_engine.rule_definition.rule_type == "flow"

  # block until the engine thread is done
  @execution_thread.join

  task
end

#transit_to_finalize_task(task) ⇒ Object

Finalize the task. This method turns the working flag off and puts the task in the background.



99
100
101
102
# File 'lib/pione/agent/task-worker.rb', line 99

# Remove the working and foreground flag tuples of the task.
#
# @param task the task whose domain id and digest identify the flag tuples
# @return the result of taking the foreground tuple
def transit_to_finalize_task(task)
  working_flag = TupleSpace::WorkingTuple.new(task.domain_id, task.digest)
  foreground_flag = TupleSpace::ForegroundTuple.new(task.domain_id, task.digest)

  # working flag first, then foreground flag
  take!(working_flag)
  take!(foreground_flag)
end

#transit_to_init_task(task) ⇒ Object

Initialize the task.



52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
# File 'lib/pione/agent/task-worker.rb', line 52

# Claim the task by writing its working and foreground flag tuples.
#
# @param task the task to claim
# @return the same task, passed on to the next transition
# @raise [Restart] when a working tuple for the task already exists, or
#   when Rinda::RedundantTupleError is raised while claiming it
def transit_to_init_task(task)
  working_flag = TupleSpace::WorkingTuple.new(task.domain_id, task.digest)
  foreground_flag = TupleSpace::ForegroundTuple.new(task.domain_id, task.digest)

  # the task is already being worked on, so discard it and start over
  raise Restart.new if read!(working_flag)

  # mark the task as working and in the foreground
  [working_flag, foreground_flag].each { |flag| write(flag) }

  task
rescue Rinda::RedundantTupleError
  raise Restart.new
end

#transit_to_take_taskObject

Take a task and turn it to foreground.



47
48
49
# File 'lib/pione/agent/task-worker.rb', line 47

# Take a task tuple that matches this worker's features.
#
# @return the result of take for a TaskTuple built from @features
def transit_to_take_task
  task_pattern = TupleSpace::TaskTuple.new(features: @features)
  take(task_pattern)
end