Class: Remq
- Inherits: Object
- Includes: MonitorMixin
- Defined in: lib/remq.rb, lib/remq/script.rb, lib/remq/version.rb
Constant Summary
- LIMIT = 1000
- VERSION = "0.0.4"
Instance Attribute Summary
- #predis ⇒ Object (readonly)
  Returns the value of attribute predis.
- #redis ⇒ Object (readonly)
  Returns the value of attribute redis.
Instance Method Summary
- #consume(pattern, options = {}) ⇒ Array<Remq::Message>
  Consume persisted messages from channels matching the given pattern, starting with the cursor if provided, or the first message.
- #initialize(options = {}) ⇒ Remq (constructor)
  Create a Remq client with the given options, which are passed to redis.
- #inspect ⇒ Object
- #key(*args) ⇒ String
  Build a key from the given name and channel.
- #off(event, listener) ⇒ Remq
  Remove a listener from the given event.
- #on(event, listener = nil) { ... } ⇒ Remq
  Add a listener to the given event.
- #publish(channel, message) ⇒ Integer
  Publish a message to the given channel.
- #quit ⇒ Object
  Forcibly close the connections to the Redis server.
- #subscribe(pattern, options = {}) {|received| ... } ⇒ Thread
  Subscribe to the channels matching the given pattern.
- #unsubscribe ⇒ Object
  Unsubscribe.
Constructor Details
#initialize(options = {}) ⇒ Remq
Create a Remq client with the given options, which are passed to redis.
    # File 'lib/remq.rb', line 24
    def initialize(options = {})
      @redis = Redis.new(options)
      @predis = Redis.new(options) # separate connection for pub/sub
      @listeners = Hash.new { |h, k| h[k] = [] }
      super() # Monitor#initialize
    end
Instance Attribute Details
#predis ⇒ Object (readonly)
Returns the value of attribute predis.
    # File 'lib/remq.rb', line 19
    def predis
      @predis
    end
#redis ⇒ Object (readonly)
Returns the value of attribute redis.
    # File 'lib/remq.rb', line 18
    def redis
      @redis
    end
Instance Method Details
#consume(pattern, options = {}) ⇒ Array<Remq::Message>
Consume persisted messages from channels matching the given pattern, starting with the cursor if provided, or the first message. limit determines how many messages are returned each time consume is called; it defaults to LIMIT.
    # File 'lib/remq.rb', line 98
    def consume(pattern, options = {})
      synchronize do
        cursor, limit = options.fetch(:cursor, 0), options.fetch(:limit, LIMIT)
        msgs = call(:consume, pattern, cursor, limit)
        msgs.map { |msg| (msg) }
      end
    end
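The cursor and limit options suggest a paging loop. The following is a minimal sketch of that loop against an in-memory stand-in, so it runs without a Redis server. `FakeClient` and `FakeMessage` are hypothetical names invented for this example, and the stand-in assumes that consume returns messages whose id is greater than the cursor; the page does not state whether the real cursor is inclusive or what attributes Remq::Message exposes.

```ruby
# Hypothetical in-memory stand-in; it is not Remq and needs no Redis server.
FakeMessage = Struct.new(:id, :body)

class FakeClient
  LIMIT = 1000

  def initialize(messages)
    @messages = messages
  end

  # Assumption: returns up to `limit` messages whose id is greater than `cursor`.
  def consume(_pattern, options = {})
    cursor = options.fetch(:cursor, 0)
    limit = options.fetch(:limit, LIMIT)
    @messages.select { |m| m.id > cursor }.first(limit)
  end
end

client = FakeClient.new((1..5).map { |i| FakeMessage.new(i, "body #{i}") })

cursor = 0
batches = []
loop do
  batch = client.consume('events.*', cursor: cursor, limit: 2)
  break if batch.empty?
  batches << batch.map(&:id)
  cursor = batch.last.id # advance past the last message seen
end

p batches
```

Keeping the cursor on the caller's side means a consumer can stop and later resume from the last id it processed.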
#inspect ⇒ Object
    # File 'lib/remq.rb', line 159
    def inspect
      "#<Remq client v#{Remq::VERSION} for #{redis.client.id}>"
    end
#key(*args) ⇒ String
Build a key from the given name and channel.
    # File 'lib/remq.rb', line 155
    def key(*args)
      (['remq'] + args).join(':')
    end
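To illustrate the resulting key format, the join logic can be exercised outside the class. The function name `remq_key` and the arguments `'channel'` and `'events'` are hypothetical, chosen only for this sketch; the names Remq actually passes are not shown on this page.

```ruby
# Standalone copy of the join logic shown above, for illustration only.
def remq_key(*args)
  (['remq'] + args).join(':')
end

puts remq_key('channel', 'events') # prints "remq:channel:events"
puts remq_key                      # prints "remq"
```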
#off(event, listener) ⇒ Remq
Remove a listener from the given event.
    # File 'lib/remq.rb', line 141
    def off(event, listener)
      synchronize do
        @listeners[event.to_sym].delete(listener)
      end
      self
    end
#on(event, listener = nil) { ... } ⇒ Remq
Add a listener to the given event.
    # File 'lib/remq.rb', line 122
    def on(event, listener=nil, &block)
      listener ||= block
      unless listener.respond_to?(:call)
        raise ArgumentError.new('Listener must respond to #call')
      end
      synchronize do
        @listeners[event.to_sym] << listener
      end
      self
    end
#publish(channel, message) ⇒ Integer
Publish a message to the given channel. The message must be a string, but objects can easily be serialized using JSON, etc. The id of the published message will be returned as an integer.
    # File 'lib/remq.rb', line 40
    def publish(channel, message)
      synchronize do
        id = call(:publish, channel, message)
        id && id.to_i
      end
    end
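Since the message must be a string, structured payloads need to be serialized first. The sketch below uses Ruby's standard json library and does not open a Redis connection; the payload contents and the channel name `'events.accounts'` in the commented call are illustrative assumptions.

```ruby
require 'json'

payload = { 'event' => 'signup', 'user_id' => 42 }

# Serialize to a String before publishing.
message = JSON.generate(payload)

# With a connected client the string would be passed along, for example:
#   id = remq.publish('events.accounts', message)

# A consumer reverses the step after receiving the string body.
decoded = JSON.parse(message)

puts message
puts decoded == payload
```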
#quit ⇒ Object
Forcibly close the connections to the Redis server.
    # File 'lib/remq.rb', line 107
    def quit
      synchronize do
        redis.quit
        predis.quit
      end
    end
#subscribe(pattern, options = {}) {|received| ... } ⇒ Thread
Subscribe to the channels matching the given pattern. If no initial from_id is provided, Remq subscribes using vanilla Redis pub/sub. Any Redis pub/sub pattern will work. If from_id is provided, Remq replays messages after the given id until it has caught up and is able to switch to pub/sub.
Remq-rb subscribes to pub/sub on another thread, which is returned so you can handle it and call Thread#join when ready to block.
    # File 'lib/remq.rb', line 63
    def subscribe(pattern, options = {}, &block)
      synchronize do
        return if @subscription
        on(:message, &block) if block
        @subscription = true
        if cursor = options[:from_id]
          @subscription = _subscribe_from_cursor(pattern, cursor)
        else
          @subscription = _subscribe_to_pubsub(pattern)
        end
      end
    end
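The returned-thread pattern described above can be sketched without Redis. Here `fake_subscribe` is a hypothetical stand-in that reads from an in-process Queue and stops at a `:done` sentinel; the sentinel exists only so the example terminates. A real subscription would keep running until #unsubscribe or #quit is called.

```ruby
# Hypothetical stand-in for a subscription: a worker thread that hands each
# queued message to the block, then stops at a :done sentinel.
def fake_subscribe(queue, &block)
  Thread.new do
    while (msg = queue.pop) != :done
      block.call(msg)
    end
  end
end

queue = Queue.new
seen = []
thread = fake_subscribe(queue) { |msg| seen << msg }

# The caller keeps running while the worker thread waits for messages.
queue << 'hello' << 'world' << :done

thread.join # block until the worker finishes
p seen
```

Returning the thread instead of joining it internally leaves the caller free to do other work first and to decide when, or whether, to block.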
#unsubscribe ⇒ Object
Unsubscribe. No more message events will be emitted after this is called.
    # File 'lib/remq.rb', line 80
    def unsubscribe
      synchronize do
        return unless @subscription
        @subscription.exit if @subscription.is_a?(Thread)
        @subscription = nil
      end
    end