Class: Beaneater::Tube

Inherits:
Object
  • Object
show all
Defined in:
lib/beaneater/tube/record.rb

Overview

Beanstalk tube which contains jobs which can be inserted, reserved, et al.

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(client, name) ⇒ Tube

Fetches the specified tube.

Examples:

Beaneater::Tube.new(@client, 'tube-name')

Parameters:

  • client (Beaneater)

    The beaneater client instance.

  • name (String)

    The name for this tube.



18
19
20
21
22
# File 'lib/beaneater/tube/record.rb', line 18

def initialize(client, name)
  @client = client
  @name = name.to_s
  @mutex = Mutex.new
end

Instance Attribute Details

#clientObject

Returns the value of attribute client.



9
# File 'lib/beaneater/tube/record.rb', line 9

attr_reader :name, :client

#nameString

Returns name of the tube.

Returns:

  • (String)

    name of the tube



9
10
11
# File 'lib/beaneater/tube/record.rb', line 9

def name
  @name
end

Instance Method Details

#clearObject

Clears all unreserved jobs in all states from the tube

Examples:

@tube.clear


132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
# File 'lib/beaneater/tube/record.rb', line 132

def clear
  client.tubes.watch!(self.name)
  %w(delayed buried ready).each do |state|
    while job = self.peek(state.to_sym)
      begin
        job.delete
      rescue Beaneater::UnexpectedResponse, Beaneater::NotFoundError
        # swallow any issues
      end
    end
  end
  client.tubes.ignore(name)
rescue Beaneater::NotIgnoredError
  # swallow any issues
end

#configBeaneater::Configuration (protected)

Returns configuration options for beaneater

Returns:



180
181
182
# File 'lib/beaneater/tube/record.rb', line 180

def config
  Beaneater.configuration
end

#kick(bounds = 1) ⇒ Hash{String => String, Number}

Kick specified number of jobs from buried to ready state.

Examples:

@tube.kick(5)

Parameters:

  • bounds (Integer) (defaults to: 1)

    The number of jobs to kick.

Returns:

  • (Hash{String => String, Number})

    Beanstalkd command response



99
100
101
# File 'lib/beaneater/tube/record.rb', line 99

def kick(bounds=1)
  safe_use { transmit("kick #{bounds}") }
end

#pause(delay) ⇒ Array<Hash{String => String, Number}>

Pause the execution of this tube for specified delay.

Examples:

@tube.pause(10)

Parameters:

  • delay (Integer)

    Number of seconds to delay tube execution

Returns:

  • (Array<Hash{String => String, Number}>)

    Beanstalkd command response



123
124
125
# File 'lib/beaneater/tube/record.rb', line 123

def pause(delay)
  transmit("pause-tube #{name} #{delay}")
end

#peek(state) ⇒ Beaneater::Job

Peek at next job within this tube in given state.

Examples:

@tube.peek(:ready) # => <Beaneater::Job id=5 body=foo>

Parameters:

  • state (String)

    The job state to peek at (ready, buried, delayed)

Returns:



66
67
68
69
70
71
72
73
74
# File 'lib/beaneater/tube/record.rb', line 66

def peek(state)
  safe_use do
    res = transmit("peek-#{state}")
    Job.new(client, res)
  end
rescue Beaneater::NotFoundError
  # Return nil if not found
  nil
end

#put(body, options = {}) ⇒ Hash{String => String, Number}

Inserts job with specified body onto tube.

Examples:

@tube.put "data", :pri => 1000, :ttr => 10, :delay => 5

Parameters:

  • body (String)

    The data to store with this job.

  • options (Hash{String => Integer}) (defaults to: {})

    The settings associated with this job.

Options Hash (options):

  • pri (Integer)

    priority for this job

  • ttr (Integer)

    time to respond for this job

  • delay (Integer)

    delay for this job

Returns:

  • (Hash{String => String, Number})

    beanstalkd command response



43
44
45
46
47
48
49
50
51
52
53
54
55
56
# File 'lib/beaneater/tube/record.rb', line 43

def put(body, options={})
  safe_use do
    serialized_body = config.job_serializer.call(body)

    options = {
      :pri   => config.default_put_pri,
      :delay => config.default_put_delay,
      :ttr   => config.default_put_ttr
    }.merge(options)

    cmd_options = "#{options[:pri]} #{options[:delay]} #{options[:ttr]} #{serialized_body.bytesize}"
    transmit("put #{cmd_options}\r\n#{serialized_body}")
  end
end

#reserve(timeout = nil, &block) {|job| ... } ⇒ Beaneater::Job

Reserves the next job from tube.

Examples:

@tube.reserve # => <Beaneater::Job id=5 body=foo>

Parameters:

  • timeout (Integer) (defaults to: nil)

    Number of seconds before timing out

  • block (Proc)

    Callback to perform on reserved job

Yields:

  • (job)

    Job that was reserved.

Returns:



86
87
88
89
# File 'lib/beaneater/tube/record.rb', line 86

def reserve(timeout=nil, &block)
  client.tubes.watch!(self.name)
  client.tubes.reserve(timeout, &block)
end

#safe_use(&block) ⇒ Object (protected)

Transmits a beanstalk command that requires this tube to be set as used.

Examples:

safe_use { transmit("kick 1") }
  # => "Response to kick command"

Parameters:

  • block (Proc)

    Beanstalk command to transmit.

Returns:

  • (Object)

    Result of block passed



169
170
171
172
173
174
175
# File 'lib/beaneater/tube/record.rb', line 169

def safe_use(&block)
  @mutex.lock
  client.tubes.use(self.name)
  yield
ensure
  @mutex.unlock
end

#statsBeaneater::StatStruct

Returns related stats for this tube.

Examples:

@tube.stats.current_jobs_delayed # => 24

Returns:



110
111
112
113
# File 'lib/beaneater/tube/record.rb', line 110

def stats
  res = transmit("stats-tube #{name}")
  StatStruct.from_hash(res[:body])
end

#to_sString Also known as: inspect

String representation of tube.

Examples:

@tube.to_s # => "#<Beaneater::Tube name=foo>"

Returns:

  • (String)

    Representation of tube including name.



154
155
156
# File 'lib/beaneater/tube/record.rb', line 154

def to_s
  "#<Beaneater::Tube name=#{name.inspect}>"
end

#transmit(command, options = {}) ⇒ Object

Delegates transmit to the connection object.



27
28
29
# File 'lib/beaneater/tube/record.rb', line 27

def transmit(command, options={})
  client.connection.transmit(command, **options)
end