Class: ActiveEvent::EventServer

Inherits:
Object
  • Object
show all
Includes:
Singleton
Defined in:
lib/active_event/event_server.rb

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Instance Attribute Details

#optionsObject



70
71
72
# File 'lib/active_event/event_server.rb', line 70

def options
  @options
end

Class Method Details

.publish(event) ⇒ Object



7
8
9
10
11
12
# File 'lib/active_event/event_server.rb', line 7

def self.publish(event)
  type = event.class.name
  body = event.to_json
  instance.event_exchange.publish body, type: type, headers: event.store_infos
  LOGGER.debug "Published #{type} with #{body}"
end

.start(options) ⇒ Object



14
15
16
17
# File 'lib/active_event/event_server.rb', line 14

def self.start(options)
  instance.options = options
  instance.start
end

Instance Method Details

#event_channelObject



54
55
56
# File 'lib/active_event/event_server.rb', line 54

def event_channel
  @event_channel ||= event_connection.create_channel
end

#event_connectionObject



50
51
52
# File 'lib/active_event/event_server.rb', line 50

def event_connection
  @event_server ||= Bunny.new URI::Generic.build(options[:event_connection]).to_s
end

#event_exchangeObject



58
59
60
# File 'lib/active_event/event_server.rb', line 58

def event_exchange
  @event_exchange ||= event_channel.fanout options[:event_exchange]
end

#listen_for_resend_requestsObject



39
40
41
42
43
# File 'lib/active_event/event_server.rb', line 39

def listen_for_resend_requests
  resend_request_queue.subscribe do |delivery_info, properties, id|
    resend_request_received id
  end
end

#resend_events_after(id) ⇒ Object



28
29
30
31
32
33
34
35
36
37
# File 'lib/active_event/event_server.rb', line 28

def resend_events_after(id)
  if @replay_server_thread.nil? || !@replay_server_thread.alive?
    @replay_server_thread = Thread.new do
      Thread.current.priority = -10
      ReplayServer.start options, id
    end
  else
    ReplayServer.update id
  end
end

#resend_request_exchangeObject



62
63
64
# File 'lib/active_event/event_server.rb', line 62

def resend_request_exchange
  @resend_request_exchange ||= event_channel.direct "resend_request_#{options[:event_exchange]}"
end

#resend_request_queueObject



66
67
68
# File 'lib/active_event/event_server.rb', line 66

def resend_request_queue
  @resend_request_queue ||= event_channel.queue('', auto_delete: true).bind(resend_request_exchange, routing_key: 'resend_request')
end

#resend_request_received(id) ⇒ Object



45
46
47
48
# File 'lib/active_event/event_server.rb', line 45

def resend_request_received (id)
  LOGGER.debug "received resend request with id #{id}"
  resend_events_after id.to_i
end

#startObject



19
20
21
22
23
24
25
26
# File 'lib/active_event/event_server.rb', line 19

def start
  event_connection.start
  listen_for_resend_requests
rescue Exception => e
  LOGGER.error e.message
  LOGGER.error e.backtrace.join("\n")
  raise e
end