Class: Pressure

Inherits:
Object
  • Object
show all
Defined in:
lib/pressure.rb,
lib/pressure/version.rb

Constant Summary collapse

VERSION =
'0.1.3'

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(options = {}, &data_source_block) ⇒ Pressure

Returns a new instance of Pressure.



10
11
12
13
14
15
16
17
18
19
20
# File 'lib/pressure.rb', line 10

# Builds a new instance, sets up its state and starts both background threads.
#
# Recognised keys in +options+:
# * +:websocket_worker_delay+ - seconds the worker sleeps per iteration
#   (defaults to 1/20 of a second)
# * +:incoming_monitor_delay+ - seconds the monitor sleeps per iteration
#   (defaults to 1/20 of a second)
# * +:no_wrap+ - when truthy, raw upstream data is queued instead of the
#   wrapped payload (defaults to false)
#
# The block is handed to incoming_monitor as the data source.
def initialize(options = {}, &data_source_block)
  default_delay = 1.0 / 20.0

  @no_wrap = options[:no_wrap] || false
  @incoming_monitor_delay = options[:incoming_monitor_delay] || default_delay
  @websocket_worker_delay = options[:websocket_worker_delay] || default_delay

  @sockets = []
  @send_queue = Queue.new
  @current_upstream = {}
  @wrapper_template = {}

  incoming_monitor(&data_source_block)
  websocket_worker_loop
end

Instance Attribute Details

#sockets ⇒ Object (readonly)

Returns the value of attribute sockets.



8
9
10
# File 'lib/pressure.rb', line 8

# Reader for the socket collection.
#
# @return [Object] the current value of @sockets, handed back as-is (no copy)
def sockets
  @sockets
end

#wrapper_template ⇒ Object

Returns the value of attribute wrapper_template.



7
8
9
# File 'lib/pressure.rb', line 7

# Reader for the wrapper template, the hash that wrap_data merges the
# upstream payload and timestamp into.
#
# @return [Object] the current value of @wrapper_template, handed back as-is
def wrapper_template
  @wrapper_template
end

Instance Method Details

#<<(socket) ⇒ Object



22
23
24
25
# File 'lib/pressure.rb', line 22

# Registers a socket and immediately sends it the most recent payload as JSON.
#
# The payload has the same shape as the broadcasts: when @no_wrap is set and
# upstream data has already been captured, only the bare upstream data is
# sent; otherwise the whole @current_upstream hash is sent (an empty hash
# until the first update arrives).
#
# @param socket [#send] object that accepts a JSON string through +send+
# @return [Object] whatever +socket.send+ returns
def <<(socket)
  @sockets << socket
  # Broadcasts skip the wrapper in no_wrap mode, so the greeting must too.
  unwrapped = @no_wrap && @current_upstream.key?(:upstream_data)
  payload = unwrapped ? @current_upstream[:upstream_data] : @current_upstream
  socket.send JSON.generate(payload)
end

#data_changed?(current_data, previous_data) ⇒ Boolean

Returns:

  • (Boolean)


36
37
38
39
40
41
42
# File 'lib/pressure.rb', line 36

# Tells whether freshly polled data differs from the previously seen data.
#
# If either argument is nil or false there is nothing to compare against,
# so the data always counts as changed.
#
# @param current_data [Object] the newly polled value
# @param previous_data [Object] the value seen on the previous change
# @return [Boolean] true when a value is missing or the two values differ
def data_changed?(current_data, previous_data)
  return true unless current_data && previous_data

  current_data != previous_data
end

#delete(socket) ⇒ Object



27
28
29
# File 'lib/pressure.rb', line 27

# Removes a socket from the socket collection.
#
# @param socket [Object] the socket to remove
# @return [Object, nil] the result of calling +delete+ on @sockets; for an
#   Array that is the removed item, or nil when it was not present
def delete(socket)
  @sockets.delete(socket)
end

#incoming_monitor(&data_source_block) ⇒ Object



44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
# File 'lib/pressure.rb', line 44

# Starts a background thread that polls the data source and queues changes.
#
# On every iteration the block is called. When data_changed? reports a
# difference from the last accepted payload, the new data is wrapped with
# wrap_data, stored in @current_upstream and pushed onto @send_queue (the
# raw data is pushed instead when @no_wrap is set). The thread then sleeps
# for @incoming_monitor_delay seconds.
#
# An exception raised during one poll is printed and polling continues on
# the next iteration, so a transient data-source failure does not freeze
# updates for good. StopIteration is re-raised so Kernel#loop can still end
# the monitor quietly. Without a block the thread prints an error once and
# finishes.
#
# @yieldreturn [Object] the latest upstream data
# @return [Thread] the monitor thread
def incoming_monitor(&data_source_block)
  Thread.new do
    if data_source_block
      data = {}
      loop do
        begin
          upstream_data = data_source_block.call
          if data_changed?(upstream_data, data[:upstream_data])
            data = wrap_data(upstream_data)
            @current_upstream = data
            @send_queue << (@no_wrap ? upstream_data : data)
          end
        rescue StopIteration
          # Let Kernel#loop see it and end the monitor without logging.
          raise
        rescue => e
          puts "error #{e}"
        end
        # Sleep after failures too, so a broken source is retried at the
        # normal polling rate rather than in a tight loop.
        sleep(@incoming_monitor_delay)
      end
    else
      # Nothing to poll: report once instead of failing on every iteration.
      puts 'error no data source block given'
    end
  end
end

#websocket_worker ⇒ Object



67
68
69
70
71
72
73
# File 'lib/pressure.rb', line 67

# Takes the next payload off @send_queue and sends it, as JSON, to every
# registered socket. Blocks while the queue is empty.
#
# A socket whose +send+ raises is reported and skipped, so one broken
# connection does not keep the remaining sockets from receiving the payload.
# Errors from the queue or from JSON generation still propagate to the caller.
#
# @return [Array] the snapshot of sockets that was iterated
def websocket_worker
  queued_data = JSON.generate(@send_queue.shift)
  # Iterate a snapshot: a socket removed from @sockets during the broadcast
  # (for instance by its own send/close handling) would otherwise make
  # Array#each skip the socket that follows it.
  @sockets.dup.each do |socket|
    begin
      socket.send queued_data
    rescue => e
      puts "Worker error: #{e}"
    end
  end
end

#websocket_worker_loop ⇒ Object



75
76
77
78
79
80
81
82
83
84
85
86
87
# File 'lib/pressure.rb', line 75

# Starts the background thread that keeps calling websocket_worker, sleeping
# @websocket_worker_delay seconds after each successful call.
#
# When an error is raised it is printed and the loop is started again right
# away, without sleeping first.
#
# @return [Thread] the worker thread
def websocket_worker_loop
  run_forever = lambda do
    loop do
      websocket_worker
      sleep(@websocket_worker_delay)
    end
  end

  Thread.new do
    begin
      run_forever.call
    rescue => worker_error
      puts "Worker error: #{worker_error}"
      retry
    end
  end
end

#wrap_data(data) ⇒ Object



31
32
33
34
# File 'lib/pressure.rb', line 31

# Builds the payload for a piece of upstream data.
#
# The data is deep-copied through a Marshal round trip, so the payload does
# not share objects with the caller's value. The copy and the current UTC
# time in epoch seconds are merged into @wrapper_template, which itself is
# left unmodified.
#
# @param data [Object] upstream data; must be dumpable by Marshal
# @return [Hash] template entries plus :upstream_data and :last_update_ts
def wrap_data(data)
  snapshot = Marshal.load(Marshal.dump(data))
  envelope = { upstream_data: snapshot, last_update_ts: Time.now.utc.to_i }
  @wrapper_template.merge(envelope)
end