Class: Core::ConnectorJob
- Inherits:
  Object
- Class hierarchy:
  Object -> Core::ConnectorJob
- Defined in:
- lib/core/connector_job.rb
Constant Summary
- DEFAULT_PAGE_SIZE =
100
- IDLE_THRESHOLD =
60
Class Method Summary
- .delete_jobs(jobs) ⇒ Object
- .enqueue(_connector_id) ⇒ Object
- .fetch_by_id(job_id) ⇒ Object
- .idle_jobs(connector_id = nil, page_size = DEFAULT_PAGE_SIZE) ⇒ Object
- .orphaned_jobs(connector_ids = [], page_size = DEFAULT_PAGE_SIZE) ⇒ Object
- .pending_jobs(connectors_ids: [], page_size: DEFAULT_PAGE_SIZE) ⇒ Object
Instance Method Summary
- #[](property_name) ⇒ Object
- #active? ⇒ Boolean
- #cancel!(ingestion_stats = {}, connector_metadata = {}) ⇒ Object
- #canceled? ⇒ Boolean
- #canceling? ⇒ Boolean
- #configuration ⇒ Object
- #connector ⇒ Object
- #connector_id ⇒ Object
- #connector_snapshot ⇒ Object
- #done!(ingestion_stats = {}, connector_metadata = {}) ⇒ Object
- #error ⇒ Object
- #error!(message, ingestion_stats = {}, connector_metadata = {}) ⇒ Object
- #es_source ⇒ Object
- #extract_binary_content? ⇒ Boolean
- #filtering ⇒ Object
- #id ⇒ Object
- #in_progress? ⇒ Boolean
- #index_name ⇒ Object
- #language ⇒ Object
- #make_running! ⇒ Object
- #pending? ⇒ Boolean
- #pipeline ⇒ Object
- #reduce_whitespace? ⇒ Boolean
- #run_ml_inference? ⇒ Boolean
- #service_type ⇒ Object
- #status ⇒ Object
- #suspended? ⇒ Boolean
- #terminated? ⇒ Boolean
- #update_metadata(ingestion_stats = {}, connector_metadata = {}) ⇒ Object
- #with_concurrency_control {|response, seq_no, primary_term| ... } ⇒ Object
Class Method Details
.delete_jobs(jobs) ⇒ Object
44 45 46 47 |
# File 'lib/core/connector_job.rb', line 44 def self.delete_jobs(jobs) query = { terms: { '_id': jobs.map(&:id) } } ElasticConnectorActions.delete_jobs_by_query(query) end |
.enqueue(_connector_id) ⇒ Object
67 68 69 |
# File 'lib/core/connector_job.rb', line 67 def self.enqueue(_connector_id) nil end |
.fetch_by_id(job_id) ⇒ Object
20 21 22 23 24 25 |
# File 'lib/core/connector_job.rb', line 20 def self.fetch_by_id(job_id) es_response = ElasticConnectorActions.get_job(job_id) return nil unless es_response[:found] new(es_response) end |
.idle_jobs(connector_id = nil, page_size = DEFAULT_PAGE_SIZE) ⇒ Object
49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 |
# File 'lib/core/connector_job.rb', line 49 def self.idle_jobs(connector_id = nil, page_size = DEFAULT_PAGE_SIZE) connector_ids = if connector_id [connector_id] else ConnectorSettings.fetch_native_connectors.map(&:id) end query = { bool: { filter: [ { terms: { 'connector.id': connector_ids } }, { terms: { status: Connectors::SyncStatus::ACTIVE_STATUSES } }, { range: { last_seen: { lte: "now-#{IDLE_THRESHOLD}s" } } } ] } } fetch_jobs_by_query(query, page_size) end |
.orphaned_jobs(connector_ids = [], page_size = DEFAULT_PAGE_SIZE) ⇒ Object
39 40 41 42 |
# File 'lib/core/connector_job.rb', line 39 def self.orphaned_jobs(connector_ids = [], page_size = DEFAULT_PAGE_SIZE) query = { bool: { must_not: { terms: { 'connector.id': connector_ids } } } } fetch_jobs_by_query(query, page_size) end |
.pending_jobs(connectors_ids: [], page_size: DEFAULT_PAGE_SIZE) ⇒ Object
27 28 29 30 31 32 33 34 35 36 37 |
# File 'lib/core/connector_job.rb', line 27 def self.pending_jobs(connectors_ids: [], page_size: DEFAULT_PAGE_SIZE) status_term = { status: Connectors::SyncStatus::PENDING_STATUSES } query = { bool: { must: [{ terms: status_term }] } } return fetch_jobs_by_query(query, page_size) if connectors_ids.empty? query[:bool][:must] << { terms: { 'connector.id' => connectors_ids } } fetch_jobs_by_query(query, page_size) end |
Instance Method Details
#[](property_name) ⇒ Object
75 76 77 |
# File 'lib/core/connector_job.rb', line 75 def [](property_name) @elasticsearch_response[:_source][property_name] end |
#active? ⇒ Boolean
107 108 109 |
# File 'lib/core/connector_job.rb', line 107 def active? Connectors::SyncStatus::ACTIVE_STATUSES.include?(status) end |
#cancel!(ingestion_stats = {}, connector_metadata = {}) ⇒ Object
178 179 180 |
# File 'lib/core/connector_job.rb', line 178 def cancel!(ingestion_stats = {}, connector_metadata = {}) terminate!(Connectors::SyncStatus::CANCELED, nil, ingestion_stats, connector_metadata) end |
#canceled? ⇒ Boolean
99 100 101 |
# File 'lib/core/connector_job.rb', line 99 def canceled? status == Connectors::SyncStatus::CANCELED end |
#canceling? ⇒ Boolean
91 92 93 |
# File 'lib/core/connector_job.rb', line 91 def canceling? status == Connectors::SyncStatus::CANCELING end |
#configuration ⇒ Object
135 136 137 |
# File 'lib/core/connector_job.rb', line 135 def configuration connector_snapshot[:configuration] end |
#connector ⇒ Object
159 160 161 |
# File 'lib/core/connector_job.rb', line 159 def connector @connector ||= ConnectorSettings.fetch_by_id(connector_id) end |
#connector_id ⇒ Object
119 120 121 |
# File 'lib/core/connector_job.rb', line 119 def connector_id connector_snapshot[:id] end |
#connector_snapshot ⇒ Object
115 116 117 |
# File 'lib/core/connector_job.rb', line 115 def connector_snapshot self[:connector] || {} end |
#done!(ingestion_stats = {}, connector_metadata = {}) ⇒ Object
170 171 172 |
# File 'lib/core/connector_job.rb', line 170 def done!(ingestion_stats = {}, connector_metadata = {}) terminate!(Connectors::SyncStatus::COMPLETED, nil, ingestion_stats, connector_metadata) end |
#error ⇒ Object
79 80 81 |
# File 'lib/core/connector_job.rb', line 79 def error self[:error] end |
#error!(message, ingestion_stats = {}, connector_metadata = {}) ⇒ Object
174 175 176 |
# File 'lib/core/connector_job.rb', line 174 def error!(message, ingestion_stats = {}, connector_metadata = {}) terminate!(Connectors::SyncStatus::ERROR, message, ingestion_stats, connector_metadata) end |
#es_source ⇒ Object
202 203 204 |
# File 'lib/core/connector_job.rb', line 202 def es_source @elasticsearch_response[:_source] end |
#extract_binary_content? ⇒ Boolean
147 148 149 |
# File 'lib/core/connector_job.rb', line 147 def extract_binary_content? pipeline[:extract_binary_content] end |
#filtering ⇒ Object
139 140 141 |
# File 'lib/core/connector_job.rb', line 139 def filtering connector_snapshot[:filtering] end |
#id ⇒ Object
71 72 73 |
# File 'lib/core/connector_job.rb', line 71 def id @elasticsearch_response[:_id] end |
#in_progress? ⇒ Boolean
87 88 89 |
# File 'lib/core/connector_job.rb', line 87 def in_progress? status == Connectors::SyncStatus::IN_PROGRESS end |
#index_name ⇒ Object
123 124 125 |
# File 'lib/core/connector_job.rb', line 123 def index_name connector_snapshot[:index_name] end |
#language ⇒ Object
127 128 129 |
# File 'lib/core/connector_job.rb', line 127 def language connector_snapshot[:language] end |
#make_running! ⇒ Object
188 189 190 191 192 193 194 195 196 197 198 199 200 |
# File 'lib/core/connector_job.rb', line 188 def make_running! with_concurrency_control do |es_doc, seq_no, primary_term| now = Time.now doc = { status: Connectors::SyncStatus::IN_PROGRESS, started_at: now, last_seen: now, worker_hostname: Socket.gethostname } ElasticConnectorActions.update_job_fields(es_doc[:_id], doc, seq_no, primary_term) end end |
#pending? ⇒ Boolean
103 104 105 |
# File 'lib/core/connector_job.rb', line 103 def pending? Connectors::SyncStatus::PENDING_STATUSES.include?(status) end |
#pipeline ⇒ Object
143 144 145 |
# File 'lib/core/connector_job.rb', line 143 def pipeline connector_snapshot[:pipeline] || {} end |
#reduce_whitespace? ⇒ Boolean
151 152 153 |
# File 'lib/core/connector_job.rb', line 151 def reduce_whitespace? pipeline[:reduce_whitespace] end |
#run_ml_inference? ⇒ Boolean
155 156 157 |
# File 'lib/core/connector_job.rb', line 155 def run_ml_inference? pipeline[:run_ml_inference] end |
#service_type ⇒ Object
131 132 133 |
# File 'lib/core/connector_job.rb', line 131 def service_type connector_snapshot[:service_type] end |
#status ⇒ Object
83 84 85 |
# File 'lib/core/connector_job.rb', line 83 def status self[:status] end |
#suspended? ⇒ Boolean
95 96 97 |
# File 'lib/core/connector_job.rb', line 95 def suspended? status == Connectors::SyncStatus::SUSPENDED end |
#terminated? ⇒ Boolean
111 112 113 |
# File 'lib/core/connector_job.rb', line 111 def terminated? Connectors::SyncStatus::TERMINAL_STATUSES.include?(status) end |
#update_metadata(ingestion_stats = {}, connector_metadata = {}) ⇒ Object
163 164 165 166 167 168 |
# File 'lib/core/connector_job.rb', line 163 def update_metadata(ingestion_stats = {}, connector_metadata = {}) ingestion_stats ||= {} doc = { :last_seen => Time.now }.merge(ingestion_stats) doc[:metadata] = connector_metadata if connector_metadata&.any? ElasticConnectorActions.update_job_fields(id, doc) end |
#with_concurrency_control {|response, seq_no, primary_term| ... } ⇒ Object
182 183 184 185 186 |
# File 'lib/core/connector_job.rb', line 182 def with_concurrency_control response = ElasticConnectorActions.get_job(id) yield response, response['_seq_no'], response['_primary_term'] end |