Class: Pants::Seam
- Inherits:
-
Readers::BaseReader
- Object
- Readers::BaseReader
- Pants::Seam
- Includes:
- LogSwitch::Mixin
- Defined in:
- lib/pants/seam.rb
Overview
A Seam is a core Pants object type (like Readers and Writers) that lets you attach to a Reader, work with the read data, and pass it on to attached Writers. It implements buffering by using EventMachine Queues: pop data off the @read_queue, work with it, then push it onto the @write_queue. Once on the @write_queue, the Seam will pass on to all Writers that have been added to it.
The @read_queue is wrapped by #read_items, which yields data chunks from the Reader in, allowing easy access to each bit of data as it was when it was read in. The @write_queue is wrapped by #write, which lets you just give it the data you want to pass on to the attached Writers.
Seams are particularly useful for working with network data, where if you’re redirecting traffic from one place to another, you may need to alter data in those packets to make it useful to the receiving ends.
Instance Attribute Summary
Attributes inherited from Readers::BaseReader
#core_stopper_callback, #write_to_channel, #writers
Instance Method Summary collapse
-
#initialize(core_stopper_callback, reader_channel) ⇒ Seam
constructor
A new instance of Seam.
-
#read_items(&block) {|item| ... } ⇒ Object
Call this to read data that was put into the read queue.
- #start(callback) ⇒ Object
-
#stop ⇒ Object
Make sure you call this (with super()) in your child to ensure read and write queues are flushed.
-
#write(data) ⇒ Object
Call this after your Seam child has processed data and is ready to send it to its writers.
-
#write_object ⇒ String
A String that identifies what the writer is writing to.
Methods inherited from Readers::BaseReader
#add_seam, #add_writer, #read_object, #remove_writer, #running?, #stop!, #write_to
Constructor Details
#initialize(core_stopper_callback, reader_channel) ⇒ Seam
Returns a new instance of Seam.
29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 |
# File 'lib/pants/seam.rb', line 29 def initialize(core_stopper_callback, reader_channel) @read_queue = EM::Queue.new @write_queue = EM::Queue.new @write_object ||= nil @receives = 0 @reads = 0 @writes = 0 @sends = 0 reader_channel.subscribe do |data| log "Got data on reader channel" @read_queue << data @receives += data.size end super(core_stopper_callback) send_data end |
Instance Method Details
#read_items(&block) {|item| ... } ⇒ Object
Call this to read data that was put into the read queue. It yields one “item” (however the data was put onto the queue) at a time. It will continually yield as there is data that comes in on the queue.
89 90 91 92 93 94 95 96 97 |
# File 'lib/pants/seam.rb', line 89 def read_items(&block) processor = proc do |item| block.call(item) @reads += item.size @read_queue.pop(&processor) end @read_queue.pop(&processor) end |
#start(callback) ⇒ Object
49 50 51 52 53 |
# File 'lib/pants/seam.rb', line 49 def start(callback) super(callback) starter.call end |
#stop ⇒ Object
Make sure you call this (with super()) in your child to ensure read and write queues are flushed.
57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 |
# File 'lib/pants/seam.rb', line 57 def stop log "Stopping..." log "receives #{@receives}" log "reads #{@reads}" log "writes #{@writes}" log "sends #{@sends}" finish_loop = EM.tick_loop do if @read_queue.empty? && @write_queue.empty? :stop end end finish_loop.on_stop { stopper.call } end |
#write(data) ⇒ Object
Call this after your Seam child has processed data and is ready to send it to its writers.
103 104 105 106 |
# File 'lib/pants/seam.rb', line 103 def write(data) @write_queue << data @writes += data.size end |
#write_object ⇒ String
Returns A String that identifies what the writer is writing to. This is simply used for displaying info to the user.
75 76 77 78 79 80 81 |
# File 'lib/pants/seam.rb', line 75 def write_object if @write_object @write_object else warn "No write_object info has been defined for this writer." end end |