Class: ActiveRecordTransactioner

Inherits:
Object
  • Object
show all
Defined in:
lib/active-record-transactioner.rb

Constant Summary collapse

# Default option values. The hash given to #initialize is merged over these,
# so any option the caller leaves out falls back to the value listed here.
DEFAULT_ARGS =
{
  # Extra arguments splatted into every :call_method invocation on a model.
  :call_args => [],
  # Method sent to each queued model when its batch is flushed.
  :call_method => :save!,
  # Method sent to the model's class to open the transaction wrapping a batch.
  :transaction_method => :transaction,
  # Total number of queued models at which #queue triggers a flush.
  :transaction_size => 1000,
  # NOTE(review): presumably the cap on concurrently running flush threads;
  # the code enforcing it is not visible here - confirm in wait_for_threads.
  :max_running_threads => 2,
  # Copied into @debug by the constructor; presumably enables output from the
  # debug helper - confirm against its definition.
  :debug => false
}
# Option keys accepted by #initialize; any other key makes it raise.
ALLOWED_ARGS =
DEFAULT_ARGS.keys

Instance Method Summary collapse

Constructor Details

#initialize(args = {}) ⇒ ActiveRecordTransactioner

Returns a new instance of ActiveRecordTransactioner.



15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
# File 'lib/active-record-transactioner.rb', line 15

# Builds a transactioner from an options hash.
#
# args - Hash of options. Every key must be listed in ALLOWED_ARGS; the given
#        values override the matching entries of DEFAULT_ARGS.
#
# When a block is given, the new instance is yielded to it, and flush followed
# by join are called once the block finishes - also when the block raises.
#
# Raises RuntimeError when args contains a key that is not in ALLOWED_ARGS.
def initialize(args = {})
  args.each_key do |name|
    raise "Invalid key: '#{name}'." unless ALLOWED_ARGS.include?(name)
  end

  @args = DEFAULT_ARGS.merge(args)
  @debug = @args[:debug]

  # Queue state: pending models per class and the total number queued.
  @count = 0
  @models = {}

  # Thread bookkeeping and the locks guarding the shared state.
  @threads = []
  @lock_models = {}
  @lock = Monitor.new
  @lock_threads = Monitor.new

  return unless block_given?

  begin
    yield self
  ensure
    flush
    join
  end
end

Instance Method Details

#flush ⇒ Object

Calls the configured method on every queued model, using one thread (and one transaction) per model class.



54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
# File 'lib/active-record-transactioner.rb', line 54

# Flushes the queue: for every model class that has pending models, the pending
# batch is detached from the queue and handed to a new thread, which opens a
# transaction on the class (via the :transaction_method option) and sends the
# :call_method option (with :call_args) to each model of the batch.
#
# A worker that hits the known "`reverse' for nil:NilClass" NoMethodError waits
# two seconds and tries again, a limited number of times; any other error (or
# running out of retries) is re-raised inside the worker thread.
#
# Returns a Hash whose :threads entry lists the threads started by this call.
def flush
  started = []
  wait_for_threads

  @lock.synchronize do
    @models.each do |klass, pending|
      next if pending.empty?

      # Detach the current batch so new models can queue up while it is saved.
      @models[klass] = []
      @count -= pending.length

      # The worker begins by taking this same per-class mutex, so it cannot do
      # any work - or finish and unregister itself - before this block exits.
      @lock_models[klass].synchronize do
        worker = Thread.new do
          # Upper bound for the retry workaround below, so that a persistent
          # failure cannot keep the worker (and anyone joining it) alive forever.
          retries_left = 5

          begin
            @lock_models[klass].synchronize do
              debug "Opening new transaction by using '#{@args[:transaction_method]}'."
              klass.__send__(@args[:transaction_method]) do
                pending.each do |model|
                  model.__send__(@args[:call_method], *@args[:call_args])
                end
              end
            end
          rescue => e
            puts e.inspect
            puts e.backtrace

            known_error = e.is_a?(NoMethodError) && e.message.to_s.include?("`reverse' for nil:NilClass")

            if known_error && retries_left > 0
              retries_left -= 1
              puts "Warning: Known Rails reverse error when using transaction - retrying in 2 sec."
              sleep 2
              puts "Retrying"
              puts
              retry
            end

            raise
          ensure
            debug "Removing thread #{Thread.current.__id__}"

            # Deliberately not wrapped in @lock_threads: that would block this
            # worker whenever another thread holds that monitor while waiting
            # for workers to finish.
            @threads.delete(Thread.current)

            @lock.synchronize do
              ActiveRecord::Base.connection.close if ActiveRecord::Base.connection
            end
          end
        end

        # Register while the per-class mutex is still held. Registering after
        # releasing it would let a fast worker remove itself from @threads
        # before being added, leaving a dead thread in the list for good.
        @lock_threads.synchronize do
          started << worker
          @threads << worker
        end
      end
    end
  end

  { :threads => started }
end

#join ⇒ Object

Waits for any remaining running threads.



116
117
118
119
120
121
122
# File 'lib/active-record-transactioner.rb', line 116

# Waits for any remaining running threads.
#
# Worker threads remove themselves from @threads when they finish, so walking
# the live array while waiting could skip entries and return too early. The
# list is therefore copied under @lock_threads and the waiting is done on the
# copy, which also keeps the monitor free while this call blocks.
#
# Returns the Array of threads that were waited for.
def join
  pending = @lock_threads.synchronize { @threads.dup }
  pending.each { |thread| thread.join }
end

#queue(model) ⇒ Object

Adds another model to the queue and calls ‘flush’ once the number of queued models reaches the configured transaction size.



40
41
42
43
44
45
46
47
48
49
50
51
# File 'lib/active-record-transactioner.rb', line 40

# Puts a model on the queue of its class and flushes all queues once the total
# number of queued models has reached the :transaction_size option.
#
# model - The object to queue; it is grouped under model.class.
#
# Returns whatever flush returns when a flush was triggered, otherwise nil.
def queue(model)
  model_class = model.class

  @lock.synchronize do
    # First model of this class: create its mutex and its pending list.
    @lock_models[model_class] ||= Mutex.new
    @models[model_class] ||= []

    @models[model_class].push(model)
    @count += 1
  end

  limit_reached = @count >= @args[:transaction_size]
  flush if limit_reached
end