Class: Promiscuous::Publisher::Operation::Transaction

Inherits:
Base
  • Object
show all
Defined in:
lib/promiscuous/publisher/operation/transaction.rb

Instance Attribute Summary collapse

Attributes inherited from Base

#operation

Class Method Summary collapse

Instance Method Summary collapse

Methods inherited from Base

_acquire_lock, #acquire_op_lock, #dependencies_for, #ensure_op_still_locked, #execute, #explain_operation, #generate_payload, #get_new_op_lock, #increment_dependencies, lock_options, #on_rabbitmq_confirm, #payload_for, #publish_payload_in_rabbitmq_async, #publish_payload_in_redis, rabbitmq_staging_set_key, #record_timestamp, #recover_db_operation, recover_locks, recover_operation_from_lock, recover_payloads_for_rabbitmq, #recovering?, register_recovery_mechanism, #release_op_lock, run_recovery_mechanisms, #trace_operation, #write_dependencies

Constructor Details

#initialize(options = {}) ⇒ Transaction

Returns a new instance of Transaction.



4
5
6
7
8
9
10
# File 'lib/promiscuous/publisher/operation/transaction.rb', line 4

def initialize(options={})
  super
  @operation = :commit
  @transaction_operations = options[:transaction_operations].to_a
  @transaction_id = options[:transaction_id]
  @operation_payloads = options[:operation_payloads]
end

Instance Attribute Details

#operation_payloadsObject Also known as: cache_operation_payloads

Returns the value of attribute operation_payloads.



2
3
4
# File 'lib/promiscuous/publisher/operation/transaction.rb', line 2

def operation_payloads
  @operation_payloads
end

#transaction_idObject

Returns the value of attribute transaction_id.



2
3
4
# File 'lib/promiscuous/publisher/operation/transaction.rb', line 2

def transaction_id
  @transaction_id
end

#transaction_operationsObject

Returns the value of attribute transaction_operations.



2
3
4
# File 'lib/promiscuous/publisher/operation/transaction.rb', line 2

def transaction_operations
  @transaction_operations
end

Class Method Details

.recover_operation(transaction_id, operation_payloads) ⇒ Object



80
81
82
# File 'lib/promiscuous/publisher/operation/transaction.rb', line 80

def self.recover_operation(transaction_id, operation_payloads)
  new(:transaction_id => transaction_id, :operation_payloads => operation_payloads)
end

Instance Method Details

#dependency_for_op_lockObject



12
13
14
15
16
17
18
# File 'lib/promiscuous/publisher/operation/transaction.rb', line 12

def dependency_for_op_lock
  # We don't take locks on rows as the database already have the locks on them
  # until the transaction is committed.
  # A lock on the transaction ID is taken so we know when we conflict with
  # the recovery mechanism.
  Promiscuous::Dependency.new("__transactions__", self.transaction_id, :dont_hash => true)
end

#execute_instrumented(query) ⇒ Object



41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
# File 'lib/promiscuous/publisher/operation/transaction.rb', line 41

def execute_instrumented(query)
  unless self.recovering?
    acquire_op_lock

    # As opposed to atomic operations, we know the values of the instances
    # before the database operation, and not after, so only one stage
    # of recovery is used.
    cache_operation_payloads

    query.call_and_remember_result(:prepare)
  end

  self.increment_dependencies

  query.call_and_remember_result(:instrumented)

  # We can't do anything if the prepared commit doesn't go through.
  # Either it's a network failure, or the database is having some real
  # difficulties. The recovery mechanism will have to retry the transaction.
  return if query.failed?

  # We take a timestamp right after the write is performed because latency
  # measurements are performed on the subscriber.
  record_timestamp

  ensure_op_still_locked

  generate_payload

  publish_payload_in_redis
  release_op_lock
  publish_payload_in_rabbitmq_async
end

#pending_writesObject



20
21
22
23
24
25
# File 'lib/promiscuous/publisher/operation/transaction.rb', line 20

def pending_writes
  # TODO (performance) Return a list of writes that:
  # - Never touch the same id (the latest write is sufficient)
  # - create/update and then delete should be invisible
  @transaction_operations
end

#query_dependenciesObject



27
28
29
# File 'lib/promiscuous/publisher/operation/transaction.rb', line 27

def query_dependencies
  @query_dependencies ||= pending_writes.map(&:query_dependencies).flatten
end

#recovery_payloadObject



75
76
77
78
# File 'lib/promiscuous/publisher/operation/transaction.rb', line 75

def recovery_payload
  # TODO just save the table/ids, or publish the real payload directly.
  [@transaction_id, @operation_payloads]
end

#should_instrument_query?Boolean

Returns:

  • (Boolean)


37
38
39
# File 'lib/promiscuous/publisher/operation/transaction.rb', line 37

def should_instrument_query?
  super && !pending_writes.empty?
end