Class: Fluent::Plugin::Buffer::Chunk

Inherits:
Object
  • Object
show all
Includes:
UniqueId::Mixin, MonitorMixin
Defined in:
lib/fluent/plugin/buffer/chunk.rb

Direct Known Subclasses

FileChunk, FileSingleChunk, MemoryChunk

Defined Under Namespace

Modules: Decompressable

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from UniqueId::Mixin

#dump_unique_id_hex, #generate_unique_id

Constructor Details

#initialize(metadata, compress: :text) ⇒ Chunk

TODO: CompressedPackedMessage of forward protocol?



51
52
53
54
55
56
57
58
59
60
61
62
63
64
# File 'lib/fluent/plugin/buffer/chunk.rb', line 51

def initialize(, compress: :text)
  super()
  @unique_id = generate_unique_id
  @metadata = 

  # state: unstaged/staged/queued/closed
  @state = :unstaged

  @size = 0
  @created_at = Fluent::Clock.real_now
  @modified_at = Fluent::Clock.real_now

  extend Decompressable if compress == :gzip
end

Instance Attribute Details

#metadataObject (readonly)

Returns the value of attribute metadata.



66
67
68
# File 'lib/fluent/plugin/buffer/chunk.rb', line 66

def 
  @metadata
end

#stateObject (readonly)

Returns the value of attribute state.



66
67
68
# File 'lib/fluent/plugin/buffer/chunk.rb', line 66

def state
  @state
end

#unique_idObject (readonly)

Returns the value of attribute unique_id.



66
67
68
# File 'lib/fluent/plugin/buffer/chunk.rb', line 66

def unique_id
  @unique_id
end

Instance Method Details

#append(data, **kwargs) ⇒ Object

data is array of formatted record string

Raises:

  • (ArgumentError)


87
88
89
90
91
92
93
94
# File 'lib/fluent/plugin/buffer/chunk.rb', line 87

def append(data, **kwargs)
  raise ArgumentError, '`compress: gzip` can be used for Compressable module' if kwargs[:compress] == :gzip
  adding = ''.b
  data.each do |d|
    adding << d.b
  end
  concat(adding, data.size)
end

#bytesizeObject

Raises:

  • (NotImplementedError)


109
110
111
# File 'lib/fluent/plugin/buffer/chunk.rb', line 109

def bytesize
  raise NotImplementedError, "Implement this method in child class"
end

#closeObject



157
158
159
160
# File 'lib/fluent/plugin/buffer/chunk.rb', line 157

def close
  @state = :closed
  self
end

#closed?Boolean

Returns:

  • (Boolean)


138
139
140
# File 'lib/fluent/plugin/buffer/chunk.rb', line 138

def closed?
  @state == :closed
end

#commitObject

Raises:

  • (NotImplementedError)


101
102
103
# File 'lib/fluent/plugin/buffer/chunk.rb', line 101

def commit
  raise NotImplementedError, "Implement this method in child class"
end

#concat(bulk, records) ⇒ Object

for event streams which is packed or zipped (and we want not to unpack/uncompress)

Raises:

  • (NotImplementedError)


97
98
99
# File 'lib/fluent/plugin/buffer/chunk.rb', line 97

def concat(bulk, records)
  raise NotImplementedError, "Implement this method in child class"
end

#created_atObject

for compatibility



77
78
79
# File 'lib/fluent/plugin/buffer/chunk.rb', line 77

def created_at
  @created_at_object ||= Time.at(@created_at)
end

#empty?Boolean

Returns:

  • (Boolean)


118
119
120
# File 'lib/fluent/plugin/buffer/chunk.rb', line 118

def empty?
  size == 0
end

#enqueued!Object



152
153
154
155
# File 'lib/fluent/plugin/buffer/chunk.rb', line 152

def enqueued!
  @state = :queued
  self
end

#modified_atObject

for compatibility



82
83
84
# File 'lib/fluent/plugin/buffer/chunk.rb', line 82

def modified_at
  @modified_at_object ||= Time.at(@modified_at)
end

#open(**kwargs, &block) ⇒ Object

Raises:

  • (ArgumentError)


172
173
174
175
# File 'lib/fluent/plugin/buffer/chunk.rb', line 172

def open(**kwargs, &block)
  raise ArgumentError, '`compressed: gzip` can be used for Compressable module' if kwargs[:compressed] == :gzip
  raise NotImplementedError, "Implement this method in child class"
end

#purgeObject



162
163
164
165
# File 'lib/fluent/plugin/buffer/chunk.rb', line 162

def purge
  @state = :closed
  self
end

#queued?Boolean

Returns:

  • (Boolean)


134
135
136
# File 'lib/fluent/plugin/buffer/chunk.rb', line 134

def queued?
  @state == :queued
end

#raw_create_atObject



68
69
70
# File 'lib/fluent/plugin/buffer/chunk.rb', line 68

def raw_create_at
  @created_at
end

#raw_modified_atObject



72
73
74
# File 'lib/fluent/plugin/buffer/chunk.rb', line 72

def raw_modified_at
  @modified_at
end

#read(**kwargs) ⇒ Object

Raises:

  • (ArgumentError)


167
168
169
170
# File 'lib/fluent/plugin/buffer/chunk.rb', line 167

def read(**kwargs)
  raise ArgumentError, '`compressed: gzip` can be used for Compressable module' if kwargs[:compressed] == :gzip
  raise NotImplementedError, "Implement this method in child class"
end

#rollbackObject

Raises:

  • (NotImplementedError)


105
106
107
# File 'lib/fluent/plugin/buffer/chunk.rb', line 105

def rollback
  raise NotImplementedError, "Implement this method in child class"
end

#sizeObject Also known as: length

Raises:

  • (NotImplementedError)


113
114
115
# File 'lib/fluent/plugin/buffer/chunk.rb', line 113

def size
  raise NotImplementedError, "Implement this method in child class"
end

#staged!Object



142
143
144
145
# File 'lib/fluent/plugin/buffer/chunk.rb', line 142

def staged!
  @state = :staged
  self
end

#staged?Boolean

Returns:

  • (Boolean)


130
131
132
# File 'lib/fluent/plugin/buffer/chunk.rb', line 130

def staged?
  @state == :staged
end

#unstaged!Object



147
148
149
150
# File 'lib/fluent/plugin/buffer/chunk.rb', line 147

def unstaged!
  @state = :unstaged
  self
end

#unstaged?Boolean

Returns:

  • (Boolean)


126
127
128
# File 'lib/fluent/plugin/buffer/chunk.rb', line 126

def unstaged?
  @state == :unstaged
end

#writable?Boolean

Returns:

  • (Boolean)


122
123
124
# File 'lib/fluent/plugin/buffer/chunk.rb', line 122

def writable?
  @state == :staged || @state == :unstaged
end

#write_to(io, **kwargs) ⇒ Object

Raises:

  • (ArgumentError)


177
178
179
180
181
182
# File 'lib/fluent/plugin/buffer/chunk.rb', line 177

def write_to(io, **kwargs)
  raise ArgumentError, '`compressed: gzip` can be used for Compressable module' if kwargs[:compressed] == :gzip
  open do |i|
    IO.copy_stream(i, io)
  end
end