Class: Orchestrator::Device::Processor

Inherits:
Object
  • Object
show all
Includes:
Transcoder
Defined in:
lib/orchestrator/device/processor.rb

Constant Summary collapse

# Default per-command send options. #initialize takes a `dup` of this hash as
# the processor's own @defaults, which #queue_command merges underneath the
# options of every queued command.
# Frozen so the shared template cannot be modified in place; `dup` still
# returns an unfrozen copy that each processor may alter freely.
SEND_DEFAULTS =
{
    wait: true,                 # wait for a response before continuing with sends
    delay: 0,                   # make sure sends are separated by at least this (in milliseconds)
    delay_on_receive: 0,        # delay the next send by this (milliseconds) after a receive
    max_waits: 3,               # number of times we will ignore valid tokens before retry
    retries: 2,                 # Retry attempts before we give up on the command
    hex_string: false,          # Does the input need conversion
    timeout: 5000,              # Time we will wait for a response
    priority: 50,               # Priority of a send
    force_disconnect: false     # Mainly for use with make and break

    # Other options include:
    # * emit callback to occur once command complete (may be discarded if a named command)
    # * on_receive (alternative to received function)
    # * clear_queue (clear further commands once this has run)
}.freeze
# Default processor configuration. #initialize takes a `dup` of this hash as
# the processor's own @config.
# Frozen so the shared template cannot be modified in place; `dup` still
# returns an unfrozen copy that each processor may alter freely.
CONFIG_DEFAULTS =
{
    tokenize: false,    # If replaced with a callback can define custom tokenizers
    size_limit: 524288, # 512kb buffer max
    clear_queue_on_disconnect: false,
    flush_buffer_on_disconnect: false,
    priority_bonus: 20,  # give commands bonus priority under certain conditions
    update_status: true, # auto update connected status?
    thrashing_threshold: 1500  # min milliseconds between connection retries

    # Other options include:
    # * inactivity_timeout (used with make and break)
    # * delimiter (string or regex to match message end)
    # * indicator (string or regex to match message start)
    # * verbose (throw errors or silently recover)
    # * wait_ready (wait for some signal before signaling connected)
    # * encoding (BINARY) (force encoding on incoming data)
}.freeze
# Result values that count as a success. Frozen: shared constant, not meant
# to be altered at runtime.
SUCCESS =
Set.new([true, :success, :abort, nil, :ignore]).freeze
# Result values that count as a failure. Frozen for the same reason.
FAILURE =
Set.new([false, :retry, :failed, :fail]).freeze
# No-op proc: takes no action and returns nil.
DUMMY_RESOLVER =
proc {}
# Single shared Error::CommandCanceled instance carrying the module-shutdown message.
# NOTE(review): presumably handed to commands still outstanding when the
# processor terminates; the code that uses it is not shown here — confirm.
TERMINATE_MSG =
Error::CommandCanceled.new 'command canceled due to module shutdown'
# Placeholder label. NOTE(review): presumably used for commands queued without
# a :name — confirm against the code that reads it.
# Frozen so the shared string cannot be mutated in place.
UNNAMED =
'unnamed'.freeze

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from Transcoder

array_to_str, byte_to_hex, hex_to_byte, str_to_array

Constructor Details

#initialize(man) ⇒ Processor

Lifecycle: init -> mod.load -> post_init, so config can be set in on_load if desired.



75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
# File 'lib/orchestrator/device/processor.rb', line 75

# Sets up all processor state for the given manager.
#
# man must respond to #thread and #logger; both values are cached on the
# processor. No work is scheduled here, only state is prepared.
def initialize(man)
    @man    = man
    @thread = man.thread
    @logger = man.logger

    # Each processor works on its own copies of the option templates
    @defaults = SEND_DEFAULTS.dup
    @config   = CONFIG_DEFAULTS.dup

    # Outgoing commands; the queue is handed send_next as its callback
    @queue = CommandQueue.new(@thread, method(:send_next))

    # Incoming data waiting to be checked, and the flags guarding that work
    @responses = []
    @wait      = false
    @connected = false
    @checking  = Mutex.new
    @bonus     = 0

    # Statistics
    @last_sent_at    = 0
    @last_receive_at = 0

    # Used to indicate when we can start the next response processing
    @head = ::Libuv::Q::ResolvedPromise.new(@thread, true)
    @tail = ::Libuv::Q::ResolvedPromise.new(@thread, true)

    # Reusable callbacks: each one defers the real handler onto @thread
    @resolver = proc do |resp|
        @thread.schedule { resolve_callback(resp) }
    end

    @resp_success = proc do |result|
        @thread.next_tick { resp_success(result) }
    end

    @resp_failure = proc do |reason|
        @thread.next_tick { resp_failure(reason) }
    end
end

Instance Attribute Details

#configObject

Returns the value of attribute config.



66
67
68
# File 'lib/orchestrator/device/processor.rb', line 66

# Returns the processor's configuration hash. @config starts out as a copy of
# CONFIG_DEFAULTS (assigned in #initialize).
def config
  @config
end

#last_receive_atObject (readonly)

For statistics only



70
71
72
# File 'lib/orchestrator/device/processor.rb', line 70

# For statistics only: the value of @thread.now captured by the most recent
# #buffer call. It is 0 (set in #initialize) until data has been received.
def last_receive_at
  @last_receive_at
end

#last_sent_atObject (readonly)

For statistics only



70
71
72
# File 'lib/orchestrator/device/processor.rb', line 70

# For statistics only. Initialised to 0 in #initialize; the code that updates
# it afterwards is not part of this listing.
def last_sent_at
  @last_sent_at
end

#queueObject (readonly)

Returns the value of attribute queue.



66
67
68
# File 'lib/orchestrator/device/processor.rb', line 66

# Returns the CommandQueue created in #initialize; #queue_command pushes
# commands onto it.
def queue
  @queue
end

#threadObject (readonly)

Returns the value of attribute thread.



66
67
68
# File 'lib/orchestrator/device/processor.rb', line 66

# Returns the object obtained from man.thread in #initialize. Within this class
# it is used to schedule work (schedule / next_tick) and to read the time (now).
def thread
  @thread
end

#timeoutObject (readonly)

For statistics only



70
71
72
# File 'lib/orchestrator/device/processor.rb', line 70

# For statistics only.
# NOTE(review): @timeout is not assigned anywhere in this listing, so this
# returns nil until some other code sets it — confirm where it is populated.
def timeout
  @timeout
end

#transportObject

Returns the value of attribute transport.



67
68
69
# File 'lib/orchestrator/device/processor.rb', line 67

# Returns the current transport object.
# NOTE(review): @transport is not assigned in this listing; the attribute is
# not marked readonly, so it is presumably assigned from outside after
# construction — confirm.
def transport
  @transport
end

Instance Method Details

#buffer(data) ⇒ Object



173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
# File 'lib/orchestrator/device/processor.rb', line 173

# Accepts a chunk of incoming data and queues it for checking.
#
# Records the receive time, then either passes the chunk through the
# tokenising buffer (when one exists) or appends it unchanged to the pending
# responses. Checking starts immediately unless the processor is waiting.
def buffer(data)
    @last_receive_at = @thread.now

    if @buffer
        @responses.concat(@buffer.extract(data))
    else
        # no tokenizer in play, so any configured encoding is applied here
        encoding = @config[:encoding]
        data.force_encoding(encoding) if encoding
        @responses << data
    end

    # while waiting, the data stays queued and is checked later
    check_next unless @wait
end

#check_nextObject



196
197
198
199
200
201
202
203
204
# File 'lib/orchestrator/device/processor.rb', line 196

# Drains pending responses through check_data, one at a time.
#
# Does nothing when a drain is already running (the @checking mutex is held)
# or when nothing is pending. After each response the drain stops if the
# processor has started waiting or the pending list is empty.
def check_next
    return if @checking.locked?
    return if @responses.empty?

    @checking.synchronize do
        loop do
            check_data(@responses.shift)
            break if @wait || @responses.empty?
        end
    end
end

#connectedObject

Callbacks



148
149
150
151
152
153
154
155
# File 'lib/orchestrator/device/processor.rb', line 148

# Connection-established callback.
#
# Marks the processor as connected, creates a fresh buffer via new_buffer,
# notifies the manager and, when :update_status is enabled, records the
# connected status on the manager.
def connected
    @connected = true
    new_buffer
    @man.notify_connected
    @man.trak(:connected, true) if @config[:update_status]
end

#disconnectedObject



157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
# File 'lib/orchestrator/device/processor.rb', line 157

# Connection-lost callback.
#
# Clears the connected flag, notifies the manager and, when :update_status is
# enabled, records the disconnected status. With :flush_buffer_on_disconnect
# set, whatever the buffer still holds is passed to check_data before the
# buffer is dropped. If the queue is waiting, that wait is failed with
# :disconnected.
def disconnected
    @connected = false
    @man.notify_disconnected
    @man.trak(:connected, false) if @config[:update_status]

    check_data(@buffer.flush) if @buffer && @config[:flush_buffer_on_disconnect]
    @buffer = nil

    resp_failure(:disconnected) if @queue.waiting
end

#queue_command(options) ⇒ Object

Public interface



119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
# File 'lib/orchestrator/device/processor.rb', line 119

# Normalises a command's options and pushes it onto the command queue.
#
# options is a Hash describing the command and is modified in place:
# * :data       - Array payloads are converted with array_to_str; String
#                 payloads are converted with hex_to_byte when :hex_string is true
# * :wait       - when false, :retries is forced to 0
# * :name       - String names are converted to Symbols
# * :priority   - combined with the current bonus to give the queue priority
# * :defer      - optional deferred, rejected with the error if queuing fails
#
# Any option not supplied falls back to the processor's send defaults.
# Errors are never raised to the caller: they are passed to the deferred (when
# there is one) and logged.
def queue_command(options)
    # Make sure we are sending appropriately formatted data
    raw = options[:data]

    if raw.is_a?(Array)
        options[:data] = array_to_str(raw)
    elsif options[:hex_string] == true
        options[:data] = hex_to_byte(raw)
    end

    # Commands that do not wait for a response are never retried
    options[:retries] = 0 if options[:wait] == false

    if options[:name].is_a? String
        options[:name] = options[:name].to_sym
    end

    # merge in the defaults
    options = @defaults.merge(options)

    @queue.push(options, options[:priority] + @bonus)

rescue => e
    # The failure may have occurred without a usable options hash or deferred.
    # Only reject when one exists, otherwise this handler would itself raise,
    # hiding the original error and skipping the log entry below.
    defer = options && options[:defer]
    defer.reject(e) if defer
    @logger.print_error(e, 'error queuing command')
end

#send_options(options) ⇒ Object

Helper functions



107
108
109
# File 'lib/orchestrator/device/processor.rb', line 107

# Overrides this processor's send defaults with the given options.
# The defaults hash is changed in place and then returned.
def send_options(options)
    @defaults.update(options)
end

#terminateObject



192
193
194
# File 'lib/orchestrator/device/processor.rb', line 192

# Requests shutdown: passes do_terminate (as a Method object) to
# @thread.schedule rather than calling it directly.
def terminate
    shutdown = method(:do_terminate)
    @thread.schedule(shutdown)
end