Class: Pressure

Inherits:
Object
  • Object
show all
Defined in:
lib/pressure.rb,
lib/pressure/version.rb

Overview

Instances of the Pressure class can be used to read data from an upstream provider and broadcast it out to a set of downstream consumers.

Constant Summary collapse

DEFAULT_DELAY =

The default delay between loops of worker threads.

(1.0 / 20.0)
VERSION =

Current version of the Pressure gem.

'0.1.6'

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(options = {}) { ... } ⇒ Pressure

Create a new Pressure instance and start reading data.

Parameters:

  • options (Hash) (defaults to: {})

    Additional options.

Options Hash (options):

  • :no_wrap (Boolean)

    Data from upstream is wrapped in a JSON container by default, with some additional metadata. If this option is set to true, the data will be passed through without modification.

  • :read_worker_delay (Float)

    The amount of time, in seconds, to sleep between attempts to read data from upstream. Defaults to DEFAULT_DELAY.

  • :broadcast_worker_delay (Float)

    The amount of time, in seconds, to sleep between attempts to broadcast data to downstream sockets. Defaults to DEFAULT_DELAY.

Yields:

  • Block to call to read data from upstream.

Yield Returns:

  • Data to send downstream.



44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
# File 'lib/pressure.rb', line 44

# Set up the instance state and, unless options[:start] is false, start the
# worker threads.
#
# @param options [Hash] Additional options.
# @option options [Boolean] :no_wrap Stored as @no_wrap; false when absent.
# @option options [Float] :read_worker_delay Sleep, in seconds, between
#   reads from upstream. Falls back to :incoming_monitor_delay, then to
#   DEFAULT_DELAY.
# @option options [Float] :broadcast_worker_delay Sleep, in seconds, between
#   broadcasts. Falls back to :websocket_worker_delay, then to DEFAULT_DELAY.
# @option options [Boolean] :start Pass false to skip the call to #start.
# @yield Block stored as the upstream data source.
def initialize(options = {}, &data_source_block)
  @threads = []
  @wrapper_template = {}
  @current_upstream = {}
  @send_queue = Queue.new
  @sockets = Hamster::Set.new

  # Resolve each delay step by step: primary key, alternative key, default.
  broadcast_delay = options[:broadcast_worker_delay]
  broadcast_delay ||= options[:websocket_worker_delay]
  broadcast_delay ||= DEFAULT_DELAY
  @broadcast_worker_delay = broadcast_delay

  read_delay = options[:read_worker_delay]
  read_delay ||= options[:incoming_monitor_delay]
  read_delay ||= DEFAULT_DELAY
  @read_worker_delay = read_delay

  wrap_disabled = options[:no_wrap]
  @no_wrap = wrap_disabled ? wrap_disabled : false
  @running = false
  @data_source_block = data_source_block

  # Only an explicit `start: false` suppresses the automatic start.
  start if options[:start] != false
end

Instance Attribute Details

#logger ⇒ Logger

Returns the Logger object that messages are logged to.

Returns:

  • (Logger)

    Messages are logged to this object.



21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
# File 'lib/pressure.rb', line 21

# Reads data from an upstream provider (a block) on one worker thread and
# broadcasts it, JSON-encoded, to a set of downstream sockets on another.
class Pressure
  # Messages are logged to this object. When unset, #logger lazily creates
  # a default Logger writing to STDERR.
  attr_writer :logger

  # Flag set by #start and cleared by #stop; the worker loops run while it
  # is truthy.
  attr_reader :running

  # List of downstream sockets the instance is sending to. These objects
  # don't necessarily need to be sockets, as long as they support a send
  # method. Add objects to this list using the << method.
  attr_reader :sockets

  # The template used to wrap upstream data before sending it to downstream
  # consumers. See wrap_data and the no_wrap option to initialize.
  attr_accessor :wrapper_template

  # The default delay between loops of worker threads.
  DEFAULT_DELAY = (1.0 / 20.0)

  # Create a new Pressure instance and start reading data.
  #
  # @param options [Hash] Additional options.
  # @option options [Boolean] :no_wrap Data from upstream is wrapped in a JSON
  #   container by default, with some additional metadata. If this option is
  #   set to true, the data will be passed through without modification.
  # @option options [Float] :read_worker_delay The amount of time, in seconds,
  #   to sleep between attempts to read data from upstream. Falls back to
  #   :incoming_monitor_delay, then to DEFAULT_DELAY.
  # @option options [Float] :broadcast_worker_delay The amount of time, in
  #   seconds, to sleep between attempts to broadcast data to downstream
  #   sockets. Falls back to :websocket_worker_delay, then to DEFAULT_DELAY.
  # @option options [Boolean] :start Pass false to create the instance
  #   without starting the worker threads.
  # @yield Block to call to read data from upstream.
  # @yieldreturn Data to send downstream.
  def initialize(options = {}, &data_source_block)
    @threads = []
    @wrapper_template = {}
    @current_upstream = {}
    @send_queue = Queue.new
    @sockets = Hamster::Set.new
    @broadcast_worker_delay = (options[:broadcast_worker_delay] ||
                               options[:websocket_worker_delay] ||
                               DEFAULT_DELAY)
    @read_worker_delay = (options[:read_worker_delay] ||
                          options[:incoming_monitor_delay] ||
                          DEFAULT_DELAY)
    @no_wrap = options[:no_wrap] || false
    @running = false
    @data_source_block = data_source_block
    start unless options[:start] == false
  end

  # @return [Logger] the logger in use. If none was assigned, a Logger on
  #   STDERR with progname 'pressure' and level WARN is created and memoized.
  def logger
    @logger ||= Logger.new(STDERR).tap do |logger|
      logger.progname = 'pressure'
      logger.level = Logger::WARN
    end
  end

  # Add a downstream socket to the Pressure instance. The latest upstream
  # data will be immediately sent to the socket.
  #
  # Nothing is sent when the socket is already registered (add? returned
  # false).
  #
  # NOTE(review): @current_upstream always holds the wrapped payload, so a
  # newly added socket receives wrapped data even when :no_wrap is set --
  # confirm this is intended.
  #
  # @param socket [Socket, #send] A socket, or object that responds to #send.
  def <<(socket)
    new_sockets = @sockets.add? socket
    if new_sockets != false
      @sockets = new_sockets
      socket.send JSON.generate(@current_upstream)
    end
  end

  # Remove a downstream socket from the Pressure instance.
  #
  # @param socket [Socket, #send] A socket, or object that responds to #send,
  #   as added using <<.
  # @return the deleted socket object, or nil.
  def delete(socket)
    new_sockets = @sockets.delete? socket
    if new_sockets != false
      @sockets = new_sockets
      socket
    else
      nil
    end
  end

  # Start the worker threads. Any workers already running are stopped first,
  # so calling this repeatedly restarts them.
  def start
    stop
    @running = true
    @threads << broadcast_worker_loop
    @threads << read_worker_loop
  end

  # Stop the worker threads and forget them.
  def stop
    @running = false
    @threads.each do |thread|
      # Thread#kill accepts no arguments (passing one raises ArgumentError).
      # The workers are killed rather than merely joined because the
      # broadcast worker can be blocked indefinitely on an empty queue.
      thread.kill
      thread.join
    end
    @threads = []
  end

  # @return [Boolean] true if the workers are running.
  def running?
    @running
  end

  protected

  # Wraps upstream data in a container to add metadata to the object.
  #
  # The data is copied through a Marshal dump/load round trip before being
  # merged into the wrapper template together with the current UTC epoch
  # time.
  #
  # @param data Upstream data.
  # @return [Hash] wrapped data.
  def wrap_data(data)
    @wrapper_template.merge(upstream_data: Marshal.load(Marshal.dump(data)),
                            last_update_ts: Time.now.utc.to_i)
  end

  # Check whether two pieces of upstream data are different.
  #
  # If either value is nil or false the data is always reported as changed.
  #
  # @param current_data The most recent data read from upstream.
  # @param previous_data The last set of data read from upstream.
  # @return [Boolean] true if the data is different.
  def data_changed?(current_data, previous_data)
    if current_data && previous_data
      current_data != previous_data
    else
      true
    end
  end

  # Loop function for the thread reading data from the upstream provider.
  #
  # Each pass calls the data source block; changed data is wrapped, recorded
  # as the current upstream value and queued for broadcast. An error is
  # logged and ends the thread (there is no retry here).
  #
  # @return [Thread] the started worker thread.
  def read_worker_loop
    Thread.new do
      begin
        data = {}
        while @running
          upstream_data = @data_source_block.call
          if data_changed?(upstream_data, data[:upstream_data])
            data = wrap_data(upstream_data)
            @current_upstream = data
            if @no_wrap
              # Unwrapped data is queued inside a one-element array.
              @send_queue << [upstream_data]
            else
              @send_queue << data
            end
          end
          sleep(@read_worker_delay)
        end
      rescue => e
        logger.error "Read worker error:"
        logger.error e
      end
    end
  end

  # Write data to downstream sockets.
  #
  # Takes the next item from the send queue (blocking while the queue is
  # empty), encodes it as JSON once and sends it to every socket.
  def broadcast
    queued_data = JSON.generate(@send_queue.shift)
    @sockets.each do |socket|
      socket.send queued_data
    end
  end

  # Loop function for the thread writing data to the downstream providers.
  #
  # Errors are logged and the loop is re-entered via retry, so this worker
  # keeps going after a failed broadcast.
  #
  # @return [Thread] the started worker thread.
  def broadcast_worker_loop
    Thread.new do
      begin
        while @running
          broadcast
          sleep(@broadcast_worker_delay)
        end
      rescue => e
        logger.error "Broadcast worker error:"
        logger.error e
        retry
      end
    end
  end
end

#running ⇒ Object (readonly)

Returns the value of attribute running.



23
24
25
# File 'lib/pressure.rb', line 23

# Reader for the @running instance variable.
#
# @return the current value of @running, returned as-is.
def running
  @running
end

#sockets ⇒ Hamster::Set<#send> (readonly)

Returns the list of downstream sockets the instance is sending to. These objects don't necessarily need to be sockets, as long as they support a send method. Add objects to this list using the << method.

Returns:

  • (Hamster::Set<#send>)

    List of downstream sockets the instance is sending to. These objects don't necessarily need to be sockets, as long as they support a send method. Add objects to this list using the << method.



21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
# File 'lib/pressure.rb', line 21

# Reads data from an upstream provider (a block) on one worker thread and
# broadcasts it, JSON-encoded, to a set of downstream sockets on another.
class Pressure
  # Messages are logged to this object. When unset, #logger lazily creates
  # a default Logger writing to STDERR.
  attr_writer :logger

  # Flag set by #start and cleared by #stop; the worker loops run while it
  # is truthy.
  attr_reader :running

  # List of downstream sockets the instance is sending to. These objects
  # don't necessarily need to be sockets, as long as they support a send
  # method. Add objects to this list using the << method.
  attr_reader :sockets

  # The template used to wrap upstream data before sending it to downstream
  # consumers. See wrap_data and the no_wrap option to initialize.
  attr_accessor :wrapper_template

  # The default delay between loops of worker threads.
  DEFAULT_DELAY = (1.0 / 20.0)

  # Create a new Pressure instance and start reading data.
  #
  # @param options [Hash] Additional options.
  # @option options [Boolean] :no_wrap Data from upstream is wrapped in a JSON
  #   container by default, with some additional metadata. If this option is
  #   set to true, the data will be passed through without modification.
  # @option options [Float] :read_worker_delay The amount of time, in seconds,
  #   to sleep between attempts to read data from upstream. Falls back to
  #   :incoming_monitor_delay, then to DEFAULT_DELAY.
  # @option options [Float] :broadcast_worker_delay The amount of time, in
  #   seconds, to sleep between attempts to broadcast data to downstream
  #   sockets. Falls back to :websocket_worker_delay, then to DEFAULT_DELAY.
  # @option options [Boolean] :start Pass false to create the instance
  #   without starting the worker threads.
  # @yield Block to call to read data from upstream.
  # @yieldreturn Data to send downstream.
  def initialize(options = {}, &data_source_block)
    @threads = []
    @wrapper_template = {}
    @current_upstream = {}
    @send_queue = Queue.new
    @sockets = Hamster::Set.new
    @broadcast_worker_delay = (options[:broadcast_worker_delay] ||
                               options[:websocket_worker_delay] ||
                               DEFAULT_DELAY)
    @read_worker_delay = (options[:read_worker_delay] ||
                          options[:incoming_monitor_delay] ||
                          DEFAULT_DELAY)
    @no_wrap = options[:no_wrap] || false
    @running = false
    @data_source_block = data_source_block
    start unless options[:start] == false
  end

  # @return [Logger] the logger in use. If none was assigned, a Logger on
  #   STDERR with progname 'pressure' and level WARN is created and memoized.
  def logger
    @logger ||= Logger.new(STDERR).tap do |logger|
      logger.progname = 'pressure'
      logger.level = Logger::WARN
    end
  end

  # Add a downstream socket to the Pressure instance. The latest upstream
  # data will be immediately sent to the socket.
  #
  # Nothing is sent when the socket is already registered (add? returned
  # false).
  #
  # NOTE(review): @current_upstream always holds the wrapped payload, so a
  # newly added socket receives wrapped data even when :no_wrap is set --
  # confirm this is intended.
  #
  # @param socket [Socket, #send] A socket, or object that responds to #send.
  def <<(socket)
    new_sockets = @sockets.add? socket
    if new_sockets != false
      @sockets = new_sockets
      socket.send JSON.generate(@current_upstream)
    end
  end

  # Remove a downstream socket from the Pressure instance.
  #
  # @param socket [Socket, #send] A socket, or object that responds to #send,
  #   as added using <<.
  # @return the deleted socket object, or nil.
  def delete(socket)
    new_sockets = @sockets.delete? socket
    if new_sockets != false
      @sockets = new_sockets
      socket
    else
      nil
    end
  end

  # Start the worker threads. Any workers already running are stopped first,
  # so calling this repeatedly restarts them.
  def start
    stop
    @running = true
    @threads << broadcast_worker_loop
    @threads << read_worker_loop
  end

  # Stop the worker threads and forget them.
  def stop
    @running = false
    @threads.each do |thread|
      # Thread#kill accepts no arguments (passing one raises ArgumentError).
      # The workers are killed rather than merely joined because the
      # broadcast worker can be blocked indefinitely on an empty queue.
      thread.kill
      thread.join
    end
    @threads = []
  end

  # @return [Boolean] true if the workers are running.
  def running?
    @running
  end

  protected

  # Wraps upstream data in a container to add metadata to the object.
  #
  # The data is copied through a Marshal dump/load round trip before being
  # merged into the wrapper template together with the current UTC epoch
  # time.
  #
  # @param data Upstream data.
  # @return [Hash] wrapped data.
  def wrap_data(data)
    @wrapper_template.merge(upstream_data: Marshal.load(Marshal.dump(data)),
                            last_update_ts: Time.now.utc.to_i)
  end

  # Check whether two pieces of upstream data are different.
  #
  # If either value is nil or false the data is always reported as changed.
  #
  # @param current_data The most recent data read from upstream.
  # @param previous_data The last set of data read from upstream.
  # @return [Boolean] true if the data is different.
  def data_changed?(current_data, previous_data)
    if current_data && previous_data
      current_data != previous_data
    else
      true
    end
  end

  # Loop function for the thread reading data from the upstream provider.
  #
  # Each pass calls the data source block; changed data is wrapped, recorded
  # as the current upstream value and queued for broadcast. An error is
  # logged and ends the thread (there is no retry here).
  #
  # @return [Thread] the started worker thread.
  def read_worker_loop
    Thread.new do
      begin
        data = {}
        while @running
          upstream_data = @data_source_block.call
          if data_changed?(upstream_data, data[:upstream_data])
            data = wrap_data(upstream_data)
            @current_upstream = data
            if @no_wrap
              # Unwrapped data is queued inside a one-element array.
              @send_queue << [upstream_data]
            else
              @send_queue << data
            end
          end
          sleep(@read_worker_delay)
        end
      rescue => e
        logger.error "Read worker error:"
        logger.error e
      end
    end
  end

  # Write data to downstream sockets.
  #
  # Takes the next item from the send queue (blocking while the queue is
  # empty), encodes it as JSON once and sends it to every socket.
  def broadcast
    queued_data = JSON.generate(@send_queue.shift)
    @sockets.each do |socket|
      socket.send queued_data
    end
  end

  # Loop function for the thread writing data to the downstream providers.
  #
  # Errors are logged and the loop is re-entered via retry, so this worker
  # keeps going after a failed broadcast.
  #
  # @return [Thread] the started worker thread.
  def broadcast_worker_loop
    Thread.new do
      begin
        while @running
          broadcast
          sleep(@broadcast_worker_delay)
        end
      rescue => e
        logger.error "Broadcast worker error:"
        logger.error e
        retry
      end
    end
  end
end

#wrapper_template ⇒ Hash

Returns the template used to wrap upstream data before sending it to downstream consumers. See wrap_data and the no_wrap option to initialize.

Returns:

  • (Hash)

    The template used to wrap upstream data before sending it to downstream consumers. See wrap_data and the no_wrap option to initialize.



21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
# File 'lib/pressure.rb', line 21

# Reads data from an upstream provider (a block) on one worker thread and
# broadcasts it, JSON-encoded, to a set of downstream sockets on another.
class Pressure
  # Messages are logged to this object. When unset, #logger lazily creates
  # a default Logger writing to STDERR.
  attr_writer :logger

  # Flag set by #start and cleared by #stop; the worker loops run while it
  # is truthy.
  attr_reader :running

  # List of downstream sockets the instance is sending to. These objects
  # don't necessarily need to be sockets, as long as they support a send
  # method. Add objects to this list using the << method.
  attr_reader :sockets

  # The template used to wrap upstream data before sending it to downstream
  # consumers. See wrap_data and the no_wrap option to initialize.
  attr_accessor :wrapper_template

  # The default delay between loops of worker threads.
  DEFAULT_DELAY = (1.0 / 20.0)

  # Create a new Pressure instance and start reading data.
  #
  # @param options [Hash] Additional options.
  # @option options [Boolean] :no_wrap Data from upstream is wrapped in a JSON
  #   container by default, with some additional metadata. If this option is
  #   set to true, the data will be passed through without modification.
  # @option options [Float] :read_worker_delay The amount of time, in seconds,
  #   to sleep between attempts to read data from upstream. Falls back to
  #   :incoming_monitor_delay, then to DEFAULT_DELAY.
  # @option options [Float] :broadcast_worker_delay The amount of time, in
  #   seconds, to sleep between attempts to broadcast data to downstream
  #   sockets. Falls back to :websocket_worker_delay, then to DEFAULT_DELAY.
  # @option options [Boolean] :start Pass false to create the instance
  #   without starting the worker threads.
  # @yield Block to call to read data from upstream.
  # @yieldreturn Data to send downstream.
  def initialize(options = {}, &data_source_block)
    @threads = []
    @wrapper_template = {}
    @current_upstream = {}
    @send_queue = Queue.new
    @sockets = Hamster::Set.new
    @broadcast_worker_delay = (options[:broadcast_worker_delay] ||
                               options[:websocket_worker_delay] ||
                               DEFAULT_DELAY)
    @read_worker_delay = (options[:read_worker_delay] ||
                          options[:incoming_monitor_delay] ||
                          DEFAULT_DELAY)
    @no_wrap = options[:no_wrap] || false
    @running = false
    @data_source_block = data_source_block
    start unless options[:start] == false
  end

  # @return [Logger] the logger in use. If none was assigned, a Logger on
  #   STDERR with progname 'pressure' and level WARN is created and memoized.
  def logger
    @logger ||= Logger.new(STDERR).tap do |logger|
      logger.progname = 'pressure'
      logger.level = Logger::WARN
    end
  end

  # Add a downstream socket to the Pressure instance. The latest upstream
  # data will be immediately sent to the socket.
  #
  # Nothing is sent when the socket is already registered (add? returned
  # false).
  #
  # NOTE(review): @current_upstream always holds the wrapped payload, so a
  # newly added socket receives wrapped data even when :no_wrap is set --
  # confirm this is intended.
  #
  # @param socket [Socket, #send] A socket, or object that responds to #send.
  def <<(socket)
    new_sockets = @sockets.add? socket
    if new_sockets != false
      @sockets = new_sockets
      socket.send JSON.generate(@current_upstream)
    end
  end

  # Remove a downstream socket from the Pressure instance.
  #
  # @param socket [Socket, #send] A socket, or object that responds to #send,
  #   as added using <<.
  # @return the deleted socket object, or nil.
  def delete(socket)
    new_sockets = @sockets.delete? socket
    if new_sockets != false
      @sockets = new_sockets
      socket
    else
      nil
    end
  end

  # Start the worker threads. Any workers already running are stopped first,
  # so calling this repeatedly restarts them.
  def start
    stop
    @running = true
    @threads << broadcast_worker_loop
    @threads << read_worker_loop
  end

  # Stop the worker threads and forget them.
  def stop
    @running = false
    @threads.each do |thread|
      # Thread#kill accepts no arguments (passing one raises ArgumentError).
      # The workers are killed rather than merely joined because the
      # broadcast worker can be blocked indefinitely on an empty queue.
      thread.kill
      thread.join
    end
    @threads = []
  end

  # @return [Boolean] true if the workers are running.
  def running?
    @running
  end

  protected

  # Wraps upstream data in a container to add metadata to the object.
  #
  # The data is copied through a Marshal dump/load round trip before being
  # merged into the wrapper template together with the current UTC epoch
  # time.
  #
  # @param data Upstream data.
  # @return [Hash] wrapped data.
  def wrap_data(data)
    @wrapper_template.merge(upstream_data: Marshal.load(Marshal.dump(data)),
                            last_update_ts: Time.now.utc.to_i)
  end

  # Check whether two pieces of upstream data are different.
  #
  # If either value is nil or false the data is always reported as changed.
  #
  # @param current_data The most recent data read from upstream.
  # @param previous_data The last set of data read from upstream.
  # @return [Boolean] true if the data is different.
  def data_changed?(current_data, previous_data)
    if current_data && previous_data
      current_data != previous_data
    else
      true
    end
  end

  # Loop function for the thread reading data from the upstream provider.
  #
  # Each pass calls the data source block; changed data is wrapped, recorded
  # as the current upstream value and queued for broadcast. An error is
  # logged and ends the thread (there is no retry here).
  #
  # @return [Thread] the started worker thread.
  def read_worker_loop
    Thread.new do
      begin
        data = {}
        while @running
          upstream_data = @data_source_block.call
          if data_changed?(upstream_data, data[:upstream_data])
            data = wrap_data(upstream_data)
            @current_upstream = data
            if @no_wrap
              # Unwrapped data is queued inside a one-element array.
              @send_queue << [upstream_data]
            else
              @send_queue << data
            end
          end
          sleep(@read_worker_delay)
        end
      rescue => e
        logger.error "Read worker error:"
        logger.error e
      end
    end
  end

  # Write data to downstream sockets.
  #
  # Takes the next item from the send queue (blocking while the queue is
  # empty), encodes it as JSON once and sends it to every socket.
  def broadcast
    queued_data = JSON.generate(@send_queue.shift)
    @sockets.each do |socket|
      socket.send queued_data
    end
  end

  # Loop function for the thread writing data to the downstream providers.
  #
  # Errors are logged and the loop is re-entered via retry, so this worker
  # keeps going after a failed broadcast.
  #
  # @return [Thread] the started worker thread.
  def broadcast_worker_loop
    Thread.new do
      begin
        while @running
          broadcast
          sleep(@broadcast_worker_delay)
        end
      rescue => e
        logger.error "Broadcast worker error:"
        logger.error e
        retry
      end
    end
  end
end

Instance Method Details

#<<(socket) ⇒ Object

Add a downstream socket to the Pressure instance. The latest upstream data will be immediately sent to the socket.

Parameters:

  • socket (Socket, #send)

    A socket, or object that responds to #send.



73
74
75
76
77
78
79
# File 'lib/pressure.rb', line 73

# Register a downstream socket and immediately push the latest upstream
# data to it as JSON.
#
# When the socket is already registered (add? returns false) nothing is
# stored or sent and nil is returned.
#
# @param socket [Socket, #send] A socket, or object that responds to #send.
# @return the result of socket.send, or nil if the socket was already known.
def <<(socket)
  updated = @sockets.add?(socket)
  return nil if updated == false

  @sockets = updated
  socket.send(JSON.generate(@current_upstream))
end

#delete(socket) ⇒ Object

Remove a downstream socket from the Pressure instance.

Parameters:

  • socket (Socket, #send)

    A socket, or object that responds to #send, as added using <<.

Returns:

  • the deleted socket object, or nil.



86
87
88
89
90
91
92
93
94
# File 'lib/pressure.rb', line 86

# Unregister a downstream socket.
#
# @param socket [Socket, #send] A socket, or object that responds to #send,
#   as added using <<.
# @return the socket that was removed, or nil when delete? reported false
#   (the socket was not registered).
def delete(socket)
  remaining = @sockets.delete?(socket)
  return nil if remaining == false

  @sockets = remaining
  socket
end

#running? ⇒ Boolean

Returns true if the workers are running.

Returns:

  • (Boolean)

    true if the workers are running.



115
116
117
# File 'lib/pressure.rb', line 115

# Predicate form of the running flag.
#
# @return the value of @running, returned as-is (no coercion is applied).
def running?
  @running
end

#start ⇒ Object

Start the worker threads.



97
98
99
100
101
102
# File 'lib/pressure.rb', line 97

# (Re)start the workers: stop whatever is running, raise the running flag,
# then launch the broadcast worker followed by the read worker and record
# both in @threads.
#
# @return the @threads collection after both workers were appended.
def start
  stop
  @running = true
  # Chained appends keep the launch order: broadcast first, then read.
  @threads << broadcast_worker_loop << read_worker_loop
end

#stop ⇒ Object

Stop the worker threads.



105
106
107
108
109
110
111
112
# File 'lib/pressure.rb', line 105

# Stop the worker threads: clear the running flag, terminate every tracked
# thread, wait for each to finish, then reset the thread list.
#
# @return [Array] the new, empty thread list.
def stop
  @running = false
  @threads.each do |thread|
    # Thread#kill accepts no arguments; passing one raises ArgumentError,
    # which previously made stopping a started instance fail.
    thread.kill
    thread.join
  end
  @threads = []
end