Class: NatsWork::Server::SimpleWorker

Inherits:
Object
  • Object
show all
Defined in:
lib/natswork/simple_worker.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(id, queues = nil) ⇒ SimpleWorker

Returns a new instance of SimpleWorker.



10
11
12
13
14
15
16
# File 'lib/natswork/simple_worker.rb', line 10

# Builds a worker in the stopped state with no subscriptions recorded.
#
# @param id [Object] identifier stored for this worker (interpolated into log messages)
# @param queues [Object, nil] queues to work on; when nil/false, falls back to
#   NatsWork.config.worker_queues and then to ['default']
def initialize(id, queues = nil)
  @id = id

  # Resolve the queue list in priority order; each fallback is only
  # evaluated when the previous candidate was nil/false.
  @queues = queues
  @queues ||= NatsWork.config.worker_queues
  @queues ||= ['default']

  @pool = Concurrent::FixedThreadPool.new(5)

  # Lifecycle state: not running yet, nothing subscribed yet.
  @subscriptions = []
  @running = false
end

Instance Attribute Details

#id ⇒ Object (readonly)

Returns the value of attribute id.



8
9
10
# File 'lib/natswork/simple_worker.rb', line 8

# Read-only accessor for the worker's identifier.
#
# @return [Object] the value of @id, i.e. the `id` argument given to #initialize
def id
  @id
end

#pool ⇒ Object (readonly)

Returns the value of attribute pool.



8
9
10
# File 'lib/natswork/simple_worker.rb', line 8

# Read-only accessor for the worker's thread pool.
#
# @return [Concurrent::FixedThreadPool] the pool created in #initialize
def pool
  @pool
end

#queues ⇒ Object (readonly)

Returns the value of attribute queues.



8
9
10
# File 'lib/natswork/simple_worker.rb', line 8

# Read-only accessor for the queues this worker was configured with.
#
# @return [Object] the value chosen in #initialize: the `queues` argument,
#   else NatsWork.config.worker_queues, else ['default']
def queues
  @queues
end

Instance Method Details

#start ⇒ Object



18
19
20
21
22
23
24
# File 'lib/natswork/simple_worker.rb', line 18

# Marks the worker as running, logs a startup message, and then subscribes
# to the worker's job queues via subscribe_to_queues.
#
# @return [Object] whatever subscribe_to_queues returns
def start
  @running = true
  NatsWork.logger.info("Worker #{@id} starting...")
  subscribe_to_queues
end

#stop ⇒ Object



26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
# File 'lib/natswork/simple_worker.rb', line 26

# Stops the worker: clears the running flag, unsubscribes every recorded
# subscription, empties the subscription list, and logs that it stopped.
#
# Unsubscribing is best-effort: a StandardError raised while handling one
# subscription is logged through NatsWork.logger.error and the remaining
# subscriptions are still processed.
#
# NOTE(review): @pool is not shut down here — confirm whether that is
# intentional (e.g. so the worker can be started again).
#
# @return [Object] whatever NatsWork.logger.info returns
def stop
  @running = false

  @subscriptions.each do |subscription_id|
    begin
      connections = NatsWork::Client.instance.connection_pool
      connections.with_connection { |connection| connection.unsubscribe(subscription_id) }
    rescue StandardError => error
      NatsWork.logger.error "Error unsubscribing: #{error.message}"
    end
  end
  @subscriptions.clear

  NatsWork.logger.info "Worker #{@id} stopped"
end