Class: Promiscuous::Subscriber::UnitOfWork

Inherits:
Object
  • Object
show all
Defined in:
lib/promiscuous/subscriber/unit_of_work.rb

Constant Summary collapse

LOCK_OPTIONS =

after 1.5 minute, we give up

{ :timeout => 1.5.minute, # after 1.5 minute, we give up
:sleep   => 0.1,        # polling every 100ms.
:expire  => 1.minute }

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(message) ⇒ UnitOfWork

Returns a new instance of UnitOfWork.



5
6
7
# File 'lib/promiscuous/subscriber/unit_of_work.rb', line 5

def initialize(message)
  self.message = message
end

Instance Attribute Details

#messageObject

Returns the value of attribute message.



2
3
4
# File 'lib/promiscuous/subscriber/unit_of_work.rb', line 2

def message
  @message
end

Class Method Details

.currentObject



30
31
32
# File 'lib/promiscuous/subscriber/unit_of_work.rb', line 30

def self.current
  Thread.current[:promiscuous_message_processor]
end

.current=(value) ⇒ Object



34
35
36
# File 'lib/promiscuous/subscriber/unit_of_work.rb', line 34

def self.current=(value)
  Thread.current[:promiscuous_message_processor] = value
end

.process(*args) ⇒ Object



19
20
21
22
23
24
25
26
27
28
# File 'lib/promiscuous/subscriber/unit_of_work.rb', line 19

def self.process(*args)
  raise "Same thread is processing a message?" if self.current

  begin
    self.current = new(*args)
    self.current.process_message
  ensure
    self.current = nil
  end
end

Instance Method Details

#appObject



9
10
11
# File 'lib/promiscuous/subscriber/unit_of_work.rb', line 9

def app
  message.parsed_payload['app']
end

#execute_operation(operation) ⇒ Object

XXX Used for hooking into e.g. by promiscuous-newrelic



80
81
82
83
84
# File 'lib/promiscuous/subscriber/unit_of_work.rb', line 80

def execute_operation(operation)
  with_instance_locked_for(operation) do
    operation.execute
  end
end

#on_messageObject



86
87
88
89
90
91
92
# File 'lib/promiscuous/subscriber/unit_of_work.rb', line 86

def on_message
  # XXX This needs to be done for each operation
  with_transaction do
    self.operations.each { |op| execute_operation(op) if op.model }
  end
  message.ack
end

#operationsObject



13
14
15
16
17
# File 'lib/promiscuous/subscriber/unit_of_work.rb', line 13

def operations
  message.parsed_payload['operations'].
    each_with_index.
    map { |op, i| Promiscuous::Subscriber::Operation.new(op.merge('dependency' => message.dependencies[i])) }
end

#process_messageObject



38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
# File 'lib/promiscuous/subscriber/unit_of_work.rb', line 38

def process_message
  begin
    on_message
  rescue Exception => e
    @fail_count ||= 0;  @fail_count += 1

    if @fail_count <= Promiscuous::Config.max_retries
      Promiscuous.warn("[receive] #{e.message} #{@fail_count.ordinalize} retry: #{@message}")
      sleep @fail_count ** 2
      process_message
    else
      raise e
    end
  end
end

#with_instance_locked_for(operation, &block) ⇒ Object

after one minute, we are considered dead



58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
# File 'lib/promiscuous/subscriber/unit_of_work.rb', line 58

def with_instance_locked_for(operation, &block)
  return yield unless operation.dependency

  lock_options = LOCK_OPTIONS.merge(:node => operation.dependency.redis_node)
  mutex = Promiscuous::Redis::Mutex.new(operation.dependency.key(:sub).to_s, lock_options)

  unless mutex.lock
    raise Promiscuous::Error::LockUnavailable.new(mutex.key)
  end

  begin
    yield
  ensure
    unless mutex.unlock
      # TODO Be safe in case we have a duplicate message and lost the lock on it
      raise "The subscriber lost the lock during its operation. It means that someone else\n"+
        "received a duplicate message, and we got screwed.\n"
    end
  end
end