Class: ActiveRecordTransactioner
- Inherits:
-
Object
- Object
- ActiveRecordTransactioner
- Defined in:
- lib/active-record-transactioner.rb
Constant Summary collapse
- DEFAULT_ARGS =
{ call_args: [], call_method: :save!, transaction_method: :transaction, transaction_size: 1000, threadded: false, max_running_threads: 2, debug: false }
- ALLOWED_ARGS =
DEFAULT_ARGS.keys
Instance Method Summary collapse
- #destroy!(model) ⇒ Object
-
#flush ⇒ Object
Processes all queued models, one batch per model class: each batch is handed to a thread when threading is enabled, otherwise it is run directly through a transaction.
-
#initialize(args = {}) ⇒ ActiveRecordTransactioner
constructor
A new instance of ActiveRecordTransactioner.
-
#join ⇒ Object
Waits for any remaining running threads.
-
#queue(model, args = {}) ⇒ Object
Adds another model to the queue and calls ‘flush’ if it is over the limit.
-
#save!(model) ⇒ Object
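Validates the model, raising ActiveRecord::RecordInvalid if it is invalid, then adds it to the queue with validation disabled; ‘flush’ is called if the queue is over the limit.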
- #threadded? ⇒ Boolean
Constructor Details
#initialize(args = {}) ⇒ ActiveRecordTransactioner
Returns a new instance of ActiveRecordTransactioner.
16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 |
# File 'lib/active-record-transactioner.rb', line 16
# Builds a transactioner from an options hash.
#
# Every key must be listed in ALLOWED_ARGS; the given options are merged over
# DEFAULT_ARGS and then handed to parse_and_set_args. When a block is given,
# the new instance is yielded and, once the block finishes (normally or via
# an exception), the queue is flushed and running threads are joined if
# threading is enabled.
#
# @param args [Hash] options; keys must be members of ALLOWED_ARGS
# @yield [self] optional block that queues work on the new instance
# @raise [RuntimeError] if an option key is not in ALLOWED_ARGS
def initialize(args = {})
  args.each do |key, _value|
    raise "Invalid key: '#{key}'." unless ALLOWED_ARGS.include?(key)
  end

  @args = DEFAULT_ARGS.merge(args)
  parse_and_set_args
  return unless block_given?

  begin
    yield self
  ensure
    # Always drain whatever the block queued, even if it raised.
    flush
    join if threadded?
  end
end
Instance Method Details
#destroy!(model) ⇒ Object
38 39 40 |
# File 'lib/active-record-transactioner.rb', line 38
# Queues the model with the :destroy! type.
#
# @param model [Object] the record to queue
# @return [Object] whatever #queue returns
def destroy!(model)
  options = {type: :destroy!}
  queue(model, options)
end
#flush ⇒ Object
Processes all queued models, one batch per model class: each batch is handed to a thread when threading is enabled, otherwise it is run directly through a transaction.
61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 |
# File 'lib/active-record-transactioner.rb', line 61
# Drains the queue, one batch per model class.
#
# In threaded mode wait_for_threads is called first. Then, under @lock, every
# non-empty per-class list is swapped for a fresh empty array, the queued
# count is reduced by the batch size, and the batch is passed to
# work_threadded (threaded mode) or work_models_through_transaction.
#
# @return [Object] the result of the synchronized block
def flush
  wait_for_threads if threadded?

  @lock.synchronize do
    @models.each do |model_class, batch|
      next if batch.empty?

      # Detach the batch before working on it so later #queue calls fill a
      # fresh list rather than the one being processed.
      @models[model_class] = []
      @count -= batch.length

      if threadded?
        work_threadded(model_class, batch)
      else
        work_models_through_transaction(model_class, batch)
      end
    end
  end
end
#join ⇒ Object
Waits for any remaining running threads.
81 82 83 84 85 86 87 88 |
# File 'lib/active-record-transactioner.rb', line 81
# Waits for any remaining running threads.
#
# A copy of @threads is taken while holding @lock_threads, and each thread in
# that copy is then joined outside the lock.
#
# @return [Array<Thread>] the copied list of threads that were joined
def join
  pending = @lock_threads.synchronize { @threads.clone }
  debug "Threads to join: #{pending}" if @debug
  pending.each(&:join)
end
#queue(model, args = {}) ⇒ Object
Adds another model to the queue and calls ‘flush’ if it is over the limit.
43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 |
# File 'lib/active-record-transactioner.rb', line 43
# Adds another model to the queue and calls #flush if should_flush? is true.
#
# The options hash is only read, never modified, so callers may reuse it or
# pass a frozen hash.
#
# @param model [Object] the record to queue; it is grouped by its class
# @param args [Hash] optional settings
# @option args [Symbol] :type method to queue for the model (default :save!)
# @option args [Boolean] :validate stored with the entry (default true)
# @return [Object, nil] the result of #flush when one was triggered, else nil
def queue(model, args = {})
  # Read the type into a local instead of writing the default back into the
  # caller's hash.
  type = args[:type] || :save!
  validate = args.fetch(:validate, true)

  @lock.synchronize do
    klass = model.class

    @lock_models[klass] ||= Monitor.new
    @models[klass] ||= []
    @models[klass] << {model: model, type: type, validate: validate}
    @count += 1
  end

  flush if should_flush?
end
#save!(model) ⇒ Object
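Validates the model, raising ActiveRecord::RecordInvalid if it is invalid, then adds it to the queue with validation disabled; ‘flush’ is called if the queue is over the limit.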
33 34 35 36 |
# File 'lib/active-record-transactioner.rb', line 33
# Validates the model up front, then queues it with the :save! type.
#
# Because validity is checked here, the entry is queued with
# validate: false.
#
# @param model [Object] the record to validate and queue
# @return [Object] whatever #queue returns
# @raise [ActiveRecord::RecordInvalid] if model.valid? is false
def save!(model)
  raise ActiveRecord::RecordInvalid, model unless model.valid?

  options = {type: :save!, validate: false}
  queue(model, options)
end
#threadded? ⇒ Boolean
90 91 92 |
# File 'lib/active-record-transactioner.rb', line 90
# Reports whether threaded mode was requested.
#
# @return [Boolean] the value stored under :threadded in the merged options
#   (DEFAULT_ARGS sets it to false)
def threadded?
  @args[:threadded]
end