Class: Ruote::Beanstalk::BsStorage

Inherits:
Object
Includes:
StorageBase
Defined in:
lib/ruote/beanstalk/storage.rb

Overview

This ruote storage can be used in two modes: client and server.

Beanstalk is the medium.

client

The storage is pointed at a beanstalk queue:

engine = Ruote::Engine.new(
  Ruote::Worker.new(
    Ruote::Beanstalk::BsStorage.new('127.0.0.1:11300', opts)))

In client mode, every storage operation (put, get, get_many, …) is carried out by a server connected to the same beanstalk queue.

server

The storage points to a beanstalk queue and receives orders from clients via the queue:

Ruote::Beanstalk::BsStorage.new(':11300', 'ruote_work', :fork => true)

Note the directory passed as a string ('ruote_work' above). In server mode, this storage uses an embedded Ruote::FsStorage for the actual storage.

The :fork => true option lets the storage start an adjacent OS process containing the Beanstalk server. The storage takes care of stopping the beanstalk server when the Ruby process exits.
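
The constructor source under Constructor Details accepts either true or a Hash for :fork and merges it over the address and port taken from the URI. The following is a minimal standalone sketch of that merge, not the library code. The method name resolve_fork_opts and the address and port values are assumptions made for illustration.

```ruby
# Sketch only: mirrors how the constructor turns the :fork value into
# settings for the forked beanstalk process. `resolve_fork_opts` is a
# hypothetical helper name; address and port are illustrative values.
def resolve_fork_opts(fork_value, address, port)
  return nil unless fork_value

  # :fork => true carries no overrides, :fork => { ... } does
  overrides = fork_value.is_a?(Hash) ? fork_value : {}

  # values from the URI act as defaults, explicit overrides win
  { :address => address, :port => port }.merge(overrides)
end

plain = resolve_fork_opts(true, '127.0.0.1', 11300)
custom = resolve_fork_opts({ :port => 11301 }, '127.0.0.1', 11300)
disabled = resolve_fork_opts(false, '127.0.0.1', 11300)
```

Passing a Hash therefore seems to be the way to override the address or port of the forked server while keeping the URI as the default.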

Constructor Details

#initialize(uri, directory = nil, options = nil) ⇒ BsStorage

Returns a new instance of BsStorage.



# File 'lib/ruote/beanstalk/storage.rb', line 73

def initialize (uri, directory=nil, options=nil)

  @uri, address, port = split_uri(uri)

  directory, @options = if directory.nil?
    [ nil, {} ]
  elsif directory.is_a?(Hash)
    [ nil, directory ]
  else
    [ directory, options || {} ]
  end

  @cloche = nil

  if directory
    #
    # run embedded Ruote::FsStorage

    require 'rufus/cloche'

    FileUtils.mkdir_p(directory)

    @cloche = Rufus::Cloche.new(
      :dir => directory, :nolock => @options['cloche_nolock'])
  end

  if fork_opts = @options[:fork]
    #
    # run beanstalk in a forked process

    fork_opts = fork_opts.is_a?(Hash) ? fork_opts : {}
    fork_opts = { :address => address, :port => port }.merge(fork_opts)

    Ruote::Beanstalk.fork(fork_opts)

    sleep 0.1
  end

  put_configuration

  serve if @cloche
end
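
The second positional argument does double duty: it can be a directory (server mode) or an options Hash (client mode). The sketch below isolates that normalization step so it can be tried on its own. The method name normalize_args is hypothetical; the branching is copied from the constructor above.

```ruby
# Sketch only: the argument normalization from the constructor, extracted
# into a hypothetical helper. Returns [directory, options].
def normalize_args(directory = nil, options = nil)
  if directory.nil?
    [nil, {}]                     # client mode, no options
  elsif directory.is_a?(Hash)
    [nil, directory]              # client mode, options given in 2nd slot
  else
    [directory, options || {}]    # server mode, directory given
  end
end

client_plain = normalize_args
client_opts = normalize_args({ 'cloche_nolock' => true })
server = normalize_args('ruote_work', { :fork => true })
dropped = normalize_args(nil, { :fork => true })
```

One consequence of the branching as written: when the directory is explicitly nil, the third argument is ignored, so client-mode options have to be passed in the second position.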

Instance Method Details

#add_type(type) ⇒ Object

Mainly used by ruote’s test/unit/ut_17_storage.rb. This storage has nothing to do when a type is added.



# File 'lib/ruote/beanstalk/storage.rb', line 186

def add_type (type)

  # nothing to do
end

#delete(doc) ⇒ Object



# File 'lib/ruote/beanstalk/storage.rb', line 138

def delete (doc)

  return @cloche.delete(doc) if @cloche

  operate('delete', [ doc ])
end

#dump(type) ⇒ Object



# File 'lib/ruote/beanstalk/storage.rb', line 168

def dump (type)

  get_many(type)
end

#get(type, key) ⇒ Object



# File 'lib/ruote/beanstalk/storage.rb', line 131

def get (type, key)

  return @cloche.get(type, key) if @cloche

  operate('get', [ type, key ])
end
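
Most instance methods on this page follow the same shape as #get: answer from the embedded store when there is one (server mode), otherwise hand the operation to operate (client mode). The sketch below illustrates that pattern with stand-ins. SketchStorage, HashBackend, and the recording operate are all hypothetical; the real operate sends the command through beanstalk.

```ruby
# Sketch only: "local backend if present, else forward" as used by #get.
class SketchStorage
  attr_reader :sent

  def initialize(backend = nil)
    @backend = backend  # plays the role of @cloche
    @sent = []          # records what would travel over the queue
  end

  def get(type, key)
    return @backend.get(type, key) if @backend

    operate('get', [type, key])
  end

  private

  # Placeholder: records the command instead of talking to beanstalk.
  def operate(command, params)
    @sent << [command, params]
    nil
  end
end

# Hypothetical in-memory backend keyed by [type, key].
class HashBackend
  def initialize(docs)
    @docs = docs
  end

  def get(type, key)
    @docs[[type, key]]
  end
end

server = SketchStorage.new(
  HashBackend.new({ ['expressions', 'e0'] => { '_id' => 'e0' } }))
client = SketchStorage.new

server_doc = server.get('expressions', 'e0')
client_doc = client.get('expressions', 'e0')
```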

#get_many(type, key = nil, opts = {}) ⇒ Object



# File 'lib/ruote/beanstalk/storage.rb', line 145

def get_many (type, key=nil, opts={})

  return @cloche.get_many(type, key, opts) if @cloche

  operate('get_many', [ type, key, opts ])
end

#ids(type) ⇒ Object



# File 'lib/ruote/beanstalk/storage.rb', line 152

def ids (type)

  return @cloche.ids(type) if @cloche

  operate('ids', [ type ])
end

#purge! ⇒ Object



# File 'lib/ruote/beanstalk/storage.rb', line 159

def purge!

  if @cloche
    FileUtils.rm_rf(@cloche.dir)
  else
    operate('purge!', [])
  end
end

#purge_type!(type) ⇒ Object

Removes a db type and puts it back empty, losing all the documents that were in it.



# File 'lib/ruote/beanstalk/storage.rb', line 193

def purge_type! (type)

  if @cloche
    @cloche.purge_type!(type)
  else
    operate('purge_type!', [ type ])
  end
end

#put(doc, opts = {}) ⇒ Object



# File 'lib/ruote/beanstalk/storage.rb', line 116

def put (doc, opts={})

  doc.merge!('put_at' => Ruote.now_to_utc_s)

  return @cloche.put(doc, opts) if @cloche

  r = operate('put', [ doc ])

  return r unless r.nil?

  doc['_rev'] = (doc['_rev'] || -1) + 1 if opts[:update_rev]

  nil
end
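
In client mode, #put treats a nil answer from the server as success and only then bumps the revision of the caller's copy, when :update_rev is set. The sketch below isolates that tail end of the method. The helper name finish_put and the remote_result parameter, which stands in for the value returned by operate('put', ...), are assumptions for illustration.

```ruby
# Sketch only: what happens in #put after the remote call returns.
# `remote_result` stands in for the return value of operate('put', [doc]).
def finish_put(doc, remote_result, opts = {})
  # any non-nil answer is handed back untouched, the doc is not modified
  return remote_result unless remote_result.nil?

  # a missing '_rev' counts as -1, so the first successful put yields 0
  doc['_rev'] = (doc['_rev'] || -1) + 1 if opts[:update_rev]

  nil
end

fresh = { '_id' => 'a' }
fresh_result = finish_put(fresh, nil, { :update_rev => true })

existing = { '_id' => 'b', '_rev' => 3 }
finish_put(existing, nil, { :update_rev => true })

untouched = { '_id' => 'c', '_rev' => 3 }
finish_put(untouched, nil)

rejected = { '_id' => 'd', '_rev' => 3 }
rejected_result = finish_put(rejected, true, { :update_rev => true })
```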

#shutdown ⇒ Object



# File 'lib/ruote/beanstalk/storage.rb', line 173

def shutdown

  Thread.list.each do |t|
    t.keys.each do |k|
      next unless k.match(/^BeanstalkConnection\_/)
      t[k].close
      t[k] = nil
    end
  end
end
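
#shutdown walks every live thread and closes whatever is stored under a thread-local key starting with BeanstalkConnection_. The sketch below reproduces that sweep with a stand-in connection. FakeConnection and the key suffix demo are hypothetical; only the key prefix comes from the source above.

```ruby
# Sketch only: closing thread-local connections by key prefix.
# FakeConnection is a hypothetical stand-in for a beanstalk connection.
class FakeConnection
  attr_reader :closed

  def initialize
    @closed = false
  end

  def close
    @closed = true
  end
end

conn = FakeConnection.new
Thread.current[:BeanstalkConnection_demo] = conn  # suffix is illustrative
Thread.current[:unrelated] = 'keep me'

Thread.list.each do |t|
  # Thread#keys returns a fresh array, so clearing entries while
  # iterating is safe
  t.keys.each do |k|
    next unless k.to_s.start_with?('BeanstalkConnection_')

    t[k].close
    t[k] = nil
  end
end
```

After the sweep, the connection has been closed and its thread-local slot cleared, while unrelated thread-local values are left alone.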