Class: RightScale::Cook
Defined Under Namespace
Classes: BlockingError, TagError
Constant Summary collapse
- AGENT_NAME =
Name of agent running the cook process
'instance'
Class Method Summary collapse
-
.instance ⇒ Object
Access cook instance from anywhere to send requests to core through command protocol.
Instance Method Summary collapse
-
#add_tag(tag_name, timeout) ⇒ Object
Add given tag to tags exposed by corresponding server.
- #check_for_missing_inputs(bundle) ⇒ Object
-
#has_default_thread? ⇒ Boolean
Determines if the current cook process has the default thread for purposes of concurrency with non-defaulted cooks.
-
#load_tags(timeout) ⇒ Object
Retrieve current instance tags.
-
#query_tags(tags, agent_ids = nil, timeout = 120) ⇒ Object
Add given tag to tags exposed by corresponding server.
-
#remove_tag(tag_name, timeout) ⇒ Object
Remove given tag from tags exposed by corresponding server.
-
#run ⇒ Object
Run bundle given in stdin.
-
#send_push(type, payload = nil, target = nil, opts = {}) ⇒ Object
Helper method to send a request to one or more targets with no response expected See InstanceCommands for details.
Class Method Details
.instance ⇒ Object
Access cook instance from anywhere to send requests to core through command protocol
217 218 219 |
# File 'lib/instance/cook/cook.rb', line 217 def self.instance @@instance end |
Instance Method Details
#add_tag(tag_name, timeout) ⇒ Object
Add given tag to tags exposed by corresponding server
Parameters
- tag(String)
-
Tag to be added
- timeout(Fixnum)
-
Number of seconds to wait for agent response
Return
- true
-
Always return true
170 171 172 173 174 175 176 177 178 179 180 |
# File 'lib/instance/cook/cook.rb', line 170 def add_tag(tag_name, timeout) cmd = { :name => :add_tag, :tag => tag_name } response = blocking_request(cmd, timeout) result = OperationResult.from_results(load(response, "Unexpected response #{response.inspect}")) if result.success? ::Chef::Log.info("Successfully added tag #{tag_name}") else raise TagError.new("Add tag failed: #{result.content}") end true end |
#check_for_missing_inputs(bundle) ⇒ Object
117 118 119 120 121 122 123 124 125 126 |
# File 'lib/instance/cook/cook.rb', line 117 def check_for_missing_inputs(bundle) pending_executables = bundle.executables.select { |e| !e.ready } unless pending_executables.empty? pending_executables.each do |e| missing_input_names = e.input_flags.collect {|k,v| k if v.member?("unready")}.compact AuditStub.instance.append_info("Following inputs used by '#{e.nickname} are missing': #{missing_input_names.join(", ")}") end fail("Execution failed", "Missing inputs") end end |
#has_default_thread? ⇒ Boolean
Determines if the current cook process has the default thread for purposes of concurrency with non-defaulted cooks.
130 131 132 |
# File 'lib/instance/cook/cook.rb', line 130 def has_default_thread? ::RightScale::AgentConfig.default_thread_name == @thread_name end |
#load_tags(timeout) ⇒ Object
Retrieve current instance tags
Parameters
- timeout(Fixnum)
-
Number of seconds to wait for agent response
206 207 208 209 210 211 212 213 |
# File 'lib/instance/cook/cook.rb', line 206 def (timeout) cmd = { :name => :get_tags } res = blocking_request(cmd, timeout) raise TagError.new("Retrieving current tags failed: #{res.inspect}") unless res.kind_of?(Array) ::Chef::Log.info("Successfully loaded current tags: '#{res.join("', '")}'") res end |
#query_tags(tags, agent_ids = nil, timeout = 120) ⇒ Object
Add given tag to tags exposed by corresponding server
Parameters
- tag(String)
-
Tag to be added
Return
- result(Hash)
-
contents of response
149 150 151 152 153 154 155 156 157 158 159 160 |
# File 'lib/instance/cook/cook.rb', line 149 def (, agent_ids=nil, timeout=120) cmd = { :name => :query_tags, :tags => } cmd[:agent_ids] = agent_ids unless agent_ids.nil? || agent_ids.empty? response = blocking_request(cmd, timeout) begin result = OperationResult.from_results(load(response, "Unexpected response #{response.inspect}")) raise TagError.new("Query tags failed: #{result.content}") unless result.success? return result.content rescue raise TagError.new("Query tags failed: #{response.inspect}") end end |
#remove_tag(tag_name, timeout) ⇒ Object
Remove given tag from tags exposed by corresponding server
Parameters
- tag(String)
-
Tag to be removed
- timeout(Fixnum)
-
Number of seconds to wait for agent response
Return
- true
-
Always return true
190 191 192 193 194 195 196 197 198 199 200 |
# File 'lib/instance/cook/cook.rb', line 190 def remove_tag(tag_name, timeout) cmd = { :name => :remove_tag, :tag => tag_name } response = blocking_request(cmd, timeout) result = OperationResult.from_results(load(response, "Unexpected response #{response.inspect}")) if result.success? ::Chef::Log.info("Successfully removed tag #{tag_name}") else raise TagError.new("Remove tag failed: #{result.content}") end true end |
#run ⇒ Object
Run bundle given in stdin
42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 |
# File 'lib/instance/cook/cook.rb', line 42 def run # 1. Load configuration settings = OptionsBag.load agent_id = [:identity] AgentConfig.root_dir = [:root_dir] Log.program_name = 'RightLink' Log.facility = 'user' Log.log_to_file_only([:log_to_file_only]) Log.init(agent_id, [:log_path]) Log.level = CookState.log_level # add an additional logger if the agent is set to log to an alternate # location (install, operate, decommission, ...) Log.add_logger(::Logger.new(CookState.log_file)) if CookState.log_file Log.info("[cook] Process starting up with dev tags: [#{CookState.startup_tags.select { |tag| tag.include?(CookState::DEV_TAG_NAMESPACE)}.join(', ')}]") fail('Missing command server listen port') unless [:listen_port] fail('Missing command cookie') unless [:cookie] @client = CommandClient.new([:listen_port], [:cookie]) ShutdownRequestProxy.init(@client) # 2. Retrieve bundle input = gets.chomp begin bundle = RightScale::MessageEncoder.for_agent(agent_id).decode(input) rescue Exception => e fail('Invalid bundle', e.) end fail('Missing bundle', 'No bundle to run') if bundle.nil? @thread_name = bundle.runlist_policy.thread_name if bundle.respond_to?(:runlist_policy) && bundle.runlist_policy @thread_name ||= RightScale::AgentConfig.default_thread_name [:thread_name] = @thread_name # Chef state needs the server secret so it can encrypt state on disk. # The secret is the same for all instances of the server (i.e. is still # valid after stop and restart server). server_secret = bundle.server_secret || AgentConfig.default_server_secret ChefState.init(agent_id, server_secret, reset=false) # 3. Run bundle @@instance = self success = nil Log.debug("[cook] Thread name associated with bundle = #{@thread_name}") gatherer = ExternalParameterGatherer.new(bundle, ) sequence = ExecutableSequence.new(bundle) EM.threadpool_size = 1 EM.error_handler do |e| Log.error("Execution failed", e, :trace) fail('Exception caught', "The following exception was caught during execution:\n #{e.message}") end EM.run do begin AuditStub.instance.init() check_for_missing_inputs(bundle) gatherer.callback { EM.defer { sequence.run } } gatherer.errback { success = false; report_failure(gatherer) } sequence.callback { success = true; send_inputs_patch(sequence) } sequence.errback { success = false; report_failure(sequence) } EM.defer { gatherer.run } rescue Exception => e fail('Execution failed', Log.format("Execution failed", e, :trace)) end end rescue Exception => e fail('Execution failed', Log.format("Run failed", e, :trace)) ensure Log.info("[cook] Process stopping") exit(1) unless success end |
#send_push(type, payload = nil, target = nil, opts = {}) ⇒ Object
Helper method to send a request to one or more targets with no response expected See InstanceCommands for details
136 137 138 139 140 |
# File 'lib/instance/cook/cook.rb', line 136 def send_push(type, payload = nil, target = nil, opts = {}) cmd = {:name => :send_push, :type => type, :payload => payload, :target => target, :options => opts} # Need to execute on EM main thread where command client is running EM.next_tick { @client.send_command(cmd) } end |