Class: Fibril

Inherits:
Fiber
  • Object
show all
Defined in:
lib/fibril/core.rb,
lib/fibril/version.rb

Defined Under Namespace

Classes: AsyncProxy, FAsyncProxy, FFuture, FibrilProxy, ForkedNonBlockingIOWrapper, Future, Guard, NonBlockingIOWrapper, TickProxy

Constant Summary collapse

VERSION =
"0.0.5"

Class Attribute Summary collapse

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(&blk) ⇒ Fibril

Returns a new instance of Fibril.



39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
# File 'lib/fibril/core.rb', line 39

# Builds a fibril around +blk+ and schedules it.
#
# Assigns the next id from Fibril.id_seq, stores the block, starts with an
# empty guard list and exposes the block as the singleton method
# +execute_fibril+. The underlying Fiber is created around #execute and the
# fibril is enqueued. When no run is in progress (Fibril.running is falsy),
# the scheduler counters/flags are reset first and Fibril.start is invoked
# once the fibril has been enqueued.
#
# @param blk [Proc] the body this fibril executes
def initialize(&blk)
  self.id     = Fibril.id_seq += 1
  self.block  = blk
  self.guards = []
  define_singleton_method :execute_fibril, self.block

  # Remember whether this fibril is the one that kicks off the run.
  starts_run = !Fibril.running
  if starts_run
    Fibril.task_count = 0
    Fibril.stopped    = false
    Fibril.running    = true
  end

  super(&method(:execute))
  Fibril.enqueue self
  Fibril.start if starts_run
end

Class Attribute Details

.current ⇒ Object

Returns the value of attribute current.



9
10
11
# File 'lib/fibril/core.rb', line 9

# Class-level attribute reader: returns the value stored in @current.
def current
  @current
end

.guards ⇒ Object

Returns the value of attribute guards.



9
10
11
# File 'lib/fibril/core.rb', line 9

# Class-level attribute reader: returns the value stored in @guards.
def guards
  @guards
end

.id_seq ⇒ Object

Returns the value of attribute id_seq.



9
10
11
# File 'lib/fibril/core.rb', line 9

# Class-level attribute reader: returns the value stored in @id_seq.
def id_seq
  @id_seq
end

.loop_thread ⇒ Object

Returns the value of attribute loop_thread.



9
10
11
# File 'lib/fibril/core.rb', line 9

# Class-level attribute reader: returns the value stored in @loop_thread.
def loop_thread
  @loop_thread
end

.queue ⇒ Object

Returns the value of attribute queue.



9
10
11
# File 'lib/fibril/core.rb', line 9

# Class-level attribute reader: returns the value stored in @queue.
def queue
  @queue
end

.running ⇒ Object

Returns the value of attribute running.



9
10
11
# File 'lib/fibril/core.rb', line 9

# Class-level attribute reader: returns the value stored in @running.
def running
  @running
end

.stopped ⇒ Object

Returns the value of attribute stopped.



9
10
11
# File 'lib/fibril/core.rb', line 9

# Class-level attribute reader: returns the value stored in @stopped.
def stopped
  @stopped
end

.task_count ⇒ Object

Returns the value of attribute task_count.



9
10
11
# File 'lib/fibril/core.rb', line 9

# Class-level attribute reader: returns the value stored in @task_count.
def task_count
  @task_count
end

Instance Attribute Details

#block ⇒ Object

Returns the value of attribute block.



17
18
19
# File 'lib/fibril/core.rb', line 17

# Instance attribute reader: returns the value stored in @block.
def block
  @block
end

#fiber ⇒ Object

Returns the value of attribute fiber.



17
18
19
# File 'lib/fibril/core.rb', line 17

# Instance attribute reader: returns the value stored in @fiber.
def fiber
  @fiber
end

#guards ⇒ Object

Returns the value of attribute guards.



17
18
19
# File 'lib/fibril/core.rb', line 17

# Instance attribute reader: returns the value stored in @guards.
def guards
  @guards
end

#id ⇒ Object

Returns the value of attribute id.



17
18
19
# File 'lib/fibril/core.rb', line 17

# Instance attribute reader: returns the value stored in @id.
def id
  @id
end

Class Method Details

.deplete_guard(guard, result) ⇒ Object



109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
# File 'lib/fibril/core.rb', line 109

# Records +result+ for every waiter registered against +guard+ and wakes
# the waiters whose guards have now all been depleted.
#
# Each waiter is a hash with a +:switches+ map (guard id => done flag).
# Waiters holding a +:to_fulfill+ fibril get that fibril enqueued once all
# switches are set; other waiters have their +:block+ (if any) called with
# the collected results, reordered by sort_results.
#
# @param guard [#id] the guard that has just been depleted
# @param result [Object] the value to append to each waiter's :result list
# @return [Array, nil] the waiter list, or nil when nothing waits on +guard+
def self.deplete_guard(guard, result)
  waiter_list = guards[guard.id]
  return unless waiter_list

  waiter_list.each do |waiters|
    switches = waiters[:switches]
    switches[guard.id] = true
    all_done = switches.values.all?

    if waiters.has_key?(:to_fulfill)
      Fibril.enqueue waiters[:to_fulfill] if all_done
      (waiters[:result] ||= []) << result
    else
      (waiters[:result] ||= []) << result
      callback = waiters[:block]
      callback[*sort_results(waiters[:result], waiters[:guards])] if callback && all_done
    end
  end
end

.enqueue(fibril) ⇒ Object



94
95
96
97
# File 'lib/fibril/core.rb', line 94

# Logs the enqueue and appends +fibril+ to the end of Fibril.queue.
#
# @param fibril [#id] the fibril to schedule
# @return [Object] the result of appending to the queue
def self.enqueue(fibril)
  Fibril.log("Enqueing fibril #{fibril.id}")
  pending = Fibril.queue
  pending << fibril
end

.guard ⇒ Object



27
28
29
# File 'lib/fibril/core.rb', line 27

# Lazily creates a single OpenStruct, memoizes it in the class variable
# @@guard, and returns that same object on subsequent calls.
def self.guard
  @@guard ||= OpenStruct.new
end

.log(msg) ⇒ Object



19
20
21
# File 'lib/fibril/core.rb', line 19

# Logging hook. Currently a no-op: the +puts+ below is commented out, so
# +msg+ is discarded and nil is returned.
def self.log(msg)
  # puts msg
end

.pending_tasks? ⇒ Boolean

Returns:

  • (Boolean)


209
210
211
# File 'lib/fibril/core.rb', line 209

# Tells whether the loop still has something to do.
#
# @return [Boolean] true when a task is counted as in flight or the queue
#   is non-empty, and the scheduler has not been flagged as stopped
def self.pending_tasks?
  work_outstanding = @task_count > 0 || !@queue.empty?
  work_outstanding && !@stopped
end

.profile(test) ⇒ Object



192
193
194
195
196
197
198
# File 'lib/fibril/core.rb', line 192

# Runs the given block, logs how long it took under the label +test+, and
# returns the block's result.
#
# The duration is measured with the monotonic clock so that wall-clock
# adjustments (NTP, DST, manual changes) cannot skew or negate it.
#
# @param test [#to_s] label used in the log line
# @yield the code to time
# @return [Object] whatever the block returns
def self.profile(test)
  started = Process.clock_gettime(Process::CLOCK_MONOTONIC)
  result  = yield
  elapsed = Process.clock_gettime(Process::CLOCK_MONOTONIC) - started
  Fibril.log "#{test} took #{elapsed}"
  result
end

.sort_results(results, guards) ⇒ Object



140
141
142
143
144
145
# File 'lib/fibril/core.rb', line 140

# Reorders +results+ (collected in completion order) so they line up with
# the order in which +guards+ were originally given.
#
# Results are paired with the guards sorted by +depleted_at+, then the
# pairs are ordered by each guard's position in +guards+. Using sort_by
# looks that position up once per pair instead of on every comparison.
#
# @param results [Array] results in the order the guards completed
# @param guards [Array<#depleted_at>] guards in caller-supplied order
# @return [Array] results arranged to match the order of +guards+
def self.sort_results(results, guards)
  completion_order = guards.sort_by(&:depleted_at)
  results.zip(completion_order)
         .sort_by { |_result, guard| guards.index(guard) }
         .map(&:first)
end

.start ⇒ Object



187
188
189
190
# File 'lib/fibril/core.rb', line 187

# Runs the scheduler loop if anything is queued, then marks the scheduler
# as no longer running.
#
# The flag is cleared in an +ensure+ so that an exception escaping the loop
# does not leave +running+ stuck at true.
#
# @return [false]
def self.start
  begin
    self.start_loop unless queue.empty?
  ensure
    self.running = false
  end
  false
end

.start_loop ⇒ Object



200
201
202
203
204
205
206
207
# File 'lib/fibril/core.rb', line 200

# Drives the scheduler on the calling thread.
#
# Records the current thread as Fibril.loop_thread, then, for as long as
# pending_tasks? holds, clears Fibril.current and resumes queued fibrils in
# FIFO order until the queue is empty.
#
# @return [nil]
def self.start_loop
  Fibril.log "Starting loop inside #{Fibril.current}"
  Fibril.loop_thread = Thread.current
  while pending_tasks?
    Fibril.current = nil
    until queue.empty?
      next_fibril = Fibril.queue.shift
      next_fibril.resume
    end
  end
end

.stop ⇒ Object



175
176
177
178
179
# File 'lib/fibril/core.rb', line 175

# Requests a stop by handing the Fibril(...) helper a block that sets
# Fibril.stopped to true when it runs.
#
# @return [Object] whatever the Fibril(...) helper returns
def self.stop
  halt = proc { Fibril.stopped = true }
  Fibril(&halt)
end

.variables ⇒ Object



35
36
37
# File 'lib/fibril/core.rb', line 35

# Lazily creates a single OpenStruct, memoizes it in the class variable
# @@variables, and returns that same object on subsequent calls.
def self.variables
  @@variables ||= OpenStruct.new
end

Instance Method Details

#await(*guards, &block) ⇒ Object



147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
# File 'lib/fibril/core.rb', line 147

# Waits on one or more guards/futures.
#
# Symbols are first resolved by sending them to Fibril.guard. With a block:
# if every guard already has a result the block is called immediately with
# those results; otherwise a waiter entry is registered under each guard id
# in Fibril.guards. Without a block: a leading Future is handed to
# await_future, anything else goes to await_fibril.
#
# @param guards [Array<Guard, Future, Symbol>] what to wait for
# @param block [Proc] optional callback receiving the guards' results
# @return [Object] the block's result, the guard list (when a waiter was
#   registered), or the result of await_future / await_fibril
# @raise [RuntimeError] if any argument is neither a Guard nor a Future
def await(*guards, &block)
  guards.map! { |guard| guard.is_a?(Symbol) ? Fibril.guard.send(guard) : guard }
  unless guards.all? { |g| g.is_a?(Guard) || g.is_a?(Future) }
    raise "Invalid guard given #{guards}"
  end

  unless block_given?
    leading = guards.first
    return leading.is_a?(Future) ? await_future(leading) : await_fibril(guards)
  end

  # Everything is already resolved: run the callback right away.
  return block.call(*guards.map(&:result)) if guards.all?(&:result?)

  waiter = {
    switches: guards.map { |guard| [guard.id, false] }.to_h,
    block: block,
    guards: guards
  }
  guards.each { |guard| Fibril.guards[guard.id] << waiter }
end

#await_all(*futures) ⇒ Object



171
172
173
# File 'lib/fibril/core.rb', line 171

# Calls +await+ on each given future, in order.
#
# @param futures [Array<#await>] the futures to wait for
# @return [Array] the value returned by each future's +await+
def await_all(*futures)
  futures.map { |future| future.await }
end

#await_fibril(guards) ⇒ Object



126
127
128
129
130
131
132
133
134
135
136
137
138
# File 'lib/fibril/core.rb', line 126

# Suspends the current fibril until every guard in +guards+ has a result.
#
# If all results are already available they are returned immediately.
# Otherwise a waiter entry naming Fibril.current as the fibril to fulfil is
# registered under each guard id, this fibril yields, and the results are
# read from the waiter entry once execution continues.
#
# @param guards [Array<Guard>] guards to wait on
# @return [Object, Array] the single result when exactly one guard was
#   given, otherwise the results ordered like +guards+
def await_fibril(guards)
  singular = guards.one?

  if guards.all?(&:result?)
    return guards[0].result if singular
    return guards.map(&:result)
  end

  waiter = {
    switches: guards.map { |guard| [guard.id, false] }.to_h,
    to_fulfill: Fibril.current
  }
  guards.each { |guard| Fibril.guards[guard.id] << waiter }
  self.yield

  if singular
    waiter[:result][0]
  else
    Fibril.sort_results(waiter[:result], guards)
  end
end

#await_future(future) ⇒ Object



166
167
168
169
# File 'lib/fibril/core.rb', line 166

# Keeps ticking while +future+ reports itself alive, then returns the
# future's awaited value.
#
# @param future [#alive?, #await] the future to wait for
# @return [Object] the value of +future.await+
def await_future(future)
  while future.alive?
    tick
  end
  future.await
end

#current ⇒ Object



105
106
107
# File 'lib/fibril/core.rb', line 105

# Returns this fibril itself.
def current
  self
end

#enqueue ⇒ Object



90
91
92
# File 'lib/fibril/core.rb', line 90

# Schedules this fibril by delegating to the class-level Fibril.enqueue.
def enqueue
  Fibril.enqueue(self)
end

#execute ⇒ Object



63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
# File 'lib/fibril/core.rb', line 63

# Runs the fibril body and notifies this fibril's guards.
#
# Fibril.task_count is incremented for the duration of the run. The value
# passed to each guard's +visit+ is the body's return value, or the raised
# exception when the body fails. Any exception from the body is re-raised
# after the guards have been visited.
#
# The decrement lives in an +ensure+ so that a guard whose +visit+ raises
# cannot leave task_count permanently elevated (which would keep
# Fibril.pending_tasks? true forever).
#
# @raise [Exception] whatever the fibril body raised
def execute
  Fibril.task_count += 1
  exception = nil
  begin
    result = begin
      execute_fibril
    rescue Exception => e
      # Deliberately broad: the exception is only deferred, then re-raised.
      exception = e
    end
    self.guards.each { |guard| guard.visit(result) }
  ensure
    Fibril.task_count -= 1
    Fibril.log "Ending #{id}"
  end
  raise exception if exception
end

#Guard(i, fibril) ⇒ Object



213
214
215
# File 'lib/fibril/core.rb', line 213

# Convenience constructor: builds a Guard from +i+ and +fibril+.
#
# @param i [Object] first argument forwarded to Guard.new
# @param fibril [Object] second argument forwarded to Guard.new
# @return [Guard] the newly created guard
def Guard(i, fibril)
  Guard.new(i, fibril)
end

#guard ⇒ Object



23
24
25
# File 'lib/fibril/core.rb', line 23

# Instance-level shortcut that returns the class-level Fibril.guard object.
def guard
  Fibril.guard
end

#reset(guard) ⇒ Object



57
58
59
60
61
# File 'lib/fibril/core.rb', line 57

# Creates a fresh Fibril from this fibril's block and attaches +guard+ to
# the copy's guard list.
#
# @param guard [Object] guard appended to the new fibril's guards
# @return [Fibril] the newly created copy
def reset(guard)
  Fibril.new(&block).tap { |copy| copy.guards << guard }
end

#resume ⇒ Object



181
182
183
184
185
# File 'lib/fibril/core.rb', line 181

# Marks this fibril as Fibril.current, logs the resume, and then defers to
# the superclass implementation of +resume+ with the same arguments.
def resume
  Fibril.current = self
  Fibril.log "Resuming #{id}"
  super
end

#tick ⇒ Object



79
80
81
82
83
84
85
86
87
88
# File 'lib/fibril/core.rb', line 79

# Gives other queued fibrils a turn.
#
# When called from a thread other than Fibril.loop_thread this only logs a
# warning. On the loop thread, if other work is queued, the fibril
# re-enqueues itself and yields; with an empty queue it does nothing.
#
# @return [Object, nil] the result of #yield when it yielded, otherwise the
#   logger's return value or nil
def tick
  unless Thread.current == Fibril.loop_thread
    Fibril.log "Current thread is #{Thread.current.object_id}"
    Fibril.log "Fibril thread is #{Fibril.loop_thread.object_id}"
    return Fibril.log("WARN: Cannot tick inside async code outside of main loop thread. This will be a noop")
  end

  return if Fibril.queue.empty?

  Fibril.enqueue self
  self.yield
end

#variables ⇒ Object



31
32
33
# File 'lib/fibril/core.rb', line 31

# Instance-level shortcut that returns the class-level Fibril.variables
# object.
def variables
  Fibril.variables
end

#yield {|_self| ... } ⇒ Object

Yields:

  • (_self)

Yield Parameters:

  • _self (Fibril)

    the object that the method was called on



99
100
101
102
103
# File 'lib/fibril/core.rb', line 99

# Suspends this fibril: logs the yield, runs the caller's block (if one was
# given) with this fibril as its argument, then gives up control via
# Fiber.yield.
#
# @yieldparam _self [Fibril] the fibril the method was called on
# @return [Object] the value of the Fiber.yield call
def yield
  Fibril.log "Yielding #{id}"
  yield(self) if block_given?
  Fiber.yield
end