Class: Sharder

Inherits:
Object
  • Object
show all
Defined in:
lib/scout/persist/engine/sharder.rb

Constant Summary collapse

MAX_CHAR =
255.chr

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(persistence_path, write = false, db_type = nil, persist_options = {}, &block) ⇒ Sharder

Returns a new instance of Sharder.



4
5
6
7
8
9
10
11
12
13
14
15
# File 'lib/scout/persist/engine/sharder.rb', line 4

def initialize(persistence_path, write=false, db_type=nil, persist_options={}, &block)
  @shard_function = persist_options[:shard_function] || block
  @persist_options = persist_options
  @persistence_path = Path.setup(persistence_path)
  @mutex = Mutex.new
  @writable = write
  @db_type = db_type

  if write
    @databases = {} 
  end
end

Instance Attribute Details

#closedObject

Returns the value of attribute closed.



2
3
4
# File 'lib/scout/persist/engine/sharder.rb', line 2

def closed
  @closed
end

#databasesObject

Returns the value of attribute databases.



2
3
4
# File 'lib/scout/persist/engine/sharder.rb', line 2

def databases
  @databases
end

#db_typeObject

Returns the value of attribute db_type.



2
3
4
# File 'lib/scout/persist/engine/sharder.rb', line 2

def db_type
  @db_type
end

#mutexObject

Returns the value of attribute mutex.



2
3
4
# File 'lib/scout/persist/engine/sharder.rb', line 2

def mutex
  @mutex
end

#persist_optionsObject

Returns the value of attribute persist_options.



2
3
4
# File 'lib/scout/persist/engine/sharder.rb', line 2

def persist_options
  @persist_options
end

#persistence_pathObject

Returns the value of attribute persistence_path.



2
3
4
# File 'lib/scout/persist/engine/sharder.rb', line 2

def persistence_path
  @persistence_path
end

#shard_functionObject

Returns the value of attribute shard_function.



2
3
4
# File 'lib/scout/persist/engine/sharder.rb', line 2

def shard_function
  @shard_function
end

#writableObject

Returns the value of attribute writable.



2
3
4
# File 'lib/scout/persist/engine/sharder.rb', line 2

def writable
  @writable
end

Instance Method Details

#<<(values) ⇒ Object

def <<(p)

return if p.nil?
self[p.first] = p.last

end



195
196
197
198
# File 'lib/scout/persist/engine/sharder.rb', line 195

def <<(values)
  key, rest = values
  database(key) << values
end

#[](key, clean = false) ⇒ Object



184
185
186
187
188
# File 'lib/scout/persist/engine/sharder.rb', line 184

def [](key, clean=false)
  database = database(key)
  return nil if database.nil?
  database.send(:[], key)
end

#[]=(key, value, clean = false) ⇒ Object



180
181
182
# File 'lib/scout/persist/engine/sharder.rb', line 180

def []=(key, value, clean = false)
  database(key).send(:[]=, key, value)
end

#closeObject



63
64
65
66
# File 'lib/scout/persist/engine/sharder.rb', line 63

def close
  @closed = true
  super
end

#closed?Boolean

Returns:

  • (Boolean)


59
60
61
# File 'lib/scout/persist/engine/sharder.rb', line 59

def closed?
  @closed
end

#collectObject



116
117
118
119
120
121
122
123
124
125
126
# File 'lib/scout/persist/engine/sharder.rb', line 116

def collect
  res = []
  each do |key, value|
    res << if block_given?
             yield key, value
    else
      [key, value]
    end
  end
  res
end

#database(key) ⇒ Object



34
35
36
37
38
39
40
41
42
43
44
45
46
# File 'lib/scout/persist/engine/sharder.rb', line 34

def database(key)
  shard = shard_function.call(key)
  if databases.include? shard
    databases[shard]
  else
    database = databases[shard] ||= begin
                                      path = File.join(persistence_path, 'shard-' << shard.to_s)
                                      (writable or File.exist?(path)) ? Persist.open_database(path, (File.exist?(path) ? false : writable), :clean, db_type, @persist_options) : nil
                                    end
    Log.warn "Database #{ path } missing" if database.nil?
    database
  end
end

#eachObject



104
105
106
107
108
109
110
# File 'lib/scout/persist/engine/sharder.rb', line 104

def each
  databases.values.each do |database|
    database.each do |k,v|
      yield k, v
    end
  end
end

#get_prefix(key) ⇒ Object



54
55
56
57
# File 'lib/scout/persist/engine/sharder.rb', line 54

def get_prefix(key)
  keys = prefix(key)
  select(:key => keys)
end

#include?(key) ⇒ Boolean

Returns:

  • (Boolean)


112
113
114
# File 'lib/scout/persist/engine/sharder.rb', line 112

def include?(key)
  self[key] != nil
end

#keysObject



176
177
178
# File 'lib/scout/persist/engine/sharder.rb', line 176

def keys
  databases.values.collect{|d| d.keys }
end

#merge!(hash) ⇒ Object



170
171
172
173
174
# File 'lib/scout/persist/engine/sharder.rb', line 170

def merge!(hash)
  hash.each do |key,values|
    self[key] = values
  end
end

#prefix(key) ⇒ Object



50
51
52
# File 'lib/scout/persist/engine/sharder.rb', line 50

def prefix(key)
  range(key, 1, key + MAX_CHAR, 1)
end

#range(*args) ⇒ Object



97
98
99
100
101
102
# File 'lib/scout/persist/engine/sharder.rb', line 97

def range(*args)
  databases.values.inject([]) do |acc,database|
    acc.concat database.range(*args) if TokyoCabinet::BDB === database
    acc
  end
end

#read(force = false) ⇒ Object



68
69
70
71
72
73
74
75
76
# File 'lib/scout/persist/engine/sharder.rb', line 68

def read(force = false)
  raise "SIOT"
  return if not write? and not closed and not force
  self.close
  databases.each{|d| d.read }
  @writable = false
  @closed = false
  self
end

#read?Boolean

Returns:

  • (Boolean)


93
94
95
# File 'lib/scout/persist/engine/sharder.rb', line 93

def read?
  ! write?
end

#read_and_closeObject



158
159
160
161
162
163
164
165
166
167
168
# File 'lib/scout/persist/engine/sharder.rb', line 158

def read_and_close
  @mutex.synchronize do
    read if @closed or not read?
    res = begin
            yield
          ensure
            close
          end
    res
  end
end

#sizeObject



212
213
214
215
216
217
# File 'lib/scout/persist/engine/sharder.rb', line 212

def size
  databases.inject(0){|acc,i| 
    shard, db = i; 
    acc += db.size 
  }
end

#writeObject



78
79
80
81
82
83
84
85
86
87
# File 'lib/scout/persist/engine/sharder.rb', line 78

def write(force = true)
  return if write? and not closed and not force
  self.close

  databases.each{|d| d.write }

  @writable = true
  @closed = false
  self
end

#write?Boolean

Returns:

  • (Boolean)


89
90
91
# File 'lib/scout/persist/engine/sharder.rb', line 89

def write?
  @writable
end

#write_and_closeObject



143
144
145
146
147
148
149
150
151
152
153
154
155
156
# File 'lib/scout/persist/engine/sharder.rb', line 143

def write_and_close
  lock_filename = Persist.persistence_path(File.join(persistence_path, 'write'), {:dir => TSV.lock_dir})
  Misc.lock(lock_filename) do
    @mutex.synchronize do
      write if @closed or not write?
      res = begin
              yield
            ensure
              close
            end
      res
    end
  end
end

#write_and_readObject



128
129
130
131
132
133
134
135
136
137
138
139
140
141
# File 'lib/scout/persist/engine/sharder.rb', line 128

def write_and_read
  lock_filename = Persist.persistence_path(File.join(persistence_path, 'write'), {:dir => TSV.lock_dir})
  Misc.lock(lock_filename) do
    @mutex.synchronize do
      write if @closed or not write?
      res = begin
              yield
            ensure
              read
            end
      res
    end
  end
end