Class: Remq

Inherits:
Object
  • Object
show all
Includes:
MonitorMixin
Defined in:
lib/remq.rb,
lib/remq/script.rb,
lib/remq/version.rb

Defined Under Namespace

Classes: Message, Script

Constant Summary collapse

LIMIT =
1000
VERSION =
"0.0.4"

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(options = {}) ⇒ Remq

Create a Remq client with the given options, which are passed to redis.

Parameters:

  • options (Hash) (defaults to: {})


24
25
26
27
28
29
30
# File 'lib/remq.rb', line 24

# Build a Remq client backed by two Redis connections created from the
# same options.
#
# @param options [Hash] passed unchanged to both Redis.new calls
def initialize(options = {})
  @redis = Redis.new(options)
  # Separate connection for pub/sub.
  @predis = Redis.new(options)
  # Each event name lazily gets its own, initially empty, listener list.
  @listeners = Hash.new { |registry, event| registry[event] = [] }

  super() # Monitor#initialize
end

Instance Attribute Details

#predis ⇒ Object (readonly)

Returns the value of attribute predis.



19
20
21
# File 'lib/remq.rb', line 19

# Read-only accessor.
#
# @return [Object] the current value of the @predis instance variable
def predis
  @predis
end

#redis ⇒ Object (readonly)

Returns the value of attribute redis.



18
19
20
# File 'lib/remq.rb', line 18

# Read-only accessor.
#
# @return [Object] the current value of the @redis instance variable
def redis
  @redis
end

Instance Method Details

#consume(pattern, options = {}) ⇒ Array<Remq::Message>

Consume persisted messages from channels matching the given pattern, starting with the cursor if provided, or the first message. limit determines how many messages will be returned each time consume is called.

Parameters:

  • pattern (String)
  • options (Hash) (defaults to: {})
    • :cursor => Integer: id of the first message to return
    • :limit => Integer: maximum number of messages to return

Returns:



98
99
100
101
102
103
104
# File 'lib/remq.rb', line 98

# Fetch persisted messages for channels matching +pattern+.
#
# Runs inside +synchronize+; issues the :consume call with the cursor and
# limit, then passes every returned entry through +_handle_raw_message+.
#
# @param pattern [String] channel pattern to consume from
# @param options [Hash]
#   :cursor => Integer: id of the first message to return (defaults to 0)
#   :limit  => Integer: maximum number of messages to return (defaults to LIMIT)
# @return [Array] one handled message per raw entry, in the order received
def consume(pattern, options = {})
  synchronize do
    cursor = options.fetch(:cursor, 0)
    limit  = options.fetch(:limit, LIMIT)

    call(:consume, pattern, cursor, limit).map do |raw|
      _handle_raw_message(raw)
    end
  end
end

#inspect ⇒ Object



159
160
161
# File 'lib/remq.rb', line 159

# Short human-readable description of this client.
#
# @return [String] text containing Remq::VERSION and the id reported by
#   the redis connection's client
def inspect
  version       = Remq::VERSION
  connection_id = redis.client.id

  "#<Remq client v#{version} for #{connection_id}>"
end

#key(*args) ⇒ String

Build a key from the given name and channel.

Parameters:

  • name (String)
  • channel (String)

Returns:

  • (String)

    key



155
156
157
# File 'lib/remq.rb', line 155

# Build a colon-separated key namespaced under "remq".
#
# @param args [Array] key segments appended after the "remq" prefix
# @return [String] the prefix and all segments joined with ':'
def key(*args)
  ['remq', *args].join(':')
end

#off(event, listener) ⇒ Remq

Remove a listener from the given event.

Parameters:

  • event (String|Symbol)
  • listener (Proc)

Returns:



141
142
143
144
145
146
147
# File 'lib/remq.rb', line 141

# Remove a listener from the given event.
#
# @param event [String, Symbol] event name; converted with #to_sym
# @param listener [Proc] the listener object to delete from the event's list
# @return [self] to allow chaining
def off(event, listener)
  event_name = event.to_sym
  synchronize { @listeners[event_name].delete(listener) }
  self
end

#on(event, listener = nil) { ... } ⇒ Remq

Add a listener to the given event.

Parameters:

  • event (String|Symbol)
  • listener (Proc) (defaults to: nil)

Yields:

  • a block to be called when the event is emitted

Returns:



122
123
124
125
126
127
128
129
130
131
132
133
# File 'lib/remq.rb', line 122

# Add a listener to the given event.
#
# The explicit +listener+ argument wins; the block is used only when no
# listener is passed.
#
# @param event [String, Symbol] event name; converted with #to_sym
# @param listener [#call, nil] callable to register
# @yield block registered as the listener when +listener+ is nil
# @raise [ArgumentError] if the chosen listener does not respond to #call
# @return [self] to allow chaining
def on(event, listener=nil, &block)
  handler = listener || block
  raise ArgumentError, 'Listener must respond to #call' unless handler.respond_to?(:call)

  synchronize { @listeners[event.to_sym] << handler }
  self
end

#publish(channel, message) ⇒ Integer

Publish a message to the given channel. The message must be a string, but objects can easily be serialized using JSON, etc. The id of the published message will be returned as an integer.

Parameters:

  • channel (String)
  • message (String)

Returns:

  • (Integer)

    id



40
41
42
43
44
45
# File 'lib/remq.rb', line 40

# Publish a message to the given channel.
#
# Runs the :publish call inside +synchronize+ and converts a truthy reply
# to an Integer with #to_i; a falsy reply is returned unchanged.
#
# @param channel [String]
# @param message [String]
# @return [Integer, nil] id of the published message
def publish(channel, message)
  synchronize do
    raw_id = call(:publish, channel, message)
    next raw_id unless raw_id

    raw_id.to_i
  end
end

#quit ⇒ Object

Forcibly close the connections to the Redis server.



107
108
109
110
111
112
# File 'lib/remq.rb', line 107

# Forcibly close the connections to the Redis server.
#
# Both connections are closed inside +synchronize+. The pub/sub connection
# is closed in an +ensure+ clause so that it is not left open when closing
# the main connection raises; the original error still propagates.
#
# @return [Object] the result of +predis.quit+
def quit
  synchronize do
    pubsub_result = nil
    begin
      redis.quit
    ensure
      # Always attempted, even if redis.quit raised above.
      pubsub_result = predis.quit
    end
    pubsub_result
  end
end

#subscribe(pattern, options = {}) {|received| ... } ⇒ Thread

Subscribe to the channels matching the given pattern. If no initial from_id is provided, Remq subscribes using vanilla Redis pub/sub. Any Redis pub/sub pattern will work. If from_id is provided, Remq replays messages after the given id until it's caught up and able to switch to pub/sub.

Remq-rb subscribes to pub/sub on another thread, which is returned so you can handle it and call Thread#join when ready to block.

Parameters:

  • pattern (String)
  • options (Hash) (defaults to: {})
    • :from_id => Integer: The message id to replay from (usually the last)

Yields:

  • a block to add as a listener to the message event

Yield Parameters:

Returns:

  • (Thread)

    thread where messages will be received



63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
# File 'lib/remq.rb', line 63

# Start a subscription for +pattern+ unless @subscription is already set.
#
# NOTE(review): the right-hand sides of the two assignments in the if/else
# below look truncated. A bare `(pattern, cursor)` is not valid Ruby, so a
# helper method name was presumably lost when this listing was extracted.
# Confirm against lib/remq.rb before relying on this code.
#
# @param pattern [String] channel pattern to subscribe to
# @param options [Hash]
#   :from_id => Integer: read as the cursor; selects the first branch when truthy
# @yield optional block, registered as a :message listener via #on
# @return [nil] when @subscription is already set; otherwise the value of
#   the synchronize block
def subscribe(pattern, options = {}, &block)
  synchronize do
    # Only one subscription at a time: bail out if one is already recorded.
    return if @subscription

    on(:message, &block) if block

    # Placeholder set before the real value is assigned below
    # (presumably marks the subscription as in progress; TODO confirm).
    @subscription = true

    if cursor = options[:from_id]
      @subscription = (pattern, cursor)
    else
      @subscription = (pattern)
    end
  end
end

#unsubscribe ⇒ Object

Unsubscribe. No more message events will be emitted after this is called.



80
81
82
83
84
85
86
# File 'lib/remq.rb', line 80

# Unsubscribe. Clears the recorded subscription and, when it is a Thread,
# terminates that thread first.
#
# @return [nil]
def unsubscribe
  synchronize do
    current = @subscription
    return unless current

    # The subscription may be a non-Thread placeholder; only threads are exited.
    current.exit if current.is_a?(Thread)
    @subscription = nil
  end
end