Class: Wakame::Action

Inherits:
Object
  • Object
show all
Includes:
AttributeHelper, ThreadImmutable
Defined in:
lib/wakame/action.rb

Defined Under Namespace

Classes: ProcAction

Constant Summary

Constants included from AttributeHelper

AttributeHelper::CLASS_TYPE_KEY, AttributeHelper::CONVERT_CLASSES, AttributeHelper::PRIMITIVE_CLASSES

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from ThreadImmutable

#bind_thread, included, #target_thread, #target_thread?, #thread_check

Methods included from AttributeHelper

#dump_attrs, #retrieve_attr_attribute

Instance Attribute Details

#action_manager ⇒ Object

Returns the value of attribute action_manager.



11
12
13
# File 'lib/wakame/action.rb', line 11

# Reader for the @action_manager instance variable.
#
# Other methods of this class reach the master, the active job table and
# the lock queue through the object returned here.
def action_manager
  @action_manager
end

Instance Method Details

#acquire_lock(*args) ⇒ Object

Set the lock flags to arbitrary object names.

acquire_lock('Lock Target 1', 'Lock Target 2')
acquire_lock(%w(aaaa bbbb cccc))


132
133
134
135
136
# File 'lib/wakame/action.rb', line 132

# Flags each given lock target for this job, then waits on the lock queue.
#
# Arguments may be given individually or as (nested) arrays; they are
# flattened and every element is converted with #to_s before being passed
# to lock_queue.set together with this action's job_id.
#
#   acquire_lock('Lock Target 1', 'Lock Target 2')
#   acquire_lock(%w(aaaa bbbb cccc))
#
# Returns whatever lock_queue.wait returns for this job_id.
def acquire_lock(*args)
  args.flatten.each do |target|
    action_manager.lock_queue.set(target.to_s, job_id)
  end

  action_manager.lock_queue.wait(job_id)
end

#actor_request(agent_id, path, *args, &blk) ⇒ Object



111
112
113
114
115
116
117
118
# File 'lib/wakame/action.rb', line 111

# Builds an actor request through the master and returns it.
#
# Without a block the request object is returned untouched. With a block,
# #request is called on the object first and the block is then invoked
# with it; the request object is still the return value.
def actor_request(agent_id, path, *args, &blk)
  req = master.actor_request(agent_id, path, *args)
  return req unless blk

  req.request
  blk.call(req)
  req
end

#agent_monitor ⇒ Object



17
18
19
# File 'lib/wakame/action.rb', line 17

# Shortcut for master.agent_monitor.
def agent_monitor
  master.agent_monitor
end

#all_subactions_complete? ⇒ Boolean

Returns:

  • (Boolean)


78
79
80
81
82
83
84
# File 'lib/wakame/action.rb', line 78

# Tells whether the whole tree of sub actions below this action is done.
#
# A sub action counts as done when its status is :complete and the same
# check succeeds recursively for its own sub actions. The status of the
# receiver itself is not examined. An action without sub actions yields true.
def all_subactions_complete?
  subactions.all? do |child|
    child.status == :complete && child.all_subactions_complete?
  end
end

#flush_subactions(sec = 60*30) ⇒ Object



59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
# File 'lib/wakame/action.rb', line 59

# Blocks until every sub action (recursively) has completed.
#
# Returns immediately when action_manager.active_jobs has no entry for
# this action's job_id. Otherwise it repeatedly takes one item from
# notify_queue and re-checks all_subactions_complete? until that is true.
#
# sec:: upper bound in seconds for the whole wait (default 60*30).
#
# Raises Timeout::Error when the wait exceeds +sec+, and re-raises any
# Exception object that arrives through notify_queue.
def flush_subactions(sec=60*30)
  return if action_manager.active_jobs[self.job_id].nil?

  # Use Timeout.timeout explicitly: the bare Object#timeout helper is
  # deprecated and not available on current Ruby versions.
  Timeout.timeout(sec) do
    until all_subactions_complete?
      Thread.pass
      src = notify_queue.deq
      # Exit the current action when a subaction notified exception.
      raise src if src.is_a?(Exception)
    end
  end
end

#master ⇒ Object



13
14
15
# File 'lib/wakame/action.rb', line 13

# Shortcut for action_manager.master.
def master
  action_manager.master
end

#notes ⇒ Object



125
126
127
# File 'lib/wakame/action.rb', line 125

# Returns the :notes entry stored in this job's context.
#
# The context is looked up in action_manager.active_jobs by job_id; the
# lookup result is indexed directly, so the job must be present there.
def notes
  job_context = action_manager.active_jobs[job_id]
  job_context[:notes]
end

#notify(src = nil) ⇒ Object



91
92
93
94
95
96
97
98
99
100
101
# File 'lib/wakame/action.rb', line 91

# Delivers a notification about +src+ (defaults to this action itself).
#
# If this action is already :complete and has a parent, the notification
# is escalated to the parent's #notify. Otherwise any pending entries in
# notify_queue are discarded and +src+ is enqueued as the only entry.
def notify(src=nil)
  source = src.nil? ? self : src

  # Escalate the notification to parent if the action is finished.
  return parent_action.notify(source) if status == :complete && parent_action

  notify_queue.clear unless notify_queue.size.zero?
  notify_queue.enq(source)
end

#on_canceled ⇒ Object



146
147
# File 'lib/wakame/action.rb', line 146

# Empty hook; the default implementation does nothing and returns nil.
# NOTE(review): presumably invoked when the action is canceled so that
# subclasses can clean up -- confirm against the caller.
def on_canceled
end

#on_failed ⇒ Object



143
144
# File 'lib/wakame/action.rb', line 143

# Empty hook; the default implementation does nothing and returns nil.
# NOTE(review): presumably invoked when the action fails so that
# subclasses can react -- confirm against the caller.
def on_failed
end

#run ⇒ Object

Raises:

  • (NotImplementedError)


139
140
141
# File 'lib/wakame/action.rb', line 139

# Body of the action. This base implementation only raises, so concrete
# actions have to provide their own #run.
#
# Raises NotImplementedError with a message naming the receiving class,
# which makes the failing action identifiable in logs and backtraces.
def run
  raise NotImplementedError, "#{self.class}#run must be implemented by a subclass"
end

#service_cluster ⇒ Object Also known as: cluster

Tentative utility method for retrieving the first loaded service cluster.



22
23
24
25
26
27
28
29
# File 'lib/wakame/action.rb', line 22

# Tentative helper that looks up the first cluster known to the master's
# cluster manager.
#
# The lookup runs inside StatusDB.barrier. The first entry of
# master.cluster_manager.clusters is used as the id passed to
# Service::ServiceCluster.find.
#
# Raises RuntimeError ("There is no cluster loaded") when that first
# entry is nil.
def service_cluster
  StatusDB.barrier do
    first_id = master.cluster_manager.clusters.first
    if first_id.nil?
      raise "There is no cluster loaded"
    end

    Service::ServiceCluster.find(first_id)
  end
end

#status=(status) ⇒ Object



32
33
34
35
36
37
38
39
# File 'lib/wakame/action.rb', line 32

# Updates @status and calls #notify, but only when the value actually
# changes. Assigning the current value again is a no-op.
#
# Returns the (possibly unchanged) @status.
def status=(status)
  return @status if @status == status

  @status = status
  # Notify to observers after updating the attribute
  notify
  @status
end

#subactions ⇒ Object



42
43
44
# File 'lib/wakame/action.rb', line 42

# Returns the list of sub actions, creating an empty Array in
# @subactions on first access. The same array is returned on later
# calls, so callers can append to it directly.
def subactions
  @subactions ||= []
end

#sync_actor_request(agent_id, path, *args) ⇒ Object



120
121
122
123
# File 'lib/wakame/action.rb', line 120

# Issues an actor request and waits on it.
#
# Builds the request via #actor_request (no block), calls #request on it
# and then #wait on the result. Returns the value of that #wait call.
def sync_actor_request(agent_id, path, *args)
  actor_request(agent_id, path, *args).request.wait
end

#trigger_action(subaction = nil, &blk) ⇒ Object



46
47
48
49
50
51
52
53
54
55
56
57
# File 'lib/wakame/action.rb', line 46

# Registers a sub action under this action and hands it to the action
# manager for execution.
#
# subaction:: the action object to run as a child of this one.
# blk::       alternatively, a block; it is wrapped in a ProcAction and
#             takes precedence over +subaction+ when both are given.
#
# The child is appended to #subactions and receives this action as its
# parent_action together with this action's job_id and action_manager.
#
# Returns the result of action_manager.run_action.
# Raises ArgumentError when neither a sub action nor a block is given.
def trigger_action(subaction=nil, &blk)
  subaction = ProcAction.new(blk) if blk

  # Fail before mutating anything: appending nil to subactions would
  # break every later traversal of the sub action tree.
  raise ArgumentError, "trigger_action requires a subaction or a block" if subaction.nil?

  subactions << subaction
  subaction.parent_action = self
  subaction.job_id = self.job_id
  subaction.action_manager = self.action_manager

  action_manager.run_action(subaction)
end

#walk_subactions(&blk) ⇒ Object

Recursively iterate the sub action descendants.



104
105
106
107
108
109
# File 'lib/wakame/action.rb', line 104

# Visits this action and then, depth first, every descendant sub action.
#
# The block is called with the receiver first and afterwards with each
# sub action in order, recursing into each one before moving on to its
# next sibling. Returns the receiver's subactions collection.
def walk_subactions(&blk)
  blk.call(self)
  subactions.each do |child|
    child.walk_subactions(&blk)
  end
end