-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathdemo-simple.py
306 lines (204 loc) · 8 KB
/
demo-simple.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
import time
import numpy as np
import matplotlib.pyplot as plt
import avmu
'''
This is a small example that demonstrates how to use the AKELA AVMU to
acquire range data, in a maximally simple manner. The actual AVMU interface
code does not do any error handling, but for short acquisitions, this is
generally fine.
The structure is minimal:
- get_sweeps() acquires a set of frequency-domain data
- plot_sweeps() handles converting the data to time-domain, and then does some plotting.
- phase_correct_ifft() is the core of how to convert the AVMU's return data (which
is frequency-domain data) to time-domain data that is useful for ranging
purposes.
'''
AVMU_IP_ADDRESS = "192.168.1.223"
HOP_RATE = "HOP_15K"
START_F = 250
STOP_F = 2100
NUM_POINTS = 1024
SWEEP_COUNT = 100
# Cable delays are 0.65 nanoseconds each way (tx cable + rx cable)
CABLE_DELAYS = 0.65 * 2
def log_mag(inarr):
return 20 * np.log10(np.absolute(inarr))
def phase_correct_ifft(data, start_f, stop_f, npts, cable_delays, fft_window=np.hanning):
'''
To properly convert the frequency domain data returned by the AVMU into meaninful
time-domain data, we have to do a little bit of work.
Basically, the AVMU returns a set of I/Q samples for the return signal at each
measured frequency point. These can be converted to time-domain magnitude/phase
data using an inverse FFT.
However, you can't directly just shove the AVMU data into a iFFT, as the AVMU
data does not start at 0 Hz. Therefore, we have to zero pad the beginning
of the dataset so the frequency points returned by the AVMU are properly
aligned in the iFFT input.
Basically:
AVMU Output
V Sweep Start V Sweep End
| < n points > |
As we know the start frequency, stop frequency, and the number of points, we
can therefore calculate the effective step per frequency:
(stop freq - start freq) n points.
Then, we can calculate how many points we need to pad the beginning of the FFT with:
(start freq - 0) / step-per-point = zero-padding
We then wind up with:
V 0 hz V Sweep Start V Sweep End
| <zero padding> | < n points > |
As FFT calculations are generally MUCH more efficent when the FFT (or
iFFT) length is a size that is a power of two, we then zero pad the
end of the array as well, to make the overall point size equal to
the next-nearest-larger power of two:
V 0 hz V Sweep Start V Sweep End V 2^n pts
| <zero padding> | < n points > | < zero padding |
We can then perform a iFFT and get meanignful results.
The output of the iFFT is complex time-domain data. The time-step per bin
is a function of the frequency-step per bin in the input. Basically:
time_step = 1 / (frequency_step * fft_size * 2).
Finally, since the data is now in the time-domain, we shift the time axis by
the length of the delays in the cables, to get the real distance.
'''
data_len = data.shape[0]
# Apply windowing (needs to be an elementwise multiplication)
data = np.multiply(data, fft_window(data_len))
# Pad the start of the array for phase-correctness, and
# the end to make the calculation a power of N
step_val = abs(start_f - stop_f) / npts
start_padding = max(int(start_f/step_val), 0)
startsize = start_padding + data_len
sizes = [128, 256, 512, 1024, 2048, 4096, 8192, 16384, 32768, 65535]
start_idx = 0
output_size = 0
while output_size < startsize and start_idx < len(sizes):
output_size = sizes[start_idx]
start_idx += 1
end_padding = max(output_size - startsize, 0)
# Default padding value is "0"
arr = np.pad(data, (start_padding, end_padding), mode='constant')
# Since we acquire directly as frequency domain, and we want to convert
# back to time-domain, we use a iFFT, rather then a normal FFT
fft_data = np.fft.ifft(arr)
# Chop off the negative time component (we don't care about it here)
# Someone doing fancier stuff could put it back.
fft_data = fft_data[:output_size//2]
fft_data = np.absolute(fft_data)
if start_f == stop_f:
return fft_data, np.array(range(fft_data.shape[0]))
pts = np.array(range(fft_data.shape[0]))
# Convert to hertz
step_val = step_val * 1e6
# And then to time
pts = pts * (1 / (len(pts) * step_val * 2))
# Finally to nanoseconds
pts = pts * 1e9
# And then we subtract off the cable delays.
# This will shift the zero time to a negative value, but that's fine, since
# we're treating the antenna plane as the "zero" time
pts = pts - cable_delays
return fft_data, pts
def waterfall_plot(y_axis_set, x_axis, time_per_frame):
# Since we want zero slowtime to be along the upper edge, we need to reverse the input array.
y_axis_set = np.flip(y_axis_set, 0)
y_axis_min = 0
y_axis_max = y_axis_set.shape[0] * time_per_frame
fig, ax = plt.subplots()
ax.imshow(y_axis_set,
aspect = 'auto',
extent = (x_axis[0], x_axis[-1], y_axis_max, y_axis_min),
)
ax.set(xlabel='Time (ns)', ylabel='Time Ago (s)', title='Time vs Slow-Time')
def plot_sweeps(frequencies, sweeps_set, time_per_frame):
# Messily unpack the sweep data structure.
# There are a unch of assumptions here, mostly based around the fact that
# we're only ever acquiring one path.
complex_sweeps = [
tmp[0][1]['data']
for
tmp
in
sweeps_set
]
fft_data = [
phase_correct_ifft(
data = tmp,
start_f = frequencies[0],
stop_f = frequencies[-1],
npts = len(frequencies),
cable_delays = CABLE_DELAYS
)
for
tmp
in
complex_sweeps
]
# Since we're not changing the sweep parameters, the time axis for the
# FFT data is constant across the entire dataset. Therefore, we can just pick
# any time-axis set, and use that everywhere.
fft_time_axis = fft_data[0][1]
fft_mag = [log_mag(tmp[0]) for tmp in fft_data]
sweeps_mag = [log_mag(tmp) for tmp in complex_sweeps]
fft_mag = np.vstack(fft_mag)
sweeps_mag = np.vstack(sweeps_mag)
fig, ax = plt.subplots()
for idx, sweep_array in enumerate(sweeps_mag[:10,...]):
ax.plot(frequencies, sweep_array, label='Sweep %s' % idx)
ax.set(xlabel='Frequency (MHz)', ylabel='Magnitude (dB)', title='Frequency vs Magnitude')
plt.legend()
fig, ax = plt.subplots()
for idx, fft_magnitude in enumerate(fft_mag[:10]):
ax.plot(fft_time_axis, fft_magnitude, label='FFT Sweep %s' % idx)
ax.set(xlabel='Time (ns)', ylabel='Magnitude (dB)', title='Time vs Magnitude')
plt.legend()
waterfall_plot(fft_mag, fft_time_axis, time_per_frame)
plt.show()
def get_sweeps(count):
'''
Acquire `count` frames from the AVMU, and return them (as well as some
frequency/time frame metadata)
'''
device = avmu.AvmuInterface()
device.setIPAddress(AVMU_IP_ADDRESS)
device.setIPPort(1027)
device.setTimeout(500)
device.setMeasurementType("PROG_ASYNC")
device.initialize()
device.setHopRate(HOP_RATE)
device.addPathToMeasure('AVMU_TX_PATH_0', 'AVMU_RX_PATH_1')
device.utilGenerateLinearSweep(startF_mhz=START_F, stopF_mhz=STOP_F, points=NUM_POINTS)
# Get the freqency plan that utilGenerateLinearSweep calculated given the
# hardware constraints.
frequencies = device.getFrequencies()
device.start()
assert device.getPreciseTimePerFrame() > 0
state = device.getState()
print("AVMU State", state)
sweeps = []
device.beginAsync()
start_time = time.time()
for _ in range(count):
device.measure()
sweep_data = device.extractAllPaths()
sweeps.append(sweep_data)
print("Acquired sweep (%s)" % (len(sweeps), ))
stop_time = time.time()
device.haltAsync()
time_per_frame = device.getPreciseTimePerFrame()
device.stop()
print("Acquired %s sweeps! Time per sweep: %s. Total time: %s (calculated: %s)" % (
len(sweeps),
time_per_frame,
stop_time - start_time,
len(sweeps) * time_per_frame
)
)
return frequencies, sweeps, time_per_frame
def run():
print("Running minimal demo!")
frequencies, sweeps, time_per_frame = get_sweeps(SWEEP_COUNT)
plot_sweeps(frequencies, sweeps, time_per_frame)
if __name__ == '__main__':
import logging
logging.basicConfig(level=logging.INFO)
run()