Class: Plushie::Connection

Inherits:
Object
Defined in:
lib/plushie/connection.rb

Overview

Low-level protocol client for the plushie renderer.

Manages a bidirectional pipe to the renderer binary, handles wire framing, and provides thread-safe message sending. Decoded messages are pushed to a Thread::Queue or dispatched via a callback proc.

This layer is usable standalone for scripting and REPL exploration without the full Elm architecture:

    conn = Plushie::Connection.spawn(format: :json)
    # hello is available after spawn
    puts conn.hello[:version]
    conn.send_encoded(Protocol::Encode.encode_snapshot(tree, :json))
    conn.close

Instance Attribute Details

#format ⇒ :msgpack, :json (readonly)

Returns the wire format.

Returns:

  • (:msgpack, :json)

    wire format



# File 'lib/plushie/connection.rb', line 23

def format
  @format
end

#hello ⇒ Hash? (readonly)

Returns the hello handshake response from the renderer.

Returns:

  • (Hash, nil)

    hello handshake response from renderer



# File 'lib/plushie/connection.rb', line 26

def hello
  @hello
end

Class Method Details

.attach(stdin:, stdout:, format: :msgpack, settings: {}, queue: nil, on_message: nil) ⇒ Connection

Attach to existing IO streams (for :stdio transport).

Parameters:

  • stdin (IO)

    writable stream to renderer

  • stdout (IO)

    readable stream from renderer

  • format (:msgpack, :json) (defaults to: :msgpack)

    wire format

  • settings (Hash) (defaults to: {})

    initial settings to send

  • queue (Thread::Queue, nil) (defaults to: nil)

    queue to push decoded messages to

  • on_message (Proc, nil) (defaults to: nil)

    callback for decoded messages (alternative to queue)

Returns:

  • (Connection)



# File 'lib/plushie/connection.rb', line 57

def self.attach(stdin:, stdout:, format: :msgpack, settings: {},
  queue: nil, on_message: nil)
  conn = new(format: format, queue: queue, on_message: on_message)
  conn.instance_variable_set(:@stdin, stdin)
  conn.instance_variable_set(:@stdout, stdout)
  stdin.binmode
  stdout.binmode
  conn.send(:perform_handshake, settings)
  conn.send(:start_reader)
  conn
end

.iostream(adapter:, format: :msgpack, settings: {}, queue: nil, on_message: nil) ⇒ Connection

Create a connection backed by an iostream adapter.

The adapter mediates between the Connection and an external I/O source (SSH channel, TCP socket, WebSocket, etc.). Instead of reading/writing pipes, the connection exchanges data with the adapter via method calls.

The adapter must respond to:

  • on_bridge(connection) -- called during init; the adapter stores the connection reference
  • send_data(data) -- called by the Connection to write encoded bytes

The adapter calls back:

  • connection.receive_data(data) -- when data arrives from the transport
  • connection.transport_closed(reason) -- when the transport closes
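
The adapter contract above can be sketched with an in-memory adapter. This is only an illustration: `LoopbackAdapter`, its `inject` and `shutdown` helpers, and the `RecordingConnection` stand-in are hypothetical names, not part of Plushie. The stand-in replaces `Plushie::Connection` so the sketch runs without the gem.

```ruby
# Hypothetical in-memory adapter; not part of Plushie.
class LoopbackAdapter
  attr_reader :sent

  def initialize
    @sent = []          # messages the connection asked us to write
    @connection = nil
    @stopped = false
  end

  # Required: called during setup; keep a reference for the callbacks below.
  def on_bridge(connection)
    @connection = connection
  end

  # Required: called by the connection to write one encoded message.
  def send_data(data)
    @sent << data
  end

  # Optional: Connection#close calls #stop only if the adapter responds to it.
  def stop
    @stopped = true
  end

  def stopped?
    @stopped
  end

  # Transport side (hypothetical helper): deliver one complete message.
  def inject(data)
    @connection.receive_data(data)
  end

  # Transport side (hypothetical helper): report that the transport closed.
  def shutdown(reason)
    @connection.transport_closed(reason)
  end
end

# Stand-in for Plushie::Connection that only records the callbacks it gets.
class RecordingConnection
  attr_reader :events

  def initialize
    @events = []
  end

  def receive_data(data)
    @events << [:data, data]
  end

  def transport_closed(reason)
    @events << [:closed, reason]
  end
end

adapter = LoopbackAdapter.new
connection = RecordingConnection.new
adapter.on_bridge(connection)
adapter.send_data("ping")   # connection -> transport
adapter.inject("pong")      # transport -> connection
adapter.shutdown(:eof)
```

With the real class, the adapter would presumably be passed as `Plushie::Connection.iostream(adapter: adapter)` rather than wired up by hand.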

Parameters:

  • adapter (#on_bridge, #send_data)

    iostream adapter object

  • format (:msgpack, :json) (defaults to: :msgpack)

    wire format

  • settings (Hash) (defaults to: {})

    initial settings to send

  • queue (Thread::Queue, nil) (defaults to: nil)

    queue to push decoded messages to

  • on_message (Proc, nil) (defaults to: nil)

    callback for decoded messages (alternative to queue)

Returns:

  • (Connection)



# File 'lib/plushie/connection.rb', line 90

def self.iostream(adapter:, format: :msgpack, settings: {},
  queue: nil, on_message: nil)
  conn = new(format: format, queue: queue, on_message: on_message)
  conn.send(:setup_iostream, adapter, settings)
  conn
end

.spawn(format: :msgpack, binary: nil, mode: nil, max_sessions: nil, log_level: nil, settings: {}, queue: nil, on_message: nil) ⇒ Connection

Spawn a renderer process and perform the hello handshake.

Parameters:

  • format (:msgpack, :json) (defaults to: :msgpack)

    wire format

  • binary (String, nil) (defaults to: nil)

    path to renderer binary (nil = auto-resolve)

  • mode (:windowed, :headless, :mock, nil) (defaults to: nil)

    execution mode

  • max_sessions (Integer, nil) (defaults to: nil)

    max concurrent sessions

  • log_level (Symbol, nil) (defaults to: nil)

    renderer log level

  • settings (Hash) (defaults to: {})

    initial settings to send

  • queue (Thread::Queue, nil) (defaults to: nil)

    queue to push decoded messages to

  • on_message (Proc, nil) (defaults to: nil)

    callback for decoded messages (alternative to queue)

Returns:

  • (Connection)



# File 'lib/plushie/connection.rb', line 39

def self.spawn(format: :msgpack, binary: nil, mode: nil, max_sessions: nil,
  log_level: nil, settings: {}, queue: nil, on_message: nil)
  conn = new(format: format, queue: queue, on_message: on_message)
  conn.send(:spawn_process, binary, mode, max_sessions, log_level)
  conn.send(:perform_handshake, settings)
  conn.send(:start_reader)
  conn
end

Instance Method Details

#close ⇒ Object

Close the connection and clean up resources.



# File 'lib/plushie/connection.rb', line 164

def close
  return if @closed
  @closed = true
  @reader_thread&.kill
  if @iostream_adapter
    @iostream_adapter.stop if @iostream_adapter.respond_to?(:stop)
  else
    begin
      @stdin&.close
    rescue
      nil
    end
    begin
      @stdout&.close
    rescue
      nil
    end
    begin
      @process_thread&.value
    rescue
      nil
    end
  end
end

#closed? ⇒ Boolean

Returns true if the connection is closed.

Returns:

  • (Boolean)

    true if the connection is closed



# File 'lib/plushie/connection.rb', line 190

def closed?
  @closed
end

#receive_data(data) ⇒ Object

Called by the iostream adapter when data arrives from the transport. The adapter is responsible for framing -- each call should deliver one complete protocol message.

Parameters:

  • data (String)

    a complete protocol message, with transport framing already removed



# File 'lib/plushie/connection.rb', line 125

def receive_data(data)
  return if @closed

  msg = Protocol::Decode.decode(data, @format)
  decoded = Protocol::Decode.dispatch_message(msg)

  if !@hello && decoded.is_a?(Hash) && decoded[:type] == :hello
    @hello = decoded
    @handshake_queue&.push(decoded)
  elsif decoded
    dispatch_message(decoded)
  end
end

#send_encoded(data) ⇒ Object

Send pre-encoded wire bytes to the renderer. Thread-safe.

Parameters:

  • data (String)

    encoded message bytes



# File 'lib/plushie/connection.rb', line 100

def send_encoded(data)
  @write_mutex.synchronize do
    if @iostream_adapter
      @iostream_adapter.send_data(data)
    else
      case @format
      when :msgpack
        @stdin.write([data.bytesize].pack("N"))
        @stdin.write(data)
      when :json
        @stdin.write(data)
      end
      @stdin.flush
    end
  end
rescue IOError, Errno::EPIPE => e
  @closed = true
  dispatch_message({type: :connection_error, error: e})
end
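
The pipe framing in the listing above can be tried in isolation. The sketch below mirrors the two branches: for :msgpack, a 4-byte unsigned big-endian length (`pack("N")`) followed by the payload; for :json, the bytes as given. The `frame` and `split_frames` helpers are hypothetical names for illustration. `split_frames` also assumes the same length-prefixed layout on the reading side, which the listing does not show.

```ruby
# Frame an already-encoded payload the way the pipe branch writes it.
def frame(data, format)
  case format
  when :msgpack
    # 4-byte unsigned big-endian length, then the payload bytes.
    [data.bytesize].pack("N") + data.b
  when :json
    # Written as-is; no length prefix is added.
    data
  else
    raise ArgumentError, "unknown format: #{format.inspect}"
  end
end

# Assumption: split a buffer of length-prefixed frames back into payloads.
# An incomplete trailing frame is left out of the result.
def split_frames(buffer)
  payloads = []
  offset = 0
  while offset + 4 <= buffer.bytesize
    length = buffer.byteslice(offset, 4).unpack1("N")
    break if offset + 4 + length > buffer.bytesize
    payloads << buffer.byteslice(offset + 4, length)
    offset += 4 + length
  end
  payloads
end

framed = frame("abc", :msgpack) + frame("hello", :msgpack)
payloads = split_frames(framed)
```

Because the prefix counts bytes (`bytesize`), not characters, multi-byte UTF-8 payloads are framed correctly.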

#send_message(msg) ⇒ Object

Encode a hash and send it. Injects session field if missing.

Parameters:

  • msg (Hash)

    message to encode and send



# File 'lib/plushie/connection.rb', line 151

def send_message(msg)
  send_encoded(Protocol::Encode.encode(msg, @format))
end

#send_message_for_session(msg, session_id) ⇒ Object

Send a message with a specific session ID injected.

Parameters:

  • msg (Hash)

    message hash

  • session_id (String)

    session identifier



# File 'lib/plushie/connection.rb', line 159

def send_message_for_session(msg, session_id)
  send_message(msg.merge(session: session_id))
end
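
Since the session ID is injected with `Hash#merge`, two properties follow from plain Ruby semantics: the caller's hash is not mutated, and an existing `:session` symbol key is overwritten. A short illustration, using a made-up message shape:

```ruby
# Hypothetical message; only the :session key matters here.
msg = {type: :settings, session: "default"}

# What send_message_for_session builds before encoding.
tagged = msg.merge(session: "s-42")

# Caveat: a String key "session" is a different key and is not replaced.
string_keyed = {"session" => "default"}.merge(session: "s-42")
```

Callers that build messages with string keys would end up with both keys present, so symbol keys are the safer choice here.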

#transport_closed(reason) ⇒ Object

Called by the iostream adapter when the transport closes.

Parameters:

  • reason (Symbol, String)

    reason for closure



# File 'lib/plushie/connection.rb', line 142

def transport_closed(reason)
  return if @closed
  @closed = true
  dispatch_message({type: :connection_closed, reason: reason})
end
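
On the consuming side, a loop over the queue might handle the two terminal message shapes that appear in the listings on this page, `{type: :connection_closed, reason: ...}` and `{type: :connection_error, error: ...}`. The sketch below assumes the connection was created with `queue:` and that these hashes arrive on that queue. The `drain` helper and the `:event` message are hypothetical, and the queue is filled by hand so the example runs without a renderer.

```ruby
# Pop messages until a terminal one arrives.
# Returns [ordinary_messages, status].
def drain(queue)
  handled = []
  status = nil
  loop do
    msg = queue.pop   # blocks until a message is available
    case msg[:type]
    when :connection_closed
      status = [:closed, msg[:reason]]
      break
    when :connection_error
      status = [:error, msg[:error]]
      break
    else
      handled << msg
    end
  end
  [handled, status]
end

queue = Thread::Queue.new
queue << {type: :event, name: "click"}               # hypothetical message
queue << {type: :connection_closed, reason: :eof}    # shape from #transport_closed

handled, status = drain(queue)
```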