Class: Proco

Inherits:
Object
  • Object
show all
Includes:
OptionInitializer, Logger
Defined in:
lib/proco.rb,
lib/proco/future.rb,
lib/proco/logger.rb,
lib/proco/mt/base.rb,
lib/proco/mt/pool.rb,
lib/proco/version.rb,
lib/proco/mt/worker.rb,
lib/proco/dispatcher.rb,
lib/proco/queue/base.rb,
lib/proco/mt/threaded.rb,
lib/proco/queue/batch_queue.rb,
lib/proco/queue/multi_queue.rb,
lib/proco/queue/single_queue.rb

Defined Under Namespace

Modules: Logger, MT, Queue
Classes: Dispatcher, Future

Constant Summary collapse

# Default settings; #initialize merges the caller-supplied options over them.
# Frozen so the shared defaults cannot be mutated in place by accident;
# Hash#merge still hands each instance its own unfrozen copy.
DEFAULT_OPTIONS = {
  :interval   => 0,
  :threads    => 1,
  :queues     => 1,
  :queue_size => 1000,
  :batch      => false
}.freeze
# Library version string; frozen so it cannot be modified in place.
VERSION = "0.0.2".freeze

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from Logger

#logger

Constructor Details

#initialize(options = {}, &processor) ⇒ Proco

Returns a new instance of Proco.



47
48
49
50
51
52
53
54
# File 'lib/proco.rb', line 47

# Builds a Proco instance in the stopped state.
#
# The given options are validated first, then merged over DEFAULT_OPTIONS.
# The logger is taken from the merged options (nil when none was supplied).
#
# @param options [Hash] overrides for DEFAULT_OPTIONS; may also carry :logger
# @param processor [Proc] accepted for interface compatibility; this method
#   does not use it
# @return [Proco]
def initialize(options = {}, &processor)
  # validate_options is not defined in this file -- presumably supplied by
  # the OptionInitializer mixin; confirm there what it rejects.
  validate_options(options)

  @options = DEFAULT_OPTIONS.merge(options)
  @logger  = @options[:logger]

  # Nothing is running until #start is called.
  @running = false
  @pool    = nil
end

Instance Attribute Details

#options ⇒ Object (readonly)

Returns the value of attribute options.



37
38
39
# File 'lib/proco.rb', line 37

# Read-only accessor for the option hash stored on this instance.
#
# @return [Hash] the same object held in @options (not a copy)
def options
  @options
end

Instance Method Details

#exit ⇒ nil

Stops Proco, returns results from remaining submissions in the queue.

Returns:

  • (nil)


86
87
88
89
90
91
# File 'lib/proco.rb', line 86

# Stops Proco: clears the running flag, then calls #exit on every dispatcher
# and finally on the thread pool.
#
# NOTE(review): the upstream summary says remaining queued submissions are
# still handled during this shutdown -- confirm in Dispatcher#exit / Pool#exit.
#
# @return [nil]
def exit
  # check_running is defined elsewhere; presumably it raises unless started.
  check_running
  @running = false
  @dispatchers.each(&:exit)
  @pool.exit
  # Return nil explicitly, as documented and as #kill does, instead of
  # leaking whatever the pool's exit happens to return.
  nil
end

#kill ⇒ nil

Returns:

  • (nil)


94
95
96
97
98
99
100
# File 'lib/proco.rb', line 94

# Stops Proco by calling #kill on every dispatcher and then on the pool,
# after clearing the running flag.
#
# @return [nil]
def kill
  # check_running is defined elsewhere; presumably it raises unless started.
  check_running
  @running = false

  @dispatchers.each { |dispatcher| dispatcher.kill }
  @pool.kill

  nil
end

#running? ⇒ Boolean

Returns:

  • (Boolean)


103
104
105
# File 'lib/proco.rb', line 103

# Reports the running flag: set to true by #start and back to false by
# #exit and #kill.
#
# @return [Boolean]
def running?
  @running
end

#start(&block) ⇒ Proco

Returns:

Raises:

  • (ArgumentError)


57
58
59
60
61
62
63
64
65
66
# File 'lib/proco.rb', line 57

# Starts Proco with the given block.
#
# Marks the instance as running, creates a thread pool sized by
# options[:threads], and creates options[:queues] dispatchers. Every
# dispatcher is given this instance, the pool, and the block.
#
# @param block [Proc] handed unchanged to every Dispatcher
# @return [Proco] self
# @raise [ArgumentError] when no block is given
def start(&block)
  raise ArgumentError, "Block not given" unless block

  # The flag is raised before the pool and dispatchers are built; the order
  # is kept because each dispatcher receives self and may consult it.
  @running = true
  @pool = Proco::MT::Pool.new(options[:threads], @logger)

  queue_count = options[:queues]
  @dispatchers = queue_count.times.map do
    Dispatcher.new(self, @pool, block)
  end

  self
end

#submit(item) ⇒ Hash

Synchronous submission

Parameters:

  • item (Object)

Returns:

  • (Hash)


71
72
73
74
# File 'lib/proco.rb', line 71

# Synchronous submission: queues the item through #submit! and then calls
# #get on the object it returns, handing back that result.
#
# @param item [Object] the item to process
# @return [Object] whatever #get returns (the upstream docs state a Hash)
def submit(item)
  # check_running is defined elsewhere; presumably it raises unless started.
  check_running
  future = submit!(item)
  future.get
end

#submit!(item) ⇒ Proco::Future

Asynchronous submission

Parameters:

  • item (Object)

Returns:



79
80
81
82
# File 'lib/proco.rb', line 79

# Asynchronous submission: pushes the item onto one dispatcher picked at
# random and returns whatever that push returns.
#
# @param item [Object] the item to process
# @return [Proco::Future] per the upstream docs; this method simply returns
#   the result of the dispatcher's #push
def submit!(item)
  # check_running is defined elsewhere; presumably it raises unless started.
  check_running
  dispatcher = @dispatchers.sample
  dispatcher.push(item)
end