Class: ActiveRecordTransactioner
- Inherits:
-
Object
- Object
- ActiveRecordTransactioner
- Defined in:
- lib/active-record-transactioner.rb
Constant Summary collapse
- DEFAULT_ARGS =
{ :call_args => [], :call_method => :save!, :transaction_method => :transaction, :transaction_size => 1000, :max_running_threads => 2, :debug => false }
- ALLOWED_ARGS =
DEFAULT_ARGS.keys
Instance Method Summary collapse
-
#flush ⇒ Object
Flushes the specified method on all the queued models in a thread for each type of model.
-
#initialize(args = {}) ⇒ ActiveRecordTransactioner
constructor
A new instance of ActiveRecordTransactioner.
-
#join ⇒ Object
Waits for any remaining running threads.
-
#queue(model) ⇒ Object
Adds another model to the queue and calls ‘flush’ if it is over the limit.
Constructor Details
#initialize(args = {}) ⇒ ActiveRecordTransactioner
Returns a new instance of ActiveRecordTransactioner.
15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 |
# File 'lib/active-record-transactioner.rb', line 15 def initialize(args = {}) args.each do |key, val| raise "Invalid key: '#{key}'." unless ALLOWED_ARGS.include?(key) end @args = DEFAULT_ARGS.merge(args) @models = {} @threads = [] @count = 0 @lock = Monitor.new @lock_threads = Monitor.new @lock_models = {} @debug = @args[:debug] if block_given? begin yield self ensure flush join end end end |
Instance Method Details
#flush ⇒ Object
Flushes the specified method on all the queued models in a thread for each type of model.
54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 |
# File 'lib/active-record-transactioner.rb', line 54 def flush threads = [] wait_for_threads @lock.synchronize do @models.each do |klass, val| next if val.empty? models = val @models[klass] = [] @count -= models.length thread = nil @lock_models[klass].synchronize do thread = Thread.new do begin @lock_models[klass].synchronize do debug "Opening new transaction by using '#{@args[:transaction_method]}'." klass.__send__(@args[:transaction_method]) do models.each do |model| # debug "Saving #{model.class.name}(#{model.id}) with method #{@args[:call_method]}" model.__send__(@args[:call_method], *@args[:call_args]) end end end rescue => e puts e.inspect puts e.backtrace if e.is_a?(NoMethodError) and e..to_s.include?("`reverse' for nil:NilClass") puts "Warning: Known Rails reverse error when using transaction - retrying in 2 sec." sleep 2 puts "Retrying" puts retry end raise e ensure debug "Removing thread #{Thread.current.__id__}" @threads.delete(Thread.current) @lock.synchronize do ActiveRecord::Base.connection.close if ActiveRecord::Base.connection end end end end @lock_threads.synchronize do threads << thread @threads << thread end end end return { :threads => threads } end |
#join ⇒ Object
Waits for any remaining running threads.
116 117 118 119 120 121 122 |
# File 'lib/active-record-transactioner.rb', line 116 def join @lock_threads.synchronize do @threads.each do |thread| thread.join end end end |
#queue(model) ⇒ Object
Adds another model to the queue and calls ‘flush’ if it is over the limit.
40 41 42 43 44 45 46 47 48 49 50 51 |
# File 'lib/active-record-transactioner.rb', line 40 def queue(model) @lock.synchronize do klass = model.class @lock_models[klass] = Mutex.new if !@lock_models.key?(klass) @models[klass] = [] if !@models.key?(klass) @models[klass] << model @count += 1 end flush if @count >= @args[:transaction_size] end |