Skip to content

Commit cad6dab

Browse files
committed
added waterfall plot examples
1 parent 9a5b738 commit cad6dab

File tree

2 files changed

+344
-0
lines changed

2 files changed

+344
-0
lines changed

examples/plotting_waterfall_1.py

Lines changed: 172 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,172 @@
1+
#! /usr/bin/python3
2+
##-------------------------------------------------------------------------------\
3+
# tinySA_python
4+
# './examples/plotting_waterfall_1.py'
5+
# A waterfall plot example using matplotlib to plot multiple SCAN data over time
6+
#
7+
# Last update: June 22, 2025
8+
##-------------------------------------------------------------------------------\
9+
# import tinySA library
10+
# (NOTE: check library path relative to script path)
11+
from src.tinySA_python import tinySA
12+
13+
# imports FOR THE EXAMPLE
14+
import csv
15+
import numpy as np
16+
import matplotlib.pyplot as plt
17+
import time
18+
from datetime import datetime
19+
20+
def convert_data_to_arrays(start, stop, pts, data):
21+
# using the start and stop frequencies, and the number of points,
22+
freq_arr = np.linspace(start, stop, pts) # note that the decimals might go out to many places.
23+
# you can truncate this because its only used
24+
# for plotting in this example
25+
# As of the Jan. 2024 build in some data returned with SWEEP or SCAN calls there is error data.
26+
# https://groups.io/g/tinysa/topic/tinasa_ultra_sweep_command/104194367
27+
# this shows up as "-:.000000e+01".
28+
# TEMP fix - replace the colon character with a -10. This puts the 'filled in' points around the noise floor.
29+
# more advanced filtering should be applied for actual analysis.
30+
31+
data1 = bytearray(data.replace(b"-:.0", b"-10.0"))
32+
33+
# get both values in each row returned (for reference)
34+
#data_arr = [list(map(float, line.split())) for line in data.decode('utf-8').split('\n') if line.strip()]
35+
36+
# get first value in each returned row
37+
data_arr = [float(line.split()[0]) for line in data1.decode('utf-8').split('\n') if line.strip()]
38+
return freq_arr, data_arr
39+
40+
def collect_waterfall_data(tsa, start, stop, pts, outmask, num_scans, scan_interval):
41+
42+
waterfall_data = [] # 2D array of scan data (time x frequency)
43+
timestamps = []
44+
freq_arr = None
45+
46+
print(f"Collecting {num_scans} scans with {scan_interval}s intervals...")
47+
48+
for i in range(num_scans):
49+
print(f"Scan {i+1}/{num_scans}")
50+
51+
# Perform scan
52+
data_bytes = tsa.scan(start, stop, pts, outmask)
53+
54+
# Convert to arrays
55+
if freq_arr is None:
56+
freq_arr, data_arr = convert_data_to_arrays(start, stop, pts, data_bytes)
57+
else:
58+
_, data_arr = convert_data_to_arrays(start, stop, pts, data_bytes)
59+
60+
# Store data and timestamp
61+
waterfall_data.append(data_arr)
62+
timestamps.append(datetime.now())
63+
64+
# Wait before next scan (except for last scan)
65+
if i < num_scans - 1:
66+
time.sleep(scan_interval)
67+
68+
return freq_arr, np.array(waterfall_data), timestamps
69+
70+
def plot_waterfall(freq_arr, waterfall_data, timestamps, start, stop):
71+
# Create figure with subplots
72+
fig, (ax1, ax2) = plt.subplots(2, 1, figsize=(12, 10))
73+
74+
# Waterfall plot (main plot)
75+
# Create time array for y-axis (scan number or elapsed time)
76+
time_arr = np.arange(len(timestamps))
77+
78+
# Create meshgrid for pcolormesh
79+
freq_mesh, time_mesh = np.meshgrid(freq_arr, time_arr)
80+
81+
# Plot waterfall
82+
im = ax1.pcolormesh(freq_mesh/1e9, time_mesh, waterfall_data,
83+
shading='nearest', cmap='viridis')
84+
85+
ax1.set_xlabel('Frequency (GHz)')
86+
ax1.set_ylabel('Scan Number')
87+
ax1.set_title(f'Waterfall Plot: {start/1e9:.1f} - {stop/1e9:.1f} GHz')
88+
89+
# Add colorbar
90+
cbar = plt.colorbar(im, ax=ax1)
91+
cbar.set_label('Signal Strength (dBm)')
92+
93+
# Latest scan plot (bottom subplot)
94+
ax2.plot(freq_arr/1e9, waterfall_data[-1])
95+
ax2.set_xlabel('Frequency (GHz)')
96+
ax2.set_ylabel('Signal Strength (dBm)')
97+
ax2.set_title('Latest Scan')
98+
ax2.grid(True, alpha=0.3)
99+
100+
plt.tight_layout()
101+
return fig
102+
103+
# create a new tinySA object
104+
tsa = tinySA()
105+
# set the return message preferences
106+
tsa.set_verbose(True) #detailed messages
107+
tsa.set_error_byte_return(True) #get explicit b'ERROR' if error thrown
108+
109+
# attempt to autoconnect
110+
found_bool, connected_bool = tsa.autoconnect()
111+
112+
# if port closed, then return error message
113+
if connected_bool == False:
114+
print("ERROR: could not connect to port")
115+
else: # if port found and connected, then complete task(s) and disconnect
116+
try:
117+
# set scan values
118+
start = int(1e9) # 1 GHz
119+
stop = int(3e9) # 3 GHz
120+
pts = 450 # sample points
121+
outmask = 2 # get measured data (y axis)
122+
123+
# waterfall parameters
124+
num_scans = 50 # number of scans to collect
125+
scan_interval = 0.5 # seconds between scans
126+
127+
# collect waterfall data
128+
freq_arr, waterfall_data, timestamps = collect_waterfall_data(
129+
tsa, start, stop, pts, outmask, num_scans, scan_interval)
130+
131+
print("Data collection complete!")
132+
133+
# resume and disconnect
134+
tsa.resume() #resume so screen isn't still frozen
135+
tsa.disconnect()
136+
137+
# processing after disconnect
138+
print("Creating waterfall plot...")
139+
140+
# create waterfall plot
141+
fig = plot_waterfall(freq_arr, waterfall_data, timestamps, start, stop)
142+
143+
# Save data out to .csv
144+
filename = "waterfall_1_sample.csv"
145+
146+
# Create CSV with frequency headers and time/scan data
147+
with open(filename, 'w', newline='') as csvfile:
148+
writer = csv.writer(csvfile)
149+
150+
# Write header row with frequencies (in Hz)
151+
header = ['Scan_Number', 'Timestamp'] + [f'{freq:.0f}' for freq in freq_arr]
152+
writer.writerow(header)
153+
154+
# Write data rows
155+
for i, (scan_data, timestamp) in enumerate(zip(waterfall_data, timestamps)):
156+
row = [i+1, timestamp.strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]] + scan_data.tolist()
157+
writer.writerow(row)
158+
159+
print(f"Data saved to {filename}")
160+
print(f"CSV contains {len(waterfall_data)} scans with {len(freq_arr)} frequency points each")
161+
162+
# show plot
163+
plt.show()
164+
165+
except KeyboardInterrupt:
166+
print("\nScan interrupted by user")
167+
tsa.resume()
168+
tsa.disconnect()
169+
except Exception as e:
170+
print(f"Error occurred: {e}")
171+
tsa.resume()
172+
tsa.disconnect()

examples/plotting_waterfall_2.py

Lines changed: 172 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,172 @@
1+
#! /usr/bin/python3
2+
##-------------------------------------------------------------------------------\
3+
# tinySA_python
4+
# './examples/plotting_waterfall_2.py'
5+
# A waterfall plot example using matplotlib to plot multiple SCAN data over time
6+
#
7+
# Last update: June 22, 2025
8+
##-------------------------------------------------------------------------------\
9+
# import tinySA library
10+
# (NOTE: check library path relative to script path)
11+
from src.tinySA_python import tinySA
12+
13+
# imports FOR THE EXAMPLE
14+
import csv
15+
import numpy as np
16+
import matplotlib.pyplot as plt
17+
import time
18+
from datetime import datetime
19+
20+
def convert_data_to_arrays(start, stop, pts, data):
21+
# using the start and stop frequencies, and the number of points,
22+
freq_arr = np.linspace(start, stop, pts) # note that the decimals might go out to many places.
23+
# you can truncate this because its only used
24+
# for plotting in this example
25+
# As of the Jan. 2024 build in some data returned with SWEEP or SCAN calls there is error data.
26+
# https://groups.io/g/tinysa/topic/tinasa_ultra_sweep_command/104194367
27+
# this shows up as "-:.000000e+01".
28+
# TEMP fix - replace the colon character with a -10. This puts the 'filled in' points around the noise floor.
29+
# more advanced filtering should be applied for actual analysis.
30+
31+
data1 = bytearray(data.replace(b"-:.0", b"-10.0"))
32+
33+
# get both values in each row returned (for reference)
34+
#data_arr = [list(map(float, line.split())) for line in data.decode('utf-8').split('\n') if line.strip()]
35+
36+
# get first value in each returned row
37+
data_arr = [float(line.split()[0]) for line in data1.decode('utf-8').split('\n') if line.strip()]
38+
return freq_arr, data_arr
39+
40+
def collect_waterfall_data(tsa, start, stop, pts, outmask, num_scans, scan_interval):
41+
42+
waterfall_data = []
43+
timestamps = []
44+
freq_arr = None
45+
46+
print(f"Collecting {num_scans} scans with {scan_interval}s intervals...")
47+
48+
for i in range(num_scans):
49+
print(f"Scan {i+1}/{num_scans}")
50+
51+
# Perform scan
52+
data_bytes = tsa.scan(start, stop, pts, outmask)
53+
54+
# Convert to arrays
55+
if freq_arr is None:
56+
freq_arr, data_arr = convert_data_to_arrays(start, stop, pts, data_bytes)
57+
else:
58+
_, data_arr = convert_data_to_arrays(start, stop, pts, data_bytes)
59+
60+
# Store data and timestamp
61+
waterfall_data.append(data_arr)
62+
timestamps.append(datetime.now())
63+
64+
# Wait before next scan (except for last scan)
65+
if i < num_scans - 1:
66+
time.sleep(scan_interval)
67+
68+
return freq_arr, np.array(waterfall_data), timestamps
69+
70+
def plot_waterfall(freq_arr, waterfall_data, timestamps, start, stop):
71+
72+
# Create figure with subplots
73+
fig, (ax1, ax2) = plt.subplots(2, 1, figsize=(12, 10))
74+
75+
# Waterfall plot (main plot) - stacked line style
76+
vertical_offset = 10 # dB offset between traces
77+
78+
# Plot each scan as a line with vertical offset
79+
for i, scan_data in enumerate(waterfall_data):
80+
# Add vertical offset to separate traces
81+
offset_data = scan_data + (i * vertical_offset)
82+
ax1.plot(freq_arr/1e9, offset_data, 'b-', linewidth=0.5, alpha=0.8)
83+
84+
ax1.set_xlabel('Frequency (GHz)')
85+
ax1.set_ylabel('Signal Strength + Offset (dBm)')
86+
ax1.set_title(f'Waterfall Plot: {start/1e9:.1f} - {stop/1e9:.1f} GHz')
87+
ax1.grid(True, alpha=0.3)
88+
89+
# Invert y-axis so newest scans are at top (like traditional waterfall)
90+
ax1.invert_yaxis()
91+
92+
# Latest scan plot (bottom subplot)
93+
ax2.plot(freq_arr/1e9, waterfall_data[-1], 'b-', linewidth=1)
94+
ax2.set_xlabel('Frequency (GHz)')
95+
ax2.set_ylabel('Signal Strength (dBm)')
96+
ax2.set_title('Latest Scan')
97+
ax2.grid(True, alpha=0.3)
98+
99+
plt.tight_layout()
100+
return fig
101+
102+
# create a new tinySA object
103+
tsa = tinySA()
104+
# set the return message preferences
105+
tsa.set_verbose(True) #detailed messages
106+
tsa.set_error_byte_return(True) #get explicit b'ERROR' if error thrown
107+
108+
# attempt to autoconnect
109+
found_bool, connected_bool = tsa.autoconnect()
110+
111+
# if port closed, then return error message
112+
if connected_bool == False:
113+
print("ERROR: could not connect to port")
114+
else: # if port found and connected, then complete task(s) and disconnect
115+
try:
116+
# set scan values
117+
start = int(1e9) # 1 GHz
118+
stop = int(3e9) # 3 GHz
119+
pts = 450 # sample points
120+
outmask = 2 # get measured data (y axis)
121+
122+
# waterfall parameters
123+
num_scans = 50 # number of scans to collect
124+
scan_interval = 0.5 # seconds between scans
125+
126+
# collect waterfall data
127+
freq_arr, waterfall_data, timestamps = collect_waterfall_data(
128+
tsa, start, stop, pts, outmask, num_scans, scan_interval)
129+
130+
print("Data collection complete!")
131+
132+
# resume and disconnect
133+
tsa.resume() #resume so screen isn't still frozen
134+
tsa.disconnect()
135+
136+
# processing after disconnect
137+
print("Creating waterfall plot...")
138+
139+
# create waterfall plot
140+
fig = plot_waterfall(freq_arr, waterfall_data, timestamps, start, stop)
141+
142+
143+
# Save the data to CSV
144+
filename = "waterfall_2_sample.csv"
145+
146+
# Create CSV with frequency headers and time/scan data
147+
with open(filename, 'w', newline='') as csvfile:
148+
writer = csv.writer(csvfile)
149+
150+
# Write header row with frequencies (in Hz)
151+
header = ['Scan_Number', 'Timestamp'] + [f'{freq:.0f}' for freq in freq_arr]
152+
writer.writerow(header)
153+
154+
# Write data rows
155+
for i, (scan_data, timestamp) in enumerate(zip(waterfall_data, timestamps)):
156+
row = [i+1, timestamp.strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]] + scan_data.tolist()
157+
writer.writerow(row)
158+
159+
print(f"Data saved to {filename}")
160+
print(f"CSV contains {len(waterfall_data)} scans with {len(freq_arr)} frequency points each")
161+
162+
# show plot
163+
plt.show()
164+
165+
except KeyboardInterrupt:
166+
print("\nScan interrupted by user")
167+
tsa.resume()
168+
tsa.disconnect()
169+
except Exception as e:
170+
print(f"Error occurred: {e}")
171+
tsa.resume()
172+
tsa.disconnect()

0 commit comments

Comments
 (0)