Class: Msf::TaskManager

Inherits:
Object
  • Object
show all
Defined in:
lib/msf/core/task_manager.rb

Overview

This class provides a task manager: tasks are placed on a queue and processed in order by a single background thread.

Defined Under Namespace

Classes: Task

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(framework) ⇒ TaskManager

Create a new TaskManager


65
66
67
68
# File 'lib/msf/core/task_manager.rb', line 65

# Create a new TaskManager.
#
# @param framework [Object] stored in the +framework+ attribute; #start later
#   calls +framework.threads.spawn+ on it
def initialize(framework)
  self.framework = framework
  # Start out with an empty task queue
  self.flush
end

Instance Attribute Details

#framework ⇒ Object

Returns the value of attribute framework


60
61
62
# File 'lib/msf/core/task_manager.rb', line 60

# Reader for the framework this manager was constructed with.
#
# @return [Object] the current value of @framework
def framework
  @framework
end

#processing ⇒ Object

Returns the value of attribute processing


57
58
59
# File 'lib/msf/core/task_manager.rb', line 57

# Reader for the flag that keeps the #process_tasks loop running.
# #start sets it to true; #stop and #kill set it to false.
#
# @return [Boolean, nil] the current value of @processing (nil until first set)
def processing
  @processing
end

#queue ⇒ Object

Returns the value of attribute queue


58
59
60
# File 'lib/msf/core/task_manager.rb', line 58

# Reader for the list of tasks waiting to be processed.
#
# @return [Array] the current value of @queue (#flush assigns a new empty Array)
def queue
  @queue
end

#thread ⇒ Object

Returns the value of attribute thread


59
60
61
# File 'lib/msf/core/task_manager.rb', line 59

# Reader for the processing thread recorded by #start.
#
# @return [Object, nil] the current value of @thread: whatever
#   +framework.threads.spawn+ returned in #start, or nil once #stop, #kill or
#   #process_tasks has cleared it
def thread
  @thread
end

Instance Method Details

#backlog ⇒ Object

Retrieve the number of tasks in the queue


151
152
153
# File 'lib/msf/core/task_manager.rb', line 151

# Retrieve the number of tasks in the queue.
#
# A task is shifted off the queue before #process_tasks runs it, so the task
# currently being processed is not counted.
#
# @return [Integer] length of the current queue
def backlog
  self.queue.length
end

#flush ⇒ Object

Flush the queue


94
95
96
# File 'lib/msf/core/task_manager.rb', line 94

# Flush the queue by replacing it with a new, empty Array.
# Tasks that were still waiting in the previous queue are no longer reachable
# through +queue+ and so will not be picked up by #process_tasks.
#
# @return [Array] the new empty queue
def flush
  self.queue = []
end

#kill ⇒ Object

Forcefully kill the processing thread


111
112
113
114
115
116
# File 'lib/msf/core/task_manager.rb', line 111

# Forcefully kill the processing thread.
#
# Does nothing when no thread is recorded. The thread is captured in a local
# before the processing flag is cleared: #process_tasks sets +thread+ to nil
# when its loop ends, so re-reading the attribute after flipping the flag
# could yield nil and raise NoMethodError.
#
# @return [nil]
def kill
  worker = self.thread
  return unless worker

  self.processing = false
  worker.kill
  self.thread = nil
end

#log_debug(msg) ⇒ Object


232
233
234
# File 'lib/msf/core/task_manager.rb', line 232

# Forward +msg+ to +dlog+ with 'core' as the second argument
# (presumably the log source name; confirm against dlog's definition).
#
# @param msg [String] message to log
def log_debug(msg)
  dlog(msg, 'core')
end

#log_error(msg) ⇒ Object


228
229
230
# File 'lib/msf/core/task_manager.rb', line 228

# Forward +msg+ to +elog+ with 'core' as the second argument
# (presumably the log source name; confirm against elog's definition).
#
# @param msg [String] message to log
def log_error(msg)
  elog(msg, 'core')
end

#process_task(task) ⇒ Object

Process a specific task from the queue


193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
# File 'lib/msf/core/task_manager.rb', line 193

# Process a specific task from the queue.
#
# Calls +task.run(self, task)+, wrapped in +Timeout.timeout+ when the task
# declares a timeout, and records the outcome on the task.
#
# @param task [Task] must respond to +timeout+, +run+, +proc+, +source+ and
#   the +status=+, +completed=+ and +exception=+ writers
# @return [Symbol] +:done+ when the task finished (or raised ThreadError),
#   +:drop+ when it timed out or raised any other exception
def process_task(task)
  begin
    if task.timeout
      ::Timeout.timeout(task.timeout) { task.run(self, task) }
    else
      task.run(self, task)
    end
  rescue ::ThreadError
    # Ignore these (caused by a return inside of the proc)
  rescue ::Exception => e
    # NOTE(review): rescuing Exception also traps SystemExit/Interrupt raised
    # inside a task; presumably intentional so that one task cannot take the
    # worker down. Confirm before narrowing.

    # Exact class match on purpose: subclasses of Timeout::Error raised by the
    # task itself are reported through the generic failure path below.
    if e.instance_of?(::Timeout::Error)
      elog("taskmanager: task #{task.inspect} timed out after #{task.timeout} seconds")
      task.status = :timeout
      task.completed = Time.now
      return :drop
    end

    elog("taskmanager: task triggered an exception: #{e.class} #{e}")
    elog("taskmanager: task proc: #{task.proc.inspect} ")
    elog("taskmanager: task Call stack: \n#{task.source.join("\n")} ")
    # Backtrace of the exception itself; Array() guards against a nil backtrace
    dlog("Call stack:\n#{Array(e.backtrace).join("\n")}")

    task.status    = :dropped
    task.exception = e
    return :drop
  end

  task.status = :done
  task.completed = Time.now
  :done
end

#process_tasks ⇒ Object

Process the actual queue


158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
# File 'lib/msf/core/task_manager.rb', line 158

# Process the actual queue.
#
# Loops while +processing+ is truthy. Each pass drains the queue through
# #process_task, pushing a task back onto the queue when the result is
# +:requeue+. After more than ten consecutive passes with nothing to do, every
# further idle pass waits 0.25 seconds. Clears +thread+ when the loop ends.
#
# @return [nil]
def process_tasks
  # Start above the idle threshold so an initially empty queue waits right away
  idle_passes = 50

  while self.processing
    handled = 0
    while (task = self.queue.shift)
      # Only :requeue needs action; any other result means the task is finished
      outcome = process_task(task)
      self.queue.push(task) if outcome == :requeue
      handled += 1
    end

    idle_passes = handled.zero? ? idle_passes + 1 : 0

    # Back off instead of spinning while there is nothing to do
    ::IO.select(nil, nil, nil, 0.25) if idle_passes > 10
  end
  self.thread = nil
end

#queue_proc(proc) ⇒ Object

Add a new task via proc


73
74
75
76
77
# File 'lib/msf/core/task_manager.rb', line 73

# Add a new task via proc.
#
# Wraps +proc+ in a new Task and hands that task to #queue_task.
#
# @param proc [Object] passed unchanged to +Task.new+
# @return [Task] the task that was created
def queue_proc(proc)
  Task.new(proc).tap { |created| queue_task(created) }
end

#queue_task(task) ⇒ Object

Add a new task to the queue, unless we are called by the task manager thread itself, in which case the task is processed immediately instead of being queued.


83
84
85
86
87
88
89
# File 'lib/msf/core/task_manager.rb', line 83

# Add a new task to the queue, unless the caller is the task manager thread.
#
# When the current thread carries the +:task_manager+ thread-local, the task
# is run straight away through #process_task rather than being queued.
#
# @param task [Task] the task to queue or run
# @return [Object] the #process_task result when run inline, otherwise the
#   return value of +queue.push+
def queue_task(task)
  return process_task(task) if Thread.current[:task_manager]

  self.queue.push(task)
end

#restart ⇒ Object

Restart the task processor


143
144
145
146
# File 'lib/msf/core/task_manager.rb', line 143

# Restart the task processor by calling #stop and then #start.
#
# @return [Object] whatever #start returns
def restart
  stop
  start
end

#start ⇒ Object

Start processing tasks


121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
# File 'lib/msf/core/task_manager.rb', line 121

# Start processing tasks.
#
# Returns nil without doing anything when a thread is already recorded.
# Otherwise sets the processing flag, spawns a thread through
# +framework.threads.spawn+ whose block runs #process_tasks, tags that thread
# with the +:task_manager+ thread-local checked by #queue_task, and returns it.
#
# @return [Object, nil] the spawned thread object, or nil if already started
def start
  return if self.thread
  self.processing = true
  # NOTE(review): the meaning of the second argument (true) is defined by
  # framework.threads.spawn, which is not visible here.
  self.thread     = framework.threads.spawn("TaskManager", true) do
    begin
      process_tasks
    rescue ::Exception => e
      elog("taskmanager: process_tasks exception: #{e.class} #{e} #{e.backtrace}")
      # Re-enter process_tasks after logging; there is no retry limit
      retry
    end
  end

  # Mark this thread as the task manager
  # NOTE(review): the flag is set only after spawn returns, and process_tasks
  # sets self.thread to nil when its loop ends; presumably the worker cannot
  # get that far before this line runs. Confirm.
  self.thread[:task_manager] = true

  # Return the thread object to the caller
  self.thread
end

#stop ⇒ Object

Stop processing tasks and wait for the processing thread to finish


101
102
103
104
105
106
# File 'lib/msf/core/task_manager.rb', line 101

# Stop processing tasks and wait for the processing thread to finish.
#
# Does nothing when no thread is recorded. The thread is captured in a local
# before the processing flag is cleared: #process_tasks sets +thread+ to nil
# as soon as it sees the flag go false, so re-reading the attribute for the
# join could yield nil and raise NoMethodError.
#
# @return [nil]
def stop
  worker = self.thread
  return unless worker

  self.processing = false
  worker.join
  self.thread = nil
end