Class: RightScale::Cook

Inherits:
Object show all
Defined in:
lib/instance/cook/cook.rb

Defined Under Namespace

Classes: BlockingError, TagError

Constant Summary collapse

AGENT_NAME =

Name of agent running the cook process

'instance'

Class Method Summary collapse

Instance Method Summary collapse

Class Method Details

.instance ⇒ Object

Access cook instance from anywhere to send requests to core through command protocol



217
218
219
# File 'lib/instance/cook/cook.rb', line 217

# Access cook instance from anywhere to send requests to core through
# command protocol
#
# === Return
# Value of the @@instance class variable; in the visible code it is assigned
# to the running Cook object inside #run, just before the bundle is executed.
#
# NOTE(review): reading a class variable that was never assigned raises
# NameError in Ruby, so this presumably must not be called before #run has
# reached that assignment -- confirm against callers.
def self.instance
  @@instance
end

Instance Method Details

#add_tag(tag_name, timeout) ⇒ Object

Add given tag to tags exposed by corresponding server

Parameters

tag_name(String)

Tag to be added

timeout(Fixnum)

Number of seconds to wait for agent response

Return

true

Always return true



170
171
172
173
174
175
176
177
178
179
180
# File 'lib/instance/cook/cook.rb', line 170

# Add given tag to tags exposed by corresponding server
#
# === Parameters
# tag_name(String):: Tag to be added
# timeout(Fixnum):: Number of seconds to wait for agent response
#
# === Return
# true:: Always return true
#
# === Raise
# TagError:: If the operation result built from the response is not a success
def add_tag(tag_name, timeout)
  request = { :name => :add_tag, :tag => tag_name }
  reply = blocking_request(request, timeout)
  outcome = OperationResult.from_results(load(reply, "Unexpected response #{reply.inspect}"))

  # Bail out first so the success path below reads straight through
  raise TagError.new("Add tag failed: #{outcome.content}") unless outcome.success?

  ::Chef::Log.info("Successfully added tag #{tag_name}")
  true
end

#check_for_missing_inputs(bundle) ⇒ Object



117
118
119
120
121
122
123
124
125
126
# File 'lib/instance/cook/cook.rb', line 117

# Fail the run when the bundle contains executables that are not ready
#
# For every executable whose +ready+ flag is falsy, an audit info line is
# appended listing the inputs whose flags include "unready"; afterwards the
# run is failed once with the "Missing inputs" message.
#
# === Parameters
# bundle(Object):: Bundle exposing +executables+; each executable is expected
#   to respond to +ready+, +nickname+ and +input_flags+
#
# === Return
# nil:: When every executable is ready (otherwise returns whatever +fail+ returns)
def check_for_missing_inputs(bundle)
  pending_executables = bundle.executables.reject { |e| e.ready }
  return if pending_executables.empty?

  pending_executables.each do |e|
    missing_input_names = e.input_flags.map { |name, flags| name if flags.member?("unready") }.compact
    # The closing quote belongs right after the nickname, not before the colon
    AuditStub.instance.append_info("Following inputs used by '#{e.nickname}' are missing: #{missing_input_names.join(", ")}")
  end
  fail("Execution failed", "Missing inputs")
end

#has_default_thread? ⇒ Boolean

Determines if the current cook process has the default thread for purposes of concurrency with non-defaulted cooks.

Returns:

  • (Boolean)


130
131
132
# File 'lib/instance/cook/cook.rb', line 130

# Determines if the current cook process has the default thread for purposes
# of concurrency with non-defaulted cooks.
#
# === Return
# true:: If the agent's default thread name equals this cook's @thread_name
# false:: Otherwise
def has_default_thread?
  default_name = ::RightScale::AgentConfig.default_thread_name
  default_name == @thread_name
end

#load_tags(timeout) ⇒ Object

Retrieve current instance tags

Parameters

timeout(Fixnum)

Number of seconds to wait for agent response

Raises:

  • (TagError) — if the agent response is not an Array


206
207
208
209
210
211
212
213
# File 'lib/instance/cook/cook.rb', line 206

# Retrieve current instance tags
#
# === Parameters
# timeout(Fixnum):: Number of seconds to wait for agent response
#
# === Return
# tags(Array):: The response of the :get_tags command, returned as received
#
# === Raise
# TagError:: If the response is not an Array
def load_tags(timeout)
  tags = blocking_request({ :name => :get_tags }, timeout)
  unless tags.is_a?(Array)
    raise TagError.new("Retrieving current tags failed: #{tags.inspect}")
  end

  ::Chef::Log.info("Successfully loaded current tags: '#{tags.join("', '")}'")
  tags
end

#query_tags(tags, agent_ids = nil, timeout = 120) ⇒ Object

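Query tags through the agent command protocol

Parameters

tags

Tags to query for

agent_ids

Optional agent ids sent along with the query; omitted from the command when nil or empty (defaults to nil)

timeout(Fixnum)

Number of seconds to wait for agent response (defaults to 120)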

Return

result(Hash)

contents of response



149
150
151
152
153
154
155
156
157
158
159
160
# File 'lib/instance/cook/cook.rb', line 149

# Query tags through the agent command protocol
#
# === Parameters
# tags:: Tags to query for, sent as the :tags entry of the command
# agent_ids:: Optional agent ids sent as :agent_ids; left out of the command
#   when nil or empty
# timeout(Fixnum):: Number of seconds to wait for agent response
#
# === Return
# result(Hash):: contents of response
#
# === Raise
# TagError:: If the operation result is not a success (message carries the
#   result content) or if the response cannot be turned into an operation
#   result (message carries the raw response)
def query_tags(tags, agent_ids=nil, timeout=120)
  cmd = { :name => :query_tags, :tags => tags }
  cmd[:agent_ids] = agent_ids unless agent_ids.nil? || agent_ids.empty?
  response = blocking_request(cmd, timeout)
  begin
    result = OperationResult.from_results(load(response, "Unexpected response #{response.inspect}"))
    raise TagError.new("Query tags failed: #{result.content}") unless result.success?
    result.content
  rescue TagError
    # Our own failure already carries the meaningful result content; without
    # this clause the generic handler below would overwrite its message.
    raise
  rescue StandardError
    # Anything else means the response itself could not be interpreted
    raise TagError.new("Query tags failed: #{response.inspect}")
  end
end

#remove_tag(tag_name, timeout) ⇒ Object

Remove given tag from tags exposed by corresponding server

Parameters

tag_name(String)

Tag to be removed

timeout(Fixnum)

Number of seconds to wait for agent response

Return

true

Always return true



190
191
192
193
194
195
196
197
198
199
200
# File 'lib/instance/cook/cook.rb', line 190

# Remove given tag from tags exposed by corresponding server
#
# === Parameters
# tag_name(String):: Tag to be removed
# timeout(Fixnum):: Number of seconds to wait for agent response
#
# === Return
# true:: Always return true
#
# === Raise
# TagError:: If the operation result built from the response is not a success
def remove_tag(tag_name, timeout)
  request = { :name => :remove_tag, :tag => tag_name }
  reply = blocking_request(request, timeout)
  outcome = OperationResult.from_results(load(reply, "Unexpected response #{reply.inspect}"))

  # Bail out first so the success path below reads straight through
  raise TagError.new("Remove tag failed: #{outcome.content}") unless outcome.success?

  ::Chef::Log.info("Successfully removed tag #{tag_name}")
  true
end

#run ⇒ Object

Run bundle given in stdin



42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
# File 'lib/instance/cook/cook.rb', line 42

# Run bundle given in stdin
#
# Proceeds in three numbered phases: load options and set up logging and the
# command client, read and decode the bundle, then execute it inside an
# EventMachine reactor. The process exits with status 1 from the ensure
# clause unless the executable sequence reported success.
#
# NOTE(review): +fail+ is called with one or two string arguments throughout;
# it is presumably a Cook-specific helper defined outside this view (not
# Kernel#fail) that reports the failure and terminates -- confirm.
def run
  # 1. Load configuration settings
  options = OptionsBag.load
  agent_id  = options[:identity]
  AgentConfig.root_dir = options[:root_dir]

  Log.program_name = 'RightLink'
  Log.facility = 'user'
  Log.log_to_file_only(options[:log_to_file_only])
  Log.init(agent_id, options[:log_path])
  Log.level = CookState.log_level
  # add an additional logger if the agent is set to log to an alternate
  # location (install, operate, decommission, ...)
  Log.add_logger(::Logger.new(CookState.log_file)) if CookState.log_file

  Log.info("[cook] Process starting up with dev tags: [#{CookState.startup_tags.select { |tag| tag.include?(CookState::DEV_TAG_NAMESPACE)}.join(', ')}]")
  # Both the listen port and the cookie are required to build the command client
  fail('Missing command server listen port') unless options[:listen_port]
  fail('Missing command cookie') unless options[:cookie]
  @client = CommandClient.new(options[:listen_port], options[:cookie])
  ShutdownRequestProxy.init(@client)

  # 2. Retrieve bundle
  # A single line is read with Kernel#gets and decoded as the bundle
  input = gets.chomp
  begin
    bundle = RightScale::MessageEncoder.for_agent(agent_id).decode(input)
  rescue Exception => e
    fail('Invalid bundle', e.message)
  end

  fail('Missing bundle', 'No bundle to run') if bundle.nil?

  # Use the bundle's runlist policy thread name when it has one, otherwise
  # fall back to the agent's default thread name
  @thread_name = bundle.runlist_policy.thread_name if bundle.respond_to?(:runlist_policy) && bundle.runlist_policy
  @thread_name ||= RightScale::AgentConfig.default_thread_name
  options[:thread_name] = @thread_name

  # Chef state needs the server secret so it can encrypt state on disk.
  # The secret is the same for all instances of the server (i.e. is still
  # valid after stop and restart server).
  server_secret = bundle.server_secret || AgentConfig.default_server_secret
  # 'reset=false' assigns a local named reset and passes false positionally;
  # it is not a keyword argument
  ChefState.init(agent_id, server_secret, reset=false)

  # 3. Run bundle
  # Publish this object through the class variable read by Cook.instance
  @@instance = self
  # Stays nil/false unless the sequence callback below sets it to true; the
  # ensure clause turns anything falsy into exit status 1
  success = nil
  Log.debug("[cook] Thread name associated with bundle = #{@thread_name}")
  gatherer = ExternalParameterGatherer.new(bundle, options)
  sequence = ExecutableSequence.new(bundle)
  # NOTE(review): presumably restricts EM.defer to a single worker thread so
  # deferred work never runs concurrently -- confirm intent
  EM.threadpool_size = 1
  EM.error_handler do |e|
    Log.error("Execution failed", e, :trace)
    fail('Exception caught', "The following exception was caught during execution:\n  #{e.message}")
  end
  EM.run do
    begin
      AuditStub.instance.init(options)
      check_for_missing_inputs(bundle)
      # Chain: gatherer runs first; on its success the sequence runs, and
      # only the sequence's success marks the whole run as successful
      gatherer.callback { EM.defer { sequence.run } }
      gatherer.errback { success = false; report_failure(gatherer) }
      sequence.callback { success = true; send_inputs_patch(sequence) }
      sequence.errback { success = false; report_failure(sequence) }

      EM.defer { gatherer.run }
    rescue Exception => e
      fail('Execution failed', Log.format("Execution failed", e, :trace))
    end
  end

# NOTE(review): rescuing Exception also intercepts SystemExit, so an exit
# raised inside the method body would land here -- confirm this is intended
rescue Exception => e
  fail('Execution failed', Log.format("Run failed", e, :trace))

ensure
  Log.info("[cook] Process stopping")
  # success is a method-level local, so it is nil here (and the process exits
  # with 1) even when an error occurred before it was assigned
  exit(1) unless success
end

#send_push(type, payload = nil, target = nil, opts = {}) ⇒ Object

Helper method to send a request to one or more targets with no response expected. See InstanceCommands for details.



136
137
138
139
140
# File 'lib/instance/cook/cook.rb', line 136

# Helper method to send a request to one or more targets with no response
# expected. See InstanceCommands for details.
#
# === Parameters
# type:: Value sent as the :type entry of the command
# payload:: Value sent as the :payload entry, nil by default
# target:: Value sent as the :target entry, nil by default
# opts(Hash):: Value sent as the :options entry, empty hash by default
#
# === Return
# Whatever EM.next_tick returns
def send_push(type, payload = nil, target = nil, opts = {})
  push_command = {
    :name    => :send_push,
    :type    => type,
    :payload => payload,
    :target  => target,
    :options => opts
  }
  # Need to execute on EM main thread where command client is running
  EM.next_tick do
    @client.send_command(push_command)
  end
end