Class: Fog::AWS::Kinesis::Mock
- Inherits:
-
Object
- Object
- Fog::AWS::Kinesis::Mock
- Defined in:
- lib/fog/aws/kinesis.rb,
lib/fog/aws/requests/kinesis/put_record.rb,
lib/fog/aws/requests/kinesis/get_records.rb,
lib/fog/aws/requests/kinesis/put_records.rb,
lib/fog/aws/requests/kinesis/split_shard.rb,
lib/fog/aws/requests/kinesis/list_streams.rb,
lib/fog/aws/requests/kinesis/merge_shards.rb,
lib/fog/aws/requests/kinesis/create_stream.rb,
lib/fog/aws/requests/kinesis/delete_stream.rb,
lib/fog/aws/requests/kinesis/describe_stream.rb,
lib/fog/aws/requests/kinesis/add_tags_to_stream.rb,
lib/fog/aws/requests/kinesis/get_shard_iterator.rb,
lib/fog/aws/requests/kinesis/list_tags_for_stream.rb,
lib/fog/aws/requests/kinesis/remove_tags_from_stream.rb
Class Method Summary collapse
- .data ⇒ Object
- .mutex ⇒ Object
- .next_sequence_number ⇒ Object
- .next_shard_id ⇒ Object
- .reset ⇒ Object
Instance Method Summary collapse
- #add_tags_to_stream(options = {}) ⇒ Object
- #create_stream(options = {}) ⇒ Object
- #data ⇒ Object
- #delete_stream(options = {}) ⇒ Object
- #describe_stream(options = {}) ⇒ Object
- #get_records(options = {}) ⇒ Object
- #get_shard_iterator(options = {}) ⇒ Object
-
#initialize(options = {}) ⇒ Mock
constructor
A new instance of Mock.
- #list_streams(options = {}) ⇒ Object
- #list_tags_for_stream(options = {}) ⇒ Object
- #merge_shards(options = {}) ⇒ Object
- #mutex ⇒ Object
- #next_sequence_number ⇒ Object
- #next_shard_id ⇒ Object
- #put_record(options = {}) ⇒ Object
- #put_records(options = {}) ⇒ Object
- #remove_tags_from_stream(options = {}) ⇒ Object
- #reset_data ⇒ Object
- #split_shard(options = {}) ⇒ Object
Constructor Details
#initialize(options = {}) ⇒ Mock
Returns a new instance of Mock.
147 148 149 150 151 152 153 |
# File 'lib/fog/aws/kinesis.rb', line 147 def initialize(={}) @account_id = Fog::AWS::Mock.owner_id @aws_access_key_id = [:aws_access_key_id] @region = [:region] || 'us-east-1' Fog::AWS.validate_region!(@region) end |
Class Method Details
.data ⇒ Object
133 134 135 136 137 138 139 140 141 |
# File 'lib/fog/aws/kinesis.rb', line 133 def self.data @data ||= Hash.new do |hash, region| hash[region] = Hash.new do |region_hash, key| region_hash[key] = { :kinesis_streams => {} } end end end |
.mutex ⇒ Object
128 129 130 |
# File 'lib/fog/aws/kinesis.rb', line 128 def self.mutex @mutex ||= Mutex.new end |
.next_sequence_number ⇒ Object
163 164 165 166 167 168 169 |
# File 'lib/fog/aws/kinesis.rb', line 163 def self.next_sequence_number mutex.synchronize do @sequence_number ||= -1 @sequence_number += 1 @sequence_number.to_s end end |
.next_shard_id ⇒ Object
172 173 174 175 176 177 178 |
# File 'lib/fog/aws/kinesis.rb', line 172 def self.next_shard_id mutex.synchronize do @shard_id ||= -1 @shard_id += 1 "shardId-#{@shard_id.to_s.rjust(12, "0")}" end end |
.reset ⇒ Object
143 144 145 |
# File 'lib/fog/aws/kinesis.rb', line 143 def self.reset @data = nil end |
Instance Method Details
#add_tags_to_stream(options = {}) ⇒ Object
30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 |
# File 'lib/fog/aws/requests/kinesis/add_tags_to_stream.rb', line 30 def (={}) stream_name = .delete("StreamName") = .delete("Tags") unless stream = data[:kinesis_streams].detect{ |s| s["StreamName"] == stream_name } raise Fog::AWS::Kinesis::ResourceNotFound.new("Stream #{stream_name} under account #{@account_id} not found.") end stream["Tags"] = stream["Tags"].merge() response = Excon::Response.new response.status = 200 response.body = "" response end |
#create_stream(options = {}) ⇒ Object
30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 |
# File 'lib/fog/aws/requests/kinesis/create_stream.rb', line 30 def create_stream(={}) stream_name = .delete("StreamName") shard_count = .delete("ShardCount") || 1 stream_arn = "arn:aws:kinesis:#{@region}:#{@account_id}:stream/#{stream_name}" if data[:kinesis_streams].detect{ |s| s["StreamName"] == stream_name } raise Fog::AWS::Kinesis::ResourceInUse.new("Stream #{stream_name} under account #{@account_id} already exists.") end shards = (0...shard_count).map do |shard| { "HashKeyRange"=>{ "EndingHashKey"=>"340282366920938463463374607431768211455", "StartingHashKey"=>"0" }, "SequenceNumberRange"=>{ "StartingSequenceNumber"=> next_sequence_number }, "ShardId"=>next_shard_id, "Records" => [] } end data[:kinesis_streams] = [{ "HasMoreShards" => false, "StreamARN" => stream_arn, "StreamName" => stream_name, "StreamStatus" => "ACTIVE", "Shards" => shards, "Tags" => {} }] response = Excon::Response.new response.status = 200 response.body = "" response end |
#data ⇒ Object
155 156 157 |
# File 'lib/fog/aws/kinesis.rb', line 155 def data self.class.data[@region][@aws_access_key_id] end |
#delete_stream(options = {}) ⇒ Object
28 29 30 31 32 33 34 35 36 37 38 39 40 41 |
# File 'lib/fog/aws/requests/kinesis/delete_stream.rb', line 28 def delete_stream(={}) stream_name = .delete("StreamName") unless stream = data[:kinesis_streams].detect{ |s| s["StreamName"] == stream_name } raise Fog::AWS::Kinesis::ResourceNotFound.new("Stream #{stream_name} under account #{@account_id} not found.") end data[:kinesis_streams].delete(stream) response = Excon::Response.new response.status = 200 response.body = "" response end |
#describe_stream(options = {}) ⇒ Object
36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 |
# File 'lib/fog/aws/requests/kinesis/describe_stream.rb', line 36 def describe_stream(={}) stream_name = .delete("StreamName") unless stream = data[:kinesis_streams].detect{ |s| s["StreamName"] == stream_name } raise Fog::AWS::Kinesis::ResourceNotFound.new("Stream #{stream_name} under account #{@account_id} not found.") end # Strip Records key out of shards for response shards = stream["Shards"].reject{ |k,_| k == "Records" } response = Excon::Response.new response.status = 200 response.body = { "StreamDescription" => stream.dup.merge("Shards" => shards) } response end |
#get_records(options = {}) ⇒ Object
32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 |
# File 'lib/fog/aws/requests/kinesis/get_records.rb', line 32 def get_records(={}) shard_iterator = Fog::JSON.decode(.delete("ShardIterator")) limit = .delete("Limit") || -1 stream_name = shard_iterator["StreamName"] shard_id = shard_iterator["ShardId"] starting_sequence_number = (shard_iterator["StartingSequenceNumber"] || 1).to_i unless stream = data[:kinesis_streams].detect{ |s| s["StreamName"] == stream_name } raise Fog::AWS::Kinesis::ResourceNotFound.new("Stream #{stream_name} under account #{@account_id} not found.") end unless shard = stream["Shards"].detect{ |shard| shard["ShardId"] == shard_id } raise Fog::AWS::Kinesis::ResourceNotFound.new("Could not find shard #{shard_id} in stream #{stream_name} under account #{@account_id}.") end records = [] shard["Records"].each do |record| next if record["SequenceNumber"].to_i < starting_sequence_number records << record break if records.size == limit end shard_iterator["StartingSequenceNumber"] = if records.empty? starting_sequence_number.to_s else (records.last["SequenceNumber"].to_i + 1).to_s end response = Excon::Response.new response.status = 200 response.body = { "MillisBehindLatest"=> 0, "NextShardIterator"=> Fog::JSON.encode(shard_iterator), "Records"=> records } response end |
#get_shard_iterator(options = {}) ⇒ Object
36 37 38 39 40 41 42 43 44 45 46 47 48 49 |
# File 'lib/fog/aws/requests/kinesis/get_shard_iterator.rb', line 36 def get_shard_iterator(={}) stream_name = ["StreamName"] unless stream = data[:kinesis_streams].detect{ |s| s["StreamName"] == stream_name } raise Fog::AWS::Kinesis::ResourceNotFound.new("Stream #{stream_name} under account #{@account_id} not found.") end response = Excon::Response.new response.status = 200 response.body = { "ShardIterator" => Fog::JSON.encode() # just encode the options that were given, we decode them in get_records } response end |
#list_streams(options = {}) ⇒ Object
28 29 30 31 32 33 34 35 36 |
# File 'lib/fog/aws/requests/kinesis/list_streams.rb', line 28 def list_streams(={}) response = Excon::Response.new response.status = 200 response.body = { "HasMoreStreams" => false, "StreamNames" => data[:kinesis_streams].map{ |stream| stream["StreamName"] } } response end |
#list_tags_for_stream(options = {}) ⇒ Object
36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 |
# File 'lib/fog/aws/requests/kinesis/list_tags_for_stream.rb', line 36 def (={}) stream_name = .delete("StreamName") unless stream = data[:kinesis_streams].detect{ |s| s["StreamName"] == stream_name } raise Fog::AWS::Kinesis::ResourceNotFound.new("Stream #{stream_name} under account #{@account_id} not found.") end response = Excon::Response.new response.status = 200 response.body = { "HasMoreTags" => false, "Tags" => stream["Tags"].map{ |k,v| {"Key" => k, "Value" => v} } } response end |
#merge_shards(options = {}) ⇒ Object
32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 |
# File 'lib/fog/aws/requests/kinesis/merge_shards.rb', line 32 def merge_shards(={}) stream_name = .delete("StreamName") shard_to_merge_id = .delete("ShardToMerge") adjacent_shard_to_merge_id = .delete("AdjacentShardToMerge") unless stream = data[:kinesis_streams].detect{ |s| s["StreamName"] == stream_name } raise Fog::AWS::Kinesis::ResourceNotFound.new("Stream #{stream_name} under account #{@account_id} not found.") end unless shard_to_merge = stream["Shards"].detect{ |shard| shard["ShardId"] == shard_to_merge_id } raise Fog::AWS::Kinesis::ResourceNotFound.new("Could not find shard #{shard_to_merge_id} in stream #{stream_name} under account #{@account_id}.") end unless adjacent_shard_to_merge = stream["Shards"].detect{ |shard| shard["ShardId"] == adjacent_shard_to_merge_id } raise Fog::AWS::Kinesis::ResourceNotFound.new("Could not find shard #{adjacent_shard_to_merge_id} in stream #{stream_name} under account #{@account_id}.") end # Close shards (set an EndingSequenceNumber on them) shard_to_merge["SequenceNumberRange"]["EndingSequenceNumber"] = next_sequence_number adjacent_shard_to_merge["SequenceNumberRange"]["EndingSequenceNumber"] = next_sequence_number new_starting_hash_key = [ shard_to_merge["HashKeyRange"]["StartingHashKey"].to_i, adjacent_shard_to_merge["HashKeyRange"]["StartingHashKey"].to_i ].min.to_s new_ending_hash_key = [ shard_to_merge["HashKeyRange"]["EndingHashKey"].to_i, adjacent_shard_to_merge["HashKeyRange"]["EndingHashKey"].to_i ].max.to_s # create a new shard with ParentShardId and AdjacentParentShardID stream["Shards"] << { "HashKeyRange"=> { "EndingHashKey" => new_ending_hash_key, "StartingHashKey" => new_starting_hash_key }, "SequenceNumberRange" => { "StartingSequenceNumber" => next_sequence_number }, "ShardId" => next_shard_id, "ParentShardId" => shard_to_merge_id, "AdjacentParentShardId" => adjacent_shard_to_merge_id } response = Excon::Response.new response.status = 200 response.body = "" response end |
#mutex ⇒ Object
131 |
# File 'lib/fog/aws/kinesis.rb', line 131 def mutex; self.class.mutex; end |
#next_sequence_number ⇒ Object
170 |
# File 'lib/fog/aws/kinesis.rb', line 170 def next_sequence_number; self.class.next_sequence_number; end |
#next_shard_id ⇒ Object
179 |
# File 'lib/fog/aws/kinesis.rb', line 179 def next_shard_id; self.class.next_shard_id; end |
#put_record(options = {}) ⇒ Object
38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 |
# File 'lib/fog/aws/requests/kinesis/put_record.rb', line 38 def put_record(={}) stream_name = .delete("StreamName") unless stream = data[:kinesis_streams].detect{ |s| s["StreamName"] == stream_name } raise Fog::AWS::Kinesis::ResourceNotFound.new("Stream #{stream_name} under account #{@account_id} not found.") end sequence_number = next_sequence_number data = .delete("Data") partition_key = .delete("PartitionKey") sample_method = RUBY_VERSION == "1.8.7" ? :choice : :sample shard_id = stream["Shards"].send(sample_method)["ShardId"] shard = stream["Shards"].detect{ |shard| shard["ShardId"] == shard_id } # store the records on the shard(s) shard["Records"] << { "SequenceNumber" => sequence_number, "Data" => data, "PartitionKey" => partition_key } response = Excon::Response.new response.status = 200 response.body = { "SequenceNumber" => sequence_number, "ShardId" => shard_id } response end |
#put_records(options = {}) ⇒ Object
36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 |
# File 'lib/fog/aws/requests/kinesis/put_records.rb', line 36 def put_records(={}) stream_name = .delete("StreamName") unless stream = data[:kinesis_streams].detect{ |s| s["StreamName"] == stream_name } raise Fog::AWS::Kinesis::ResourceNotFound.new("Stream #{stream_name} under account #{@account_id} not found.") end records = .delete("Records") record_results = records.map { |r| sequence_number = next_sequence_number sample_method = RUBY_VERSION == "1.8.7" ? :choice : :sample shard_id = stream["Shards"].send(sample_method)["ShardId"] shard = stream["Shards"].detect{ |shard| shard["ShardId"] == shard_id } # store the records on the shard(s) shard["Records"] << r.merge("SequenceNumber" => sequence_number) { "SequenceNumber" => sequence_number, "ShardId" => shard_id } } response = Excon::Response.new response.status = 200 response.body = { "FailedRecordCount" => 0, "Records" => record_results } response end |
#remove_tags_from_stream(options = {}) ⇒ Object
30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 |
# File 'lib/fog/aws/requests/kinesis/remove_tags_from_stream.rb', line 30 def (={}) stream_name = .delete("StreamName") = .delete("TagKeys") unless stream = data[:kinesis_streams].detect{ |s| s["StreamName"] == stream_name } raise Fog::AWS::Kinesis::ResourceNotFound.new("Stream #{stream_name} under account #{@account_id} not found.") end stream["Tags"] = stream["Tags"].delete_if { |k,_| .include?(k) } response = Excon::Response.new response.status = 200 response.body = "" response end |
#reset_data ⇒ Object
159 160 161 |
# File 'lib/fog/aws/kinesis.rb', line 159 def reset_data self.class.data[@region].delete(@aws_access_key_id) end |
#split_shard(options = {}) ⇒ Object
32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 |
# File 'lib/fog/aws/requests/kinesis/split_shard.rb', line 32 def split_shard(={}) stream_name = .delete("StreamName") shard_id = .delete("ShardToSplit") stream = data[:kinesis_streams].detect{ |s| s["StreamName"] == stream_name } unless stream = data[:kinesis_streams].detect{ |s| s["StreamName"] == stream_name } raise Fog::AWS::Kinesis::ResourceNotFound.new("Stream #{stream_name} under account #{@account_id} not found.") end unless shard = stream["Shards"].detect{ |shard| shard["ShardId"] == shard_id } raise Fog::AWS::Kinesis::ResourceNotFound.new("Could not find shard #{shard_id} in stream #{stream_name} under account #{@account_id}.") end # Close original shard (set an EndingSequenceNumber on it) shard["SequenceNumberRange"]["EndingSequenceNumber"] = next_sequence_number # Calculate new shard ranges parent_starting_hash_key = shard["HashKeyRange"]["StartingHashKey"] parent_ending_hash_key = shard["HashKeyRange"]["EndingHashKey"] new_starting_hash_key = .delete("NewStartingHashKey") # Create two new shards using contiguous hash space based on the original shard stream["Shards"] << { "HashKeyRange"=> { "EndingHashKey" => (new_starting_hash_key.to_i - 1).to_s, "StartingHashKey" => parent_starting_hash_key }, "SequenceNumberRange" => { "StartingSequenceNumber" => next_sequence_number }, "ShardId" => next_shard_id, "ParentShardId" => shard_id } stream["Shards"] << { "HashKeyRange" => { "EndingHashKey" => parent_ending_hash_key, "StartingHashKey" => new_starting_hash_key }, "SequenceNumberRange" =>{ "StartingSequenceNumber" => next_sequence_number }, "ShardId" => next_shard_id, "ParentShardId" => shard_id } response = Excon::Response.new response.status = 200 response.body = "" response end |