Module: Qwirk::Batch::FileWorker
- Includes:
- Qwirk::BaseWorker
- Defined in:
- lib/qwirk/batch/file_worker.rb
Overview
Batch worker which reads records from files and queues them up for a separate worker (Qwirk::Adapter::JMS::ReplyWorker) to process. For instance, a worker of this type might look as follows:
class MyBatchWorker
include Qwirk::Batch::FileWorker
file :glob => '/home/batch_files/input/**', :age => 1.minute, :max_outstanding_records => 100, :fail_threshold => 0.8, :save_period => 30.seconds
marshal :string
end
The following options can be used for configuring the class
file:
:glob => <glob_path>
The path where files will be processed from. Files will be renamed with a .processing extension while they are being processed
and to a .completed extension when processing is completed.
:age => <duration>
How old a file must be before it will be processed. This is to prevent files that are in the middle of being uploaded from begin acquired.
:poll_time => <duration>
How often the glob is queried for new files.
:max_outstanding_records => <integer>
This is how many outstanding records can be queued at a time.
:
Defined Under Namespace
Modules: ClassMethods
Instance Attribute Summary
Attributes included from Qwirk::BaseWorker
Class Method Summary collapse
- .default_acquire_file_strategy ⇒ Object
-
.default_acquire_file_strategy=(default_strategy) ⇒ Object
Set the global default acquire_file_strategy for an organization.
- .default_file_status_strategy ⇒ Object
-
.default_file_status_strategy=(default_strategy) ⇒ Object
Set the global default process_file_strategy for an organization.
- .default_parse_file_strategy ⇒ Object
-
.default_parse_file_strategy=(default_strategy) ⇒ Object
Set the global default parse_file_strategy for an organization.
- .file_status_strategy_from_sym(strategy) ⇒ Object
- .file_status_strategy_to_sym(strategy) ⇒ Object
- .included(base) ⇒ Object
Instance Method Summary collapse
Methods included from Qwirk::BaseWorker
Class Method Details
.default_acquire_file_strategy ⇒ Object
70 71 72 |
# File 'lib/qwirk/batch/file_worker.rb', line 70 def self.default_acquire_file_strategy @@default_acquire_file_strategy end |
.default_acquire_file_strategy=(default_strategy) ⇒ Object
Set the global default acquire_file_strategy for an organization
66 67 68 |
# File 'lib/qwirk/batch/file_worker.rb', line 66 def self.default_acquire_file_strategy=(default_strategy) @@default_acquire_file_strategy = default_strategy end |
.default_file_status_strategy ⇒ Object
88 89 90 |
# File 'lib/qwirk/batch/file_worker.rb', line 88 def self.default_file_status_strategy self.file_status_strategy_to_sym(@@default_file_status_strategy) end |
.default_file_status_strategy=(default_strategy) ⇒ Object
Set the global default process_file_strategy for an organization
84 85 86 |
# File 'lib/qwirk/batch/file_worker.rb', line 84 def self.default_file_status_strategy=(default_strategy) @@default_file_status_strategy = self.file_status_strategy_from_sym(default_strategy) end |
.default_parse_file_strategy ⇒ Object
79 80 81 |
# File 'lib/qwirk/batch/file_worker.rb', line 79 def self.default_parse_file_strategy @@default_parse_file_strategy end |
.default_parse_file_strategy=(default_strategy) ⇒ Object
Set the global default parse_file_strategy for an organization
75 76 77 |
# File 'lib/qwirk/batch/file_worker.rb', line 75 def self.default_parse_file_strategy=(default_strategy) @@default_parse_file_strategy = default_strategy end |
.file_status_strategy_from_sym(strategy) ⇒ Object
92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 |
# File 'lib/qwirk/batch/file_worker.rb', line 92 def self.file_status_strategy_from_sym(strategy) if strategy.kind_of?(Symbol) if strategy == :active_record require 'qwirk/batch/active_record' Qwirk::Batch::ActiveRecord::BatchJob elsif strategy == :mongoid require 'qwirk/batch/mongoid' Qwirk::Batch::Mongoid::BatchJob else raise "Invalid symbol for file_status_strategy=#{strategy}" end else strategy end end |
.file_status_strategy_to_sym(strategy) ⇒ Object
108 109 110 111 112 113 114 115 116 |
# File 'lib/qwirk/batch/file_worker.rb', line 108 def self.file_status_strategy_to_sym(strategy) if strategy == Qwirk::Batch::ActiveRecord::BatchJob :active_record elsif strategy == Qwirk::Batch::ActiveRecord::BatchJob :mongoid else strategy end end |
.included(base) ⇒ Object
60 61 62 63 |
# File 'lib/qwirk/batch/file_worker.rb', line 60 def self.included(base) Qwirk::BaseWorker.included(base) base.extend(ClassMethods) end |
Instance Method Details
#initialize(opts = {}) ⇒ Object
130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 |
# File 'lib/qwirk/batch/file_worker.rb', line 130 def initialize(opts={}) super @marshal_type = (self.class.marshal_type || :ruby).to_s @marshaler = MarshalStrategy.find(@marshal_type) @stopped = false @queue_name = opts[:queue_name] || self.class.queue_name || (self.name.match(/(.*)File$/) && $1) raise "queue_name not specified in #{self.class.name}" unless @queue_name @reply_queue_name = opts[:reply_queue_name] || self.class.reply_queue_name || "#{@queue_name}Reply" = self.class. raise "file_options not set for #{self.class.name}" unless acquire_strategy_class = .delete(:acquire_strategy) || FileWorker.default_acquire_file_strategy parse_strategy_class = .delete(:parse_strategy) || FileWorker.default_parse_file_strategy status_strategy = .delete(:status_strategy) || FileWorker.default_parse_file_strategy raise 'No status_strategy defined' unless status_strategy status_strategy_class = FileWorker.file_status_strategy_from_sym(status_strategy) @acquire_file_strategy = acquire_strategy_class.new() @parse_file_strategy = parse_strategy_class.new() @file_status_strategy = status_strategy_class.new() @max_outstanding_records = [:max_outstanding_records] || 10 end |
#join ⇒ Object
175 176 177 |
# File 'lib/qwirk/batch/file_worker.rb', line 175 def join thread.join end |
#start ⇒ Object
152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 |
# File 'lib/qwirk/batch/file_worker.rb', line 152 def start #TODO: look for current job while file = @acquire_file_strategy.acquire_file do @parse_file_strategy.open(file) @reply_thread = Thread.new do java.lang.Thread.current_thread.name = "Qwirk worker (reply): #{worker}" reply_event_loop end begin @record_total = @parse_file_strategy.record_total process_file ensure @parse_file_strategy.close end end end |
#status ⇒ Object
179 180 181 |
# File 'lib/qwirk/batch/file_worker.rb', line 179 def status raise "Need to override status method in #{self.class.name}" end |
#stop ⇒ Object
170 171 172 173 |
# File 'lib/qwirk/batch/file_worker.rb', line 170 def stop @stopped = true @acquire_file_strategy.stop end |