Source code for instruments.daq
# This file is part of the scanning-squid package.
#
# Copyright (c) 2018 Logan Bishop-Van Horn
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
from qcodes.instrument.base import Instrument
from qcodes.instrument.parameter import Parameter, ArrayParameter
import nidaqmx
from nidaqmx.constants import AcquisitionType, TaskMode
from typing import Dict, Optional, Sequence, Any, Union
import numpy as np
[docs]class DAQAnalogInputVoltages(ArrayParameter):
"""Acquires data from one or several DAQ analog inputs.
"""
def __init__(self, name: str, task: Any, samples_to_read: int,
shape: Sequence[int], timeout: Union[float, int], **kwargs) -> None:
"""
Args:
name: Name of parameter (usually 'voltage').
task: nidaqmx.Task with appropriate analog inputs channels.
samples_to_read: Number of samples to read. Will be averaged based on shape.
shape: Desired shape of averaged array, i.e. (nchannels, target_points).
timeout: Acquisition timeout in seconds.
**kwargs: Keyword arguments to be passed to ArrayParameter constructor.
"""
super().__init__(name, shape, **kwargs)
self.task = task
self.nchannels, self.target_points = shape
self.samples_to_read = samples_to_read
self.timeout = timeout
[docs] def get_raw(self):
"""Averages data to get `self.target_points` points per channel.
If `self.target_points` == `self.samples_to_read`, no averaging is done.
"""
data_raw = np.array(self.task.read(number_of_samples_per_channel=self.samples_to_read, timeout=self.timeout))
return np.mean(np.reshape(data_raw, (self.nchannels, self.target_points, -1)), 2)
[docs]class DAQAnalogInputs(Instrument):
"""Instrument to acquire DAQ analog input data in a qcodes Loop or measurement.
"""
def __init__(self, name: str, dev_name: str, rate: Union[int, float], channels: Dict[str, int],
task: Any, min_val: Optional[float]=-5, max_val: Optional[float]=5,
clock_src: Optional[str]=None, samples_to_read: Optional[int]=2,
target_points: Optional[int]=None, timeout: Optional[Union[float, int]]=60, **kwargs) -> None:
"""
Args:
name: Name of instrument (usually 'daq_ai').
dev_name: NI DAQ device name (e.g. 'Dev1').
rate: Desired DAQ sampling rate per channel in Hz.
channels: Dict of analog input channel configuration.
task: fresh nidaqmx.Task to be populated with ai_channels.
min_val: minimum of input voltage range (-0.1, -0.2, -0.5, -1, -2, -5 [default], or -10)
max_val: maximum of input voltage range (0.1, 0.2, 0.5, 1, 2, 5 [default], or 10)
clock_src: Sample clock source for analog inputs. Default: None
samples_to_read: Number of samples to acquire from the DAQ
per channel per measurement/loop iteration.
Default: 2 (minimum number of samples DAQ will acquire in this timing mode).
target_points: Number of points per channel we want in our final array.
samples_to_read will be averaged down to target_points.
timeout: Acquisition timeout in seconds. Default: 60.
**kwargs: Keyword arguments to be passed to Instrument constructor.
"""
super().__init__(name, **kwargs)
if target_points is None:
if samples_to_read == 2: #: minimum number of samples DAQ will read in this timing mode
target_points = 1
else:
target_points = samples_to_read
self.rate = rate
nchannels = len(channels)
self.samples_to_read = samples_to_read
self.task = task
self.metadata.update({
'dev_name': dev_name,
'rate': '{} Hz'.format(rate),
'channels': channels})
for ch, idx in channels.items():
channel = '{}/ai{}'.format(dev_name, idx)
self.task.ai_channels.add_ai_voltage_chan(channel, ch, min_val=min_val, max_val=max_val)
if clock_src is None:
#: Use default sample clock timing: ai/SampleClockTimebase
self.task.timing.cfg_samp_clk_timing(
rate,
sample_mode=AcquisitionType.FINITE,
samps_per_chan=samples_to_read)
else:
#: Clock the inputs on some other clock signal, e.g. 'ao/SampleClock'
self.task.timing.cfg_samp_clk_timing(
rate,
source=clock_src,
sample_mode=AcquisitionType.FINITE,
samps_per_chan=samples_to_read
)
#: We need a parameter in order to acquire voltage in a qcodes Loop or Measurement
self.add_parameter(
name='voltage',
parameter_class=DAQAnalogInputVoltages,
task=self.task,
samples_to_read=samples_to_read,
shape=(nchannels, target_points),
timeout=timeout,
label='Voltage',
unit='V'
)
[docs] def clear_instances(self):
"""Clear instances of DAQAnalogInputs Instruments.
"""
for instance in self.instances():
self.remove_instance(instance)
[docs]class DAQAnalogOutputVoltage(Parameter):
"""Writes data to one or several DAQ analog outputs.
"""
def __init__(self, name: str, dev_name: str, idx: int, **kwargs) -> None:
"""
Args:
name: Name of parameter (usually 'voltage').
dev_name: DAQ device name (e.g. 'Dev1').
idx: AO channel inde.
**kwargs: Keyword arguments to be passed to ArrayParameter constructor.
"""
super().__init__(name, **kwargs)
self.dev_name = dev_name
self.idx = idx
self.voltage = '?'
def set_raw(self, voltage: Union[int, float]) -> None:
with nidaqmx.Task('daq_ao_task') as ao_task:
channel = '{}/ao{}'.format(self.dev_name, self.idx)
ao_task.ao_channels.add_ao_voltage_chan(channel, self.name)
ao_task.write(voltage, auto_start=True)
self.voltage = voltage
[docs]class DAQAnalogOutputs(Instrument):
"""Instrument to write DAQ analog output data in a qcodes Loop or measurement.
"""
def __init__(self, name: str, dev_name: str, channels: Dict[str, int], **kwargs) -> None:
"""
Args:
name: Name of instrument (usually 'daq_ao').
dev_name: NI DAQ device name (e.g. 'Dev1').
channels: Dict of analog output channel configuration.
**kwargs: Keyword arguments to be passed to Instrument constructor.
"""
super().__init__(name, **kwargs)
self.metadata.update({
'dev_name': dev_name,
'channels': channels})
#: We need parameters in order to write voltages in a qcodes Loop or Measurement
for ch, idx in channels.items():
self.add_parameter(
name='voltage_{}'.format(ch.lower()),
dev_name=dev_name,
idx=idx,
parameter_class=DAQAnalogOutputVoltage,
label='Voltage',
unit='V'
)
[docs] def clear_instances(self):
"""Clear instances of DAQAnalogOutputs Instruments.
"""
for instance in self.instances():
self.remove_instance(instance)