Remote Spectrum Monitor User Guide : Programming with SCPI : I/Q Capture Block Mode : IQ Capture Data to Absolute Power Level
 
IQ Capture Data to Absolute Power Level
This sample MATLAB/Octave program shows how raw IQ capture data can be scaled to an absolute power level in dBm.
% Purpose: take a raw IQ capture, compute its FFT magnitude spectrum, apply
% the instrument's absolute reference offset and plot the result on a
% frequency axis centred on zero.
%Copy data into captureData array
% NOTE: captureData is not created by this script; it must already exist in
% the workspace with at least two columns before the script is run.
%Separate the data and build the complex IQ vector.
%First column contains Q and the second I
quadphase = captureData(:,1);
inphase = captureData(:,2);
IQData = (inphase+1i*quadphase);
%Send SCPI Command [:SENSe]:iQ:SAMPle:CALibration:CONFiguration?
%and get absolute reference offset
% NOTE(review): the literal below is presumably a sample response; replace it
% with the value your own instrument returns.
absolute_ref_offset = -2.007958;
fs = 13.3e6;%Sampling frequency
n = 1024; %number of samples
%Perform fft
% n-point FFT of the capture; only the magnitude is kept.
y = abs(fft(IQData, n));
% Reorder the bins so that zero frequency sits in the middle of the vector.
y = fftshift(y);
%Scale fft output
% Divide by the FFT length to normalise the magnitude.
y = y/n;
%To power
% Magnitude to decibels.
y = 20 * log10(y);
%To Absolute power level
y = y + absolute_ref_offset;
%Peak Value
% peak is computed for inspection only; nothing below uses it.
peak = max(y);
% Frequency axis for the shifted spectrum: n points from -fs/2 up to
% fs/2 - fs/n.
f = fs*(-n/2:n/2-1)/n;
plot(f, y);
xlabel("Frequency in Hz"); % x-axis label
ylabel("Power in dBm"); % y-axis label
Stand Alone IQ
# -*- coding: utf-8 -*-
"""
Created on Wed Jul 8 14:21:47 2015
 
@author: austin
"""
from rawsocketconnection import RawSocketConnection
from scpi_wrappers.sleepyspawrapper import SleepySpaWrapper
from helpers.unitconversions import *
 
from helpers.iqhelpers import getDeltas
import time
 
import itertools
from multiprocessing import Pool
# Optional plotting stack. matplotlib and numpy are imported in the same
# try block, so plottingAvailable is True only when both imports succeed.
try:
    import matplotlib
    # Powers-of-ten thresholds for switching tick labels to scientific notation.
    matplotlib.rcParams['axes.formatter.limits'] = [-4, 4]
    from matplotlib import pyplot as plt
    import numpy as np

    plottingAvailable = True
except Exception:
    # Best effort: any failure while loading the plotting stack just disables
    # plotting. "except Exception" (rather than a bare except) lets
    # KeyboardInterrupt and SystemExit propagate.
    plottingAvailable = False

# Optional 3-D stack (mayavi), probed the same way.
try:
    from mayavi.mlab import quiver3d
    from mpl_toolkits.mplot3d import axes3d
    mayaviAvailable = True
except Exception:
    mayaviAvailable = False

# The 3-D view is switched off unconditionally, whatever the probe above found.
# Remove this line to enable it when mayavi is installed.
mayaviAvailable = False
 
def plotIQData(samples, timestep, figureText):
    """
    Plots an IQ capture as a 3x2 grid of matplotlib panels.

    The panels are: I/Q scatter, sample time against sample index, time step
    against sample index, real part of the FFT, power spectral density, and a
    text panel showing figureText. The call blocks in plt.show().

    samples    -- sequence of samples exposing .I and .Q attributes
    timestep   -- sample spacing passed to np.fft.fftfreq (callers in this
                  file pass seconds)
    figureText -- text drawn in the sixth panel
    """
    print("Getting deltas")
    indexes, timevals, dt, inphase, di, quadphase, dq = getDeltas(samples)

    print("Sum of time steps: " + str(sum(dt[:-1])))

    # Debug dump of up to 30 samples starting 60 samples before the end.
    # The bounds are clamped so that captures shorter than 60 samples neither
    # wrap around through negative indexes nor raise IndexError.
    pointLow = max(len(timevals) - 60, 0)
    pointHigh = min(pointLow + 30, len(timevals))
    for point in range(pointLow, pointHigh):
        print("Time: " + str(timevals[point] ) + " I: " + str(inphase[point]) + " Q: " + str(quadphase[point]))

    assert len(inphase) == len(quadphase)

    # Every panel plots the same window: all points except the last one.
    window = slice(0, -1, 1)

    fig = plt.figure()
    ax1 = fig.add_subplot(321)
    ax1.scatter(inphase[window], quadphase[window], marker='.')
    ax1.set_xlabel("I")
    ax1.set_ylabel("Q")
    ax1.axis('equal')

    ax2 = fig.add_subplot(322)
    ax2.plot(indexes[window], timevals[window])
    ax2.set_xlabel("sample index")
    ax2.set_ylabel("Time since start in ns")

    ax3 = fig.add_subplot(323)
    ax3.scatter(indexes[window], dt[window], marker='.')
    ax3.set_xlabel("sample index")
    ax3.set_ylabel("time step in ns")

    # Only the first 2**17 samples go into the spectral plots.
    # NOTE(review): Q is used as the real part and I as the imaginary part
    # here; the usual convention is I + jQ. Confirm this is intended.
    points = [x.Q + 1j * x.I for x in samples[:2**17]]

    w = np.fft.fft(points)
    freqs = np.fft.fftfreq(len(points), d=timestep)

    ax4 = fig.add_subplot(324)
    ax4.plot(freqs, w.real)
    ax4.set_title("FFT")
    ax4.set_xlabel("Frequency Hz")
    ax4.set_ylabel("Amplitude (real part from FFT)")

    # psd() is called without a sampling frequency, so matplotlib's default
    # is used for this panel's frequency axis.
    ax5 = fig.add_subplot(325)
    ax5.psd(points)

    ax6 = fig.add_subplot(326)
    ax6.text(0, 0, figureText)

    ax6.set_axis_off()
    plt.tight_layout()

    # Optional 3-D arrow plot: position (time, I, Q), direction (dt, dI, dQ).
    if mayaviAvailable:
        quiver3d(np.array(timevals[window]), inphase[window], quadphase[window],
                 np.array(dt[window]), di[window], dq[window])

    plt.show()
 
def test_iqTransfer(dut):
    """
    Verifies that the instrument can send IQ data.

    Configures a block capture (3 MHz IQ bandwidth, "IQ:BITS 24",
    "IQ:LENGth 0.1 s"), triggers it, waits for completion, reads the samples
    back and hands them to plotIQData together with a summary text.

    dut -- SleepySpaWrapper connected to the instrument
    """
    assert isinstance(dut, SleepySpaWrapper)
    #dut.preset()
    #dut._connection.sendQueryCommand("*opc?")
    #print(dut._connection.sendQueryCommand("*idn?"))
    iqBandwidth = 3 * MHz
    dut.setIqBandwidth(iqBandwidth)
    dut._connection.sendWriteCommand("IQ:BITS 24")
    dut._connection.sendWriteCommand("IQ:LENGth 0.1 s")

    dut.triggerIqCapture()
    dut.waitForOperationComplete()
    sampleSource = dut.queryIQData()

    # plotIQData is given the sample spacing in seconds.
    timestep = sampleSource._nanosecondsPerSample * 1e-9
    print ("Timestep: " + str(timestep))

    samples = list(sampleSource.getSamples())

    # The first line previously ended in an unterminated string literal ('\n).
    figureText = "Number of samples: " + str(len(samples)) + '\n'
    figureText += "Time per tick in ns: {:.3f}\n".format(sampleSource._nanosecondsPerTick)
    figureText += "DecimatedSampleRate: " + str(sampleSource._decimatedSampleRate) + '\n'
    figureText += "Ticks per sample: " + str(sampleSource._tickPerSample) + '\n'
    figureText += "Time per sample in nanoseconds: {:.3f}\n".format(sampleSource._nanosecondsPerSample)

    print (figureText)
    plotIQData(samples, timestep, figureText)
 
def test_streamPlot(dut):
    """
    Verifies that the instrument can stream IQ data.

    Waits for a GPS fix, configures streaming ("IQ:BITS 8", 1330 kHz IQ
    bandwidth), triggers the capture, reads ten chunks, aborts the sweep and
    plots all samples from all chunks with plotIQData.

    dut -- SleepySpaWrapper connected to the instrument
    """
    assert isinstance(dut, SleepySpaWrapper)

    # This test requires a GPS fix since we need it for timestamp precision
    dut.waitForGpsFix()
    dut._connection.sendWriteCommand("IQ:BITS 8")

    iqBandwidth = 1330 * kHz
    dut.setIqBandwidth(iqBandwidth)
    dut.setIqStreaming()

    dut.triggerIqCapture()

    numChunks = 10

    # Abort in a finally so a failed query does not leave the instrument
    # streaming.
    try:
        sampleSources = [dut.queryIQData() for _ in range(numChunks)]
    finally:
        dut.abortSweep()

    # Timestep and figure text are both taken from the first chunk. The loop
    # variable used to shadow this name, so the text came from the last chunk.
    firstSource = sampleSources[0]

    timestep = firstSource._nanosecondsPerSample * 1e-9
    print ("Timestep: " + str(timestep))

    samples = [sample
               for source in sampleSources
               for sample in source.getSamples()]

    figureText = "Number of samples: " + str(len(samples)) + '\n'
    figureText += "Time per tick in ns: " + str(firstSource._nanosecondsPerTick) + '\n'
    figureText += "DecimatedSampleRate: " + str(firstSource._decimatedSampleRate) + '\n'
    figureText += "Ticks per sample: " + str(firstSource._tickPerSample) + '\n'
    figureText += "Time per sample in nanoseconds: " + str(firstSource._nanosecondsPerSample) + '\n'

    print ("plotting")
    plotIQData(samples, timestep, figureText)
 
 
if __name__ == "__main__":
    # Command-line entry point: connect to the instrument and run one example.
    import argparse
    parser = argparse.ArgumentParser(description='Runs iq plotting test')
    parser.add_argument('--address', dest='address', default="172.26.202.128",
                        help='IP address of the instrument')
    # The port was previously hard-coded; the default keeps the old behaviour.
    parser.add_argument('--port', dest='port', type=int, default=9001,
                        help='port of the raw socket connection')
    args = parser.parse_args()
    # RawSocketConnection is given the target as "address:port".
    ipAddress = "{}:{}".format(args.address, args.port)
    connection = RawSocketConnection(ipAddress)

    dut = SleepySpaWrapper(connection, None)
    # Change this to test_streamPlot(dut) if you want the streaming example
    test_iqTransfer(dut)
 
Histogram
from socketconnection import SocketConnection
from time import time
from math import log, ceil
# Measures how long a fixed group of five SCPI queries takes and keeps a
# histogram of the elapsed times in power-of-two buckets. Runs until
# interrupted.
connection = SocketConnection("172.26.201.208")
#connection._verbose = True

# hist maps a bucket exponent k to a count; each sample lands in the smallest
# k with elapsed seconds <= 2**k. The counts are cumulative over the whole run.
hist = {0: 0}
count = 0
while True:
    start = time()
    connection.sendQueryCommand(":FREQ:START?")
    connection.sendQueryCommand(":FREQ:STOP?")
    connection.sendQueryCommand(":TRAC? 1")
    connection.sendQueryCommand(":TRAC? 1")
    connection.sendQueryCommand(":TRAC? 1")
    stop = time()
    delta = stop - start
    if delta > 0:
        bucket = ceil(log(delta, 2))
    else:
        # The timer can report no elapsed time; log() would raise ValueError.
        # Such samples go under -inf, which is printed as 2**-inf == 0.0.
        bucket = float("-inf")
    hist[bucket] = hist.get(bucket, 0) + 1
    count += 1
    # Print the cumulative histogram every 10000 measurements.
    if count == 10000:
        count = 0
        print("\nTime bucket, count")
        for key in sorted(hist):
            print("{},{}".format(2**key, hist[key]))