Class: Beaneater::Tube

Inherits:
PoolCommand
Defined in:
lib/beaneater/tube/record.rb

Overview

A beanstalkd tube, which holds jobs that can be inserted, reserved, and so on.

Instance Attribute Summary

Attributes inherited from PoolCommand

#pool

Instance Method Summary

Methods inherited from PoolCommand

#combine_stats, #method_missing, #sum_items, #transmit_to_all

Constructor Details

#initialize(pool, name) ⇒ Tube

Fetches the specified tube.

Examples:

Beaneater::Tube.new(@pool, 'tube-name')

Parameters:

  • pool (Beaneater::Pool)

    The beaneater pool for this tube.

  • name (String)

    The name for this tube.


# File 'lib/beaneater/tube/record.rb', line 16

def initialize(pool, name)
  @name = name.to_s
  @mutex = Mutex.new
  super(pool)
end

Dynamic Method Handling

This class handles dynamic methods through the method_missing method in the class Beaneater::PoolCommand.

Instance Attribute Details

#name ⇒ Object

Returns the value of the name attribute.


# File 'lib/beaneater/tube/record.rb', line 7

def name
  @name
end

Instance Method Details

#clear ⇒ Object

Clears all unreserved jobs (delayed, buried, and ready) from the tube.

Examples:

@tube.clear

# File 'lib/beaneater/tube/record.rb', line 117

def clear
  pool.tubes.watch!(self.name)
  %w(delayed buried ready).each do |state|
    while job = self.peek(state.to_sym)
      job.delete
    end
  end
  pool.tubes.ignore(name)
rescue Beaneater::UnexpectedResponse
  # swallow any issues
end
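
The drain loop in #clear relies on #peek returning nil once a state has no more jobs. The following is a minimal sketch of that pattern using in-memory stand-ins; FakeJob and FakeTube are hypothetical classes written for illustration, not part of beaneater, and no beanstalkd connection is involved.

```ruby
# Hypothetical stand-in for Beaneater::Job: knows which queue it lives in.
class FakeJob
  attr_reader :id

  def initialize(id, queue)
    @id = id
    @queue = queue
  end

  # Removes this job from its queue, loosely mirroring Job#delete.
  def delete
    @queue.delete(self)
  end
end

# Hypothetical stand-in for Beaneater::Tube, backed by plain arrays.
class FakeTube
  STATES = %i[delayed buried ready].freeze

  def initialize
    @jobs = Hash.new { |hash, state| hash[state] = [] }
  end

  def add(state, id)
    queue = @jobs[state]
    queue << FakeJob.new(id, queue)
  end

  # Returns the next job in the given state, or nil when none is left.
  def peek(state)
    @jobs[state].first
  end

  # Same loop shape as Tube#clear: peek and delete until peek returns nil.
  def clear
    STATES.each do |state|
      while (job = peek(state))
        job.delete
      end
    end
  end

  def size
    @jobs.values.sum(&:size)
  end
end
```

Because the loop ends only when peek returns nil, each state is emptied completely before the next one is visited.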

#config ⇒ Beaneater::Configuration (protected)

Returns configuration options for beaneater.

Returns:

  • (Beaneater::Configuration)

    Configuration options for beaneater.


# File 'lib/beaneater/tube/record.rb', line 161

def config
  Beaneater.configuration
end

#kick(bounds = 1) ⇒ Hash{String => String, Number}

Kicks the specified number of jobs from the buried state to the ready state.

Examples:

@tube.kick(5)

Parameters:

  • bounds (Integer) (defaults to: 1)

    The number of jobs to kick.

Returns:

  • (Hash{String => String, Number})

    Beanstalkd command response


# File 'lib/beaneater/tube/record.rb', line 84

def kick(bounds=1)
  safe_use { transmit_to_rand("kick #{bounds}") }
end

#pause(delay) ⇒ Array<Hash{String => String, Number}>

Pauses execution of this tube for the specified delay.

Examples:

@tube.pause(10)

Parameters:

  • delay (Integer)

    Number of seconds to delay tube execution

Returns:

  • (Array<Hash{String => String, Number}>)

    Beanstalkd command response


# File 'lib/beaneater/tube/record.rb', line 108

def pause(delay)
  transmit_to_all("pause-tube #{name} #{delay}")
end

#peek(state) ⇒ Beaneater::Job

Peeks at the next job within this tube in the given state.

Examples:

@tube.peek(:ready) # => <Beaneater::Job id=5 body=foo>

Parameters:

  • state (String, Symbol)

    The job state to peek at (ready, buried, or delayed).

Returns:

  • (Beaneater::Job)

    The next job in the given state, or nil if none is found.


# File 'lib/beaneater/tube/record.rb', line 51

def peek(state)
  safe_use do
    res = transmit_until_res "peek-#{state}", :status => "FOUND"
    Job.new(res)
  end
rescue Beaneater::NotFoundError
  # Return nil if not found
  nil
end

#put(body, options = {}) ⇒ Hash{String => String, Number}

Inserts a job with the specified body onto the tube.

Examples:

@tube.put "data", :pri => 1000, :ttr => 10, :delay => 5

Parameters:

  • body (String)

    The data to store with this job.

  • options (Hash{Symbol => Integer}) (defaults to: {})

    The settings associated with this job.

Options Hash (options):

  • pri (Integer)

    Priority for this job; smaller values are scheduled first.

  • ttr (Integer)

    Time to run (TTR) for this job, in seconds.

  • delay (Integer)

    Number of seconds to wait before the job becomes ready.

Returns:

  • (Hash{String => String, Number})

    beanstalkd command response


# File 'lib/beaneater/tube/record.rb', line 34

def put(body, options={})
  safe_use do
    options = { :pri => config.default_put_pri, :delay => config.default_put_delay,
                :ttr => config.default_put_ttr }.merge(options)
    cmd_options = "#{options[:pri]} #{options[:delay]} #{options[:ttr]} #{body.bytesize}"
    transmit_to_rand("put #{cmd_options}\r\n#{body}")
  end
end
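
To make the wire format concrete, here is a minimal standalone sketch of how #put assembles the beanstalkd command string. The helper name build_put_command and the constant DEFAULT_PUT_OPTIONS are hypothetical, and the default values shown are assumptions for illustration; in the real method they come from Beaneater.configuration.

```ruby
# Assumed defaults, for illustration only. The real values are read from
# config.default_put_pri, config.default_put_delay and config.default_put_ttr.
DEFAULT_PUT_OPTIONS = { :pri => 65536, :delay => 0, :ttr => 120 }.freeze

# Hypothetical helper: builds the same "put" command string that Tube#put
# transmits, without talking to a server.
def build_put_command(body, options = {})
  opts = DEFAULT_PUT_OPTIONS.merge(options)
  # The order on the wire is pri, delay, ttr, then the body size in bytes.
  "put #{opts[:pri]} #{opts[:delay]} #{opts[:ttr]} #{body.bytesize}\r\n#{body}"
end
```

Note that the size field uses bytesize rather than length, since beanstalkd expects the body size in bytes and multibyte characters would otherwise be undercounted.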

#reserve(timeout = nil, &block) {|job| ... } ⇒ Beaneater::Job

Reserves the next job from this tube.

Examples:

@tube.reserve # => <Beaneater::Job id=5 body=foo>

Parameters:

  • timeout (Integer) (defaults to: nil)

    Number of seconds before timing out

  • block (Proc)

    Callback to perform on reserved job

Yields:

  • (job)

    Job that was reserved.

Returns:

  • (Beaneater::Job)

    The job that was reserved.


# File 'lib/beaneater/tube/record.rb', line 71

def reserve(timeout=nil, &block)
  pool.tubes.watch!(self.name)
  pool.tubes.reserve(timeout, &block)
end

#safe_use(&block) ⇒ Object (protected)

Transmits a beanstalk command that requires this tube to be set as used.

Examples:

safe_use { transmit_to_rand("kick 1") }
  # => "Response to kick command"

Parameters:

  • block (Proc)

    Beanstalk command to transmit.

Returns:

  • (Object)

    Result of block passed


# File 'lib/beaneater/tube/record.rb', line 150

def safe_use(&block)
  @mutex.lock
  tubes.use(self.name)
  yield
ensure
  @mutex.unlock
end
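
The lock, use, yield, unlock sequence in #safe_use can also be written with Mutex#synchronize, which releases the lock even when the block raises. The sketch below swaps in that technique; it is not the library's implementation. UseGuard is a hypothetical class, and the log array stands in for the real tubes.use call.

```ruby
# Hypothetical illustration of the safe_use pattern; not part of beaneater.
class UseGuard
  attr_reader :log

  def initialize(name)
    @name = name
    @mutex = Mutex.new
    @log = []
  end

  # Holds the mutex while "using" the tube and running the block, so that
  # another thread cannot switch the used tube between the two steps.
  def safe_use
    @mutex.synchronize do
      @log << "use #{@name}" # stand-in for tubes.use(name)
      yield
    end
  end

  def locked?
    @mutex.locked?
  end
end
```

The block's return value is passed through, as in #safe_use, and the mutex is released on both the normal and the error path.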

#stats ⇒ Beaneater::StatStruct

Returns related stats for this tube.

Examples:

@tube.stats.current_jobs_delayed # => 24

Returns:

  • (Beaneater::StatStruct)

    Stats for this tube.


# File 'lib/beaneater/tube/record.rb', line 95

def stats
  res = transmit_to_all("stats-tube #{name}", :merge => true)
  StatStruct.from_hash(res[:body])
end

#to_s ⇒ String Also known as: inspect

String representation of tube.

Examples:

@tube.to_s # => "#<Beaneater::Tube name=\"foo\">"

Returns:

  • (String)

    Representation of tube including name.


# File 'lib/beaneater/tube/record.rb', line 135

def to_s
  "#<Beaneater::Tube name=#{name.inspect}>"
end