-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathradar_mode.py
More file actions
135 lines (116 loc) · 5.26 KB
/
radar_mode.py
File metadata and controls
135 lines (116 loc) · 5.26 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
from PyPicoScenes.PyPicoScenes import *
from PyPicoScenes.buildFrames import *
import time
import random
import math
def get_call_back_dump(fileName="testCSI"):
    """Create a frame handler that dumps every frame it receives.

    Args:
        fileName: Name passed to ``FrameDumper.getInstanceWithoutTime`` to
            select the dumper the frames are written through.

    Returns:
        A one-argument callable suitable for registering as a frame handler.
        It always returns ``True``.
    """

    def dump_handler(frame):
        # Log first, then hand the frame to the dumper bound to ``fileName``.
        print(f"dump a frame to {fileName}")
        dumper = FrameDumper.getInstanceWithoutTime(fileName)
        dumper.dumpRxFrame(frame)
        # NOTE(review): True presumably tells the platform the frame was
        # handled -- confirm against the registerGeneralHandler contract.
        return True

    return dump_handler
def get_call_back_plot(nicName: str = "4"):
    """Create a frame handler that live-plots the CSI of every received frame.

    The plotter instance is looked up by ``nicName``, its plot service is
    started, and its magnitude style is set to ``Absolute`` before the
    handler is returned. Previously the name was hard-coded to ``"4"`` and
    the ``nicName`` argument was silently ignored; the default value keeps
    the old behaviour for callers that pass nothing.

    Args:
        nicName: Key passed to ``CSILivePlotter.getInstance``.

    Returns:
        A one-argument callable suitable for registering as a frame handler.
        It always returns ``True``.
    """
    # Fetch the plotter once and reuse it for setup and inside the handler.
    plotter = CSILivePlotter.getInstance(nicName)
    plotter.startPlotService()

    # Plot CSI magnitude as absolute values.
    # Alternative: CSILivePlotter.MagnitudePlotStyle.Log for a log scale.
    absoluteStyle = CSILivePlotter.MagnitudePlotStyle(
        CSILivePlotter.MagnitudePlotStyle.Absolute
    )
    plotter.setMagnitudePlotStyle(absoluteStyle)

    def py_call_back_plot(frame):
        if plotter and plotter.isPlotServiceOn():
            # Must be called on the CSI before the frame is plotted.
            frame.csiSegment.getCSI().removeCSDAndInterpolateCSI()
            plotter.plotFrameAsync(frame)
        return True

    return py_call_back_plot
def radar_mode(nicName: str = "usrp", parameters=None):
    """Run a full-duplex transmit/receive sweep on one SDR NIC.

    The function starts the PicoScenes platform, configures Tx/Rx channel 0
    of the NIC's SDR front end, enables full duplex with a destination-MAC
    filter, registers the dump and plot handlers, and then transmits frames
    for every (round, sampling rate, carrier frequency) combination taken
    from ``parameters``. Afterwards it waits 10 seconds, stops the Rx/Tx
    services and shuts the platform down.

    Args:
        nicName: Name of the NIC to fetch with ``getNic``.
        parameters: Parameter object providing ``round_repeat``,
            ``cf_repeat``, ``tx_delay_us``, ``delayed_start_seconds``,
            ``delay_after_cf_change_ms`` and ``useBatchAPI`` (the script
            entry point passes an ``EchoProbeParameters`` instance).

    Raises:
        AssertionError: If ``parameters`` is falsy.
        RuntimeError: If the batch API is requested but no frames were
            prebuilt.
    """
    assert parameters, "parameters can't be None"

    # Start PicoScenes platform
    picoscenes_start()

    # Get network interface card and its SDR-typed front end
    nic = getNic(nicName)
    frontEnd = nic.getTypedFrontEnd[AbstractSDRFrontEnd]()

    # NOTE(review): the three chain-mask guards below are taken to cover only
    # the single statement that follows them -- confirm against the upstream
    # file. With channel 0 selected just before, the mask is expected to be
    # non-zero either way.

    ## Set transmission parameters
    txChannelList = [0]
    frontEnd.setTxChannels(txChannelList)
    nic.getUserSpecifiedTxParameters().txcm = frontEnd.getTxChainMask()
    if frontEnd.getTxChainMask() != 0:
        frontEnd.setTxpower(0.1)
    frontEnd.setTxAntennas(["TX/RX"])

    ## Set reception parameters
    rxChannelList = [0]
    frontEnd.setRxChannels(rxChannelList)
    if frontEnd.getRxChainMask() != 0:
        frontEnd.setRxGain(0.65)
    frontEnd.setRxAntennas(["RX2"])
    frontEnd.setNumThreads4RxDecoding(1)

    ## Apply preset configurations
    frontEnd.applyPreset("TR_CBW_20_EHTSU", False)
    frontEnd.setClockSource("internal")
    frontEnd.setTimeSource("internal")
    frontEnd.setCarrierFrequency(5955e6)
    if frontEnd.getRxChainMask() != 0:
        frontEnd.setAGC(False)

    ## Enable radar mode: filter on the magic destination MAC, go full duplex
    tmp = std.vector[std.array[std.uint8_t, 6]]()
    tmp.push_back(MagicIntel123456)
    nic.getFrontEnd().setDestinationMACAddressFilter(tmp)
    frontEnd.setFullDuplex(True)
    nic.startRxService()
    nic.startTxService()

    # Register Python callbacks
    call_backs = {
        "call_back_dump": get_call_back_dump(),
        "call_back_plot": get_call_back_plot(nicName),
    }
    for call_back_name, call_back in call_backs.items():
        nic.registerGeneralHandler(call_back_name, call_back)

    ## Configure transmission parameters
    txFrontEnd = nic.getFrontEnd()
    round_repeat = parameters.round_repeat.value_or(1)
    cf_repeat = parameters.cf_repeat.value_or(100)
    tx_delay_us = parameters.tx_delay_us
    tx_delayed_start = parameters.delayed_start_seconds.value_or(0)
    sfList = enumerateSamplingRates(nic, parameters)
    cfList = enumerateArbitraryCarrierFrequencies(nic, parameters)

    def settle_after_retune():
        # Optional pause after changing the sampling rate or carrier frequency.
        if parameters.delay_after_cf_change_ms.has_value():
            time.sleep(parameters.delay_after_cf_change_ms.value() / 1e3)

    if tx_delayed_start > 0:
        time.sleep(tx_delayed_start)

    framePoints = []
    repeats = 0
    if parameters.useBatchAPI:
        prebuiltFrames = buildBatchFrames(
            EchoProbePacketFrameType.SimpleInjectionFrameType, nic, parameters
        )
        # The frames are built once, so their addresses and the repeat count
        # are the same for every step of the sweep.
        framePoints = [cppyy.addressof(frame) for frame in prebuiltFrames]
        if not framePoints:
            raise RuntimeError("useBatchAPI is set but no frames were prebuilt")
        # framePoints is a Python list: use len(), it has no .size() method.
        repeats = math.ceil(cf_repeat / len(framePoints))

    for _ in range(round_repeat):
        for sf_value in sfList:
            for cf_value in cfList:
                # Retune only when the requested value differs from the current one.
                if sf_value != txFrontEnd.getSamplingRate():
                    txFrontEnd.setSamplingRate(sf_value)
                    settle_after_retune()
                if cf_value != txFrontEnd.getCarrierFrequency():
                    txFrontEnd.setCarrierFrequency(cf_value)
                    settle_after_retune()

                if parameters.useBatchAPI:
                    txFrontEnd.transmitFramesInBatch(framePoints, repeats)
                else:
                    for _ in range(cf_repeat):
                        taskId = random.randint(9999, 65535)
                        txframe = buildBasicFrame(
                            taskId,
                            EchoProbePacketFrameType.SimpleInjectionFrameType,
                            nic,
                            parameters,
                        )
                        nic.transmitPicoScenesFrameSync(txframe)
                        time.sleep(tx_delay_us / 1e6)

    # Fixed 10 s pause before the services are stopped.
    time.sleep(10)
    nic.stopRxService()
    nic.stopTxService()
    picoscenes_stop()
    picoscenes_wait()
if __name__ == "__main__":
    # Script entry point: run radar mode on the NIC named "usrp" with
    # default-constructed EchoProbeParameters.
    radar_mode(nicName="usrp", parameters=EchoProbeParameters())