Class: Pants::Readers::BaseReader
- Inherits: Object
  - Object
  - Pants::Readers::BaseReader
- Includes: LogSwitch::Mixin
- Defined in: lib/pants/readers/base_reader.rb
Direct Known Subclasses
Instance Attribute Summary
- #core_stopper_callback ⇒ EventMachine::Callback (readonly)
  The callback from Core that should be called when the Reader is done reading.
- #write_to_channel ⇒ EventMachine::Channel (readonly)
  The channel that Writers should subscribe to.
- #writers ⇒ Array (readonly)
  The list of Writers attached to the Reader.
Instance Method Summary
- #add_seam(klass, *args) ⇒ Pants::Seam
  Allows for adding a Pants::Seam (or child) object to the reader’s list of internal writers.
- #add_writer(obj, *args) ⇒ Object
  One method of adding a Writer to the Reader.
- #initialize(core_stopper_callback) ⇒ BaseReader (constructor)
  A new instance of BaseReader.
- #read_object ⇒ String
  Allows for adding “about me” info, depending on the reader type.
- #remove_writer(obj, key_value_pairs = nil) ⇒ Object
  Removes a writer object from the internal list of writers.
- #running? ⇒ Boolean
- #start(callback) ⇒ Object
  Starts all of the writers, then starts the reader.
- #stop! ⇒ Object
  Calls the reader’s #stopper, thus forcing the reader to shut down.
- #write_to(uri) ⇒ Pants::Writers::BaseWriter
  The newly created writer.
Constructor Details
#initialize(core_stopper_callback) ⇒ BaseReader
Returns a new instance of BaseReader.
    # File 'lib/pants/readers/base_reader.rb', line 27
    def initialize(core_stopper_callback)
      @writers = []
      @write_to_channel = EM::Channel.new
      @core_stopper_callback = core_stopper_callback
      @read_object ||= nil
      @starter = nil
      @stopper = nil
      @running = false
    end
Instance Attribute Details
#core_stopper_callback ⇒ EventMachine::Callback (readonly)
Returns the callback from Core that should be called when the Reader is done reading.
    # File 'lib/pants/readers/base_reader.rb', line 18
    def core_stopper_callback
      @core_stopper_callback
    end
#write_to_channel ⇒ EventMachine::Channel (readonly)
Returns the channel that Writers should subscribe to.
    # File 'lib/pants/readers/base_reader.rb', line 14
    def write_to_channel
      @write_to_channel
    end
#writers ⇒ Array (readonly)
Returns the list of Writers attached to the Reader.
    # File 'lib/pants/readers/base_reader.rb', line 10
    def writers
      @writers
    end
Instance Method Details
#add_seam(klass, *args) ⇒ Pants::Seam
Allows for adding a Pants::Seam (or child) object to the reader’s list of internal writers. For more info on Seams, see the docs for Pants::Seam.
    # File 'lib/pants/readers/base_reader.rb', line 205
    def add_seam(klass, *args)
      @writers << klass.new(@core_stopper_callback, @write_to_channel, *args)

      @writers.last
    end
#add_writer(obj, *args) ⇒ Object
One method of adding a Writer to the Reader. Use this method to add either a) an already instantiated Writer object, or b) a Writer built from a Writer class and its initialization arguments.
Note that form a) requires you to pass the channel that the reader is pushing data to into the Writer yourself. That is probably one reason to avoid this way of adding a writer, but it remains available for flexibility.
    # File 'lib/pants/readers/base_reader.rb', line 130
    def add_writer(obj, *args)
      if obj.is_a? Class
        @writers << obj.new(*args, @write_to_channel)
      elsif obj.kind_of? Pants::Writers::BaseWriter
        @writers << obj
      else
        raise Pants::Error, "Don't know how to add a writer of type #{obj}"
      end

      @writers.last
    end
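The two branches of #add_writer can be sketched without EventMachine. In this minimal stand-in, `FakeWriter`, `FakeReader`, and the `:channel` symbol are hypothetical placeholders and not part of Pants; only the class-versus-instance dispatch is meant to mirror the listing above.

```ruby
# Hypothetical stand-in for a Writer; not part of Pants.
class FakeWriter
  attr_reader :host, :port, :channel

  def initialize(host, port, channel)
    @host = host
    @port = port
    @channel = channel
  end
end

# Hypothetical stand-in for a Reader; not part of Pants.
class FakeReader
  attr_reader :writers, :write_to_channel

  def initialize
    @writers = []
    @write_to_channel = :channel # placeholder for an EventMachine::Channel
  end

  # Same dispatch shape as the #add_writer listing.
  def add_writer(obj, *args)
    if obj.is_a?(Class)
      # The reader appends its own channel as the last constructor argument.
      @writers << obj.new(*args, @write_to_channel)
    elsif obj.is_a?(FakeWriter)
      # The caller must already have handed the channel to the writer.
      @writers << obj
    else
      raise ArgumentError, "Don't know how to add a writer of type #{obj}"
    end

    @writers.last
  end
end

reader = FakeReader.new
from_class = reader.add_writer(FakeWriter, '10.5.6.7', 9000)
from_instance = reader.add_writer(
  FakeWriter.new('10.5.6.8', 9001, reader.write_to_channel)
)
```

In the class form the caller never touches the channel, which is why it tends to be the more convenient of the two.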
#read_object ⇒ String
Allows for adding “about me” info, depending on the reader type. This info is printed out when Pants starts, so you get confirmation of what you’re about to do. If you don’t define this in your reader, no info is printed and a warning is emitted instead.
    # File 'lib/pants/readers/base_reader.rb', line 76
    def read_object
      if @read_object
        @read_object
      else
        warn "No read_object info has been defined for this reader."
      end
    end
#remove_writer(obj, key_value_pairs = nil) ⇒ Object
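Removes a writer object from the internal list of writers. obj may be a Writer class, in which case every writer of that class whose attributes match key_value_pairs is removed, or a URI String, in which case the writer class and the attributes to match are looked up from the URI.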
    # File 'lib/pants/readers/base_reader.rb', line 159
    def remove_writer(obj, key_value_pairs=nil)
      if obj.is_a? Class
        @writers.delete_if do |writer|
          writer.is_a?(obj) &&
            key_value_pairs.all? { |k, v| writer.send(k) == v }
        end
      elsif obj.is_a? String
        writer = begin
          uri = obj.is_a?(URI) ? obj : URI(obj)
        rescue URI::InvalidURIError
          find_writer_from_uri(nil)
        else
          find_writer_from_uri(uri)
        end

        unless writer
          raise ArgumentError, "No writer found wth URI scheme: #{uri.scheme}"
        end

        key_value_pairs = if writer[:args]
          writer[:args].inject({}) do |result, arg|
            result[arg] = uri.send(arg)

            result
          end
        else
          {}
        end

        @writers.delete_if do |w|
          w.is_a?(writer[:klass]) &&
            key_value_pairs.all? { |k, v| w.send(k) == v }
        end
      end
    end
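The class-plus-attributes matching used by #remove_writer can be illustrated in isolation. The sketch below is an assumption-laden stand-in: `UDPStub`, `FileStub`, and `remove_matching!` are hypothetical names, and, unlike the listing above, a nil set of pairs is treated as "match every writer of that class".

```ruby
# Hypothetical stand-ins for writers that expose readable attributes.
UDPStub = Struct.new(:host, :port)
FileStub = Struct.new(:path)

# Deletes every writer of class `klass` whose attributes equal all of the
# given pairs. A nil `pairs` matches everything (an assumption of this sketch).
def remove_matching!(writers, klass, pairs = nil)
  pairs ||= {}

  writers.delete_if do |writer|
    writer.is_a?(klass) &&
      pairs.all? { |attr, value| writer.public_send(attr) == value }
  end
end

writers = [
  UDPStub.new('10.5.6.7', 9000),
  UDPStub.new('10.5.6.7', 9001),
  FileStub.new('out.ts')
]

# Only the UDP stub on port 9000 matches both pairs and is removed.
remove_matching!(writers, UDPStub, host: '10.5.6.7', port: 9000)
```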
#running? ⇒ Boolean
    # File 'lib/pants/readers/base_reader.rb', line 85
    def running?
      @running
    end
#start(callback) ⇒ Object
Starts all of the writers, then starts the reader. Child readers must call this to make sure the writers are all running and ready for data before the reader starts pushing data onto its Channel.
    # File 'lib/pants/readers/base_reader.rb', line 45
    def start(callback)
      start_loop = EM.tick_loop do
        if @writers.empty? || @writers.all?(&:running?)
          :stop
        end
      end
      start_loop.on_stop { callback.call }

      log "Starting writers for reader #{self.__id__}..."
      EM::Iterator.new(@writers).each do |writer, iter|
        writer.start
        iter.next
      end
    end
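The "wait until every writer is running, then start the reader" rule can be approximated in plain Ruby. This is only a synchronous analogue of the tick loop in the listing above, not how EventMachine schedules it; `StubWriter`, `start_writers`, and the `max_ticks` guard are hypothetical and exist only for this sketch.

```ruby
# Hypothetical writer that reports running only after #start is called.
class StubWriter
  def initialize
    @running = false
  end

  def start
    @running = true
  end

  def running?
    @running
  end
end

# Starts every writer, then polls until all of them report running before
# invoking the callback. Returns true if the callback ran, false if the
# poll budget (`max_ticks`, an assumption of this sketch) was exhausted.
def start_writers(writers, callback, max_ticks: 1_000)
  writers.each(&:start)

  max_ticks.times do
    if writers.empty? || writers.all?(&:running?)
      callback.call
      return true
    end
  end

  false
end

writers = [StubWriter.new, StubWriter.new]
events = []
started = start_writers(writers, -> { events << :reader_started })
```

As in the listing, an empty writer list counts as ready, so the callback fires immediately.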
#stop! ⇒ Object
Calls the reader’s #stopper, thus forcing the reader to shut down. For readers that intend to read a finite amount of data, the Reader should call the #stopper when it’s done; for readers that read a non-stop stream (e.g. an open socket), this gets called in response to OS signals (e.g. when you press Ctrl-C).
    # File 'lib/pants/readers/base_reader.rb', line 65
    def stop!
      stopper.call
    end
#write_to(uri) ⇒ Pants::Writers::BaseWriter
Returns the newly created writer.
    # File 'lib/pants/readers/base_reader.rb', line 93
    def write_to(uri)
      begin
        uri = uri.is_a?(URI) ? uri : URI(uri)
      rescue URI::InvalidURIError
        @writers << new_writer_from_uri(nil, @write_to_channel)
      else
        @writers << new_writer_from_uri(uri, @write_to_channel)
      end

      @writers.last
    end
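The begin/rescue/else shape in #write_to separates targets that parse as URIs from those that do not. The sketch below reproduces only that parsing step with Ruby's standard `uri` library; `classify_target` and its return symbols are hypothetical, and how Pants maps the result to a writer class (via `new_writer_from_uri`) is not shown here.

```ruby
require 'uri'

# Hypothetical helper: returns [kind, uri], where kind is the URI scheme as
# a symbol, :no_scheme for a parseable target without a scheme, or
# :unparseable when URI() raises.
def classify_target(target)
  uri = target.is_a?(URI) ? target : URI(target)
rescue URI::InvalidURIError
  # Mirrors the branch that passes nil instead of a URI.
  [:unparseable, nil]
else
  [uri.scheme ? uri.scheme.to_sym : :no_scheme, uri]
end

udp_kind, udp_uri = classify_target('udp://10.5.6.7:9000')
path_kind, _path_uri = classify_target('/tmp/out.ts')
bad_kind, bad_uri = classify_target('my file.ts')
```

A target containing a space fails to parse, so it takes the rescue branch, while a plain path parses as a URI without a scheme.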