Class: Yup::State::BDB
- Inherits:
-
Object
show all
- Defined in:
- lib/yup/state/bdb.rb
Defined Under Namespace
Classes: FeedbackHandler, RequestForwarder
Constant Summary
collapse
- RE_LEN =
1000
Instance Attribute Summary collapse
Class Method Summary
collapse
Instance Method Summary
collapse
Constructor Details
#initialize(uri, forward_to, feedback_channel) ⇒ BDB
Returns a new instance of BDB.
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
|
# File 'lib/yup/state/bdb.rb', line 23
def initialize(uri, forward_to, feedback_channel)
@uri = URI.parse(uri)
@path = @uri.path
@name = forward_to
@feedback_channel = feedback_channel
@logger = Yup.logger.clone
@logger.progname = "Yup::State::BDB"
FileUtils.mkdir_p(@path)
@env = Bdb::Env.new(0)
@env = @env.open(@path, Bdb::DB_CREATE | Bdb::DB_INIT_MPOOL | Bdb::DB_INIT_CDB, 0)
@queue = @env.db
@queue.re_len = RE_LEN
@queue.open(nil, @name, nil, Bdb::Db::QUEUE, Bdb::DB_CREATE, 0)
end
|
Instance Attribute Details
#queue ⇒ Object
Returns the value of attribute queue.
15
16
17
|
# File 'lib/yup/state/bdb.rb', line 15
def queue
@queue
end
|
Class Method Details
.repair_if_need(path) ⇒ Object
17
18
19
20
21
|
# File 'lib/yup/state/bdb.rb', line 17
def self.repair_if_need(path)
env = Bdb::Env.new(0)
env.open(path, Bdb::DB_CREATE | Bdb::DB_INIT_TXN | Bdb::DB_RECOVER, 0)
env.close()
end
|
Instance Method Details
#bpop ⇒ Object
49
50
51
52
53
|
# File 'lib/yup/state/bdb.rb', line 49
def bpop
data = @queue.get(nil, "", nil, Bdb::DB_CONSUME_WAIT)
@logger.debug { "Bpoped: #{data.strip}" }
data
end
|
#dispose ⇒ Object
62
63
64
|
# File 'lib/yup/state/bdb.rb', line 62
def dispose
@queue.close(0)
end
|
#push(data) ⇒ Object
40
41
42
43
44
45
46
47
|
# File 'lib/yup/state/bdb.rb', line 40
def push(data)
@logger.debug { "Push: #{data}" }
i = 0
until (chunk = data.slice(i, RE_LEN)).nil?
@queue.put(nil, "", chunk, Bdb::DB_APPEND)
i += @queue.re_len
end
end
|
#pushback(data) ⇒ Object
55
56
57
58
59
60
|
# File 'lib/yup/state/bdb.rb', line 55
def pushback(data)
@logger.debug { "Push to the feedback channel: #{data.strip}" }
sock = UNIXSocket.new(@feedback_channel)
sock.send(data, 0)
sock.close
end
|