Class: Conveyor::Channel
- Inherits:
-
BaseChannel
- Object
- BaseChannel
- Conveyor::Channel
- Defined in:
- lib/conveyor/channel.rb
Overview
Channel
A basic channel.
Constant Summary
Constants inherited from BaseChannel
BaseChannel::BUCKET_SIZE, BaseChannel::NAME_PATTERN
Instance Method Summary collapse
-
#get_next ⇒ Object
Returns the next item from the global (non-group) iterator.
-
#get_next_by_group(group) ⇒ Object
Returns the next item for group.
- #get_next_n(n) ⇒ Object
- #get_next_n_by_group(n, group) ⇒ Object
-
#initialize(directory) ⇒ Channel
constructor
If directory doesn’t already exist, it will be created during initialization.
-
#post(data) ⇒ Object
Add data to the channel.
- #rewind(*opts) ⇒ Object
- #status ⇒ Object
Methods inherited from BaseChannel
#bucket_file, #commit, #get, #inspect, #parse_headers, #pick_bucket, valid_channel_name?
Constructor Details
#initialize(directory) ⇒ Channel
If directory doesn’t already exist, it will be created during initialization.
10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 |
# File 'lib/conveyor/channel.rb', line 10
#
# Opens the channel stored in +directory+ and restores iterator positions.
#
# The global position is read from the 'iterator' file inside the channel
# directory and each group position from an 'iterator-<group>' file. In both
# cases the last line of the file wins. All iterator files are left open and
# positioned at end-of-file so later writes append.
#
# @param directory [String] channel directory, passed on to the superclass
def initialize(directory)
  @group_iterators       = {}
  @group_iterators_files = {}

  super(directory)

  iterator_path = File.join(@directory, 'iterator')

  # File.size? is nil for a missing or zero-length file, so it covers both
  # the "exists" and "non-empty" checks (File.exists? is gone in Ruby 3.2).
  if File.size?(iterator_path)
    @iterator_file = File.open(iterator_path, 'r+')
    @iterator_file.each_line { |line| @iterator = line.to_i }
    @iterator_file.seek(0, IO::SEEK_END)
  else
    @iterator_file = File.open(iterator_path, 'a')
  end
  @iterator_file.sync = true

  # Fall back to 1 (the starting position group iterators use below) when
  # nothing has supplied a position yet.
  # NOTE(review): the superclass may already set @iterator; ||= keeps that value.
  @iterator ||= 1

  Dir.glob(File.join(@directory, 'iterator-*')) do |path|
    group = File.basename(path)[/iterator-(.*)/, 1]
    file  = File.open(path, 'r+')

    @group_iterators_files[group] = file
    @group_iterators[group]       = 1
    file.each_line { |line| @group_iterators[group] = line.to_i }
    file.seek(0, IO::SEEK_END)
  end
end
Instance Method Details
#get_next ⇒ Object
Returns the next item from the global (non-group) iterator.
46 47 48 49 50 51 52 53 54 55 56 57 58 |
# File 'lib/conveyor/channel.rb', line 46
#
# Returns the next item from the global (non-group) iterator, or nil when
# the iterator has moved past @last_id. On success the iterator advances by
# one and the new position is appended to the iterator file.
#
# NOTE(review): Thread.exclusive was removed in Ruby 3.0; this method needs
# an older Ruby (or a shared replacement lock) to run.
def get_next
  Thread.exclusive do
    next nil unless @iterator <= @last_id

    item = get(@iterator)
    @iterator += 1
    @iterator_file.write("#{@iterator}\n")
    item
  end
end
#get_next_by_group(group) ⇒ Object
Returns the next item for group. If group hasn’t been seen before, the first item is returned.
61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 |
# File 'lib/conveyor/channel.rb', line 61
#
# Returns the next item for +group+, or nil when that group has already
# consumed everything up to @last_id. A group that hasn't been seen before
# starts at the first item (position 1).
#
# The bound check uses the group's own position. It previously tested the
# global @iterator, which starved groups once the global iterator was
# exhausted and let a group read past @last_id when it was not.
#
# NOTE(review): Thread.exclusive was removed in Ruby 3.0.
#
# @param group [Object] key identifying the consumer group
def get_next_by_group(group)
  item = nil
  Thread.exclusive do
    @group_iterators[group] = 1 unless @group_iterators.key?(group)

    if @group_iterators[group] <= @last_id
      item = get(@group_iterators[group])
      @group_iterators[group] += 1
      # Record the new position for this group.
      group_iterators_file(group) do |f|
        f.write("#{@group_iterators[group]}\n")
      end
    end
  end
  item
end
#get_next_n(n) ⇒ Object
78 79 80 81 82 83 84 85 86 87 88 89 |
# File 'lib/conveyor/channel.rb', line 78
#
# Returns up to +n+ items from the global (non-group) iterator as an Array;
# fewer (possibly none) when the iterator reaches @last_id first. Each item
# taken advances the iterator and appends the new position to the iterator
# file.
#
# NOTE(review): Thread.exclusive was removed in Ruby 3.0.
#
# @param n [Integer] maximum number of items to return
def get_next_n(n)
  items = []
  Thread.exclusive do
    until items.length >= n || @iterator > @last_id
      items << get(@iterator)
      @iterator += 1
      @iterator_file.write("#{@iterator}\n")
    end
  end
  items
end
#get_next_n_by_group(n, group) ⇒ Object
91 92 93 94 95 96 97 98 99 100 101 102 103 104 |
# File 'lib/conveyor/channel.rb', line 91
#
# Returns up to +n+ items for +group+ as an Array; fewer (possibly none)
# when the group reaches @last_id first. A group that hasn't been seen
# before starts at position 1.
#
# The loop bound is inclusive (<=), matching get_next_n. It previously used
# <, so the item at @last_id was never returned in a batch.
#
# NOTE(review): Thread.exclusive was removed in Ruby 3.0.
#
# @param n [Integer] maximum number of items to return
# @param group [Object] key identifying the consumer group
def get_next_n_by_group(n, group)
  items = []
  Thread.exclusive do
    @group_iterators[group] = 1 unless @group_iterators.key?(group)

    while items.length < n && @group_iterators[group] <= @last_id
      items << get(@group_iterators[group])
      @group_iterators[group] += 1
      # Record the new position for this group after every item.
      group_iterators_file(group) do |f|
        f.write("#{@group_iterators[group]}\n")
      end
    end
  end
  items
end
#post(data) ⇒ Object
Add data to the channel.
41 42 43 |
# File 'lib/conveyor/channel.rb', line 41
#
# Add data to the channel by handing it to the inherited #commit.
#
# @param data [Object] payload forwarded unchanged to #commit
# @return whatever #commit returns
def post(data)
  commit(data)
end
#rewind(*opts) ⇒ Object
118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 |
# File 'lib/conveyor/channel.rb', line 118
#
# Moves an iterator to an explicit position.
#
# Takes an options hash as its first argument:
#   :id    - new position (converted with #to_i); required, otherwise no-op
#   :group - when present, the group iterator for that key is moved instead
#            of the global iterator
#
# The new position is also appended to the corresponding iterator file.
# Calling with no arguments, or without :id, does nothing and returns nil
# (a bare call previously raised NoMethodError on nil).
#
# NOTE(review): Thread.exclusive was removed in Ruby 3.0.
def rewind(*opts)
  options = opts.first || {}
  return unless options.key?(:id)

  Thread.exclusive do
    position = options[:id].to_i

    if options.key?(:group)
      group = options[:group]
      @group_iterators[group] = position
      group_iterators_file(group) do |f|
        f.write("#{position}\n")
      end
    else
      @iterator = position
      @iterator_file.write("#{position}\n")
    end
  end
end
#status ⇒ Object
106 107 108 109 110 111 112 113 114 115 116 |
# File 'lib/conveyor/channel.rb', line 106
#
# Returns a Hash snapshot describing the channel:
#   :directory       - the channel directory
#   :index           - { :size => number of index entries }
#   :data_files      - one { :path, :bytes } hash per data file
#   :iterator        - { :position => global iterator position }
#   :iterator_groups - a copy of the group => position map
def status
  data_files = @data_files.map do |f|
    { :path => f.path, :bytes => File.size(f.path) }
  end

  # Build a fresh hash so callers cannot mutate the live iterator table.
  groups = @group_iterators.each_with_object({}) do |(group, position), copy|
    copy[group] = position
  end

  {
    :directory       => @directory,
    :index           => { :size => @index.length },
    :data_files      => data_files,
    :iterator        => { :position => @iterator },
    :iterator_groups => groups
  }
end