Module: BackgroundBunnies::Bunny

Defined in:
lib/background_bunnies/bunny.rb

Defined Under Namespace

Modules: BunnyConfigurators

Constant Summary collapse

DEFAULT_CONNECTION_OPTIONS =
{:threaded=>true}

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Instance Attribute Details

#channelObject (readonly)

Returns the value of attribute channel.



83
84
85
# File 'lib/background_bunnies/bunny.rb', line 83

def channel
  @channel
end

#consumerObject (readonly)

Returns the value of attribute consumer.



85
86
87
# File 'lib/background_bunnies/bunny.rb', line 85

def consumer
  @consumer
end

#exchangeObject (readonly)

Returns the value of attribute exchange.



86
87
88
# File 'lib/background_bunnies/bunny.rb', line 86

def exchange
  @exchange
end

#queueObject (readonly)

Returns the value of attribute queue.



84
85
86
# File 'lib/background_bunnies/bunny.rb', line 84

def queue
  @queue
end

#threadObject (readonly)

Returns the value of attribute thread.



87
88
89
# File 'lib/background_bunnies/bunny.rb', line 87

def thread
  @thread
end

Class Method Details

.included(base) ⇒ Object



64
65
66
# File 'lib/background_bunnies/bunny.rb', line 64

def self.included(base)
  base.extend(BunnyConfigurators)
end

Instance Method Details

#connection_optionsObject



79
80
81
# File 'lib/background_bunnies/bunny.rb', line 79

def connection_options
  self.class.connection_options
end

#log_error(a) ⇒ Object



132
133
134
# File 'lib/background_bunnies/bunny.rb', line 132

def log_error(a)
  BackgroundBunnies.error "#{queue_name}: #{a}"
end

#log_info(a) ⇒ Object



136
137
138
# File 'lib/background_bunnies/bunny.rb', line 136

def log_info(a)
  BackgroundBunnies.info "#{queue_name}: #{a}"
end

#log_warn(a) ⇒ Object



140
141
142
# File 'lib/background_bunnies/bunny.rb', line 140

def log_warn(a)
  BackgroundBunnies.warn "#{queue_name}: #{a}"
end

#on_error(job, err) ⇒ Object



128
129
130
# File 'lib/background_bunnies/bunny.rb', line 128

def on_error(job, err)
  log_error "Error processing #{job.info.delivery_tag}: #{err.message}, #{err.backtrace.join('\n')}"
end

#process(job) ⇒ Object

Process a Job. Implemented by the class.



147
148
149
# File 'lib/background_bunnies/bunny.rb', line 147

def process(job)

end

#queue_nameObject

Returns the name of the queue for the Worker



71
72
73
# File 'lib/background_bunnies/bunny.rb', line 71

def queue_name
  self.class.queue_name
end

#queue_typeObject



75
76
77
# File 'lib/background_bunnies/bunny.rb', line 75

def queue_type
  self.class.queue_type
end

#run(connection) ⇒ Object

Starts the worker instance and blocks the current thread.



154
155
156
157
# File 'lib/background_bunnies/bunny.rb', line 154

def run(connection)
  start connection
  Thread.current.join
end

#start(connection_or_group) ⇒ Object

Starts the Worker with the given connection or group name



92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
# File 'lib/background_bunnies/bunny.rb', line 92

def start(connection_or_group)
  @connection = connection_or_group
  @channel = AMQP::Channel.new(@connection)
  queue_options = {}
  name = queue_name
  if queue_type == :broadcast
    queue_options[:exclusive] = true
    queue_options[:auto_delete] = true
    name = "#{Socket.gethostname}-#{Process.pid}-#{self.object_id}"
    @queue = @channel.queue(name, queue_options)
    @exchange = @channel.fanout(BackgroundBunnies.broadcast_exchange_name(queue_name))
    @queue.bind(@exchange)
  else
    queue_options[:durable] = true
    @queue = @channel.queue(queue_name, queue_options)
  end
  @consumer = @queue.subscribe(:ack=>true) do |, payload|
    info = 
    properties = nil
    begin 
      job = Job.new(JSON.parse!(payload), info, properties)
      err = nil
      self.process(job) 
      .ack
    rescue =>err
      # processing went wrong, requeing message
      job = Job.new(nil, info, properties) unless job
      unless on_error(job, err)
        .reject(:requeue=>true)
      else
        .ack
      end
    end
  end
end