Class: Daemonic::Producer

Inherits:
Object
  • Object
show all
Defined in:
lib/daemonic/producer.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(worker, options) ⇒ Producer

Returns a new instance of Producer.



6
7
8
9
10
11
12
13
# File 'lib/daemonic/producer.rb', line 6

def initialize(worker, options)
  @worker      = worker
  @options     = options
  @concurrency = options.fetch(:concurrency) { 4 }
  @queue_size  = options.fetch(:queue_size) { @concurrency + 1 }
  @logger      = options[:logger]
  @running     = true
end

Instance Attribute Details

#concurrencyObject (readonly)

Returns the value of attribute concurrency.



4
5
6
# File 'lib/daemonic/producer.rb', line 4

def concurrency
  @concurrency
end

#optionsObject (readonly)

Returns the value of attribute options.



4
5
6
# File 'lib/daemonic/producer.rb', line 4

def options
  @options
end

#queue_sizeObject (readonly)

Returns the value of attribute queue_size.



4
5
6
# File 'lib/daemonic/producer.rb', line 4

def queue_size
  @queue_size
end

#workerObject (readonly)

Returns the value of attribute worker.



4
5
6
# File 'lib/daemonic/producer.rb', line 4

def worker
  @worker
end

Instance Method Details

#loggerObject



42
43
44
45
46
# File 'lib/daemonic/producer.rb', line 42

def logger
  @logger ||= Logger.new(@options[:log] || STDOUT).tap { |logger|
    logger.level = @options[:log_level] || Logger::INFO
  }
end

#runObject



15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
# File 'lib/daemonic/producer.rb', line 15

def run
  logger.info "Starting producer with #{concurrency} consumer threads."

  Signal.trap("INT") { stop }
  Signal.trap("TERM") { stop }

  pool = Pool.new(self)

  producer = Thread.new do
    while @running
      worker.produce(pool)
      Thread.pass
    end
    logger.info { "Producer has been shut down. Stopping the thread pool" }
    pool.stop
  end

  producer.join

  logger.info { "Shutting down" }

end

#stopObject



38
39
40
# File 'lib/daemonic/producer.rb', line 38

def stop
  @running = false
end