Class: Knj::Threadpool::Worker

Inherits:
Object
  • Object
show all
Defined in:
lib/knj/threadpool.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(args) ⇒ Worker

Returns a new instance of Worker.



102
103
104
105
106
107
108
109
# File 'lib/knj/threadpool.rb', line 102

def initialize(args)
  @args = args
  @tp = @args[:threadpool]
  @mutex_tp = @tp.mutex
  @sleep = @tp.args[:sleep]
  @running = false
  self.spawn_thread
end

Instance Attribute Details

#runningObject (readonly)

Returns the value of attribute running.



100
101
102
# File 'lib/knj/threadpool.rb', line 100

def running
  @running
end

Instance Method Details

#busy?Boolean

Returns true if the worker is currently working with a block.

Returns:

  • (Boolean)


178
179
180
# File 'lib/knj/threadpool.rb', line 178

def busy?
  return true if @blockdata
end

#idObject

Returns the ID of the worker.



183
184
185
# File 'lib/knj/threadpool.rb', line 183

def id
  return @args[:id]
end

#killObject

Kills the thread.



222
223
224
225
226
# File 'lib/knj/threadpool.rb', line 222

def kill
  @mutex_tp.synchronize do
    @thread.kill
  end
end

#restartObject

Kills the current thread and restarts the worker.



188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
# File 'lib/knj/threadpool.rb', line 188

def restart
  @mutex_tp.synchronize do
    @thread.kill
    
    if @blockdata
      @blockdata[:runned] = true
      @blockdata[:running] = false
      
      begin
        sleep 0.1
        raise "The worker was stopped during execution of the block."
      rescue Exception => e
        @blockdata[:error] = e
      end
    end
    
    #Dont run the job again - remove it from the queue.
    @tp.blocks.delete(@blockdata)
    @blockdata = nil
    @running = false
    
    #Spawn a new thread - we killed the previous.
    self.spawn_thread
  end
end

#spawn_threadObject

Starts the workers thread.



112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
# File 'lib/knj/threadpool.rb', line 112

def spawn_thread
  @thread = Knj::Thread.new do
    loop do
      if !@blockdata
        sleep @sleep
        @blockdata = @tp.get_block if !@blockdata
      end
      
      next if !@blockdata
      
      res = nil
      raise "No block in blockdata?" if !@blockdata[:block]
      @blockdata[:worker] = self
      Thread.current[:knj_threadpool] = {
        :worker => self,
        :blockdata => @blockdata
      }
      
      begin
        if @blockdata.key?(:result)
          begin
            @running = true
            res = @blockdata[:block].call(*@blockdata[:args])
          rescue Exception => e
            @mutex_tp.synchronize do
              @blockdata[:error] = e
            end
          ensure
            @running = false
            
            @mutex_tp.synchronize do
              @blockdata[:result] = res
              @blockdata[:runned] = true
              @blockdata[:running] = false
            end
            
            #Try to avoid slowdown of sleep by checking if there is a new block right away.
            @blockdata = @tp.get_block
          end
        else
          begin
            @blockdata[:block].call(*@blockdata[:args])
          rescue Exception => e
            if @tp.events.connected?(:on_error)
              @tp.events.call(:on_error, e)
            else
              STDOUT.print Knj::Errors.error_str(e)
            end
          ensure
            @mutex_tp.synchronize do
              @blockdata.clear if @blockdata
              @tp.blocks.delete(@blockdata)
            end
            
            #Try to avoid slowdown of sleep by checking if there is a new block right away.
            @blockdata = @tp.get_block
          end
        end
      ensure
        Thread.current[:knj_threadpool] = nil
      end
    end
  end
end

#stopObject

Sleeps the thread.



215
216
217
218
219
# File 'lib/knj/threadpool.rb', line 215

def stop
  @mutex_tp.synchronize do
    @thread.stop
  end
end