Class: ConcurrentPipeline::Producer
- Inherits:
-
Object
- Object
- ConcurrentPipeline::Producer
show all
- Defined in:
- lib/concurrent_pipeline/producer.rb
Defined Under Namespace
Modules: CustomPipelines
Classes: Stream
Class Method Summary
collapse
Instance Method Summary
collapse
Constructor Details
#initialize(data: nil, store: nil, stream: nil, dir: nil, &initialization_block) ⇒ Producer
Returns a new instance of Producer.
31
32
33
34
35
36
37
38
39
|
# File 'lib/concurrent_pipeline/producer.rb', line 31
# Builds a producer from initial data, an existing store, or an
# initialization block.
#
# @param data [Object, nil] initial data; mutually exclusive with +store+
# @param store [Object, nil] an existing store; when it answers truthy to
#   +reader?+, the object returned by its +store+ method is kept instead
# @param stream [Object, nil] per-instance stream
# @param dir [String, nil] working directory
# @param initialization_block [Proc] optional block kept for later use
# @raise [ArgumentError] when both +data+ and +store+ are given, or when
#   none of +data+, +store+ and a block is given
def initialize(data: nil, store: nil, stream: nil, dir: nil, &initialization_block)
  if data && store
    raise ArgumentError.new("provide data or store but not both")
  end

  if [data, store, initialization_block].none?
    raise ArgumentError.new("must provide initial data, a store, or a block")
  end

  # Unwrap a reader-style store down to the store it wraps.
  resolved_store = store
  resolved_store = store.store if store&.reader?

  @initialization_block = initialization_block
  @stream = stream
  @store = resolved_store
  @data = data
  @dir = dir
end
|
Class Method Details
.model(klass_or_symbol, as: nil, &block) ⇒ Object
109
110
111
112
113
114
115
116
117
118
119
120
121
122
|
# File 'lib/concurrent_pipeline/producer.rb', line 109
# Registers a model in the class-level registry.
#
# Two call shapes are accepted:
# * a Class: registered under +as+, or under the last "::"-separated
#   segment of the class name when +as+ is omitted;
# * a Symbol plus a block: a new anonymous class extending Model is built,
#   the block is instance_eval'd against it, and it is registered under the
#   symbol. +as+ is not consulted in this form.
#
# @param klass_or_symbol [Class, Symbol]
# @param as [Symbol, nil] registry key for the Class form
# @raise [ArgumentError] when a class is combined with a block, when an
#   anonymous class is given without +as+, when a symbol is given without a
#   block, or when the first argument is neither a Class nor a Symbol
def model(klass_or_symbol, as: nil, &block)
  case klass_or_symbol
  when Class
    raise ArgumentError, "Cannot provide both a class and a block" if block

    # Anonymous classes have no name to derive a key from, so fail with a
    # clear message instead of a NoMethodError on nil.
    inferred_name = klass_or_symbol.name
    if as.nil? && inferred_name.nil?
      raise ArgumentError, "Must provide `as:` when registering an anonymous class"
    end

    as ||= inferred_name.split("::").last.to_sym
    registry.register(as, klass_or_symbol)
  when Symbol
    # instance_eval needs a block; without this guard the failure surfaces
    # as an arity error raised from inside instance_eval.
    raise ArgumentError, "Must provide a block when registering a model by symbol" unless block

    model_class = Class.new do
      extend Model
      instance_eval(&block)
    end
    registry.register(klass_or_symbol, model_class)
  else
    raise ArgumentError, "Must provide either a class or a symbol"
  end
end
|
.pipeline(klass_or_symbol = nil, **opts, &block) ⇒ Object
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
|
# File 'lib/concurrent_pipeline/producer.rb', line 132
# Declares a pipeline and appends it to +pipelineables+.
#
# * Given a Class, that class is used as-is (a block is not allowed).
# * Given a Symbol or nothing, a new subclass of Pipeline is created from
#   the block and stored as a constant under CustomPipelines. The constant
#   name is the symbol split on "_" with each part capitalized, or
#   "Pipeline<N>" (N = current number of pipelineables) when no name is given.
#
# Each entry of +opts+ is then sent to the pipeline class as a method call,
# with the value splatted through +Array()+ as the arguments.
#
# @param klass_or_symbol [Class, Symbol, nil]
# @param opts [Hash] method name => argument(s) to send to the pipeline class
# @return [Array] the +pipelineables+ collection after appending
# @raise [ArgumentError] for a class combined with a block, or for any other
#   kind of first argument
def pipeline(klass_or_symbol = nil, **opts, &block)
  pipelineable =
    case klass_or_symbol
    when Class
      raise ArgumentError.new("Cannot provide both a class and a block") if block
      klass_or_symbol
    when Symbol, nil
      label = klass_or_symbol || "Pipeline#{pipelineables.count}"
      generated = Class.new(Pipeline, &block)
      constant_name = label.to_s.split("_").map(&:capitalize).join
      CustomPipelines.const_set(constant_name, generated)
      generated
    else
      raise ArgumentError.new("Must provide either a class or a symbol")
    end

  opts.each_pair do |meth, args|
    pipelineable.public_send(meth, *Array(args))
  end

  pipelineables << pipelineable
end
|
.pipelineables ⇒ Object
97
98
99
|
# File 'lib/concurrent_pipeline/producer.rb', line 97
# Lazily created list that +pipeline+ appends to.
#
# @return [Array] the same array object on every call
def pipelineables
  @pipelineables = [] unless @pipelineables
  @pipelineables
end
|
.registry ⇒ Object
101
102
103
104
105
106
107
|
# File 'lib/concurrent_pipeline/producer.rb', line 101
# Lazily built registry, pre-seeded with Pipeline::PipelineStep under the
# :PipelineStep key.
#
# @return [Registry] the same registry object on every call
def registry
  return @registry if @registry

  seeded = Registry.new
  seeded.register(:PipelineStep, Pipeline::PipelineStep)
  @registry = seeded
end
|
.store(klass = nil) ⇒ Object
92
93
94
95
|
# File 'lib/concurrent_pipeline/producer.rb', line 92
# Gets or sets the store class used by this producer class.
#
# With an argument, records that class as the configured store. Without
# one, returns the configured store, defaulting to Stores::Yaml the first
# time it is read. Reading never overwrites a previously configured class;
# this matters because the instance methods read it via +self.class.store+.
#
# @param klass [Class, nil] store class to configure; nil means "just read"
# @return [Class] the configured store class
def store(klass = nil)
  @store = klass if klass
  @store ||= Stores::Yaml
end
|
.stream(&block) ⇒ Object
127
128
129
130
|
# File 'lib/concurrent_pipeline/producer.rb', line 127
# Gets or configures the class-level stream.
#
# With a block, builds a new Stream, runs the block with the stream as
# +self+ (instance_exec), stores it and returns it. Without a block,
# returns whatever was stored previously (nil if nothing was).
#
# @return [Stream, nil]
def stream(&block)
  if block
    configured = Stream.new
    configured.instance_exec(&block)
    @stream = configured
  else
    @stream
  end
end
|
Instance Method Details
#call(&block) ⇒ Object
41
42
43
44
45
46
47
48
49
50
51
52
53
54
|
# File 'lib/concurrent_pipeline/producer.rb', line 41
# Runs the producer.
#
# Steps, in order: take a changeset from the store, hand it to the
# initialization block (if one was given), apply the changeset, invoke
# Processors::ActorProcessor, then report on the recorded :PipelineStep
# entries.
#
# The +block+ parameter is accepted but not used in this method body.
#
# @return [Boolean] true when every :PipelineStep read back from the store
#   answers truthy to +success?+
def call(&block)
  pending = store.changeset
  @initialization_block.call(pending) if @initialization_block
  store.apply(pending)

  Processors::ActorProcessor.call(
    store: store,
    pipelineables: pipelineables,
    registry: registry,
    stream: stream
  )

  steps = store.reader.all(:PipelineStep)
  steps.all?(&:success?)
end
|
#data ⇒ Object
56
57
58
|
# File 'lib/concurrent_pipeline/producer.rb', line 56
# Current contents of the store, obtained by calling +to_h+ on the store's
# reader.
#
# @return [Object] whatever the reader's +to_h+ returns
def data
  reader = store.reader
  reader.to_h
end
|
#dir ⇒ Object
76
77
78
|
# File 'lib/concurrent_pipeline/producer.rb', line 76
# Working directory for this producer. Returns the directory stored in
# @dir; when none is stored, creates one with Dir.mktmpdir and remembers it.
#
# @return [String]
def dir
  return @dir if @dir

  @dir = Dir.mktmpdir
end
|
#history ⇒ Object
72
73
74
|
# File 'lib/concurrent_pipeline/producer.rb', line 72
# Asks the class-level store for its history, passing this producer's
# directory and registry.
#
# @return [Object] whatever the store's +history+ returns
def history
  backend = self.class.store
  backend.history(dir: dir, registry: registry)
end
|
#store ⇒ Object
60
61
62
|
# File 'lib/concurrent_pipeline/producer.rb', line 60
# The store backing this producer. Returns the store already held in
# @store; otherwise asks the class-level store to build a writer from the
# initial data (an empty hash when none was given), the directory and the
# registry, and remembers the result.
#
# @return [Object] the memoized store
def store
  return @store if @store

  initial = @data || {}
  @store = self.class.store.build_writer(data: initial, dir: dir, registry: registry)
end
|
#stream ⇒ Object
64
65
66
|
# File 'lib/concurrent_pipeline/producer.rb', line 64
# The stream for this producer: the one stored in @stream when it is
# truthy, otherwise the class-level stream.
#
# @return [Object, nil]
def stream
  return @stream if @stream

  self.class.stream
end
|
#versions ⇒ Object
68
69
70
|
# File 'lib/concurrent_pipeline/producer.rb', line 68
# Asks the class-level store for its versions, passing this producer's
# directory and registry.
#
# @return [Object] whatever the store's +versions+ returns
def versions
  backend = self.class.store
  backend.versions(dir: dir, registry: registry)
end
|