Class: OFlow::Actors::Persister
- Inherits:
-
OFlow::Actor
- Object
- OFlow::Actor
- OFlow::Actors::Persister
- Defined in:
- lib/oflow/actors/persister.rb
Overview
Actor that persists records to the local file system as JSON representations of the records. A record can be the whole contents of the box received or a sub-element of the contents. The key of a record is taken either from the record data itself or from elsewhere in the box received. Options for maintaining historic records and sequence number locking are included. If no sequence number is provided, the Persister will assume there is no checking required and write anyway.
Records are stored as JSON with the filename built from the key and sequence number. The format of the file name is <key>~<seq>.json. As an example, a record stored with a key of ‘first’ and a sequence number of 3 (third time saved) would be ‘first~3.json’.
Defined Under Namespace
Classes: ExistsError, KeyError, NotFoundError, SeqError
Instance Attribute Summary collapse
-
#data_path ⇒ Object
readonly
Returns the value of attribute data_path.
-
#dir ⇒ Object
readonly
Returns the value of attribute dir.
-
#historic ⇒ Object
readonly
Returns the value of attribute historic.
-
#key_path ⇒ Object
readonly
Returns the value of attribute key_path.
-
#seq_path ⇒ Object
readonly
Returns the value of attribute seq_path.
Attributes inherited from OFlow::Actor
Instance Method Summary collapse
-
#caching? ⇒ Boolean
Returns true if the actor is caching records.
- #clear(box) ⇒ Object
- #delete(box) ⇒ Object
- #delete_historic(key, seq) ⇒ Object
-
#initialize(task, options) ⇒ Persister
constructor
Initializes the persister from the given options.
- #insert(box) ⇒ Object
- #key_seq_from_path(path) ⇒ Object
- #load(path) ⇒ Object
- #perform(op, box) ⇒ Object
- #query(box) ⇒ Object
- #read(box) ⇒ Object
-
#save(rec, key, seq) ⇒ Object
internal use only.
- #update(box) ⇒ Object
Methods inherited from OFlow::Actor
#inputs, #options, #outputs, #with_own_thread
Constructor Details
#initialize(task, options) ⇒ Persister
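Initializes the persister with options of: :dir (directory for the record files; defaults to 'db/' followed by the task's full name with ':' replaced by '/'), :key_path (default 'key'), :seq_path (default 'seq'), :data_path (default nil, meaning all contents), :cache (default true) and :historic (default false).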
34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 |
# File 'lib/oflow/actors/persister.rb', line 34
#
# Initializes the persister and, when caching is enabled, preloads the cache
# from the records already present in the storage directory.
#
# @param task [Object] forwarded to the superclass constructor; its
#   +full_name+ is used to derive the default directory
# @param options [Hash] recognized keys:
#   - :dir directory holding the record files (default: 'db/' joined with
#     task.full_name where ':' is replaced by '/')
#   - :key_path path used to fetch the record key from a box (default 'key')
#   - :seq_path path used to fetch/store the sequence number (default 'seq')
#   - :data_path path of the record inside the box; nil means all contents
#   - :cache when truthy (default true) records are kept in memory
#   - :historic when truthy older sequence files are kept (default false)
def initialize(task, options)
  super
  @dir = options[:dir]
  @dir = File.join('db', task.full_name.gsub(':', '/')) if @dir.nil?
  @key_path = options.fetch(:key_path, 'key')
  @seq_path = options.fetch(:seq_path, 'seq')
  @data_path = options.fetch(:data_path, nil) # nil means all contents
  # key is record key, value is [seq, rec]
  @cache = options.fetch(:cache, true) ? {} : nil
  @historic = options.fetch(:historic, false)

  unless Dir.exist?(@dir)
    # Create the directory without going through a shell so that paths with
    # spaces or shell metacharacters are handled literally.
    require 'fileutils'
    FileUtils.mkdir_p(@dir)
    return
  end
  return if @cache.nil?

  # Search inside @dir (not the current working directory) so the returned
  # paths are usable as-is. Only the '<key>.json' symlinks designate the
  # current version of a record.
  Dir.glob(File.join(@dir, '**', '*.json')).each do |path|
    next unless File.symlink?(path)

    rec = load(path)
    key, seq = key_seq_from_path(path)
    @cache[key] = [seq, rec]
  end
end
Instance Attribute Details
#data_path ⇒ Object (readonly)
Returns the value of attribute data_path.
23 24 25 |
# File 'lib/oflow/actors/persister.rb', line 23
#
# Reader for the +data_path+ attribute.
#
# @return [Object] the value stored in +@data_path+
def data_path
  @data_path
end
#dir ⇒ Object (readonly)
Returns the value of attribute dir.
20 21 22 |
# File 'lib/oflow/actors/persister.rb', line 20
#
# Reader for the +dir+ attribute.
#
# @return [Object] the value stored in +@dir+
def dir
  @dir
end
#historic ⇒ Object (readonly)
Returns the value of attribute historic.
24 25 26 |
# File 'lib/oflow/actors/persister.rb', line 24
#
# Reader for the +historic+ attribute.
#
# @return [Object] the value stored in +@historic+
def historic
  @historic
end
#key_path ⇒ Object (readonly)
Returns the value of attribute key_path.
21 22 23 |
# File 'lib/oflow/actors/persister.rb', line 21
#
# Reader for the +key_path+ attribute.
#
# @return [Object] the value stored in +@key_path+
def key_path
  @key_path
end
#seq_path ⇒ Object (readonly)
Returns the value of attribute seq_path.
22 23 24 |
# File 'lib/oflow/actors/persister.rb', line 22
#
# Reader for the +seq_path+ attribute.
#
# @return [Object] the value stored in +@seq_path+
def seq_path
  @seq_path
end
Instance Method Details
#caching? ⇒ Boolean
Returns true if the actor is caching records.
101 102 103 |
# File 'lib/oflow/actors/persister.rb', line 101
#
# Tells whether the actor keeps records in its in-memory cache.
#
# @return [Boolean] true when +@cache+ is not nil, false otherwise
def caching?
  !@cache.nil?
end
#clear(box) ⇒ Object
197 198 199 200 201 202 203 |
# File 'lib/oflow/actors/persister.rb', line 197
#
# Removes every persisted record: empties the cache (when caching) and
# deletes, then recreates, the storage directory.
#
# @param box [Object] unused
# @return [nil]
def clear(box)
  # Loaded here because this method is the user of FileUtils; the directory
  # is handled without a shell so that a path containing spaces or shell
  # metacharacters cannot remove or create unintended directories.
  require 'fileutils'

  @cache = {} unless @cache.nil?
  FileUtils.rm_rf(@dir)
  # remake the dir in preparation for future inserts
  FileUtils.mkdir_p(@dir)
  nil
end
#delete(box) ⇒ Object
154 155 156 157 158 159 160 161 |
# File 'lib/oflow/actors/persister.rb', line 154
#
# Deletes the record whose key is found at +@key_path+ in the box: drops it
# from the cache (when caching), removes the '<key>.json' link and, unless
# historic records are kept, removes all of its sequence files.
#
# @param box [#get] box holding the key
# @return [nil]
def delete(box)
  key = box.get(@key_path)
  @cache&.delete(key)
  File.delete(File.join(@dir, "#{key}.json"))
  # A nil seq matches no file, so every sequence file of the key is removed.
  delete_historic(key, nil) unless @historic
  nil
end
#delete_historic(key, seq) ⇒ Object
226 227 228 229 230 231 232 |
# File 'lib/oflow/actors/persister.rb', line 226
#
# Deletes the sequence files of +key+ except the one whose sequence number
# equals +seq+.
#
# @param key [Object] record key; compared by its string form
# @param seq [Integer, nil] sequence number to keep; nil keeps none
# @return [Array<String>] the paths matched by the glob
def delete_historic(key, seq)
  wanted = key.to_s
  Dir.glob(File.join(@dir, '**', "#{key}~*.json")).each do |path|
    found_key, found_seq = key_seq_from_path(path)
    # Keys may themselves contain '~', so the glob also matches files of
    # other keys (e.g. 'a~b~1.json' when key is 'a'); leave those alone.
    next unless found_key == wanted
    next if found_seq == seq

    File.delete(path)
  end
end
#insert(box) ⇒ Object
91 92 93 94 95 96 97 98 |
# File 'lib/oflow/actors/persister.rb', line 91
#
# Stores a new record with sequence number 1.
#
# @param box [#get, #set] box holding the key (at +@key_path+) and the
#   record (at +@data_path+)
# @return [Object] the value returned by #save
# @raise [KeyError] when no key is found at +@key_path+
def insert(box)
  key = box.get(@key_path)
  raise KeyError.new(:insert) if key.nil?

  box = box.set(@seq_path, 1)
  rec = box.get(@data_path)
  # Write to disk first: if #save raises (for example because the record
  # already exists) the cache must not be left pointing at unsaved data.
  saved = save(rec, key, 1)
  @cache[key] = [1, rec] unless @cache.nil?
  saved
end
#key_seq_from_path(path) ⇒ Object
234 235 236 237 238 239 |
# File 'lib/oflow/actors/persister.rb', line 234
#
# Extracts the record key and sequence number from a record file path of the
# form '<key>~<seq>.json'. A symlink is resolved to its target name first.
#
# @param path [String] path of a record file or of a link to one
# @return [Array(String, Integer)] the key and the sequence number
def key_seq_from_path(path)
  target = File.symlink?(path) ? File.readlink(path) : path
  stem = File.basename(target)[0..-6] # strip off '.json'
  # The last '~' separated piece is the sequence; the key may contain '~'.
  *key_parts, seq_text = stem.split('~')
  [key_parts.join('~'), seq_text.to_i]
end
#load(path) ⇒ Object
221 222 223 224 |
# File 'lib/oflow/actors/persister.rb', line 221
#
# Reads a record from a JSON file.
#
# @param path [String] path of the file to read
# @return [Object, nil] the result of Oj.load_file in :object mode, or nil
#   when no file exists at +path+
def load(path)
  # NOTE(review): Oj's :object mode is used here; confirm the stored files
  # are always trusted input.
  File.exist?(path) ? Oj.load_file(path, :mode => :object) : nil
end
#perform(op, box) ⇒ Object
69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 |
# File 'lib/oflow/actors/persister.rb', line 69
#
# Dispatches +op+ to the matching persistence operation and ships the result
# to the destination named by +box.contents[:dest]+.
#
# @param op [Symbol] one of :insert/:create, :get/:read, :update,
#   :delete/:remove, :query or :clear
# @param box [Object] request box; must respond to +contents+ and +tracker+
# @return [Object] whatever +task.ship+ returns
# @raise [OpError] when +op+ is not one of the supported operations
def perform(op, box)
  dest = box.contents[:dest]
  result =
    case op
    when :insert, :create then insert(box)
    when :get, :read      then read(box)
    when :update          then update(box)
    when :delete, :remove then delete(box)
    when :query           then query(box)
    when :clear           then clear(box)
    else raise OpError.new(task.full_name, op)
    end
  task.ship(dest, Box.new(result, box.tracker))
end
#query(box) ⇒ Object
163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 |
# File 'lib/oflow/actors/persister.rb', line 163
#
# Collects records, optionally filtered by the Proc found at 'expr' in the
# box. The Proc is called with (rec, key, seq) and a record is included when
# it returns a truthy value. Without an 'expr' every record is returned.
#
# @param box [#get] box optionally holding a Proc under 'expr'
# @return [Hash] matching records indexed by key
# @raise [Exception] when 'expr' is present but is not a Proc
def query(box)
  expr = box.get('expr')
  unless expr.nil? || expr.is_a?(Proc)
    # TBD add support for string safe expressions in the future
    raise Exception.new("expr can only be a Proc, not a #{expr.class}")
  end

  found = {}
  if @cache.nil?
    # Not caching: walk the '<key>.json' links, which mark current records.
    Dir.glob(File.join(@dir, '**/*.json')).each do |path|
      next unless File.symlink?(path)

      if expr.nil?
        found[File.basename(path)[0..-6]] = load(path)
      else
        rec = load(path)
        key, seq = key_seq_from_path(path)
        found[key] = rec if expr.call(rec, key, seq)
      end
    end
  else
    @cache.each do |key, (seq, rec)|
      found[key] = rec if expr.nil? || expr.call(rec, key, seq)
    end
  end
  found
end
#read(box) ⇒ Object
105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 |
# File 'lib/oflow/actors/persister.rb', line 105
#
# Fetches the current version of a record.
#
# @param box [#contents] box whose contents hold the key under :key
# @return [Object, nil] the record, or nil when no record has that key
# @raise [KeyError] when the box contents have no :key
def read(box)
  # Should be a Hash.
  key = box.contents[:key]
  # KeyError must be instantiated; calling it like a method raised
  # NoMethodError instead of the intended error.
  raise KeyError.new(:read) if key.nil?

  # Not caching: follow the '<key>.json' link on disk.
  return load(File.join(@dir, "#{key}.json")) if @cache.nil?

  seq_rec = @cache[key]
  # If not found the result is nil, that is okay.
  seq_rec.nil? ? nil : seq_rec[1]
end
#save(rec, key, seq) ⇒ Object
internal use only
206 207 208 209 210 211 212 213 214 215 216 217 218 219 |
# File 'lib/oflow/actors/persister.rb', line 206
#
# Writes a record to '<key>~<seq>.json' in +@dir+ and points the
# '<key>.json' symlink at it. Internal use only.
#
# @param rec [Object] record to write
# @param key [Object] record key
# @param seq [Integer] sequence number of this version
# @return [Object] +rec+
# @raise [ExistsError] when a file for this key and sequence already exists
def save(rec, key, seq)
  filename = "#{key}~#{seq}.json"
  path = File.join(@dir, filename)
  linkpath = File.join(@dir, "#{key}.json")
  raise ExistsError.new(key, seq) if File.exist?(path)

  Oj.to_file(path, rec, :mode => :object)
  begin
    File.delete(linkpath)
  rescue Errno::ENOENT
    # No previous link (first save of this key); nothing to remove. Other
    # failures are allowed to propagate rather than being swallowed.
  end
  # The link target is relative so the directory can be moved as a whole.
  File.symlink(filename, linkpath)
  rec
end
#update(box) ⇒ Object
121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 |
# File 'lib/oflow/actors/persister.rb', line 121
#
# Stores a new version of an existing record under the next sequence number.
#
# When caching, the record must be in the cache and the sequence number is
# the one found at +@seq_path+ in the box or, if absent, the cached one.
# When not caching, the record must have a '<key>.json' link on disk and the
# sequence number is the highest one found among the key's files.
#
# @param box [#get, #set] box holding the key, optional sequence number and
#   the record
# @return [Object] the value returned by #save
# @raise [KeyError] when no key is found at +@key_path+
# @raise [NotFoundError] when no record exists for the key
# @raise [SeqError] when no usable sequence number could be determined
def update(box)
  key = box.get(@key_path)
  raise KeyError.new(:update) if key.nil?

  seq = box.get(@seq_path)
  if @cache.nil?
    # Not caching: derive existence and the current sequence from disk.
    # (The original tested this condition the wrong way round and
    # dereferenced the nil cache.)
    seq = 0
    has_rec = false
    wanted = key.to_s
    link_name = "#{key}.json"
    Dir.glob(File.join(@dir, '**', "#{key}*.json")).each do |path|
      if File.symlink?(path)
        # The glob also matches links of keys that merely start with +key+.
        has_rec = true if File.basename(path) == link_name
        next
      end
      found_key, found_seq = key_seq_from_path(path)
      next unless found_key == wanted

      seq = found_seq if seq < found_seq
    end
    raise NotFoundError.new(key) unless has_rec
  else
    seq_rec = @cache[key]
    raise NotFoundError.new(key) if seq_rec.nil?

    seq = seq_rec[0] if seq.nil?
  end
  raise SeqError.new(:update, key) if seq.nil? || 0 == seq

  seq += 1
  box = box.set(@seq_path, seq)
  rec = box.get(@data_path)
  # Save before touching the cache so a failed write (e.g. the target
  # sequence file already exists) leaves the cache consistent with disk.
  rec = save(rec, key, seq)
  @cache[key] = [seq, rec] unless @cache.nil?
  delete_historic(key, seq) unless @historic
  rec
end