Class: ActiveRecordTransactioner

Inherits:
Object
  • Object
show all
Defined in:
lib/active-record-transactioner.rb

Constant Summary collapse

DEFAULT_ARGS =
{
  call_args: [],
  call_method: :save!,
  transaction_method: :transaction,
  transaction_size: 1000,
  threadded: false,
  max_running_threads: 2,
  debug: false
}
ALLOWED_ARGS =
DEFAULT_ARGS.keys

Instance Method Summary collapse

Constructor Details

#initialize(args = {}) ⇒ ActiveRecordTransactioner

Returns a new instance of ActiveRecordTransactioner.



16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
# File 'lib/active-record-transactioner.rb', line 16

def initialize(args = {})
  args.each { |key, val| raise "Invalid key: '#{key}'." unless ALLOWED_ARGS.include?(key) }

  @args = DEFAULT_ARGS.merge(args)
  parse_and_set_args

  if block_given?
    begin
      yield self
    ensure
      flush
      join if threadded?
    end
  end
end

Instance Method Details

#destroy!(model) ⇒ Object



38
39
40
# File 'lib/active-record-transactioner.rb', line 38

def destroy!(model)
  queue(model, type: :destroy!)
end

#flushObject

Flushes the specified method on all the queued models in a thread for each type of model.



61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
# File 'lib/active-record-transactioner.rb', line 61

def flush
  wait_for_threads if threadded?

  @lock.synchronize do
    @models.each do |klass, models|
      next if models.empty?

      @models[klass] = []
      @count -= models.length

      if threadded?
        work_threadded(klass, models)
      else
        work_models_through_transaction(klass, models)
      end
    end
  end
end

#joinObject

Waits for any remaining running threads.



81
82
83
84
85
86
87
88
# File 'lib/active-record-transactioner.rb', line 81

def join
  threads_to_join = @lock_threads.synchronize { @threads.clone }

  debug "Threads to join: #{threads_to_join}" if @debug
  threads_to_join.each do |thread|
    thread.join
  end
end

#queue(model, args = {}) ⇒ Object

Adds another model to the queue and calls ‘flush’ if it is over the limit.



43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
# File 'lib/active-record-transactioner.rb', line 43

def queue(model, args = {})
  args[:type] ||= :save!

  @lock.synchronize do
    klass = model.class

    validate = args.key?(:validate) ? args[:validate] : true

    @lock_models[klass] ||= Monitor.new
    @models[klass] ||= []
    @models[klass] << {model: model, type: args[:type], validate: validate}
    @count += 1
  end

  flush if should_flush?
end

#save!(model) ⇒ Object

Adds another model to the queue and calls ‘flush’ if it is over the limit.

Raises:

  • (ActiveRecord::RecordInvalid)


33
34
35
36
# File 'lib/active-record-transactioner.rb', line 33

def save!(model)
  raise ActiveRecord::RecordInvalid, model unless model.valid?
  queue(model, type: :save!, validate: false)
end

#threadded?Boolean

Returns:

  • (Boolean)


90
91
92
# File 'lib/active-record-transactioner.rb', line 90

def threadded?
  @args[:threadded]
end