Class: Uc::EventStream

Inherits:
Object
  • Object
show all
Defined in:
lib/uc/event_stream.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(queue_name) ⇒ EventStream

Returns a new instance of EventStream.



11
12
13
# File 'lib/uc/event_stream.rb', line 11

# Builds an event stream for the given queue.
#
# @param queue_name [Object] stored unchanged in @queue_name; presumably the
#   name of the underlying message queue (confirm against callers)
def initialize(queue_name)
  @queue_name = queue_name
end

Instance Attribute Details

#debug_output ⇒ Object

Returns the value of attribute debug_output.



9
10
11
# File 'lib/uc/event_stream.rb', line 9

# Reader for the debug_output attribute.
#
# @return [Object] the current value of @debug_output (nil if never set)
def debug_output
  @debug_output
end

#queue_name ⇒ Object (readonly)

Returns the value of attribute queue_name.



8
9
10
# File 'lib/uc/event_stream.rb', line 8

# Reader for the queue_name attribute.
#
# @return [Object] the current value of @queue_name
def queue_name
  @queue_name
end

Instance Method Details

#close_connections ⇒ Object



95
96
97
98
# File 'lib/uc/event_stream.rb', line 95

# Closes the writer connection (when there is one) and forgets it.
#
# Calls +close+ on whatever +writer+ returns if that value is truthy, then
# resets @writer to nil.
# NOTE(review): +writer+ is defined outside this view; presumably it is
# backed by @writer - confirm.
#
# @return [nil]
def close_connections
  if writer
    writer.close
  end
  @writer = nil
end

#debug(msg) ⇒ Object



19
20
21
# File 'lib/uc/event_stream.rb', line 19

# Publishes +msg+ as a :debug event.
#
# @param msg [Object] payload handed to #pub unchanged
# @return [Object] whatever #pub returns
def debug(msg)
  pub(:debug, msg)
end

#expect(event_type, timeout: 30, recreate: true, &block) ⇒ Object



38
39
40
41
42
43
44
45
46
47
48
49
50
51
# File 'lib/uc/event_stream.rb', line 38

# Runs the given block and waits for an event of +event_type+ to arrive.
#
# Steps, in order: optionally recreate the queue, clear it, start a
# background waiter (with output enabled and a first timeout of 50), yield
# to the caller's block, then join the waiter. An error recorded by the
# waiter thread under +:error+ is raised after the join. Every StandardError
# raised along the way is converted with +uc_error+ before being re-raised,
# and the waiter thread is always killed on the way out.
#
# @param event_type [Object] event type forwarded to #wait_in_background
# @param timeout [Object] timeout forwarded to #wait_in_background
# @param recreate [Boolean] when truthy, +mq.recreate+ is called first
# @yield the action expected to trigger the event; receives no arguments
# @return [nil] when the waiter finished without recording an error
# @raise [Exception] whatever +uc_error+ returns for the underlying failure
def expect(event_type, timeout: 30, recreate: true, &block)
  mq.recreate if recreate
  mq.clear
  waiter = wait_in_background(event_type, timeout, output: true, first_timeout: 50)
  yield
  waiter.join
  failure = waiter[:error]
  raise failure if failure
rescue StandardError => e
  raise uc_error(e)
ensure
  # waiter is nil when setup failed before the thread was started.
  waiter.kill if waiter
end

#fatal(msg) ⇒ Object



27
28
29
# File 'lib/uc/event_stream.rb', line 27

# Publishes +msg+ as a :fatal event.
#
# @param msg [Object] payload handed to #pub unchanged
# @return [Object] whatever #pub returns
def fatal(msg)
  pub(:fatal, msg)
end

#info(msg) ⇒ Object



15
16
17
# File 'lib/uc/event_stream.rb', line 15

# Publishes +msg+ as an :info event.
#
# @param msg [Object] payload handed to #pub unchanged
# @return [Object] whatever #pub returns
def info(msg)
  pub(:info, msg)
end


#print(event) ⇒ Object



82
83
84
85
86
87
88
89
90
91
92
93
# File 'lib/uc/event_stream.rb', line 82

# Renders a received event on standard output according to its type.
#
# * "info"  - prints the message as-is
# * "warn"  - prints the message behind a bold yellow "warn" label
# * "debug" - prints the message behind a bold blue "debug" label, but only
#   when #debug_output is truthy
# * "fatal" - raises instead of printing
#
# Any other type is silently ignored.
#
# @param event [#type, #msg] event to render; +type+ is compared against
#   string literals
# @return [nil]
# @raise [::Uc::Error] carrying the event message when the type is "fatal"
def print(event)
  case event.type
  when "info" then puts(event.msg)
  when "warn"
    label = "warn".yellow.bold
    puts("#{label} #{event.msg}")
  when "debug"
    if debug_output
      label = "debug".blue.bold
      puts("#{label} #{event.msg}")
    end
  when "fatal" then raise(::Uc::Error, event.msg)
  end
end

#pub(type, msg) ⇒ Object



31
32
33
34
35
36
# File 'lib/uc/event_stream.rb', line 31

# Publishes an event of the given type through the writer.
#
# An Event is built from +type+ and +msg+, serialised with
# +to_s(mq.msg_size)+ and passed to +writer.send+. The listed Errno failures
# are not propagated: the exception class and message are printed with
# +puts+ instead.
#
# @param type [Object] event type passed to Event.new
# @param msg [Object] event message passed to Event.new
# @return [Object, nil] the result of +writer.send+, or nil when one of the
#   rescued Errno errors was printed
def pub(type, msg)
  event = Event.new(type, msg)
  # Resolve the writer before serialising so the original evaluation order
  # (writer first, then mq.msg_size) is kept.
  sink = writer
  payload = event.to_s(mq.msg_size)
  sink.send(payload)
rescue Errno::ENOENT, Errno::EAGAIN, Errno::EACCES, Errno::EMSGSIZE => err
  puts "#{err.class} #{err.message}"
end

#wait(event_type, timeout, output: false, first_timeout: nil) ⇒ Object



53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
# File 'lib/uc/event_stream.rb', line 53

# Blocks until an event whose type equals +event_type+ is received.
#
# Events are read one at a time from the reader yielded by +mq.reader+. The
# first receive uses +first_timeout+ when given; every later receive uses
# +timeout+. Each received message is parsed with Event.parse and, when
# +output+ is truthy, passed to #print. Once the matching event arrives, a
# bold green "success" line with that event's message is printed.
#
# @param event_type [#to_s] type to wait for; compared as a string
# @param timeout [Object] timeout handed to the reader's +receive+
# @param output [Boolean] whether each received event is passed to #print
# @param first_timeout [Object, nil] timeout for the first receive only;
#   falls back to +timeout+ when nil or false
# @return [true]
def wait(event_type, timeout, output: false, first_timeout: nil)
  wanted = event_type.to_s
  buffer = ""
  received = ""
  current_timeout = first_timeout || timeout
  mq.reader do |queue|
    loop do
      queue.receive(buffer, current_timeout)
      # Only the very first receive gets the special timeout.
      current_timeout = timeout
      received = Event.parse(buffer)
      print(received) if output
      break if received.type == wanted
    end
  end
  banner = "success".green.bold
  puts "#{banner} #{received.msg}"
  true
end

#wait_in_background(event_type, timeout, **kwargs) ⇒ Object



71
72
73
74
75
76
77
78
79
80
# File 'lib/uc/event_stream.rb', line 71

# Runs #wait on a new thread and returns that thread immediately.
#
# A StandardError raised by #wait does not escape the thread: it is stored
# in the thread's +:error+ slot and the thread's value becomes false.
#
# @param event_type [Object] forwarded to #wait
# @param timeout [Object] forwarded to #wait
# @param kwargs [Hash] extra keyword arguments forwarded to #wait
# @return [Thread] the waiter thread; its value is #wait's result, or false
#   when an error was captured
def wait_in_background(event_type, timeout, **kwargs)
  Thread.new do
    begin
      wait(event_type, timeout, **kwargs)
    rescue StandardError => failure
      Thread.current[:error] = failure
      false
    end
  end
end

#warn(msg) ⇒ Object



23
24
25
# File 'lib/uc/event_stream.rb', line 23

# Publishes +msg+ as a :warn event.
#
# @param msg [Object] payload handed to #pub unchanged
# @return [Object] whatever #pub returns
def warn(msg)
  pub(:warn, msg)
end