Source code for xrayutilities.normalize

# This file is part of xrayutilities.
#
# xrayutilities is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, see <http://www.gnu.org/licenses/>.
#
# Copyright (c) 2010-2021, 2023 Dominik Kriegner <dominik.kriegner@gmail.com>

"""
module to provide functions that perform block averaging
of intensity arrays to reduce the amount of data (mainly
for PSD and CCD measurements

and

provide functions for normalizing intensities for

* count time
* absorber (user-defined function)
* monitor
* flatfield correction
"""

import numpy

from . import config, cxrayutilities, math, utilities
from .exception import InputError


[docs] def blockAverage1D(data, Nav): """ perform block average for 1D array/list of Scalar values all data are used. at the end of the array a smaller cell may be used by the averaging algorithm Parameters ---------- data : array-like data which should be contracted (length N) Nav : int number of values which should be averaged Returns ------- ndarray block averaged numpy array of data type numpy.double (length ceil(N/Nav)) """ if not isinstance(data, (numpy.ndarray, list)): raise TypeError("first argument data must be of type list or " "numpy.ndarray") data = numpy.array(data, dtype=numpy.double) block_av = cxrayutilities.block_average1d(data, Nav) return block_av
[docs] def blockAverage2D(data2d, Nav1, Nav2, **kwargs): """ perform a block average for 2D array of Scalar values all data are used therefore the margin cells may differ in size Parameters ---------- data2d : ndarray array of 2D data shape (N, M) Nav1, Nav2 : int a field of (Nav1 x Nav2) values is contracted kwargs : dict, optional optional keyword argument roi : tuple or list, optional region of interest for the 2D array. e.g. [20, 980, 40, 960], reduces M, and M! Returns ------- ndarray block averaged numpy array with type numpy.double with shape (ceil(N/Nav1), ceil(M/Nav2)) """ if not isinstance(data2d, (numpy.ndarray)): raise TypeError("first argument data2d must be of type numpy.ndarray") roi = kwargs.get('roi', [0, data2d.shape[0], 0, data2d.shape[1]]) data = numpy.array(data2d[roi[0]:roi[1], roi[2]:roi[3]], dtype=numpy.double) if config.VERBOSITY >= config.DEBUG: N, M = (roi[1] - roi[0], roi[3] - roi[2]) print(f"xu.normalize.blockAverage2D: roi: {str(roi)}") print("xu.normalize.blockAverage2D: Nav1, 2: %d,%d" % (Nav1, Nav2)) print("xu.normalize.blockAverage2D: number of points: (%d,%d)" % (numpy.ceil(N / float(Nav1)), numpy.ceil(M / float(Nav2)))) block_av = cxrayutilities.block_average2d(data, Nav1, Nav2, config.NTHREADS) return block_av
[docs] def blockAveragePSD(psddata, Nav, **kwargs): """ perform a block average for serveral PSD spectra all data are used therefore the last cell used for averaging may differ in size Parameters ---------- psddata : ndarray array of 2D data shape (Nspectra, Nchannels) Nav : int number of channels which should be averaged kwargs : dict, optional optional keyword argument roi : tuple or list region of interest for the 2D array. e.g. [20, 980] Nchannels = 980-20 Returns ------- ndarray block averaged psd spectra as numpy array with type numpy.double of shape (Nspectra , ceil(Nchannels/Nav)) """ if not isinstance(psddata, (numpy.ndarray)): raise TypeError("first argument psddata must be of type numpy.ndarray") roi = kwargs.get('roi', [0, psddata.shape[1]]) data = numpy.array(psddata[:, roi[0]:roi[1]], dtype=numpy.double) block_av = cxrayutilities.block_average_PSD(data, Nav, config.NTHREADS) return block_av
[docs] def blockAverageCCD(data3d, Nav1, Nav2, **kwargs): """ perform a block average for 2D frames inside a 3D array. all data are used therefore the margin cells may differ in size Parameters ---------- data3d : ndarray array of 3D data shape (Nframes, N, M) Nav1, Nav2 : int a field of (Nav1 x Nav2) values is contracted kwargs : dict, optional optional keyword argument roi : tuple or list, optional region of interest for the 2D array. e.g. [20, 980, 40, 960], reduces M, and M! Returns ------- ndarray block averaged numpy array with type numpy.double with shape (Nframes, ceil(N/Nav1), ceil(M/Nav2)) """ if not isinstance(data3d, (numpy.ndarray)): raise TypeError("first argument data3d must be of type numpy.ndarray") roi = kwargs.get('roi', [0, data3d.shape[1], 0, data3d.shape[2]]) data = numpy.array(data3d[:, roi[0]:roi[1], roi[2]:roi[3]], dtype=numpy.double) if config.VERBOSITY >= config.DEBUG: N, M = (roi[1] - roi[0], roi[3] - roi[2]) print(f"xu.normalize.blockAverageCCD: roi: {str(roi)}") print("xu.normalize.blockAverageCCD: Nav1, 2: %d,%d" % (Nav1, Nav2)) print("xu.normalize.blockAverageCCD: number of points: (%d,%d)" % (numpy.ceil(N / float(Nav1)), numpy.ceil(M / float(Nav2)))) block_av = cxrayutilities.block_average_CCD(data, Nav1, Nav2, config.NTHREADS) return block_av
# ##################################### # # Intensity correction class ## # #####################################
[docs] class IntensityNormalizer: """ generic class for correction of intensity (point detector, or MCA, single CCD frames) for count time and absorber factors the class must be supplied with a absorber correction function and works with data structures provided by xrayutilities.io classes or the corresponding objects from hdf5 files """
[docs] def __init__(self, det='', **keyargs): """ initialization of the corrector class Parameters ---------- det : str detector field name of the data structure mon : str, optional monitor field name time: float or str, optional count time field name or count time as float av_mon : float, optional average monitor value (default: data[mon].mean()) smoothmon : int number of monitor values used to get a smooth monitor signal absfun : callable, optional absorber correction function to be used as in ``absorber_corrected_intensity = data[det]*absfun(data)`` flatfield : ndarray flatfield of the detector; shape must be the same as data[det], and is only applied for MCA detectors darkfield : ndarray darkfield of the detector; shape must be the same as data[det], and is only applied for MCA detectors Examples -------- >>> detcorr = IntensityNormalizer("MCA", time="Seconds", ... absfun=lambda d: d["PSDCORR"]/d["PSD"].astype(float)) """ valid_kwargs = {'mon': 'monitor field name', 'time': 'count time field/value', 'smoothmon': 'number of monitor values to average', 'av_mon': 'average monitor value', 'absfun': 'absorber correction function', 'flatfield': 'detector flatfield', 'darkfield': 'detector darkfield'} utilities.check_kwargs(keyargs, valid_kwargs, self.__class__.__name__) # check input arguments self._setdet(det) self._setmon(keyargs.get('mon', None)) self._settime(keyargs.get('time', None)) self._setavmon(keyargs.get('av_mon', None)) self._setabsfun(keyargs.get('absfun', None)) self._setflatfield(keyargs.get('flatfield', None)) self._setdarkfield(keyargs.get('darkfield', None)) self.smoothmon = keyargs.get('smoothmon', 1)
def _getdet(self): """ det property handler returns the detector field name """ return self._det def _setdet(self, det): """ det property handler sets the detector field name """ if isinstance(det, str): self._det = det else: self._det = None raise TypeError("argument det must be of type str") def _gettime(self): """ time property handler returns the count time or the field name of the count time or None if time is not set """ return self._time def _settime(self, time): """ time property handler sets the count time field or value """ if isinstance(time, str): self._time = time elif isinstance(time, (float, int)): self._time = float(time) elif isinstance(time, type(None)): self._time = None else: raise TypeError("argument time must be of type str, float or None") def _getmon(self): """ mon property handler returns the monitor field name or None if not set """ return self._mon def _setmon(self, mon): """ mon property handler sets the monitor field name """ if isinstance(mon, str): self._mon = mon elif isinstance(mon, type(None)): self._mon = None else: raise TypeError("argument mon must be of type str") def _getavmon(self): """ av_mon property handler returns the value of the average monitor or None if average is calculated from the monitor field """ return self._avmon def _setavmon(self, avmon): """ avmon property handler sets the average monitor field name """ if isinstance(avmon, (float, int)): self._avmon = float(avmon) elif isinstance(avmon, type(None)): self._avmon = None else: raise TypeError("argument avmon must be of type float or None") def _getabsfun(self): """ absfun property handler returns the costum correction function or None """ return self._absfun def _setabsfun(self, absfun): """ absfun property handler sets the costum correction function """ if hasattr(absfun, '__call__'): self._absfun = absfun if self._absfun.__code__.co_argcount != 1: raise TypeError("argument absfun must be a function with one " "argument (data object)") elif isinstance(absfun, type(None)): self._absfun = None else: raise TypeError("argument absfun must be of type function or None") def _getflatfield(self): """ flatfield property handler returns the current set flatfield of the detector or None if not set """ return self._flatfield def _setflatfield(self, flatf): """ flatfield property handler sets the flatfield of the detector """ if isinstance(flatf, (list, tuple, numpy.ndarray)): self._flatfield = numpy.array(flatf, dtype=float) self._flatfieldav = numpy.mean(self._flatfield[ self._flatfield.nonzero()]) self._flatfield[self.flatfield < 1.e-5] = 1.0 elif isinstance(flatf, type(None)): self._flatfield = None else: raise TypeError("argument flatfield must be of type list, tuple, " "numpy.ndarray or None") def _getdarkfield(self): """ flatfield property handler returns the current set darkfield of the detector or None if not set """ return self._darkfield def _setdarkfield(self, darkf): """ flatfield property handler sets the darkfield of the detector """ if isinstance(darkf, (list, tuple, numpy.ndarray)): self._darkfield = numpy.array(darkf, dtype=float) self._darkfieldav = numpy.mean(self._darkfield) elif isinstance(darkf, type(None)): self._darkfield = None else: raise TypeError("argument flatfield must be of type list, tuple, " "numpy.ndarray or None") det = property(_getdet, _setdet) time = property(_gettime, _settime) mon = property(_getmon, _setmon) avmon = property(_getavmon, _setavmon) absfun = property(_getabsfun, _setabsfun) flatfield = property(_getflatfield, _setflatfield) darkfield = property(_getdarkfield, _setdarkfield)
[docs] def __call__(self, data, ccd=None): """ apply the correction method which was initialized to the measured data Parameters ---------- data : numpy.recarray data object from xrayutilities.io classes ccd : ndarray, optional optionally CCD data can be given as separate ndarray of shape (len(data), n1, n2), where n1, n2 is the shape of the CCD image. Returns ------- corrint : ndarray corrected intensity as numpy.ndarray of the same shape as data[det] (or ccd.shape) """ if numpy.any(ccd): rawdata = ccd else: rawdata = data[self._det] corrint = numpy.zeros(rawdata.shape, dtype=float) # set needed variables # monitor intensity if self._mon: if self.smoothmon == 1: mon = data[self._mon] else: mon = math.smooth(data[self._mon], self.smoothmon) else: mon = 1. # count time if isinstance(self._time, str): time = data[self._time] elif isinstance(self._time, float): time = self._time else: time = 1. # average monitor counts if self._avmon: avmon = self._avmon else: avmon = numpy.mean(mon) # absorber correction function if self._absfun: abscorr = self._absfun(data) else: abscorr = 1. c = abscorr * avmon / (mon * time) # correct the correction factor if it was evaluated to an incorrect # value if isinstance(c, numpy.ndarray): c[numpy.isnan(c)] = 1.0 c[numpy.isinf(c)] = 1.0 c[c == 0] = 1.0 else: if numpy.isnan(c) or numpy.isinf(c) or c == 0: c = 1.0 if len(rawdata.shape) == 1: corrint = rawdata * c elif len(rawdata.shape) == 2 and isinstance(c, numpy.ndarray): # 1D detector c.shape[0] should be rawdata.shape[0] if self._darkfield is not None: if self._darkfield.shape[0] != rawdata.shape[1]: raise InputError("data[det] second dimension must have " "the same length as darkfield") if isinstance(time, numpy.ndarray): # darkfield correction corrint = (rawdata - self._darkfield[numpy.newaxis, :] * time[:, numpy.newaxis]) elif isinstance(time, float): # darkfield correction corrint = (rawdata - self._darkfield[numpy.newaxis, :] * time) else: print("XU.normalize.IntensityNormalizer: check " "initialization and your input") return None corrint[corrint < 0.] = 0. else: corrint = rawdata corrint = corrint * c[:, numpy.newaxis] if self._flatfield is not None: if self._flatfield.shape[0] != rawdata.shape[1]: raise InputError("data[det] second dimension must have " "the same length as flatfield") # flatfield correction corrint = (corrint / self._flatfield[numpy.newaxis, :] * self._flatfieldav) elif len(rawdata.shape) == 2 and isinstance(c, float): # single 2D detector frame corrint = rawdata * c elif len(rawdata.shape) == 3: # darkfield and flatfield correction is still missing! corrint = rawdata * c[:, numpy.newaxis, numpy.newaxis] else: raise InputError("data[det] must be an array of dimension one " "or two or three") return corrint