Class: Pressure
- Inherits:
-
Object
- Object
- Pressure
- Defined in:
- lib/pressure.rb,
lib/pressure/version.rb
Constant Summary collapse
- VERSION =
'0.1.3'
Instance Attribute Summary collapse
-
#sockets ⇒ Object
readonly
Returns the value of attribute sockets.
-
#wrapper_template ⇒ Object
Returns the value of attribute wrapper_template.
Instance Method Summary collapse
- #<<(socket) ⇒ Object
- #data_changed?(current_data, previous_data) ⇒ Boolean
- #delete(socket) ⇒ Object
- #incoming_monitor(&data_source_block) ⇒ Object
-
#initialize(options = {}, &data_source_block) ⇒ Pressure
constructor
A new instance of Pressure.
- #websocket_worker ⇒ Object
- #websocket_worker_loop ⇒ Object
- #wrap_data(data) ⇒ Object
Constructor Details
#initialize(options = {}, &data_source_block) ⇒ Pressure
Returns a new instance of Pressure.
10 11 12 13 14 15 16 17 18 19 20 |
# File 'lib/pressure.rb', line 10 def initialize( = {}, &data_source_block) @wrapper_template = {} @current_upstream = {} @send_queue = Queue.new @sockets = [] @websocket_worker_delay = [:websocket_worker_delay] || (1.0 / 20.0) @incoming_monitor_delay = [:incoming_monitor_delay] || (1.0 / 20.0) @no_wrap = [:no_wrap] || false incoming_monitor(&data_source_block) websocket_worker_loop end |
Instance Attribute Details
#sockets ⇒ Object (readonly)
Returns the value of attribute sockets.
8 9 10 |
# File 'lib/pressure.rb', line 8 def sockets @sockets end |
#wrapper_template ⇒ Object
Returns the value of attribute wrapper_template.
7 8 9 |
# File 'lib/pressure.rb', line 7 def wrapper_template @wrapper_template end |
Instance Method Details
#<<(socket) ⇒ Object
22 23 24 25 |
# File 'lib/pressure.rb', line 22 def <<(socket) @sockets << socket socket.send JSON.generate(@current_upstream) end |
#data_changed?(current_data, previous_data) ⇒ Boolean
36 37 38 39 40 41 42 |
# File 'lib/pressure.rb', line 36 def data_changed?(current_data, previous_data) if current_data && previous_data current_data != previous_data else true end end |
#delete(socket) ⇒ Object
27 28 29 |
# File 'lib/pressure.rb', line 27 def delete(socket) @sockets.delete socket end |
#incoming_monitor(&data_source_block) ⇒ Object
44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 |
# File 'lib/pressure.rb', line 44 def incoming_monitor(&data_source_block) Thread.new do begin data = {} loop do upstream_data = data_source_block.call if data_changed?(upstream_data, data[:upstream_data]) data = wrap_data(upstream_data) @current_upstream = data if @no_wrap @send_queue << upstream_data else @send_queue << data end end sleep(@incoming_monitor_delay) end rescue => e puts "error #{e}" end end end |
#websocket_worker ⇒ Object
67 68 69 70 71 72 73 |
# File 'lib/pressure.rb', line 67 def websocket_worker # logger.info data.inspect queued_data = JSON.generate(@send_queue.shift) @sockets.each do |socket| socket.send queued_data end end |
#websocket_worker_loop ⇒ Object
75 76 77 78 79 80 81 82 83 84 85 86 87 |
# File 'lib/pressure.rb', line 75 def websocket_worker_loop Thread.new do begin loop do websocket_worker sleep(@websocket_worker_delay) end rescue => e puts "Worker error: #{e}" retry end end end |
#wrap_data(data) ⇒ Object
31 32 33 34 |
# File 'lib/pressure.rb', line 31 def wrap_data(data) @wrapper_template.merge(upstream_data: Marshal.load(Marshal.dump(data)), last_update_ts: Time.now.utc.to_i) end |