Class: Fluent::Plugin::Buffer::Chunk
- Inherits:
-
Object
- Object
- Fluent::Plugin::Buffer::Chunk
show all
- Includes:
- UniqueId::Mixin, MonitorMixin
- Defined in:
- lib/fluent/plugin/buffer/chunk.rb
Defined Under Namespace
Modules: Decompressable
Instance Attribute Summary collapse
Instance Method Summary
collapse
#dump_unique_id_hex, #generate_unique_id
Constructor Details
#initialize(metadata, compress: :text) ⇒ Chunk
TODO: CompressedPackedMessage of forward protocol?
51
52
53
54
55
56
57
58
59
60
61
62
63
64
|
# File 'lib/fluent/plugin/buffer/chunk.rb', line 51
def initialize(metadata, compress: :text)
super()
@unique_id = generate_unique_id
@metadata = metadata
@state = :unstaged
@size = 0
@created_at = Fluent::Clock.real_now
@modified_at = Fluent::Clock.real_now
extend Decompressable if compress == :gzip
end
|
Instance Attribute Details
Returns the value of attribute metadata.
66
67
68
|
# File 'lib/fluent/plugin/buffer/chunk.rb', line 66
def metadata
@metadata
end
|
#state ⇒ Object
Returns the value of attribute state.
66
67
68
|
# File 'lib/fluent/plugin/buffer/chunk.rb', line 66
def state
@state
end
|
#unique_id ⇒ Object
Returns the value of attribute unique_id.
66
67
68
|
# File 'lib/fluent/plugin/buffer/chunk.rb', line 66
def unique_id
@unique_id
end
|
Instance Method Details
#append(data, **kwargs) ⇒ Object
data is array of formatted record string
87
88
89
90
91
92
93
94
|
# File 'lib/fluent/plugin/buffer/chunk.rb', line 87
def append(data, **kwargs)
raise ArgumentError, '`compress: gzip` can be used for Compressable module' if kwargs[:compress] == :gzip
adding = ''.b
data.each do |d|
adding << d.b
end
concat(adding, data.size)
end
|
#bytesize ⇒ Object
109
110
111
|
# File 'lib/fluent/plugin/buffer/chunk.rb', line 109
def bytesize
raise NotImplementedError, "Implement this method in child class"
end
|
#close ⇒ Object
157
158
159
160
|
# File 'lib/fluent/plugin/buffer/chunk.rb', line 157
def close
@state = :closed
self
end
|
#closed? ⇒ Boolean
138
139
140
|
# File 'lib/fluent/plugin/buffer/chunk.rb', line 138
def closed?
@state == :closed
end
|
#commit ⇒ Object
101
102
103
|
# File 'lib/fluent/plugin/buffer/chunk.rb', line 101
def commit
raise NotImplementedError, "Implement this method in child class"
end
|
#concat(bulk, records) ⇒ Object
for event streams which is packed or zipped (and we want not to unpack/uncompress)
97
98
99
|
# File 'lib/fluent/plugin/buffer/chunk.rb', line 97
def concat(bulk, records)
raise NotImplementedError, "Implement this method in child class"
end
|
#created_at ⇒ Object
77
78
79
|
# File 'lib/fluent/plugin/buffer/chunk.rb', line 77
def created_at
@created_at_object ||= Time.at(@created_at)
end
|
#empty? ⇒ Boolean
118
119
120
|
# File 'lib/fluent/plugin/buffer/chunk.rb', line 118
def empty?
size == 0
end
|
#enqueued! ⇒ Object
152
153
154
155
|
# File 'lib/fluent/plugin/buffer/chunk.rb', line 152
def enqueued!
@state = :queued
self
end
|
#modified_at ⇒ Object
82
83
84
|
# File 'lib/fluent/plugin/buffer/chunk.rb', line 82
def modified_at
@modified_at_object ||= Time.at(@modified_at)
end
|
#open(**kwargs, &block) ⇒ Object
172
173
174
175
|
# File 'lib/fluent/plugin/buffer/chunk.rb', line 172
def open(**kwargs, &block)
raise ArgumentError, '`compressed: gzip` can be used for Compressable module' if kwargs[:compressed] == :gzip
raise NotImplementedError, "Implement this method in child class"
end
|
#purge ⇒ Object
162
163
164
165
|
# File 'lib/fluent/plugin/buffer/chunk.rb', line 162
def purge
@state = :closed
self
end
|
#queued? ⇒ Boolean
134
135
136
|
# File 'lib/fluent/plugin/buffer/chunk.rb', line 134
def queued?
@state == :queued
end
|
#raw_create_at ⇒ Object
68
69
70
|
# File 'lib/fluent/plugin/buffer/chunk.rb', line 68
def raw_create_at
@created_at
end
|
#raw_modified_at ⇒ Object
72
73
74
|
# File 'lib/fluent/plugin/buffer/chunk.rb', line 72
def raw_modified_at
@modified_at
end
|
#read(**kwargs) ⇒ Object
167
168
169
170
|
# File 'lib/fluent/plugin/buffer/chunk.rb', line 167
def read(**kwargs)
raise ArgumentError, '`compressed: gzip` can be used for Compressable module' if kwargs[:compressed] == :gzip
raise NotImplementedError, "Implement this method in child class"
end
|
#rollback ⇒ Object
105
106
107
|
# File 'lib/fluent/plugin/buffer/chunk.rb', line 105
def rollback
raise NotImplementedError, "Implement this method in child class"
end
|
#size ⇒ Object
Also known as:
length
113
114
115
|
# File 'lib/fluent/plugin/buffer/chunk.rb', line 113
def size
raise NotImplementedError, "Implement this method in child class"
end
|
#staged! ⇒ Object
142
143
144
145
|
# File 'lib/fluent/plugin/buffer/chunk.rb', line 142
def staged!
@state = :staged
self
end
|
#staged? ⇒ Boolean
130
131
132
|
# File 'lib/fluent/plugin/buffer/chunk.rb', line 130
def staged?
@state == :staged
end
|
#unstaged! ⇒ Object
147
148
149
150
|
# File 'lib/fluent/plugin/buffer/chunk.rb', line 147
def unstaged!
@state = :unstaged
self
end
|
#unstaged? ⇒ Boolean
126
127
128
|
# File 'lib/fluent/plugin/buffer/chunk.rb', line 126
def unstaged?
@state == :unstaged
end
|
#writable? ⇒ Boolean
122
123
124
|
# File 'lib/fluent/plugin/buffer/chunk.rb', line 122
def writable?
@state == :staged || @state == :unstaged
end
|
#write_to(io, **kwargs) ⇒ Object
177
178
179
180
181
182
|
# File 'lib/fluent/plugin/buffer/chunk.rb', line 177
def write_to(io, **kwargs)
raise ArgumentError, '`compressed: gzip` can be used for Compressable module' if kwargs[:compressed] == :gzip
open do |i|
IO.copy_stream(i, io)
end
end
|