Class: DpStmMap::TransactionLog
- Inherits:
-
Object
- Object
- DpStmMap::TransactionLog
- Defined in:
- lib/dp_stm_map/Manager.rb
Instance Method Summary collapse
- #add_listener(current_state, &block) ⇒ Object
-
#initialize(log_dir) ⇒ TransactionLog
constructor
A new instance of TransactionLog.
- #last_id ⇒ Object
- #read_tx(id) ⇒ Object
- #store_transaction(references_to_change, values_to_add, values_to_remove) ⇒ Object
- #write_tx(id, tx) ⇒ Object
Constructor Details
#initialize(log_dir) ⇒ TransactionLog
Returns a new instance of TransactionLog.
190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 |
# File 'lib/dp_stm_map/Manager.rb', line 190 def initialize log_dir @log_dir=log_dir FileUtils.mkdir_p(log_dir) unless File.exist?(log_dir) unless File.exist?("#{log_dir}/last_id.txt") File.open("#{log_dir}/last_id.txt","w") {|f| f.write("0")} @last_id=0 else @last_id=File.open("#{log_dir}/last_id.txt","r") {|f| f.read().to_i} end @mutex=Mutex.new @change=ConditionVariable.new end |
Instance Method Details
#add_listener(current_state, &block) ⇒ Object
218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 |
# File 'lib/dp_stm_map/Manager.rb', line 218 def add_listener current_state, &block Thread.new do begin loop do arg=@mutex.synchronize do until current_state < last_id @change.wait(@mutex) end current_state+=1 [current_state, read_tx(current_state)] end yield arg end rescue => e # puts "Error during processing: #{$!}" # puts "Backtrace:\n\t#{e.backtrace.join("\n\t")}" end end end |
#last_id ⇒ Object
241 242 243 |
# File 'lib/dp_stm_map/Manager.rb', line 241 def last_id @last_id end |
#read_tx(id) ⇒ Object
207 208 209 |
# File 'lib/dp_stm_map/Manager.rb', line 207 def read_tx id JSON::parse(File.open("#{@log_dir}/#{id}.json","r") {|f| f.read}) end |
#store_transaction(references_to_change, values_to_add, values_to_remove) ⇒ Object
245 246 247 248 249 250 251 252 253 254 |
# File 'lib/dp_stm_map/Manager.rb', line 245 def store_transaction references_to_change, values_to_add, values_to_remove @mutex.synchronize do txid=last_id+1 write_tx txid, [references_to_change, values_to_add, values_to_remove] File.open("#{@log_dir}/last_id.txt","w") {|f| f.write(txid.to_s)} @change.broadcast @last_id=txid txid end end |
#write_tx(id, tx) ⇒ Object
211 212 213 214 |
# File 'lib/dp_stm_map/Manager.rb', line 211 def write_tx id, tx File.open("#{@log_dir}/#{id}.json","w") {|f| f.write(tx.to_json)} end |