Class: ActiveRecordTransactioner

Inherits:
Object
  • Object
show all
Defined in:
lib/active-record-transactioner.rb

Constant Summary collapse

DEFAULT_ARGS =
{
  :call_args => [],
  :call_method => :save!,
  :transaction_method => :transaction,
  :transaction_size => 1000
}
ALLOWED_ARGS =
DEFAULT_ARGS.keys

Instance Method Summary collapse

Constructor Details

#initialize(args = {}) ⇒ ActiveRecordTransactioner

Returns a new instance of ActiveRecordTransactioner.



13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
# File 'lib/active-record-transactioner.rb', line 13

def initialize(args = {})
  args.each do |key, val|
    raise "Invalid key: '#{key}'." unless ALLOWED_ARGS.include?(key)
  end
  
  @args = DEFAULT_ARGS.merge(args)
  @models = {}
  @threads = []
  @count = 0
  @lock = Monitor.new
  @lock_threads = Monitor.new
  @lock_models = {}
  
  if block_given?
    begin
      yield self
    ensure
      flush
      join
    end
  end
end

Instance Method Details

#flushObject

Flushes the specified method on all the queued models in a thread for each type of model.



50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
# File 'lib/active-record-transactioner.rb', line 50

def flush
  threads = []
  
  @lock.synchronize do
    @models.each do |klass, val|
      next if val.empty?
      
      models = val
      @models[klass] = []
      @count -= models.length
      thread = nil
      
      @lock_models[klass].synchronize do
        thread = Thread.new do
          begin
            @lock_models[klass].synchronize do
              klass.__send__(@args[:transaction_method]) do
                models.each do |model|
                  model.__send__(@args[:call_method], *@args[:call_args])
                end
              end
            end
          rescue => e
            puts e.inspect
            puts e.backtrace
            
            if e.is_a?(NoMethodError) and e.message.to_s.include?("`reverse' for nil:NilClass")
              puts
              puts "Warning: Known Rails reverse error when using transaction - retrying in 2 sec."
              sleep 2
              puts "Retrying"
              puts
              retry
            end
          ensure
            @threads.delete(Thread.current)
          end
        end
      end
      
      @lock_threads.synchronize do
        threads << thread
        @threads << thread
      end
    end
  end
  
  return {
    :threads => threads
  }
end

#joinObject

Waits for any remaining running threads.



103
104
105
106
107
108
109
# File 'lib/active-record-transactioner.rb', line 103

def join
  @lock_threads.synchronize do
    @threads.each do |thread|
      thread.join
    end
  end
end

#queue(model) ⇒ Object

Adds another model to the queue and calls ‘flush’ if it is over the limit.



37
38
39
40
41
42
43
44
45
46
47
# File 'lib/active-record-transactioner.rb', line 37

def queue(model)
  @lock.synchronize do
    klass = model.class
    
    @lock_models[klass] = Mutex.new if !@lock_models.key?(klass)
    @models[klass] = [] if !@models.key?(klass)
    @models[klass] << model
    @count += 1
    flush if @count >= @args[:transaction_size]
  end
end