Class: Wamp::Worker::Queue

Inherits:
Object
  • Object
show all
Defined in:
lib/wamp/worker/queue.rb

Defined Under Namespace

Classes: Descriptor

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(name) ⇒ Queue

Constructor



44
45
46
47
# File 'lib/wamp/worker/queue.rb', line 44

# Constructor
#
# Looks up both the Redis handle and the default timeout from the worker
# configuration, using the same +name+ for each lookup.
#
# @param name - The key passed to Wamp::Worker.config.redis and
#   Wamp::Worker.config.timeout (NOTE(review): type is not visible here,
#   presumably a worker name Symbol/String - confirm)
def initialize(name)
  # Connection object that push/pop issue their list commands against
  @redis = Wamp::Worker.config.redis(name)
  # Fallback used by #pop when it is asked to wait without an explicit timeout
  @default_timeout = Wamp::Worker.config.timeout(name)
end

Instance Attribute Details

#default_timeout ⇒ Object (readonly)

Returns the value of attribute default_timeout.



7
8
9
# File 'lib/wamp/worker/queue.rb', line 7

# Returns the timeout captured from Wamp::Worker.config.timeout(name) when
# the queue was constructed. #pop uses it as the blocking-pop timeout when
# the caller does not supply one.
#
# @return [Object] the configured default timeout
#   (NOTE(review): presumably a number of seconds - confirm against config)
def default_timeout
  @default_timeout
end

#redis ⇒ Object (readonly)

Returns the value of attribute redis.



7
8
9
# File 'lib/wamp/worker/queue.rb', line 7

# Returns the handle captured from Wamp::Worker.config.redis(name) when the
# queue was constructed. #push and #pop call lpush, rpop, brpop and del on it.
#
# @return [Object] the configured Redis handle
def redis
  @redis
end

Instance Method Details

#pop(queue_name, wait: false, delete: false, timeout: nil) ⇒ Object

Pops a command off of the queue

Parameters:

  • queue_name (String, Array)
    • The name of the queue (or multiple queues if brpop)

  • wait (Bool) (defaults to: false)
    • True if we want to block waiting for the response

  • delete (Bool) (defaults to: false)
    • True if we want the queue deleted (only applicable if wait)

  • timeout (Int) (defaults to: nil)
    • Number of seconds to wait before timing out



73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
# File 'lib/wamp/worker/queue.rb', line 73

# Pops a command off of the queue
#
# @param queue_name [String, Array] - The name of the queue (or several
#   queues when waiting, since the blocking pop accepts a list)
# @param wait [Bool] - True to block until a value arrives or the timeout
#   elapses; false to pop immediately
# @param delete [Bool] - True to delete the queue after popping. The delete
#   is issued whether or not +wait+ is set.
# @param timeout [Int] - Number of seconds to wait before timing out; the
#   queue's default timeout is used when this is nil
# @return [Descriptor, nil] the parsed descriptor, or nil when nothing was
#   popped
def pop(queue_name, wait: false, delete: false, timeout: nil)
  payload = nil

  if wait
    # Fall back to the default timeout if none is specified
    timeout ||= default_timeout

    reply = redis.brpop(queue_name, timeout: timeout)

    # A successful blocking pop answers with [queue, value]; from here on
    # queue_name refers to the single queue that produced the value
    unless reply.nil?
      queue_name = reply[0]
      payload = reply[1]
    end
  else
    # Non-blocking pop of a single value
    payload = redis.rpop(queue_name)
  end

  # Drop the queue when the caller asked for it
  redis.del(queue_name) if delete

  # Only parse when something was actually popped
  descriptor = payload.nil? ? nil : Descriptor.from_json(payload)

  log(:pop, queue_name, descriptor)

  # Only the descriptor is returned (the queue name is just logged)
  descriptor
end

#push(queue_name, command, params, handle = nil) ⇒ Object

Pushes a command onto the queue

Parameters:

  • queue_name (String)
    • The name of the queue

  • command (Symbol)
    • The command

  • params (Hash)
    • The params for the request

  • handle (String) (defaults to: nil)
    • The response handle



55
56
57
58
59
60
61
62
63
64
65
# File 'lib/wamp/worker/queue.rb', line 55

# Pushes a command onto the queue
#
# @param queue_name [String] - The name of the queue
# @param command [Symbol] - The command
# @param params [Hash] - The params for the request
# @param handle [String] - The response handle
# @return the result of the underlying lpush call
def push(queue_name, command, params, handle = nil)
  # Note the argument order: the descriptor takes the handle before the params
  entry = Descriptor.new(command, handle, params)

  # Log before the entry is handed to Redis
  log(:push, queue_name, entry)

  # Serialize and queue the entry
  redis.lpush(queue_name, entry.to_json)
end