Module: Rabbitek::Consumer::ClassMethods

Defined in:
lib/rabbitek/server/consumer.rb

Overview

rubocop:disable Style/Documentation

Instance Attribute Summary collapse

Instance Method Summary collapse

Instance Attribute Details

#optsObject

Returns the value of attribute opts.



60
61
62
# File 'lib/rabbitek/server/consumer.rb', line 60

def opts
  @opts
end

#rabbit_options_hashObject

Returns the value of attribute rabbit_options_hash.



60
61
62
# File 'lib/rabbitek/server/consumer.rb', line 60

def rabbit_options_hash
  @rabbit_options_hash
end

Instance Method Details

#perform_async(payload, opts: {}, channel: nil) ⇒ Object



67
68
69
70
71
72
73
74
75
76
# File 'lib/rabbitek/server/consumer.rb', line 67

def perform_async(payload, opts: {}, channel: nil)
  publisher = Publisher.new(
    rabbit_options_hash[:bind_exchange],
    exchange_type: rabbit_options_hash[:bind_exchange_type],
    channel: channel
  )
  publish_with_publisher(publisher, payload, opts)
ensure
  publisher&.close unless channel
end

#perform_at(at_time, payload, opts: {}, channel: nil) ⇒ Object



92
93
94
# File 'lib/rabbitek/server/consumer.rb', line 92

def perform_at(at_time, payload, opts: {}, channel: nil)
  perform_in(at_time - Time.current, payload, opts: opts, channel: channel)
end

#perform_in(time, payload, opts: {}, channel: nil) ⇒ Object



78
79
80
81
82
83
84
85
86
87
88
89
90
# File 'lib/rabbitek/server/consumer.rb', line 78

def perform_in(time, payload, opts: {}, channel: nil)
  publisher = Publisher.new(
    Utils::RabbitObjectNames.retry_or_delayed_bind_exchange(rabbit_options_hash[:bind_exchange]),
    exchange_type: :direct,
    channel: channel
  )
  publish_with_publisher(publisher, payload, {
    expiration: time.to_i * 1000, # in milliseconds
    headers: { 'x-dead-letter-routing-key': to_s }
  }.merge(opts))
ensure
  publisher&.close unless channel
end

#publish_with_publisher(publisher, payload, opts) ⇒ Object



96
97
98
# File 'lib/rabbitek/server/consumer.rb', line 96

def publish_with_publisher(publisher, payload, opts)
  publisher.publish(payload, { routing_key: to_s }.merge(opts))
end

#rabbit_options(opts) ⇒ Object



62
63
64
65
# File 'lib/rabbitek/server/consumer.rb', line 62

def rabbit_options(opts)
  self.rabbit_options_hash = default_rabbit_options(opts).with_indifferent_access.merge(opts)
  self.opts = opts
end