# This file is part of the scanning-squid package.
#
# Copyright (c) 2018 Logan Bishop-Van Horn
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
#: Various Python utilities
from typing import Dict, List, Sequence, Any, Union, Tuple
#: Qcodes for running measurements and saving data
import qcodes as qc
#: NI DAQ library
import nidaqmx
from nidaqmx.constants import AcquisitionType
#: scanning-squid modules
from instruments.daq import DAQAnalogInputs
from plots import ScanPlot
from .microscope import Microscope
import utils
#: Pint for manipulating physical units
from pint import UnitRegistry
ureg = UnitRegistry()
#: Tell UnitRegistry instance what a Phi0 is, and that Ohm = ohm
with open('squid_units.txt', 'w') as f:
f.write('Phi0 = 2.067833831e-15 * Wb\n')
f.write('Ohm = ohm\n')
ureg.load_definitions('./squid_units.txt')
import logging
log = logging.getLogger(__name__)
[docs]class SusceptometerMicroscope(Microscope):
"""Scanning SQUID susceptometer microscope class.
"""
def __init__(self, config_file: str, temp: str, ureg: Any=ureg, log_level: Any=logging.INFO,
log_name: str=None, **kwargs) -> None:
"""
Args:
config_file: Path to microscope configuration JSON file.
temp: 'LT' or 'RT', depending on whether the microscope is cold or not.
Sets the voltage limits for the scanner and Attocubes.
ureg: pint UnitRegistry for managing physical units.
log_level: e.g. logging.DEBUG or logging.INFO
log_name: Log file will be saved as logs/{log_name}.log.
Default is the name of the microscope configuration file.
**kwargs: Keyword arguments to be passed to Station constructor.
"""
super().__init__(config_file, temp, ureg, log_level, log_name, **kwargs)
[docs] def get_prefactors(self, measurement: Dict[str, Any], update: bool=True) -> Dict[str, Any]:
"""For each channel, calculate prefactors to convert DAQ voltage into real units.
Args:
measurement: Dict of measurement parameters as defined
in measurement configuration file.
update: Whether to query instrument parameters or simply trust the
latest values (should this even be an option)?
Returns:
Dict[str, pint.Quantity]: prefactors
Dict of {channel_name: prefactor} where prefactor is a pint Quantity.
"""
mod_width = self.Q_(self.SQUID.metadata['modulation_width'])
prefactors = {}
for ch in measurement['channels']:
prefactor = 1
if ch == 'MAG':
prefactor /= mod_width
elif ch in ['SUSCX', 'SUSCY']:
r_lead = self.Q_(measurement['channels'][ch]['r_lead'])
snap = getattr(self, 'SUSC_lockin').snapshot(update=update)['parameters']
susc_sensitivity = snap['sensitivity']['value']
amp = snap['amplitude']['value'] * self.ureg(snap['amplitude']['unit'])
#: The factor of 10 here is because SR830 output gain is 10/sensitivity
prefactor *= (r_lead / amp) / (mod_width * 10 / susc_sensitivity)
elif ch == 'CAP':
snap = getattr(self, 'CAP_lockin').snapshot(update=update)['parameters']
cap_sensitivity = snap['sensitivity']['value']
#: The factor of 10 here is because SR830 output gain is 10/sensitivity
prefactor /= (self.Q_(self.scanner.metadata['cantilever']['calibration']) * 10 / cap_sensitivity)
prefactor /= measurement['channels'][ch]['gain']
prefactors.update({ch: prefactor.to('{}/V'.format(measurement['channels'][ch]['unit']))})
return prefactors
[docs] def scan_surface(self, scan_params: Dict[str, Any]) -> None:
"""
Scan the current surface while acquiring data in the channels defined in
measurement configuration file (e.g. MAG, SUSCX, SUSCY, CAP).
Args:
scan_params: Dict of scan parameters as defined
in measuremnt configuration file.
Returns:
None
"""
if not self.atto.surface_is_current:
raise RuntimeError('Surface is not current. Aborting scan.')
surface_type = scan_params['surface_type'].lower()
if surface_type not in ['plane', 'surface']:
raise ValueError('surface_type must be "plane" or "surface".')
old_pos = self.scanner.position()
daq_config = self.config['instruments']['daq']
ao_channels = daq_config['channels']['analog_outputs']
ai_channels = daq_config['channels']['analog_inputs']
meas_channels = scan_params['channels']
channels = {}
for ch in meas_channels:
channels.update({ch: ai_channels[ch]})
nchannels = len(channels.keys())
daq_name = daq_config['name']
#: DAQ AI sampling rate is divided amongst all active AI channels
daq_rate = self.Q_(daq_config['rate']).to('Hz').magnitude / nchannels
fast_ax = scan_params['fast_ax'].lower()
slow_ax = 'x' if fast_ax == 'y' else 'y'
pix_per_line = scan_params['scan_size'][fast_ax]
line_duration = pix_per_line * self.ureg('pixels') / self.Q_(scan_params['scan_rate'])
pts_per_line = int(daq_rate * line_duration.to('s').magnitude)
height = self.Q_(scan_params['height']).to('V').magnitude
if 1 / self.Q_(scan_params['scan_rate']).to('pixels/s').magnitude < self.SUSC_lockin.time_constant():
warning = 'Averaging time per pixel is less than the SUSC_lockin time constant. '
warning += 'For reliable susceptibility data, averaging time per pixel should be '
warning += 'significantly greater than the SUSC_lockin time constant.'
log.warning(warning)
scan_vectors = utils.make_scan_vectors(scan_params, self.ureg)
#scan_grids = utils.make_scan_grids(scan_vectors, slow_ax, fast_ax,
# pts_per_line, plane, height)
plane = self.scanner.metadata['plane']
if surface_type == 'plane':
scan_grids = utils.make_scan_surface(surface_type, scan_vectors, slow_ax, fast_ax,
pts_per_line, plane, height)
else:
scan_grids = utils.make_scan_surface(surface_type, scan_vectors, slow_ax, fast_ax,
pts_per_line, plane, height, interpolator=self.scanner.surface_interp)
utils.validate_scan_params(self.scanner.metadata, scan_params, scan_grids,
pix_per_line, pts_per_line, self.temp, self.ureg, log)
self.scanner.goto([scan_grids[axis][0][0] for axis in ['x', 'y', 'z']])
self.set_lockins(scan_params)
#: get channel prefactors in pint Quantity form
prefactors = self.get_prefactors(scan_params)
#: get channel prefactors in string form so they can be saved in metadata
prefactor_strs = {}
for ch, prefac in prefactors.items():
unit = scan_params['channels'][ch]['unit']
pre = prefac.to('{}/V'.format(unit))
prefactor_strs.update({ch: '{} {}'.format(pre.magnitude, pre.units)})
ai_task = nidaqmx.Task('scan_plane_ai_task')
self.remove_component('daq_ai')
if hasattr(self, 'daq_ai'):
#self.daq_ai.clear_instances()
self.daq_ai.close()
self.daq_ai = DAQAnalogInputs('daq_ai', daq_name, daq_rate, channels, ai_task,
samples_to_read=pts_per_line, target_points=pix_per_line,
#: Very important to synchronize AOs and AIs
clock_src='ao/SampleClock')
self.add_component(self.daq_ai)
slow_ax_position = getattr(self.scanner, 'position_{}'.format(slow_ax))
slow_ax_start = scan_vectors[slow_ax][0]
slow_ax_end = scan_vectors[slow_ax][-1]
slow_ax_step = scan_vectors[slow_ax][1] - scan_vectors[slow_ax][0]
#: There is probably a counter built in to qc.Loop, but I couldn't find it
loop_counter = utils.Counter()
scan_plot = ScanPlot(scan_params, self.ureg)
loop = qc.Loop(slow_ax_position.sweep(start=slow_ax_start,
stop=slow_ax_end,
step=slow_ax_step), delay=0.1
).each(
#: Create AO task and queue data to be written to AOs
qc.Task(self.scanner.scan_line, scan_grids, ao_channels, daq_rate, loop_counter),
#: Start AI task; acquisition won't start until AO task is started
qc.Task(ai_task.start),
#: Start AO task
qc.Task(self.scanner.control_ao_task, 'start'),
#: Acquire voltage from all active AI channels
self.daq_ai.voltage,
qc.Task(ai_task.wait_until_done),
qc.Task(self.scanner.control_ao_task, 'wait_until_done'),
qc.Task(ai_task.stop),
#: Stop and close AO task so that AOs can be used for goto
qc.Task(self.scanner.control_ao_task, 'stop'),
qc.Task(self.scanner.control_ao_task, 'close'),
qc.Task(self.scanner.goto_start_of_next_line, scan_grids, loop_counter),
#: Update and save plot
qc.Task(scan_plot.update, qc.loops.active_data_set, loop_counter),
qc.Task(scan_plot.save),
qc.Task(loop_counter.advance)
).then(
qc.Task(ai_task.stop),
qc.Task(ai_task.close),
qc.Task(self.daq_ai.close),
#qc.Task(self.daq_ai.clear_instances),
qc.Task(self.scanner.goto, old_pos),
#qc.Task(self.CAP_lockin.amplitude, 0.004),
#qc.Task(self.SUSC_lockin.amplitude, 0.004)
)
#: loop.metadata will be saved in DataSet
loop.metadata.update(scan_params)
loop.metadata.update({'prefactors': prefactor_strs})
for idx, ch in enumerate(meas_channels):
loop.metadata['channels'][ch].update({'idx': idx})
data = loop.get_data_set(name=scan_params['fname'])
#: Run the loop
try:
loop.run()
log.info('Scan completed. DataSet saved to {}.'.format(data.location))
#: If loop is aborted by user:
except KeyboardInterrupt:
log.warning('Scan aborted by user. Going to [0,0,0] V. DataSet saved to {}.'.format(data.location))
finally:
try:
#: Stop 'scan_plane_ai_task' so that we can read our current position
ai_task.stop()
ai_task.close()
#: If there's an active AO task, close it so that we can use goto
self.scanner.control_ao_task('stop')
self.scanner.control_ao_task('close')
self.remove_component('daq_ai')
except:
pass
self.scanner.goto([0, 0, 0])
#self.CAP_lockin.amplitude(0.004)
#self.SUSC_lockin.amplitude(0.004)
utils.scan_to_mat_file(data, real_units=True, interpolator=self.scanner.surface_interp)
#return data, scan_plot