Class: UnifiedQueues::Multi::Driver::EMJackDriver::Connection

Inherits:
UnifiedQueues::Multi::Driver show all
Defined in:
lib/unified-queues/multi/driver/em-jack.rb

Overview

Multi queue driver for unified queues single queue interface.

Instance Attribute Summary

Attributes inherited from UnifiedQueues::Multi::Driver

#native

Instance Method Summary collapse

Methods inherited from UnifiedQueues::Multi::Driver

#close

Constructor Details

#initialize(cls, *args, &block) ⇒ Connection

Constructor.



64
65
66
67
68
# File 'lib/unified-queues/multi/driver/em-jack.rb', line 64

def initialize(cls, *args, &block)
    @native = cls::new(*args, &block)
    @subscribed_names = LookupHash["default"]
    @used_name = "default"
end

Instance Method Details

#list(&block) ⇒ Array

Lists names of all available queues.

Returns:

  • (Array)


174
175
176
# File 'lib/unified-queues/multi/driver/em-jack.rb', line 174

def list(&block)
    @native.list(&block)
end

#list_subscribed(&block) ⇒ Array

Lists all subscribed queues.

Returns:

  • (Array)


192
193
194
# File 'lib/unified-queues/multi/driver/em-jack.rb', line 192

def list_subscribed(&block)
    @native.list(:watched, &block)
end

#list_used(&block) ⇒ Array

Lists all used queues.

Returns:

  • (Array)


183
184
185
# File 'lib/unified-queues/multi/driver/em-jack.rb', line 183

def list_used(&block)
    @native.list(:used, &block)
end

#pop(blocking = false, &block) ⇒ Object?

Pops value from the queue. Callback is recurring, so it will keep callback active after #pop.

Parameters:

  • blocking (Boolean) (defaults to: false)

    indicates, it should block or not

Returns:

  • (Object, nil)


89
90
91
92
93
94
95
96
97
# File 'lib/unified-queues/multi/driver/em-jack.rb', line 89

def pop(blocking = false, &block)
    timeout = blocking ? nil : 0
    @native.each_job(timeout) do |job|
        result = job.body
        job.delete do
            yield result
        end
    end
end

#push(value, key = value, &block) ⇒ Object

Pushes value to the currently used queue.

Parameters:

  • value (Object)
  • key (Integer) (defaults to: value)

    key for priority queues



77
78
79
# File 'lib/unified-queues/multi/driver/em-jack.rb', line 77

def push(value, key = value, &block)
    @native.put(value, :priority => key, &block)
end

#subscribe(name, &block) ⇒ Object

Subscribes to the queue. So marks it as target for #pop. Note, than only single queue can be subscribed at one time.

Parameters:

  • name (Object)

    name of the required queue



124
125
126
127
128
129
130
131
132
133
# File 'lib/unified-queues/multi/driver/em-jack.rb', line 124

def subscribe(name, &block)
    if not name.in? @subscribed_names
        @subscribed_names << name
        @native.watch(name, &block)
    elsif not block.nil?
        EM::next_tick do
            yield
        end
    end        
end

#subscribed {|_self| ... } ⇒ Queue

Currently subscribed queue.

Yields:

  • (_self)

Yield Parameters:

Returns:

  • (Queue)


165
166
167
# File 'lib/unified-queues/multi/driver/em-jack.rb', line 165

def subscribed
    yield self
end

#unsubscribe(name, &block) ⇒ Object

Unsubscribes from the queue.

Parameters:

  • name (Object)

    name of the required queue\



140
141
142
143
144
145
146
147
148
149
# File 'lib/unified-queues/multi/driver/em-jack.rb', line 140

def unsubscribe(name, &block)
    if name.in? @subscribed_names
        @subscribed_names.delete name
        @native.ignore(name, &block)
    elsif not block.nil?
        EM::next_tick do
            yield
        end
    end
end

#use(name, &block) ⇒ Object

Sets queue with given name as used. So marks it as target for #push.

Parameters:

  • name (Object)

    name of the required queue



106
107
108
109
110
111
112
113
114
115
# File 'lib/unified-queues/multi/driver/em-jack.rb', line 106

def use(name, &block)
    if name != @used_name
        @used_name = name
        @native.use(name, &block)
    elsif not block.nil?
        EM::next_tick do
            yield
        end
    end
end

#used {|_self| ... } ⇒ Queue

Currently used queue.

Yields:

  • (_self)

Yield Parameters:

Returns:

  • (Queue)


156
157
158
# File 'lib/unified-queues/multi/driver/em-jack.rb', line 156

def used
    yield self
end