Class: DBus::Connection
- Inherits: Object
  - Object
  - DBus::Connection
- Defined in: lib/dbus/bus.rb
Overview
D-Bus main connection class
Main class that maintains a connection to a bus and can handle incoming and outgoing messages.
Direct Known Subclasses
Defined Under Namespace
Classes: NameRequestError
Constant Summary
- NAME_FLAG_ALLOW_REPLACEMENT =
FIXME: describe the following names, flags and constants. See the D-Bus specification for their definitions.
0x1
- NAME_FLAG_REPLACE_EXISTING =
0x2
- NAME_FLAG_DO_NOT_QUEUE =
0x4
- REQUEST_NAME_REPLY_PRIMARY_OWNER =
0x1
- REQUEST_NAME_REPLY_IN_QUEUE =
0x2
- REQUEST_NAME_REPLY_EXISTS =
0x3
- REQUEST_NAME_REPLY_ALREADY_OWNER =
0x4
- DBUSXMLINTRO =
'<!DOCTYPE node PUBLIC "-//freedesktop//DTD D-BUS Object Introspection 1.0//EN" "http://www.freedesktop.org/standards/dbus/1.0/introspect.dtd"> <node> <interface name="org.freedesktop.DBus.Introspectable"> <method name="Introspect"> <arg name="data" direction="out" type="s"/> </method> </interface> <interface name="org.freedesktop.DBus"> <method name="RequestName"> <arg direction="in" type="s"/> <arg direction="in" type="u"/> <arg direction="out" type="u"/> </method> <method name="ReleaseName"> <arg direction="in" type="s"/> <arg direction="out" type="u"/> </method> <method name="StartServiceByName"> <arg direction="in" type="s"/> <arg direction="in" type="u"/> <arg direction="out" type="u"/> </method> <method name="Hello"> <arg direction="out" type="s"/> </method> <method name="NameHasOwner"> <arg direction="in" type="s"/> <arg direction="out" type="b"/> </method> <method name="ListNames"> <arg direction="out" type="as"/> </method> <method name="ListActivatableNames"> <arg direction="out" type="as"/> </method> <method name="AddMatch"> <arg direction="in" type="s"/> </method> <method name="RemoveMatch"> <arg direction="in" type="s"/> </method> <method name="GetNameOwner"> <arg direction="in" type="s"/> <arg direction="out" type="s"/> </method> <method name="ListQueuedOwners"> <arg direction="in" type="s"/> <arg direction="out" type="as"/> </method> <method name="GetConnectionUnixUser"> <arg direction="in" type="s"/> <arg direction="out" type="u"/> </method> <method name="GetConnectionUnixProcessID"> <arg direction="in" type="s"/> <arg direction="out" type="u"/> </method> <method name="GetConnectionSELinuxSecurityContext"> <arg direction="in" type="s"/> <arg direction="out" type="ay"/> </method> <method name="ReloadConfig"> </method> <signal name="NameOwnerChanged"> <arg type="s"/> <arg type="s"/> <arg type="s"/> </signal> <signal name="NameLost"> <arg type="s"/> </signal> <signal name="NameAcquired"> <arg type="s"/> </signal> </interface> </node> '
- MSG_BUF_SIZE =
The buffer size for messages.
4096
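The NAME_FLAG_* values are independent bits, so they can be combined with a bitwise OR, while the REQUEST_NAME_REPLY_* values are plain reply codes. The following is a minimal sketch under that reading; the module name `BusNameSketch`, the `REPLY_NAMES` table and the `describe_reply` helper are hypothetical and not part of the library, and the values are copied from the list above so the sketch runs on its own.

```ruby
# Hypothetical stand-alone module; values copied from the constant summary.
module BusNameSketch
  NAME_FLAG_ALLOW_REPLACEMENT = 0x1
  NAME_FLAG_REPLACE_EXISTING  = 0x2
  NAME_FLAG_DO_NOT_QUEUE      = 0x4

  # Reply codes mapped to readable symbols (the symbols are an assumption).
  REPLY_NAMES = {
    0x1 => :primary_owner,
    0x2 => :in_queue,
    0x3 => :exists,
    0x4 => :already_owner
  }.freeze

  # Translate a RequestName reply code into a symbol.
  def self.describe_reply(code)
    REPLY_NAMES.fetch(code, :unknown)
  end
end

# Flags are single bits, so they combine with bitwise OR.
flags = BusNameSketch::NAME_FLAG_REPLACE_EXISTING |
        BusNameSketch::NAME_FLAG_DO_NOT_QUEUE
puts format("flags = 0x%x", flags)
puts BusNameSketch.describe_reply(0x3)
```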
Instance Attribute Summary
-
#main_message_queue ⇒ Object
Returns the value of attribute main_message_queue.
-
#main_thread ⇒ Object
Returns the value of attribute main_thread.
-
#queue_used_by_thread ⇒ Object
Returns the value of attribute queue_used_by_thread.
-
#read_thread ⇒ Object
Returns the value of attribute read_thread.
-
#rescuemethod ⇒ Object
Returns the value of attribute rescuemethod.
-
#socket ⇒ Object
readonly
The socket that is used to connect with the bus.
-
#thread_waiting_for_message ⇒ Object
Returns the value of attribute thread_waiting_for_message.
-
#unique_name ⇒ Object
readonly
The unique name (as defined by the specification) of this connection on the bus.
Instance Method Summary
-
#add_match(mr, &slot) ⇒ Object
Asks bus to send us messages matching mr, and execute slot when received.
-
#connect ⇒ Object
Connect to the bus and initialize the connection.
-
#connect_to_tcp(params) ⇒ Object
Connect to a bus over tcp and initialize the connection.
-
#connect_to_unix(params) ⇒ Object
Connect to a bus over a Unix domain socket (abstract or filesystem path) and initialize the connection.
-
#emit(service, obj, intf, sig, *args) ⇒ Object
Emit a signal event for the given service, object obj, interface intf and signal sig with arguments args.
-
#glibize ⇒ Object
Tell a bus to register itself on the glib main loop.
-
#initialize(path, threaded_access) ⇒ Connection
constructor
Create a new connection to the bus for a given connect path.
-
#introspect(dest, path) ⇒ Object
Issues a call to the org.freedesktop.DBus.Introspectable.Introspect method. dest is the service and path is the object path you want to introspect. If a code block is given, the introspect call is asynchronous.
- #introspect_data(dest, path, &reply_handler) ⇒ Object
-
#messages ⇒ Object
Retrieve all the messages that are currently in the buffer.
-
#on_return(m, &retc) ⇒ Object
Specify a code block that has to be executed when a reply for message m is received.
-
#poll_messages ⇒ Object
Update the buffer and retrieve all messages using Connection#messages.
-
#pop_message ⇒ Object
Get one message from the bus and remove it from the buffer.
-
#process(m) ⇒ Object
Process a message m based on its type.
-
#proxy ⇒ Object
Set up a ProxyObject for the bus itself, since the bus is introspectable.
- #remove_match(mr) ⇒ Object
-
#request_service(name) ⇒ Object
Attempt to request a service name.
-
#send(buf) ⇒ Object
Send the buffer buf to the bus by writing it to the socket.
-
#send_sync(m, &retc) ⇒ Object
Send a message m on to the bus.
-
#send_sync_or_async(message, &reply_handler) ⇒ Object
Send a message.
-
#service(name) ⇒ Object
(also: #[])
Retrieves the Service with the given name.
- #start_read_thread ⇒ Object
-
#update_buffer ⇒ Object
Fill (append) the buffer from data that might be available on the socket.
-
#wait_for_message ⇒ Object
Wait for a message to arrive.
Constructor Details
#initialize(path, threaded_access) ⇒ Connection
Create a new connection to the bus for a given connect path. The path format is described in the D-Bus specification (dbus.freedesktop.org/doc/dbus-specification.html#addresses) and looks like “transport1:key1=value1,key2=value2;transport2:key1=value1,key2=value2”, e.g. “unix:path=/tmp/dbus-test” or “tcp:host=localhost,port=2687”.
# File 'lib/dbus/bus.rb', line 210
def initialize(path, threaded_access)
  @path = path
  @unique_name = nil
  @buffer = ""
  @rescuemethod = nil
  @method_call_replies = Hash.new
  @method_call_msgs = Hash.new
  @signal_matchrules = Hash.new
  @proxy = nil
  @object_root = Node.new("/")
  @is_tcp = false
  @queue_used_by_thread = Hash.new
  @thread_waiting_for_message = Hash.new
  @main_message_queue = Queue.new
  @main_thread = nil
  @threaded = threaded_access
end
Instance Attribute Details
#main_message_queue ⇒ Object
Returns the value of attribute main_message_queue.
# File 'lib/dbus/bus.rb', line 202
def main_message_queue
  @main_message_queue
end
#main_thread ⇒ Object
Returns the value of attribute main_thread.
# File 'lib/dbus/bus.rb', line 202
def main_thread
  @main_thread
end
#queue_used_by_thread ⇒ Object
Returns the value of attribute queue_used_by_thread.
# File 'lib/dbus/bus.rb', line 202
def queue_used_by_thread
  @queue_used_by_thread
end
#read_thread ⇒ Object
Returns the value of attribute read_thread.
# File 'lib/dbus/bus.rb', line 202
def read_thread
  @read_thread
end
#rescuemethod ⇒ Object
Returns the value of attribute rescuemethod.
# File 'lib/dbus/bus.rb', line 202
def rescuemethod
  @rescuemethod
end
#socket ⇒ Object (readonly)
The socket that is used to connect with the bus.
# File 'lib/dbus/bus.rb', line 201
def socket
  @socket
end
#thread_waiting_for_message ⇒ Object
Returns the value of attribute thread_waiting_for_message.
# File 'lib/dbus/bus.rb', line 202
def thread_waiting_for_message
  @thread_waiting_for_message
end
#unique_name ⇒ Object (readonly)
The unique name (as defined by the specification) of this connection on the bus.
# File 'lib/dbus/bus.rb', line 199
def unique_name
  @unique_name
end
Instance Method Details
#add_match(mr, &slot) ⇒ Object
Asks bus to send us messages matching mr, and execute slot when received.
# File 'lib/dbus/bus.rb', line 659
def add_match(mr, &slot)
  # check this is a signal.
  mrs = mr.to_s
  puts "#{@signal_matchrules.size} rules, adding #{mrs.inspect}" if $DEBUG
  # don't ask for the same match if we override it
  unless @signal_matchrules.key?(mrs)
    puts "Asked for a new match" if $DEBUG
    proxy.AddMatch(mrs)
  end
  @signal_matchrules[mrs] = slot
end
#connect ⇒ Object
Connect to the bus and initialize the connection.
# File 'lib/dbus/bus.rb', line 262
def connect
  addresses = @path.split ";"
  # connect to first one that succeeds
  worked = addresses.find do |a|
    transport, keyvaluestring = a.split ":"
    kv_list = keyvaluestring.split ","
    kv_hash = Hash.new
    kv_list.each do |kv|
      key, escaped_value = kv.split "="
      value = escaped_value.gsub(/%(..)/) {|m| [$1].pack "H2" }
      kv_hash[key] = value
    end
    case transport
    when "unix"
      connect_to_unix kv_hash
    when "tcp"
      connect_to_tcp kv_hash
    else
      # ignore, report?
    end
  end
  if (@threaded)
    start_read_thread
  end
  worked
  # returns the address that worked or nil.
  # how to report failure?
end
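The address handling in connect can be tried without a bus. The sketch below follows the same steps (split on ";", then ":", then "," and "=", then undo the %XX escaping) but only returns the parsed data. `parse_addresses` is a hypothetical helper name, and unlike the source above it limits the ":" and "=" splits to two parts and only accepts hex digits after "%".

```ruby
# Hypothetical helper: split a D-Bus address string into
# [transport, params] pairs, one per ";"-separated address.
def parse_addresses(path)
  path.split(";").map do |address|
    transport, keyvaluestring = address.split(":", 2)
    params = {}
    keyvaluestring.to_s.split(",").each do |kv|
      key, escaped_value = kv.split("=", 2)
      # Undo %XX escaping: each pair of hex digits becomes one byte.
      params[key] = escaped_value.to_s.gsub(/%(\h\h)/) { [$1].pack("H2") }
    end
    [transport, params]
  end
end

parsed = parse_addresses("unix:path=/tmp/dbus%2dtest;tcp:host=localhost,port=2687")
parsed.each { |transport, params| puts "#{transport}: #{params.inspect}" }
```

Note that every value stays a string, which is why connect_to_tcp calls to_i on the port.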
#connect_to_tcp(params) ⇒ Object
Connect to a bus over tcp and initialize the connection.
# File 'lib/dbus/bus.rb', line 292
def connect_to_tcp(params)
  #check if the path is sufficient
  if params.key?("host") and params.key?("port")
    begin
      #initialize the tcp socket
      @socket = TCPSocket.new(params["host"],params["port"].to_i)
      @socket.fcntl(Fcntl::F_SETFD, Fcntl::FD_CLOEXEC)
      init_connection
      @is_tcp = true
    rescue
      puts "Error: Could not establish connection to: #{@path}, will now exit."
      exit(0) #a little harsh
    end
  else
    #Danger, Will Robinson: the specified "path" is not usable
    puts "Error: supplied path: #{@path}, unusable! sorry."
  end
end
#connect_to_unix(params) ⇒ Object
Connect to a bus over a Unix domain socket (abstract or filesystem path) and initialize the connection.
# File 'lib/dbus/bus.rb', line 312
def connect_to_unix(params)
  @socket = Socket.new(Socket::Constants::PF_UNIX,Socket::Constants::SOCK_STREAM, 0)
  @socket.fcntl(Fcntl::F_SETFD, Fcntl::FD_CLOEXEC)
  if ! params['abstract'].nil?
    if HOST_END == LIL_END
      sockaddr = "\1\0\0#{params['abstract']}"
    else
      sockaddr = "\0\1\0#{params['abstract']}"
    end
  elsif ! params['path'].nil?
    sockaddr = Socket.pack_sockaddr_un(params['path'])
  end
  @socket.connect(sockaddr)
  init_connection
end
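The two address forms used in connect_to_unix can be inspected without opening a socket. This is a rough sketch: the socket names are made up, and the abstract form (address family as a native-endian 16-bit integer, a NUL byte, then the name) is assumed to apply to Linux only.

```ruby
require "socket"

# Filesystem socket: Ruby builds the sockaddr_un structure for us.
path = "/tmp/dbus-test"
path_addr = Socket.pack_sockaddr_un(path)
puts Socket.unpack_sockaddr_un(path_addr)

# Abstract socket (assumed Linux-only): family in native byte order,
# a NUL byte marking the abstract namespace, then the name itself.
name = "/tmp/dbus-abstract" # hypothetical name
abstract_addr = [Socket::AF_UNIX].pack("S") + "\0" + name
puts abstract_addr.bytesize
```

Packing the family with the native-endian "S" directive gives the same bytes as the explicit HOST_END check in the source, without branching on byte order.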
#emit(service, obj, intf, sig, *args) ⇒ Object
Emit a signal event for the given service, object obj, interface intf and signal sig with arguments args.
# File 'lib/dbus/bus.rb', line 742
def emit(service, obj, intf, sig, *args)
  m = Message.new(DBus::Message::SIGNAL)
  m.path = obj.path
  m.interface = intf.name
  m.member = sig.name
  m.sender = service.name
  i = 0
  sig.params.each do |par|
    m.add_param(par.type, args[i])
    i += 1
  end
  send(m.marshall)
end
#glibize ⇒ Object
Tell a bus to register itself on the glib main loop.
# File 'lib/dbus/bus.rb', line 334
def glibize
  require 'glib2'
  # Circumvent a ruby-glib bug
  @channels ||= Array.new

  gio = GLib::IOChannel.new(@socket.fileno)
  @channels << gio
  gio.add_watch(GLib::IOChannel::IN) do |c, ch|
    update_buffer
    messages.each do |msg|
      process(msg)
    end
    true
  end
end
#introspect(dest, path) ⇒ Object
Issues a call to the org.freedesktop.DBus.Introspectable.Introspect method. dest is the service and path is the object path you want to introspect. If a code block is given, the introspect call is asynchronous and the resulting ProxyObject is yielded to the block. If not, the call is synchronous and the ProxyObject is returned.
FIXME: link to ProxyObject data definition. The returned object is a ProxyObject that has methods you can call to issue METHOD_CALL messages and to set up reception of METHOD_RETURN messages.
# File 'lib/dbus/bus.rb', line 497
def introspect(dest, path)
  if not block_given?
    # introspect in synchronous !
    data = introspect_data(dest, path)
    pof = DBus::ProxyObjectFactory.new(data, self, dest, path)
    return pof.build
  else
    introspect_data(dest, path) do |async_data|
      yield(DBus::ProxyObjectFactory.new(async_data, self, dest, path).build)
    end
  end
end
#introspect_data(dest, path, &reply_handler) ⇒ Object
# File 'lib/dbus/bus.rb', line 470
def introspect_data(dest, path, &reply_handler)
  m = DBus::Message.new(DBus::Message::METHOD_CALL)
  m.path = path
  m.interface = "org.freedesktop.DBus.Introspectable"
  m.destination = dest
  m.member = "Introspect"
  m.sender = unique_name
  if reply_handler.nil?
    send_sync_or_async(m).first
  else
    send_sync_or_async(m) do |*args|
      # TODO test async introspection, is it used at all?
      args.shift # forget the message, pass only the text
      reply_handler.call(*args)
      nil
    end
  end
end
#messages ⇒ Object
Retrieve all the messages that are currently in the buffer.
# File 'lib/dbus/bus.rb', line 577
def messages
  ret = Array.new
  while msg = pop_message
    ret << msg
  end
  ret
end
#on_return(m, &retc) ⇒ Object
Specify a code block that has to be executed when a reply for message m is received.
# File 'lib/dbus/bus.rb', line 648
def on_return(m, &retc)
  # Have a better exception here
  if m.message_type != Message::METHOD_CALL
    raise "on_return should only get method_calls"
  end
  @method_call_msgs[m.serial] = m
  @method_call_replies[m.serial] = retc
end
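The pairing of on_return with the reply branch of process amounts to a table of callbacks keyed by message serial. Here is a minimal sketch of that idea. `SketchCall`, `SketchReply` and `ReplyRegistry` are hypothetical stand-ins rather than library classes, and the sketch removes the handler before calling it, whereas the source calls it first and deletes afterwards.

```ruby
# Minimal stand-ins for messages; only the fields the sketch needs.
SketchCall  = Struct.new(:serial)
SketchReply = Struct.new(:reply_serial, :body)

class ReplyRegistry
  def initialize
    @handlers = {}
  end

  # Remember the block under the serial of the outgoing call.
  def on_return(call, &handler)
    @handlers[call.serial] = handler
  end

  # Look up the handler by the reply's serial, remove it, and run it.
  # Returns false when no handler was registered for that serial.
  def dispatch(reply)
    handler = @handlers.delete(reply.reply_serial)
    return false unless handler
    handler.call(reply)
    true
  end
end

registry = ReplyRegistry.new
seen = []
registry.on_return(SketchCall.new(7)) { |reply| seen << reply.body }
first  = registry.dispatch(SketchReply.new(7, "pong"))  # handler runs
second = registry.dispatch(SketchReply.new(7, "again")) # nothing registered
puts seen.inspect
```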
#poll_messages ⇒ Object
Update the buffer and retrieve all messages using Connection#messages. Return the messages.
# File 'lib/dbus/bus.rb', line 590
def poll_messages
  ret = nil
  r, d, d = IO.select([@socket], nil, nil, 0)
  if r and r.size > 0
    update_buffer
  end
  messages
end
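The zero timeout passed to IO.select is what makes poll_messages non-blocking: select returns nil at once when nothing is readable. A small sketch of that behaviour, using a pipe in place of the bus socket (the pipe is an assumption made only so the example runs anywhere):

```ruby
reader, writer = IO.pipe

# Nothing written yet: with a timeout of 0, select returns nil immediately.
before = IO.select([reader], nil, nil, 0)
puts before.inspect

writer.write("hello")
# Data is pending now, so the reader shows up in the first returned array.
after, = IO.select([reader], nil, nil, 0)
puts after.include?(reader)

writer.close
reader.close
```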
#pop_message ⇒ Object
Get one message from the bus and remove it from the buffer. Return the message.
# File 'lib/dbus/bus.rb', line 564
def pop_message
  return nil if @buffer.empty?
  ret = nil
  begin
    ret, size = Message.new.unmarshall_buffer(@buffer)
    @buffer.slice!(0, size)
  rescue IncompleteBufferException => e
    # fall through, let ret be null
  end
  ret
end
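The pattern in pop_message (try to decode one message from the front of the buffer, consume its bytes on success, return nil if the buffer is still incomplete) can be illustrated with a toy format. The framing below, a 4-byte big-endian length followed by the payload, is an assumption chosen for brevity and is not the D-Bus wire format. `IncompleteBuffer`, `unmarshall_frame` and `pop_frame` are hypothetical names.

```ruby
# Raised when the buffer does not yet hold one complete frame.
class IncompleteBuffer < StandardError; end

# Toy framing, NOT the D-Bus wire format: 4-byte big-endian length,
# then that many payload bytes. Returns [payload, bytes_consumed].
def unmarshall_frame(buffer)
  raise IncompleteBuffer if buffer.bytesize < 4
  length = buffer.unpack1("N")
  raise IncompleteBuffer if buffer.bytesize < 4 + length
  [buffer.byteslice(4, length), 4 + length]
end

# Same shape as pop_message: nil when no complete frame is available.
def pop_frame(buffer)
  return nil if buffer.empty?
  payload, size = unmarshall_frame(buffer)
  buffer.slice!(0, size) # drop the consumed bytes from the front
  payload
rescue IncompleteBuffer
  nil
end

# One complete frame followed by a frame that is one byte short.
buffer = ([5].pack("N") + "hello" + [3].pack("N") + "ab").b
first  = pop_frame(buffer)
second = pop_frame(buffer)
buffer << "c" # the missing byte arrives
third  = pop_frame(buffer)
puts first, second.inspect, third
```

Leaving the partial frame in the buffer is what lets a later update_buffer call complete it.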
#process(m) ⇒ Object
Process a message m based on its type.
# File 'lib/dbus/bus.rb', line 682
def process(m)
  return if m.nil? #check if somethings wrong
  case m.message_type
  when Message::ERROR, Message::METHOD_RETURN
    raise InvalidPacketException if m.reply_serial == nil
    mcs = @method_call_replies[m.reply_serial]
    if not mcs
      puts "DEBUG: no return code for mcs: #{mcs.inspect} m: #{m.inspect}" if $DEBUG
    else
      if m.message_type == Message::ERROR
        mcs.call(Error.new(m))
      else
        mcs.call(m)
      end
      @method_call_replies.delete(m.reply_serial)
      @method_call_msgs.delete(m.reply_serial)
    end
  when DBus::Message::METHOD_CALL
    if m.path == "/org/freedesktop/DBus"
      puts "DEBUG: Got method call on /org/freedesktop/DBus" if $DEBUG
    end
    node = @service.get_node(m.path)
    if not node
      reply = Message.error(m, "org.freedesktop.DBus.Error.UnknownObject",
                            "Object #{m.path} doesn't exist")
      send(reply.marshall)
    # handle introspectable as an exception:
    elsif m.interface == "org.freedesktop.DBus.Introspectable" and
        m.member == "Introspect"
      reply = Message.new(Message::METHOD_RETURN).reply_to(m)
      reply.sender = @unique_name
      reply.add_param(Type::STRING, node.to_xml)
      send(reply.marshall)
    else
      obj = node.object
      return if obj.nil? # FIXME, sends no reply
      obj.dispatch(m) if obj
    end
  when DBus::Message::SIGNAL
    # the signal can match multiple different rules
    @signal_matchrules.each do |mrs, slot|
      if DBus::MatchRule.new.from_s(mrs).match(m)
        slot.call(m)
      end
    end
  else
    puts "DEBUG: Unknown message type: #{m.message_type}" if $DEBUG
  end
end
#proxy ⇒ Object
Set up a ProxyObject for the bus itself, since the bus is introspectable. Returns the object.
# File 'lib/dbus/bus.rb', line 536
def proxy
  if @proxy == nil
    path = "/org/freedesktop/DBus"
    dest = "org.freedesktop.DBus"
    pof = DBus::ProxyObjectFactory.new(DBUSXMLINTRO, self, dest, path)
    @proxy = pof.build["org.freedesktop.DBus"]
  end
  @proxy
end
#remove_match(mr) ⇒ Object
# File 'lib/dbus/bus.rb', line 671
def remove_match(mr)
  mrs = mr.to_s
  unless @signal_matchrules.delete(mrs).nil?
    # don't remove nonexisting matches.
    # FIXME if we do try, the Error.MatchRuleNotFound is *not* raised
    # and instead is reported as "no return code for nil"
    proxy.RemoveMatch(mrs)
  end
end
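Taken together, add_match and remove_match keep a local table of rule strings and only talk to the bus when that table actually changes. A minimal sketch of this bookkeeping follows; `MatchRegistry` is a hypothetical class, and `bus_requests` records what would have been sent instead of calling a real bus.

```ruby
class MatchRegistry
  attr_reader :bus_requests

  def initialize
    @slots = {}
    @bus_requests = [] # stands in for proxy.AddMatch / proxy.RemoveMatch
  end

  # Ask the bus only for rules we do not track yet; a repeated rule
  # just replaces the stored block.
  def add_match(rule, &slot)
    @bus_requests << [:AddMatch, rule] unless @slots.key?(rule)
    @slots[rule] = slot
  end

  # Ask the bus to drop a rule only if we were tracking it.
  def remove_match(rule)
    @bus_requests << [:RemoveMatch, rule] unless @slots.delete(rule).nil?
  end
end

registry = MatchRegistry.new
rule = "type='signal',interface='org.example.Demo'" # hypothetical rule
registry.add_match(rule) { |msg| puts "first handler: #{msg}" }
registry.add_match(rule) { |msg| puts "second handler: #{msg}" }
registry.remove_match(rule)
registry.remove_match(rule) # already gone, nothing is sent
puts registry.bus_requests.inspect
```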
#request_service(name) ⇒ Object
Attempt to request a service name.
FIXME, NameRequestError cannot really be rescued as it will be raised when dispatching a later call. Rework the API to better match the spec.
# File 'lib/dbus/bus.rb', line 518
def request_service(name)
  # Use RequestName, but asynchronously!
  # A synchronous call would not work with service activation, where
  # method calls to be serviced arrive before the reply for RequestName
  # (Ticket#29).
  proxy.RequestName(name, NAME_FLAG_REPLACE_EXISTING) do |rmsg, r|
    if rmsg.is_a?(Error) # check and report errors first
      raise rmsg
    elsif r != REQUEST_NAME_REPLY_PRIMARY_OWNER
      raise NameRequestError
    end
  end
  @service = Service.new(name, self)
  @service
end
#send(buf) ⇒ Object
Send the buffer buf to the bus by writing it to the socket. Nothing is sent if the socket is nil.
# File 'lib/dbus/bus.rb', line 329
def send(buf)
  @socket.write(buf) unless @socket.nil?
end
#send_sync(m, &retc) ⇒ Object
Send a message m on to the bus. This is done synchronously, thus the call will block until a reply message arrives.
# File 'lib/dbus/bus.rb', line 622
def send_sync(m, &retc) # :yields: reply/return message
  return if m.nil? #check if somethings wrong

  if (@threaded)
    @queue_used_by_thread[Thread.current] = Queue.new # Creating Queue message for return
    @thread_waiting_for_message[m.serial] = Thread.current
  end
  send(m.marshall)
  @method_call_msgs[m.serial] = m
  @method_call_replies[m.serial] = retc

  retm = wait_for_message
  return if retm.nil? #check if somethings wrong

  process(retm)
  if (@threaded)
    @queue_used_by_thread.delete(Thread.current)
  else
    while @method_call_replies.has_key? m.serial
      retm = wait_for_message
      process(retm)
    end
  end
end
#send_sync_or_async(message, &reply_handler) ⇒ Object
Send a message. If reply_handler is not given, wait for the reply and return its parameters, or raise the error. If reply_handler is given, it will be called when the reply eventually arrives, with the reply message as the first parameter and the reply's parameters following.
# File 'lib/dbus/bus.rb', line 447
def send_sync_or_async(message, &reply_handler)
  ret = nil
  if reply_handler.nil?
    send_sync(message) do |rmsg|
      if rmsg.is_a?(Error)
        raise rmsg
      else
        ret = rmsg.params
      end
    end
  else
    on_return(message) do |rmsg|
      if rmsg.is_a?(Error)
        reply_handler.call(rmsg)
      else
        reply_handler.call(rmsg, * rmsg.params)
      end
    end
    send(message.marshall)
  end
  ret
end
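The control flow of send_sync_or_async (return the result when no block is given, otherwise register the block and return nil at once) is shown below on a toy transport. `EchoBus` and all of its methods are hypothetical; only the sync-or-async branching mirrors the source.

```ruby
# Toy transport that "replies" with the upper-cased request text.
class EchoBus
  def initialize
    @pending = []
  end

  def call(text, &reply_handler)
    if reply_handler.nil?
      # Synchronous: produce the reply now and return it.
      respond(text)
    else
      # Asynchronous: remember the handler and return nil right away.
      @pending << [text, reply_handler]
      nil
    end
  end

  # Stand-in for a main loop delivering replies at a later point.
  def deliver_pending
    @pending.shift(@pending.size).each do |text, handler|
      handler.call(respond(text))
    end
  end

  private

  def respond(text)
    text.upcase
  end
end

bus = EchoBus.new
sync_result = bus.call("ping")

async_result = nil
returned = bus.call("ping") { |reply| async_result = reply }
before_delivery = async_result # still nil: the handler has not run yet
bus.deliver_pending
puts sync_result, returned.inspect, async_result
```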
#service(name) ⇒ Object Also known as: []
Retrieves the Service with the given name.
# File 'lib/dbus/bus.rb', line 733
def service(name)
  # The service might not exist at this time so we cannot really check
  # anything
  Service.new(name, self)
end
#start_read_thread ⇒ Object
# File 'lib/dbus/bus.rb', line 228
def start_read_thread
  @read_thread = Thread.new{
    puts "start the reading thread on socket #{@socket}" if $DEBUG
    loop do #loop to read
      if @socket.nil?
        puts "ERROR: Can't wait for messages, @socket is nil."
        return
      end
      ret = pop_message
      while ret == nil
        r, d, d = IO.select([@socket])
        if r and r[0] == @socket
          update_buffer
          ret = pop_message
        end
      end
      case ret.message_type
      when Message::ERROR, Message::METHOD_RETURN
        if ( @thread_waiting_for_message[ret.reply_serial].nil?)
          process(ret) # there is no thread, process the message
        else
          thread_in_wait = @thread_waiting_for_message[ret.reply_serial]
          @queue_used_by_thread[thread_in_wait] << ret # puts the message in the queue
          @thread_waiting_for_message.delete(ret.reply_serial)
        end
      else
        @main_message_queue << ret
      end
    end
  }
end
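The read thread routes each reply to the caller waiting on its serial and puts everything else on the main queue. The sketch below shows that routing with plain Queue objects. It is simplified on purpose: it maps a serial directly to a queue (the source goes from serial to thread to queue), it guards the table with a Mutex that the source does not use, and it forwards unmatched messages to the main queue instead of processing them. `Incoming` and all variable names are hypothetical.

```ruby
Incoming = Struct.new(:reply_serial, :body)

incoming     = Queue.new # stands in for messages read from the socket
main_queue   = Queue.new # plays the role of main_message_queue
waiting      = {}        # serial => Queue of the caller waiting on it
waiting_lock = Mutex.new

reader = Thread.new do
  # nil is used as a shutdown marker in this sketch only.
  while (msg = incoming.pop)
    queue = waiting_lock.synchronize { waiting.delete(msg.reply_serial) }
    # A waiting caller gets its reply; everything else goes to the main queue.
    (queue || main_queue) << msg
  end
end

# A caller registers interest in serial 1 before the reply can arrive.
my_queue = Queue.new
waiting_lock.synchronize { waiting[1] = my_queue }

incoming << Incoming.new(1, "reply for caller")
incoming << Incoming.new(nil, "unsolicited signal")
incoming << nil

reply = my_queue.pop # blocks until the reader thread routes the reply
reader.join
other = main_queue.pop
puts reply.body, other.body
```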
#update_buffer ⇒ Object
Fill (append) the buffer from data that might be available on the socket.
# File 'lib/dbus/bus.rb', line 548
def update_buffer
  @buffer += @socket.read_nonblock(MSG_BUF_SIZE)
rescue EOFError
  if (@threaded)
    @rescuemethod.call
  end
  raise # the caller expects it
rescue Exception => e
  puts "Oops:", e
  raise if @is_tcp # why?
  puts "WARNING: read_nonblock failed, falling back to .recv"
  @buffer += @socket.recv(MSG_BUF_SIZE)
end
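The three outcomes of read_nonblock that update_buffer has to cope with (data available, nothing available, peer closed) can be seen on a pipe. The pipe replaces the bus socket here purely so the sketch is runnable; the buffer size matches MSG_BUF_SIZE.

```ruby
reader, writer = IO.pipe
buffer = +""

writer.write("abc")
buffer << reader.read_nonblock(4096) # appends the pending bytes

would_block = false
begin
  reader.read_nonblock(4096)         # nothing pending right now
rescue IO::WaitReadable
  would_block = true
end

writer.close
eof = false
begin
  reader.read_nonblock(4096)         # writer closed: end of stream
rescue EOFError
  eof = true
end
reader.close

puts buffer, would_block, eof
```

Because poll_messages and wait_for_message call IO.select first, the "nothing pending" case should be rare in practice; it would end up in the generic rescue branch of the source.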
#wait_for_message ⇒ Object
Wait for a message to arrive. Return it once it is available.
# File 'lib/dbus/bus.rb', line 600
def wait_for_message
  if(@threaded)
    return @queue_used_by_thread[Thread.current].pop
  else
    if @socket.nil?
      puts "ERROR: Can't wait for messages, @socket is nil."
      return
    end
    ret = pop_message
    while ret == nil
      r, d, d = IO.select([@socket])
      if r and r[0] == @socket
        update_buffer
        ret = pop_message
      end
    end
    ret
  end
end