Class: ActiveEvent::ReplayServer

Inherits:
Object
  • Object
show all
Includes:
Singleton
Defined in:
lib/active_event/replay_server.rb

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Instance Attribute Details

#options ⇒ Object



92
93
94
# File 'lib/active_event/replay_server.rb', line 92

# Reader for the +@options+ instance variable.
#
# Other methods of this class index the value with +:event_connection+ and
# +:event_exchange+, so it is presumably a Hash-like object — confirm against
# the caller of +.start+.
#
# @return [Object] the current value of +@options+ (nil if never assigned)
def options
  @options
end

Class Method Details

.start(options, id) ⇒ Object



8
9
10
11
# File 'lib/active_event/replay_server.rb', line 8

# Class-level entry point: stores the options on the singleton instance and
# then delegates to the instance-level +#start+.
#
# @param options [Object] assigned to the instance through +options=+
# @param id [Object] passed unchanged to the instance-level +#start+
# @return [Object] whatever the instance-level +#start+ returns
def self.start(options, id)
  server = instance
  server.options = options
  server.start(id)
end

.update(id) ⇒ Object



13
14
15
# File 'lib/active_event/replay_server.rb', line 13

# Class-level entry point: appends +id+ to the singleton instance's queue.
# The queued ids are consumed by +#new_id?+.
#
# @param id [Object] the id to enqueue
# @return [Object] the return value of the queue's push (the queue itself
#   for a standard +Queue+)
def self.update(id)
  pending_ids = instance.queue
  pending_ids.push(id)
end

Instance Method Details

#event_channel ⇒ Object



84
85
86
# File 'lib/active_event/replay_server.rb', line 84

# Returns the channel created on +event_connection+, creating it on first
# use and caching it in +@event_channel+ afterwards.
#
# @return [Object] the result of +event_connection.create_channel+
def event_channel
  return @event_channel if @event_channel

  @event_channel = event_connection.create_channel
end

#event_connection ⇒ Object



80
81
82
# File 'lib/active_event/replay_server.rb', line 80

# Returns the Bunny client for the event broker, building it on first use
# and caching it in +@event_server+.
#
# The connection URL is produced by handing +options[:event_connection]+ to
# +URI::Generic.build+ and converting the result to a string. This method
# only constructs the client; it does not call +start+ on it.
#
# @return [Object] the cached +Bunny.new+ result
def event_connection
  return @event_server if @event_server

  broker_url = URI::Generic.build(options[:event_connection]).to_s
  @event_server = Bunny.new(broker_url)
end

#has_next_event? ⇒ Boolean

Returns:

  • (Boolean)


76
77
78
# File 'lib/active_event/replay_server.rb', line 76

# Tells whether the currently loaded batch (+@events+) still holds events.
#
# @return [Boolean] true when +@events+ is not empty
def has_next_event?
  !@events.empty?
end

#new_id? ⇒ Boolean

Returns:

  • (Boolean)


52
53
54
55
56
57
58
59
60
61
# File 'lib/active_event/replay_server.rb', line 52

# Checks the queue for a requested restart point that lies before the
# current position.
#
# At most one id is popped per call. When the popped id is smaller than
# +@last_id+, +@last_id+ is moved back to it; otherwise the popped id is
# discarded.
#
# @return [Boolean] true only when +@last_id+ was lowered by this call
def new_id?
  return false if queue.empty?

  candidate = queue.pop
  return false unless candidate < @last_id

  @last_id = candidate
  true
end

#next_event ⇒ Object



70
71
72
73
74
# File 'lib/active_event/replay_server.rb', line 70

# Removes the first event from +@events+ and records its id in +@last_id+.
#
# @return [Object] the event that was removed from the front of +@events+
def next_event
  @events.shift.tap { |event| @last_id = event.id }
end

#queue ⇒ Object



28
29
30
# File 'lib/active_event/replay_server.rb', line 28

# Returns the queue of ids pushed via +.update+, creating it on first use
# and caching it in +@queue+.
#
# @return [Queue] the cached queue
def queue
  @queue || (@queue = Queue.new)
end

#republish(event) ⇒ Object



63
64
65
66
67
68
# File 'lib/active_event/replay_server.rb', line 63

# Publishes one stored event to +resend_exchange+ and writes a debug log line.
#
# The message body is the JSON form of +event.data+. The message type is
# +event.event+, and the headers carry the event's id and created_at plus
# a +replayed: true+ marker.
#
# @param event [#event, #data, #id, #created_at] the event to publish
# @return [Object] the return value of +LOGGER.debug+
def republish(event)
  event_type = event.event
  payload = event.data.to_json
  properties = {
    type: event_type,
    headers: { id: event.id, created_at: event.created_at, replayed: true }
  }
  resend_exchange.publish(payload, properties)
  LOGGER.debug("Republished #{event_type} with #{payload}")
end

#republish_events ⇒ Object



44
45
46
47
48
49
50
# File 'lib/active_event/replay_server.rb', line 44

# Republishes the loaded batch one event at a time.
#
# Before each event +new_id?+ is consulted; when it reports true the method
# stops immediately and leaves the rest of the batch unpublished.
# +Thread.pass+ is called after each published event.
#
# @return [nil]
def republish_events
  # has_next_event? is checked first, so new_id? is never consulted (and no
  # id is popped from the queue) once the batch is exhausted.
  while has_next_event? && !new_id?
    republish(next_event)
    Thread.pass
  end
end

#resend_exchange ⇒ Object



88
89
90
# File 'lib/active_event/replay_server.rb', line 88

# Returns the fanout exchange that replayed events are published to,
# declaring it on first use and caching it in +@resend_exchange+.
#
# The exchange name is +options[:event_exchange]+ prefixed with "resend_".
#
# @return [Object] the result of +event_channel.fanout+
def resend_exchange
  return @resend_exchange if @resend_exchange

  channel = event_channel
  @resend_exchange = channel.fanout("resend_#{options[:event_exchange]}")
end

#send_done_message ⇒ Object



32
33
34
# File 'lib/active_event/replay_server.rb', line 32

# Publishes the literal message 'replay_done' to +resend_exchange+.
# +#start+ calls this after +start_republishing+ returns.
#
# @return [Object] the return value of the exchange's +publish+
def send_done_message
  exchange = resend_exchange
  exchange.publish('replay_done')
end

#start(id) ⇒ Object



17
18
19
20
21
22
23
24
25
26
# File 'lib/active_event/replay_server.rb', line 17

# Runs one complete replay: starts the broker connection, republishes every
# event after +id+, then publishes the done message.
#
# Errors are logged (message, then backtrace) and re-raised to the caller.
# Only StandardError is intercepted; process-level exceptions such as
# Interrupt or SystemExit propagate without being logged as errors.
#
# @param id [Object] the replay start point, stored in +@last_id+
# @return [Object] the return value of +send_done_message+
# @raise [StandardError] any error raised while connecting or republishing
def start(id)
  event_connection.start
  @last_id = id
  start_republishing
  send_done_message
rescue StandardError => e
  LOGGER.error e.message
  # Array() keeps the logging itself from failing if the backtrace is nil.
  LOGGER.error Array(e.backtrace).join("\n")
  # Bare raise re-raises the exception currently being handled, unchanged.
  raise
end

#start_republishing ⇒ Object



36
37
38
39
40
41
42
# File 'lib/active_event/replay_server.rb', line 36

# Republishes events batch by batch until the repository has none left.
#
# Each pass loads every event after +@last_id+ into +@events+ and hands the
# batch to +republish_events+. The fetch is repeated afterwards because
# +@last_id+ is changed elsewhere in the class (+next_event+ advances it,
# +new_id?+ can move it back).
#
# @return [nil]
def start_republishing
  loop do
    batch = EventRepository.after_id(@last_id).to_a
    @events = batch
    break if batch.empty?

    republish_events
  end
end