Module: ActionSubscriber::DSL

Included in:
Base
Defined in:
lib/action_subscriber/dsl.rb

Instance Method Summary collapse

Instance Method Details

#_run_action_at_least_once_with_filters(env, action) ⇒ Object



134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
# File 'lib/action_subscriber/dsl.rb', line 134

# Runs +action+ through the around filters and acknowledges the message only
# after the action has finished.
#
# When the action raises a StandardError the message is handed to
# MessageRetry.redeliver_message_with_backoff, the delivery is acknowledged,
# and the error is re-raised to the caller.
#
# In every case a message that was not acknowledged gets rejected. If neither
# the acknowledge nor the reject reports success, a notice is written to
# $stdout and TERM is sent to the current process.
#
# env    - message environment; must respond to #acknowledge and #reject
# action - name of the subscriber method to invoke
#
# Returns the result of env.acknowledge when the action completes normally.
def _run_action_at_least_once_with_filters(env, action)
  acked = false
  rejected = false

  _run_action_with_filters(env, action)
  acked = env.acknowledge
rescue
  # Hand the message to MessageRetry before acknowledging the failed delivery.
  ::ActionSubscriber::MessageRetry.redeliver_message_with_backoff(env)
  acked = env.acknowledge

  raise
ensure
  rejected = env.reject unless acked

  # Neither call reported success: the message can be neither acked nor rejected.
  if [acked, rejected].none?
    $stdout << <<-UNREJECTABLE
      CANNOT ACKNOWLEDGE OR REJECT THE MESSAGE

      This is a exceptional state for ActionSubscriber to enter and puts the current
      Process in the position of "I can't get new work from RabbitMQ, but also
      can't acknowledge or reject the work that I currently have" ... While rare
      this state can happen.

      Instead of continuing to try to process the message ActionSubscriber is
      sending a Kill signal to the current running process to gracefully shutdown
      so that the RabbitMQ server will purge any outstanding acknowledgements. If
      you are running a process monitoring tool (like Upstart) the Subscriber
      process will be restarted and be able to take on new work.

      ** Running a process monitoring tool like Upstart is recommended for this reason **
    UNREJECTABLE

    Process.kill(:TERM, Process.pid)
  end
end

#_run_action_at_most_once_with_filters(env, action) ⇒ Object



103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
# File 'lib/action_subscriber/dsl.rb', line 103

# Acknowledges the message first and only then runs +action+ through the
# around filters, so a failing action does not leave the delivery unacked.
#
# If the acknowledge did not report success the message is rejected once the
# action has run (or raised). If the reject does not report success either, a
# notice is written to $stdout and TERM is sent to the current process.
#
# env    - message environment; must respond to #acknowledge and #reject
# action - name of the subscriber method to invoke
#
# Returns whatever _run_action_with_filters returns; errors raised by the
# action propagate to the caller.
def _run_action_at_most_once_with_filters(env, action)
  acked = false
  rejected = false
  acked = env.acknowledge

  _run_action_with_filters(env, action)
ensure
  rejected = env.reject unless acked

  # Neither call reported success: the message can be neither acked nor rejected.
  if [acked, rejected].none?
    $stdout << <<-UNREJECTABLE
      CANNOT ACKNOWLEDGE OR REJECT THE MESSAGE

      This is a exceptional state for ActionSubscriber to enter and puts the current
      Process in the position of "I can't get new work from RabbitMQ, but also
      can't acknowledge or reject the work that I currently have" ... While rare
      this state can happen.

      Instead of continuing to try to process the message ActionSubscriber is
      sending a Kill signal to the current running process to gracefully shutdown
      so that the RabbitMQ server will purge any outstanding acknowledgements. If
      you are running a process monitoring tool (like Upstart) the Subscriber
      process will be restarted and be able to take on new work.

      ** Running a process monitoring tool like Upstart is recommended for this reason **
    UNREJECTABLE

    Process.kill(:TERM, Process.pid)
  end
end

#_run_action_with_filters(env, action) ⇒ Object



93
94
95
96
97
98
99
100
101
# File 'lib/action_subscriber/dsl.rb', line 93

# Instantiates the subscriber for +env+ and invokes +action+ on it, wrapped in
# every registered around filter. The first registered filter is outermost.
#
# Filters are dispatched with #send (so they may be private); the action
# itself is dispatched with #public_send.
#
# Returns the value produced by the outermost filter, or by the action when
# no filters are registered.
def _run_action_with_filters(env, action)
  subscriber = new(env)
  chain = proc { subscriber.public_send(action) }

  # Wrap from the innermost filter outwards so the first filter runs first.
  around_filters.reverse_each do |filter_name|
    wrapped = chain
    chain = proc { subscriber.send(filter_name, &wrapped) }
  end

  chain.call
end

#acknowledge_messages? ⇒ Boolean

Returns:

  • (Boolean)


21
22
23
# File 'lib/action_subscriber/dsl.rb', line 21

# Reports whether @_acknowledge_messages is currently set to a truthy value.
#
# Returns true or false (never nil).
def acknowledge_messages?
  @_acknowledge_messages ? true : false
end

#around_filter(filter_method) ⇒ Object



25
26
27
28
# File 'lib/action_subscriber/dsl.rb', line 25

# Registers +filter_method+ as an around filter unless it is already present.
#
# filter_method - name of the filter method to register
#
# Returns the list of registered around filters.
def around_filter(filter_method)
  filters = around_filters
  filters.push(filter_method) unless filters.include?(filter_method)
  filters
end

#around_filters ⇒ Object



30
31
32
# File 'lib/action_subscriber/dsl.rb', line 30

# Returns the list of registered around filters, creating an empty list the
# first time it is requested.
def around_filters
  @_around_filters || (@_around_filters = [])
end

#at_least_once! ⇒ Object



3
4
5
6
# File 'lib/action_subscriber/dsl.rb', line 3

# Turns on message acknowledgement and flags the subscriber as at-least-once.
#
# Returns true.
def at_least_once!
  @_acknowledge_messages = @_at_least_once = true
end

#at_least_once? ⇒ Boolean

Returns:

  • (Boolean)


8
9
10
# File 'lib/action_subscriber/dsl.rb', line 8

# Reports whether @_at_least_once is currently set to a truthy value.
#
# Returns true or false (never nil).
def at_least_once?
  @_at_least_once ? true : false
end

#at_most_once! ⇒ Object



12
13
14
15
# File 'lib/action_subscriber/dsl.rb', line 12

# Turns on message acknowledgement and flags the subscriber as at-most-once.
#
# Returns true.
def at_most_once!
  @_acknowledge_messages = @_at_most_once = true
end

#at_most_once? ⇒ Boolean

Returns:

  • (Boolean)


17
18
19
# File 'lib/action_subscriber/dsl.rb', line 17

# Reports whether @_at_most_once is currently set to a truthy value.
#
# Returns true or false (never nil).
def at_most_once?
  @_at_most_once ? true : false
end

#exchange_names(*names) ⇒ Object Also known as: exchange

Explicitly set the name of the exchange



36
37
38
39
40
41
42
43
44
45
# File 'lib/action_subscriber/dsl.rb', line 36

# Registers exchange names for this subscriber and returns the current list.
#
# names - zero or more names (strings, symbols, or nested arrays of them).
#         nil entries are ignored.
#
# Returns an Array of unique exchange-name Strings, or a one-element Array
# holding ::ActionSubscriber.config.default_exchange when none are registered.
# The returned Array is a copy, so mutating it does not change the registry.
def exchange_names(*names)
  @_exchange_names ||= []
  # Drop nils *before* stringifying: nil.to_s is "", which a later compact can
  # no longer remove. The union keeps the stored list free of duplicates.
  @_exchange_names |= names.flatten.compact.map(&:to_s)

  return [::ActionSubscriber.config.default_exchange] if @_exchange_names.empty?

  @_exchange_names.dup
end

#manual_acknowledgement! ⇒ Object



48
49
50
51
# File 'lib/action_subscriber/dsl.rb', line 48

# Turns on message acknowledgement and flags the subscriber as using manual
# acknowledgement.
#
# The flag lives in @_manual_acknowedgement (sic); the spelling is kept
# because #manual_acknowledgement? reads the same instance variable.
#
# Returns true.
def manual_acknowledgement!
  @_acknowledge_messages = @_manual_acknowedgement = true
end

#manual_acknowledgement? ⇒ Boolean

Returns:

  • (Boolean)


53
54
55
# File 'lib/action_subscriber/dsl.rb', line 53

# Reports whether @_manual_acknowedgement (sic, the spelling written by
# #manual_acknowledgement!) is currently set to a truthy value.
#
# Returns true or false (never nil).
def manual_acknowledgement?
  @_manual_acknowedgement ? true : false
end

#no_acknowledgement! ⇒ Object



57
58
59
# File 'lib/action_subscriber/dsl.rb', line 57

# Turns message acknowledgement off for this subscriber.
#
# NOTE(review): only @_acknowledge_messages is reset here; @_at_least_once,
# @_at_most_once and @_manual_acknowedgement keep whatever value an earlier
# call gave them - confirm that is intended.
#
# Returns false.
def no_acknowledgement!
  @_acknowledge_messages = false
end

#queue_for(method, queue_name) ⇒ Object

Explicitly set the name of a queue for the given method route

Ex.

queue_for :created, "derp.derp"
queue_for :updated, "foo.bar"


67
68
69
70
# File 'lib/action_subscriber/dsl.rb', line 67

# Records an explicit queue name for the given method route.
#
# method     - the route (subscriber method name) the queue belongs to
# queue_name - the queue name to use for that route
#
# Returns queue_name.
def queue_for(method, queue_name)
  registry = (@_queue_names ||= {})
  registry.store(method, queue_name)
end

#queue_names ⇒ Object



72
73
74
# File 'lib/action_subscriber/dsl.rb', line 72

# Returns the Hash of explicitly configured queue names, creating an empty
# Hash the first time it is requested.
def queue_names
  @_queue_names || (@_queue_names = {})
end

#remote_application_name(name = nil) ⇒ Object Also known as: publisher



76
77
78
79
# File 'lib/action_subscriber/dsl.rb', line 76

# Combined reader/writer for the remote application name.
#
# name - when truthy, stored as the new remote application name;
#        when omitted (or nil/false) the stored value is left untouched.
#
# Returns the stored remote application name (nil if none was ever set).
def remote_application_name(name = nil)
  return @_remote_application_name unless name

  @_remote_application_name = name
end

#routing_key_for(method, routing_key_name) ⇒ Object

Explicitly set the whole routing key to use for a given method route.



84
85
86
87
# File 'lib/action_subscriber/dsl.rb', line 84

# Records the complete routing key to use for the given method route.
#
# method           - the route (subscriber method name) the key belongs to
# routing_key_name - the routing key to use for that route
#
# Returns routing_key_name.
def routing_key_for(method, routing_key_name)
  registry = (@_routing_key_names ||= {})
  registry.store(method, routing_key_name)
end

#routing_key_names ⇒ Object



89
90
91
# File 'lib/action_subscriber/dsl.rb', line 89

# Returns the Hash of explicitly configured routing keys, creating an empty
# Hash the first time it is requested.
def routing_key_names
  @_routing_key_names || (@_routing_key_names = {})
end

#run_action_with_filters(env, action) ⇒ Object



171
172
173
174
175
176
177
178
179
180
# File 'lib/action_subscriber/dsl.rb', line 171

# Runs +action+ for +env+ using the runner that matches the configured
# delivery mode. at_least_once? is checked before at_most_once?, so it wins
# when both are set; with neither set the action runs without any
# acknowledgement handling.
#
# env    - message environment passed through to the runner
# action - name of the subscriber method to invoke
#
# Returns whatever the selected runner returns.
def run_action_with_filters(env, action)
  if at_least_once?
    _run_action_at_least_once_with_filters(env, action)
  elsif at_most_once?
    _run_action_at_most_once_with_filters(env, action)
  else
    _run_action_with_filters(env, action)
  end
end