Class: LocalBus::Subscriber

Inherits:
Object
  • Object
show all
Includes:
MonitorMixin
Defined in:
lib/local_bus/subscriber.rb

Overview

Wraps a Callable (Proc) and Message intended for asynchronous execution.

Defined Under Namespace

Classes: Error

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(callable, message) ⇒ Subscriber

Constructor



26
27
28
29
30
31
32
33
34
35
36
# File 'lib/local_bus/subscriber.rb', line 26

def initialize(callable, message)
  super()
  @callable = callable
  @message = message
  @id = callable.object_id
  @source_location = case callable
  in Proc then callable.source_location
  else callable.method(:call).source_location
  end
  @metadata = {}
end

Instance Attribute Details

#callableObject (readonly)

Callable object – Proc, lambda, etc. (must respond to #call)



48
49
50
# File 'lib/local_bus/subscriber.rb', line 48

def callable
  @callable
end

#errorObject (readonly)

Error if the subscriber fails (available after performing)



52
53
54
# File 'lib/local_bus/subscriber.rb', line 52

def error
  @error
end

#idObject (readonly)

Unique identifier for the subscriber



40
41
42
# File 'lib/local_bus/subscriber.rb', line 40

def id
  @id
end

#messageObject (readonly)

Message for the subscriber to process



56
57
58
# File 'lib/local_bus/subscriber.rb', line 56

def message
  @message
end

#metadataObject (readonly)

Metadata for the subscriber (available after performing)



60
61
62
# File 'lib/local_bus/subscriber.rb', line 60

def 
  @metadata
end

#source_locationObject (readonly)

Source location of the callable



44
45
46
# File 'lib/local_bus/subscriber.rb', line 44

def source_location
  @source_location
end

#valueObject (readonly)

Value returned by the callable (available after performing)



64
65
66
# File 'lib/local_bus/subscriber.rb', line 64

def value
  @value
end

Instance Method Details

#deconstruct_keys(keys) ⇒ Object

Allows pattern matching on subscriber attributes



122
123
124
# File 'lib/local_bus/subscriber.rb', line 122

def deconstruct_keys(keys)
  keys.any? ? to_h.slice(*keys) : to_h
end

#errored?Boolean

Indicates if the subscriber has errored

Returns:

  • (Boolean)


80
81
82
# File 'lib/local_bus/subscriber.rb', line 80

def errored?
  !!error
end

#pending?Boolean

Indicates if the subscriber is pending or unperformed

Returns:

  • (Boolean)


74
75
76
# File 'lib/local_bus/subscriber.rb', line 74

def pending?
  .empty?
end

#performObject

Performs the subscriber’s callable



86
87
88
89
90
91
92
93
94
95
96
# File 'lib/local_bus/subscriber.rb', line 86

def perform
  synchronize do
    return if performed?

     do
      @value = callable.call(message)
    rescue => cause
      @error = Error.new("Invocation failed! #{cause.message}", cause: cause)
    end
  end
end

#performed?Boolean

Indicates if the subscriber has been performed

Returns:

  • (Boolean)


68
69
70
# File 'lib/local_bus/subscriber.rb', line 68

def performed?
  .any?
end

#timeout(cause) ⇒ Object

Handles timeout for the subscriber



101
102
103
104
105
106
107
# File 'lib/local_bus/subscriber.rb', line 101

def timeout(cause)
  return if performed?

   do
    @error = Error.new("Timeout expired before invocation! Waited #{message.timeout} seconds!", cause: cause)
  end
end

#to_hObject

Returns the subscriber’s data as a hash



111
112
113
114
115
116
117
# File 'lib/local_bus/subscriber.rb', line 111

def to_h
  {
    error: error,
    metadata: ,
    value: value
  }
end