Class: Fluent::Plugin::Buffer::Chunk

Inherits:
Object
  • Object
show all
Includes:
UniqueId::Mixin, MonitorMixin
Defined in:
lib/fluent/plugin/buffer/chunk.rb

Direct Known Subclasses

FileChunk, FileSingleChunk, MemoryChunk

Defined Under Namespace

Modules: GzipDecompressable, ZstdDecompressable

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from UniqueId::Mixin

#dump_unique_id_hex, #generate_unique_id

Constructor Details

#initialize(metadata, compress: :text) ⇒ Chunk

TODO: CompressedPackedMessage of forward protocol?



51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
# File 'lib/fluent/plugin/buffer/chunk.rb', line 51

def initialize(, compress: :text)
  super()
  @unique_id = generate_unique_id
  @metadata = 

  # state: unstaged/staged/queued/closed
  @state = :unstaged

  @size = 0
  @created_at = Fluent::Clock.real_now
  @modified_at = Fluent::Clock.real_now
  if compress == :gzip
    extend GzipDecompressable
  elsif compress == :zstd
    extend ZstdDecompressable
  end
end

Instance Attribute Details

#metadataObject (readonly)

Returns the value of attribute metadata.



69
70
71
# File 'lib/fluent/plugin/buffer/chunk.rb', line 69

def 
  @metadata
end

#stateObject (readonly)

Returns the value of attribute state.



69
70
71
# File 'lib/fluent/plugin/buffer/chunk.rb', line 69

def state
  @state
end

#unique_idObject (readonly)

Returns the value of attribute unique_id.



69
70
71
# File 'lib/fluent/plugin/buffer/chunk.rb', line 69

def unique_id
  @unique_id
end

Instance Method Details

#append(data, **kwargs) ⇒ Object

data is array of formatted record string

Raises:

  • (ArgumentError)


90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
# File 'lib/fluent/plugin/buffer/chunk.rb', line 90

def append(data, **kwargs)
  raise ArgumentError, "`compress: #{kwargs[:compress]}` can be used for Compressable module" if kwargs[:compress] == :gzip || kwargs[:compress] == :zstd
  begin
    adding = data.join.force_encoding(Encoding::ASCII_8BIT)
  rescue
    # Fallback
    # Array#join throws an exception if data contains strings with a different encoding.
    # Although such cases may be rare, it should be considered as a safety precaution.
    adding = ''.force_encoding(Encoding::ASCII_8BIT)
    data.each do |d|
      adding << d.b
    end
  end
  concat(adding, data.size)
end

#bytesizeObject

Raises:

  • (NotImplementedError)


119
120
121
# File 'lib/fluent/plugin/buffer/chunk.rb', line 119

def bytesize
  raise NotImplementedError, "Implement this method in child class"
end

#closeObject



167
168
169
170
# File 'lib/fluent/plugin/buffer/chunk.rb', line 167

def close
  @state = :closed
  self
end

#closed?Boolean

Returns:

  • (Boolean)


148
149
150
# File 'lib/fluent/plugin/buffer/chunk.rb', line 148

def closed?
  @state == :closed
end

#commitObject

Raises:

  • (NotImplementedError)


111
112
113
# File 'lib/fluent/plugin/buffer/chunk.rb', line 111

def commit
  raise NotImplementedError, "Implement this method in child class"
end

#concat(bulk, records) ⇒ Object

for event streams which is packed or zipped (and we want not to unpack/uncompress)

Raises:

  • (NotImplementedError)


107
108
109
# File 'lib/fluent/plugin/buffer/chunk.rb', line 107

def concat(bulk, records)
  raise NotImplementedError, "Implement this method in child class"
end

#created_atObject

for compatibility



80
81
82
# File 'lib/fluent/plugin/buffer/chunk.rb', line 80

def created_at
  @created_at_object ||= Time.at(@created_at)
end

#empty?Boolean

Returns:

  • (Boolean)


128
129
130
# File 'lib/fluent/plugin/buffer/chunk.rb', line 128

def empty?
  size == 0
end

#enqueued!Object



162
163
164
165
# File 'lib/fluent/plugin/buffer/chunk.rb', line 162

def enqueued!
  @state = :queued
  self
end

#modified_atObject

for compatibility



85
86
87
# File 'lib/fluent/plugin/buffer/chunk.rb', line 85

def modified_at
  @modified_at_object ||= Time.at(@modified_at)
end

#open(**kwargs, &block) ⇒ Object

Raises:

  • (ArgumentError)


182
183
184
185
# File 'lib/fluent/plugin/buffer/chunk.rb', line 182

def open(**kwargs, &block)
  raise ArgumentError, "`compressed: #{kwargs[:compressed]}` can be used for Compressable module" if kwargs[:compressed] == :gzip || kwargs[:compressed] == :zstd
  raise NotImplementedError, "Implement this method in child class"
end

#purgeObject



172
173
174
175
# File 'lib/fluent/plugin/buffer/chunk.rb', line 172

def purge
  @state = :closed
  self
end

#queued?Boolean

Returns:

  • (Boolean)


144
145
146
# File 'lib/fluent/plugin/buffer/chunk.rb', line 144

def queued?
  @state == :queued
end

#raw_create_atObject



71
72
73
# File 'lib/fluent/plugin/buffer/chunk.rb', line 71

def raw_create_at
  @created_at
end

#raw_modified_atObject



75
76
77
# File 'lib/fluent/plugin/buffer/chunk.rb', line 75

def raw_modified_at
  @modified_at
end

#read(**kwargs) ⇒ Object

Raises:

  • (ArgumentError)


177
178
179
180
# File 'lib/fluent/plugin/buffer/chunk.rb', line 177

def read(**kwargs)
  raise ArgumentError, "`compressed: #{kwargs[:compressed]}` can be used for Compressable module" if kwargs[:compressed] == :gzip || kwargs[:compressed] == :zstd
  raise NotImplementedError, "Implement this method in child class"
end

#rollbackObject

Raises:

  • (NotImplementedError)


115
116
117
# File 'lib/fluent/plugin/buffer/chunk.rb', line 115

def rollback
  raise NotImplementedError, "Implement this method in child class"
end

#sizeObject Also known as: length

Raises:

  • (NotImplementedError)


123
124
125
# File 'lib/fluent/plugin/buffer/chunk.rb', line 123

def size
  raise NotImplementedError, "Implement this method in child class"
end

#staged!Object



152
153
154
155
# File 'lib/fluent/plugin/buffer/chunk.rb', line 152

def staged!
  @state = :staged
  self
end

#staged?Boolean

Returns:

  • (Boolean)


140
141
142
# File 'lib/fluent/plugin/buffer/chunk.rb', line 140

def staged?
  @state == :staged
end

#unstaged!Object



157
158
159
160
# File 'lib/fluent/plugin/buffer/chunk.rb', line 157

def unstaged!
  @state = :unstaged
  self
end

#unstaged?Boolean

Returns:

  • (Boolean)


136
137
138
# File 'lib/fluent/plugin/buffer/chunk.rb', line 136

def unstaged?
  @state == :unstaged
end

#writable?Boolean

Returns:

  • (Boolean)


132
133
134
# File 'lib/fluent/plugin/buffer/chunk.rb', line 132

def writable?
  @state == :staged || @state == :unstaged
end

#write_to(io, **kwargs) ⇒ Object

Raises:

  • (ArgumentError)


187
188
189
190
191
192
# File 'lib/fluent/plugin/buffer/chunk.rb', line 187

def write_to(io, **kwargs)
  raise ArgumentError, "`compressed: #{kwargs[:compressed]}` can be used for Compressable module" if kwargs[:compressed] == :gzip || kwargs[:compressed] == :zstd
  open do |i|
    IO.copy_stream(i, io)
  end
end