Class: Pressure

Inherits:
Object
  • Object
show all
Defined in:
lib/pressure.rb,
lib/pressure/version.rb

Constant Summary collapse

VERSION =
'0.1.1'

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(options = {}, &data_source_block) ⇒ Pressure

Returns a new instance of Pressure.



11
12
13
14
15
16
17
18
19
20
# File 'lib/pressure.rb', line 11

def initialize(options = {}, &data_source_block)
  @wrapper_template = {}
  @current_upstream = {}
  @send_queue = Queue.new
  @options = options
  @sockets = []
  @websocket_worker_delay = options[:websocket_worker_delay] || (1.0 / 20.0)
  incoming_monitor(&data_source_block)
  websocket_worker_loop
end

Instance Attribute Details

#optionsObject (readonly)

Returns the value of attribute options.



9
10
11
# File 'lib/pressure.rb', line 9

def options
  @options
end

#socketsObject (readonly)

Returns the value of attribute sockets.



8
9
10
# File 'lib/pressure.rb', line 8

def sockets
  @sockets
end

#wrapper_templateObject

Returns the value of attribute wrapper_template.



7
8
9
# File 'lib/pressure.rb', line 7

def wrapper_template
  @wrapper_template
end

Instance Method Details

#<<(socket) ⇒ Object



22
23
24
# File 'lib/pressure.rb', line 22

def <<(socket)
  @sockets << socket
end

#data_changed?(current_data, previous_data) ⇒ Boolean

Returns:

  • (Boolean)


35
36
37
# File 'lib/pressure.rb', line 35

def data_changed?(current_data, previous_data)
  current_data != previous_data
end

#delete(socket) ⇒ Object



26
27
28
# File 'lib/pressure.rb', line 26

def delete(socket)
  @sockets.delete socket
end

#incoming_monitor(&data_source_block) ⇒ Object



39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
# File 'lib/pressure.rb', line 39

def incoming_monitor(&data_source_block)
  Thread.new do
    begin
      data = {}
      loop do
        upstream_data = data_source_block.call
        if data_changed?(data[:upstream_data], upstream_data)
          data = wrap_data(upstream_data)
          if @options[:no_wrap]
            @send_queue << upstream_data
          else
            @send_queue << data
          end
        end
        sleep(1.0 / 20.0)
      end
    rescue => e
      puts "error #{e}"
    end
  end
end

#websocket_workerObject



61
62
63
64
65
66
67
# File 'lib/pressure.rb', line 61

def websocket_worker
  # logger.info data.inspect
  queued_data = JSON.generate(@send_queue.shift)
  @sockets.each do |socket|
    socket.send queued_data
  end
end

#websocket_worker_loopObject



69
70
71
72
73
74
75
76
77
78
79
80
81
# File 'lib/pressure.rb', line 69

def websocket_worker_loop
  Thread.new do
    begin
      loop do
        websocket_worker
        sleep(@websocket_worker_delay)
      end
    rescue => e
      puts "Worker error: #{e}"
      retry
    end
  end
end

#wrap_data(data) ⇒ Object



30
31
32
33
# File 'lib/pressure.rb', line 30

def wrap_data(data)
  @wrapper_template.merge(upstream_data: data.clone,
                          last_update_ts: Time.now.utc.to_i)
end