# This file is part of xrayutilities.
#
# xrayutilities is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, see <http://www.gnu.org/licenses/>.
#
# Copyright (C) 2009-2010 Eugen Wintersberger <eugen.wintersberger@desy.de>
# Copyright (c) 2009-2023 Dominik Kriegner <dominik.kriegner@gmail.com>
# Copyright (C) 2019 Daniel Schick <schick.daniel@gmail.com>
"""
a class for observing a SPEC data file
Motivation:
SPEC files can become quite large. Therefore, subsequently reading
the entire file to extract a single scan is a quite cumbersome procedure.
This module is a proof of concept code to write a file observer starting
a reread of the file starting from a stored offset (last known scan position)
"""
import os.path
import re
import numpy
from .. import config, utilities
from ..exception import InputError
# relative imports from xrayutilities
from .helper import xu_h5open, xu_open
# define some uesfull regular expressions
SPEC_time_format = re.compile(r"\d\d:\d\d:\d\d")
SPEC_multi_blank = re.compile(r"\s+")
SPEC_multi_blank2 = re.compile(r"\s\s+")
# denotes a numeric value
SPEC_int_value = re.compile(r"[+-]?\d+")
SPEC_num_value = re.compile(
r"([+-]?\d*\.*\d*[eE]*[+-]*\d+|[+-]?[Ii][Nn][Ff]|[Nn][Aa][Nn])")
SPEC_dataline = re.compile(r"^[+-]*\d.*")
SPEC_scan = re.compile(r"^#S")
SPEC_initmoponames = re.compile(r"#O\d+")
SPEC_initmopopos = re.compile(r"#P\d+")
SPEC_datetime = re.compile(r"^#D")
SPEC_exptime = re.compile(r"^#T")
SPEC_nofcols = re.compile(r"^#N")
SPEC_colnames = re.compile(r"^#L")
SPEC_MCAFormat = re.compile(r"^#@MCA ")
SPEC_MCAChannels = re.compile(r"^#@CHANN")
SPEC_MCAChannelNames = re.compile(r"^#@DET_\d+")
SPEC_MCAline = re.compile(r"^@A")
SPEC_headerline = re.compile(r"^#")
SPEC_scanbroken = re.compile(r"#C[a-zA-Z0-9: .]*Scan aborted")
SPEC_scanresumed = re.compile(r"#C[a-zA-Z0-9: .]*Scan resumed")
SPEC_commentline = re.compile(r"#C")
SPEC_newheader = re.compile(r"^#E")
SPEC_errorbm20 = re.compile(r"^MI:")
scan_status_flags = ["OK", "NODATA", "ABORTED", "CORRUPTED"]
[docs]
class SPECScan:
"""
Represents a single SPEC scan. This class is usually not called by the
user directly but used via the SPECFile class.
"""
[docs]
def __init__(self, name, scannr, command, date, time, itime, colnames,
hoffset, doffset, fname, imopnames, imopvalues, scan_status):
"""
Constructor for the SPECScan class.
Parameters
----------
name : str
name of the scan
scannr : int
Number of the scan in the specfile
command : str
command used to write the scan
date : str
starting date of the scan
time : str
starting time of the scan
itime : int
integration time
colnames : list
list of names of the data columns
hoffset : int
file byte offset to the header of the scan
doffset : int
file byte offset to the data section of the scan
fname : str
file name of the SPEC file the scan belongs to
imopnames : list of str
motor names for the initial motor positions array
imopvalues : list
intial motor positions array
scan_status : {'OK', 'NODATA', 'CORRUPTED', 'ABORTED'}
scan status as string
"""
self.name = name # name of the scan
self.nr = scannr # number of the scan
self.command = command # command used to record the data
self.date = date # date the command has been sent
self.time = time # time the command has been sent
self.itime = itime # integration time of the scan
self.colnames = colnames # list with column names
self.hoffset = hoffset # file offset where the header data starts
self.doffset = doffset # file offset where the data section starts
self.fname = fname # full file name of the file holding the data
# flag to force resave to hdf5 file in Save2HDF5()
self.ischanged = True
self.header = []
self.fid = None
if scan_status in scan_status_flags:
self.scan_status = scan_status
else:
self.scan_status = "CORRUPTED"
if config.VERBOSITY >= config.INFO_ALL:
print("XU.io.spec.SPECScan: unknown scan status flag - "
"set to CORRUPTED")
# setup the initial motor positions dictionary - set the motor names
# dictionary holding the initial motor positions
self.init_motor_pos = {}
if len(imopnames) == len(imopvalues):
for i in range(len(imopnames)):
natmotname = utilities.makeNaturalName(imopnames[i])
self.init_motor_pos[
"INIT_MOPO_" + natmotname] = float(imopvalues[i])
else:
print("XU.io.spec.SPECScan: Warning: incorrect number of "
"initial motor positions in scan %03d" % (self.nr))
if config.VERBOSITY >= config.INFO_ALL:
print(imopnames)
print(imopvalues)
# ASSUME ORDER DID NOT CHANGE!! (which might be wrong)
# in fact this is sign for a broken spec file
# number of initial motor positions should not change without new
# file header!
for i in range(min(len(imopnames), len(imopvalues))):
natmotname = utilities.makeNaturalName(imopnames[i])
self.init_motor_pos[
"INIT_MOPO_" + natmotname] = float(imopvalues[i])
# read the rest of the positions into dummy INIT_MOPO__NONAME__%03d
for i in range(len(imopnames), len(imopvalues)):
self.init_motor_pos["INIT_MOPO___NONAME__%03d" % (i)] = \
float(imopvalues[i])
# some additional attributes for the MCA data
# False if scan contains no MCA data, True otherwise
self.has_mca = False
self.has_sardana_mca = False
self.mca_column_format = 0 # number of columns used to save MCA data
self.mca_channels = 0 # number of channels stored from the MCA
self.mca_channel_names = ['MCA'] # name of MCAs
self.mca_nof_lines = 0 # number of lines used to store MCA data
self.mca_start_channel = 0 # first channel of the MCA that is stored
self.mca_stop_channel = 0 # last channel of the MCA that is stored
# a numpy record array holding the data - this is set using by the
# ReadData method.
self.data = None
# check for duplicate values in column names
for i in range(len(self.colnames)):
name = self.colnames[i]
cnt = self.colnames.count(name)
if cnt > 1:
# have multiple entries
cnt = 1
for j in range(self.colnames.index(name) + 1,
len(self.colnames)):
if self.colnames[j] == name:
self.colnames[j] = name + "_%i" % cnt
cnt += 1
[docs]
def SetMCAParams(self, mca_column_format, mca_channels,
mca_start, mca_stop, mca_channel_names, has_sardana_mca):
"""
Set the parameters used to save the MCA data to the file. This method
calculates the number of lines used to store the MCA data from the
number of columns and the
Parameters
----------
mca_column_format : int
number of columns used to save the data
mca_channels : int
number of MCA channels stored
mca_start : int
first channel that is stored
mca_stop : int
last channel that is stored
mca_channel_names : list(str)
names of mca channels
has_sardana_mca : bool
True for Sardana MCA, False for SPEC MCA
"""
self.has_mca = True
self.has_sardana_mca = has_sardana_mca
self.mca_column_format = mca_column_format
self.mca_channels = mca_channels
self.mca_start_channel = mca_start
self.mca_stop_channel = mca_stop
if mca_channel_names != []:
self.mca_channel_names = mca_channel_names
self.mca_nb = len(self.mca_channel_names)
# calculate the number of lines per data point for the mca
self.mca_nof_lines = int(mca_channels / mca_column_format)
if mca_channels % mca_column_format != 0:
# some additional values have to be read
self.mca_nof_lines = self.mca_nof_lines + 1
if config.VERBOSITY >= config.DEBUG:
print("XU.io.SPECScan.SetMCAParams: channel names: %s"
% self.mca_channel_names)
print("XU.io.SPECScan.SetMCAParams: number of channels: %d"
% self.mca_channels)
print("XU.io.SPECScan.SetMCAParams: number of columns: %d"
% self.mca_column_format)
print("XU.io.SPECScan.SetMCAParams: number of lines to read "
"for MCA: %d" % self.mca_nof_lines)
def __str__(self):
# build a proper string to print the scan information
str_rep = "|%4i|" % (self.nr)
str_rep = str_rep + "%50s|%10s|%10s|" % (self.command, self.time,
self.date)
if self.has_mca:
str_rep = str_rep + "MCA: %5i" % (self.mca_channels)
str_rep = str_rep + "\n"
return str_rep
[docs]
def ClearData(self):
"""
Delete the data stored in a scan after it is no longer
used.
"""
self.__delattr__("data")
self.data = None
[docs]
def ReadData(self):
"""
Set the data attribute of the scan class.
"""
if self.scan_status == "NODATA":
if config.VERBOSITY >= config.INFO_LOW:
print("XU.io.SPECScan.ReadData: %s has been aborted - "
"no data available!" % self.name)
self.data = None
return None
if not self.has_mca:
if config.VERBOSITY >= config.INFO_ALL:
print("XU.io.SPECScan.ReadData: scan %d contains no MCA data"
% self.nr)
with xu_open(self.fname) as self.fid:
# read header lines
self.fid.seek(self.hoffset, 0)
self.header = []
while self.fid.tell() < self.doffset:
line = self.fid.readline().decode('ascii', 'ignore')
self.header.append(line.strip())
self.fid.seek(self.doffset, 0)
# create dictionary to hold the data
if self.has_mca and self.has_sardana_mca:
type_desc = {"names": self.colnames + self.mca_channel_names,
"formats": len(self.colnames) * [numpy.float64] +
len(self.mca_channel_names)
* [(numpy.float64, self.mca_channels)]}
elif self.has_mca and not self.has_sardana_mca:
type_desc = {"names": self.colnames + self.mca_channel_names,
"formats": len(self.colnames) * [numpy.float64] +
len(self.mca_channel_names)
* [(numpy.uint32, self.mca_channels)]}
else:
type_desc = {"names": self.colnames,
"formats": len(self.colnames) * [numpy.float32]}
if config.VERBOSITY >= config.DEBUG:
print("xu.io.SPECScan.ReadData: type descriptor: "
f"{repr(type_desc)}")
record_list = [] # from this list the record array while be built
scalars_list = []
mca_tmp_list = []
mca_counter = 0
mca_list_counter = 0
scan_aborted_flag = False
for line in self.fid:
line = line.decode('ascii', 'ignore')
line = line.strip()
if not line:
continue
# check if scan is broken
if (SPEC_scanbroken.findall(line) != [] or
scan_aborted_flag):
# need to check next line(s) to know if scan is resumed
# read until end of comment block or end of file
if not scan_aborted_flag:
scan_aborted_flag = True
self.scan_status = "ABORTED"
if config.VERBOSITY >= config.INFO_ALL:
print(f"XU.io.SPECScan.ReadData: {self.name} "
"aborted")
continue
if SPEC_scanresumed.match(line):
self.scan_status = "OK"
scan_aborted_flag = False
if config.VERBOSITY >= config.INFO_ALL:
print(f"XU.io.SPECScan.ReadData: {self.name} "
"resumed")
continue
if SPEC_commentline.match(line):
continue
if SPEC_errorbm20.match(line):
print(line)
continue
break
if SPEC_headerline.match(line) or \
SPEC_commentline.match(line):
if SPEC_scanresumed.match(line):
continue
if SPEC_commentline.match(line):
continue
break
line_list = SPEC_num_value.findall(line)
line_list = map(numpy.float64, line_list)
if self.has_mca:
if SPEC_MCAline.match(line) \
and ((mca_list_counter == 0) and (mca_counter == 0)):
mca_list_counter += 1
mca_counter += 1
mca_tmp = []
mca_tmp_list = []
if (mca_list_counter > 0):
mca_counter += 1
mca_tmp += line_list
if mca_counter > self.mca_nof_lines:
mca_counter = 1
mca_tmp_list.append(mca_tmp.copy())
mca_tmp = []
mca_list_counter += 1
if mca_list_counter > self.mca_nb:
mca_list_counter = 0
else:
scalars_list = line_list
# check if the data should be written to the records
if (scalars_list != []) \
and (mca_tmp_list != []) \
and (mca_list_counter == 0):
record_list.append(tuple(list(scalars_list)
+ mca_tmp_list))
scalars_list = []
mca_tmp_list = []
mca_counter = 0
else:
record_list.append(tuple(line_list))
continue
# convert the data to numpy arrays
ncol = len(record_list[0])
if config.VERBOSITY >= config.INFO_LOW:
print("XU.io.SPECScan.ReadData: %s: %d %d %d"
% (self.name, len(record_list), ncol,
len(type_desc["names"])))
if ncol == len(type_desc["names"]):
try:
self.data = numpy.rec.fromrecords(record_list,
dtype=type_desc)
except ValueError:
self.scan_status = 'NODATA'
print("XU.io.SPECScan.ReadData: %s exception while "
"parsing data" % self.name)
else:
self.scan_status = 'NODATA'
[docs]
def plot(self, *args, **keyargs):
"""
Plot scan data to a matplotlib figure. If newfig=True a new
figure instance will be created. If logy=True (default is False)
the y-axis will be plotted with a logarithmic scale.
Parameters
----------
args : list
arguments for the plot: first argument is the name of x-value
column the following pairs of arguments are the y-value names and
plot styles allowed are 3, 5, 7,... number of arguments
keyargs : dict, optional
newfig : bool, optional
if True a new figure instance will be created otherwise an existing
one will be used
logy : bool, optional
if True a semilogy plot will be done
"""
flag, plt = utilities.import_matplotlib_pyplot('XU.io.SPECScan')
if not flag:
return
newfig = keyargs.get('newfig', True)
logy = keyargs.get('logy', False)
try:
xname = args[0]
xdata = self.data[xname]
except ValueError:
raise InputError("name of the x-axis is invalid!")
alist = args[1:]
leglist = []
if len(alist) % 2 != 0:
raise InputError("wrong number of yname/style arguments!")
if newfig:
plt.figure()
plt.subplots_adjust(left=0.08, right=0.95)
for i in range(0, len(alist), 2):
yname = alist[i]
ystyle = alist[i + 1]
try:
ydata = self.data[yname]
except ValueError:
raise InputError(f"no column with name {yname} exists!")
continue
if logy:
plt.semilogy(xdata, ydata, ystyle)
else:
plt.plot(xdata, ydata, ystyle)
leglist.append(yname)
plt.xlabel(f"{xname}")
plt.legend(leglist)
plt.title("scan %i %s\n%s %s"
% (self.nr, self.command, self.date, self.time))
# need to adjust axis limits properly
lim = plt.axis()
plt.axis([xdata.min(), xdata.max(), lim[2], lim[3]])
[docs]
def Save2HDF5(self, h5f, group="/", title="", optattrs=None, comp=True):
"""
Save a SPEC scan to an HDF5 file. The method creates a group with the
name of the scan and stores the data there as a table object with name
"data". By default the scan group is created under the root group of
the HDF5 file. The title of the scan group is ususally the scan
command. Metadata of the scan are stored as attributes to the scan
group. Additional custom attributes to the scan group can be passed as
a dictionary via the optattrs keyword argument.
Parameters
----------
h5f : file-handle or str
a HDF5 file object or its filename
group : str, optional
name or group object of the HDF5 group where to store the data
title : str, optional
a string with the title for the data, defaults to the name of scan
if empty
optattrs : dict, optional
a dictionary with optional attributes to store for the data
comp : bool, optional
activate compression - true by default
"""
if optattrs is None:
optattrs = {}
with xu_h5open(h5f, 'a') as h5:
# check if data object has been already written
if self.data is None:
raise InputError("XU.io.SPECScan.Save2HDF5: No data has been"
"read so far - call ReadData method of the "
"scan")
# parse keyword arguments:
if isinstance(group, str):
rootgroup = h5.get(group)
else:
rootgroup = group
if title != "":
group_title = title
else:
group_title = self.name
group_title = group_title.replace(".", "_")
# create the dataset and fill it
copy_count = 0
if self.ischanged and group_title in rootgroup:
del rootgroup[group_title]
raw_grp_title = group_title
# if the group already exists the name must be changed and
# another will be made to create the group.
while group_title in rootgroup:
group_title = raw_grp_title + f"_{copy_count}"
copy_count = copy_count + 1
g = rootgroup.create_group(group_title)
kwds = {'fletcher32': True}
if comp:
kwds['compression'] = 'gzip'
g.create_dataset("data", data=self.data, **kwds)
# write attribute data for the scan
g.attrs['ScanNumber'] = numpy.uint(self.nr)
g.attrs['Command'] = self.command
g.attrs['Date'] = self.date
g.attrs['Time'] = self.time
g.attrs['scan_status'] = self.scan_status
# write the initial motor positions as attributes
for key, val in self.init_motor_pos.items():
g.attrs[key] = float(val)
# if scan contains MCA data write also MCA parameters
g.attrs['has_mca'] = self.has_mca
g.attrs['mca_start_channel'] = numpy.uint(self.mca_start_channel)
g.attrs['mca_stop_channel'] = numpy.uint(self.mca_stop_channel)
g.attrs['mca_nof_channels'] = numpy.uint(self.mca_channels)
for k in optattrs:
g.attrs[k] = optattrs[k]
h5.flush()
[docs]
class SPECFile:
"""
This class represents a single SPEC file. The class provides
methodes for updateing an already opened file which makes it particular
interesting for interactive use.
"""
[docs]
def __init__(self, filename, path=""):
"""
SPECFile init routine
Parameters
----------
filename : str
filename of the spec file
path : str, optional
path to the specfile
"""
self.full_filename = os.path.join(path, filename)
self.filename = os.path.basename(self.full_filename)
# list holding scan objects
self.scan_list = []
self.fid = None
self.last_offset = 0
# initially parse the file
self.init_motor_names_fh = [] # this list will hold the names of the
# motors saved in initial motor positions given in the file header
self.init_motor_names_sh = [] # this list will hold the names of the
# motors saved in initial motor positions given in the scan header
self.init_motor_names = [] # this list will hold the names of the
# motors saved in initial motor positions from either the file or
# scan header
self.Parse()
def __getitem__(self, index):
"""
function to return the n-th scan in the spec-file. be aware that
numbering starts at 0! If scans are missing the relation between the
given number and the "number" of the returned scan might be not
trivial.
See also
--------
scanI
attributes of the SPECFile object, where 'I' is the scan number
"""
return self.scan_list[index]
def __getattr__(self, name):
"""
return scanX objects where X stands for the scan number in the SPECFile
which for this purpose is assumed to be unique. (otherwise the first
instance of scan number X is returned)
"""
if name.startswith("scan"):
index = name[4:]
try:
scannr = int(index)
except ValueError:
raise AttributeError("scannumber needs to be convertable to "
"integer")
# try to find the scan in the list of scans
s = None
for scan in self.scan_list:
if scan.nr == scannr:
s = scan
break
if s is not None:
return s
raise AttributeError("requested scan-number not found")
raise AttributeError(f"SPECFile has no attribute '{name}'")
def __len__(self):
return self.scan_list.__len__()
def __str__(self):
ostr = ""
for i, scan in enumerate(self.scan_list):
ostr = ostr + f"{i:5d}"
ostr = ostr + str(scan)
return ostr
[docs]
def Save2HDF5(self, h5f, comp=True, optattrs=None):
"""
Save the entire file in an HDF5 file. For that purpose a group is set
up in the root group of the file with the name of the file without
extension and leading path. If the method is called after an previous
update only the scans not written to the file meanwhile are saved.
Parameters
----------
h5f : file-handle or str
a HDF5 file object or its filename
comp : bool, optional
activate compression - true by default
"""
if optattrs is None:
optattrs = {}
with xu_h5open(h5f, 'a') as h5:
groupname = os.path.splitext(os.path.splitext(self.filename)[0])[0]
try:
g = h5.create_group(groupname)
except ValueError:
g = h5.get(groupname)
g.attrs['TITLE'] = f"Data of SPEC - File {self.filename}"
for k in optattrs:
g.attrs[k] = optattrs[k]
for s in self.scan_list:
if (((s.name not in g) or s.ischanged) and
s.scan_status != "NODATA"):
s.ReadData()
if s.data is not None:
s.Save2HDF5(h5, group=g, comp=comp)
s.ClearData()
s.ischanged = False
[docs]
def Update(self):
"""
reread the file and add newly added files. The parsing starts at the
data offset of the last scan gathered during the last parsing run.
"""
# reparse the SPEC file
if config.VERBOSITY >= config.INFO_LOW:
print("XU.io.SPECFile.Update: reparsing file for new scans ...")
# mark last found scan as not saved to force reread
idx = len(self.scan_list)
if idx > 0:
lastscan = self.scan_list[idx - 1]
lastscan.ischanged = True
self.Parse()
[docs]
def Parse(self):
"""
Parses the file from the starting at last_offset and adding found scans
to the scan list.
"""
with xu_open(self.full_filename) as self.fid:
# move to the last read position in the file
self.fid.seek(self.last_offset, 0)
scan_started = False
scan_has_mca = False
scan_has_sardana_mca = False
# list with the motors from whome the initial
# position is stored.
init_motor_values = []
mca_channel_names = []
if config.VERBOSITY >= config.DEBUG:
print('XU.io.SPECFile: start parsing')
for line in self.fid:
linelength = len(line)
line = line.decode('ascii', 'ignore')
if config.VERBOSITY >= config.DEBUG:
print(f'parsing line: {line}')
# remove trailing and leading blanks from the read line
line = line.strip()
# fill the list with the initial motor names in the header
if SPEC_newheader.match(line):
self.init_motor_names_fh = []
elif SPEC_initmoponames.match(line) and not scan_started:
if config.VERBOSITY >= config.DEBUG:
print("XU.io.SPECFile.Parse: found initial motor "
"names in file header")
line = SPEC_initmoponames.sub("", line)
line = line.strip()
self.init_motor_names_fh = self.init_motor_names_fh + \
SPEC_multi_blank2.split(line)
# if the line marks the beginning of a new scan
elif SPEC_scan.match(line) and not scan_started:
if config.VERBOSITY >= config.DEBUG:
print("XU.io.SPECFile.Parse: found scan")
line_list = SPEC_multi_blank.split(line)
scannr = int(line_list[1])
scancmd = "".join(" " + x + " " for x in line_list[2:])
scan_started = True
scan_has_mca = False
scan_header_offset = self.last_offset
scan_status = "OK"
# define some necessary variables which could be missing in
# the scan header
itime = numpy.nan
time = ""
date = ""
if config.VERBOSITY >= config.INFO_ALL:
print("XU.io.SPECFile.Parse: processing scan nr. %d "
"..." % scannr)
# set the init_motor_names to the ones found in
# the file header
self.init_motor_names_sh = []
self.init_motor_names = self.init_motor_names_fh
# if the line contains the date and time information
elif SPEC_datetime.match(line) and scan_started:
if config.VERBOSITY >= config.DEBUG:
print("XU.io.SPECFile.Parse: found date and time")
# fetch the time from the line data
time = SPEC_time_format.findall(line)[0]
line = SPEC_time_format.sub("", line)
line = SPEC_datetime.sub("", line)
date = SPEC_multi_blank.sub(" ", line).strip()
# if the line contains the integration time
elif SPEC_exptime.match(line) and scan_started:
if config.VERBOSITY >= config.DEBUG:
print("XU.io.SPECFile.Parse: found exposure time")
itime = float(SPEC_num_value.findall(line)[0])
# read the initial motor names in the scan header if present
elif SPEC_initmoponames.match(line) and scan_started:
if config.VERBOSITY >= config.DEBUG:
print("XU.io.SPECFile.Parse: found initial motor "
"names in scan header")
line = SPEC_initmoponames.sub("", line)
line = line.strip()
self.init_motor_names_sh = self.init_motor_names_sh + \
SPEC_multi_blank2.split(line)
self.init_motor_names = self.init_motor_names_sh
# read the initial motor positions
elif SPEC_initmopopos.match(line) and scan_started:
if config.VERBOSITY >= config.DEBUG:
print("XU.io.SPECFile.Parse: found initial motor "
"positions")
line = SPEC_initmopopos.sub("", line)
line = line.strip()
line_list = SPEC_multi_blank.split(line)
# sometimes initial motor position are simply empty and
# this should not lead to an error
try:
for value in line_list:
init_motor_values.append(float(value))
except ValueError:
pass
# if the line contains the number of colunmns
elif SPEC_nofcols.match(line) and scan_started:
if config.VERBOSITY >= config.DEBUG:
print("XU.io.SPECFile.Parse: found number of columns")
line = SPEC_nofcols.sub("", line)
line = line.strip()
nofcols = int(line)
# if the line contains the column names
elif SPEC_colnames.match(line) and scan_started:
if config.VERBOSITY >= config.DEBUG:
print("XU.io.SPECFile.Parse: found column names")
line = SPEC_colnames.sub("", line)
line = line.strip()
col_names = SPEC_multi_blank.split(line)
# this is a fix in the case that blanks are allowed in
# motor and detector names (only a single balanks is
# supported meanwhile)
if len(col_names) > nofcols:
col_names = SPEC_multi_blank2.split(line)
elif SPEC_MCAFormat.match(line) and scan_started:
mca_col_number = int(SPEC_num_value.findall(
line)[0])
scan_has_mca = True
elif SPEC_MCAChannelNames.match(line) and scan_started:
mca_channel_names.append(line.split(" ")[1])
scan_has_sardana_mca = True
elif SPEC_MCAChannels.match(line) and scan_started:
line_list = SPEC_num_value.findall(line)
mca_channels = int(line_list[0])
mca_start = int(line_list[1])
mca_stop = int(line_list[2])
elif (SPEC_scanbroken.findall(line) != [] and
scan_started):
# this is the case when a scan is broken and no data has
# been written, but nevertheless a comment is in the file
# that tells us that the scan was aborted
scan_data_offset = self.last_offset
s = SPECScan("scan_%i" % (scannr), scannr, scancmd,
date, time, itime, col_names,
scan_header_offset, scan_data_offset,
self.full_filename, self.init_motor_names,
init_motor_values, "NODATA")
self.scan_list.append(s)
# reset control flags
scan_started = False
scan_has_mca = False
scan_has_sardana_mca = False
# reset initial motor positions flag
init_motor_values = []
mca_channel_names = []
elif (SPEC_dataline.match(line) or SPEC_MCAline.match(line)) \
and scan_started:
# this is now the real end of the header block. at this
# point we know that there is enough information about the
# scan
# save the data offset
scan_data_offset = self.last_offset
# create an SPECFile scan object and add it to the scan
# list the name of the group consists of the prefix scan
# and the number of the scan in the file - this shoule make
# it easier to find scans in the HDF5 file.
s = SPECScan("scan_%i" % (scannr), scannr, scancmd, date,
time, itime, col_names, scan_header_offset,
scan_data_offset, self.full_filename,
self.init_motor_names, init_motor_values,
scan_status)
if scan_has_mca:
s.SetMCAParams(mca_col_number, mca_channels, mca_start,
mca_stop, mca_channel_names,
scan_has_sardana_mca)
self.scan_list.append(s)
# reset control flags
scan_started = False
scan_has_mca = False
scan_has_sardana_mca = False
# reset initial motor positions flag
init_motor_values = []
mca_channel_names = []
elif SPEC_scan.match(line) and scan_started:
# this should only be the case when there are two
# consecutive file headers in the data file without any
# data or abort notice of the first scan; first store
# current scan as aborted then start new scan parsing
s = SPECScan("scan_%i" % (scannr), scannr, scancmd,
date, time, itime, col_names,
scan_header_offset, None,
self.full_filename, self.init_motor_names,
init_motor_values, "NODATA")
self.scan_list.append(s)
# reset control flags
scan_started = False
scan_has_mca = False
scan_has_sardana_mca = False
# reset initial motor positions flag
init_motor_values = []
mca_channel_names = []
# start parsing of new scan
if config.VERBOSITY >= config.DEBUG:
print("XU.io.SPECFile.Parse: found scan "
"(after aborted scan)")
line_list = SPEC_multi_blank.split(line)
scannr = int(line_list[1])
scancmd = "".join(" " + x + " " for x in line_list[2:])
scan_started = True
scan_has_mca = False
scan_has_sardana_mca = False
scan_header_offset = self.last_offset
scan_status = "OK"
self.init_motor_names_sh = []
self.init_motor_names = self.init_motor_names_fh
# store the position of the file pointer
self.last_offset += linelength
# if reading of the file is finished store the data offset of the
# last scan as the last offset for the next parsing run of the file
self.last_offset = self.scan_list[-1].doffset
[docs]
class SPECCmdLine:
[docs]
def __init__(self, n, prompt, cmdl, out=""):
self.linenumber = n
self.prompt = prompt
self.command = cmdl
self.out = out
def __str__(self):
ostr = "%i.%s> %s" % (self.linenumber, self.prompt, self.command)
return ostr
[docs]
class SPECLog:
"""
class to parse a SPEC log file to find the command history
"""
[docs]
def __init__(self, filename, prompt, path=""):
"""
init routine for a class to read a SPEC log file
Parameters
----------
filename : str
SPEC log file name
prompt : str
SPEC command prompt (e.g. 'PSIC' or 'SPEC')
path : str, optional
directory where the SPEC log can be found
"""
self.filename = filename
self.full_filename = os.path.join(path, self.filename)
self.prompt = prompt
self.prompt_re = re.compile(r"%s>" % self.prompt)
self.cmdl_list = []
self.line_counter = 0
self.Parse()
[docs]
def Parse(self):
with xu_open(self.full_filename, 'r') as fid:
for line in fid:
line = line.decode('ascii', 'ignore')
self.line_counter += 1
line = line.strip()
if self.prompt_re.findall(line):
[line, cmd] = self.prompt_re.split(line)
self.cmdl_list.append(SPECCmdLine(int(float(line)),
self.prompt, cmd))
def __getitem__(self, index):
"""
function to return the n-th cmd in the spec-log.
"""
return self.cmdl_list[index]
def __str__(self):
ostr = "%s with %d lines\n" % (self.filename, self.line_counter)
for cmd in self.cmdl_list:
ostr = ostr + cmd.__str__() + "\n"
return ostr
[docs]
def geth5_scan(h5f, scans, *args, **kwargs):
"""
function to obtain the angular cooridinates as well as intensity values
saved in an HDF5 file, which was created from a spec file by the Save2HDF5
method. Especially useful for reciprocal space map measurements.
further more it is possible to obtain even more positions from
the data file if more than two string arguments with its names are given
Parameters
----------
h5f : file-handle or str
file object of a HDF5 file opened using h5py or its filename
scans : int, tuple or list
number of the scans of the reciprocal space map
args : str, optional
names of the motors. to read reciprocal space maps measured in coplanar
diffraction give:
- omname: name of the omega motor (or its equivalent)
- ttname: name of the two theta motor (or its equivalent)
kwargs : dict, optional
samplename: str, optional
string with the hdf5-group containing the scan data if ommited the
first child node of h5f.root will be used
rettype: {'list', 'numpy'}, optional
how to return motor positions. by default a list of arrays is returned.
when rettype == 'numpy' a record array will be returned.
Returns
-------
[ang1, ang2, ...] : list
angular positions of the center channel of the position sensitive
detector (numpy.ndarray 1D), this list is omitted if no `args` are
given
MAP : ndarray
the data values as stored in the data file (includes the intensities
e.g. MAP['MCA']).
Examples
--------
>>> [om, tt], MAP = geth5_scan("h5file", 36,
... 'omega', 'gamma') # doctest: +SKIP
"""
with xu_h5open(h5f) as h5:
gname = kwargs.get("samplename", list(h5.keys())[0])
h5g = h5.get(gname)
if numpy.iterable(scans):
scanlist = scans
else:
scanlist = list([scans])
angles = dict.fromkeys(args)
for key in angles:
if not isinstance(key, str):
raise InputError("*arg values need to be strings with "
"motornames")
angles[key] = numpy.zeros(0)
buf = numpy.zeros(0)
MAP = numpy.zeros(0)
for nr in scanlist:
h5scan = h5g.get("scan_%d" % nr)
sdata = numpy.asarray(h5scan.get('data'))
if MAP.dtype == numpy.float64:
MAP.dtype = sdata.dtype
# append scan data to MAP, where all data are stored
MAP = numpy.append(MAP, sdata)
# check type of scan
notscanmotors = []
for i in range(len(args)):
motname = args[i]
try:
buf = sdata[motname]
scanshape = buf.shape
angles[motname] = numpy.concatenate((angles[motname], buf))
except ValueError:
notscanmotors.append(i)
if len(notscanmotors) == len(args):
scanshape = len(sdata)
for i in notscanmotors:
motname = args[i]
natmotname = utilities.makeNaturalName(motname)
buf = numpy.ones(scanshape) * \
h5scan.attrs[f"INIT_MOPO_{natmotname}"]
angles[motname] = numpy.concatenate((angles[motname], buf))
# create return values in correct order
def create_retval():
retval = []
for motname in args:
retval.append(angles[motname])
return retval
rettype = kwargs.get('rettype', 'list')
if rettype == 'numpy':
retval = numpy.core.records.fromarrays([angles[m] for m in args],
names=args)
else:
retval = create_retval()
if not args:
return MAP
return retval, MAP
[docs]
def getspec_scan(specf, scans, *args, **kwargs):
"""
function to obtain the angular cooridinates as well as intensity values
saved in a SPECFile. Especially useful to combine the data from multiple
scans.
further more it is possible to obtain even more positions from
the data file if more than two string arguments with its names are given
Parameters
----------
specf : SPECFile
file object
scans : int, tuple or list
number of the scans
args : str
names of the motors and counters
rettype : {'list', 'numpy'}, optional
how to return motor positions. by default a list of arrays is returned.
when rettype == 'numpy' a record array will be returned.
Returns
-------
[ang1, ang2, ...] : list
coordinates and counters from the SPEC file
Examples
--------
>>> [om, tt, cnt2] = getspec_scan(s, 36, 'omega', 'gamma',
... 'Counter2') # doctest: +SKIP
"""
if not args:
return InputError("no motor or counter names given")
if numpy.iterable(scans):
scanlist = scans
else:
scanlist = list([scans])
angles = dict.fromkeys(args)
for key in angles:
if not isinstance(key, str):
raise InputError("*arg values need to be strings with "
"motornames")
angles[key] = numpy.zeros(0)
buf = numpy.zeros(0)
for nr in scanlist:
sscan = specf.__getattr__("scan%d" % nr)
sscan.ReadData()
sdata = sscan.data
# check type of scan
notscanmotors = []
for i in range(len(args)):
motname = args[i]
try:
buf = sdata[motname]
scanshape = buf.shape
angles[motname] = numpy.concatenate((angles[motname], buf))
except ValueError:
notscanmotors.append(i)
if len(notscanmotors) == len(args):
scanshape = len(sdata)
for i in notscanmotors:
motname = args[i]
buf = (numpy.ones(scanshape) *
sscan.init_motor_pos["INIT_MOPO_%s"
% utilities.makeNaturalName(motname)])
angles[motname] = numpy.concatenate((angles[motname], buf))
# create return values in correct order
def create_retval():
retval = []
for motname in args:
retval.append(angles[motname])
return retval
rettype = kwargs.get('rettype', 'list')
if rettype == 'numpy':
retval = numpy.core.records.fromarrays([angles[m] for m in args],
names=args)
else:
retval = create_retval()
return retval