Class: Batsir::Acceptors::Acceptor

Inherits:
Object
  • Object
show all
Includes:
Celluloid
Defined in:
lib/batsir/acceptors/acceptor.rb

Direct Known Subclasses

AMQPAcceptor

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(options = {}) ⇒ Acceptor

Returns a new instance of Acceptor.



10
11
12
13
14
15
# File 'lib/batsir/acceptors/acceptor.rb', line 10

def initialize(options = {})
  options.each do |option, value|
    self.send("#{option}=", value)
  end
  @transformer_queue = []
end

Instance Attribute Details

#cancellatorObject

Returns the value of attribute cancellator.



8
9
10
# File 'lib/batsir/acceptors/acceptor.rb', line 8

def cancellator
  @cancellator
end

#stage_nameObject

Returns the value of attribute stage_name.



6
7
8
# File 'lib/batsir/acceptors/acceptor.rb', line 6

def stage_name
  @stage_name
end

#transformer_queueObject

Returns the value of attribute transformer_queue.



7
8
9
# File 'lib/batsir/acceptors/acceptor.rb', line 7

def transformer_queue
  @transformer_queue
end

Instance Method Details

#add_transformer(transformer) ⇒ Object



17
18
19
# File 'lib/batsir/acceptors/acceptor.rb', line 17

def add_transformer(transformer)
  @transformer_queue << transformer
end

#process_message_error(message, error) ⇒ Object

This method is called after an error is thrown. Can be overridden to implement error handling. Returns a message



66
67
68
# File 'lib/batsir/acceptors/acceptor.rb', line 66

def process_message_error(message, error)
  message
end

#startObject

This method is called automatically when the stage is started, it is here that you set up the accepting logic. Make sure that somewhere within this logic the #start_filter_chain(msg) is called to start actual processing.

Note that this method will be invoked asynchronously using the Celluloid actor semantics.

Raises:

  • (NotImplementedError)


31
32
33
# File 'lib/batsir/acceptors/acceptor.rb', line 31

def start
  raise NotImplementedError.new
end

#start_filter_chain(message) ⇒ Object

When a message is accepted by an Acceptor, this method should be invoked with the received payload to start processing of the filter chain.



40
41
42
43
44
# File 'lib/batsir/acceptors/acceptor.rb', line 40

def start_filter_chain(message)
  klazz = Batsir::Registry.get(stage_name)
  message = transform(message)
  klazz.perform_async(message) if klazz
end

#transform(message) ⇒ Object

Call each of the transformers in the transformer_queue on the message, in order



50
51
52
53
54
55
56
57
58
59
# File 'lib/batsir/acceptors/acceptor.rb', line 50

def transform(message)
  transformer_queue.each do |transformer|
    begin
      message = transformer.transform(message)
    rescue Batsir::Errors::TransformError => e
      message = process_message_error(message, e)
    end
  end
  message
end