Class: Promiscuous::Publisher::Operation::Base

Inherits:
Object
  • Object
show all
Defined in:
lib/promiscuous/publisher/operation/base.rb

Direct Known Subclasses

Atomic, NonPersistent, Transaction

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(options = {}) ⇒ Base

Returns a new instance of Base.



15
16
17
# File 'lib/promiscuous/publisher/operation/base.rb', line 15

# Builds an operation and remembers which kind of database operation it wraps.
#
# @param options [Hash] construction options; only the :operation entry is
#   read here (nil is stored when the key is absent).
def initialize(options = {})
  @operation = options[:operation]
end

Instance Attribute Details

#operation ⇒ Object

Returns the value of attribute operation.



13
14
15
# File 'lib/promiscuous/publisher/operation/base.rb', line 13

# Reader for the operation kind held in @operation.
#
# @return [Object, nil] whatever was stored in @operation (the constructor
#   stores options[:operation]); nil when nothing was stored.
def operation
  @operation
end

Class Method Details

._acquire_lock(mutex) ⇒ Object



308
309
310
311
312
313
314
315
316
317
# File 'lib/promiscuous/publisher/operation/base.rb', line 308

# Keeps trying to take +mutex+ until the attempt either succeeds or fails.
#
# Each pass calls mutex.lock once:
# * true / false is handed straight back to the caller;
# * :recovered triggers recover_operation_from_lock for that mutex and
#   then another locking attempt;
# * any other value simply leads to another attempt.
#
# @param mutex [#lock] the lock to acquire
# @return [Boolean] true when the lock was taken, false when it was not
def self._acquire_lock(mutex)
  loop do
    outcome = mutex.lock
    return outcome if [true, false].include?(outcome)

    # recover_operation_from_lock implicitly unlocks the lock, so we go
    # around again to take it for ourselves.
    recover_operation_from_lock(mutex) if :recovered == outcome
  end
end

.lock_options ⇒ Object



289
290
291
292
293
294
295
296
# File 'lib/promiscuous/publisher/operation/base.rb', line 289

# Options handed to the mutexes built for operation locks.
#
# @return [Hash] a fresh hash with the :timeout, :sleep, :expire and
#   :lock_set entries, in that order
def self.lock_options
  options = {}
  options[:timeout]  = 10.seconds   # after 10 seconds, we give up so we don't queue requests
  options[:sleep]    = 0.01.seconds # polling every 10ms.
  options[:expire]   = 1.minute     # after one minute, we are considered dead
  options[:lock_set] = Promiscuous::Key.new(:pub).join('lock_set').to_s
  options
end

.rabbitmq_staging_set_key ⇒ Object



30
31
32
# File 'lib/promiscuous/publisher/operation/base.rb', line 30

# Name of the Redis key used for the 'rabbitmq_staging' set, built under the
# :pub key prefix.
#
# @return [String]
def self.rabbitmq_staging_set_key
  pub_prefix = Promiscuous::Key.new(:pub)
  pub_prefix.join('rabbitmq_staging').to_s
end

.recover_locks ⇒ Object



339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
# File 'lib/promiscuous/publisher/operation/base.rb', line 339

# Sweeps every node of the Redis master for expired operation locks.
#
# This is meant to be called regularly from a worker. For each node we look
# at the entry with the lowest score in the lock set; as long as that entry
# is older than lock_options[:expire] we run it through a lock/unlock cycle
# (the locking attempt is what gives the recovery a chance to run).
#
# Does nothing when no Redis master is configured.
def self.recover_locks
  return unless Promiscuous::Redis.master

  Promiscuous::Redis.master.nodes.each do |node|
    loop do
      oldest_entry = node.zrange(lock_options[:lock_set], 0, 1, :with_scores => true).flatten
      key, locked_at = oldest_entry

      expired = key && Time.now.to_i >= locked_at.to_i + lock_options[:expire]
      break unless expired

      mutex = Promiscuous::Redis::Mutex.new(key, lock_options.merge(:node => node))
      next unless _acquire_lock(mutex)

      mutex.unlock
    end
  end
end

.recover_operation(*recovery_payload) ⇒ Object



407
408
409
# File 'lib/promiscuous/publisher/operation/base.rb', line 407

# Hook for rebuilding an operation from stored recovery arguments.
#
# recover_operation_from_lock calls this on the class named in the recovery
# data, splatting the stored recovery arguments, and then replays the
# database operation on whatever is returned. The implementation here is
# empty and returns nil.
#
# @param recovery_payload [Array] the stored recovery arguments
def self.recover_operation(*recovery_payload)
  # Overridden to reconstruct the operation.
end

.recover_operation_from_lock(lock) ⇒ Object



149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
# File 'lib/promiscuous/publisher/operation/base.rb', line 149

# Replays the operation recorded under a lock that was acquired but never
# released.
#
# Steps, as visible below:
# 1. Read "#{lock.key}:operation_recovery" from the lock's node. When it is
#    blank there is nothing to replay: the lock is released and we return.
# 2. Decode the JSON into [class name, operation, write dependencies,
#    recovery arguments].
# 3. Ask the named class to rebuild the operation (recover_operation).
# 4. In a separate thread, prime the rebuilt operation with the recovered
#    state and run its recover_db_operation through execute_instrumented.
#
# @param lock [#node, #key, #unlock] the lock that was found un-released
# @raise [Promiscuous::Error::Recovery] wrapping any exception raised along
#   the way (including the "invalid recover operation class" error)
def self.recover_operation_from_lock(lock)
  # We happen to have acquired a never released lock.
  # The database instance is thus still pristine.

  master_node = lock.node
  recovery_data = master_node.get("#{lock.key}:operation_recovery")

  # No recovery payload stored: just give the lock back.
  unless recovery_data.present?
    lock.unlock
    return
  end

  Promiscuous.info "[operation recovery] #{lock.key} -> #{recovery_data}"

  op_klass, operation, write_dependencies, recovery_arguments = *MultiJson.load(recovery_data)

  # The JSON round trip gives us strings; turn them back into a symbol and
  # into write dependencies.
  operation = operation.to_sym
  write_dependencies.map! { |k| Promiscuous::Dependency.parse(k.to_s, :type => :write) }

  # NOTE(review): NoMethodError is a subclass of NameError, so a
  # NoMethodError raised from inside recover_operation is also reported as
  # an invalid class here.
  begin
    op = op_klass.constantize.recover_operation(*recovery_arguments)
  rescue NameError
    raise "invalid recover operation class: #{op_klass}"
  end

  Thread.new do
    # We run the recovery in another thread to ensure that we get a new
    # database connection to avoid tampering with the current state of the
    # connection, which can be in an open transaction.
    # Thankfully, we are not in a fast path.
    # Note that any exceptions will be passed through the thread join() method.
    op.instance_eval do
      # Inside instance_eval the instance variables belong to `op`; the
      # right-hand sides are the locals captured from the enclosing method.
      @operation = operation
      @write_dependencies = write_dependencies
      @op_lock = lock
      @recovery_data = recovery_data

      query = Promiscuous::Publisher::Operation::ProxyForQuery.new(self) { recover_db_operation }
      self.execute_instrumented(query)
      query.result
    end
  end.join

# NOTE(review): rescuing Exception also wraps non-StandardError exceptions
# (e.g. Interrupt, SystemExit) into a Recovery error.
rescue Exception => e
  # recovery_data is still nil here when the initial fetch itself failed.
  message = "cannot recover #{lock.key}, failed to fetch recovery data"
  message = "cannot recover #{lock.key}, recovery data: #{recovery_data}" if recovery_data
  raise Promiscuous::Error::Recovery.new(message, e)
end

.recover_payloads_for_rabbitmq ⇒ Object



52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
# File 'lib/promiscuous/publisher/operation/base.rb', line 52

# Republishes staged payloads whose RabbitMQ confirm never arrived.
#
# This is meant to be called regularly from a worker. On every node of the
# Redis master we repeatedly take the oldest entry of the staging set and,
# while it is at least Promiscuous::Config.recovery_timeout seconds old,
# publish its payload again. Races with other workers are harmless: the
# worst case is a duplicate publish.
#
# Does nothing when no Redis master is configured.
def self.recover_payloads_for_rabbitmq
  return unless Promiscuous::Redis.master

  Promiscuous::Redis.master.nodes.each do |node|
    loop do
      oldest_entry = node.zrange(rabbitmq_staging_set_key, 0, 1, :with_scores => true).flatten
      key, staged_at = oldest_entry

      stale = key && Time.now.to_i >= staged_at.to_i + Promiscuous::Config.recovery_timeout
      break unless stale

      # Bump the score to "now" so the next lookup moves on to another entry.
      node.zadd(rabbitmq_staging_set_key, Time.now.to_i, key)
      payload = node.get(key)

      # A missing payload means another worker already recovered this message.
      next unless payload

      Promiscuous.info "[payload recovery] #{payload}"
      new.instance_eval do
        @payload_recovery_node = node
        @payload_recovery_key = key
        @payload = payload
        @recovery = true
        publish_payload_in_rabbitmq_async
      end
    end
  end
end

.register_recovery_mechanism(method_name = nil, &block) ⇒ Object



5
6
7
# File 'lib/promiscuous/publisher/operation/base.rb', line 5

# Adds a callable to the list of recovery mechanisms.
#
# @param method_name [Symbol, String, nil] name of a method on the receiver
#   to register; only looked up when no block is given
# @yield the recovery mechanism itself; takes precedence over +method_name+
# @return the recovery_mechanisms collection
def self.register_recovery_mechanism(method_name=nil, &block)
  mechanism = block.nil? ? method(method_name) : block
  recovery_mechanisms << mechanism
end

.run_recovery_mechanisms ⇒ Object



9
10
11
# File 'lib/promiscuous/publisher/operation/base.rb', line 9

# Invokes every registered recovery mechanism, in registration order.
#
# @return the recovery_mechanisms collection
def self.run_recovery_mechanisms
  recovery_mechanisms.each { |mechanism| mechanism.call }
end

Instance Method Details

#acquire_op_lock ⇒ Object



319
320
321
322
323
324
325
# File 'lib/promiscuous/publisher/operation/base.rb', line 319

# Creates a new operation lock, stores it in @op_lock and acquires it.
#
# @raise [Promiscuous::Error::LockUnavailable] when the lock could not be
#   acquired (the error is built with the lock's key)
def acquire_op_lock
  @op_lock = get_new_op_lock
  return if self.class._acquire_lock(@op_lock)

  raise Promiscuous::Error::LockUnavailable.new(@op_lock.key)
end

#dependencies_for(instance, options = {}) ⇒ Object



356
357
358
359
360
361
362
363
# File 'lib/promiscuous/publisher/operation/base.rb', line 356

# Lists the dependencies tracked for +instance+.
#
# @param instance [Object, nil] a model instance exposing #promiscuous
# @param options [Hash] accepted but not used by this implementation
# @return [Array] empty for a nil instance, otherwise a one-element array
#   holding instance.promiscuous.get_dependency
def dependencies_for(instance, options={})
  # Note that tracked_dependencies will not return the id dependency if it
  # doesn't exist which can only happen for create operations and auto
  # generated ids.
  instance.nil? ? [] : [instance.promiscuous.get_dependency]
end

#dependency_for_op_lock ⇒ Object



299
300
301
# File 'lib/promiscuous/publisher/operation/base.rb', line 299

# Picks the dependency the operation lock is taken on: the first entry of
# query_dependencies.
#
# @return the first query dependency (nil when the list is empty)
def dependency_for_op_lock
  dependencies = query_dependencies
  dependencies.first
end

#ensure_op_still_locked ⇒ Object



332
333
334
335
336
337
# File 'lib/promiscuous/publisher/operation/base.rb', line 332

# Verifies that the operation lock in @op_lock is still held.
#
# @raise [Promiscuous::Error::LostLock] when the lock reports it is no
#   longer locked (the error is built with the lock's key)
def ensure_op_still_locked
  return if @op_lock.still_locked?

  # We lost the lock, let the recovery mechanism do its thing.
  raise Promiscuous::Error::LostLock.new(@op_lock.key)
end

#execute(&query_config) ⇒ Object



373
374
375
376
377
378
379
380
381
382
383
# File 'lib/promiscuous/publisher/operation/base.rb', line 373

# Runs a database query through this operation.
#
# The query is wrapped in a ProxyForQuery (configured by the given block).
# When instrumentation is enabled the proxy goes through
# execute_instrumented; otherwise it is called directly and tagged as
# :non_instrumented.
#
# @yield configuration block forwarded to ProxyForQuery.new
# @return the proxy's result
def execute(&query_config)
  proxy = Promiscuous::Publisher::Operation::ProxyForQuery.new(self, &query_config)

  should_instrument_query? ? execute_instrumented(proxy) : proxy.call_and_remember_result(:non_instrumented)

  proxy.result
end

#execute_instrumented(db_operation) ⇒ Object



392
393
394
395
# File 'lib/promiscuous/publisher/operation/base.rb', line 392

# Runs +db_operation+ with instrumentation. Must be provided by subclasses;
# this base version always raises.
#
# An explicit message is used rather than a bare `raise`: with no arguments,
# `raise` re-raises the exception currently being handled ($!) when there is
# one, so calling this from inside a rescue block would surface an unrelated
# error instead of reporting the missing implementation.
#
# @param db_operation the query proxy to execute
# @raise [RuntimeError] always
def execute_instrumented(db_operation)
  # Implemented by subclasses
  raise "#{self.class.name}#execute_instrumented must be implemented by a subclass"
end

#explain_operation(max_width) ⇒ Object



423
424
425
# File 'lib/promiscuous/publisher/operation/base.rb', line 423

# Human-readable description of the database operation; trace_operation
# passes the result to the context's trace output.
#
# @param max_width not used by this implementation
# @return [String] the fixed text "Unknown database operation"
def explain_operation(max_width)
  "Unknown database operation"
end

#generate_payload ⇒ Object



134
135
136
137
138
139
140
141
142
143
144
145
146
147
# File 'lib/promiscuous/publisher/operation/base.rb', line 134

# Serializes the message to publish and stores it in @payload.
#
# The document carries, in this order: the operation payloads, the app
# name, the current user id (only when the context has a current user), the
# recorded timestamp, the generation, the host name, a recovered_operation
# flag (only while recovering) and the committed write dependencies.
#
# @return [String] the JSON document, also kept in @payload
def generate_payload
  document = {
    :operations => operation_payloads,
    :app        => Promiscuous::Config.app
  }
  document[:current_user_id] = Promiscuous.context.current_user.id if Promiscuous.context.current_user
  document.update(
    :timestamp  => @timestamp,
    :generation => Promiscuous::Config.generation,
    :host       => Socket.gethostname
  )
  document[:recovered_operation] = true if recovering?
  document[:dependencies] = { :write => @committed_write_deps }

  @payload = MultiJson.dump(document)
end

#get_new_op_lock ⇒ Object



303
304
305
306
# File 'lib/promiscuous/publisher/operation/base.rb', line 303

# Builds (without acquiring) the mutex guarding this operation.
#
# The mutex is keyed on the :pub key of dependency_for_op_lock and uses
# lock_options plus the Redis node of that dependency.
#
# @return [Promiscuous::Redis::Mutex]
def get_new_op_lock
  dependency = dependency_for_op_lock
  mutex_key = dependency.key(:pub).to_s
  mutex_options = lock_options.merge(:node => dependency.redis_node)

  Promiscuous::Redis::Mutex.new(mutex_key, mutex_options)
end

#increment_dependencies ⇒ Object



198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
# File 'lib/promiscuous/publisher/operation/base.rb', line 198

# Bumps the write version of every write dependency in Redis and records
# the resulting version on each dependency object.
#
# Dependencies are grouped by Redis node and one Lua script call is made per
# node. The script increments "<prefix>:<dep>:w" for each dependency and
# remembers the versions in a "<operation_recovery_key>:versions" hash; when
# that hash already exists it returns the stored versions instead of
# incrementing again. On the master node (the node of @op_lock), and only
# when not recovering, the script also stores the operation recovery payload.
#
# Side effects: sets dep.version on each write dependency and fills
# @committed_write_deps, with the first write dependency moved to index 0.
def increment_dependencies
  # We collapse all operations, ignoring the read/write interleaving.
  # It doesn't matter since all write operations are serialized, so the first
  # write in the transaction can have all the read dependencies.
  w = write_dependencies

  master_node = @op_lock.node
  operation_recovery_key = "#{@op_lock.key}:operation_recovery"

  # We group all the dependencies by their respective shards.
  # The master node will have the responsibility to hold the recovery data.
  # We do the master node first. The secondaries can be done in parallel.
  @committed_write_deps = []

  # We need to do the increments always in the same node order, otherwise
  # the subscriber can deadlock. But we must always put the recovery payload
  # on the master before touching anything.
  # The order used is descending index in Promiscuous::Redis.master.nodes;
  # when the master node does not come first, it is prepended with no deps.
  # NOTE(review): with no write dependencies nodes_deps is empty and
  # `nodes_deps.first[0]` raises NoMethodError -- confirm callers never get here.
  nodes_deps = w.group_by(&:redis_node)
                .sort_by { |node, deps| -Promiscuous::Redis.master.nodes.index(node) }
  if nodes_deps.first[0] != master_node
    nodes_deps = [[master_node, []]] + nodes_deps
  end

  nodes_deps.each do |node, deps|
    argv = []
    argv << Promiscuous::Key.new(:pub) # key prefixes
    argv << operation_recovery_key

    # Each shard has its own recovery payload. The master recovery node
    # has the full operation recovery, and the others just have their versions.
    # Note that the operation_recovery_key on the secondaries have the current
    # version of the instance appended to them. It's easier to cleanup when
    # locks get lost.
    if node == master_node && !self.recovering?
      # We are on the master node, which holds the recovery payload
      argv << MultiJson.dump([self.class.name, operation, w, self.recovery_payload])
    end

    # FIXME If the lock is lost, we need to backoff

    # We are going to store all the versions in redis, to be able to recover.
    # We store all our increments in a transaction_id key in JSON format.
    # Note that the transaction_id is the id of the current instance.
    # The script object is built once and kept in a class variable.
    @@increment_script ||= Promiscuous::Redis::Script.new <<-SCRIPT
      local prefix = ARGV[1] .. ':'
      local operation_recovery_key = ARGV[2]
      local versions_recovery_key = operation_recovery_key .. ':versions'
      local operation_recovery_payload = ARGV[3]
      local deps = KEYS

      local versions = {}

      if redis.call('exists', versions_recovery_key) == 1 then
        for i, dep in ipairs(deps) do
          versions[i] = tonumber(redis.call('hget', versions_recovery_key, dep))
          if not versions[i] then
            return redis.error_reply('Failed to read dependency ' .. dep .. ' during recovery')
          end
        end

        return { versions }
      end

      for i, dep in ipairs(deps) do
        local key = prefix .. dep
        versions[i] = redis.call('incr', key .. ':w')
        redis.call('hset', versions_recovery_key, dep, versions[i])
      end

      if operation_recovery_payload then
        redis.call('set', operation_recovery_key, operation_recovery_payload)
      end

      return { versions }
    SCRIPT

    # NOTE(review): the script returns `{ versions }`, i.e. the versions list
    # wrapped in a one-element table. The zip below only pairs each dep with
    # its own version if Script#eval unwraps that outer table -- confirm.
    versions = @@increment_script.eval(node, :argv => argv, :keys => deps)

    deps.zip(versions).each  { |dep, version| dep.version = version }

    @committed_write_deps += deps
  end

  # The instance version must be the first in the list to allow atomic
  # subscribers to do their magic.
  # TODO What happens with transactions with multiple operations?
  instance_dep_index = @committed_write_deps.index(write_dependencies.first)
  @committed_write_deps[0], @committed_write_deps[instance_dep_index] =
    @committed_write_deps[instance_dep_index], @committed_write_deps[0]
end

#on_rabbitmq_confirm ⇒ Object



36
37
38
39
40
41
42
43
44
45
# File 'lib/promiscuous/publisher/operation/base.rb', line 36

# Confirm callback for a published payload: drops the staged copy.
#
# Inside one MULTI block on the payload recovery node, deletes the payload
# recovery key and removes it from the RabbitMQ staging set.
#
# @return whatever the node's #multi returns
def on_rabbitmq_confirm
  # These requests could be parallelized, rabbitmq persisted the operation.
  # XXX TODO
  # Promiscuous::Redis.slave.del(@payload_recovery_key) if Promiscuous::Redis.slave

  node = @payload_recovery_node
  recovery_key = @payload_recovery_key

  node.multi do
    node.del(recovery_key)
    node.zrem(rabbitmq_staging_set_key, recovery_key)
  end
end

#operation_payloads ⇒ Object



397
398
399
400
# File 'lib/promiscuous/publisher/operation/base.rb', line 397

# Returns the per-operation payloads placed under :operations by
# generate_payload. Must be provided by subclasses; this base version
# always raises.
#
# An explicit message is used rather than a bare `raise`: with no arguments,
# `raise` re-raises the exception currently being handled ($!) when there is
# one, so calling this from inside a rescue block would surface an unrelated
# error instead of reporting the missing implementation.
#
# @raise [RuntimeError] always
def operation_payloads
  # subclass can use payloads_for to generate the payload
  raise "#{self.class.name}#operation_payloads must be implemented by a subclass"
end

#payload_for(instance) ⇒ Object



127
128
129
130
131
132
# File 'lib/promiscuous/publisher/operation/base.rb', line 127

# Builds the payload describing +instance+ for the current operation.
#
# Attributes are requested only for :create and :update operations; the
# operation itself is then recorded under the :operation key.
#
# @param instance a model instance exposing #promiscuous
# @return the payload returned by instance.promiscuous.payload, with
#   :operation set
def payload_for(instance)
  options = { :with_attributes => [:create, :update].include?(operation) }

  payload = instance.promiscuous.payload(options)
  payload[:operation] = operation
  payload
end

#publish_payload_in_rabbitmq_async ⇒ Object



47
48
49
50
# File 'lib/promiscuous/publisher/operation/base.rb', line 47

# Hands @payload to Promiscuous::AMQP.publish, keyed by the app name, with
# on_rabbitmq_confirm registered as the confirm callback.
#
# @return whatever Promiscuous::AMQP.publish returns
def publish_payload_in_rabbitmq_async
  routing_key = Promiscuous::Config.app
  confirm_callback = method(:on_rabbitmq_confirm)

  Promiscuous::AMQP.publish(:key => routing_key, :payload => @payload,
                            :on_confirm => confirm_callback)
end

#publish_payload_in_redis ⇒ Object



85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
# File 'lib/promiscuous/publisher/operation/base.rb', line 85

# Stages @payload in Redis so it can be republished if the RabbitMQ confirm
# never arrives, and clears the operation recovery data.
#
# Sets @payload_recovery_node (the node of @op_lock) and
# @payload_recovery_key (built from the lock's token).
#
# @return [Array] the non-master nodes whose versions key was deleted
def publish_payload_in_redis
  # TODO Optimize and DRY this up

  # The payload is stored under a key derived from the lock token
  # (pub / payload_recovery / token) so that it cannot collide with other
  # updates on the same document.
  master_node = @op_lock.node
  @payload_recovery_node = master_node
  @payload_recovery_key = Promiscuous::Key.new(:pub).join('payload_recovery', @op_lock.token).to_s

  # To survive a redis failure the payload should go to the slave first, so
  # lost payloads can be replayed if the master fails. Lost operations would
  # still have to be recovered, by diffing the versions in the database
  # against the recovered redis slave.
  # XXX TODO
  # Promiscuous::Redis.slave.set(@payload_recovery_key, @payload) if Promiscuous::Redis.slave

  # Being raced by someone recovering our operation is fine. It can happen
  # when we lost the lock without noticing. The payload may then be sent
  # twice, which subscribers tolerate.
  operation_recovery_key = "#{@op_lock.key}:operation_recovery"
  versions_recovery_key = "#{operation_recovery_key}:versions"

  master_node.multi do
    master_node.set(@payload_recovery_key, @payload)
    master_node.zadd(rabbitmq_staging_set_key, Time.now.to_i, @payload_recovery_key)
    master_node.del(operation_recovery_key)
    master_node.del(versions_recovery_key)
  end

  # The payload is safe now, so the versions kept on the secondaries can go.
  # No harmful race is possible since the secondary_operation_recovery_key
  # is unique to the operation.
  # XXX The caveat is that if we die here, the
  # secondary_operation_recovery_key will never be cleaned up.
  involved_nodes = @committed_write_deps.map { |dep| dep.redis_node }.uniq
  secondary_nodes = involved_nodes.reject { |node| node == master_node }
  secondary_nodes.each { |node| node.del(versions_recovery_key) }
end

#query_dependencies ⇒ Object



385
386
387
388
389
390
# File 'lib/promiscuous/publisher/operation/base.rb', line 385

# Returns the list of dependencies that are involved in the database query.
# For an atomic write operation, the first one returned must be the one
# corresponding to the primary key.
#
# Must be provided by subclasses; this base version always raises.
#
# An explicit message is used rather than a bare `raise`: with no arguments,
# `raise` re-raises the exception currently being handled ($!) when there is
# one, so calling this from inside a rescue block would surface an unrelated
# error instead of reporting the missing implementation.
#
# @raise [RuntimeError] always
def query_dependencies
  raise "#{self.class.name}#query_dependencies must be implemented by a subclass"
end

#record_timestamp ⇒ Object



23
24
25
26
27
28
# File 'lib/promiscuous/publisher/operation/base.rb', line 23

# Records the current time in @timestamp as whole milliseconds since the
# epoch. The value is sent along with the payload, which makes it handy for
# latency measurements.
#
# @return [Integer] the recorded timestamp
def record_timestamp
  now = Time.now
  whole_seconds = now.to_i
  extra_millis = now.usec / 1000

  @timestamp = whole_seconds * 1000 + extra_millis
end

#recover_db_operation ⇒ Object



411
412
413
414
# File 'lib/promiscuous/publisher/operation/base.rb', line 411

# Hook executed (through a query proxy) when a lost operation is replayed by
# recover_operation_from_lock. The implementation here is empty and returns
# nil.
def recover_db_operation
  # Overridden to reexecute the db operation during recovery (or make sure that
  # it will never succeed).
end

#recovering? ⇒ Boolean

Returns:

  • (Boolean)


19
20
21
# File 'lib/promiscuous/publisher/operation/base.rb', line 19

# Tells whether this operation carries recovery data, i.e. whether
# @recovery_data is set to a truthy value.
#
# @return [Boolean]
def recovering?
  @recovery_data ? true : false
end

#recovery_payload ⇒ Object



402
403
404
405
# File 'lib/promiscuous/publisher/operation/base.rb', line 402

# Arguments stored with the operation recovery data; increment_dependencies
# serializes them as the last element of the recovery payload.
#
# @return [Array] an empty array in this implementation
def recovery_payload
  # Overridden to be able to recover the operation
  []
end

#release_op_lock ⇒ Object



327
328
329
330
# File 'lib/promiscuous/publisher/operation/base.rb', line 327

# Unlocks the operation lock and forgets it.
#
# @op_lock is only cleared once unlock has returned; if unlock raises, the
# lock reference is left in place.
#
# @return [nil]
def release_op_lock
  held_lock = @op_lock
  held_lock.unlock

  @op_lock = nil
end

#should_instrument_query? ⇒ Boolean

Returns:

  • (Boolean)


369
370
371
# File 'lib/promiscuous/publisher/operation/base.rb', line 369

# Queries are instrumented unless Promiscuous is disabled.
#
# @return [Boolean] false when Promiscuous.disabled? is truthy, true otherwise
def should_instrument_query?
  Promiscuous.disabled? ? false : true
end

#trace_operation ⇒ Object



416
417
418
419
420
421
# File 'lib/promiscuous/publisher/operation/base.rb', line 416

# Sends a description of this operation to the context's trace output, but
# only when the TRACE environment variable is set.
#
# @return [nil] when TRACE is not set; otherwise whatever
#   Promiscuous.context.trace returns
def trace_operation
  return unless ENV['TRACE']

  explanation = explain_operation(70)
  Promiscuous.context.trace(explanation, :color => '1;31')
end

#write_dependencies ⇒ Object



365
366
367
# File 'lib/promiscuous/publisher/operation/base.rb', line 365

# The de-duplicated query dependencies, each marked as a :write dependency.
# Computed on first use and memoized in @write_dependencies.
#
# @return [Array] the write dependencies
def write_dependencies
  return @write_dependencies if @write_dependencies

  unique_deps = query_dependencies.uniq
  unique_deps.each { |dep| dep.type = :write }
  @write_dependencies = unique_deps
end