Class: ActiveEvent::ReplayServer
- Inherits:
-
Object
- Object
- ActiveEvent::ReplayServer
- Includes:
- Singleton
- Defined in:
- lib/active_event/replay_server.rb
Instance Attribute Summary collapse
Class Method Summary collapse
Instance Method Summary collapse
- #event_channel ⇒ Object
- #event_connection ⇒ Object
- #has_next_event? ⇒ Boolean
- #new_id? ⇒ Boolean
- #next_event ⇒ Object
- #queue ⇒ Object
- #republish(event) ⇒ Object
- #republish_events ⇒ Object
- #resend_exchange ⇒ Object
- #send_done_message ⇒ Object
- #start(id) ⇒ Object
- #start_republishing ⇒ Object
Instance Attribute Details
#options ⇒ Object
92 93 94 |
# File 'lib/active_event/replay_server.rb', line 92 def @options end |
Class Method Details
.start(options, id) ⇒ Object
8 9 10 11 |
# File 'lib/active_event/replay_server.rb', line 8 def self.start(, id) instance. = instance.start id end |
.update(id) ⇒ Object
13 14 15 |
# File 'lib/active_event/replay_server.rb', line 13 def self.update(id) instance.queue << id end |
Instance Method Details
#event_channel ⇒ Object
84 85 86 |
# File 'lib/active_event/replay_server.rb', line 84 def event_channel @event_channel ||= event_connection.create_channel end |
#event_connection ⇒ Object
80 81 82 |
# File 'lib/active_event/replay_server.rb', line 80 def event_connection @event_server ||= Bunny.new URI::Generic.build([:event_connection]).to_s end |
#has_next_event? ⇒ Boolean
76 77 78 |
# File 'lib/active_event/replay_server.rb', line 76 def has_next_event? @events.length > 0 end |
#new_id? ⇒ Boolean
52 53 54 55 56 57 58 59 60 61 |
# File 'lib/active_event/replay_server.rb', line 52 def new_id? unless queue.empty? new_id = queue.pop if new_id < @last_id @last_id = new_id return true end end false end |
#next_event ⇒ Object
70 71 72 73 74 |
# File 'lib/active_event/replay_server.rb', line 70 def next_event e = @events.shift @last_id = e.id e end |
#queue ⇒ Object
28 29 30 |
# File 'lib/active_event/replay_server.rb', line 28 def queue @queue ||= Queue.new end |
#republish(event) ⇒ Object
63 64 65 66 67 68 |
# File 'lib/active_event/replay_server.rb', line 63 def republish(event) type = event.event body = event.data.to_json resend_exchange.publish body, {type: type, headers: {id: event.id, created_at: event.created_at, replayed: true}} LOGGER.debug "Republished #{type} with #{body}" end |
#republish_events ⇒ Object
44 45 46 47 48 49 50 |
# File 'lib/active_event/replay_server.rb', line 44 def republish_events while has_next_event? return if new_id? republish next_event Thread.pass end end |
#resend_exchange ⇒ Object
88 89 90 |
# File 'lib/active_event/replay_server.rb', line 88 def resend_exchange @resend_exchange ||= event_channel.fanout "resend_#{[:event_exchange]}" end |
#send_done_message ⇒ Object
32 33 34 |
# File 'lib/active_event/replay_server.rb', line 32 def resend_exchange.publish 'replay_done' end |
#start(id) ⇒ Object
17 18 19 20 21 22 23 24 25 26 |
# File 'lib/active_event/replay_server.rb', line 17 def start(id) event_connection.start @last_id = id start_republishing rescue Exception => e LOGGER.error e. LOGGER.error e.backtrace.join("\n") raise e end |
#start_republishing ⇒ Object
36 37 38 39 40 41 42 |
# File 'lib/active_event/replay_server.rb', line 36 def start_republishing loop do @events = EventRepository.after_id(@last_id).to_a return if @events.empty? republish_events end end |