Class: Knj::Threadpool::Worker

Inherits:
Object
  • Object
show all
Defined in:
lib/knj/threadpool.rb

Overview

This is the threadpool worker-object. No need to spawn this manually.

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(args) ⇒ Worker

Constructor. Should not be called manually.



113
114
115
116
117
118
119
120
# File 'lib/knj/threadpool.rb', line 113

# Constructor. Should not be called manually.
#
# Keeps the given arguments, remembers the owning threadpool together with
# its mutex and its :sleep setting, flags the worker as not running and
# then starts the worker thread.
#
# @param args [Hash] worker arguments; args[:threadpool] is used as the
#   owning threadpool and must respond to #mutex and #args.
def initialize(args)
  @args = args

  pool = args[:threadpool]
  @tp = pool
  @mutex_tp = pool.mutex
  @sleep = pool.args[:sleep]

  @running = false
  spawn_thread
end

Instance Attribute Details

#running ⇒ Object (readonly)

Returns the value of attribute running.



110
111
112
# File 'lib/knj/threadpool.rb', line 110

# Reader for the worker's running flag.
#
# @return [Object] the current value of @running.
def running
  return @running
end

Instance Method Details

#busy? ⇒ Boolean

Returns true if the worker is currently working with a block.

Returns:

  • (Boolean)


172
173
174
# File 'lib/knj/threadpool.rb', line 172

# Returns true if the worker is currently working with a block.
#
# The worker counts as busy whenever @blockdata is set. The result is
# always a real boolean: an idle worker yields false rather than nil.
#
# @return [Boolean] true when @blockdata is set, false otherwise.
def busy?
  @blockdata ? true : false
end

#id ⇒ Object

Returns the ID of the worker.



177
178
179
# File 'lib/knj/threadpool.rb', line 177

# Returns the ID of the worker.
#
# @return [Object] whatever was stored under :id in the constructor
#   arguments (nil when that key is absent from a plain Hash).
def id
  @args[:id]
end

#kill ⇒ Object

Kills the thread.



209
210
211
212
213
# File 'lib/knj/threadpool.rb', line 209

# Kills the worker's thread while holding the threadpool mutex.
#
# @return [Object] whatever @thread.kill returns.
def kill
  @mutex_tp.synchronize { @thread.kill }
end

#restart ⇒ Object

Kills the current thread and restarts the worker.



182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
# File 'lib/knj/threadpool.rb', line 182

# Kills the current thread and restarts the worker.
#
# The whole operation runs while holding the threadpool mutex. If a block
# is assigned to this worker, its blockdata is marked as finished
# (:runned = true, :running = false) and an exception explaining that the
# worker was stopped is stored under :error. The blockdata is then removed
# from the threadpool's blocks, the worker's state is cleared and a new
# thread is spawned.
def restart
  @mutex_tp.synchronize do
    @thread.kill
    
    if @blockdata
      @blockdata[:runned] = true
      @blockdata[:running] = false
      
      # The error is raised and rescued right away so that a raised
      # exception object (presumably carrying a backtrace) can be stored.
      begin
        # NOTE(review): the reason for this pause is not evident from this
        # method; it happens while the threadpool mutex is held - confirm
        # that it is still needed.
        sleep 0.1
        raise "The worker was stopped during execution of the block."
      rescue => e
        @blockdata[:error] = e
      end
    end
    
    # Do not run the job again - remove it from the queue. This also runs
    # when @blockdata is nil.
    @tp.blocks.delete(@blockdata)
    @blockdata = nil
    @running = false
    
    # Spawn a new thread - the previous one was killed above.
    self.spawn_thread
  end
end

#spawn_thread ⇒ Object

Starts the worker's thread.



123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
# File 'lib/knj/threadpool.rb', line 123

# Starts the worker's thread.
#
# Creates a Knj::Thread, stores it in @thread and runs a loop in it:
# * With no blockdata assigned, sleep for @sleep and then ask the
#   threadpool for one via get_block.
# * With blockdata assigned, call its :block with its :args, record the
#   outcome on the blockdata (:error, :result, :runned, :running) and
#   immediately ask the threadpool for the next one.
# The loop stops when @sleep or @tp is not set.
def spawn_thread
  @thread = Knj::Thread.new do
    loop do
      # Nothing to do without a sleep interval or a threadpool.
      break if !@sleep or !@tp
      
      if !@blockdata
        sleep @sleep
        @blockdata = @tp.get_block if !@blockdata
      end
      
      # Still nothing to run - go back to sleeping.
      next if !@blockdata
      
      res = nil
      # This raise sits outside the begin/rescue below, so it is not stored
      # as the block's :error.
      raise "No block in blockdata?" if !@blockdata[:block]
      @blockdata[:worker] = self
      # Expose the worker and the blockdata through a thread-local while
      # the block is running.
      Thread.current[:knj_threadpool] = {
        :worker => self,
        :blockdata => @blockdata
      }
      
      begin
        @running = true
        res = @blockdata[:block].call(*@blockdata[:args])
      rescue => e
        # A bare rescue only catches StandardError and its subclasses.
        @mutex_tp.synchronize do
          @blockdata[:error] = e
        end
      ensure
        # NOTE(review): ensure clauses also run when the thread is killed
        # (see #kill and #restart); #restart sets @blockdata to nil, so the
        # hash accesses below may then hit nil - confirm.

        # Reset the thread-local.
        Thread.current[:knj_threadpool] = nil
        
        # Set running-status on worker.
        @running = false
        
        # Update block-data. A nil or false return value is not stored
        # under :result.
        @mutex_tp.synchronize do
          @blockdata[:result] = res if res
          @blockdata[:runned] = true
          @blockdata[:running] = false
        end
        
        # Try to avoid the slowdown of sleeping by checking for a new block
        # right away.
        @blockdata = @tp.get_block
      end
    end
  end
end