Class: S3DataPacker::ThreadSet

Inherits:
Object
  • Object
show all
Defined in:
lib/s3_data_packer/thread_set.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(opts = {}) ⇒ ThreadSet

Returns a new instance of ThreadSet.



5
6
7
8
9
10
# File 'lib/s3_data_packer/thread_set.rb', line 5

# Builds an empty thread set: no workers yet, the finish flag cleared, a
# fresh Mutex, and a new S3DataPacker::Queue for pending work.
#
# @param opts [Hash] accepted for callers' convenience; not read here
def initialize(opts = {})
  @finish = false
  @workers = []
  @lock = Mutex.new
  @queue = S3DataPacker::Queue.new
end

Instance Attribute Details

#lock ⇒ Object (readonly)

Returns the value of attribute lock.



3
4
5
# File 'lib/s3_data_packer/thread_set.rb', line 3

# Reader for the lock held by this thread set.
#
# @return [Mutex] the value stored in @lock
def lock; @lock; end

#queue ⇒ Object (readonly)

Returns the value of attribute queue.



3
4
5
# File 'lib/s3_data_packer/thread_set.rb', line 3

# Reader for the work queue the worker threads pull items from.
#
# @return [S3DataPacker::Queue] the value stored in @queue
def queue; @queue; end

#workers ⇒ Object (readonly)

Returns the value of attribute workers.



3
4
5
# File 'lib/s3_data_packer/thread_set.rb', line 3

# Reader for the list of spawned worker threads.
#
# @return [Array<Thread>] the value stored in @workers
def workers; @workers; end

Instance Method Details

#dead? ⇒ Boolean

Returns:

  • (Boolean)


24
25
26
# File 'lib/s3_data_packer/thread_set.rb', line 24

# Reports whether every worker thread has stopped running.
#
# A thread counts as stopped whether it returned normally or died from an
# exception, so a set whose workers ended in a mix of both ways is dead too.
# A set with no workers is not considered dead.
#
# @return [Boolean]
def dead?
  !workers.empty? && workers.none?(&:alive?)
end

#finish! ⇒ Object



39
40
41
# File 'lib/s3_data_packer/thread_set.rb', line 39

# Raises the finish flag that #finished? checks.
#
# @return [true]
def finish!; @finish = true; end

#finished? ⇒ Boolean

Returns:

  • (Boolean)


43
44
45
# File 'lib/s3_data_packer/thread_set.rb', line 43

# True once the finish flag has been raised and the queue reports size zero.
#
# @return [Boolean]
def finished?
  return false unless @finish == true

  queue.size.zero?
end

#kill! ⇒ Object



28
29
30
31
# File 'lib/s3_data_packer/thread_set.rb', line 28

# Logs how many workers are about to be stopped, then kills each one.
#
# @return [Array<Thread>] the killed threads, in worker order
def kill!
  pool = workers
  log('All', "Killing #{pool.size} workers")
  pool.map { |thread| thread.kill }
end

#lock_wait_time ⇒ Object



16
17
18
# File 'lib/s3_data_packer/thread_set.rb', line 16

# Seconds a worker sleeps before retrying after a ThreadError.
# Read from S3DataPacker.config.thread_lock_wait_time on first use and
# cached in @lock_wait_time afterwards.
#
# @return [Object] the configured value
def lock_wait_time
  return @lock_wait_time if @lock_wait_time

  @lock_wait_time = S3DataPacker.config.thread_lock_wait_time
end

#log(id, message, level = :info) ⇒ Object



47
48
49
# File 'lib/s3_data_packer/thread_set.rb', line 47

# Writes one line to the logger, prefixed with the worker label.
#
# @param id [Object] worker label interpolated into the prefix
# @param message [Object] text to log
# @param level [Symbol] name of the logger method to invoke
# @return [Object] whatever the logger method returns
def log(id, message, level = :info)
  line = "Thread #{id}: #{message}"
  logger.send(level, line)
end

#reset! ⇒ Object



33
34
35
36
37
# File 'lib/s3_data_packer/thread_set.rb', line 33

# Clears the finish flag and forgets the old worker threads, but only when
# #dead? reports that the set is dead; otherwise does nothing.
#
# @return [Array, nil] the new empty workers list, or nil when nothing was reset
def reset!
  if dead?
    @finish = false
    @workers = []
  end
end

#spawn_thread!(id, &block) ⇒ Object



55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
# File 'lib/s3_data_packer/thread_set.rb', line 55

# Spawns one worker thread and appends it to the workers list.
#
# The thread loops until #finished? is true. On each pass it pulls one item
# from the queue with +fetch!+; a truthy item is handed to the given block,
# otherwise the thread sleeps for #wait_time before polling again.
#
# @param id [Object] label used in this worker's log lines
# @yield [item] called inside the worker thread for every fetched item
# @raise [ArgumentError] if no block is given
# @return [Array<Thread>] the workers list, with the new thread appended
def spawn_thread!(id, &block)
  # Fail in the caller rather than letting the worker die with a
  # LocalJumpError after it has already pulled an item off the queue.
  raise ArgumentError, 'spawn_thread! requires a block' unless block

  @workers << Thread.new do
    log id, "Started!"
    loop do
      if finished?
        log id, "Finish signal up and no more work to pull - Exiting"
        break
      end
      item = queue.fetch!
      if item
        log id, "Processing item #{item}", :debug
        begin
          yield item
        rescue ThreadError
          # NOTE: this retry is unbounded; a block that raises ThreadError on
          # every attempt keeps the worker retrying the same item forever.
          log id, "Locked, retry in #{lock_wait_time}", :warn
          sleep(lock_wait_time)
          retry
        end
      else
        log id, "No more work found, sleeping for #{wait_time}"
        sleep(wait_time)
      end
    rescue Exception => e
      # Log and re-raise only: the worker still terminates with the original
      # exception, but the log now says what went wrong and at :error level.
      log id, "Unexpected error! #{e.class}: #{e.message}", :error
      raise
    end
  end
end

#spawn_threads!(&block) ⇒ Object



84
85
86
87
88
89
# File 'lib/s3_data_packer/thread_set.rb', line 84

# Logs the configured thread count, then spawns that many workers, all
# sharing the given block. Worker ids run from 0 to thread_count - 1.
#
# @yield [item] forwarded to every worker via #spawn_thread!
# @return [Integer] the thread count
def spawn_threads!(&block)
  total = thread_count
  logger.info("Spawning #{total} threads")
  total.times { |id| spawn_thread!(id, &block) }
end

#thread_count ⇒ Object



20
21
22
# File 'lib/s3_data_packer/thread_set.rb', line 20

# Number of worker threads to spawn.
# Read from S3DataPacker.config.thread_count on first use and cached in
# @thread_count afterwards.
#
# @return [Object] the configured value
def thread_count
  return @thread_count if @thread_count

  @thread_count = S3DataPacker.config.thread_count
end

#wait! ⇒ Object



51
52
53
# File 'lib/s3_data_packer/thread_set.rb', line 51

# Blocks the caller until every worker thread has terminated.
#
# @return [Array<Thread>] the joined threads, in worker order
def wait!
  workers.map { |thread| thread.join }
end

#wait_time ⇒ Object



12
13
14
# File 'lib/s3_data_packer/thread_set.rb', line 12

# Seconds a worker sleeps when the queue had nothing to hand out.
# Read from S3DataPacker.config.thread_sleep_time on first use and cached
# in @wait_time afterwards.
#
# @return [Object] the configured value
def wait_time
  return @wait_time if @wait_time

  @wait_time = S3DataPacker.config.thread_sleep_time
end