Class: OFlow::Actors::ShellRepeat
- Inherits: OFlow::Actor
  - Object
  - OFlow::Actor
  - OFlow::Actors::ShellRepeat
- Defined in: lib/oflow/actors/shellrepeat.rb
Instance Attribute Summary collapse
- #cmd ⇒ Object (readonly): Returns the value of attribute cmd.
- #dir ⇒ Object (readonly): Returns the value of attribute dir.
- #out ⇒ Object (readonly): Returns the value of attribute out.
- #timeout ⇒ Object (readonly): Returns the value of attribute timeout.
Attributes inherited from OFlow::Actor
Instance Method Summary collapse
- #busy? ⇒ Boolean
- #clearCtx(ctx) ⇒ Object
- #getCtx(ctx) ⇒ Object
- #hasCtx?(ctx) ⇒ Boolean
- #initialize(task, options) ⇒ ShellRepeat (constructor): A new instance of ShellRepeat.
- #kill ⇒ Object
- #perform(op, box) ⇒ Object
Methods inherited from OFlow::Actor
#inputs, #options, #outputs, #set_option, #with_own_thread
Constructor Details
#initialize(task, options) ⇒ ShellRepeat
Returns a new instance of ShellRepeat.
16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 |
# File 'lib/oflow/actors/shellrepeat.rb', line 16
#
# Sets up the actor's configuration and empty process state. No child
# process is started here; the process handles below stay nil until
# #perform launches the command.
#
# @param task the owning task; passed on to OFlow::Actor through bare super
# @param options [Hash] configuration, read with these keys:
#   :dir     - working directory for the command; '.' when absent. The value
#              is stripped and expanded to an absolute path.
#   :cmd     - the command later handed to Open3.popen3
#   :timeout - converted with to_f, default 1.0, never less than 0.001
def initialize(task, options)
  super
  @dir = options[:dir]
  @dir = '.' if @dir.nil?
  @dir = File.expand_path(@dir.strip)
  @cmd = options[:cmd]
  @timeout = options.fetch(:timeout, 1.0).to_f
  @timeout = 0.001 if 0.001 > @timeout
  # Handles of the running child process (set in #perform, cleared in #kill).
  @in = nil
  @out = nil
  @err = nil
  @pid = nil
  @outThread = nil
  # Outstanding requests: context id => tracker of the originating box.
  @ctxs = {}
  @ctxCnt = 0
  # Serializes the teardown done in #kill.
  @killLock = Mutex.new
end
Instance Attribute Details
#cmd ⇒ Object (readonly)
Returns the value of attribute cmd.
12 13 14 |
# File 'lib/oflow/actors/shellrepeat.rb', line 12
#
# Read-only accessor for the command that #perform passes to Open3.popen3.
#
# @return [Object] the value of @cmd
def cmd
  @cmd
end
#dir ⇒ Object (readonly)
Returns the value of attribute dir.
11 12 13 |
# File 'lib/oflow/actors/shellrepeat.rb', line 11
#
# Read-only accessor for the directory that #perform passes to
# Open3.popen3 as its chdir option.
#
# @return [Object] the value of @dir
def dir
  @dir
end
#out ⇒ Object (readonly)
Returns the value of attribute out.
14 15 16 |
# File 'lib/oflow/actors/shellrepeat.rb', line 14
#
# Read-only accessor for the child's output stream. It is assigned from
# Open3.popen3 in #perform and set back to nil by #kill.
#
# @return [Object, nil] the value of @out
def out
  @out
end
#timeout ⇒ Object (readonly)
Returns the value of attribute timeout.
13 14 15 |
# File 'lib/oflow/actors/shellrepeat.rb', line 13
#
# Read-only accessor for the timeout configured in the constructor, where
# it is converted with to_f and raised to at least 0.001.
#
# @return [Object] the value of @timeout
def timeout
  @timeout
end
Instance Method Details
#busy? ⇒ Boolean
73 74 75 |
# File 'lib/oflow/actors/shellrepeat.rb', line 73
#
# Reports whether any request context is still registered, i.e. a request
# was recorded by #perform and has not yet been removed by #clearCtx.
#
# @return [Boolean] true when @ctxs holds at least one entry
def busy?
  !@ctxs.empty?
end
#clearCtx(ctx) ⇒ Object
85 86 87 |
# File 'lib/oflow/actors/shellrepeat.rb', line 85
#
# Removes a request context from the pending table.
#
# @param ctx the context id used as the key in @ctxs
# @return [Object, nil] the value stored under ctx, or nil when the key is
#   not present
def clearCtx(ctx)
  @ctxs.delete(ctx)
end
#getCtx(ctx) ⇒ Object
77 78 79 |
# File 'lib/oflow/actors/shellrepeat.rb', line 77
#
# Looks up a request context without removing it.
#
# @param ctx the context id used as the key in @ctxs
# @return [Object, nil] the value stored under ctx, or nil when absent
def getCtx(ctx)
  @ctxs[ctx]
end
#hasCtx?(ctx) ⇒ Boolean
81 82 83 |
# File 'lib/oflow/actors/shellrepeat.rb', line 81
#
# Tests whether a request context is currently registered.
#
# @param ctx the context id used as the key in @ctxs
# @return [Boolean] true when @ctxs contains the key
def hasCtx?(ctx)
  @ctxs.key?(ctx)
end
#kill ⇒ Object
89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 |
# File 'lib/oflow/actors/shellrepeat.rb', line 89
#
# Tears down the child process and everything attached to it: sends HUP to
# the recorded pid, closes the three streams, kills the reader thread and
# resets all process state to nil so that a later #perform starts a fresh
# process. The whole sequence runs under @killLock.
#
# @return [Object, nil] the result of Process.kill, or nil when no pid was
#   recorded or the process no longer exists
def kill()
  status = nil
  @killLock.synchronize do
    begin
      begin
        # kill but don't wait for an exit. Leave it orphaned so a new app can be
        # started.
        status = Process.kill("HUP", @pid) unless @pid.nil?
      rescue Errno::ESRCH
        # The process already exited. There is nothing to signal, but the
        # streams and thread still have to be cleaned up below.
        status = nil
      end
      @in.close() unless @in.nil?
      @out.close() unless @out.nil?
      @err.close() unless @err.nil?
      Thread.kill(@outThread) unless @outThread.nil?
    ensure
      # Always forget the old process, even if teardown raised, so the actor
      # does not keep a stale pid or closed streams.
      @in = nil
      @out = nil
      @err = nil
      @pid = nil
      @outThread = nil
    end
  end
  status
end
#perform(op, box) ⇒ Object
35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 |
# File 'lib/oflow/actors/shellrepeat.rb', line 35
#
# Handles one request.
#
# * When op is :kill the child process is torn down with #kill and a box
#   carrying the kill status is shipped to :killed.
# * Otherwise the command is started on first use (Open3.popen3 with @cmd in
#   @dir) together with a reader thread, and the box contents are written to
#   the child's stdin as one line of JSON of the form
#   {"ctx": <id>, "in": <contents>}.
#
# The reader thread hands each reply object yielded by Oj.load to the task:
# the reply's "ctx" selects the stored tracker and its "out" value is shipped
# in a new Box. Errors raised while handling a reply go to task.handle_error.
# When Oj.load returns, the thread clears itself and calls #kill.
#
# @param op [Symbol, nil] :kill to stop the child; any other value sends the box
# @param box the request; its tracker and contents are read
# @return [nil, Object] nil on the :kill and closed-stdin paths, otherwise
#   the result of flushing the child's stdin
def perform(op, box)
  if :kill == op
    status = kill()
    task.ship(:killed, Box.new(status, box.tracker))
    return
  end
  if @pid.nil?
    @in, @out, @err, wt = Open3.popen3(@cmd, chdir: @dir)
    @pid = wt[:pid]
    @outThread = Thread.start(self) do |me|
      Thread.current[:name] = me.task.full_name() + "-out"
      Oj.load(me.out, mode: :compat) do |o|
        begin
          k = o["ctx"]
          raise Exception.new("missing context in #{cmd} reply") if k.nil?
          raise Exception.new("context not found in #{cmd} reply for #{k}") unless me.hasCtx?(k)
          ctx = me.clearCtx(k)
          me.task.ship(nil, Box.new(o["out"], ctx))
        rescue Exception => e
          # Deliberately broad: the Exception instances raised above must
          # reach the task's error handler as well.
          me.task.handle_error(e)
        end
      end
      # Clear the thread reference first so #kill does not kill this thread
      # while it is still running the teardown.
      @outThread = nil
      kill()
    end
  end
  if @in.closed?
    kill()
    return
  end
  @ctxCnt += 1
  ctx_id = @ctxCnt
  # Register before writing so a fast reply always finds its context.
  @ctxs[ctx_id] = box.tracker
  begin
    wrap = { "ctx" => ctx_id, "in" => box.contents }
    input = Oj.dump(wrap, mode: :compat, indent: 0)
    @in.write(input + "\n")
    @in.flush
  rescue StandardError
    # The request never reached the child, so no reply can clear this
    # context; drop it here or busy? would stay true forever.
    @ctxs.delete(ctx_id)
    raise
  end
end