Class: ConeyIsland::Submitter
- Inherits: Object
- Inheritance chain: Object → ConeyIsland::Submitter
- Defined in:
- lib/coney_island/submitter.rb
Overview
TODO: Refactor this to instantiate and use instance methods for ease of testing and thread safety.
Class Method Summary collapse
- .amqp_connection ⇒ Object
- .amqp_parameters ⇒ Object
- .amqp_parameters=(params) ⇒ Object
- .channel ⇒ Object
- .connected? ⇒ Boolean
- .connection ⇒ Object
- .connection=(conn) ⇒ Object
- .create_channel ⇒ Object
- .delay_exchange ⇒ Object
- .exchange ⇒ Object
- .handle_connection ⇒ Object
- .initialize_rabbit ⇒ Object
- .jobs_cache ⇒ Object
- .publish_job(args, job_id = nil) ⇒ Object
TODO: Document me.
- .run_inline ⇒ Object
- .running_inline? ⇒ Boolean
- .start_connection ⇒ Object
- .stop_running_inline ⇒ Object
- .submit(*args) ⇒ Object
- .submit!(args, job_id = nil) ⇒ Object
- .tcp_connection_retries ⇒ Object
- .tcp_connection_retries=(number) ⇒ Object
Class Method Details
.amqp_connection ⇒ Object
146 147 148 |
# File 'lib/coney_island/submitter.rb', line 146
# Returns the connection currently stored on the submitter.
#
# Reads the same `@connection` instance variable that `.connection` returns.
#
# @return [Object, nil] the stored connection, or nil when none has been set
def self.amqp_connection
  @connection
end
.amqp_parameters ⇒ Object
88 89 90 91 92 93 94 |
# File 'lib/coney_island/submitter.rb', line 88
# Returns the AMQP parameters the submitter should connect with.
#
# When ConeyIsland is configured for a single AMQP connection, the shared
# ConeyIsland parameters are used; otherwise the submitter's own parameters
# (set through `.amqp_parameters=`) are returned.
#
# @return [Object, nil] the connection parameters, or nil when the
#   submitter-specific parameters were never set
def self.amqp_parameters
  return ConeyIsland.amqp_parameters if ConeyIsland.single_amqp_connection?

  @amqp_parameters
end
.amqp_parameters=(params) ⇒ Object
84 85 86 |
# File 'lib/coney_island/submitter.rb', line 84
# Stores submitter-specific AMQP connection parameters.
#
# @param params [Object] the parameters to remember (presumably a Hash
#   accepted by Bunny.new; confirm against callers)
# @return [Object] the assigned parameters
def self.amqp_parameters=(params)
  @amqp_parameters = params
end
.channel ⇒ Object
68 69 70 |
# File 'lib/coney_island/submitter.rb', line 68
# Returns the channel most recently created by `.create_channel`.
#
# @return [Object, nil] the stored channel, or nil when none was created yet
def self.channel
  @channel
end
.connected? ⇒ Boolean
96 97 98 |
# File 'lib/coney_island/submitter.rb', line 96
# Tells whether the submitter holds a connection that reports itself connected.
#
# The whole expression is coerced so the result is always a real Boolean,
# even if the connection's own `connected?` returns some other truthy value.
#
# @return [Boolean] false when there is no connection or it is not connected
def self.connected?
  !!(connection && connection.connected?)
end
.connection ⇒ Object
60 61 62 |
# File 'lib/coney_island/submitter.rb', line 60
# Returns the connection stored on the submitter.
#
# @return [Object, nil] the stored connection, or nil when none has been set
def self.connection
  @connection
end
.connection=(conn) ⇒ Object
56 57 58 |
# File 'lib/coney_island/submitter.rb', line 56
# Stores the connection the submitter should use.
#
# @param conn [Object, nil] the connection to remember (a Bunny session in
#   `.handle_connection`)
# @return [Object, nil] the assigned connection
def self.connection=(conn)
  @connection = conn
end
.create_channel ⇒ Object
72 73 74 |
# File 'lib/coney_island/submitter.rb', line 72
# Opens a new channel on the current connection and remembers it.
#
# Requires a connection to be set; with no connection this raises
# NoMethodError on nil.
#
# @return [Object] the newly created channel
def self.create_channel
  @channel = connection.create_channel
end
.delay_exchange ⇒ Object
80 81 82 |
# File 'lib/coney_island/submitter.rb', line 80
# Returns the delay exchange set up by `.initialize_rabbit`.
#
# @return [Object, nil] the stored delay exchange, or nil before initialization
def self.delay_exchange
  @delay_exchange
end
.exchange ⇒ Object
76 77 78 |
# File 'lib/coney_island/submitter.rb', line 76
# Returns the main exchange set up by `.initialize_rabbit`.
#
# @return [Object, nil] the stored exchange, or nil before initialization
def self.exchange
  @exchange
end
.handle_connection ⇒ Object
100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 |
# File 'lib/coney_island/submitter.rb', line 100
# Builds a Bunny connection from `.amqp_parameters`, starts it and, on
# success, sets up the channel and exchanges.
#
# Failure handling:
# * TCP/authentication failures are retried (after sleeping for
#   `ConeyIsland.tcp_connection_retry_interval`) until the retry counter
#   reaches `ConeyIsland.tcp_connection_retry_limit`; at that point the error
#   is logged, reported through `ConeyIsland.poke_the_badger`, and the stored
#   connection is cleared.
# * Connection-level and channel-level Bunny exceptions are logged and
#   swallowed.
#
# On success the retry counter is reset to 0.
#
# @return [Object] value of the last evaluated branch; `.publish_job` ignores it
def self.handle_connection
  Rails.logger.info("ConeyIsland::Submitter.handle_connection connecting...")
  self.connection = Bunny.new(self.amqp_parameters)
  self.start_connection
rescue Bunny::TCPConnectionFailed, Bunny::PossibleAuthenticationFailureError => e
  self.tcp_connection_retries ||= 0
  self.tcp_connection_retries += 1
  if self.tcp_connection_retries >= ConeyIsland.tcp_connection_retry_limit
    message = "Submitter Failed to connect to RabbitMQ #{ConeyIsland.tcp_connection_retry_limit} times, bailing out"
    Rails.logger.error(message)
    ConeyIsland.poke_the_badger(e, {
      code_source: 'ConeyIsland::Submitter.handle_connection',
      reason: message
    })
    # Drop the unusable session so `.connected?` reports false.
    @connection = nil
  else
    message = "Failed to connect to RabbitMQ Attempt ##{self.tcp_connection_retries} time(s), trying again in #{ConeyIsland.tcp_connection_retry_interval(self.tcp_connection_retries)} seconds..."
    Rails.logger.error(message)
    sleep(ConeyIsland.tcp_connection_retry_interval(self.tcp_connection_retries))
    # Re-runs the method body; bounded by the retry limit checked above.
    retry
  end
rescue Bunny::ConnectionLevelException => e
  Rails.logger.error "Submitter Handling a connection-level exception."
  # Rails.logger.error "Bunny class id : #{e.connection_close.class_id}"
  # Rails.logger.error "Bunny method id: #{e.connection_close.method_id}"
  # Rails.logger.error "Status code : #{e.connection_close.reply_code}"
  # Rails.logger.error "Error message : #{e.connection_close.reply_text}"
rescue Bunny::ChannelLevelException => e
  Rails.logger.error "Submitter Handling a channel-level exception."
  Rails.logger.error "Bunny class id : #{e.channel_close.class_id}"
  Rails.logger.error "Bunny method id: #{e.channel_close.method_id}"
  Rails.logger.error "Status code : #{e.channel_close.reply_code}"
  Rails.logger.error "Error message : #{e.channel_close.reply_text}"
else
  self.initialize_rabbit
  self.tcp_connection_retries = 0
end
.initialize_rabbit ⇒ Object
139 140 141 142 143 144 |
# File 'lib/coney_island/submitter.rb', line 139
# Creates a fresh channel and declares the exchanges the submitter uses.
#
# Declares the 'coney_island' and 'coney_island_delay' topic exchanges on the
# new channel and resets the `@delay_queue` hash to empty.
#
# @return [Hash] the emptied delay-queue hash
def self.initialize_rabbit
  self.create_channel
  @exchange = self.channel.topic('coney_island')
  @delay_exchange = self.channel.topic('coney_island_delay')
  @delay_queue = {}
end
.jobs_cache ⇒ Object
31 32 33 |
# File 'lib/coney_island/submitter.rb', line 31
# Returns the submitter's jobs cache, creating it on first use.
#
# @return [JobsCache] the memoized cache instance
def self.jobs_cache
  @jobs_cache ||= JobsCache.new
end
.publish_job(args, job_id = nil) ⇒ Object
TODO: Document me
151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 |
# File 'lib/coney_island/submitter.rb', line 151
# Validates a job description, then runs it inline or publishes it.
#
# `delay` and `work_queue` are taken from the job arguments first, then from
# the class's coney settings (when the class includes Performer), and finally
# from `ConeyIsland.default_settings`.
#
# The caller's job-arguments hash is never modified: a string-keyed copy is
# built, and 'klass' / 'method_name' are always set from the validated values.
#
# @param args [Array] `[klass, method_name, job_args]`; `job_args` is an
#   optional Hash of job arguments which may also carry 'delay' and
#   'work_queue' (string or symbol keys)
# @param job_id [Object, nil] passed through to the publish helpers
# @return [true] when the job was run inline or handed to a publish helper
# @raise [ConeyIsland::JobArgumentError] when klass is not a Class/Module or
#   method_name is not a String/Symbol
def self.publish_job(args, job_id = nil)
  klass, method_name, job_args = *args

  # Every Class is a Module, so one check covers both.
  unless klass.is_a?(Module)
    raise ConeyIsland::JobArgumentError, "Expected #{klass} to be a Class or Module"
  end
  unless method_name.is_a?(String) || method_name.is_a?(Symbol)
    raise ConeyIsland::JobArgumentError, "Expected #{method_name} to be a String or a Symbol"
  end

  # Work on a string-keyed copy so the caller's hash is left untouched.
  job_args = (job_args || {}).stringify_keys
  job_args['klass'] = klass.name
  job_args['method_name'] = method_name

  # Routing options are not part of the job payload.
  delay = job_args.delete('delay')
  work_queue = job_args.delete('work_queue')

  # Class-level settings fill in whatever the call did not specify.
  if klass.included_modules.include?(Performer)
    delay ||= klass.get_coney_settings[:delay]
    work_queue ||= klass.get_coney_settings[:work_queue]
  end

  # Library-wide defaults are the last resort.
  work_queue ||= ConeyIsland.default_settings[:work_queue]
  delay ||= ConeyIsland.default_settings[:delay]

  # Only connect when the job will actually be published.
  handle_connection if !running_inline? && !connected?

  if running_inline?
    ConeyIsland::Job.new(nil, job_args).handle_job
  elsif delay && delay.to_i > 0
    publish_to_delay_queue(job_id, job_args, work_queue, delay)
  else
    publish_to_queue(self.exchange, job_id, job_args, work_queue)
  end

  true
end
.run_inline ⇒ Object
11 12 13 |
# File 'lib/coney_island/submitter.rb', line 11
# Switches the submitter into inline mode.
#
# In inline mode `.publish_job` runs jobs immediately instead of publishing.
#
# @return [true]
def self.run_inline
  @run_inline = true
end
.running_inline? ⇒ Boolean
19 20 21 |
# File 'lib/coney_island/submitter.rb', line 19
# Tells whether the submitter is in inline mode.
#
# @return [Boolean] true after `.run_inline`, false initially and after
#   `.stop_running_inline`
def self.running_inline?
  !!@run_inline
end
.start_connection ⇒ Object
64 65 66 |
# File 'lib/coney_island/submitter.rb', line 64
# Starts the stored connection.
#
# Requires a connection to be set; with no connection this raises
# NoMethodError on nil.
#
# @return [Object] whatever the connection's `start` returns
def self.start_connection
  @connection.start
end
.stop_running_inline ⇒ Object
15 16 17 |
# File 'lib/coney_island/submitter.rb', line 15
# Switches the submitter out of inline mode.
#
# @return [false]
def self.stop_running_inline
  @run_inline = false
end
.submit(*args) ⇒ Object
35 36 37 38 39 40 41 |
# File 'lib/coney_island/submitter.rb', line 35
# Entry point for submitting a job.
#
# While jobs are being cached the arguments are handed to `cache_job`;
# otherwise they are submitted immediately through `.submit!`.
#
# @param args [Array] the job description, forwarded unchanged
# @return [Object] the result of `cache_job` or `.submit!`
def self.submit(*args)
  return cache_job(*args) if caching_jobs?

  submit!(args)
end
.submit!(args, job_id = nil) ⇒ Object
43 44 45 46 47 48 49 50 51 52 53 54 |
# File 'lib/coney_island/submitter.rb', line 43
# Publishes a job right away, reporting any failure.
#
# Any StandardError raised while publishing is logged and reported through
# `ConeyIsland.poke_the_badger`. It is re-raised only in inline mode;
# otherwise it is swallowed and nil is returned.
#
# @param args [Array] the job description passed to `.publish_job`
# @param job_id [Object, nil] passed to `.publish_job`
# @return [Object, nil] the result of `.publish_job`, or nil when an error
#   was swallowed
# @raise [StandardError] the original error, when running inline
def self.submit!(args, job_id = nil)
  Rails.logger.info "Submitting job #{job_id}: #{args}"
  publish_job(args, job_id)
rescue StandardError => e
  Rails.logger.error(e)
  ConeyIsland.poke_the_badger(e, {
    code_source: "ConeyIsland::Submitter.submit!",
    message: "Error submitting job",
    job_args: args
  })
  # Inline callers expect failures to surface; queued submission is best-effort.
  raise if running_inline?
end
.tcp_connection_retries ⇒ Object
27 28 29 |
# File 'lib/coney_island/submitter.rb', line 27
# Returns the number of connection attempts that have failed so far.
#
# @return [Integer, nil] the retry counter, or nil when it was never set
def self.tcp_connection_retries
  @tcp_connection_retries
end
.tcp_connection_retries=(number) ⇒ Object
23 24 25 |
# File 'lib/coney_island/submitter.rb', line 23
# Sets the counter of failed connection attempts.
#
# @param number [Integer, nil] the new counter value
# @return [Integer, nil] the assigned value
def self.tcp_connection_retries=(number)
  @tcp_connection_retries = number
end