Class: Orchestrator::Device::Processor
- Inherits:
-
Object
- Object
- Orchestrator::Device::Processor
- Includes:
- Transcoder
- Defined in:
- lib/orchestrator/device/processor.rb
Constant Summary collapse
- SEND_DEFAULTS =
{
    wait: true,                 # wait for a response before continuing with sends
    delay: 0,                   # make sure sends are separated by at least this (in milliseconds)
    delay_on_receive: 0,        # delay the next send by this (milliseconds) after a receive
    max_waits: 3,               # number of times we will ignore valid tokens before retry
    retries: 2,                 # Retry attempts before we give up on the command
    hex_string: false,          # Does the input need conversion
    timeout: 5000,              # Time we will wait for a response
    priority: 50,               # Priority of a send
    force_disconnect: false     # Mainly for use with make and break

    # Other options include:
    # * emit callback to occur once command complete (may be discarded if a named command)
    # * on_receive (alternative to received function)
    # * clear_queue (clear further commands once this has run)
}
- CONFIG_DEFAULTS =
{
    tokenize: false,                    # If replaced with a callback can define custom tokenizers
    size_limit: 524288,                 # 512kb buffer max
    clear_queue_on_disconnect: false,
    flush_buffer_on_disconnect: false,
    priority_bonus: 20,                 # give commands bonus priority under certain conditions
    update_status: true,                # auto update connected status?
    thrashing_threshold: 1500           # min milliseconds between connection retries

    # Other options include:
    # * inactivity_timeout (used with make and break)
    # * delimiter (string or regex to match message end)
    # * indicator (string or regex to match message start)
    # * verbose (throw errors or silently recover)
    # * wait_ready (wait for some signal before signaling connected)
    # * encoding (BINARY) (force encoding on incoming data)
}
- SUCCESS =
Set.new([true, :success, :abort, nil, :ignore])
- FAILURE =
Set.new([false, :retry, :failed, :fail])
- DUMMY_RESOLVER =
proc {}
- TERMINATE_MSG =
Error::CommandCanceled.new 'command canceled due to module shutdown'
- UNNAMED =
'unnamed'
Instance Attribute Summary collapse
-
#config ⇒ Object
Returns the value of attribute config.
-
#last_receive_at ⇒ Object
readonly
For statistics only.
-
#last_sent_at ⇒ Object
readonly
For statistics only.
-
#queue ⇒ Object
readonly
Returns the value of attribute queue.
-
#thread ⇒ Object
readonly
Returns the value of attribute thread.
-
#timeout ⇒ Object
readonly
For statistics only.
-
#transport ⇒ Object
Returns the value of attribute transport.
Instance Method Summary collapse
- #buffer(data) ⇒ Object
- #check_next ⇒ Object
-
#connected ⇒ Object
Callbacks ————————-.
- #disconnected ⇒ Object
-
#initialize(man) ⇒ Processor
constructor
Initialization order is init -> mod.load -> post_init, so config can be set in on_load if desired.
-
#queue_command(options) ⇒ Object
Public interface.
-
#send_options(options) ⇒ Object
Helper functions ——————.
- #terminate ⇒ Object
Methods included from Transcoder
array_to_str, byte_to_hex, hex_to_byte, str_to_array
Constructor Details
#initialize(man) ⇒ Processor
Initialization order is init -> mod.load -> post_init, so config can be set in on_load if desired.
75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 |
# File 'lib/orchestrator/device/processor.rb', line 75
#
# Builds the processor's initial state from its owning manager.
#
# init -> mod.load -> post_init
# So config can be set in on_load if desired
#
# @param man [Object] the owning manager; its #thread and #logger are read here
def initialize(man)
  @man = man
  @thread = @man.thread
  @logger = @man.logger

  # Per-instance copies so later merges do not touch the shared constants
  @defaults = SEND_DEFAULTS.dup
  @config = CONFIG_DEFAULTS.dup

  @queue = CommandQueue.new(@thread, method(:send_next))
  @responses = []
  @wait = false
  @connected = false
  @checking = Mutex.new
  @bonus = 0

  @last_sent_at = 0
  @last_receive_at = 0

  # Used to indicate when we can start the next response processing
  @head = ::Libuv::Q::ResolvedPromise.new(@thread, true)
  @tail = ::Libuv::Q::ResolvedPromise.new(@thread, true)

  # Method variables
  @resolver = proc { |resp| @thread.schedule { resolve_callback(resp) } }
  @resp_success = proc { |result| @thread.next_tick { resp_success(result) } }
  @resp_failure = proc { |reason| @thread.next_tick { resp_failure(reason) } }
end
Instance Attribute Details
#config ⇒ Object
Returns the value of attribute config.
66 67 68 |
# File 'lib/orchestrator/device/processor.rb', line 66
#
# Returns the value of attribute config.
#
# @return [Object] the object currently stored in @config
def config
  @config
end
#last_receive_at ⇒ Object (readonly)
For statistics only
70 71 72 |
# File 'lib/orchestrator/device/processor.rb', line 70
#
# For statistics only.
#
# @return [Object] the value currently stored in @last_receive_at
def last_receive_at
  @last_receive_at
end
#last_sent_at ⇒ Object (readonly)
For statistics only
70 71 72 |
# File 'lib/orchestrator/device/processor.rb', line 70
#
# For statistics only.
#
# @return [Object] the value currently stored in @last_sent_at
def last_sent_at
  @last_sent_at
end
#queue ⇒ Object (readonly)
Returns the value of attribute queue.
66 67 68 |
# File 'lib/orchestrator/device/processor.rb', line 66
#
# Returns the value of attribute queue.
#
# @return [Object] the object currently stored in @queue
def queue
  @queue
end
#thread ⇒ Object (readonly)
Returns the value of attribute thread.
66 67 68 |
# File 'lib/orchestrator/device/processor.rb', line 66
#
# Returns the value of attribute thread.
#
# @return [Object] the object currently stored in @thread
def thread
  @thread
end
#timeout ⇒ Object (readonly)
For statistics only
70 71 72 |
# File 'lib/orchestrator/device/processor.rb', line 70
#
# For statistics only.
#
# @return [Object] the value currently stored in @timeout
def timeout
  @timeout
end
#transport ⇒ Object
Returns the value of attribute transport.
67 68 69 |
# File 'lib/orchestrator/device/processor.rb', line 67
#
# Returns the value of attribute transport.
#
# @return [Object] the object currently stored in @transport
def transport
  @transport
end
Instance Method Details
#buffer(data) ⇒ Object
173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 |
# File 'lib/orchestrator/device/processor.rb', line 173
#
# Accepts a chunk of incoming data, records when it arrived and appends it
# (tokenised when a tokenising buffer is present) to the pending responses.
#
# @param data [String] the received chunk; its encoding is changed in place
#   when no tokenising buffer exists and @config[:encoding] is set
# @return [Object] the result of #check_next, or nil while waiting
def buffer(data)
  @last_receive_at = @thread.now

  if @buffer
    @responses.concat(@buffer.extract(data))
  else
    # tokenizing buffer above will enforce encoding
    data.force_encoding(@config[:encoding]) if @config[:encoding]
    @responses << data
  end

  # if we are waiting we don't want to process this data just yet
  check_next unless @wait
end
#check_next ⇒ Object
196 197 198 199 200 201 202 203 204 |
# File 'lib/orchestrator/device/processor.rb', line 196
#
# Feeds pending responses to #check_data one at a time until either none
# remain or @wait becomes truthy.
#
# Does nothing when @checking is already held or when there is nothing
# pending. NOTE(review): the locked? guard presumably prevents re-entering
# the non-reentrant Mutex from within #check_data - confirm.
#
# @return [nil]
def check_next
  return if @checking.locked? || @responses.empty?

  @checking.synchronize do
    loop do
      check_data(@responses.shift)
      break if @wait || @responses.empty?
    end
  end
end
#connected ⇒ Object
Callbacks ————————-
148 149 150 151 152 153 154 155 |
# File 'lib/orchestrator/device/processor.rb', line 148
#
# Callback: marks the processor as connected, creates a fresh buffer via
# #new_buffer and notifies the manager.
#
# The manager's :connected status is only updated when
# @config[:update_status] is truthy; notify_connected is always called.
#
# @return [Object, nil] the result of @man.trak, or nil when status updates are disabled
def connected
  @connected = true
  new_buffer
  @man.notify_connected
  @man.trak(:connected, true) if @config[:update_status]
end
#disconnected ⇒ Object
157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 |
# File 'lib/orchestrator/device/processor.rb', line 157
#
# Callback: marks the processor as disconnected, notifies the manager,
# optionally flushes the tokenising buffer, drops it and fails any command
# the queue is currently waiting on.
#
# @return [Object, nil] the result of #resp_failure, or nil when nothing was waiting
def disconnected
  @connected = false
  @man.notify_disconnected
  @man.trak(:connected, false) if @config[:update_status]

  # Hand whatever is left in the buffer to the response handler before discarding it
  check_data(@buffer.flush) if @buffer && @config[:flush_buffer_on_disconnect]
  @buffer = nil

  resp_failure(:disconnected) if @queue.waiting
end
#queue_command(options) ⇒ Object
Public interface
119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 |
# File 'lib/orchestrator/device/processor.rb', line 119
#
# Public interface: normalises a command description, merges in the send
# defaults and pushes it onto the command queue.
#
# @param options [Hash] the command; keys read here are :data, :hex_string,
#   :wait, :name, :priority and :defer. The hash passed in is modified in
#   place (data conversion, retries, name) before the defaults are merged.
# @return [Object] the result of @queue.push, or of the error logger when
#   queuing failed
def queue_command(options)
  # Make sure we are sending appropriately formatted data
  raw = options[:data]
  if raw.is_a?(Array)
    options[:data] = array_to_str(raw)
  elsif options[:hex_string] == true
    options[:data] = hex_to_byte(raw)
  end

  # A command that does not wait for a response is never retried
  options[:retries] = 0 if options[:wait] == false

  options[:name] = options[:name].to_sym if options[:name].is_a?(String)

  # merge in the defaults
  options = @defaults.merge(options)

  @queue.push(options, options[:priority] + @bonus)
rescue => e
  # Guard the deferred so a command without one cannot mask the original
  # error with a NoMethodError before it has been logged
  defer = options[:defer]
  defer.reject(e) if defer
  @logger.print_error(e, 'error queuing command')
end
#send_options(options) ⇒ Object
Helper functions ——————
107 108 109 |
# File 'lib/orchestrator/device/processor.rb', line 107
#
# Helper: merges the given options into this processor's send defaults,
# modifying @defaults in place.
#
# @param options [Hash] overrides for the send defaults
# @return [Hash] the updated @defaults hash
def send_options(options)
  @defaults.merge!(options)
end
#terminate ⇒ Object
192 193 194 |
# File 'lib/orchestrator/device/processor.rb', line 192
#
# Hands #do_terminate to @thread.schedule instead of running it directly.
#
# @return [Object] whatever @thread.schedule returns
def terminate
  @thread.schedule(method(:do_terminate))
end