Class: EM::Beanstalk

Inherits:
Object
  • Object
show all
Defined in:
lib/em-beanstalk.rb,
lib/em-beanstalk/job.rb,
lib/em-beanstalk/shell.rb,
lib/em-beanstalk/connection.rb

Defined Under Namespace

Modules: VERSION Classes: Body, Connection, Job, Shell

Constant Summary collapse

Disconnected =
Class.new(RuntimeError)
InvalidCommand =
Class.new(RuntimeError)

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(opts = nil) ⇒ Beanstalk

Returns a new instance of Beanstalk.



25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
# File 'lib/em-beanstalk.rb', line 25

# Builds a client, opens the EventMachine connection and selects the tube.
#
# @param opts [Hash, nil] optional settings. Recognised keys and the values
#   used when a key is missing or nil:
#   :host ('localhost'), :port (11300), :tube ('default'), :retry_count (5),
#   :default_priority (65536), :default_delay (0), :default_ttr (300),
#   :timeout (5), :default_error_callback (a proc printing the error),
#   :raise_on_disconnect (true; only replaced when the key is present).
def initialize(opts = nil)
  settings = opts || {}

  # Connection target and tube.
  @host = settings[:host] || 'localhost'
  @port = settings[:port] || 11300
  @tube = settings[:tube] || 'default'

  # Defaults applied to jobs and to the connection attempt.
  @default_priority = settings[:default_priority] || 65536
  @default_delay    = settings[:default_delay] || 0
  @default_ttr      = settings[:default_ttr] || 300
  @default_timeout  = settings[:timeout] || 5

  # Failure handling.
  @retry_count            = settings[:retry_count] || 5
  @default_error_callback = settings[:default_error_callback] ||
                            Proc.new{ |error| puts "ERROR: #{error.inspect}" }
  # fetch keeps an explicit false/nil instead of replacing it with true.
  @raise_on_disconnect    = settings.fetch(:raise_on_disconnect, true)

  # Parser and bookkeeping state.
  @watched_tubes = []
  @deferrables   = []
  @data          = ""
  @retries       = 0
  @in_reserve    = false

  @conn = EM::connect(host, port, EM::Beanstalk::Connection) do |conn|
    conn.client = self
    conn.comm_inactivity_timeout = 0
    conn.pending_connect_timeout = @default_timeout
  end

  return unless @tube

  use(@tube)
  watch(@tube)
end

Instance Attribute Details

#default_delay ⇒ Object (readonly)

Returns the value of attribute default_delay.



23
24
25
# File 'lib/em-beanstalk.rb', line 23

# Reader for @default_delay, which #initialize takes from
# opts[:default_delay] (0 when not supplied).
def default_delay
  @default_delay
end

#default_error_callback ⇒ Object (readonly)

Returns the value of attribute default_error_callback.



23
24
25
# File 'lib/em-beanstalk.rb', line 23

# Reader for @default_error_callback, which #initialize takes from
# opts[:default_error_callback] (a proc that prints the error when not
# supplied). #add_deferrable passes it to every deferrable it creates.
def default_error_callback
  @default_error_callback
end

#default_priority ⇒ Object (readonly)

Returns the value of attribute default_priority.



23
24
25
# File 'lib/em-beanstalk.rb', line 23

# Reader for @default_priority, which #initialize takes from
# opts[:default_priority] (65536 when not supplied).
def default_priority
  @default_priority
end

#default_ttr ⇒ Object (readonly)

Returns the value of attribute default_ttr.



23
24
25
# File 'lib/em-beanstalk.rb', line 23

# Reader for @default_ttr, which #initialize takes from opts[:default_ttr]
# (300 when not supplied).
def default_ttr
  @default_ttr
end

#host ⇒ Object

Returns the value of attribute host.



22
23
24
# File 'lib/em-beanstalk.rb', line 22

# Reader for @host, which #initialize takes from opts[:host] ('localhost'
# when not supplied). Used as the target for connect and reconnect.
def host
  @host
end

#port ⇒ Object

Returns the value of attribute port.



22
23
24
# File 'lib/em-beanstalk.rb', line 22

# Reader for @port, which #initialize takes from opts[:port] (11300 when not
# supplied). Used as the target for connect and reconnect.
def port
  @port
end

Instance Method Details

#add_deferrable(&block) ⇒ Object



206
207
208
209
210
# File 'lib/em-beanstalk.rb', line 206

# Creates the deferrable for a request that was just sent and appends it to
# the queue of outstanding requests.
#
# @param block optional block handed to Defer.new together with the default
#   error callback
# @return [Defer] the newly queued deferrable
def add_deferrable(&block)
  Defer.new(default_error_callback, &block).tap do |pending|
    @deferrables << pending
  end
end

#close ⇒ Object



56
57
58
59
# File 'lib/em-beanstalk.rb', line 56

# Closes the connection on purpose.
#
# The flag is set first so that #disconnected, which checks
# @disconnect_manually, does not schedule a reconnect for this shutdown.
def close
  @disconnect_manually = true
  @conn.close_connection
end

#connected ⇒ Object



188
189
190
# File 'lib/em-beanstalk.rb', line 188

# Resets the retry counter that #disconnected compares against @retry_count.
# NOTE(review): presumably called by the connection object once the socket
# is established — confirm against EM::Beanstalk::Connection.
def connected
  @retries = 0
end

#delete(val, &block) ⇒ Object



141
142
143
144
145
# File 'lib/em-beanstalk.rb', line 141

# Sends a delete command for a job.
#
# @param val a Job or a raw job id (resolved through #job_id); nothing is
#   sent when it is nil or false
# @param block optional block attached to the returned deferrable
# @return the deferrable for the request, or nil when val was nil/false
def delete(val, &block)
  if val
    @conn.send(:delete, job_id(val))
    add_deferrable(&block)
  end
end

#disconnected ⇒ Object



192
193
194
195
196
197
198
199
# File 'lib/em-beanstalk.rb', line 192

# Handles a lost connection: fails every outstanding request and, unless the
# disconnect was requested through #close, schedules a reconnect.
#
# Each pending deferrable is failed with :disconnected and dropped from the
# queue, and the receive buffer is emptied. Replies on the next connection
# must not be matched against requests, or appended to partial data, that
# belonged to the old one.
#
# @raise [EM::Beanstalk::Disconnected] when @retries has reached @retry_count
#   and @raise_on_disconnect is set
def disconnected
  # Swap the queue out before failing its entries, so that a failure
  # callback issuing a new command enqueues onto the fresh queue.
  abandoned = @deferrables
  @deferrables = []
  @data = ""
  abandoned.each { |d| d.fail(:disconnected) }

  return if @disconnect_manually

  raise EM::Beanstalk::Disconnected if @retries >= @retry_count && @raise_on_disconnect
  @retries += 1
  EM.add_timer(1) { reconnect }
end

#drain!(&block) ⇒ Object



61
62
63
64
65
66
67
# File 'lib/em-beanstalk.rb', line 61

# Reserves and deletes jobs one at a time until the server statistics report
# no ready jobs, then schedules the given block on the next reactor tick.
#
# @param block run (via EM.next_tick) once 'current-jobs-ready' is zero
def drain!(&block)
  stats do |server_stats|
    if server_stats['current-jobs-ready'].zero?
      EM.next_tick(&block)
    else
      # Delete the reserved job, then re-check the statistics.
      reserve do |job|
        job.delete { drain!(&block) }
      end
    end
  end
end

#each_job(timeout = nil, &block) ⇒ Object



100
101
102
103
104
105
106
107
108
109
110
# File 'lib/em-beanstalk.rb', line 100

# Reserves jobs in a loop, yielding each one to the block.
#
# After a job has been handed to the block, the next reserve is scheduled on
# the following reactor tick. Only the success callback re-arms the loop.
#
# @param timeout passed through to #reserve (nil means no timeout)
# @param block called with every reserved job
# @return the deferrable of the first reserve request
def each_job(timeout = nil, &block)
  reserve_next = lambda do
    pending = reserve(timeout)
    pending.callback do |job|
      block.call(job)
      EM.next_tick { reserve_next.call }
    end
    pending
  end

  reserve_next.call
end

#extract_body(bytes, data) ⇒ Object



279
280
281
282
283
284
285
286
287
288
289
# File 'lib/em-beanstalk.rb', line 279

# Splits a buffered response into its body and the data that follows it.
#
# The buffer is expected to start with a header line ending in CRLF,
# followed by +bytes+ bytes of body and a terminating CRLF. The body is
# returned only once that terminating CRLF has arrived as well. Otherwise
# the late CRLF would stay in the buffer and be parsed as an empty response
# line. Lengths are measured in bytes, as the protocol length is a byte
# count, so multibyte bodies and zero-length bodies are cut correctly.
#
# @param bytes [Integer] body length announced in the header line
# @param data [String] buffered data beginning with the header line
# @return [Body, nil] Body.new(body, remaining_data), or nil when the header
#   line or the body plus its trailing CRLF is not yet complete
def extract_body(bytes, data)
  header_end = data.index(/\r\n/)
  return nil if header_end.nil?

  rem = data[(header_end + 2)..-1]
  # Wait for the body AND its trailing CRLF.
  return nil if rem.bytesize < bytes + 2

  body = rem.byteslice(0, bytes)
  rest = rem.byteslice((bytes + 2)..-1) || ""
  Body.new(body, rest)
end

#ignore(tube, &block) ⇒ Object



84
85
86
87
88
89
# File 'lib/em-beanstalk.rb', line 84

# Stops watching a tube.
#
# Nothing is sent when the tube is not in the local watch list.
#
# @param tube the tube name to ignore
# @param block optional block attached to the returned deferrable
# @return the deferrable for the request, or nil when the tube was not watched
def ignore(tube, &block)
  if @watched_tubes.include?(tube)
    @watched_tubes.delete(tube)
    @conn.send(:ignore, tube)
    add_deferrable(&block)
  end
end

#job_id(val) ⇒ Object



122
123
124
125
126
127
128
129
# File 'lib/em-beanstalk.rb', line 122

# Normalises a job reference to the value sent to the server.
#
# @param val a Job or anything else
# @return val.id for a Job, otherwise val unchanged
def job_id(val)
  val.is_a?(Job) ? val.id : val
end

#list(type = nil, &block) ⇒ Object



131
132
133
134
135
136
137
138
139
# File 'lib/em-beanstalk.rb', line 131

# Sends one of the list commands.
#
# @param type :tube / :tubes / nil for 'list-tubes', :use / :used for
#   'list-tube-used', :watch / :watched for 'list-tubes-watched'
# @param block optional block attached to the returned deferrable
# @return the deferrable for the request
# @raise [EM::Beanstalk::InvalidCommand] for any other type; nothing is sent
def list(type = nil, &block)
  command =
    case type
    when :tube, :tubes, nil then :'list-tubes'
    when :use, :used        then :'list-tube-used'
    when :watch, :watched   then :'list-tubes-watched'
    else raise EM::Beanstalk::InvalidCommand.new
    end

  @conn.send(command)
  add_deferrable(&block)
end

#on_error(&block) ⇒ Object



212
213
214
# File 'lib/em-beanstalk.rb', line 212

# Stores the given block in @error_callback.
#
# NOTE(review): nothing in the visible code reads @error_callback —
# #add_deferrable builds deferrables with default_error_callback instead.
# Confirm whether this hook is actually wired up.
#
# @param block the handler to remember
def on_error(&block)
  @error_callback = block
end

#peek(id, &block) ⇒ Object



147
148
149
150
151
152
153
154
155
# File 'lib/em-beanstalk.rb', line 147

# Sends a peek command.
#
# @param id :ready, :delayed or :buried for the matching 'peek-*' command;
#   any other value is sent as the argument of a plain 'peek'
# @param block optional block attached to the returned deferrable
# @return the deferrable for the request
def peek(id, &block)
  request =
    case id
    when :ready   then [:'peek-ready']
    when :delayed then [:'peek-delayed']
    when :buried  then [:'peek-buried']
    else               [:'peek', id]
    end

  @conn.send(*request)
  add_deferrable(&block)
end

#put(msg, opts = nil, &block) ⇒ Object



157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
# File 'lib/em-beanstalk.rb', line 157

# Queues a job on the currently used tube.
#
# @param msg a Job (its body, priority, delay and ttr are used) or any other
#   object (its to_s becomes the body and the client defaults apply)
# @param opts [Hash, nil] optional :priority, :delay and :ttr overrides
# @param block optional block attached to the returned deferrable
# @return the deferrable for the request
def put(msg, opts = nil, &block)
  case msg
  when Job
    priority = opts && opts[:priority] || msg.priority
    delay = opts && opts[:delay] || msg.delay
    ttr = opts && opts[:ttr] || msg.ttr
    body = msg.body
  else
    priority = opts && opts[:priority] || default_priority
    delay = opts && opts[:delay] || default_delay
    ttr = opts && opts[:ttr] || default_ttr
    body = msg.to_s
  end

  # The protocol accepts priorities from 0 to 2**32 - 1; 2**32 itself is
  # already out of range, so clamp to the real maximum.
  max_priority = 2 ** 32 - 1

  priority = default_priority if priority < 0
  priority = max_priority if priority > max_priority
  delay = default_delay if delay < 0
  ttr = default_ttr if ttr < 0

  # The length in the command line is a byte count, not a character count;
  # the two differ for multibyte bodies.
  @conn.send_with_data(:put, body, priority, delay, ttr, body.bytesize)
  add_deferrable(&block)
end

#received(data) ⇒ Object



216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
# File 'lib/em-beanstalk.rb', line 216

# Feeds bytes read from the connection into the response parser.
#
# The data is appended to the @data buffer and complete responses are
# consumed from its front, one per loop iteration. Every recognised response
# settles the oldest entry of @deferrables (responses are matched to requests
# purely by order). Parsing stops, leaving the buffer for the next call, when
# the first line is not yet terminated, when a body has not fully arrived, or
# when the line is not recognised.
#
# @param data [String] bytes just received
def received(data)
  @data << data
  
  until @data.empty?
    # A response line is usable only once its CRLF has arrived.
    idx = @data.index(/(.*?)\r\n/)
    break if idx.nil?

    first = $1
  
    case (first)
    when /^DELETED/
      df = @deferrables.shift
      df.succeed
    when /^INSERTED\s+(\d+)/
      df = @deferrables.shift
      df.succeed($1.to_i)
    when /^RELEASED/
      df = @deferrables.shift
      df.succeed
    when /^BURIED\s+(\d+)/
      df = @deferrables.shift
      df.fail(:buried, $1.to_i)
    when /^USING\s+(.*)/
      df = @deferrables.shift
      df.succeed($1)
    when /^WATCHING\s+(\d+)/
      df = @deferrables.shift
      df.succeed($1.to_i)
    when /^OK\s+(\d+)/
      bytes = $1.to_i
      if body = extract_body(bytes, @data)
        # extract_body already dropped the header line and the body, so skip
        # the shared slice! at the bottom of the loop.
        @data = body.data
        df = @deferrables.shift
        # NOTE(review): YAML.load on data received from the server; consider
        # YAML.safe_load if the server cannot be fully trusted.
        df.succeed(YAML.load(body.body))
        next
      else
        # Body not complete yet: wait for more data.
        break
      end
    when /^(RESERVED|FOUND)\s+(\d+)\s+(\d+)/
      id = $2.to_i
      bytes = $3.to_i
      if body = extract_body(bytes, @data)
        @data = body.data
        df = @deferrables.shift
        job = EM::Beanstalk::Job.new(self, id, body.body)
        df.succeed(job)
        next
      else
        break
      end
    # error state
    when /^(OUT_OF_MEMORY|INTERNAL_ERROR|DRAINING|BAD_FORMAT|UNKNOWN_COMMAND|EXPECTED_CRLF|JOB_TOO_BIG|DEADLINE_SOON|TIMED_OUT|NOT_FOUND)/
      puts "... got error, calling df."
      df = @deferrables.shift
      df.fail($1.downcase.to_sym)
      # The line is removed by the shared slice! below. Trimming it here as
      # well would also eat the beginning of the next buffered response.
    else
      # Unrecognised line: stop and leave the buffer untouched.
      break
    end
    # Drop the single-line response (plus its CRLF) that was just handled.
    @data.slice!(0, first.size + 2)
  end
end

#reconnect ⇒ Object



201
202
203
204
# File 'lib/em-beanstalk.rb', line 201

# Re-establishes the connection to host/port.
#
# Clears @disconnect_manually first, so a later connection loss is treated
# by #disconnected as unexpected (and retried) again.
def reconnect
  @disconnect_manually = false
  @conn.reconnect(host, port)
end

#release(val, opts = nil, &block) ⇒ Object



180
181
182
183
184
185
186
# File 'lib/em-beanstalk.rb', line 180

# Sends a release command for a job.
#
# @param val a Job or a raw job id (resolved through #job_id); nothing is
#   sent when it is nil
# @param opts [Hash, nil] optional :priority and :delay; missing values fall
#   back to default_priority / default_delay, and both are converted with to_i
# @param block optional block attached to the returned deferrable
# @return the deferrable for the request, or nil when val was nil
def release(val, opts = nil, &block)
  return if val.nil?

  settings = opts || {}
  priority = (settings[:priority] || default_priority).to_i
  delay    = (settings[:delay] || default_delay).to_i

  @conn.send(:release, job_id(val), priority, delay)
  add_deferrable(&block)
end

#reserve(timeout = nil, &block) ⇒ Object



91
92
93
94
95
96
97
98
# File 'lib/em-beanstalk.rb', line 91

# Sends a reserve request.
#
# @param timeout when truthy, 'reserve-with-timeout' is sent with this value;
#   otherwise a plain 'reserve'
# @param block optional block attached to the returned deferrable
# @return the deferrable for the request
def reserve(timeout = nil, &block)
  request = timeout ? [:'reserve-with-timeout', timeout] : [:reserve]
  @conn.send(*request)
  add_deferrable(&block)
end

#stats(type = nil, val = nil, &block) ⇒ Object



112
113
114
115
116
117
118
119
120
# File 'lib/em-beanstalk.rb', line 112

# Sends one of the stats commands.
#
# @param type nil for 'stats', :tube for 'stats-tube', :job for 'stats-job'
# @param val the tube name (for :tube) or a Job / job id (for :job, resolved
#   through #job_id)
# @param block optional block attached to the returned deferrable
# @return the deferrable for the request
# @raise [EM::Beanstalk::InvalidCommand] for any other type; nothing is sent
def stats(type = nil, val = nil, &block)
  request =
    case type
    when nil   then [:stats]
    when :tube then [:'stats-tube', val]
    when :job  then [:'stats-job', job_id(val)]
    else raise EM::Beanstalk::InvalidCommand.new
    end

  @conn.send(*request)
  add_deferrable(&block)
end

#use(tube, &block) ⇒ Object



69
70
71
72
73
74
75
# File 'lib/em-beanstalk.rb', line 69

# Selects the tube that subsequent puts go to.
#
# Nothing is sent when the tube equals the one remembered in @used_tube.
#
# @param tube the tube name
# @param block optional block attached to the returned deferrable
# @return the deferrable for the request, or nil when the tube was already
#   in use
def use(tube, &block)
  unless @used_tube == tube
    @used_tube = tube
    @conn.send(:use, tube)
    add_deferrable(&block)
  end
end

#watch(tube, &block) ⇒ Object



77
78
79
80
81
82
# File 'lib/em-beanstalk.rb', line 77

# Adds a tube to the watch list.
#
# Nothing is sent when the tube is already in the local watch list.
#
# @param tube the tube name
# @param block optional block attached to the returned deferrable
# @return the deferrable for the request, or nil when the tube was already
#   watched
def watch(tube, &block)
  unless @watched_tubes.include?(tube)
    @watched_tubes << tube
    @conn.send(:watch, tube)
    add_deferrable(&block)
  end
end