Class: Pressure
- Inherits:
-
Object
- Object
- Pressure
- Defined in:
- lib/pressure.rb,
lib/pressure/version.rb
Constant Summary collapse
- VERSION =
'0.1.1'
Instance Attribute Summary collapse
-
#options ⇒ Object
readonly
Returns the value of attribute options.
-
#sockets ⇒ Object
readonly
Returns the value of attribute sockets.
-
#wrapper_template ⇒ Object
Returns the value of attribute wrapper_template.
Instance Method Summary collapse
- #<<(socket) ⇒ Object
- #data_changed?(current_data, previous_data) ⇒ Boolean
- #delete(socket) ⇒ Object
- #incoming_monitor(&data_source_block) ⇒ Object
-
#initialize(options = {}, &data_source_block) ⇒ Pressure
constructor
A new instance of Pressure.
- #websocket_worker ⇒ Object
- #websocket_worker_loop ⇒ Object
- #wrap_data(data) ⇒ Object
Constructor Details
#initialize(options = {}, &data_source_block) ⇒ Pressure
Returns a new instance of Pressure.
11 12 13 14 15 16 17 18 19 20 |
# File 'lib/pressure.rb', line 11
#
# Sets up the instance state and immediately starts the two background
# threads: the upstream poller (#incoming_monitor) and the broadcast loop
# (#websocket_worker_loop).
#
# @param options [Hash] configuration. Keys read by the visible code:
#   :websocket_worker_delay - seconds slept between worker passes
#   (falls back to 1/20 s when absent or falsy), and :no_wrap, which is
#   consulted by #incoming_monitor.
# @param data_source_block [Proc] block handed to #incoming_monitor, which
#   calls it repeatedly to obtain the current upstream data.
def initialize(options = {}, &data_source_block)
  @wrapper_template = {}
  # NOTE(review): @current_upstream is assigned here but not read by any
  # method shown on this page.
  @current_upstream = {}
  @send_queue = Queue.new
  @options = options
  @sockets = []
  @websocket_worker_delay = options[:websocket_worker_delay] || (1.0 / 20.0)
  incoming_monitor(&data_source_block)
  websocket_worker_loop
end
Instance Attribute Details
#options ⇒ Object (readonly)
Returns the value of attribute options.
9 10 11 |
# File 'lib/pressure.rb', line 9
#
# Read-only accessor for the options attribute.
#
# @return [Object] the current value of @options
def options
  @options
end
#sockets ⇒ Object (readonly)
Returns the value of attribute sockets.
8 9 10 |
# File 'lib/pressure.rb', line 8
#
# Read-only accessor for the sockets attribute.
#
# @return [Object] the current value of @sockets (the same object, not a copy)
def sockets
  @sockets
end
#wrapper_template ⇒ Object
Returns the value of attribute wrapper_template.
7 8 9 |
# File 'lib/pressure.rb', line 7
#
# Accessor for the wrapper_template attribute.
#
# @return [Object] the current value of @wrapper_template (the same object,
#   not a copy)
def wrapper_template
  @wrapper_template
end
Instance Method Details
#<<(socket) ⇒ Object
22 23 24 |
# File 'lib/pressure.rb', line 22
#
# Appends a socket to @sockets.
#
# @param socket [Object] the socket to add
# @return [Object] the result of appending to @sockets (the collection itself
#   when @sockets is an Array)
def <<(socket)
  @sockets.push(socket)
end
#data_changed?(current_data, previous_data) ⇒ Boolean
35 36 37 |
# File 'lib/pressure.rb', line 35
#
# Tells whether two data snapshots differ.
#
# @param current_data [Object] one snapshot
# @param previous_data [Object] the snapshot to compare it with
# @return [Boolean] the result of `current_data != previous_data`
def data_changed?(current_data, previous_data)
  current_data != previous_data
end
#delete(socket) ⇒ Object
26 27 28 |
# File 'lib/pressure.rb', line 26
#
# Removes a socket from @sockets.
#
# @param socket [Object] the socket to remove
# @return [Object] whatever `@sockets.delete` returns (for an Array: the
#   removed item, or nil when it was not present)
def delete(socket)
  @sockets.delete(socket)
end
#incoming_monitor(&data_source_block) ⇒ Object
39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 |
# File 'lib/pressure.rb', line 39
#
# Starts a thread that polls the data source and queues changed data for
# broadcast.
#
# On each pass the block is called; its result is compared (via
# #data_changed?) with the :upstream_data of the last wrapped payload. When
# they differ, the new data is wrapped with #wrap_data and one item is pushed
# onto @send_queue: the raw upstream data when @options[:no_wrap] is truthy,
# otherwise the wrapped payload. The thread sleeps 1/20 s between passes.
#
# @param data_source_block [Proc] called with no arguments to fetch the
#   current upstream data
# @return [Thread] the polling thread
def incoming_monitor(&data_source_block)
  Thread.new do
    begin
      data = {}
      loop do
        upstream_data = data_source_block.call
        if data_changed?(data[:upstream_data], upstream_data)
          # Always keep the wrapped form: its :upstream_data entry is what the
          # next pass compares against, even when the raw data is what is sent.
          data = wrap_data(upstream_data)
          @send_queue << (@options[:no_wrap] ? upstream_data : data)
        end
        sleep(1.0 / 20.0)
      end
    rescue => e
      # NOTE(review): the rescue sits outside the loop, so the first
      # StandardError is printed and the polling thread then finishes.
      puts "error #{e}"
    end
  end
end
#websocket_worker ⇒ Object
61 62 63 64 65 66 67 |
# File 'lib/pressure.rb', line 61
#
# Takes the next item off @send_queue, serializes it with JSON.generate and
# sends the resulting string to every socket in @sockets.
#
# Delivery is attempted for every socket even if one of them raises: the
# first StandardError is remembered and re-raised after the pass, so callers
# still see the failure but the remaining sockets are not skipped for a
# message that has already been removed from the queue.
#
# @return [Object] @sockets
# @raise [StandardError] the first error raised by a socket's `send`
def websocket_worker
  queued_data = JSON.generate(@send_queue.shift)
  first_error = nil
  # Iterate over a snapshot so sockets added or removed during the pass do not
  # disturb the iteration.
  @sockets.dup.each do |socket|
    begin
      socket.send queued_data
    rescue => e
      first_error ||= e
    end
  end
  raise first_error if first_error

  @sockets
end
#websocket_worker_loop ⇒ Object
69 70 71 72 73 74 75 76 77 78 79 80 81 |
# File 'lib/pressure.rb', line 69
#
# Starts a thread that calls #websocket_worker forever, sleeping
# @websocket_worker_delay seconds after each pass.
#
# A StandardError raised by a pass is printed as "Worker error: ..." and the
# loop carries on. The rescue is inside the loop so that the delay is also
# applied after a failed pass; restarting immediately would busy-spin (and
# flood the output) whenever the worker fails repeatedly.
#
# @return [Thread] the worker thread
def websocket_worker_loop
  Thread.new do
    loop do
      begin
        websocket_worker
      rescue => e
        puts "Worker error: #{e}"
      end
      sleep(@websocket_worker_delay)
    end
  end
end
#wrap_data(data) ⇒ Object
30 31 32 33 |
# File 'lib/pressure.rb', line 30
#
# Builds the payload for a piece of upstream data: a copy of
# @wrapper_template with two extra entries merged in.
#
# @param data [Object] the upstream data; a `clone` of it is stored
# @return [Hash] @wrapper_template merged with :upstream_data (the clone) and
#   :last_update_ts (current time as integer epoch seconds); the template
#   itself is not modified
def wrap_data(data)
  envelope = {
    upstream_data: data.clone,
    last_update_ts: Time.now.utc.to_i
  }
  @wrapper_template.merge(envelope)
end