Module: BackgroundBunnies::Bunny
- Defined in:
- lib/background_bunnies/bunny.rb
Defined Under Namespace
Modules: BunnyConfigurators
Constant Summary collapse
- DEFAULT_CONNECTION_OPTIONS =
{:threaded=>true}
Instance Attribute Summary collapse
-
#channel ⇒ Object
readonly
Returns the value of attribute channel.
-
#consumer ⇒ Object
readonly
Returns the value of attribute consumer.
-
#exchange ⇒ Object
readonly
Returns the value of attribute exchange.
-
#queue ⇒ Object
readonly
Returns the value of attribute queue.
-
#thread ⇒ Object
readonly
Returns the value of attribute thread.
Class Method Summary collapse
- .included(base) ⇒ Object
Instance Method Summary collapse
- #connection_options ⇒ Object
- #log_error(a) ⇒ Object
- #log_info(a) ⇒ Object
- #log_warn(a) ⇒ Object
- #on_error(job, err) ⇒ Object
-
#process(job) ⇒ Object
Process a Job.
-
#queue_name ⇒ Object
Returns the name of the queue for the Worker.
- #queue_type ⇒ Object
-
#run(connection) ⇒ Object
Starts the worker instance and blocks the current thread.
-
#start(connection_or_group) ⇒ Object
Starts the Worker with the given connection or group name.
Instance Attribute Details
#channel ⇒ Object (readonly)
Returns the value of attribute channel.
83 84 85 |
# File 'lib/background_bunnies/bunny.rb', line 83 def channel @channel end |
#consumer ⇒ Object (readonly)
Returns the value of attribute consumer.
85 86 87 |
# File 'lib/background_bunnies/bunny.rb', line 85 def consumer @consumer end |
#exchange ⇒ Object (readonly)
Returns the value of attribute exchange.
86 87 88 |
# File 'lib/background_bunnies/bunny.rb', line 86 def exchange @exchange end |
#queue ⇒ Object (readonly)
Returns the value of attribute queue.
84 85 86 |
# File 'lib/background_bunnies/bunny.rb', line 84 def queue @queue end |
#thread ⇒ Object (readonly)
Returns the value of attribute thread.
87 88 89 |
# File 'lib/background_bunnies/bunny.rb', line 87 def thread @thread end |
Class Method Details
.included(base) ⇒ Object
64 65 66 |
# File 'lib/background_bunnies/bunny.rb', line 64 def self.included(base) base.extend(BunnyConfigurators) end |
Instance Method Details
#connection_options ⇒ Object
79 80 81 |
# File 'lib/background_bunnies/bunny.rb', line 79 def connection_options self.class.connection_options end |
#log_error(a) ⇒ Object
132 133 134 |
# File 'lib/background_bunnies/bunny.rb', line 132 def log_error(a) BackgroundBunnies.error "#{queue_name}: #{a}" end |
#log_info(a) ⇒ Object
136 137 138 |
# File 'lib/background_bunnies/bunny.rb', line 136 def log_info(a) BackgroundBunnies.info "#{queue_name}: #{a}" end |
#log_warn(a) ⇒ Object
140 141 142 |
# File 'lib/background_bunnies/bunny.rb', line 140 def log_warn(a) BackgroundBunnies.warn "#{queue_name}: #{a}" end |
#on_error(job, err) ⇒ Object
128 129 130 |
# File 'lib/background_bunnies/bunny.rb', line 128 def on_error(job, err) log_error "Error processing #{job.info.delivery_tag}: #{err.message}, #{err.backtrace.join('\n')}" end |
#process(job) ⇒ Object
Process a Job. The default implementation is empty; the class that includes this module is expected to implement it.
147 148 149 |
# File 'lib/background_bunnies/bunny.rb', line 147 def process(job) end |
#queue_name ⇒ Object
Returns the name of the queue for the Worker.
71 72 73 |
# File 'lib/background_bunnies/bunny.rb', line 71 def queue_name self.class.queue_name end |
#queue_type ⇒ Object
75 76 77 |
# File 'lib/background_bunnies/bunny.rb', line 75 def queue_type self.class.queue_type end |
#run(connection) ⇒ Object
Starts the worker instance and blocks the current thread.
154 155 156 157 |
# File 'lib/background_bunnies/bunny.rb', line 154 def run(connection) start connection Thread.current.join end |
#start(connection_or_group) ⇒ Object
Starts the Worker with the given connection or group name.
92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 |
# File 'lib/background_bunnies/bunny.rb', line 92 def start(connection_or_group) @connection = connection_or_group @channel = AMQP::Channel.new(@connection) options = {} name = queue_name if queue_type == :broadcast options[:exclusive] = true options[:auto_delete] = true name = "#{Socket.gethostname}-#{Process.pid}-#{self.object_id}" @queue = @channel.queue(name, options) @exchange = @channel.fanout(BackgroundBunnies.broadcast_exchange_name(queue_name)) @queue.bind(@exchange) else options[:durable] = true @queue = @channel.queue(queue_name, options) end @consumer = @queue.subscribe(:ack=>true) do |metadata, payload| info = metadata properties = nil begin job = Job.new(JSON.parse!(payload), info, properties) err = nil self.process(job) metadata.ack rescue =>err # processing went wrong, requeing message job = Job.new(nil, info, properties) unless job unless on_error(job, err) metadata.reject(:requeue=>true) else metadata.ack end end end end |