pants

<img src=“https://travis-ci.org/turboladen/pants.png?branch=master” alt=“Build Status” /> <img src=“https://codeclimate.com/badge.png” />

DESCRIPTION:

Pants redirects IO using EventMachine from one input source to many different destinations. In some senses, pants is like a *nix pipe that (works on Windows and) allows for duplicating data across many pipes (like splice and tee).

FEATURES/PROBLEMS:

Read raw data from a

  • file

  • UDP socket (unicast or multicast)

…and write to any number and combination of

  • files

  • UDP sockets (unicast or multicast)

  • pants seams

Also:

  • All readers can write to any other writer

  • Pluggable: write your own reader or writer

SYNOPSIS:

Concepts

Readers & Writers

The core concept of pants is that without using threads and system calls, it’s difficult to duplicate data. Pants simplifies duplicating data from a single source by using “readers” to read data from a source, then packetize it in some form that “writers” can consume and use for their individual needs. These readers and writers are similar to read and write ends of a *nix pipe, but with pants there can be more than one write end of the pipe.

For example, you can use a UDP reader to read data on an IP and port, then simultaneously write that data to a file and forward it on to another IP and port using writers.

Seams

Next, pants also uses “seams” to act as a middle-man for readers and writers. A seam is just like another reader, but it can read from other readers. Seams are primarily useful for doing something to the data from a reader before passing it on to writers. Think of it like a pipe with one read end and the ability to have many write ends.

For example, the rtp gem is responsible for (amongst other things) pulling an A/V stream out of an A/V file, making sure those chunks of data (A/V frames) are sized well for sending out via UDP, then adding some headers to each on of those chunks (packets), then actually sending each of those packets to some number of UDP endpoints. To do this with pants, it would:

  • Use a reader (well, a demuxer to be clear) to read the A/V stream in the file

  • Use a seam to accept the A/V stream data chunks, make sure the chunks are sized right, then add an RTP header to each chunk

  • Use a writer (or many writers) to read from the seam and send the data out to a UDP IP/port (or many UDP IPs/ports)

Pros and Cons

Pros

If you plan to do the above for just one UDP client, the benefits of pants might not quite shine through (although it will do this quite quickly, and only in a maybe three lines of code); if you intend to be sending the RTP-encoded data to a number of UDP clients though, pants shines in that you will have only had to spend system resources for reading the file once and encoding the data (which can be an expensive operation) once before sending it out over the network to all of your clients. Quite often, network servers will use multicast to achieve this, but pants can do this with both unicast and multicast–or both at the same time, while writing to a file (<– maybe for debug purposes, for example?).

Also, since pants uses eventmachine, you don’t have to worry about dealing with threads. If you’re writing your own reader or writer, you do, however, need to consider if you should defer (eventmachine’s means of putting your code into a thread) the code you want to do the reading/writing. This is just a matter of wrapping that code in block though, while eventmachine handles the threading for you.

Cons

The amount of data you can replicate really depends, of course, on system resources. On an i7 MacBook Pro with 16GB RAM and 2 wired NICs, I’ve been able to duplicate a single 720p video + audio stream over unicast UDP 200 times (pushing ~1.4 Gbps out) with almost no quality loss on client ends. If you plan to duplicate more than 20 streams, you’ll need to start tweaking EventMachine thread pool settings; generally you should set EventMachines threadpool_size to the number of output streams so it can process all of the data concurrently. If you’re on OSX or *nix, you might benefit from using EventMachine’s .epoll and .kqueue feature. More on that here.

Just like in any case when dealing with I/O, file reading/writing also depends on many factors, such as the number and size of files, disk capabilities, and general I/O capabilities of your system. If you’d like to benchmark, there’s a Thor task in tasks/ that will compare pants to FileUtils.cp and ‘cp` in copying a file some number of times (default to 100).

Example:

$ tasks/pantsmark.thor file_copy some_song.mp3 --times=50

If you’re wanting to write your own readers/writers/seams and you’re not familiar with eventmachine, getting plugged in to some of its concepts can bek frustrating, as it’s a different paradigm to work off than the standard paradigm for doing I/O. Some may say that this is a con, and some may not.

Examples

As a library

Read unicast UDP inbound data and write to a number of other unicast UDP clients:

Pants.read('udp://0.0.0.0:1234') do |reader|
  reader.write_to 'udp://10.0.0.10:1234'
  reader.write_to 'udp://10.0.0.11:1234'
end

Read multicast UDP inbound data and write to a number of other unicast UDP clients:

Pants.read('udp://239.0.0.1:1234') do |reader|
  reader.write_to 'udp://10.0.0.10:1234'
  reader.write_to 'udp://10.0.0.11:1234'
end

Read unicast UDP inbound data and write to a UDP client and a file:

Pants.read('udp://0.0.0.0:1234') do |reader|
  reader.write_to 'udp://10.0.0.10:1234'
  reader.write_to 'socket_data.raw'
end

Read a file and send out via UDP:

Pants.read('socket_data.raw') do |reader|
  reader.write_to 'udp://10.0.0.10:1234'
  reader.write_to 'udp://239.0.0.1:1234'
end

Get kray-kray:

EM.threadpool_size = 110
EM.kqueue         # This has been problematic for me...
EM.epoll

Pants.read('udp://0.0.0.0:1234') do |reader|
  100.times do |i|
    reader.write_to "udp://10.0.0.10:#{1235 + i}"
  end

  10.times do |i|
    reader.write_to "socket_data_#{i}.raw"
  end
end

The block form above is really just some syntactic sugar for doing:

core = Pants::Core.new
reader = core.read 'udp://0.0.0.0:1234'
reader.write_to 'udp://10.0.0.10:1234'
reader.write_to 'udp://10.0.0.11:1234'
core.run

…and that’s actually short for for:

core = Pants::Core.new
reader = core.add_reader(Pants::Readers::UDPReader, '0.0.0.0', 1234)
reader.add_writer(Pants::Writers::UDPWriter, '10.0.0.10', 1234)
reader.add_writer(Pants::Writers::UDPWriter, '10.0.0.11', 1234)
core.run

…which can be made even longer (but potentially more helpful):

core = Pants::Core.new
reader = Pants::Readers::UDPReader.new('0.0.0.0', 1234, core.callback)
core.add_reader(reader)

writer1 = Pants::Writers::UDPWriter.new('10.0.0.10', 1234, reader.write_to_channel)
reader.add_writer(writer1)

writer2 = Pants::Writers::UDPWriter.new('10.0.0.11', 1234, reader.write_to_channel)
reader.add_writer(writer2)

core.run

Using this unsugared form, does give you some flexibility though to use within your code. If, for example, you’re writing a server of some sort that needs to handle a varying number of clients, you can add and remove writers at will:

require 'sinatra'

core = Pants::Core.new
reader = core.read 'udp://0.0.0.0:1234'

post '/:client' do
  reader.write_to "udp://#{params[:client]}"
end

delete '/:client' do
  reader.remove_writer("udp://#{params[:client]}")
end

core.run

As an executable

The examples from above can also be run via the command-line app like this:

$ pants udp://0.0.0.0:1234 --dest=udp://10.0.0.10:1234 udp://10.0.0.11:1234 udp://10.0.0.11:2345
$ pants udp://239.0.0.1:1234 --dest=udp://10.0.0.10:1234 udp://10.0.0.11:1234 udp://10.0.0.11:2345
$ pants udp://0.0.0.0:1234 --dest=udp://10.0.0.10:1234 file:///home/me/socket_data.raw
$ pants /home/me/socket_data.raw --dest=udp://10.0.0.10:1234 udp://239.0.0.1:1234

Use a seam to inspect and alter packets

require 'pants'

class PantsInspector < Pants::Seam
  def initialize(core_callback, reader_channel, host)
    @host = host

    super(core_callback, reader_channel)
  end

  # Pants will call this for you when it starts the reader that the seam is
  # reading from.
  def start

    # You need to define a callback here so pants can call you back after its
    # made sure that the seam's writers have been started.  This makes sure
    # the seam doesn't start reading and pushing out data before the writers
    # are ready to receive it.
    callback = EM.Callback do
      puts "Starting #{self.class}..."

      read_items do |data|
        if data.match(/pants/mi)
          data << "Pants party at #{@host}!!!!!!"
        end

        write data
      end
    end

    super(callback)
  end
end

# Assuming you have data coming in on this IP and UDP port, this reads each
# packet and hands it over to its writers--in this case, the file dump file
# writer, the UDP writer that sends to 10.0.0.50:9001, and our PacketInspector
# seam.
Pants.read('udp://127.0.0.1:1234') do |reader|

  # Dump UDP packets to a file
  reader.write_to 'file_dump_of_udp_packets.udp'

  # Redirect the UDP packets to another socket
  reader.write_to 'udp://127.0.0.1:9000'

  # Inspect the UDP packets and notify about the party going on
  pants_inspector = reader.add_seam(PantsInspector, 'localhost')

  # The packet inspector seam behaves like a reader, which lets writers read
  # from it.  Dump those party packets to a file...
  pants_inspector.write_to 'file_dump_of_party_packets.party'

  # Forward our partying packets on to another host.
  pants_inspector.write_to 'udp://10.0.0.1:1234'
end

If you actually run this, you’ll have to ctrl-c out of it, as pants will read on that UDP socket indefinitely. After doing that, you’ll see that you’ve now got a ‘file_dump_of_udp_packets.udp’, which contains the raw UDP data that came in on port 1234, and a ‘file_dump_of_party_packets.party’ file, which contains those UDP packets with our party message at the end of each packet. If you had a client listening at 127.0.0.1:9000 and 10.0.0.1:1234, it would received the exact same data that was written to the respective files.

REQUIREMENTS:

  • Rubies (tested)

    • MRI 1.9.3

    • JRuby 1.7.2 (failing due to what looks like EventMachine bugs)

    • Rubinius (failing due to what looks like EventMachine bugs)

  • Gems

    • eventmachine (>=1.0.0)

    • log_switch

    • thor

NOTE: Multicasting with JRuby doesn’t seem to work; EM fails to allow setting socket options, which is necessary to do multicasting.

INSTALL:

  • (sudo) gem install

DEVELOPERS:

After checking out the source, run:

$ bundle install

This task will install any missing dependencies, run the tests/specs, and generate the RDoc.

LICENSE:

(The MIT License)

Copyright © 2013 Steve Loveless

Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the ‘Software’), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED ‘AS IS’, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.