Class: Pressure
- Inherits:
-
Object
- Object
- Pressure
- Defined in:
- lib/pressure.rb,
lib/pressure/version.rb
Overview
Instances of the Pressure class can be used to read data from an upstream provider and broadcast it out to a set of downstream consumers.
Constant Summary collapse
- DEFAULT_DELAY =
The default delay between loops of worker threads.
(1.0 / 20.0)
- VERSION =
Current version of the Pressure gem.
'0.1.6'
Instance Attribute Summary collapse
-
#logger ⇒ Logger
Messages are logged to this object.
-
#running ⇒ Object
readonly
Returns the value of attribute running.
-
#sockets ⇒ Hamster::Set<#send>
readonly
List of downstream sockets the instance is sending to.
-
#wrapper_template ⇒ Hash
The template used to wrap upstream data before sending it to downstream consumers.
Instance Method Summary collapse
-
#<<(socket) ⇒ Object
Add a downstream socket to the Pressure instance.
-
#delete(socket) ⇒ Object
Remove a downstream socket from the Pressure instance.
-
#initialize(options = {}) { ... } ⇒ Pressure
constructor
Create a new Pressure instance and start reading data.
-
#running? ⇒ Boolean
True if the workers are running.
-
#start ⇒ Object
Start the worker threads.
-
#stop ⇒ Object
Stop the worker threads.
Constructor Details
#initialize(options = {}) { ... } ⇒ Pressure
Create a new Pressure instance and start reading data.
44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 |
# File 'lib/pressure.rb', line 44 def initialize( = {}, &data_source_block) @threads = [] @wrapper_template = {} @current_upstream = {} @send_queue = Queue.new @sockets = Hamster::Set.new @broadcast_worker_delay = ([:broadcast_worker_delay] || [:websocket_worker_delay] || DEFAULT_DELAY) @read_worker_delay = ([:read_worker_delay] || [:incoming_monitor_delay] || DEFAULT_DELAY) @no_wrap = [:no_wrap] || false @running = false @data_source_block = data_source_block start unless [:start] == false end |
Instance Attribute Details
#logger ⇒ Logger
Returns Messages are logged to this object.
21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 |
# File 'lib/pressure.rb', line 21 class Pressure attr_writer :logger attr_reader :running attr_reader :sockets attr_accessor :wrapper_template # The default delay between loops of worker threads. DEFAULT_DELAY = (1.0 / 20.0) # Create a new Pressure instance and start reading data. # # @param options [Hash] Additional options. # @option options [Boolean] :no_wrap Data from upstream is wrapped in a JSON # container by default, with some additional metadata. If this option is # set to true, the data will be passed through without modification. # @option options [Float] :read_worker_delay The amount of time, in seconds, # to sleep between attempts to read data from upstream. Defaults to # DEFAULT_DELAY. # @option options [Float] :broadcast_worker_delay The amount of time, in # seconds, to sleep between attempts to broadcast data to downstream # sockets. Defaults to DEFAULT_DELAY. # @yield Block to call to read data from upstream. # @yieldreturn Data to send downstream. def initialize( = {}, &data_source_block) @threads = [] @wrapper_template = {} @current_upstream = {} @send_queue = Queue.new @sockets = Hamster::Set.new @broadcast_worker_delay = ([:broadcast_worker_delay] || [:websocket_worker_delay] || DEFAULT_DELAY) @read_worker_delay = ([:read_worker_delay] || [:incoming_monitor_delay] || DEFAULT_DELAY) @no_wrap = [:no_wrap] || false @running = false @data_source_block = data_source_block start unless [:start] == false end def logger @logger ||= Logger.new(STDERR).tap do |logger| logger.progname = 'pressure' logger.level = Logger::WARN end end # Add a downstream socket to the Pressure instance. The latest upstream # data will be immediately sent to the socket. # # @param socket [Socket, #send] A socket, or object that responds to #send. def <<(socket) new_sockets = @sockets.add? socket if new_sockets != false @sockets = new_sockets socket.send JSON.generate(@current_upstream) end end # Remove a downstream socket from the Pressure instance. # # @param socket [Socket, #send] A socket, or object that responds to #send, # as added using <<. # @return the deleted socket object, or nil. def delete(socket) new_sockets = @sockets.delete? socket if new_sockets != false @sockets = new_sockets socket else nil end end # Start the worker threads. def start stop @running = true @threads << broadcast_worker_loop @threads << read_worker_loop end # Stop the worker threads. def stop @running = false @threads.each do |thread| thread.kill(5) thread.join end @threads = [] end # @return [Boolean] true if the workers are running. def running? @running end protected # Wraps upstream data in a container to add metadata to the object. # # @param data Upstream data. # @return [Hash] wrapped data. def wrap_data(data) @wrapper_template.merge(upstream_data: Marshal.load(Marshal.dump(data)), last_update_ts: Time.now.utc.to_i) end # Check whether two pieces of upstream data are different. # # @param current_data The most recent data read from upstream. # @param previous_data The last set of data read from upstream. # @return [Boolean] true if the data is different. def data_changed?(current_data, previous_data) if current_data && previous_data current_data != previous_data else true end end # Loop function for the thread reading data from the upstream provider. def read_worker_loop Thread.new do begin data = {} while @running upstream_data = @data_source_block.call if data_changed?(upstream_data, data[:upstream_data]) data = wrap_data(upstream_data) @current_upstream = data if @no_wrap @send_queue << [upstream_data] else @send_queue << data end end sleep(@read_worker_delay) end rescue => e logger.error "Read worker error:" logger.error e end end end # Write data to downstream sockets. def broadcast queued_data = JSON.generate(@send_queue.shift) @sockets.each do |socket| socket.send queued_data end end # Loop function for the thread writing data to the downstream providers. def broadcast_worker_loop Thread.new do begin while @running broadcast sleep(@broadcast_worker_delay) end rescue => e logger.error "Broadcast worker error:" logger.error e retry end end end end |
#running ⇒ Object (readonly)
Returns the value of attribute running.
23 24 25 |
# File 'lib/pressure.rb', line 23 def running @running end |
#sockets ⇒ Hamster::Set<#send> (readonly)
Returns List of downstream sockets the instance is sending to. These objects don't necessarily need to be sockets, as long as they support a send method. Add objects to this list using the << method.
21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 |
# File 'lib/pressure.rb', line 21 class Pressure attr_writer :logger attr_reader :running attr_reader :sockets attr_accessor :wrapper_template # The default delay between loops of worker threads. DEFAULT_DELAY = (1.0 / 20.0) # Create a new Pressure instance and start reading data. # # @param options [Hash] Additional options. # @option options [Boolean] :no_wrap Data from upstream is wrapped in a JSON # container by default, with some additional metadata. If this option is # set to true, the data will be passed through without modification. # @option options [Float] :read_worker_delay The amount of time, in seconds, # to sleep between attempts to read data from upstream. Defaults to # DEFAULT_DELAY. # @option options [Float] :broadcast_worker_delay The amount of time, in # seconds, to sleep between attempts to broadcast data to downstream # sockets. Defaults to DEFAULT_DELAY. # @yield Block to call to read data from upstream. # @yieldreturn Data to send downstream. def initialize( = {}, &data_source_block) @threads = [] @wrapper_template = {} @current_upstream = {} @send_queue = Queue.new @sockets = Hamster::Set.new @broadcast_worker_delay = ([:broadcast_worker_delay] || [:websocket_worker_delay] || DEFAULT_DELAY) @read_worker_delay = ([:read_worker_delay] || [:incoming_monitor_delay] || DEFAULT_DELAY) @no_wrap = [:no_wrap] || false @running = false @data_source_block = data_source_block start unless [:start] == false end def logger @logger ||= Logger.new(STDERR).tap do |logger| logger.progname = 'pressure' logger.level = Logger::WARN end end # Add a downstream socket to the Pressure instance. The latest upstream # data will be immediately sent to the socket. # # @param socket [Socket, #send] A socket, or object that responds to #send. def <<(socket) new_sockets = @sockets.add? socket if new_sockets != false @sockets = new_sockets socket.send JSON.generate(@current_upstream) end end # Remove a downstream socket from the Pressure instance. # # @param socket [Socket, #send] A socket, or object that responds to #send, # as added using <<. # @return the deleted socket object, or nil. def delete(socket) new_sockets = @sockets.delete? socket if new_sockets != false @sockets = new_sockets socket else nil end end # Start the worker threads. def start stop @running = true @threads << broadcast_worker_loop @threads << read_worker_loop end # Stop the worker threads. def stop @running = false @threads.each do |thread| thread.kill(5) thread.join end @threads = [] end # @return [Boolean] true if the workers are running. def running? @running end protected # Wraps upstream data in a container to add metadata to the object. # # @param data Upstream data. # @return [Hash] wrapped data. def wrap_data(data) @wrapper_template.merge(upstream_data: Marshal.load(Marshal.dump(data)), last_update_ts: Time.now.utc.to_i) end # Check whether two pieces of upstream data are different. # # @param current_data The most recent data read from upstream. # @param previous_data The last set of data read from upstream. # @return [Boolean] true if the data is different. def data_changed?(current_data, previous_data) if current_data && previous_data current_data != previous_data else true end end # Loop function for the thread reading data from the upstream provider. def read_worker_loop Thread.new do begin data = {} while @running upstream_data = @data_source_block.call if data_changed?(upstream_data, data[:upstream_data]) data = wrap_data(upstream_data) @current_upstream = data if @no_wrap @send_queue << [upstream_data] else @send_queue << data end end sleep(@read_worker_delay) end rescue => e logger.error "Read worker error:" logger.error e end end end # Write data to downstream sockets. def broadcast queued_data = JSON.generate(@send_queue.shift) @sockets.each do |socket| socket.send queued_data end end # Loop function for the thread writing data to the downstream providers. def broadcast_worker_loop Thread.new do begin while @running broadcast sleep(@broadcast_worker_delay) end rescue => e logger.error "Broadcast worker error:" logger.error e retry end end end end |
#wrapper_template ⇒ Hash
Returns The template used to wrap upstream data before sending it to downstream consumers. See wrap_data and the no_wrap option to initialize.
21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 |
# File 'lib/pressure.rb', line 21 class Pressure attr_writer :logger attr_reader :running attr_reader :sockets attr_accessor :wrapper_template # The default delay between loops of worker threads. DEFAULT_DELAY = (1.0 / 20.0) # Create a new Pressure instance and start reading data. # # @param options [Hash] Additional options. # @option options [Boolean] :no_wrap Data from upstream is wrapped in a JSON # container by default, with some additional metadata. If this option is # set to true, the data will be passed through without modification. # @option options [Float] :read_worker_delay The amount of time, in seconds, # to sleep between attempts to read data from upstream. Defaults to # DEFAULT_DELAY. # @option options [Float] :broadcast_worker_delay The amount of time, in # seconds, to sleep between attempts to broadcast data to downstream # sockets. Defaults to DEFAULT_DELAY. # @yield Block to call to read data from upstream. # @yieldreturn Data to send downstream. def initialize( = {}, &data_source_block) @threads = [] @wrapper_template = {} @current_upstream = {} @send_queue = Queue.new @sockets = Hamster::Set.new @broadcast_worker_delay = ([:broadcast_worker_delay] || [:websocket_worker_delay] || DEFAULT_DELAY) @read_worker_delay = ([:read_worker_delay] || [:incoming_monitor_delay] || DEFAULT_DELAY) @no_wrap = [:no_wrap] || false @running = false @data_source_block = data_source_block start unless [:start] == false end def logger @logger ||= Logger.new(STDERR).tap do |logger| logger.progname = 'pressure' logger.level = Logger::WARN end end # Add a downstream socket to the Pressure instance. The latest upstream # data will be immediately sent to the socket. # # @param socket [Socket, #send] A socket, or object that responds to #send. def <<(socket) new_sockets = @sockets.add? socket if new_sockets != false @sockets = new_sockets socket.send JSON.generate(@current_upstream) end end # Remove a downstream socket from the Pressure instance. # # @param socket [Socket, #send] A socket, or object that responds to #send, # as added using <<. # @return the deleted socket object, or nil. def delete(socket) new_sockets = @sockets.delete? socket if new_sockets != false @sockets = new_sockets socket else nil end end # Start the worker threads. def start stop @running = true @threads << broadcast_worker_loop @threads << read_worker_loop end # Stop the worker threads. def stop @running = false @threads.each do |thread| thread.kill(5) thread.join end @threads = [] end # @return [Boolean] true if the workers are running. def running? @running end protected # Wraps upstream data in a container to add metadata to the object. # # @param data Upstream data. # @return [Hash] wrapped data. def wrap_data(data) @wrapper_template.merge(upstream_data: Marshal.load(Marshal.dump(data)), last_update_ts: Time.now.utc.to_i) end # Check whether two pieces of upstream data are different. # # @param current_data The most recent data read from upstream. # @param previous_data The last set of data read from upstream. # @return [Boolean] true if the data is different. def data_changed?(current_data, previous_data) if current_data && previous_data current_data != previous_data else true end end # Loop function for the thread reading data from the upstream provider. def read_worker_loop Thread.new do begin data = {} while @running upstream_data = @data_source_block.call if data_changed?(upstream_data, data[:upstream_data]) data = wrap_data(upstream_data) @current_upstream = data if @no_wrap @send_queue << [upstream_data] else @send_queue << data end end sleep(@read_worker_delay) end rescue => e logger.error "Read worker error:" logger.error e end end end # Write data to downstream sockets. def broadcast queued_data = JSON.generate(@send_queue.shift) @sockets.each do |socket| socket.send queued_data end end # Loop function for the thread writing data to the downstream providers. def broadcast_worker_loop Thread.new do begin while @running broadcast sleep(@broadcast_worker_delay) end rescue => e logger.error "Broadcast worker error:" logger.error e retry end end end end |
Instance Method Details
#<<(socket) ⇒ Object
Add a downstream socket to the Pressure instance. The latest upstream data will be immediately sent to the socket.
73 74 75 76 77 78 79 |
# File 'lib/pressure.rb', line 73 def <<(socket) new_sockets = @sockets.add? socket if new_sockets != false @sockets = new_sockets socket.send JSON.generate(@current_upstream) end end |
#delete(socket) ⇒ Object
Remove a downstream socket from the Pressure instance.
86 87 88 89 90 91 92 93 94 |
# File 'lib/pressure.rb', line 86 def delete(socket) new_sockets = @sockets.delete? socket if new_sockets != false @sockets = new_sockets socket else nil end end |
#running? ⇒ Boolean
Returns true if the workers are running.
115 116 117 |
# File 'lib/pressure.rb', line 115 def running? @running end |
#start ⇒ Object
Start the worker threads.
97 98 99 100 101 102 |
# File 'lib/pressure.rb', line 97 def start stop @running = true @threads << broadcast_worker_loop @threads << read_worker_loop end |
#stop ⇒ Object
Stop the worker threads.
105 106 107 108 109 110 111 112 |
# File 'lib/pressure.rb', line 105 def stop @running = false @threads.each do |thread| thread.kill(5) thread.join end @threads = [] end |