Class: Yup::State::BDB

Inherits:
Object
  • Object
show all
Defined in:
lib/yup/state/bdb.rb

Defined Under Namespace

Classes: FeedbackHandler, RequestForwarder

Constant Summary collapse

RE_LEN =
1000

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(uri, forward_to, feedback_channel) ⇒ BDB

Returns a new instance of BDB.



23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
# File 'lib/yup/state/bdb.rb', line 23

# Sets up the Berkeley DB environment and the queue used by this state object.
#
# The path component of +uri+ is created on disk if needed and used as the
# environment home directory. The queue handle is configured with RE_LEN as
# its record length before it is opened.
#
# @param uri [String] parsed with URI.parse; only its path is used here
# @param forward_to [String] stored as @name and passed as the second argument
#   to the queue's #open (presumably the database file name — confirm against
#   the bdb gem)
# @param feedback_channel [String] stored for later use by #pushback
def initialize(uri, forward_to, feedback_channel)
  @uri = URI.parse(uri)
  @path = @uri.path
  @name = forward_to
  @feedback_channel = feedback_channel

  # Work on a private copy of the shared logger so the progname change stays local.
  @logger = Yup.logger.clone.tap { |log| log.progname = "Yup::State::BDB" }

  FileUtils.mkdir_p(@path)

  env_flags = Bdb::DB_CREATE | Bdb::DB_INIT_MPOOL | Bdb::DB_INIT_CDB
  @env = Bdb::Env.new(0).open(@path, env_flags, 0)

  @queue = @env.db.tap do |db|
    # The record length has to be assigned before the handle is opened.
    db.re_len = RE_LEN
    db.open(nil, @name, nil, Bdb::Db::QUEUE, Bdb::DB_CREATE, 0)
  end
end

Instance Attribute Details

#queue ⇒ Object (readonly)

Returns the value of attribute queue.



15
16
17
# File 'lib/yup/state/bdb.rb', line 15

# Read-only accessor for the @queue instance variable.
#
# @return [Object] the current value of @queue
def queue
  @queue
end

Class Method Details

.repair_if_need(path) ⇒ Object



17
18
19
20
21
# File 'lib/yup/state/bdb.rb', line 17

# Opens the Berkeley DB environment at +path+ with recovery enabled and then
# closes it again; nothing else is done with the handle.
#
# The environment handle is closed even when opening it raises, so a failed
# recovery attempt does not leave the handle open.
#
# @param path [String] environment home directory handed to Bdb::Env#open
# @return [Object] whatever Bdb::Env#close returns
# @raise any error raised by Bdb::Env#open or Bdb::Env#close
def self.repair_if_need(path)
  env = Bdb::Env.new(0)
  begin
    env.open(path, Bdb::DB_CREATE | Bdb::DB_INIT_TXN | Bdb::DB_RECOVER, 0)
  ensure
    # Close on both the success and the failure path; keep the result so the
    # method still returns what #close returned.
    closed = env.close
  end
  closed
end

Instance Method Details

#bpop ⇒ Object



49
50
51
52
53
# File 'lib/yup/state/bdb.rb', line 49

# Fetches one record from the queue using the DB_CONSUME_WAIT flag and
# returns it unchanged. A stripped copy of the record is written to the
# debug log.
#
# @return [Object] the record returned by the queue's #get
def bpop
  @queue.get(nil, "", nil, Bdb::DB_CONSUME_WAIT).tap do |record|
    @logger.debug { "Bpoped: #{record.strip}" }
  end
end

#dispose ⇒ Object



62
63
64
# File 'lib/yup/state/bdb.rb', line 62

# Releases the Berkeley DB handles held by this object.
#
# The queue is closed first, then the environment it was created from. The
# environment is closed even if closing the queue raises, so its handle is
# not left open.
#
# @return [Object] whatever the queue's #close returns
def dispose
  @queue.close(0)
ensure
  @env.close
end

#push(data) ⇒ Object



40
41
42
43
44
45
46
47
# File 'lib/yup/state/bdb.rb', line 40

# Appends +data+ to the queue, split into chunks no longer than the queue's
# record length (counted in characters, as String#slice does).
#
# Nothing is enqueued for empty input, and no trailing blank record is written
# when the length of +data+ is an exact multiple of the record length.
#
# @param data [String] payload to enqueue
# @return [nil]
def push(data)
  @logger.debug { "Push: #{data}" }

  # Use one value for both the chunk size and the stride so chunks can neither
  # overlap nor skip data.
  record_len = @queue.re_len

  # Only offsets strictly below data.length are visited: String#slice returns
  # "" (not nil) at offset == length, which would enqueue an empty record.
  (0...data.length).step(record_len) do |offset|
    @queue.put(nil, "", data.slice(offset, record_len), Bdb::DB_APPEND)
  end
  nil
end

#pushback(data) ⇒ Object



55
56
57
58
59
60
# File 'lib/yup/state/bdb.rb', line 55

# Sends +data+ over a new UNIX socket connection to the feedback channel path
# given to the constructor. A stripped copy of +data+ is written to the debug
# log first.
#
# The socket is closed whether or not the send succeeds.
#
# @param data [String] payload to send
# @return [nil]
# @raise any error raised while connecting or sending
def pushback(data)
  @logger.debug { "Push to the feedback channel: #{data.strip}" }
  sock = UNIXSocket.new(@feedback_channel)
  begin
    sock.send(data, 0)
  ensure
    # Close here so a failing send does not leak the descriptor.
    sock.close
  end
  nil
end