Module: PWN::SDR::Decoder::RDS

Defined in:
lib/pwn/sdr/decoder/rds.rb

Overview

RDS Decoder Module for FM Radio Signals

Class Method Summary collapse

Class Method Details

.authorsObject

Author(s)

0day Inc. <[email protected]>



123
124
125
126
127
# File 'lib/pwn/sdr/decoder/rds.rb', line 123

public_class_method def self.authors
  "AUTHOR(S):
    0day Inc. <[email protected]>
  "
end

.decode(opts = {}) ⇒ Object

Supported Method Parameters

rds_resp = PWN::SDR::Decoder::RDS.decode(

freq_obj: 'required - GQRX socket object returned from #connect method'

)



16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
# File 'lib/pwn/sdr/decoder/rds.rb', line 16

public_class_method def self.decode(opts = {})
  freq_obj = opts[:freq_obj]
  gqrx_sock = freq_obj[:gqrx_sock]

  # freq_obj = freq_obj.dup
  freq_obj.delete(:gqrx_sock)
  skip_freq_char = "\n"
  puts JSON.pretty_generate(freq_obj)
  puts "\n*** FM Radio RDS Decoder ***"
  puts 'Press [ENTER] to continue to next frequency...'

  # Toggle RDS off and on to reset the decoder
  PWN::SDR::GQRX.cmd(
    gqrx_sock: gqrx_sock,
    cmd: 'U RDS 0',
    resp_ok: 'RPRT 0'
  )

  PWN::SDR::GQRX.cmd(
    gqrx_sock: gqrx_sock,
    cmd: 'U RDS 1',
    resp_ok: 'RPRT 0'
  )

  # Spinner setup with dynamic terminal width awareness
  spinner = TTY::Spinner.new(
    '[:spinner] :status',
    format: :arrow_pulse,
    clear: true,
    hide_cursor: true
  )

  # Conservative overhead for spinner animation, colors, and spacing
  spinner_overhead = 12
  max_title_length = [TTY::Screen.width - spinner_overhead, 50].max

  initial_title = 'INFO: Decoding FM radio RDS data...'
  initial_title = initial_title[0...max_title_length] if initial_title.length > max_title_length
  spinner.update(title: initial_title)
  spinner.auto_spin

  last_resp = {}

  loop do
    rds_resp = {
      rds_pi: PWN::SDR::GQRX.cmd(gqrx_sock: gqrx_sock, cmd: 'p RDS_PI').to_s.strip.chomp.delete('.'),
      rds_ps_name: PWN::SDR::GQRX.cmd(gqrx_sock: gqrx_sock, cmd: 'p RDS_PS_NAME').to_s.strip.chomp,
      rds_radiotext: PWN::SDR::GQRX.cmd(gqrx_sock: gqrx_sock, cmd: 'p RDS_RADIOTEXT').to_s.strip.chomp
    }

    # Only update when we have valid new data
    if rds_resp[:rds_pi] != '0000' && rds_resp != last_resp
      # --- Enforce RDS specification bounds and clean formatting ---
      # PI: 16-bit code >>> exactly 4 uppercase hex digits, zero-padded
      rds_pi = rds_resp[:rds_pi].upcase
      rds_pi = rds_pi.rjust(4, '0')[0, 4]

      # PS: exactly 8 ASCII characters (pad short with spaces, truncate long)
      rds_ps = "#{rds_resp[:rds_ps_name]}        "[0, 8]

      # RadioText: strip trailing spaces (stations often pad to clear)
      rds_rt = rds_resp[:rds_radiotext].rstrip

      # Fixed prefix: always exactly 28 characters for predictable layout
      # Breakdown: "PI: " (4) + 4 hex (4) + " | PS: " (7) + 8 chars (8) + " | RT: " (7) = 28
      prefix = "Program ID: #{rds_pi} | Station Name: #{rds_ps} | Radio Txt: "

      # minimum visibility
      available_for_term = max_title_length - prefix.length
      available_for_term = [available_for_term, 10].max

      rt_display = rds_rt
      rt_display = "#{rt_display[0...available_for_term]}..." if rt_display.length > available_for_term

      msg = "#{prefix}#{rt_display}"
      spinner.update(status: msg)
      last_resp = rds_resp.dup
    end

    # Non-blocking check for ENTER key to exit
    if $stdin.wait_readable(0)
      begin
        char = $stdin.read_nonblock(1)
        break if char == skip_freq_char
      rescue IO::WaitReadable, EOFError
        # No-op
      end
    end

    sleep 0.01
  end
rescue StandardError => e
  spinner.error('Decoding failed') if defined?(spinner)
  raise e
ensure
  # Toggle RDS off and on to reset the decoder
  PWN::SDR::GQRX.cmd(
    gqrx_sock: gqrx_sock,
    cmd: 'U RDS 0',
    resp_ok: 'RPRT 0'
  )

  spinner.stop if defined?(spinner) && spinner
end

.helpObject

Display Usage for this Module



131
132
133
134
135
136
137
138
139
# File 'lib/pwn/sdr/decoder/rds.rb', line 131

public_class_method def self.help
  puts "USAGE:
    #{self}.decode(
      freq_obj: 'required - freq_obj returned from PWN::SDR::GQRX.init_freq method'
    )

    #{self}.authors
  "
end