Class: Ruote::Beanstalk::Storage

Inherits:
Object
  • Object
show all
Includes:
StorageBase
Defined in:
lib/ruote/beanstalk/storage.rb

Overview

This ruote storage can be used in two modes : client and server.

Beanstalk is the medium.

client

The storage is pointed at a beanstalk queue

engine = Ruote::Engine.new(
  Ruote::Worker.new(
    Ruote::Beanstalk::Storage.new('127.0.0.1:11300', opts)))

All the operations(put, get, get_many, …) of the storage are done by a server, connected to the same beanstalk queue.

server

The storage point to a beanstalk queue and receives orders from clients via the queue.

Ruote::Beanstalk::Storage.new(':11300', 'ruote_work', :fork => true)

Note the directory passed as a string. When in server mode, this storage uses an embedded Ruote::FsStorage for the actual storage.

The :fork => true lets the storage start and adjacent OS process containing the Beanstalk server. The storage takes care of stopping the beanstalk server when the Ruby process exits.

Instance Method Summary collapse

Constructor Details

#initialize(uri, directory = nil, options = nil) ⇒ Storage

Returns a new instance of Storage.



73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
# File 'lib/ruote/beanstalk/storage.rb', line 73

def initialize(uri, directory=nil, options=nil)

  @uri, address, port = split_uri(uri)

  directory, @options = if directory.nil?
    [ nil, {} ]
  elsif directory.is_a?(Hash)
    [ nil, directory ]
  else
    [ directory, options || {} ]
  end

  @cloche = nil

  if directory
    #
    # run embedded Ruote::FsStorage

    require 'rufus/cloche'

    FileUtils.mkdir_p(directory)

    @cloche = Rufus::Cloche.new(
      :dir => directory, :nolock => @options['cloche_nolock'])
  end

  if fork_opts = @options[:fork]
    #
    # run beanstalk in a forked process

    fork_opts = fork_opts.is_a?(Hash) ? fork_opts : {}
    fork_opts = { :address => address, :port => port }.merge(fork_opts)

    Ruote::Beanstalk.fork(fork_opts)

    sleep 0.1
  end

  put_configuration

  serve if @cloche
end

Instance Method Details

#add_type(type) ⇒ Object

Mainly used by ruote’s test/unit/ut_17_storage.rb



212
213
214
215
# File 'lib/ruote/beanstalk/storage.rb', line 212

def add_type(type)

  # nothing to do
end

#closeObject



205
206
207
208
# File 'lib/ruote/beanstalk/storage.rb', line 205

def close

  shutdown
end

#delete(doc) ⇒ Object



152
153
154
155
156
157
# File 'lib/ruote/beanstalk/storage.rb', line 152

def delete(doc)

  return @cloche.delete(doc) if @cloche

  operate('delete', [ doc ])
end

#dump(type) ⇒ Object



189
190
191
192
# File 'lib/ruote/beanstalk/storage.rb', line 189

def dump(type)

  get_many(type)
end

#get(type, key) ⇒ Object



145
146
147
148
149
150
# File 'lib/ruote/beanstalk/storage.rb', line 145

def get(type, key)

  return @cloche.get(type, key) if @cloche

  operate('get', [ type, key ])
end

#get_many(type, key = nil, opts = {}) ⇒ Object



159
160
161
162
163
164
165
166
167
168
169
170
171
# File 'lib/ruote/beanstalk/storage.rb', line 159

def get_many(type, key=nil, opts={})

  return operate('get_many', [ type, key, opts ]) unless @cloche

  if key
    key = Array(key).collect { |k|
      k[0..6] == '(?-mix:' ? Regexp.new(k[7..-2]) : "!#{k}"
    } if key
  end
    # assuming /!#{wfid}$/...

  @cloche.get_many(type, key, opts)
end

#get_msgsObject

One catch : will return [] in case of [network] error



118
119
120
121
# File 'lib/ruote/beanstalk/storage.rb', line 118

def get_msgs

  super rescue []
end

#ids(type) ⇒ Object



173
174
175
176
177
178
# File 'lib/ruote/beanstalk/storage.rb', line 173

def ids(type)

  return @cloche.ids(type) if @cloche

  operate('ids', [ type ])
end

#purge!Object



180
181
182
183
184
185
186
187
# File 'lib/ruote/beanstalk/storage.rb', line 180

def purge!

  if @cloche
    FileUtils.rm_rf(@cloche.dir)
  else
    operate('purge!', [])
  end
end

#purge_type!(type) ⇒ Object

Nukes a db type and reputs it(losing all the documents that were in it).



219
220
221
222
223
224
225
226
# File 'lib/ruote/beanstalk/storage.rb', line 219

def purge_type!(type)

  if @cloche
    @cloche.purge_type!(type)
  else
    operate('purge_type!', [ type ])
  end
end

#put(doc, opts = {}) ⇒ Object



130
131
132
133
134
135
136
137
138
139
140
141
142
143
# File 'lib/ruote/beanstalk/storage.rb', line 130

def put(doc, opts={})

  doc.merge!('put_at' => Ruote.now_to_utc_s)

  return @cloche.put(doc, opts) if @cloche

  r = operate('put', [ doc ])

  return r unless r.nil?

  doc['_rev'] = (doc['_rev'] || -1) + 1 if opts[:update_rev]

  nil
end

#reserve(doc) ⇒ Object

One catch : will return true (failure) in case of [network] error



125
126
127
128
# File 'lib/ruote/beanstalk/storage.rb', line 125

def reserve(doc)

  super(doc) rescue true
end

#shutdownObject



194
195
196
197
198
199
200
201
202
203
# File 'lib/ruote/beanstalk/storage.rb', line 194

def shutdown

  Thread.list.each do |t|
    t.keys.each do |k|
      next unless k.to_s.match(CONN_KEY)
      t[k].close
      t[k] = nil
    end
  end
end