Class: TinyDS::BaseTx

Inherits:
Object
  • Object
show all
Defined in:
lib/tiny_ds/base_tx.rb

Defined Under Namespace

Classes: TxDone, TxSrc

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Instance Attribute Details

#tx_key ⇒ Object (readonly)

Returns the value of attribute tx_key.



19
20
21
# File 'lib/tiny_ds/base_tx.rb', line 19

# Reader for the transaction key stored in @tx_key.
#
# The instance variable is assigned by #create_tx and #restore_tx; before
# either has run this returns nil.
#
# @return [Object, nil] the current value of @tx_key
def tx_key
  @tx_key
end

Class Method Details

.exec(src, dest, args = {}) ⇒ Object

トランザクション実行 (Execute the transaction.) The table below maps the outcome of exec to the result of each phase:

       :  src_phase   dest_phase
raised :    failed      ----
false  :    OK          failed
true   :    OK          OK


26
27
28
29
30
31
32
33
34
35
# File 'lib/tiny_ds/base_tx.rb', line 26

# Runs a complete two-phase transaction from +src+ to +dest+.
#
# The first half (#create_tx) is executed outside the begin/rescue below, so
# any error it raises propagates to the caller. The second half
# (#roll_forward) reports failure through its return value, which is passed
# on to the caller here.
#
# Outcome table:
#
#          :  src_phase   dest_phase
#   raised :    failed      ----
#   false  :    OK          failed
#   true   :    OK          OK
#
# @param src  [Object] source entity handed to #create_tx
# @param dest [Object] destination entity handed to #create_tx
# @param args [Hash] arguments handed to #create_tx
# @return [Boolean] true when roll_forward reported success, false otherwise
def self.exec(src, dest, args={})
  tx = new
  tx.create_tx(src, dest, args)
  begin
    # roll_forward signals a failed second half by returning false rather
    # than raising, so its result must be propagated instead of assuming
    # success.
    tx.roll_forward ? true : false
  rescue AppEngine::Datastore::TransactionFailed
    # roll_forward rescues errors from its main body itself, so this is
    # presumably only reachable when its failure bookkeeping cannot commit.
    false
  end
end

.pending_tx_query(status = "pending", dire = :asc) ⇒ Object

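Builds a query for TxSrc entities of this tx_kind with the given status, sorted by created_at. Status is one of: pending / done / failed.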



50
51
52
53
54
55
# File 'lib/tiny_ds/base_tx.rb', line 50

# Builds a query over TxSrc entities that belong to this transaction kind.
#
# The query filters on :tx_kind (this class's tx_kind) and :status, then
# sorts by :created_at.
#
# @param status [String] status to filter on: "pending", "done" or "failed"
# @param dire [Symbol] sort direction handed to sort for :created_at
# @return [Object] the query object returned by the final sort call
def self.pending_tx_query(status="pending", dire=:asc)
  by_kind   = TxSrc.query.filter(:tx_kind, "==", tx_kind)
  by_status = by_kind.filter(:status, "==", status)
  by_status.sort(:created_at, dire)
end

.roll_forward_all(limit = 50) ⇒ Object



37
38
39
40
41
42
43
44
45
46
47
48
# File 'lib/tiny_ds/base_tx.rb', line 37

# Attempts to roll forward every pending transaction of this kind.
#
# Each TxSrc yielded by pending_tx_query is restored into a fresh
# transaction object and rolled forward. This is a best-effort sweep: a
# StandardError raised by one roll_forward is deliberately swallowed so the
# remaining entries are still processed.
#
# @param limit [Integer] passed to the query's each as the :limit option
# @return [nil]
def self.roll_forward_all(limit=50)
  pending_tx_query.each(:limit => limit) do |pending_src|
    worker = new
    worker.restore_tx(pending_src)
    begin
      worker.roll_forward
    rescue StandardError
      # Intentionally ignored; continue with the next pending transaction.
    end
  end
  nil
end

.tx_kind ⇒ Object



10
11
12
# File 'lib/tiny_ds/base_tx.rb', line 10

# Identifier stored in TxSrc#tx_kind for transactions of this class.
#
# @return [String] the class name (the result of calling name on the receiver)
def self.tx_kind
  self.name
end

Instance Method Details

#create_tx(src, dest, args) ⇒ Object

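トランザクション前半 (First half of the transaction.)

TODO: TxIDを指定できるようにする。TxSrc#key.nameにTxIDを指定して重複実行防止 (Allow a TxID to be specified; using the TxID as TxSrc#key.name would prevent duplicate execution.)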


73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
# File 'lib/tiny_ds/base_tx.rb', line 73

# First half of the transaction.
#
# Inside a single TinyDS.tx block this reloads +src+, applies src_phase to
# it, saves it, and creates a TxSrc child entity describing the pending
# second half (tx kind, destination key and YAML-serialized args). The key
# of the created TxSrc is remembered in @tx_key.
#
# @param src  [Object] source entity; reloaded via src.class.get(src.key)
# @param dest [Object] destination entity; only dest.key is used here
# @param args [Object] arguments passed to src_phase and stored via to_yaml
# @return [nil]
def create_tx(src, dest, args)
  record = nil
  TinyDS.tx(tx_retries) do
    src = src.class.get(src.key)
    src_phase(src, args)
    src.save!

    payload = {
      :tx_kind  => self.class.tx_kind,
      :dest_key => dest.key.to_s,
      :args     => args.to_yaml
    }
    # src is the parent entity, the TxSrc record is its child.
    record = TxSrc.create!(payload, :parent => src)
    # COMMIT: the src processing (= src_phase) and the TxSrc creation.
  end
  @tx_key = record.key
  nil
end

#dest_phase(dest, args) ⇒ Object



7
8
9
# File 'lib/tiny_ds/base_tx.rb', line 7

# Destination-side work of the transaction, invoked by #roll_forward.
#
# This base implementation always raises; presumably subclasses are expected
# to override it.
#
# @param dest [Object] destination entity
# @param args [Object] arguments restored from the TxSrc record
# @raise [RuntimeError] always, with the message "no impl."
def dest_phase(dest, args)
  raise RuntimeError, "no impl."
end

#restore_tx(tx_src) ⇒ Object



92
93
94
95
# File 'lib/tiny_ds/base_tx.rb', line 92

# Binds this transaction object to an existing TxSrc record by remembering
# its key in @tx_key.
#
# @param tx_src [Object] object responding to #key
# @return [nil]
def restore_tx(tx_src)
  restored_key = tx_src.key
  @tx_key = restored_key
  nil
end

#roll_forward ⇒ Object

トランザクション後半 (Second half of the transaction.)



102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
# File 'lib/tiny_ds/base_tx.rb', line 102

# Second half of the transaction.
#
# Loads the TxSrc identified by @tx_key, then:
#   1. in one TinyDS.tx block, runs dest_phase on the destination entity and
#      creates a TxDone marker under it, unless that marker already exists;
#   2. in a second TinyDS.tx block, flips the TxSrc status from "pending" to
#      "done".
# Any StandardError raised along the way is caught by the method-level
# rescue, which increments TxSrc#roll_forward_failed_count and marks the
# TxSrc "failed" once the count exceeds roll_forward_retries_limit.
#
# @return [Boolean] true on success, false when an error was rescued
# @raise whatever the failure-bookkeeping block in the rescue clause raises
def roll_forward
  tx_src = TxSrc.get(@tx_key)

  TinyDS.tx(tx_retries){
    dest_key = LowDS::KeyFactory.stringToKey(tx_src.dest_key)
    dest = dest_key.kind.constantize.get(dest_key)
    done_name = "TxDone_#{@tx_key.to_s}"
    done_key = LowDS::KeyFactory.createKey(dest.key, TxDone.kind, done_name)
    begin
      TxDone.get!(done_key)
      # Nothing to do: an existing TxDone means dest_phase was already processed.
    rescue AppEngine::Datastore::EntityNotFound => e
      # No TxDone -> dest_phase has not been executed yet.
      attrs = {:done_at=>Time.now}
      tx_done = TxDone.create!(attrs, :parent=>dest, :name=>done_name)
      # NOTE(review): YAML.load deserializes the stored args; they are written
      # by create_tx via to_yaml, so this assumes the datastore content is trusted.
      dest_phase(dest, YAML.load(tx_src.args)) # run the dest-side processing
      dest.save!
    end
    # memo: committing a TxDone with the same done_key should end in
    #       TransactionFailed -> dest_phase is cancelled as well.
    # COMMIT: the dest processing (= dest_phase) and the TxDone creation.
  }

  # Set TxSrc#status to done.
  TinyDS.tx(tx_retries){
    tx_src = TxSrc.get!(@tx_key)
    if tx_src.status=="pending"
      tx_src.status = "done"
      tx_src.done_at = Time.now
      tx_src.save!
    end
  }
  return true
rescue => e
  # NOTE(review): writes the error to stdout; consider a logger.
  puts e.inspect
  # Record the failure; once the count exceeds the limit the TxSrc is marked
  # "failed" (pending_tx_query's default "pending" filter then no longer matches it).
  TinyDS.tx(tx_retries){
    tx_src = TxSrc.get!(@tx_key)
    tx_src.roll_forward_failed_count += 1
    if roll_forward_retries_limit < tx_src.roll_forward_failed_count
      tx_src.status = "failed"
    end
    tx_src.save!
  }
  return false
end

#roll_forward_retries_limit ⇒ Object



13
14
15
# File 'lib/tiny_ds/base_tx.rb', line 13

# Threshold for failed roll-forward attempts.
#
# #roll_forward marks a TxSrc as "failed" once its roll_forward_failed_count
# is greater than this value.
#
# @return [Integer] 100
def roll_forward_retries_limit
  100
end

#src_phase(src, args) ⇒ Object



4
5
6
# File 'lib/tiny_ds/base_tx.rb', line 4

# Source-side work of the transaction, invoked by #create_tx.
#
# This base implementation always raises; presumably subclasses are expected
# to override it.
#
# @param src  [Object] source entity (already reloaded by #create_tx)
# @param args [Object] transaction arguments
# @raise [RuntimeError] always, with the message "no impl."
def src_phase(src, args)
  raise RuntimeError, "no impl."
end

#tx_retries ⇒ Object



16
17
18
# File 'lib/tiny_ds/base_tx.rb', line 16

# Value passed as the argument to every TinyDS.tx call made by this class.
#
# @return [Integer] 3
def tx_retries
  3
end