Class: RocketJob::Event
- Inherits:
-
Object
- Object
- RocketJob::Event
- Includes:
- Mongoid::Timestamps, Plugins::Document, SemanticLogger::Loggable
- Defined in:
- lib/rocket_job/event.rb
Overview
RocketJob::Event
Publish and Subscribe to events. Events are published immediately and usually consumed almost immediately by all subscriber processes.
Constant Summary collapse
- ALL_EVENTS =
"*".freeze
Class Method Summary collapse
- .add_subscriber(subscriber) ⇒ Object
- .collection_exists? ⇒ Boolean
-
.convert_to_capped_collection(size) ⇒ Object
Convert a non-capped collection to capped.
-
.create_capped_collection(size: capped_collection_size) ⇒ Object
Create the capped collection only if it does not exist.
-
.listener(time: @load_time) ⇒ Object
Indefinitely tail the capped collection looking for new events.
-
.process_event(event) ⇒ Object
Process a new event, calling registered subscribers.
-
.subscribe(subscriber) ⇒ Object
Add a subscriber for its events.
- .tail_capped_collection(time) ⇒ Object
-
.unsubscribe(handle) ⇒ Object
Unsubscribes a previous subscription.
Class Method Details
.add_subscriber(subscriber) ⇒ Object
110 111 112 113 114 |
# File 'lib/rocket_job/event.rb', line 110 def self.add_subscriber(subscriber) name = subscriber.class.event_name @subscribers[name] = @subscribers[name] << subscriber subscriber.object_id end |
.collection_exists? ⇒ Boolean
152 153 154 |
# File 'lib/rocket_job/event.rb', line 152 def self.collection_exists? collection.database.collection_names.include?(collection_name.to_s) end |
.convert_to_capped_collection(size) ⇒ Object
Convert a non-capped collection to capped
157 158 159 |
# File 'lib/rocket_job/event.rb', line 157 def self.convert_to_capped_collection(size) collection.database.command("convertToCapped" => collection_name.to_s, "size" => size) end |
.create_capped_collection(size: capped_collection_size) ⇒ Object
Create the capped collection only if it does not exist. Drop the collection before calling this method to re-create it.
99 100 101 102 103 104 105 |
# File 'lib/rocket_job/event.rb', line 99 def self.create_capped_collection(size: capped_collection_size) if collection_exists? convert_to_capped_collection(size) unless collection.capped? else collection.client[collection_name, {capped: true, size: size}].create end end |
.listener(time: @load_time) ⇒ Object
Indefinitely tail the capped collection looking for new events.
time: the start time from which to start looking for new events.
86 87 88 89 90 91 92 93 94 95 |
# File 'lib/rocket_job/event.rb', line 86 def self.listener(time: @load_time) Thread.current.name = "rocketjob event" create_capped_collection logger.info("Event listener started") tail_capped_collection(time) { |event| process_event(event) } rescue Exception => e logger.error("#listener Event listener is terminating due to unhandled exception", e) raise(e) end |
.process_event(event) ⇒ Object
Process a new event, calling registered subscribers.
138 139 140 141 142 143 144 145 146 147 148 149 150 |
# File 'lib/rocket_job/event.rb', line 138 def self.process_event(event) logger.info("Event Received", event.attributes) if @subscribers.key?(event.name) @subscribers[event.name].each { |subscriber| subscriber.process_action(event.action, event.parameters) } end if @subscribers.key?(ALL_EVENTS) @subscribers[ALL_EVENTS].each { |subscriber| subscriber.process_event(event.name, event.action, event.parameters) } end rescue StandardError => e logger.error("Unknown subscriber. Continuing..", e) end |
.subscribe(subscriber) ⇒ Object
Add a subscriber for its events. Returns a handle to the subscription that can be used to unsubscribe this particular subscription
Example: def MySubscriber
include RocketJob::Subscriber
def hello
logger.info "Hello Action Received"
end
def show(message:)
logger.info "Received: #{}"
end
end
MySubscriber.subscribe
66 67 68 69 70 71 72 73 74 75 76 77 |
# File 'lib/rocket_job/event.rb', line 66 def self.subscribe(subscriber) if block_given? begin handle = add_subscriber(subscriber) yield(subscriber) ensure unsubscribe(handle) if handle end else add_subscriber(subscriber) end end |
.tail_capped_collection(time) ⇒ Object
116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 |
# File 'lib/rocket_job/event.rb', line 116 def self.tail_capped_collection(time) with(socket_timeout: long_poll_seconds + 10) do filter = {created_at: {"$gt" => time}} collection. find(filter). await_data. cursor_type(:tailable_await). max_await_time_ms(long_poll_seconds * 1000). sort("$natural" => 1). each do |doc| event = Mongoid::Factory.from_db(Event, doc) # Recovery will occur from after the last message read time = event.created_at yield(event) end end rescue Mongo::Error::SocketError, Mongo::Error::SocketTimeoutError, Mongo::Error::OperationFailure, Timeout::Error => e logger.info("Creating a new cursor and trying again: #{e.class.name} #{e.}") retry end |
.unsubscribe(handle) ⇒ Object
Unsubscribes a previous subscription
80 81 82 |
# File 'lib/rocket_job/event.rb', line 80 def self.unsubscribe(handle) @subscribers.each_value { |v| v.delete_if { |i| i.object_id == handle } } end |