Class: Persist::Sharder
- Inherits:
-
Object
- Object
- Persist::Sharder
- Defined in:
- lib/rbbt/persist/tsv/sharder.rb
Constant Summary collapse
- MAX_CHAR =
255.chr
Instance Attribute Summary collapse
-
#closed ⇒ Object
Returns the value of attribute closed.
-
#databases ⇒ Object
Returns the value of attribute databases.
-
#db_type ⇒ Object
Returns the value of attribute db_type.
-
#mutex ⇒ Object
Returns the value of attribute mutex.
-
#options ⇒ Object
Returns the value of attribute options.
-
#persistence_path ⇒ Object
Returns the value of attribute persistence_path.
-
#shard_function ⇒ Object
Returns the value of attribute shard_function.
-
#writable ⇒ Object
Returns the value of attribute writable.
Instance Method Summary collapse
- #<<(p) ⇒ Object
- #[](key, clean = false) ⇒ Object
- #[]=(key, value, clean = false) ⇒ Object
- #close ⇒ Object
- #closed? ⇒ Boolean
- #collect ⇒ Object
- #database(key) ⇒ Object
- #each ⇒ Object
- #get_prefix(key) ⇒ Object
- #include?(key) ⇒ Boolean
-
#initialize(persistence_path, write = false, db_type = nil, options = {}, &block) ⇒ Sharder
constructor
A new instance of Sharder.
- #keys ⇒ Object
- #merge!(hash) ⇒ Object
- #prefix(key) ⇒ Object
- #range(*args) ⇒ Object
- #read(force = false) ⇒ Object
- #read? ⇒ Boolean
- #read_and_close ⇒ Object
- #size ⇒ Object
- #write ⇒ Object
- #write? ⇒ Boolean
- #write_and_close ⇒ Object
- #write_and_read ⇒ Object
Constructor Details
#initialize(persistence_path, write = false, db_type = nil, options = {}, &block) ⇒ Sharder
Returns a new instance of Sharder.
16 17 18 19 20 21 22 23 24 25 26 27 |
# File 'lib/rbbt/persist/tsv/sharder.rb', line 16 def initialize(persistence_path, write = false, db_type=nil, = {}, &block) @shard_function = block @options = @persistence_path = Path.setup(persistence_path) @mutex = Mutex.new @writable = write @db_type = db_type if write @databases = {} end end |
Instance Attribute Details
#closed ⇒ Object
Returns the value of attribute closed.
14 15 16 |
# File 'lib/rbbt/persist/tsv/sharder.rb', line 14 def closed @closed end |
#databases ⇒ Object
Returns the value of attribute databases.
14 15 16 |
# File 'lib/rbbt/persist/tsv/sharder.rb', line 14 def databases @databases end |
#db_type ⇒ Object
Returns the value of attribute db_type.
14 15 16 |
# File 'lib/rbbt/persist/tsv/sharder.rb', line 14 def db_type @db_type end |
#mutex ⇒ Object
Returns the value of attribute mutex.
14 15 16 |
# File 'lib/rbbt/persist/tsv/sharder.rb', line 14 def mutex @mutex end |
#options ⇒ Object
Returns the value of attribute options.
14 15 16 |
# File 'lib/rbbt/persist/tsv/sharder.rb', line 14 def @options end |
#persistence_path ⇒ Object
Returns the value of attribute persistence_path.
14 15 16 |
# File 'lib/rbbt/persist/tsv/sharder.rb', line 14 def persistence_path @persistence_path end |
#shard_function ⇒ Object
Returns the value of attribute shard_function.
14 15 16 |
# File 'lib/rbbt/persist/tsv/sharder.rb', line 14 def shard_function @shard_function end |
#writable ⇒ Object
Returns the value of attribute writable.
14 15 16 |
# File 'lib/rbbt/persist/tsv/sharder.rb', line 14 def writable @writable end |
Instance Method Details
#<<(p) ⇒ Object
29 30 31 |
# File 'lib/rbbt/persist/tsv/sharder.rb', line 29 def <<(key,value) self[key] = value end |
#[](key, clean = false) ⇒ Object
214 215 216 217 218 |
# File 'lib/rbbt/persist/tsv/sharder.rb', line 214 def [](key, clean=false) database = database(key) return nil if database.nil? database.send(:[], key) end |
#[]=(key, value, clean = false) ⇒ Object
210 211 212 |
# File 'lib/rbbt/persist/tsv/sharder.rb', line 210 def []=(key, value, clean = false) database(key).send(:[]=, key, value) end |
#close ⇒ Object
93 94 95 96 |
# File 'lib/rbbt/persist/tsv/sharder.rb', line 93 def close @closed = true super end |
#closed? ⇒ Boolean
89 90 91 |
# File 'lib/rbbt/persist/tsv/sharder.rb', line 89 def closed? @closed end |
#collect ⇒ Object
146 147 148 149 150 151 152 153 154 155 156 |
# File 'lib/rbbt/persist/tsv/sharder.rb', line 146 def collect res = [] each do |key, value| res << if block_given? yield key, value else [key, value] end end res end |
#database(key) ⇒ Object
53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 |
# File 'lib/rbbt/persist/tsv/sharder.rb', line 53 def database(key) shard = key =~ /__tsv_/ ? "metadata" : shard_function.call(key) if databases.include? shard databases[shard] else if shard == 'metadata' database ||= begin path = File.join(persistence_path, 'shard-' << shard.to_s) (writable or File.exist?(path)) ? Persist.open_database(path, writable, :clean, "HDB", @options) : nil end else database ||= begin path = File.join(persistence_path, 'shard-' << shard.to_s) (writable or File.exist?(path)) ? Persist.open_database(path, writable, :clean, db_type, @options) : nil end end if database databases[shard] = database else Log.warn "Database #{ path } missing" if nil end end end |
#each ⇒ Object
134 135 136 137 138 139 140 |
# File 'lib/rbbt/persist/tsv/sharder.rb', line 134 def each databases.values.each do |database| database.each do |k,v| yield k, v end end end |
#get_prefix(key) ⇒ Object
84 85 86 87 |
# File 'lib/rbbt/persist/tsv/sharder.rb', line 84 def get_prefix(key) keys = prefix(key) select(:key => keys) end |
#include?(key) ⇒ Boolean
142 143 144 |
# File 'lib/rbbt/persist/tsv/sharder.rb', line 142 def include?(key) self[key] != nil end |
#keys ⇒ Object
206 207 208 |
# File 'lib/rbbt/persist/tsv/sharder.rb', line 206 def keys databases.values.collect{|d| d.keys }.flatten - TSV::ENTRY_KEYS.to_a end |
#merge!(hash) ⇒ Object
200 201 202 203 204 |
# File 'lib/rbbt/persist/tsv/sharder.rb', line 200 def merge!(hash) hash.each do |key,values| self[key] = values end end |
#prefix(key) ⇒ Object
80 81 82 |
# File 'lib/rbbt/persist/tsv/sharder.rb', line 80 def prefix(key) range(key, 1, key + MAX_CHAR, 1) end |
#range(*args) ⇒ Object
127 128 129 130 131 132 |
# File 'lib/rbbt/persist/tsv/sharder.rb', line 127 def range(*args) databases.values.inject([]) do |acc,database| acc.concat database.range(*args) if TokyoCabinet::BDB === database acc end end |
#read(force = false) ⇒ Object
98 99 100 101 102 103 104 105 106 |
# File 'lib/rbbt/persist/tsv/sharder.rb', line 98 def read(force = false) raise "SIOT" return if not write? and not closed and not force self.close databases.each{|d| d.read } @writable = false @closed = false self end |
#read? ⇒ Boolean
123 124 125 |
# File 'lib/rbbt/persist/tsv/sharder.rb', line 123 def read? ! write? end |
#read_and_close ⇒ Object
188 189 190 191 192 193 194 195 196 197 198 |
# File 'lib/rbbt/persist/tsv/sharder.rb', line 188 def read_and_close @mutex.synchronize do read if @closed or not read? res = begin yield ensure close end res end end |
#size ⇒ Object
237 238 239 240 241 242 |
# File 'lib/rbbt/persist/tsv/sharder.rb', line 237 def size databases.inject(0){|acc,i| shard, db = i; acc += db.size } end |
#write ⇒ Object
108 109 110 111 112 113 114 115 116 117 |
# File 'lib/rbbt/persist/tsv/sharder.rb', line 108 def write(force = true) return if write? and not closed and not force self.close databases.each{|d| d.write } @writable = true @closed = false self end |
#write? ⇒ Boolean
119 120 121 |
# File 'lib/rbbt/persist/tsv/sharder.rb', line 119 def write? @writable end |
#write_and_close ⇒ Object
173 174 175 176 177 178 179 180 181 182 183 184 185 186 |
# File 'lib/rbbt/persist/tsv/sharder.rb', line 173 def write_and_close lock_filename = Persist.persistence_path(File.join(persistence_path, 'write'), {:dir => TSV.lock_dir}) Misc.lock(lock_filename) do @mutex.synchronize do write if @closed or not write? res = begin yield ensure close end res end end end |
#write_and_read ⇒ Object
158 159 160 161 162 163 164 165 166 167 168 169 170 171 |
# File 'lib/rbbt/persist/tsv/sharder.rb', line 158 def write_and_read lock_filename = Persist.persistence_path(File.join(persistence_path, 'write'), {:dir => TSV.lock_dir}) Misc.lock(lock_filename) do @mutex.synchronize do write if @closed or not write? res = begin yield ensure read end res end end end |