Class: PushyClient::ProtocolHandler
- Inherits:
-
Object
- Object
- PushyClient::ProtocolHandler
- Defined in:
- lib/pushy_client/protocol_handler.rb
Defined Under Namespace
Classes: TimeRecvWrapper, TimeSendWrapper
Constant Summary collapse
- MAX_BODY_SIZE =
The maximum size, in bytes, allowed for a message body. This is not configurable on the client because it is already configurable (though not documented) on the server, and we don't want users to have to keep the two values in sync. The maximum on the server is actually 65536, but we leave a little headroom because the server measures the signed message, whereas we only count the size of stderr and stdout.
63000
Instance Attribute Summary collapse
-
#client ⇒ Object
readonly
Returns the value of attribute client.
-
#client_private_key ⇒ Object
readonly
Returns the value of attribute client_private_key.
-
#command_address ⇒ Object
readonly
Returns the value of attribute command_address.
-
#server_heartbeat_address ⇒ Object
readonly
Returns the value of attribute server_heartbeat_address.
-
#server_public_key ⇒ Object
readonly
Returns the value of attribute server_public_key.
-
#session_key ⇒ Object
readonly
Returns the value of attribute session_key.
-
#session_method ⇒ Object
readonly
Returns the value of attribute session_method.
Instance Method Summary collapse
-
#initialize(client) ⇒ ProtocolHandler
constructor
A new instance of ProtocolHandler.
- #node_name ⇒ Object
- #reconfigure ⇒ Object
- #send_command(message_type, job_id, params) ⇒ Object
- #send_heartbeat(sequence) ⇒ Object
- #start ⇒ Object
- #stop ⇒ Object
Constructor Details
#initialize(client) ⇒ ProtocolHandler
Returns a new instance of ProtocolHandler.
71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 |
# File 'lib/pushy_client/protocol_handler.rb', line 71
#
# Creates a handler bound to +client+, sets up the two socket locks, and
# registers a server-availability callback that rebuilds the sockets when
# the server is reported as unavailable.
#
# @param client [Object] the owning client; it must respond to
#   +on_server_availability_change+ and +log_exception+ (its concrete class
#   is not visible from here)
def initialize(client)
  @client = client
  # We synchronize on this when we change the socket (so if you want a
  # valid socket to send or receive on, synchronize on this)
  @socket_lock = Mutex.new
  # This holds the same purpose, but receive blocks for a while so it gets
  # its own lock to avoid blocking sends. reconfigure will take both locks.
  @receive_socket_lock = Mutex.new

  # When the server goes down, close and reopen sockets.
  client.on_server_availability_change do |available|
    # Nothing to do while the server is reachable.
    next if available

    # The reconfigure runs on its own thread; any failure is reported
    # through the client's exception logger.
    Thread.new do
      begin
        Chef::Log.info "[#{node_name}] Closing and reopening sockets since server is down ..."
        reconfigure
        Chef::Log.info "[#{node_name}] Done closing and reopening sockets."
      rescue => e
        client.log_exception("Error reconfiguring sockets when server went down", e)
      end
    end
  end
end
Instance Attribute Details
#client ⇒ Object (readonly)
Returns the value of attribute client.
96 97 98 |
# File 'lib/pushy_client/protocol_handler.rb', line 96
#
# Read-only accessor.
#
# @return [Object] the value of the +@client+ instance variable
def client
  @client
end
#client_private_key ⇒ Object (readonly)
Returns the value of attribute client_private_key.
102 103 104 |
# File 'lib/pushy_client/protocol_handler.rb', line 102
#
# Read-only accessor.
#
# @return [Object] the value of the +@client_private_key+ instance variable
def client_private_key
  @client_private_key
end
#command_address ⇒ Object (readonly)
Returns the value of attribute command_address.
98 99 100 |
# File 'lib/pushy_client/protocol_handler.rb', line 98
#
# Read-only accessor.
#
# @return [Object] the value of the +@command_address+ instance variable
def command_address
  @command_address
end
#server_heartbeat_address ⇒ Object (readonly)
Returns the value of attribute server_heartbeat_address.
97 98 99 |
# File 'lib/pushy_client/protocol_handler.rb', line 97
#
# Read-only accessor.
#
# @return [Object] the value of the +@server_heartbeat_address+ instance
#   variable
def server_heartbeat_address
  @server_heartbeat_address
end
#server_public_key ⇒ Object (readonly)
Returns the value of attribute server_public_key.
99 100 101 |
# File 'lib/pushy_client/protocol_handler.rb', line 99
#
# Read-only accessor.
#
# @return [Object] the value of the +@server_public_key+ instance variable
def server_public_key
  @server_public_key
end
#session_key ⇒ Object (readonly)
Returns the value of attribute session_key.
100 101 102 |
# File 'lib/pushy_client/protocol_handler.rb', line 100
#
# Read-only accessor.
#
# @return [Object] the value of the +@session_key+ instance variable
def session_key
  @session_key
end
#session_method ⇒ Object (readonly)
Returns the value of attribute session_method.
101 102 103 |
# File 'lib/pushy_client/protocol_handler.rb', line 101
#
# Read-only accessor.
#
# @return [Object] the value of the +@session_method+ instance variable
def session_method
  @session_method
end
Instance Method Details
#node_name ⇒ Object
104 105 106 |
# File 'lib/pushy_client/protocol_handler.rb', line 104
#
# Convenience delegate to the owning client.
#
# @return [Object] whatever +client.node_name+ returns
def node_name
  client.node_name
end
#reconfigure ⇒ Object
186 187 188 189 190 191 192 193 |
# File 'lib/pushy_client/protocol_handler.rb', line 186
#
# Tears the handler down and brings it back up while holding both socket
# locks (send lock first, then receive lock).
#
# @return [Object] whatever +start+ returns
def reconfigure
  @socket_lock.synchronize do
    @receive_socket_lock.synchronize do
      internal_stop
      # Start picks up new configuration
      start
    end
  end
end
#send_command(message_type, job_id, params) ⇒ Object
195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 |
# File 'lib/pushy_client/protocol_handler.rb', line 195
#
# Builds a command message for the given job and hands it to
# +send_signed_json_command+ using the +:hmac_sha256+ signing method.
#
# @param message_type [Object] value placed in the message's +:type+ field
# @param job_id [Object] value placed in the message's +:job_id+ field
# @param params [Object] extra fields; passed through +validate_params+ and
#   merged over the base message (so they can override base keys)
# @return [Object] whatever +send_signed_json_command+ returns
def send_command(message_type, job_id, params)
  Chef::Log.debug("[#{node_name}] Sending command #{message_type} for job #{job_id}")
  # Nil params will be stripped by JSON.generate()
  message = {
    :node => node_name,
    :client => client.hostname,
    :protocol_version => PushyClient::PROTOCOL_VERSION,
    :org => client.org_name,
    :type => message_type,
    # NOTE(review): always -1 here; presumably replaced further down the
    # send path - confirm.
    :sequence => -1,
    :timestamp => TimeSendWrapper.now.httpdate,
    :incarnation_id => client.incarnation_id,
    :job_id => job_id
  }.merge(validate_params(params))

  send_signed_json_command(:hmac_sha256, message)
end
#send_heartbeat(sequence) ⇒ Object
213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 |
# File 'lib/pushy_client/protocol_handler.rb', line 213
#
# Builds a heartbeat message carrying the client's current job state and
# hands it to +send_signed_json_command+ using the +:hmac_sha256+ signing
# method.
#
# @param sequence [Object] heartbeat sequence number; in this method it is
#   only used in the debug log line
# @return [Object] whatever +send_signed_json_command+ returns
def send_heartbeat(sequence)
  Chef::Log.debug("[#{node_name}] Sending heartbeat (sequence ##{sequence})")
  job_state = client.job_state
  message = {
    :node => node_name,
    :client => client.hostname,
    :protocol_version => PushyClient::PROTOCOL_VERSION,
    :org => client.org_name,
    :type => :heartbeat,
    # NOTE(review): the +sequence+ argument is not used here; the field is
    # sent as -1, presumably to be filled in further down the send path -
    # confirm.
    :sequence => -1,
    :timestamp => TimeSendWrapper.now.httpdate,
    :incarnation_id => client.incarnation_id,
    :job_state => job_state[:state],
    :job_id => job_state[:job_id]
  }

  send_signed_json_command(:hmac_sha256, message)
end
#start ⇒ Object
108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 |
# File 'lib/pushy_client/protocol_handler.rb', line 108
#
# Reads addresses and keys from the client's configuration, decrypts the
# session key, opens the server-heartbeat (SUB) and command (DEALER) sockets,
# and starts the receive thread.
#
# Sets @server_heartbeat_address, @command_address, @server_public_key,
# @client_private_key, @max_message_skew, @session_method, @session_key,
# both sockets, their sequence counters, and @receive_thread.
#
# Logs an error and calls exit(-1) if the session key cannot be read or
# decrypted.
#
# @return [Object] the value returned by +start_receive_thread+
def start
  config = client.config
  heartbeat_config = config['push_jobs']['heartbeat']

  server_address = URI(heartbeat_config['out_addr']).host
  check_server_address(node_name, server_address)

  @server_heartbeat_address = heartbeat_config['out_addr']
  @command_address = heartbeat_config['command_addr']
  @server_public_key = OpenSSL::PKey::RSA.new(config['public_key'])
  @client_private_key = ProtocolHandler::load_key(client.client_key)
  @max_message_skew = config['max_message_skew']

  # Only read when curve is in use; stays nil otherwise and is only consumed
  # inside the matching using_curve check below.
  server_curve_pub_key = config['curve_public_key'] if client.using_curve

  # decode and extract session key
  # (this step is the same whether or not curve is in use)
  begin
    encoded_session_key = config['encoded_session_key']
    @session_method = encoded_session_key['method']
    enc_session_key = Base64::decode64(encoded_session_key['key'])
    @session_key = @client_private_key.private_decrypt(enc_session_key)
  rescue
    Chef::Log.error "[#{node_name}] No session key found in config"
    exit(-1)
  end

  Chef::Log.info "[#{node_name}] Starting ZMQ version #{LibZMQ.version}"

  # Server heartbeat socket
  Chef::Log.info "[#{node_name}] Listening for server heartbeat at #{@server_heartbeat_address}"
  @server_heartbeat_socket = PushyClient::ZmqContext.socket(ZMQ::SUB)
  @server_heartbeat_socket.connect(@server_heartbeat_address)
  @server_heartbeat_socket.setsockopt(ZMQ::SUBSCRIBE, "")
  @server_heartbeat_seq_no = -1

  # Command socket
  Chef::Log.info "[#{node_name}] Connecting to command channel at #{@command_address}"
  # TODO
  # This needs to be set up to be able to handle bidirectional messages; right now this is Tx only
  # Probably need to set it up with a handler, like the subscriber socket above.
  @command_socket = PushyClient::ZmqContext.socket(ZMQ::DEALER)
  @command_socket.setsockopt(ZMQ::LINGER, 0)
  # Note setting this to '1' causes the client to crash on send, but perhaps that
  # beats storming the server when the server restarts
  @command_socket.setsockopt(ZMQ::RCVHWM, 0)
  # Buffering more than a few heartbeats can cause trauma on the server after restart
  @command_socket.setsockopt(ZMQ::SNDHWM, 3)
  if client.using_curve
    @command_socket.setsockopt(ZMQ::CURVE_SERVERKEY, server_curve_pub_key)
    @command_socket.setsockopt(ZMQ::CURVE_PUBLICKEY, client.client_curve_pub_key)
    @command_socket.setsockopt(ZMQ::CURVE_SECRETKEY, client.client_curve_sec_key)
  end
  @command_socket.connect(@command_address)
  @command_socket_server_seq_no = -1
  @command_socket_outgoing_seq = 0

  @receive_thread = start_receive_thread
end
#stop ⇒ Object
178 179 180 181 182 183 184 |
# File 'lib/pushy_client/protocol_handler.rb', line 178
#
# Shuts the handler down while holding both socket locks (send lock first,
# then receive lock).
#
# @return [Object] whatever +internal_stop+ returns
def stop
  @socket_lock.synchronize do
    @receive_socket_lock.synchronize do
      internal_stop
    end
  end
end