Class: Fluent::Plugin::Buffer::Chunk
- Inherits:
-
Object
- Object
- Fluent::Plugin::Buffer::Chunk
show all
- Includes:
- UniqueId::Mixin, MonitorMixin
- Defined in:
- lib/fluent/plugin/buffer/chunk.rb
Defined Under Namespace
Modules: GzipDecompressable, ZstdDecompressable
Instance Attribute Summary collapse
Instance Method Summary
collapse
#dump_unique_id_hex, #generate_unique_id
Constructor Details
#initialize(metadata, compress: :text) ⇒ Chunk
TODO: CompressedPackedMessage of forward protocol?
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
|
# File 'lib/fluent/plugin/buffer/chunk.rb', line 51
def initialize(metadata, compress: :text)
super()
@unique_id = generate_unique_id
@metadata = metadata
@state = :unstaged
@size = 0
@created_at = Fluent::Clock.real_now
@modified_at = Fluent::Clock.real_now
if compress == :gzip
extend GzipDecompressable
elsif compress == :zstd
extend ZstdDecompressable
end
end
|
Instance Attribute Details
Returns the value of attribute metadata.
69
70
71
|
# File 'lib/fluent/plugin/buffer/chunk.rb', line 69
def metadata
@metadata
end
|
#state ⇒ Object
Returns the value of attribute state.
69
70
71
|
# File 'lib/fluent/plugin/buffer/chunk.rb', line 69
def state
@state
end
|
#unique_id ⇒ Object
Returns the value of attribute unique_id.
69
70
71
|
# File 'lib/fluent/plugin/buffer/chunk.rb', line 69
def unique_id
@unique_id
end
|
Instance Method Details
#append(data, **kwargs) ⇒ Object
data is array of formatted record string
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
|
# File 'lib/fluent/plugin/buffer/chunk.rb', line 90
def append(data, **kwargs)
raise ArgumentError, "`compress: #{kwargs[:compress]}` can be used for Compressable module" if kwargs[:compress] == :gzip || kwargs[:compress] == :zstd
begin
adding = data.join.force_encoding(Encoding::ASCII_8BIT)
rescue
adding = ''.force_encoding(Encoding::ASCII_8BIT)
data.each do |d|
adding << d.b
end
end
concat(adding, data.size)
end
|
#bytesize ⇒ Object
119
120
121
|
# File 'lib/fluent/plugin/buffer/chunk.rb', line 119
def bytesize
raise NotImplementedError, "Implement this method in child class"
end
|
#close ⇒ Object
167
168
169
170
|
# File 'lib/fluent/plugin/buffer/chunk.rb', line 167
def close
@state = :closed
self
end
|
#closed? ⇒ Boolean
148
149
150
|
# File 'lib/fluent/plugin/buffer/chunk.rb', line 148
def closed?
@state == :closed
end
|
#commit ⇒ Object
111
112
113
|
# File 'lib/fluent/plugin/buffer/chunk.rb', line 111
def commit
raise NotImplementedError, "Implement this method in child class"
end
|
#concat(bulk, records) ⇒ Object
for event streams which is packed or zipped (and we want not to unpack/uncompress)
107
108
109
|
# File 'lib/fluent/plugin/buffer/chunk.rb', line 107
def concat(bulk, records)
raise NotImplementedError, "Implement this method in child class"
end
|
#created_at ⇒ Object
80
81
82
|
# File 'lib/fluent/plugin/buffer/chunk.rb', line 80
def created_at
@created_at_object ||= Time.at(@created_at)
end
|
#empty? ⇒ Boolean
128
129
130
|
# File 'lib/fluent/plugin/buffer/chunk.rb', line 128
def empty?
size == 0
end
|
#enqueued! ⇒ Object
162
163
164
165
|
# File 'lib/fluent/plugin/buffer/chunk.rb', line 162
def enqueued!
@state = :queued
self
end
|
#modified_at ⇒ Object
85
86
87
|
# File 'lib/fluent/plugin/buffer/chunk.rb', line 85
def modified_at
@modified_at_object ||= Time.at(@modified_at)
end
|
#open(**kwargs, &block) ⇒ Object
182
183
184
185
|
# File 'lib/fluent/plugin/buffer/chunk.rb', line 182
def open(**kwargs, &block)
raise ArgumentError, "`compressed: #{kwargs[:compressed]}` can be used for Compressable module" if kwargs[:compressed] == :gzip || kwargs[:compressed] == :zstd
raise NotImplementedError, "Implement this method in child class"
end
|
#purge ⇒ Object
172
173
174
175
|
# File 'lib/fluent/plugin/buffer/chunk.rb', line 172
def purge
@state = :closed
self
end
|
#queued? ⇒ Boolean
144
145
146
|
# File 'lib/fluent/plugin/buffer/chunk.rb', line 144
def queued?
@state == :queued
end
|
#raw_create_at ⇒ Object
71
72
73
|
# File 'lib/fluent/plugin/buffer/chunk.rb', line 71
def raw_create_at
@created_at
end
|
#raw_modified_at ⇒ Object
75
76
77
|
# File 'lib/fluent/plugin/buffer/chunk.rb', line 75
def raw_modified_at
@modified_at
end
|
#read(**kwargs) ⇒ Object
177
178
179
180
|
# File 'lib/fluent/plugin/buffer/chunk.rb', line 177
def read(**kwargs)
raise ArgumentError, "`compressed: #{kwargs[:compressed]}` can be used for Compressable module" if kwargs[:compressed] == :gzip || kwargs[:compressed] == :zstd
raise NotImplementedError, "Implement this method in child class"
end
|
#rollback ⇒ Object
115
116
117
|
# File 'lib/fluent/plugin/buffer/chunk.rb', line 115
def rollback
raise NotImplementedError, "Implement this method in child class"
end
|
#size ⇒ Object
Also known as:
length
123
124
125
|
# File 'lib/fluent/plugin/buffer/chunk.rb', line 123
def size
raise NotImplementedError, "Implement this method in child class"
end
|
#staged! ⇒ Object
152
153
154
155
|
# File 'lib/fluent/plugin/buffer/chunk.rb', line 152
def staged!
@state = :staged
self
end
|
#staged? ⇒ Boolean
140
141
142
|
# File 'lib/fluent/plugin/buffer/chunk.rb', line 140
def staged?
@state == :staged
end
|
#unstaged! ⇒ Object
157
158
159
160
|
# File 'lib/fluent/plugin/buffer/chunk.rb', line 157
def unstaged!
@state = :unstaged
self
end
|
#unstaged? ⇒ Boolean
136
137
138
|
# File 'lib/fluent/plugin/buffer/chunk.rb', line 136
def unstaged?
@state == :unstaged
end
|
#writable? ⇒ Boolean
132
133
134
|
# File 'lib/fluent/plugin/buffer/chunk.rb', line 132
def writable?
@state == :staged || @state == :unstaged
end
|
#write_to(io, **kwargs) ⇒ Object
187
188
189
190
191
192
|
# File 'lib/fluent/plugin/buffer/chunk.rb', line 187
def write_to(io, **kwargs)
raise ArgumentError, "`compressed: #{kwargs[:compressed]}` can be used for Compressable module" if kwargs[:compressed] == :gzip || kwargs[:compressed] == :zstd
open do |i|
IO.copy_stream(i, io)
end
end
|