Class: Pressure
- Inherits:
-
Object
- Object
- Pressure
- Defined in:
- lib/pressure.rb,
lib/pressure/version.rb
Overview
Instances of the Pressure class can be used to read data from an upstream provider and broadcast it out to a set of downstream consumers.
Constant Summary collapse
- DEFAULT_DELAY =
The default delay between loops of worker threads.
(1.0 / 20.0)
- VERSION =
Current version of the Pressure gem.
'0.1.6'
Instance Attribute Summary collapse
-
#logger ⇒ Logger
Messages are logged to this object.
-
#running ⇒ Object
readonly
Returns the value of attribute running.
-
#sockets ⇒ Hamster::Set<#send>
readonly
List of downstream sockets the instance is sending to.
-
#wrapper_template ⇒ Hash
The template used to wrap upstream data before sending it to downstream consumers.
Instance Method Summary collapse
-
#<<(socket) ⇒ Object
Add a downstream socket to the Pressure instance.
-
#delete(socket) ⇒ Object
Remove a downstream socket from the Pressure instance.
-
#initialize(options = {}) { ... } ⇒ Pressure
constructor
Create a new Pressure instance and start reading data.
-
#running? ⇒ Boolean
True if the workers are running.
-
#start ⇒ Object
Start the worker threads.
-
#stop ⇒ Object
Stop the worker threads.
Constructor Details
#initialize(options = {}) { ... } ⇒ Pressure
Create a new Pressure instance and start reading data.
44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 |
# File 'lib/pressure.rb', line 44 def initialize( = {}, &data_source_block) @threads = [] @wrapper_template = {} @current_upstream = {} @send_queue = Queue.new @sockets = Hamster::Set.new @broadcast_worker_delay = ([:broadcast_worker_delay] || [:websocket_worker_delay] || DEFAULT_DELAY) @read_worker_delay = ([:read_worker_delay] || [:incoming_monitor_delay] || DEFAULT_DELAY) @no_wrap = [:no_wrap] || false @running = false @data_source_block = data_source_block start unless [:start] == false end |
Instance Attribute Details
#logger ⇒ Logger
Returns Messages are logged to this object.
|
# File 'lib/pressure.rb', line 21 class Pressure attr_writer :logger attr_reader :running attr_reader :sockets attr_accessor :wrapper_template # The default delay between loops of worker threads. DEFAULT_DELAY = (1.0 / 20.0) # Create a new Pressure instance and start reading data. # # @param options [Hash] Additional options. # @option options [Boolean] :no_wrap Data from upstream is wrapped in a JSON # container by default, with some additional metadata. If this option is # set to true, the data will be passed through without modification. # @option options [Float] :read_worker_delay The amount of time, in seconds, # to sleep between attempts to read data from upstream. Defaults to # DEFAULT_DELAY. # @option options [Float] :broadcast_worker_delay The amount of time, in # seconds, to sleep between attempts to broadcast data to downstream # sockets. Defaults to DEFAULT_DELAY. # @yield Block to call to read data from upstream. # @yieldreturn Data to send downstream. def initialize( = {}, &data_source_block) @threads = [] @wrapper_template = {} @current_upstream = {} @send_queue = Queue.new @sockets = Hamster::Set.new @broadcast_worker_delay = ([:broadcast_worker_delay] || [:websocket_worker_delay] || DEFAULT_DELAY) @read_worker_delay = ([:read_worker_delay] || [:incoming_monitor_delay] || DEFAULT_DELAY) @no_wrap = [:no_wrap] || false @running = false @data_source_block = data_source_block start unless [:start] == false end def logger @logger ||= Logger.new(STDERR).tap do |logger| logger.progname = 'pressure' logger.level = Logger::WARN end end # Add a downstream socket to the Pressure instance. The latest upstream # data will be immediately sent to the socket. # # @param socket [Socket, #send] A socket, or object that responds to #send. def <<(socket) new_sockets = @sockets.add? socket if new_sockets != false @sockets = new_sockets socket.send JSON.generate(@current_upstream) end end # Remove a downstream socket from the Pressure instance. # # @param socket [Socket, #send] A socket, or object that responds to #send, # as added using <<. # @return the deleted socket object, or nil. def delete(socket) new_sockets = @sockets.delete? socket if new_sockets != false @sockets = new_sockets socket else nil end end # Start the worker threads. def start stop @running = true @threads << broadcast_worker_loop @threads << read_worker_loop end # Stop the worker threads. def stop @running = false @threads.each do |thread| thread.kill(5) thread.join end @threads = [] end # @return [Boolean] true if the workers are running. def running? @running end protected # Wraps upstream data in a container to add metadata to the object. # # @param data Upstream data. # @return [Hash] wrapped data. def wrap_data(data) @wrapper_template.merge(upstream_data: Marshal.load(Marshal.dump(data)), last_update_ts: Time.now.utc.to_i) end # Check whether two pieces of upstream data are different. # # @param current_data The most recent data read from upstream. # @param previous_data The last set of data read from upstream. # @return [Boolean] true if the data is different. def data_changed?(current_data, previous_data) if current_data && previous_data current_data != previous_data else true end end # Loop function for the thread reading data from the upstream provider. def read_worker_loop Thread.new do begin data = {} while @running upstream_data = @data_source_block.call if data_changed?(upstream_data, data[:upstream_data]) data = wrap_data(upstream_data) @current_upstream = data if @no_wrap @send_queue << [upstream_data] else @send_queue << data end end sleep(@read_worker_delay) end rescue => e logger.error "Read worker error:" logger.error e end end end # Write data to downstream sockets. def broadcast queued_data = JSON.generate(@send_queue.shift) @sockets.each do |socket| socket.send queued_data end end # Loop function for the thread writing data to the downstream providers. def broadcast_worker_loop Thread.new do begin while @running broadcast sleep(@broadcast_worker_delay) end rescue => e logger.error "Broadcast worker error:" logger.error e retry end end end end |
#running ⇒ Object (readonly)
Returns the value of attribute running.
23 24 25 |
# File 'lib/pressure.rb', line 23 def running @running end |
#sockets ⇒ Hamster::Set<#send> (readonly)
Returns List of downstream sockets the instance is sending to. These objects don't necessarily need to be sockets, as long as they support a send method. Add objects to this list using the << method.
|
# File 'lib/pressure.rb', line 21 class Pressure attr_writer :logger attr_reader :running attr_reader :sockets attr_accessor :wrapper_template # The default delay between loops of worker threads. DEFAULT_DELAY = (1.0 / 20.0) # Create a new Pressure instance and start reading data. # # @param options [Hash] Additional options. # @option options [Boolean] :no_wrap Data from upstream is wrapped in a JSON # container by default, with some additional metadata. If this option is # set to true, the data will be passed through without modification. # @option options [Float] :read_worker_delay The amount of time, in seconds, # to sleep between attempts to read data from upstream. Defaults to # DEFAULT_DELAY. # @option options [Float] :broadcast_worker_delay The amount of time, in # seconds, to sleep between attempts to broadcast data to downstream # sockets. Defaults to DEFAULT_DELAY. # @yield Block to call to read data from upstream. # @yieldreturn Data to send downstream. def initialize( = {}, &data_source_block) @threads = [] @wrapper_template = {} @current_upstream = {} @send_queue = Queue.new @sockets = Hamster::Set.new @broadcast_worker_delay = ([:broadcast_worker_delay] || [:websocket_worker_delay] || DEFAULT_DELAY) @read_worker_delay = ([:read_worker_delay] || [:incoming_monitor_delay] || DEFAULT_DELAY) @no_wrap = [:no_wrap] || false @running = false @data_source_block = data_source_block start unless [:start] == false end def logger @logger ||= Logger.new(STDERR).tap do |logger| logger.progname = 'pressure' logger.level = Logger::WARN end end # Add a downstream socket to the Pressure instance. The latest upstream # data will be immediately sent to the socket. # # @param socket [Socket, #send] A socket, or object that responds to #send. def <<(socket) new_sockets = @sockets.add? socket if new_sockets != false @sockets = new_sockets socket.send JSON.generate(@current_upstream) end end # Remove a downstream socket from the Pressure instance. # # @param socket [Socket, #send] A socket, or object that responds to #send, # as added using <<. # @return the deleted socket object, or nil. def delete(socket) new_sockets = @sockets.delete? socket if new_sockets != false @sockets = new_sockets socket else nil end end # Start the worker threads. def start stop @running = true @threads << broadcast_worker_loop @threads << read_worker_loop end # Stop the worker threads. def stop @running = false @threads.each do |thread| thread.kill(5) thread.join end @threads = [] end # @return [Boolean] true if the workers are running. def running? @running end protected # Wraps upstream data in a container to add metadata to the object. # # @param data Upstream data. # @return [Hash] wrapped data. def wrap_data(data) @wrapper_template.merge(upstream_data: Marshal.load(Marshal.dump(data)), last_update_ts: Time.now.utc.to_i) end # Check whether two pieces of upstream data are different. # # @param current_data The most recent data read from upstream. # @param previous_data The last set of data read from upstream. # @return [Boolean] true if the data is different. def data_changed?(current_data, previous_data) if current_data && previous_data current_data != previous_data else true end end # Loop function for the thread reading data from the upstream provider. def read_worker_loop Thread.new do begin data = {} while @running upstream_data = @data_source_block.call if data_changed?(upstream_data, data[:upstream_data]) data = wrap_data(upstream_data) @current_upstream = data if @no_wrap @send_queue << [upstream_data] else @send_queue << data end end sleep(@read_worker_delay) end rescue => e logger.error "Read worker error:" logger.error e end end end # Write data to downstream sockets. def broadcast queued_data = JSON.generate(@send_queue.shift) @sockets.each do |socket| socket.send queued_data end end # Loop function for the thread writing data to the downstream providers. def broadcast_worker_loop Thread.new do begin while @running broadcast sleep(@broadcast_worker_delay) end rescue => e logger.error "Broadcast worker error:" logger.error e retry end end end end |
#wrapper_template ⇒ Hash
Returns The template used to wrap upstream data before sending it to downstream consumers. See wrap_data and the no_wrap option to initialize.
|
# File 'lib/pressure.rb', line 21 class Pressure attr_writer :logger attr_reader :running attr_reader :sockets attr_accessor :wrapper_template # The default delay between loops of worker threads. DEFAULT_DELAY = (1.0 / 20.0) # Create a new Pressure instance and start reading data. # # @param options [Hash] Additional options. # @option options [Boolean] :no_wrap Data from upstream is wrapped in a JSON # container by default, with some additional metadata. If this option is # set to true, the data will be passed through without modification. # @option options [Float] :read_worker_delay The amount of time, in seconds, # to sleep between attempts to read data from upstream. Defaults to # DEFAULT_DELAY. # @option options [Float] :broadcast_worker_delay The amount of time, in # seconds, to sleep between attempts to broadcast data to downstream # sockets. Defaults to DEFAULT_DELAY. # @yield Block to call to read data from upstream. # @yieldreturn Data to send downstream. def initialize( = {}, &data_source_block) @threads = [] @wrapper_template = {} @current_upstream = {} @send_queue = Queue.new @sockets = Hamster::Set.new @broadcast_worker_delay = ([:broadcast_worker_delay] || [:websocket_worker_delay] || DEFAULT_DELAY) @read_worker_delay = ([:read_worker_delay] || [:incoming_monitor_delay] || DEFAULT_DELAY) @no_wrap = [:no_wrap] || false @running = false @data_source_block = data_source_block start unless [:start] == false end def logger @logger ||= Logger.new(STDERR).tap do |logger| logger.progname = 'pressure' logger.level = Logger::WARN end end # Add a downstream socket to the Pressure instance. The latest upstream # data will be immediately sent to the socket. # # @param socket [Socket, #send] A socket, or object that responds to #send. def <<(socket) new_sockets = @sockets.add? socket if new_sockets != false @sockets = new_sockets socket.send JSON.generate(@current_upstream) end end # Remove a downstream socket from the Pressure instance. # # @param socket [Socket, #send] A socket, or object that responds to #send, # as added using <<. # @return the deleted socket object, or nil. def delete(socket) new_sockets = @sockets.delete? socket if new_sockets != false @sockets = new_sockets socket else nil end end # Start the worker threads. def start stop @running = true @threads << broadcast_worker_loop @threads << read_worker_loop end # Stop the worker threads. def stop @running = false @threads.each do |thread| thread.kill(5) thread.join end @threads = [] end # @return [Boolean] true if the workers are running. def running? @running end protected # Wraps upstream data in a container to add metadata to the object. # # @param data Upstream data. # @return [Hash] wrapped data. def wrap_data(data) @wrapper_template.merge(upstream_data: Marshal.load(Marshal.dump(data)), last_update_ts: Time.now.utc.to_i) end # Check whether two pieces of upstream data are different. # # @param current_data The most recent data read from upstream. # @param previous_data The last set of data read from upstream. # @return [Boolean] true if the data is different. def data_changed?(current_data, previous_data) if current_data && previous_data current_data != previous_data else true end end # Loop function for the thread reading data from the upstream provider. def read_worker_loop Thread.new do begin data = {} while @running upstream_data = @data_source_block.call if data_changed?(upstream_data, data[:upstream_data]) data = wrap_data(upstream_data) @current_upstream = data if @no_wrap @send_queue << [upstream_data] else @send_queue << data end end sleep(@read_worker_delay) end rescue => e logger.error "Read worker error:" logger.error e end end end # Write data to downstream sockets. def broadcast queued_data = JSON.generate(@send_queue.shift) @sockets.each do |socket| socket.send queued_data end end # Loop function for the thread writing data to the downstream providers. def broadcast_worker_loop Thread.new do begin while @running broadcast sleep(@broadcast_worker_delay) end rescue => e logger.error "Broadcast worker error:" logger.error e retry end end end end |
Instance Method Details
#<<(socket) ⇒ Object
Add a downstream socket to the Pressure instance. The latest upstream data will be immediately sent to the socket.
73 74 75 76 77 78 79 |
# File 'lib/pressure.rb', line 73 def <<(socket) new_sockets = @sockets.add? socket if new_sockets != false @sockets = new_sockets socket.send JSON.generate(@current_upstream) end end |
#delete(socket) ⇒ Object
Remove a downstream socket from the Pressure instance.
86 87 88 89 90 91 92 93 94 |
# File 'lib/pressure.rb', line 86 def delete(socket) new_sockets = @sockets.delete? socket if new_sockets != false @sockets = new_sockets socket else nil end end |
#running? ⇒ Boolean
Returns true if the workers are running.
115 116 117 |
# File 'lib/pressure.rb', line 115 def running? @running end |
#start ⇒ Object
Start the worker threads.
97 98 99 100 101 102 |
# File 'lib/pressure.rb', line 97 def start stop @running = true @threads << broadcast_worker_loop @threads << read_worker_loop end |
#stop ⇒ Object
Stop the worker threads.
105 106 107 108 109 110 111 112 |
# File 'lib/pressure.rb', line 105 def stop @running = false @threads.each do |thread| thread.kill(5) thread.join end @threads = [] end |