Class: Qube::Tube

Inherits:
Object
  • Object
show all
Defined in:
lib/qube/model/tube.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(options = {}) ⇒ Tube

Returns a new instance of Tube.



5
6
7
8
9
10
11
12
# File 'lib/qube/model/tube.rb', line 5

# Builds a tube handle from an options hash and then calls
# fetch_or_create_tube (defined elsewhere in the class).
#
# :name and :type are pulled out of the options; every remaining key is
# kept in @options with if_not_exists: true forced on.
#
# @param options [Hash] tube settings; the caller's hash is left untouched
# @return [Tube]
def initialize(options = {})
  # Work on a copy: delete/merge! below would otherwise strip :name/:type
  # from (and inject :if_not_exists into) the hash the caller still holds,
  # and would raise FrozenError on a frozen hash.
  options  = options.dup
  @config  = Qube.config
  @client  = Client.new
  @name    = options.delete(:name)
  @type    = options.delete(:type)
  @options = options.merge!(if_not_exists: true)
  fetch_or_create_tube
end

Instance Attribute Details

#name ⇒ Object (readonly)

Returns the value of attribute name.



3
4
5
# File 'lib/qube/model/tube.rb', line 3

# Reader for the tube name stored in @name.
#
# @return [Object] the current value of @name
def name
  @name
end

Instance Method Details

#ack(data) ⇒ Object

PUT tubes/:name/:task_id/ack



26
27
28
29
30
# File 'lib/qube/model/tube.rb', line 26

# Acknowledges a task by issuing PUT tubes/:name/:task_id/ack through the client.
#
# @param data [Hash, Object] a hash whose 'task_id' entry is the id, or the id itself
# @return [Boolean] true when the response code equals 200
def ack(data)
  task_id =
    if data.is_a?(Hash)
      data.dig('task_id')
    else
      data
    end

  outcome = @client.put("tubes/#{@name}/#{task_id}/ack", tube: @name, task_id: task_id)
  outcome.code == 200
end

#drop ⇒ Object

DELETE tubes/:name



33
34
35
36
# File 'lib/qube/model/tube.rb', line 33

# Issues DELETE tubes/:name through the client.
#
# @return [Boolean] true when the response code equals 200
def drop
  @client.delete("tubes/#{@name}").code == 200
end

#each_task(timeout = 0) ⇒ Object



38
39
40
41
42
43
44
45
46
47
48
# File 'lib/qube/model/tube.rb', line 38

# Repeatedly takes tasks, yields each one to the block, and acks it once the
# block returns. Iteration stops at the first nil or empty result from take.
#
# When called without a block an Enumerator is returned instead, so nothing
# is taken until iteration actually starts. (Previously a block-less call
# took a task first and then failed at yield, leaving that task un-acked.)
#
# @param timeout [Object] forwarded unchanged to take on every iteration
# @yield [task] each non-empty value returned by take
# @return [Enumerator, nil] an Enumerator without a block, otherwise nil
def each_task(timeout = 0)
  return enum_for(:each_task, timeout) unless block_given?

  loop do
    task = take(timeout)
    break if task.nil? || task.empty?

    yield task
    ack(task) # only reached when the block returned normally
  end
end

#put(task, options = {}) ⇒ Object

POST tubes/:name



15
16
17
18
# File 'lib/qube/model/tube.rb', line 15

# Submits a task by issuing POST tubes/:name through the client, sending the
# tube name, the task and the options as request parameters.
#
# @param task [Object] task payload, passed through as :task
# @param options [Hash] passed through as :options
# @return [Boolean] true when the response code equals 200
def put(task, options = {})
  path = "tubes/#{@name}"
  @client.post(path, tube: @name, task: task, options: options).code == 200
end

#take(timeout = 0) ⇒ Object

GET tubes/:name



21
22
23
# File 'lib/qube/model/tube.rb', line 21

# Issues GET tubes/:name through the client and hands back the response body.
#
# @param timeout [Object] sent as the :timeout request parameter
# @return [Object, nil] the response body, or nil when the client returned nil
def take(timeout = 0)
  path     = "tubes/#{@name}"
  response = @client.get(path, timeout: timeout)
  response&.body
end