Class: Plushie::Connection
- Inherits:
-
Object
- Object
- Plushie::Connection
- Defined in:
- lib/plushie/connection.rb
Overview
Low-level protocol client for the plushie renderer.
Manages a bidirectional pipe to the renderer binary, handles wire framing, and provides thread-safe message sending. Decoded messages are pushed to a Thread::Queue or dispatched via a callback proc.
This layer is usable standalone for scripting and REPL exploration without the full Elm architecture:
conn = Plushie::Connection.spawn(format: :json) # hello is available after spawn puts conn.hello[:version] conn.send_encoded(Protocol::Encode.encode_snapshot(tree, :json)) conn.close
Instance Attribute Summary collapse
-
#format ⇒ :msgpack, :json
readonly
Wire format.
-
#hello ⇒ Hash?
readonly
Hello handshake response from renderer.
Class Method Summary collapse
-
.attach(stdin:, stdout:, format: :msgpack, settings: {}, queue: nil, on_message: nil) ⇒ Connection
Attach to existing IO streams (for :stdio transport).
-
.iostream(adapter:, format: :msgpack, settings: {}, queue: nil, on_message: nil) ⇒ Connection
Create a connection backed by an iostream adapter.
-
.spawn(format: :msgpack, binary: nil, mode: nil, max_sessions: nil, log_level: nil, settings: {}, queue: nil, on_message: nil) ⇒ Connection
Spawn a renderer process and perform the hello handshake.
Instance Method Summary collapse
-
#close ⇒ Object
Close the connection and clean up resources.
-
#closed? ⇒ Boolean
True if the connection is closed.
-
#receive_data(data) ⇒ Object
Called by the iostream adapter when data arrives from the transport.
-
#send_encoded(data) ⇒ Object
Send pre-encoded wire bytes to the renderer.
-
#send_message(msg) ⇒ Object
Encode a hash and send it.
-
#send_message_for_session(msg, session_id) ⇒ Object
Send a message with a specific session ID injected.
-
#transport_closed(reason) ⇒ Object
Called by the iostream adapter when the transport closes.
Instance Attribute Details
#format ⇒ :msgpack, :json (readonly)
Returns wire format.
23 24 25 |
# File 'lib/plushie/connection.rb', line 23 def format @format end |
#hello ⇒ Hash? (readonly)
Returns hello handshake response from renderer.
26 27 28 |
# File 'lib/plushie/connection.rb', line 26 def hello @hello end |
Class Method Details
.attach(stdin:, stdout:, format: :msgpack, settings: {}, queue: nil, on_message: nil) ⇒ Connection
Attach to existing IO streams (for :stdio transport).
57 58 59 60 61 62 63 64 65 66 67 |
# File 'lib/plushie/connection.rb', line 57 def self.attach(stdin:, stdout:, format: :msgpack, settings: {}, queue: nil, on_message: nil) conn = new(format: format, queue: queue, on_message: ) conn.instance_variable_set(:@stdin, stdin) conn.instance_variable_set(:@stdout, stdout) stdin.binmode stdout.binmode conn.send(:perform_handshake, settings) conn.send(:start_reader) conn end |
.iostream(adapter:, format: :msgpack, settings: {}, queue: nil, on_message: nil) ⇒ Connection
Create a connection backed by an iostream adapter.
The adapter mediates between the Connection and an external I/O source (SSH channel, TCP socket, WebSocket, etc.). Instead of reading/writing pipes, the connection exchanges data with the adapter via method calls.
The adapter must respond to:
- +on_bridge(connection)+ -- called during init, adapter stores connection ref
- +send_data(data)+ -- called by Connection to write encoded bytes
The adapter calls back:
- +connection.receive_data(data)+ -- when data arrives from the transport
- +connection.transport_closed(reason)+ -- when the transport closes
90 91 92 93 94 95 |
# File 'lib/plushie/connection.rb', line 90 def self.iostream(adapter:, format: :msgpack, settings: {}, queue: nil, on_message: nil) conn = new(format: format, queue: queue, on_message: ) conn.send(:setup_iostream, adapter, settings) conn end |
.spawn(format: :msgpack, binary: nil, mode: nil, max_sessions: nil, log_level: nil, settings: {}, queue: nil, on_message: nil) ⇒ Connection
Spawn a renderer process and perform the hello handshake.
39 40 41 42 43 44 45 46 |
# File 'lib/plushie/connection.rb', line 39 def self.spawn(format: :msgpack, binary: nil, mode: nil, max_sessions: nil, log_level: nil, settings: {}, queue: nil, on_message: nil) conn = new(format: format, queue: queue, on_message: ) conn.send(:spawn_process, binary, mode, max_sessions, log_level) conn.send(:perform_handshake, settings) conn.send(:start_reader) conn end |
Instance Method Details
#close ⇒ Object
Close the connection and clean up resources.
164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 |
# File 'lib/plushie/connection.rb', line 164 def close return if @closed @closed = true @reader_thread&.kill if @iostream_adapter @iostream_adapter.stop if @iostream_adapter.respond_to?(:stop) else begin @stdin&.close rescue nil end begin @stdout&.close rescue nil end begin @process_thread&.value rescue nil end end end |
#closed? ⇒ Boolean
Returns true if the connection is closed.
190 191 192 |
# File 'lib/plushie/connection.rb', line 190 def closed? @closed end |
#receive_data(data) ⇒ Object
Called by the iostream adapter when data arrives from the transport. The adapter is responsible for framing -- each call should deliver one complete protocol message.
125 126 127 128 129 130 131 132 133 134 135 136 137 |
# File 'lib/plushie/connection.rb', line 125 def receive_data(data) return if @closed msg = Protocol::Decode.decode(data, @format) decoded = Protocol::Decode.(msg) if !@hello && decoded.is_a?(Hash) && decoded[:type] == :hello @hello = decoded @handshake_queue&.push(decoded) elsif decoded (decoded) end end |
#send_encoded(data) ⇒ Object
Send pre-encoded wire bytes to the renderer. Thread-safe.
100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 |
# File 'lib/plushie/connection.rb', line 100 def send_encoded(data) @write_mutex.synchronize do if @iostream_adapter @iostream_adapter.send_data(data) else case @format when :msgpack @stdin.write([data.bytesize].pack("N")) @stdin.write(data) when :json @stdin.write(data) end @stdin.flush end end rescue IOError, Errno::EPIPE => e @closed = true ({type: :connection_error, error: e}) end |
#send_message(msg) ⇒ Object
Encode a hash and send it. Injects session field if missing.
151 152 153 |
# File 'lib/plushie/connection.rb', line 151 def (msg) send_encoded(Protocol::Encode.encode(msg, @format)) end |
#send_message_for_session(msg, session_id) ⇒ Object
Send a message with a specific session ID injected.
159 160 161 |
# File 'lib/plushie/connection.rb', line 159 def (msg, session_id) (msg.merge(session: session_id)) end |
#transport_closed(reason) ⇒ Object
Called by the iostream adapter when the transport closes.
142 143 144 145 146 |
# File 'lib/plushie/connection.rb', line 142 def transport_closed(reason) return if @closed @closed = true ({type: :connection_closed, reason: reason}) end |