Module: Wakame::AMQPClient
Defined Under Namespace
Modules: ClassMethods
Instance Attribute Summary collapse
-
#amqp_client ⇒ Object
readonly
Returns the value of attribute amqp_client.
-
#mq ⇒ Object
readonly
Returns the value of attribute mq.
-
#queue_subscribers ⇒ Object
readonly
Returns the value of attribute queue_subscribers.
Class Method Summary collapse
Instance Method Summary collapse
- #add_subscriber(queue_name, &blk) ⇒ Object
- #amq ⇒ Object
- #amqp_server_uri ⇒ Object
- #cleanup ⇒ Object
- #close(&blk) ⇒ Object
- #connect(*args) ⇒ Object
- #connected? ⇒ Boolean
- #define_queue(name, exchange_name, opts = {}) ⇒ Object
-
#publish_to(name, *args) ⇒ Object
When you want to broadcast the data to all bound queues, call publish_to('exchange name', 'data') or publish_to('exchange name', '*', 'data'). When you want to send the data to keyed queue(s), call publish_to('exchange name', 'group.1', 'data').
Instance Attribute Details
#amqp_client ⇒ Object (readonly)
Returns the value of attribute amqp_client.
115 116 117 |
# File 'lib/wakame/amqp_client.rb', line 115
# Read-only accessor for the @amqp_client instance variable.
#
# @return [Object, nil] the current value of @amqp_client
def amqp_client
  @amqp_client
end
#mq ⇒ Object (readonly)
Returns the value of attribute mq.
115 116 117 |
# File 'lib/wakame/amqp_client.rb', line 115
# Read-only accessor for the @mq instance variable.
#
# @return [Object, nil] the current value of @mq
def mq
  @mq
end
#queue_subscribers ⇒ Object (readonly)
Returns the value of attribute queue_subscribers.
219 220 221 |
# File 'lib/wakame/amqp_client.rb', line 219
# Read-only accessor for the @queue_subscribers instance variable.
#
# @return [Object, nil] the current value of @queue_subscribers
def queue_subscribers
  @queue_subscribers
end
Class Method Details
.included(klass) ⇒ Object
13 14 15 16 17 |
# File 'lib/wakame/amqp_client.rb', line 13
# Hook that extends the given class with ClassMethods.
#
# @param klass [Class, Module] the receiver to extend
# @return [nil] the value of the (empty) class_eval block
def self.included(klass)
  klass.extend(ClassMethods)
  # The class_eval body is intentionally empty; it is kept so the hook's
  # return value stays the same.
  klass.class_eval do
  end
end
Instance Method Details
#add_subscriber(queue_name, &blk) ⇒ Object
221 222 223 224 225 226 227 228 229 230 |
# File 'lib/wakame/amqp_client.rb', line 221 def add_subscriber(queue_name, &blk) # @mq object can be used here as it is just for checing the member of defined queues. raise "Undefined queue name : #{queue_name}" unless @mq.queues.has_key?(queue_name) EM. { @queue_subscribers ||= {} @queue_subscribers[queue_name] ||= [] @queue_subscribers[queue_name] << blk } end |
#amq ⇒ Object
144 145 146 147 |
# File 'lib/wakame/amqp_client.rb', line 144
# Returns the object stored under Thread.current[:mq] for the calling thread.
#
# @return [Object] the value of Thread.current[:mq]
# @raise [RuntimeError] when Thread.current[:mq] is nil
def amq
  channel = Thread.current[:mq]
  raise 'AMQP connection is not established yet' if channel.nil?

  channel
end
#amqp_server_uri ⇒ Object
117 118 119 120 121 122 123 124 |
# File 'lib/wakame/amqp_client.rb', line 117
# Builds a URI::AMQP from the host, port and vhost found in the connection's
# settings hash (the vhost is passed as the URI path).
#
# @return [Object] the result of URI::AMQP.build
# @raise [RuntimeError] when @amqp_client is unset or connected? is false
def amqp_server_uri
  unless @amqp_client && connected?
    raise "The connection is not established yet."
  end

  host, port, vhost = @amqp_client.settings.values_at(:host, :port, :vhost)
  URI::AMQP.build(:host => host, :port => port, :path => vhost)
end
#cleanup ⇒ Object
149 150 |
# File 'lib/wakame/amqp_client.rb', line 149
# Empty hook; #close calls it before clearing the connection state.
#
# @return [nil]
def cleanup
  # Intentionally empty.
end
#close(&blk) ⇒ Object
152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 |
# File 'lib/wakame/amqp_client.rb', line 152
# Closes the AMQP connection, if there is one.
#
# Inside the connection's close callback this runs cleanup, then the optional
# block, and finally (even if either raises) clears @amqp_client, @mq and
# Thread.current[:mq]. Nothing happens when @amqp_client is nil.
#
# @param blk [Proc] optional block called after cleanup
# @return [Object, nil] the result of @amqp_client.close, or nil when there
#   is no connection
def close(&blk)
  return if @amqp_client.nil?

  @amqp_client.close do
    begin
      cleanup
      blk.call if blk
    ensure
      @amqp_client = nil
      @mq = Thread.current[:mq] = nil
    end
  end
end
#connect(*args) ⇒ Object
126 127 128 129 130 131 132 133 134 135 136 137 138 |
# File 'lib/wakame/amqp_client.rb', line 126
# Opens an AMQP connection and creates an MQ object on top of it.
#
# The arguments are forwarded unchanged to AMQP.connect. The new MQ object is
# stored in both Thread.current[:mq] and @mq, then run_defered_callbacks is
# invoked, and finally the optional block is yielded to.
#
# NOTE(review): close is only reached when connected? is false, i.e. when
# @amqp_client is nil, and close does nothing in that case — confirm whether
# the condition was meant to be `if connected?`.
#
# @param args [Array] arguments passed through to AMQP.connect
# @yield optional block run after run_defered_callbacks
# @return [Object, nil] the block's value, or nil when no block is given
def connect(*args)
  close unless connected?

  client = AMQP.connect(*args)
  # Give the connection object a singleton reader for its @settings ivar.
  def client.settings
    @settings
  end
  @amqp_client = client

  channel = MQ.new(@amqp_client)
  Thread.current[:mq] = channel
  @mq = channel

  run_defered_callbacks
  return unless block_given?

  yield
end
#connected? ⇒ Boolean
140 141 142 |
# File 'lib/wakame/amqp_client.rb', line 140
# Tells whether a connection object is currently held in @amqp_client.
#
# @return [Boolean] false when @amqp_client is nil, true otherwise
def connected?
  return false if @amqp_client.nil?

  true
end
#define_queue(name, exchange_name, opts = {}) ⇒ Object
199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 |
# File 'lib/wakame/amqp_client.rb', line 199
# Declares a queue on the current channel, binds it to an exchange and
# forwards every received message to the blocks registered for that queue in
# queue_subscribers.
#
# `name` and every String value in `opts` are treated as templates: each `%{`
# is rewritten to `#{` and the result is evaluated as a double-quoted string
# in the context of self, so `%{expr}` expands to the value of `expr`.
#
# The caller's `opts` hash is not modified; expansion is done on a copy so a
# hash that is reused across calls keeps its unexpanded templates.
#
# SECURITY NOTE(review): expansion runs instance_eval on a constructed
# string, so `name` and `opts` must never carry untrusted input.
#
# @param name [String] queue name template
# @param exchange_name [Object] exchange handed to the queue's bind
# @param opts [Hash] options handed (after expansion) to amq.queue and bind
# @return [Object] whatever the subscribe call returns
def define_queue(name, exchange_name, opts={})
  expand = lambda { |template| instance_eval('"' + template.gsub(/%\{/, '#{') + '"') }

  name = expand.call(name)

  # Work on a copy so the caller's hash keeps its templates.
  expanded_opts = opts.dup
  opts.each do |key, value|
    expanded_opts[key] = expand.call(value) if value.is_a?(String)
  end

  @queue_subscribers ||= {}

  amq.queue(name, expanded_opts).bind(exchange_name, expanded_opts).subscribe do |data|
    # Subscribers are looked up per message, so blocks added later are seen.
    subscribers = queue_subscribers[name]
    subscribers.each { |subscriber| subscriber.call(data) } unless subscribers.nil?
  end
end
#publish_to(name, *args) ⇒ Object
When you want to broadcast the data to all bound queues:
publish_to('exchange name', 'data')
publish_to('exchange name', '*', 'data')
When you want to send the data to keyed queue(s):
publish_to('exchange name', 'group.1', 'data')
174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 |
# File 'lib/wakame/amqp_client.rb', line 174
# Publishes data to a named exchange of the current channel.
#
# Accepted call forms:
#   publish_to('exchange name', 'data')
#   publish_to('exchange name', 'routing key', 'data')
#
# With a single argument the data is published with key '*' on a :topic
# exchange and with a nil key on any other exchange type. With two arguments
# the first is always the routing key (converted with to_s) and the second is
# the payload, whatever the exchange type. Previously the two-argument form
# on a non-topic exchange published the key as the payload.
#
# When Thread.current[:mq] is nil for the calling thread the publish is
# scheduled through EM.next_tick; otherwise it runs immediately.
#
# @param name [Object] key looked up in amq.exchanges
# @param args [Array] either [data] or [key, data]
# @raise [RuntimeError] when no exchange is registered under name (raised
#   when the publish actually runs)
def publish_to(name, *args)
  publish_proc = proc {
    ex = amq.exchanges[name] || raise("Undefined exchange name : #{name}")
    topic = (ex.type == :topic)

    if topic && args.size == 1
      key = '*'
      data = args[0]
    elsif topic || args.size >= 2
      key = args[0].to_s
      data = args[1]
    else
      key = nil
      data = args[0]
    end

    ex.publish(data, :key => key)
  }

  if Thread.current[:mq].nil?
    EM.next_tick { publish_proc.call }
  else
    publish_proc.call
  end
end