Class: Umbra::Publisher

Inherits:
Object
  • Object
show all
Defined in:
lib/umbra/publisher.rb

Constant Summary collapse

DEFAULT_MAX_QUEUE =
100
DEFAULT_MIN_THREADS =
1
DEFAULT_MAX_THREADS =
1

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(**options) ⇒ Publisher

Returns a new instance of Publisher.



9
10
11
12
13
14
15
16
# File 'lib/umbra/publisher.rb', line 9

# Builds the thread pool that publishing jobs are posted to.
#
# The pool is a plain Concurrent::ThreadPoolExecutor rather than a
# Concurrent::CachedThreadPool: the cached pool overrides :min_threads,
# :max_threads and :max_queue with its own values, so the limits configured
# here would never take effect and the :abort fallback would never reject.
#
# @param options [Hash] pool tuning options
# @option options [Integer] :min_threads (DEFAULT_MIN_THREADS)
# @option options [Integer] :max_threads (DEFAULT_MAX_THREADS)
# @option options [Integer] :max_thread legacy misspelling of :max_threads,
#   consulted only when :max_threads is absent
# @option options [Integer] :max_queue (DEFAULT_MAX_QUEUE)
# @return [Publisher] a new instance of Publisher
# NOTE(review): the executor is expected to raise ArgumentError when
#   :min_threads exceeds :max_threads - confirm against concurrent-ruby docs.
def initialize(**options)
  # Earlier versions read the misspelled :max_thread key; keep honouring it so
  # existing callers are not broken, but prefer the correctly spelled key.
  max_threads = options.fetch(:max_threads) do
    options.fetch(:max_thread, DEFAULT_MAX_THREADS)
  end

  @pool = Concurrent::ThreadPoolExecutor.new(
    min_threads: options.fetch(:min_threads, DEFAULT_MIN_THREADS),
    max_threads: max_threads,
    max_queue: options.fetch(:max_queue, DEFAULT_MAX_QUEUE),
    fallback_policy: :abort
  )
end

Instance Attribute Details

#pool ⇒ Object (readonly)

Returns the value of attribute pool.



7
8
9
# File 'lib/umbra/publisher.rb', line 7

# Read-only accessor for the thread pool stored in @pool by #initialize.
#
# @return [Object] the current value of @pool
def pool
  @pool
end

Instance Method Details

#call(env, encoder: Umbra.encoder, redis: Umbra.redis) ⇒ Object



18
19
20
21
22
23
24
25
26
# File 'lib/umbra/publisher.rb', line 18

# Schedules an asynchronous publish of +env+ on the thread pool.
#
# The actual work is delegated to #call!, wrapped in a proc and pushed onto
# @pool. If the pool refuses the job, a warning is logged and the item is
# dropped instead of raising to the caller.
#
# @param env [Object] value handed to the encoder when the job runs
# @param encoder [#call] turns +env+ into the message to publish
# @param redis [#publish] client the message is published through
# @return [Boolean] true when the job was handed to the pool, false when the
#   pool raised Concurrent::RejectedExecutionError
def call(env, encoder: Umbra.encoder, redis: Umbra.redis)
  begin
    job = proc { call!(env, encoder: encoder, redis: redis) }
    @pool << job
  rescue Concurrent::RejectedExecutionError
    Umbra.logger.warn "[umbra] Queue at max - dropping items"
    return false
  end

  true
end

#call!(env, encoder: Umbra.encoder, redis: Umbra.redis) ⇒ Object



28
29
30
# File 'lib/umbra/publisher.rb', line 28

# Encodes +env+ and publishes it on Umbra::CHANNEL synchronously.
#
# @param env [Object] value passed to the encoder
# @param encoder [#call] turns +env+ into the message to publish
# @param redis [#publish] client the message is published through
# @return [Object] whatever +redis.publish+ returns
def call!(env, encoder: Umbra.encoder, redis: Umbra.redis)
  payload = encoder.call(env)
  redis.publish(Umbra::CHANNEL, payload)
end