1
+ #! /usr/bin/python3
2
+ ##-------------------------------------------------------------------------------\
3
+ # tinySA_python
4
+ # './examples/plotting_waterfall_1.py'
5
+ # A waterfall plot example using matplotlib to plot multiple SCAN data over time
6
+ #
7
+ # Last update: June 22, 2025
8
+ ##-------------------------------------------------------------------------------\
9
+ # import tinySA library
10
+ # (NOTE: check library path relative to script path)
11
+ from src .tinySA_python import tinySA
12
+
13
+ # imports FOR THE EXAMPLE
14
+ import csv
15
+ import numpy as np
16
+ import matplotlib .pyplot as plt
17
+ import time
18
+ from datetime import datetime
19
+
20
+ def convert_data_to_arrays (start , stop , pts , data ):
21
+ # using the start and stop frequencies, and the number of points,
22
+ freq_arr = np .linspace (start , stop , pts ) # note that the decimals might go out to many places.
23
+ # you can truncate this because its only used
24
+ # for plotting in this example
25
+ # As of the Jan. 2024 build in some data returned with SWEEP or SCAN calls there is error data.
26
+ # https://groups.io/g/tinysa/topic/tinasa_ultra_sweep_command/104194367
27
+ # this shows up as "-:.000000e+01".
28
+ # TEMP fix - replace the colon character with a -10. This puts the 'filled in' points around the noise floor.
29
+ # more advanced filtering should be applied for actual analysis.
30
+
31
+ data1 = bytearray (data .replace (b"-:.0" , b"-10.0" ))
32
+
33
+ # get both values in each row returned (for reference)
34
+ #data_arr = [list(map(float, line.split())) for line in data.decode('utf-8').split('\n') if line.strip()]
35
+
36
+ # get first value in each returned row
37
+ data_arr = [float (line .split ()[0 ]) for line in data1 .decode ('utf-8' ).split ('\n ' ) if line .strip ()]
38
+ return freq_arr , data_arr
39
+
40
+ def collect_waterfall_data (tsa , start , stop , pts , outmask , num_scans , scan_interval ):
41
+
42
+ waterfall_data = [] # 2D array of scan data (time x frequency)
43
+ timestamps = []
44
+ freq_arr = None
45
+
46
+ print (f"Collecting { num_scans } scans with { scan_interval } s intervals..." )
47
+
48
+ for i in range (num_scans ):
49
+ print (f"Scan { i + 1 } /{ num_scans } " )
50
+
51
+ # Perform scan
52
+ data_bytes = tsa .scan (start , stop , pts , outmask )
53
+
54
+ # Convert to arrays
55
+ if freq_arr is None :
56
+ freq_arr , data_arr = convert_data_to_arrays (start , stop , pts , data_bytes )
57
+ else :
58
+ _ , data_arr = convert_data_to_arrays (start , stop , pts , data_bytes )
59
+
60
+ # Store data and timestamp
61
+ waterfall_data .append (data_arr )
62
+ timestamps .append (datetime .now ())
63
+
64
+ # Wait before next scan (except for last scan)
65
+ if i < num_scans - 1 :
66
+ time .sleep (scan_interval )
67
+
68
+ return freq_arr , np .array (waterfall_data ), timestamps
69
+
70
+ def plot_waterfall (freq_arr , waterfall_data , timestamps , start , stop ):
71
+ # Create figure with subplots
72
+ fig , (ax1 , ax2 ) = plt .subplots (2 , 1 , figsize = (12 , 10 ))
73
+
74
+ # Waterfall plot (main plot)
75
+ # Create time array for y-axis (scan number or elapsed time)
76
+ time_arr = np .arange (len (timestamps ))
77
+
78
+ # Create meshgrid for pcolormesh
79
+ freq_mesh , time_mesh = np .meshgrid (freq_arr , time_arr )
80
+
81
+ # Plot waterfall
82
+ im = ax1 .pcolormesh (freq_mesh / 1e9 , time_mesh , waterfall_data ,
83
+ shading = 'nearest' , cmap = 'viridis' )
84
+
85
+ ax1 .set_xlabel ('Frequency (GHz)' )
86
+ ax1 .set_ylabel ('Scan Number' )
87
+ ax1 .set_title (f'Waterfall Plot: { start / 1e9 :.1f} - { stop / 1e9 :.1f} GHz' )
88
+
89
+ # Add colorbar
90
+ cbar = plt .colorbar (im , ax = ax1 )
91
+ cbar .set_label ('Signal Strength (dBm)' )
92
+
93
+ # Latest scan plot (bottom subplot)
94
+ ax2 .plot (freq_arr / 1e9 , waterfall_data [- 1 ])
95
+ ax2 .set_xlabel ('Frequency (GHz)' )
96
+ ax2 .set_ylabel ('Signal Strength (dBm)' )
97
+ ax2 .set_title ('Latest Scan' )
98
+ ax2 .grid (True , alpha = 0.3 )
99
+
100
+ plt .tight_layout ()
101
+ return fig
102
+
103
+ # create a new tinySA object
104
+ tsa = tinySA ()
105
+ # set the return message preferences
106
+ tsa .set_verbose (True ) #detailed messages
107
+ tsa .set_error_byte_return (True ) #get explicit b'ERROR' if error thrown
108
+
109
+ # attempt to autoconnect
110
+ found_bool , connected_bool = tsa .autoconnect ()
111
+
112
+ # if port closed, then return error message
113
+ if connected_bool == False :
114
+ print ("ERROR: could not connect to port" )
115
+ else : # if port found and connected, then complete task(s) and disconnect
116
+ try :
117
+ # set scan values
118
+ start = int (1e9 ) # 1 GHz
119
+ stop = int (3e9 ) # 3 GHz
120
+ pts = 450 # sample points
121
+ outmask = 2 # get measured data (y axis)
122
+
123
+ # waterfall parameters
124
+ num_scans = 50 # number of scans to collect
125
+ scan_interval = 0.5 # seconds between scans
126
+
127
+ # collect waterfall data
128
+ freq_arr , waterfall_data , timestamps = collect_waterfall_data (
129
+ tsa , start , stop , pts , outmask , num_scans , scan_interval )
130
+
131
+ print ("Data collection complete!" )
132
+
133
+ # resume and disconnect
134
+ tsa .resume () #resume so screen isn't still frozen
135
+ tsa .disconnect ()
136
+
137
+ # processing after disconnect
138
+ print ("Creating waterfall plot..." )
139
+
140
+ # create waterfall plot
141
+ fig = plot_waterfall (freq_arr , waterfall_data , timestamps , start , stop )
142
+
143
+ # Save data out to .csv
144
+ filename = "waterfall_1_sample.csv"
145
+
146
+ # Create CSV with frequency headers and time/scan data
147
+ with open (filename , 'w' , newline = '' ) as csvfile :
148
+ writer = csv .writer (csvfile )
149
+
150
+ # Write header row with frequencies (in Hz)
151
+ header = ['Scan_Number' , 'Timestamp' ] + [f'{ freq :.0f} ' for freq in freq_arr ]
152
+ writer .writerow (header )
153
+
154
+ # Write data rows
155
+ for i , (scan_data , timestamp ) in enumerate (zip (waterfall_data , timestamps )):
156
+ row = [i + 1 , timestamp .strftime ('%Y-%m-%d %H:%M:%S.%f' )[:- 3 ]] + scan_data .tolist ()
157
+ writer .writerow (row )
158
+
159
+ print (f"Data saved to { filename } " )
160
+ print (f"CSV contains { len (waterfall_data )} scans with { len (freq_arr )} frequency points each" )
161
+
162
+ # show plot
163
+ plt .show ()
164
+
165
+ except KeyboardInterrupt :
166
+ print ("\n Scan interrupted by user" )
167
+ tsa .resume ()
168
+ tsa .disconnect ()
169
+ except Exception as e :
170
+ print (f"Error occurred: { e } " )
171
+ tsa .resume ()
172
+ tsa .disconnect ()
0 commit comments