Class: BackRun::Pubsub

Inherits: Object
Defined in: lib/back_run/pubsub.rb

Instance Method Summary
- #initialize ⇒ Pubsub (constructor)
  A new instance of Pubsub.
- #kill_job(job) ⇒ Object
- #publish(job, topic = nil) ⇒ Object
- #subscribe(worker) ⇒ Object
  The gem is using Concurrent::CachedThreadPool, which overrides the max_threads, so it's ignoring the stream configuration used in the listen method.
Constructor Details
#initialize ⇒ Pubsub
Returns a new instance of Pubsub.
```ruby
# File 'lib/back_run/pubsub.rb', line 6
def initialize
  @pubsub = Google::Cloud::Pubsub.new(
    project_id: Rails.application.config.back_run[:project_id],
    credentials: Rails.application.config.back_run[:credentials_path]
  )
end
```
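The constructor reads two keys from `Rails.application.config.back_run`. A hypothetical initializer could supply them; the file path and the values below are placeholders for illustration, not taken from the gem's documentation:

```ruby
# config/initializers/back_run.rb (hypothetical path and values)
Rails.application.config.back_run = {
  project_id: 'my-gcp-project',                   # GCP project id
  credentials_path: 'config/gcp_credentials.json' # service-account key file
}
```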
Instance Method Details
#kill_job(job) ⇒ Object
```ruby
# File 'lib/back_run/pubsub.rb', line 36
def kill_job(job)
  BackRun.logger.info("Moving to morgue queue #{job.klass}")
  publish(job, 'morgue')
end
```
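`kill_job` implements a dead-letter ("morgue") pattern: a job that can no longer be processed is republished to a separate queue instead of being retried forever. A minimal sketch of that routing decision in plain Ruby, where `FailedJob` and `MAX_RETRIES` are stand-ins invented for illustration and not part of the gem:

```ruby
# Stand-in job with a retry counter (not the gem's Job class).
FailedJob = Struct.new(:queue_name, :retries)

MAX_RETRIES = 5 # assumed threshold for illustration

# Route exhausted jobs to the morgue queue; healthy jobs keep their own queue.
def destination_queue(job)
  job.retries >= MAX_RETRIES ? 'morgue' : job.queue_name
end

destination_queue(FailedJob.new('mailers', 5)) # => "morgue"
destination_queue(FailedJob.new('mailers', 1)) # => "mailers"
```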
#publish(job, topic = nil) ⇒ Object
```ruby
# File 'lib/back_run/pubsub.rb', line 13
def publish(job, topic = nil)
  topic_name = topic || job.queue_name
  topic = fetch_topic(topic_name)
  topic.publish(job.to_json)
end
```
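The topic-selection rule above is simple: an explicitly passed topic name wins, otherwise the job's own queue name is used (which is how `kill_job` redirects a job to the morgue queue). A self-contained sketch of just that rule, using a stand-in `PublishedJob` struct rather than the gem's `Job` class:

```ruby
# Stand-in for the gem's Job class, for illustration only.
PublishedJob = Struct.new(:queue_name)

# Mirrors the fallback in #publish: explicit topic, else the job's queue.
def topic_name_for(job, topic = nil)
  topic || job.queue_name
end

job = PublishedJob.new('default')
topic_name_for(job)           # => "default"
topic_name_for(job, 'morgue') # => "morgue"
```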
#subscribe(worker) ⇒ Object
The gem is using Concurrent::CachedThreadPool, which overrides the max_threads, so it's ignoring the stream configuration used in the listen method. This can cause a lot of threads to be created when traffic increases. The PR github.com/googleapis/google-cloud-ruby/pull/3682 fixes this, but it has not been released yet.

```ruby
# File 'lib/back_run/pubsub.rb', line 24
def subscribe(worker)
  worker.queues.each do |queue|
    subscription = subscription_for(queue)
    subscriber = subscription.listen do |message|
      job = Job.from_json(message.data)
      worker.(job, ack_callback(job, message), modify_ack_callback(message))
    end
    subscriber.on_error { |error| BackRun.logger.error(error) }
    subscriber.start
  end
end
```
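The thread-pool concern in the note can be illustrated without the gem or concurrent-ruby. The sketch below contrasts the two strategies in plain Ruby: an unbounded ("cached") approach spawns one thread per in-flight task, so thread count tracks traffic, while a fixed pool drains a shared queue with a capped number of workers. This is only an illustration of the general behavior, not the gem's code:

```ruby
tasks = (1..20).to_a

# Unbounded strategy: one thread per task, so 20 tasks => 20 threads.
unbounded = tasks.map { |t| Thread.new(t) { |n| n * 2 } }
unbounded.each(&:join)

# Fixed strategy: 4 workers drain a shared queue, so concurrency never
# exceeds the pool size no matter how many tasks arrive.
work = Queue.new
tasks.each { |t| work << t }
4.times { work << :stop } # one sentinel per worker to shut the pool down

results = Queue.new
pool = Array.new(4) do
  Thread.new do
    while (t = work.pop) != :stop
      results << t * 2
    end
  end
end
pool.each(&:join)

unbounded.size # => 20 (threads grew with traffic)
pool.size      # => 4  (threads stayed capped)
results.size   # => 20 (all tasks still processed)
```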