# This file is part of xrayutilities.
#
# xrayutilies is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License and the additonal notes below for
# more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, see <http://www.gnu.org/licenses/>.
#
# Copyright (C) 2015-2017 Marcus H. Mendenhall <marcus.mendenhall@nist.gov>
# Copyright (c) 2017-2024 Dominik Kriegner <dominik.kriegner@gmail.com>
# FP_profile was derived from http://dx.doi.org/10.6028/jres.120.014.c
# Original copyright notice:
# @author Marcus H. Mendenhall (marcus.mendenhall@nist.gov)
# @date March, 2015
# The "Fundamental Parameters Python Code" ("software") is provided by the
# National Institute of Standards and Technology (NIST), an agency of the
# United States Department of Commerce, as a public service. This software is
# to be used for non-commercial research purposes only and is expressly
# provided "AS IS." Use of this software is subject to your acceptance of these
# terms and conditions.
#
# NIST MAKES NO WARRANTY OF ANY KIND, EXPRESS, IMPLIED, IN FACT OR ARISING BY
# OPERATION OF LAW, INCLUDING, WITHOUT LIMITATION, THE IMPLIED WARRANTY OF
# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE, NON-INFRINGEMENT AND DATA
# ACCURACY. NIST NEITHER REPRESENTS NOR WARRANTS THAT THE OPERATION OF THE
# SOFTWARE WILL BE UNINTERRUPTED OR ERROR-FREE, OR THAT ANY DEFECTS WILL BE
# CORRECTED. NIST DOES NOT WARRANT OR MAKE ANY REPRESENTATIONS REGARDING THE
# USE OF THE SOFTWARE OR THE RESULTS THEREOF, INCLUDING BUT NOT LIMITED TO THE
# CORRECTNESS, ACCURACY, RELIABILITY, OR USEFULNESS OF THE SOFTWARE.
#
# NIST SHALL NOT BE LIABLE AND YOU HEREBY RELEASE NIST FROM LIABILITY FOR ANY
# INDIRECT, CONSEQUENTIAL, SPECIAL, OR INCIDENTAL DAMAGES (INCLUDING DAMAGES
# FOR LOSS OF BUSINESS PROFITS, BUSINESS INTERRUPTION, LOSS OF BUSINESS
# INFORMATION, AND THE LIKE), WHETHER ARISING IN TORT, CONTRACT, OR
# OTHERWISE, ARISING FROM OR RELATING TO THE SOFTWARE (OR THE USE OF OR
# INABILITY TO USE THIS SOFTWARE), EVEN IF NIST HAS BEEN ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGES.
#
# Software developed by NIST employees is not subject to copyright protection
# within the United States. By using this software, creating derivative works
# or by incorporating this software into another product, you agree that you
# will use the software only for non-commercial research purposes and will
# indemnify and hold harmless the United States Government for any and all
# damages or liabilities that arise out of any use by you.
#
# changelog:
#
# 15 August, 2018 -- MHM
# fixed apparent error in flip of eps0 across twotheta=90 around line 681
#
# 3 April, 2024
# comment debugging code which looks for negative intensities. see issue 179
"""
This module contains the core definitions for the XRD Fundamental Parameneters
Model (FPA) computation in Python. The main computational class is FP_profile,
which stores cached information to allow it to efficiently recompute profiles
when parameters have been modified. For the user an Powder class is available
which can calculate a complete powder pattern of a crystalline material.
The diffractometer line profile functions are calculated by methods from Cheary
& Coelho 1998 and Mullen & Cline paper and 'R' package. Accumulate all
convolutions in Fourier space, for efficiency, except for axial divergence,
which needs to be weighted in real space for I3 integral.
More details about the applied algorithms can be found in the paper by
M. H. Mendelhall et al., `Journal of Research of NIST 120, 223 (2015)
<http://dx.doi.org/10.6028/jres.120.014>`_ to which you should also refer for a
careful definition of all the parameters
"""
# Known bugs/problems:
# in the axial convolver the parameters slit_length_source can not be equal to
# slit_length_target!
# in this file SI units (m) are used for wavelengths, while by default angstrom
# are used in the remaining of the package
import atexit
import copy
import math
import multiprocessing
import numbers
import queue
import sys
import threading
import time
import warnings
from math import cos, pi, sin, sqrt, tan
from multiprocessing.managers import BaseManager
import numpy
from numpy import abs as nabs
from numpy import arcsin as nasin
from numpy import asarray
from numpy import cos as ncos
from numpy import sin as nsin
from numpy import sqrt as nsqrt
from scipy.special import sici # for the sine and cosine integral
# package internal imports
from .. import config, materials, utilities
from ..experiment import PowderExperiment
from ..math import VecAngle
from .smaterials import Powder
# figure out which FFT package we have, and import it
try:
from pyfftw.interfaces import cache, numpy_fft
# recorded variant of real fft that we will use
best_rfft = numpy_fft.rfft
# recorded variant of inverse real fft that we will use
best_irfft = numpy_fft.irfft
cache.enable()
cache.set_keepalive_time(1.0)
except ImportError:
best_rfft = numpy.fft.rfft
best_irfft = numpy.fft.irfft
# create a table of nice factorizations for the FFT package
#
# this is built once, and shared by all instances
# fftw can handle a variety of transform factorizations
# numpy fft is not too efficient for things other than a power of two,
# although my own measurements says it really does fine. For now, leave
# all factors available
ft_factors = [
2*2**i*3**j*5**k for i in range(20) for j in range(10) for k in range(8)
if 2*2**i*3**j*5**k <= 1000000
]
ft_factors.sort()
ft_factors = numpy.array(ft_factors, int)
# used for debugging moments from FP_profile.axial_helper().
moment_list = []
# if this is *True*, compute and save moment errors
collect_moment_errors = False
[docs]
class profile_data:
"""
a skeleton class which makes a combined dict and namespace interface for
easy pickling and data passing
"""
[docs]
def __init__(self, **kwargs):
"""
initialize the class
Parameters
----------
kwargs : dict
keyword=value list to pre-populate the class
"""
mydict = {}
mydict.update(kwargs)
for k, v in mydict.items():
setattr(self, k, v)
# a dictionary which shadows our attributes.
self.dictionary = mydict
[docs]
def add_symbol(self, **kwargs):
"""
add new symbols to both the attributes and dictionary for the class
Parameters
----------
kwargs : dict
keyword=value pairs
"""
self.dictionary.update(kwargs)
for k, v in kwargs.items():
setattr(self, k, v)
[docs]
class FP_profile:
"""
the main fundamental parameters class, which handles a single reflection.
This class is designed to be highly extensible by inheriting new
convolvers. When it is initialized, it scans its namespace for specially
formatted names, which can come from mixin classes. If it finds a function
name of the form conv_xxx, it will call this funtion to create a
convolver. If it finds a name of the form info_xxx it will associate the
dictionary with that convolver, which can be used in UI generation, for
example. The class, as it stands, does nothing significant with it. If it
finds str_xxx, it will use that function to format a printout of the
current state of the convolver conv_xxx, to allow improved report
generation for convolvers.
When it is asked to generate a profile, it calls all known convolvers.
Each convolver returns the Fourier transform of its convolvution. The
transforms are multiplied together, inverse transformed, and after fixing
the periodicity issue, subsampled, smoothed and returned.
If a convolver returns *None*, it is not multipled into the product.
Parameters
----------
max_history_length : int
the number of histories to cache (default=5); can be overridden if
memory is an issue.
length_scale_m : float
length_scale_m sets scaling for nice printing of parameters. if the
units are in mm everywhere, set it to 0.001, e.g. convolvers which
implement their own str_xxx method may use this to format their
results, especially if 'natural' units are not meters. Typical is
wavelengths and lattices in nm or angstroms, for example.
"""
max_history_length = 5 # max number of histories for each convolver
length_scale_m = 1.0
# class attribute to tell if convolvers in this class
# contain anisotropic convolvers
isotropic = True
[docs]
def __init__(self, anglemode,
gaussian_smoother_bins_sigma=1.0,
oversampling=10):
"""
initialize the instance
Parameters
----------
anglemode : {'d', 'twotheta'}
if setup will be in terms of a d-spacing, otherwise 'twotheta' if
setup will be at a fixed 2theta value.
gaussian_smoother_bins_sigma : float
the number of bins for post-smoothing of data. 1.0 is good. *None*
means no final smoothing step.
oversampling : int
the number of bins internally which will get computed for each bin
the the final result.
"""
if anglemode not in ("d", "twotheta"):
raise Exception(
f"invalid angle mode {anglemode}, must be 'd' or 'twotheta'")
# set to either 'd' for d-spacing based position, or 'twotheta' for
# angle-based position
self.anglemode = anglemode
# sigma, in units of bins, for the final smoother.
self.gaussian_smoother_bins_sigma = gaussian_smoother_bins_sigma
# the number of internal bins computed for each bin in the final
# output. 5-10 is usually plenty.
self.oversampling = oversampling
# List of our convolvers, found by introspection of names beginning
# with 'conv_'
self.convolvers = convolvers = [
x for x in dir(self) if x.startswith("conv_")]
# A dictionary which will store all the parameters local to each
# convolution
self.param_dicts = {c: {} for c in convolvers}
# add global parameters, associated with no specific convolver
# A dictionary of bound functions to call to compute convolutions
self.convolver_funcs = {x: getattr(self, x) for x in convolvers}
# If *True*, print cache hit information
self.debug_cache = False
# keep a record of things we don't keep when pickled
self._clean_on_pickle = set()
[docs]
@classmethod
def isequivalent(cls, hkl1, hkl2, crystalsystem):
"""
function to determine if according to the convolvers included in this
class two sets of Miller indices are equivalent. This function is only
called when the class attribute 'isotropic' is False.
Parameters
----------
hkl1, hkl2 : list or tuple
Miller indices to be checked for equivalence
crystalsystem : str
symmetry class of the material which is considered
Returns
-------
bool
"""
return True
[docs]
def get_function_name(self):
"""
return the name of the function that called this. Useful for convolvers
to identify themselves
Returns
-------
str
name of calling function
"""
return sys._getframe(1).f_code.co_name
[docs]
def add_buffer(self, b):
"""
add a numpy array to the list of objects that can be thrown away on
pickling.
Parameters
----------
b : array-like
the buffer to add to the list
Returns
-------
b : array-like
return the same buffer, to make nesting easy.
"""
self._clean_on_pickle.add(id(b))
return b
[docs]
def set_window(self, twotheta_window_center_deg,
twotheta_window_fullwidth_deg, twotheta_output_points):
"""
move the compute window to a new location and compute grids, without
resetting all parameters. Clears convolution history and sets up many
arrays.
Parameters
----------
twotheta_window_center_deg : float
the center position of the middle bin of the window, in degrees
twotheta_window_fullwidth_deg : float
the full width of the window, in degrees
twotheta_output_points : int
the number of bins in the final output
"""
# the saved width of the window, in degrees
self.twotheta_window_fullwidth_deg = twotheta_window_fullwidth_deg
# the saved center of the window, in degrees
self.twotheta_window_center_deg = twotheta_window_center_deg
# the width of the window, in radians
window_fullwidth = math.radians(twotheta_window_fullwidth_deg)
self.window_fullwidth = window_fullwidth
# the center of the window, in radians
twotheta = math.radians(twotheta_window_center_deg)
self.twotheta_window_center = twotheta
# the number of points to compute in the final results
self.twotheta_output_points = twotheta_output_points
# the number of points in Fourier space to compute
nn = self.oversampling * twotheta_output_points // 2 + 1
self.n_omega_points = nn
# build all the arrays
b = self.add_buffer # shortcut
# a real-format scratch buffer
self._rb1 = b(numpy.zeros(nn, float))
# a real-format scratch buffer
self._rb2 = b(numpy.zeros(nn, float))
# a real-format scratch buffer
self._rb3 = b(numpy.zeros(nn, float))
# a complex-format scratch buffer
self._cb1 = b(numpy.zeros(nn, complex))
# a scratch buffer used by the axial helper
self._f0buf = b(numpy.zeros(self.oversampling *
twotheta_output_points, float))
# a scratch buffer used for axial divergence
self._epsb2 = b(numpy.zeros(self.oversampling *
twotheta_output_points, float))
# the I2+ buffer
self._I2p = b(numpy.zeros(self.oversampling *
twotheta_output_points, float))
# the I2- buffer
self._I2m = b(numpy.zeros(self.oversampling *
twotheta_output_points, float))
# another buffer used for axial divergence
self._axial = b(numpy.zeros(self.oversampling *
twotheta_output_points, float))
# the largest frequency in Fourier space
omega_max = self.n_omega_points * 2 * pi / window_fullwidth
# build the x grid and the complex array that is the convolver
# omega is in inverse radians in twotheta space (!)
# i.e. if a transform is written
# I(ds) = integral(A(L) exp(2 pi i L ds) dL
# where L is a real-space length, and s=2 sin(twotheta/2)/lambda
# then ds=2*pi*omega*cos(twotheta/2)/lambda (double check this!)
# The grid in Fourier space, in inverse radians
self.omega_vals = b(numpy.linspace(
0, omega_max, self.n_omega_points, endpoint=True))
# The grid in Fourier space, in inverse degrees
self.omega_inv_deg = b(numpy.radians(self.omega_vals))
# The grid in real space, in radians, with full oversampling
self.twothetasamples = b(numpy.linspace(
twotheta - window_fullwidth/2.0, twotheta + window_fullwidth/2.0,
self.twotheta_output_points * self.oversampling, endpoint=False))
# The grid in real space, in degrees, with full oversampling
self.twothetasamples_deg = b(numpy.linspace(
twotheta_window_center_deg - twotheta_window_fullwidth_deg/2.0,
twotheta_window_center_deg + twotheta_window_fullwidth_deg/2.0,
self.twotheta_output_points * self.oversampling, endpoint=False))
# Offsets around the center of the window, in radians
self.epsilon = b(self.twothetasamples - twotheta)
# A dictionary in which we collect recent state for each convolution.
# whenever the window gets reset, all of these get cleared
self.convolution_history = {x: [] for x in self.convolvers}
# A dictionary of Lorentz widths, used for de-periodizing the final
# result.
self.lor_widths = {}
[docs]
def get_good_bin_count(self, count):
"""
find a bin count close to what we need, which works well for Fourier
transforms.
Parameters
----------
count : int
a number of bins.
Returns
-------
int
a bin count somewhat larger than *count* which is efficient for FFT
"""
return ft_factors[ft_factors.searchsorted(count)]
[docs]
def set_optimized_window(self, twotheta_window_center_deg,
twotheta_approx_window_fullwidth_deg,
twotheta_exact_bin_spacing_deg):
"""
pick a bin count which factors cleanly for FFT, and adjust the window
width to preserve the exact center and bin spacing
Parameters
----------
twotheta_window_center_deg : float
exact position of center bin, in degrees
twotheta_approx_window_fullwidth_deg: float
approximate desired width
twotheta_exact_bin_spacing_deg: float
the exact bin spacing to use
"""
bins = self.get_good_bin_count(
int(1 + twotheta_approx_window_fullwidth_deg /
twotheta_exact_bin_spacing_deg))
window_actwidth = twotheta_exact_bin_spacing_deg * bins
self.set_window(twotheta_window_center_deg=twotheta_window_center_deg,
twotheta_window_fullwidth_deg=window_actwidth,
twotheta_output_points=bins)
[docs]
def set_parameters(self, convolver="global", **kwargs):
"""
update the dictionary of parameters associated with the given convolver
Parameters
----------
convolver : str
the name of the convolver. name 'global', e.g., attaches to
function 'conv_global'
kwargs : dict
keyword-value pairs to update the convolvers dictionary.
"""
self.param_dicts["conv_" + convolver].update(kwargs)
[docs]
def get_conv(self, name, key, format=float):
"""
get a cached, pre-computed convolver associated with the given
parameters, or a newly zeroed convolver if the cache doesn't contain
it. Recycles old cache entries.
This takes advantage of the mutability of arrays. When the contents of
the array are changed by the convolver, the cached copy is implicitly
updated, so that the next time this is called with the same parameters,
it will return the previous array.
Parameters
----------
name : str
the name of the convolver to seek
key : object
any hashable object which identifies the parameters for the
computation
format : numpy.dtype, optional
the type of the array to create, if one is not found.
Returns
-------
bool
flag, which is *True* if valid data were found, or *False* if the
returned array is zero, and *array*, which must be computed by the
convolver if *flag* was *False*.
"""
# previous computed values as a list
history = self.convolution_history[name]
for idx, (k, b) in enumerate(history):
if k == key:
# move to front to mark recently used
history.insert(0, history.pop(idx))
if self.debug_cache:
print(name, True, file=sys.stderr)
return True, b # True says we got a buffer with valid data
if len(history) == self.max_history_length:
buf = history.pop(-1)[1] # re-use oldest buffer
buf[:] = 0
else:
buf = numpy.zeros(self.n_omega_points, format)
history.insert(0, (key, buf))
if self.debug_cache:
print(name, False, file=sys.stderr)
return False, buf # False says buffer is empty, need to recompute
# A dictionary of default parameters for the global namespace,
# used to seed a GUI which can harvest this for names, descriptions, and
# initial values
info_global = dict(
group_name="Global parameters",
help="this should be help information",
param_info=dict(
twotheta0_deg=("Bragg center of peak (degrees)", 30.0),
d=("d spacing (m)", 4.00e-10),
dominant_wavelength=(
"wavelength of most intense line (m)", 1.5e-10)
)
)
def __str__(self):
"""
return a nicely formatted report describing the current state of this
class. this looks for an str_xxx function associated with each conv_xxx
name. If it is found, that function if called to get the state of
conv_xxx. Otherwise, this simply formats the dictionary of parameters
for the convolver, and uses that.
Returns
-------
str
string of formatted information
"""
keys = list(self.convolver_funcs)
keys.sort() # always return info in the same order
# global is always first, anyways!
keys.insert(0, keys.pop(keys.index('conv_global')))
strings = ["", f"***convolver id 0x{id(self):08x}:"]
for k in keys:
strfn = "str_" + k[5:]
if hasattr(self, strfn):
strings.append(getattr(self, strfn)())
else:
dd = self.param_dicts["conv_" + k[5:]]
if dd:
strings.append(k[5:] + ": " + str(dd))
return '\n'.join(strings)
[docs]
def str_global(self):
"""
returns a string representation for the global context.
Returns
-------
str
report on global parameters.
"""
# in case it's not initialized
self.param_dicts["conv_global"].setdefault("d", 0)
return "global: peak center=%(twotheta0_deg).4f, d=%(d).8g, eq. "\
"div=%(equatorial_divergence_deg).3f" \
% self.param_dicts["conv_global"]
[docs]
def conv_global(self):
"""
a dummy convolver to hold global variables and information.
the global context isn't really a convolver, returning *None* means
ignore result
Returns
-------
*None*
always returns None
"""
return None
[docs]
def axial_helper(self, outerbound, innerbound, epsvals, destination,
peakpos=0, y0=0, k=0):
"""
the function F0 from the paper. compute k/sqrt(peakpos-x)+y0 nonzero
between outer & inner (inner is closer to peak) or k/sqrt(x-peakpos)+y0
if reversed (i.e. if outer > peak) fully evaluated on a specified eps
grid, and stuff into destination
Parameters
----------
outerbound : float
the edge of the function farthest from the singularity, referenced
to epsvals
innerbound : float
the edge closest to the singularity, referenced to epsvals
epsvals : array-like
the array of two-theta values or offsets
destination : array-like
an array into which final results are summed. modified in place!
peakpos : float
the position of the singularity, referenced to epsvals.
y0 : float
the constant offset
k : float
the scale factor
Returns
-------
lower_index, upper_index : int
python style bounds for region of `destination` which has been
modified.
"""
if k == 0:
# nothing to do, point at the middle
return len(epsvals) // 2, len(epsvals) // 2 + 1
# bin width for normalizer so sum(result*dx)=exact integral
dx = epsvals[1] - epsvals[0]
# flag for whether tail is to the left or right.
flip = outerbound > peakpos
delta1 = abs(innerbound - peakpos)
delta2 = abs(outerbound - peakpos)
# this is the analytic area the function must have,
# integral(1/sqrt(eps0-eps)) from lower to upper
exactintegral = 2 * k * (sqrt(delta2) - sqrt(delta1))
exactintegral += y0 * (delta2 - delta1)
# exactintegral=max(0, exactintegral) # can never be < 0, beta out of
# range.
exactintegral *= 1 / dx # normalize so sum is right
# compute the exact centroid we need for this
if abs(delta2-delta1) < 1e-12:
exact_moment1 = 0
else:
exact_moment1 = ( # simplified from Mathematica FortranForm
(4*k*(delta2**1.5-delta1**1.5) + 3*y0*(delta2**2-delta1**2)) /
(6.*(2*k*(sqrt(delta2)-sqrt(delta1)) + y0*(delta2-delta1)))
)
if not flip:
exact_moment1 = -exact_moment1
exact_moment1 += peakpos
# note: because of the way the search is done, this produces a result
# with a bias of 1/2 channel to the left of where it should be.
# this is fixed by shifting all the parameters up 1/2 channel
outerbound += dx / 2
innerbound += dx / 2
peakpos += dx / 2 # fix 1/2 channel average bias from search
# note: searchsorted(side="left") always returns the position of the
# bin to the right of the match, or exact bin
idx0, idx1 = epsvals.searchsorted(
(outerbound, innerbound), side='left')
# peak has been squeezed out, nothing to do
if abs(outerbound - innerbound) < (2*dx) or abs(idx1 - idx0) < 2:
# preserve the exact centroid: requires summing into two channels
# for a peak this narrow, no attempt to preserve the width.
# note that x1 (1-f1) + (x1+dx) f1 = mu has solution
# (mu - x1) / dx = f1 thus, we want to sum into a channel that has
# x1<mu by less than dx, and the one to its right
# pick left edge and make sure we are past it
idx0 = min(idx0, idx1) - 1
while exact_moment1 - epsvals[idx0] > dx:
# normally only one step max, but make it a loop in case of
# corner case
idx0 += 1
f1 = (exact_moment1 - epsvals[idx0]) / dx
res = (exactintegral * (1 - f1), exactintegral * f1)
destination[idx0:idx0 + 2] += res
if collect_moment_errors:
centroid2 = (res * epsvals[idx0:idx0 + 2]).sum() / sum(res)
moment_list.append((centroid2 - exact_moment1) / dx)
return [idx0, idx0 + 2] # return collapsed bounds
if not flip:
if epsvals[idx0] != outerbound:
idx0 = max(idx0 - 1, 0)
idx1 = min(idx1 + 1, len(epsvals))
sign = 1
deps = self._f0buf[idx0:idx1]
deps[:] = peakpos
deps -= epsvals[idx0:idx1]
deps[-1] = peakpos - min(innerbound, peakpos)
deps[0] = peakpos - outerbound
else:
idx0, idx1 = idx1, idx0
if epsvals[idx0] != innerbound:
idx0 = max(idx0 - 1, 0)
idx1 = min(idx1 + 1, len(epsvals))
sign = -1
deps = self._f0buf[idx0:idx1]
deps[:] = epsvals[idx0:idx1]
deps -= peakpos
deps[0] = max(innerbound, peakpos) - peakpos
deps[-1] = outerbound - peakpos
dx0 = abs(deps[1] - deps[0])
dx1 = abs(deps[-1] - deps[-2])
# make the numerics accurate: compute average on each bin, which is
# integral of 1/sqrt = 2*sqrt, then difference integral
# do it in place, return value is actually deps too
intg = numpy.sqrt(deps, deps)
intg *= 2 * k * sign
# do difference in place, running forward to avoid self-trampling
intg[:-1] -= intg[1:]
intg[1:-2] += y0 * dx # add constant
# handle narrowed bins on ends carefully
intg[0] += y0 * dx0
intg[-2] += y0 * dx1
# # intensities are never less than zero!
# if min(intg[:-1]) < -1e-10 * max(intg[:-1]):
# print("bad parameters:", (5 * "%10.4f") %
# (peakpos, innerbound, outerbound, k, y0))
# print(len(intg), intg[:-1])
# raise ValueError("Bad axial helper parameters")
# now, make sure the underlying area is the exactly correct
# integral, without bumps due to discretizing the grid.
intg *= (exactintegral / (intg[:-1].sum()))
destination[idx0:idx1 - 1] += intg[:-1]
# This is purely for debugging. If collect_moment_errors is *True*,
# compute exact vs. approximate moments.
if collect_moment_errors:
centroid2 = (intg[:-1] * epsvals[idx0:idx1 - 1]
).sum() / intg[:-1].sum()
moment_list.append((centroid2 - exact_moment1) / dx)
return [idx0, idx1 - 1] # useful info for peak position
[docs]
def full_axdiv_I2(self, Lx=None, Ls=None, Lr=None, R=None, twotheta=None,
beta=None, epsvals=None):
"""
return the *I2* function
Parameters
----------
Lx : float
length of the xray filament
Ls : float
length of the sample
Lr : float
length of the receiver slit
R : float
diffractometer length, assumed symmetrical
twotheta : float
angle, in radians, of the center of the computation
beta : float
offset angle
epsvals : array-like
array of offsets from center of computation, in radians
Returns
-------
epsvals : array-like
array of offsets from center of computation, in radians
idxmin, idxmax : int
the full python-style bounds of the non-zero region of `I2p`
and `I2m`
I2p, I2m : array-like
I2+ and I2- from the paper, the contributions to the intensity
"""
beta1 = (Ls - Lx) / (2 * R) # Ch&Co after eq. 15abcd
beta2 = (Ls + Lx) / (2 * R) # Ch&Co after eq. 15abcd, corrected by KM
eps0 = beta * beta * tan(twotheta) / 2 # after eq. 26 in Ch&Co
if -beta2 <= beta < beta1:
z0p = Lx / 2 + beta * R * (1 + 1 / cos(twotheta))
elif beta1 <= beta <= beta2:
z0p = Ls / 2 + beta * R / cos(twotheta)
else:
raise ValueError("beta out of range")
if -beta2 <= beta <= -beta1:
z0m = -1 * Ls / 2 + beta * R / cos(twotheta)
elif -beta1 < beta <= beta2:
z0m = -1 * Lx / 2 + beta * R * (1 + 1 / cos(twotheta))
else:
raise ValueError("beta out of range")
epsscale = tan(pi / 2 - twotheta) / (2 * R * R) # =cotan(twotheta)...
# Ch&Co 18a&18b, KM sign correction
eps1p = (eps0 - epsscale * ((Lr / 2) - z0p)**2)
eps2p = (eps0 - epsscale * ((Lr / 2) - z0m)**2)
# reversed eps2m and eps1m per KM R
eps2m = (eps0 - epsscale * ((Lr / 2) + z0p)**2)
eps1m = (eps0 - epsscale * ((Lr / 2) + z0m)**2) # flip all epsilons
if twotheta > pi/2:
# this set of inversions from KM 'R' code, simplified here
eps1p, eps2p, eps1m, eps2m = eps1m, eps2m, eps1p, eps2p
# identify ranges per Ch&Co 4.2.2 and table 1 and select parameters
# note table 1 is full of typos, but the minimized
# tests from 4.2.2 with redundancies removed seem fine.
if Lr > z0p - z0m:
if z0p <= Lr/2 and z0m > -1*Lr/2: # beam entirely within slit
rng = 1
ea = eps1p
eb = eps2p
ec = eps1m
ed = eps2m
elif (z0p > Lr/2 and z0m < Lr/2) or \
(z0m < -1*Lr/2 and z0p > -1*Lr/2):
rng = 2
ea = eps2p
eb = eps1p
ec = eps1m
ed = eps2m
else:
rng = 3
ea = eps2p
eb = eps1p
ec = eps1m
ed = eps2m
else:
# beam hanging off both ends of slit, peak centered
if z0m < -1*Lr/2 and z0p > Lr/2:
rng = 1
ea = eps1m
eb = eps2p
ec = eps1p
ed = eps2m
# one edge of beam within slit
elif (-1*Lr/2 < z0m < Lr/2 and z0p > Lr/2) or \
(-1*Lr/2 < z0p < Lr/2 and z0m < -1*Lr/2):
rng = 2
ea = eps2p
eb = eps1m
ec = eps1p
ed = eps2m
else:
rng = 3
ea = eps2p
eb = eps1m
ec = eps1p
ed = eps2m
# now, evaluate function on bounds in table 1 based on ranges
# note: because of a sign convention in epsilon, the bounds all get
# switched
# define them in our namespace so they inherit ea, eb, ec, ed, etc.
def F1(dst, lower, upper, eea, eeb):
return self.axial_helper(destination=dst,
innerbound=upper, outerbound=lower,
epsvals=epsvals, peakpos=eps0,
k=sqrt(abs(eps0-eeb))-sqrt(abs(eps0-eea)),
y0=0)
def F2(dst, lower, upper, eea):
return self.axial_helper(destination=dst,
innerbound=upper, outerbound=lower,
epsvals=epsvals, peakpos=eps0,
k=sqrt(abs(eps0 - eea)), y0=-1)
def F3(dst, lower, upper, eea):
return self.axial_helper(destination=dst,
innerbound=upper, outerbound=lower,
epsvals=epsvals, peakpos=eps0,
k=sqrt(abs(eps0 - eea)), y0=+1)
def F4(dst, lower, upper, eea):
# just like F2 but k and y0 negated
return self.axial_helper(destination=dst,
innerbound=upper, outerbound=lower,
epsvals=epsvals, peakpos=eps0,
k=-sqrt(abs(eps0 - eea)), y0=+1)
I2p = self._I2p
I2p[:] = 0
I2m = self._I2m
I2m[:] = 0
indices = []
if rng == 1:
indices += F1(dst=I2p, lower=ea, upper=eps0, eea=ea, eeb=eb)
indices += F2(dst=I2p, lower=eb, upper=ea, eea=eb)
indices += F1(dst=I2m, lower=ec, upper=eps0, eea=ec, eeb=ed)
indices += F2(dst=I2m, lower=ed, upper=ec, eea=ed)
elif rng == 2:
indices += F2(dst=I2p, lower=ea, upper=eps0, eea=ea)
indices += F3(dst=I2m, lower=eb, upper=eps0, eea=ea)
indices += F1(dst=I2m, lower=ec, upper=eb, eea=ec, eeb=ed)
indices += F2(dst=I2m, lower=ed, upper=ec, eea=ed)
elif rng == 3:
indices += F4(dst=I2m, lower=eb, upper=ea, eea=ea)
indices += F1(dst=I2m, lower=ec, upper=eb, eea=ec, eeb=ed)
indices += F2(dst=I2m, lower=ed, upper=ec, eea=ed)
idxmin = min(indices)
idxmax = max(indices)
return epsvals, idxmin, idxmax, I2p, I2m
[docs]
def full_axdiv_I3(self, Lx=None, Ls=None, Lr=None, R=None,
twotheta=None, epsvals=None, sollerIdeg=None,
sollerDdeg=None, nsteps=10, axDiv=""):
"""
carry out the integral of *I2* over *beta* and the Soller slits.
Parameters
----------
Lx : float
length of the xray filament
Ls : float
length of the sample
Lr : float
length of the receiver slit
R : float
the (assumed symmetrical) diffractometer radius
twotheta : float
angle, in radians, of the center of the computation
epsvals : array-like
array of offsets from center of computation, in radians
sollerIdeg : float
the full-width (both sides) cutoff angle of the incident Soller
slit
sollerDdeg : float
the full-width (both sides) cutoff angle of the detector Soller
slit
nsteps : int
the number of subdivisions for the integral
axDiv : str
not used
Returns
-------
array-like
the accumulated integral, a copy of a persistent buffer *_axial*
"""
beta2 = (Ls + Lx) / (2 * R) # Ch&Co after eq. 15abcd, corrected by KM
if sollerIdeg is not None:
solIrad = math.radians(sollerIdeg) / 2
def solIfunc(x):
return numpy.clip(1.0 - abs(x / solIrad), 0, 1)
beta2 = min(beta2, solIrad) # no point going beyond Soller
else:
def solIfunc(x):
return numpy.ones_like(x)
if sollerDdeg is not None:
solDrad = math.radians(sollerDdeg) / 2
def solDfunc(x):
return numpy.clip(1.0 - abs(x / solDrad), 0, 1)
else:
def solDfunc(x):
return numpy.ones_like(x)
accum = self._axial
accum[:] = 0
if twotheta > pi / 2:
tth1 = pi - twotheta
else:
tth1 = twotheta
for iidx in range(nsteps):
beta = beta2 * iidx / float(nsteps)
_, idxmin, idxmax, I2p, I2m = self.full_axdiv_I2(
Lx=Lx, Lr=Lr, Ls=Ls, beta=beta, R=R,
twotheta=twotheta, epsvals=epsvals)
# after eq. 26 in Ch&Co
eps0 = beta * beta * tan(twotheta) / 2
gamma0 = beta / cos(tth1)
deps = self._f0buf[idxmin:idxmax]
deps[:] = eps0
deps -= epsvals[idxmin:idxmax]
deps *= 2 * tan(twotheta)
# check two channels on each end for negative argument.
deps[-1] = max(deps[-1], 0)
deps[0] = max(deps[0], 0)
if len(deps) >= 2:
deps[-2] = max(deps[-2], 0)
deps[1] = max(deps[1], 0)
gamarg = numpy.sqrt(deps, deps) # do sqrt in place for speed
# still need to convert these to in-place
gamp = gamma0 + gamarg
gamm = gamma0 - gamarg
if iidx == 0 or iidx == nsteps - 1:
weight = 1.0 # trapezoidal rule weighting
else:
weight = 2.0
# sum into the accumulator only channels which can be non-zero
# do scaling in-place to save a lot of slow array copying
I2p[idxmin:idxmax] *= solDfunc(gamp)
I2p[idxmin:idxmax] *= (weight * solIfunc(beta))
accum[idxmin:idxmax] += I2p[idxmin:idxmax]
I2m[idxmin:idxmax] *= solDfunc(gamm)
I2m[idxmin:idxmax] *= (weight * solIfunc(beta))
accum[idxmin:idxmax] += I2m[idxmin:idxmax]
# keep this normalized
K = 2 * R * R * abs(tan(twotheta))
accum *= K
return accum
[docs]
def conv_axial(self):
"""
compute the Fourier transform of the axial divergence component
Returns
-------
array-like
the transform buffer, or *None* if this is being ignored
"""
me = self.get_function_name() # the name of the convolver, as a string
if self.param_dicts[me].get("axDiv", None) is None:
return None
kwargs = {}
kwargs.update(self.param_dicts[me]) # get all of our parameters
kwargs.update(self.param_dicts["conv_global"])
if "equatorial_divergence_deg" in kwargs:
del kwargs["equatorial_divergence_deg"] # not used
flag, axfn = self.get_conv(me, kwargs, complex)
if flag:
return axfn # already up to date if first return is True
# no axial divergence, transform of delta fn
if kwargs["axDiv"] != "full" or kwargs["twotheta0_deg"] == 90.0:
axfn[:] = 1
return axfn
axbuf = self.full_axdiv_I3(
nsteps=kwargs["n_integral_points"],
epsvals=self.epsilon,
Lx=kwargs["slit_length_source"],
Lr=kwargs["slit_length_target"],
Ls=kwargs["length_sample"],
sollerIdeg=kwargs["angI_deg"],
sollerDdeg=kwargs["angD_deg"],
R=kwargs["diffractometer_radius"],
twotheta=kwargs["twotheta0"]
)
axfn[:] = best_rfft(axbuf)
return axfn
[docs]
def conv_tube_tails(self):
"""
compute the Fourier transform of the rectangular tube tails function
Returns
-------
array-like
the transform buffer, or *None* if this is being ignored
"""
me = self.get_function_name() # the name of the convolver, as a string
kwargs = {}
kwargs.update(self.param_dicts[me]) # get all of our parameters
if not kwargs:
return None # no convolver
# we also need the diffractometer radius from the global space
kwargs["diffractometer_radius"] = self.param_dicts[
"conv_global"]["diffractometer_radius"]
flag, tailfn = self.get_conv(me, kwargs, complex)
if flag:
return tailfn # already up to date
# tube_tails is (main width, left width, right width, intensity),
# so peak is raw peak + tophat centered at (left width+ right width)
# with area intensity*(right width-left width)/main_width
# check this normalization!
# note: this widths are as defined by Topas... really I think it should
# be
# x/(2*diffractometer_radius) since the detector is 2R from the source,
# but since this is just a fit parameter, we'll defin it as does Topas
tail_eps = (kwargs["tail_right"] - kwargs["tail_left"]) / \
kwargs["diffractometer_radius"]
main_eps = kwargs["main_width"] / kwargs["diffractometer_radius"]
tail_center = (kwargs["tail_right"] + kwargs["tail_left"]) / \
kwargs["diffractometer_radius"] / 2.0
tail_area = kwargs["tail_intens"] * \
(kwargs["tail_right"] - kwargs["tail_left"]) / kwargs["main_width"]
cb1 = self._cb1
rb1 = self._rb1
cb1.real = 0
cb1.imag = self.omega_vals
cb1.imag *= tail_center # sign is consistent with Topas definition
numpy.exp(cb1, tailfn) # shifted center, computed into tailfn
rb1[:] = self.omega_vals
rb1 *= (tail_eps / 2 / pi)
rb1 = numpy.sinc(rb1)
tailfn *= rb1
tailfn *= tail_area # normalize
rb1[:] = self.omega_vals
rb1 *= (main_eps / 2 / pi)
rb1 = numpy.sinc(rb1)
tailfn += rb1 # add central peak
return tailfn
[docs]
def general_tophat(self, name="", width=None):
"""
a utility to compute a transformed tophat function and save it in a
convolver buffer
Parameters
----------
name : str
the name of the convolver cache buffer to update
width : float
the width in 2-theta space of the tophat
Returns
-------
array-like
the updated convolver buffer, or *None* if the width was *None*
"""
if width is None:
return None # no convolver
flag, conv = self.get_conv(name, width, float)
if flag:
return conv # already up to date
rb1 = self._rb1
rb1[:] = self.omega_vals
rb1 *= (width / 2 / pi)
conv[:] = numpy.sinc(rb1)
return conv
# A dictionary of default parameters for conv_emissions,
# used to seed a GUI which can harvest this for names, descriptions, and
# initial values
info_emission = dict(
group_name="Incident beam and crystal size",
help="this should be help information",
param_info=dict(
emiss_wavelengths=("wavelengths (m)", (1.58e-10,)),
emiss_intensities=("relative intensities", (1.00,)),
emiss_lor_widths=("Lorenztian emission fwhm (m)", (1e-13,)),
emiss_gauss_widths=("Gaussian emissions fwhm (m)", (1e-13,)),
crystallite_size_gauss=(
"Gaussian crystallite size fwhm (m)", 1e-6),
crystallite_size_lor=(
"Lorentzian crystallite size fwhm (m)", 1e-6),
)
)
[docs]
def str_emission(self):
"""
format the emission spectrum and crystal size information
Returns
-------
str
the formatted information
"""
dd = self.param_dicts["conv_emission"]
if not dd:
return "No emission spectrum"
dd.setdefault("crystallite_size_lor", 1e10)
dd.setdefault("crystallite_size_gauss", 1e10)
dd.setdefault("strain_lor", 0)
dd.setdefault("strain_gauss", 0)
spect = numpy.array((
dd["emiss_wavelengths"], dd["emiss_intensities"],
dd["emiss_lor_widths"], dd["emiss_gauss_widths"]))
# convert to angstroms, like Topas
spect[0] *= 1e10 * self.length_scale_m
spect[2] *= 1e13 * self.length_scale_m # milli-angstroms
spect[3] *= 1e13 * self.length_scale_m # milli-angstroms
nm = 1e9 * self.length_scale_m
items = ["emission and broadening:"]
items.append("spectrum=\n" + str(spect.transpose()))
items.append("crystallite_size_lor (nm): "
f"{dd['crystallite_size_lor']*nm:.5g}")
items.append("crystallite_size_gauss (nm): "
f"{dd['crystallite_size_gauss']*nm:.5g}")
items.append(f"strain_lor: {dd['strain_lor']:.5g}")
items.append(f"strain_gauss: {dd['strain_gauss']:.5g}")
return '\n'.join(items)
[docs]
def conv_emission(self):
"""
compute the emission spectrum and (for convenience) the particle size
widths
Returns
-------
array-like
the convolver for the emission and particle sizes
Note:
the particle size and strain stuff here is just to be consistent
with *Topas* and to be vaguely efficient about the computation,
since all of these have the same general shape.
"""
me = self.get_function_name() # the name of the convolver, as a string
kwargs = {}
kwargs.update(self.param_dicts[me]) # get all of our parameters
kwargs.update(self.param_dicts["conv_global"])
# if the crystallite size and strain parameters are not set, set them
# to values that make their corrections disappear
kwargs.setdefault("crystallite_size_lor", 1e10)
kwargs.setdefault("crystallite_size_gauss", 1e10)
kwargs.setdefault("strain_lor", 0)
kwargs.setdefault("strain_gauss", 0)
# convert arrays to lists for key checking
key = {}
key.update(kwargs)
for k, v in key.items():
if hasattr(v, 'tolist'):
key[k] = v.tolist()
flag, emiss = self.get_conv(me, key, complex)
if flag:
return emiss # already up to date
epsilon0s = (
2 * nasin(asarray(kwargs["emiss_wavelengths"])/(2.0*kwargs["d"])) -
kwargs["twotheta0"]
)
theta = kwargs["twotheta0"] / 2
# Emission profile FWHM + crystallite broadening (scale factors are
# Topas choice!) (Lorentzian)
# note: the strain broadenings in Topas are expressed in degrees
# 2theta, must convert to radians(theta) with pi/360
widths = (
(asarray(kwargs["emiss_lor_widths"]) /
asarray(kwargs["emiss_wavelengths"])) *
tan(theta) + math.radians(kwargs["strain_lor"]) / 2 * tan(theta) +
(asarray(kwargs["emiss_wavelengths"]) /
(2*kwargs["crystallite_size_lor"]*cos(theta)))
)
# save weighted average width for future reference in periodicity fixer
self.lor_widths[me] = sum(widths * kwargs["emiss_intensities"]) / \
sum(kwargs["emiss_intensities"])
# gaussian bits add in quadrature
gfwhm2s = (
((2 * asarray(kwargs["emiss_gauss_widths"]) /
asarray(kwargs["emiss_wavelengths"])) *
tan(theta))**2 +
(math.radians(kwargs["strain_gauss"]) / 2 * tan(theta))**2 +
(asarray(kwargs["emiss_wavelengths"]) /
(kwargs["crystallite_size_gauss"]*cos(theta)))**2
)
# note that the Fourier transform of a lorentzian with FWHM 2a
# is exp(-abs(a omega))
# now, the line profiles in Fourier space have to have phases
# carefully handled to put the lines in the right places.
# note that the transform of f(x+dx)=exp(i omega dx) f~(x)
omega_vals = self.omega_vals
for wid, gfwhm2, eps, intens in zip(widths, gfwhm2s, epsilon0s,
kwargs["emiss_intensities"]):
xvals = numpy.clip(omega_vals * (-wid), -100, 0)
sig2 = gfwhm2 / (8 * math.log(2.0)) # convert fwhm**2 to sigma**2
gxv = numpy.clip((sig2 / -2.0) * omega_vals * omega_vals, -100, 0)
emiss += numpy.exp(xvals + gxv + complex(0, -eps) *
omega_vals) * intens
return emiss
[docs]
def conv_flat_specimen(self):
"""
compute the convolver for the flat-specimen correction
Returns
-------
array-like
the convolver
"""
me = self.get_function_name() # the name of the convolver, as a string
equatorial_divergence_deg = self.param_dicts[
"conv_global"].get("equatorial_divergence_deg", None)
if not equatorial_divergence_deg:
return None
twotheta0 = self.param_dicts["conv_global"]["twotheta0"]
key = (twotheta0, equatorial_divergence_deg)
flag, conv = self.get_conv(me, key, complex)
if flag:
return conv # already up to date
# Flat-specimen error, from Cheary, Coelho & Cline 2004 NIST eq. 9 & 10
# compute epsm in radians from eq. divergence in degrees
# to make it easy to use the axial_helper to compute the function
epsm = math.radians(equatorial_divergence_deg)**2 /\
tan(twotheta0/2.0) / 2.0
eqdiv = self._epsb2
eqdiv[:] = 0
dtwoth = (self.twothetasamples[1] - self.twothetasamples[0])
self.axial_helper(destination=eqdiv,
outerbound=-epsm,
innerbound=0,
epsvals=self.epsilon,
peakpos=0, k=dtwoth/(2.0*sqrt(epsm)))
conv[:] = best_rfft(eqdiv)
conv[1::2] *= -1 # flip center
return conv
[docs]
def conv_absorption(self):
"""
compute the sample transparency correction, including the
finite-thickness version
Returns
-------
array-like
the convolver
"""
me = self.get_function_name() # the name of the convolver, as a string
kwargs = {}
kwargs.update(self.param_dicts[me]) # get all of our parameters
if not kwargs:
return None
kwargs["twotheta0"] = self.param_dicts["conv_global"]["twotheta0"]
kwargs["diffractometer_radius"] = self.param_dicts[
"conv_global"]["diffractometer_radius"]
flag, conv = self.get_conv(me, kwargs, complex)
if flag:
return conv # already up to date
# absorption, from Cheary, Coelho & Cline 2004 NIST eq. 12,
# EXCEPT delta = 1/(2*mu*R) instead of 2/(mu*R)
# from Mathematica, unnormalized transform is
# (1-exp(epsmin*(i w + 1/delta)))/(i w + 1/delta)
delta = sin(kwargs["twotheta0"]) / \
(2 * kwargs["absorption_coefficient"] *
kwargs["diffractometer_radius"])
# arg=(1/delta)+complex(0,-1)*omega_vals
cb = self._cb1
cb.imag = self.omega_vals
cb.imag *= -1
cb.real = 1 / delta
numpy.reciprocal(cb, conv) # limit for thick samples=1/(delta*arg)
conv *= 1.0 / delta # normalize
# rest of transform of function with cutoff
if kwargs.get("sample_thickness", None) is not None:
epsmin = -2.0 * kwargs["sample_thickness"] * \
cos(kwargs["twotheta0"] / 2.0) / \
kwargs["diffractometer_radius"]
cb *= epsmin
numpy.expm1(cb, cb)
cb *= -1
conv *= cb
return conv
[docs]
def conv_displacement(self):
"""
compute the peak shift due to sample displacement and the *2theta* zero
offset
Returns
-------
array-like
the convolver
"""
me = self.get_function_name() # the name of the convolver, as a string
kwargs = self.param_dicts[me]
twotheta0 = self.param_dicts["conv_global"]["twotheta0"]
diffractometer_radius = self.param_dicts[
"conv_global"]["diffractometer_radius"]
specimen_displacement = kwargs.get("specimen_displacement", 0.0)
if specimen_displacement is None:
specimen_displacement = 0.0
zero_error_deg = kwargs.get("zero_error_deg", 0.0)
if zero_error_deg is None:
zero_error_deg = 0.0
flag, conv = self.get_conv(me,
(twotheta0, diffractometer_radius,
specimen_displacement, zero_error_deg),
complex)
if flag:
return conv # already up to date
delta = -2 * cos(twotheta0 / 2.0) * \
specimen_displacement / diffractometer_radius
conv.real = 0
conv.imag = self.omega_vals
# convolver *= numpy.exp(complex(0, -delta-zero_error_deg*pi/180.0) *
# omega_vals)
conv.imag *= (-delta - math.radians(zero_error_deg) -
(twotheta0 - self.twotheta_window_center))
numpy.exp(conv, conv)
return conv
[docs]
def conv_receiver_slit(self):
"""
compute the rectangular convolution for the receiver slit or SiPSD
pixel size
Returns
-------
array-like
the convolver
"""
me = self.get_function_name() # the name of the convolver, as a string
# The receiver slit convolution is a top-hat of angular half-width
# a=(slit_width/2)/diffractometer_radius
# which has Fourier transform of sin(a omega)/(a omega)
# NOTE! numpy's sinc(x) is sin(pi x)/(pi x), not sin(x)/x
if self.param_dicts[me].get("slit_width", None) is None:
return None
epsr = (self.param_dicts["conv_receiver_slit"]["slit_width"] /
self.param_dicts["conv_global"]["diffractometer_radius"])
return self.general_tophat(me, epsr)
[docs]
def conv_si_psd(self):
"""
compute the convolver for the integral of defocusing of the face of an
Si PSD
Returns
-------
array-like
the convolver
"""
# omega offset defocussing from Cheary, Coelho & Cline 2004 eq. 15
# expressed in terms of a Si PSD looking at channels with vertical
# offset from the center between psd_window_lower_offset and
# psd_window_upper_offset do this last, because we may ultimately take
# a list of bounds, and return a list of convolutions, for efficiency
me = self.get_function_name() # the name of the convolver, as a string
kwargs = {}
kwargs.update(self.param_dicts[me]) # get all of our parameters
if not kwargs:
return None
kwargs.update(self.param_dicts["conv_global"])
flag, conv = self.get_conv(me, kwargs, float)
if flag:
return conv # already up to date
if not (kwargs["equatorial_divergence_deg"] and
kwargs["si_psd_window_bounds"]):
# if either of these is zero or None, convolution is trivial
conv[:] = 1
return conv
psd_lower_win_pos, psd_upper_win_pos = kwargs["si_psd_window_bounds"]
dthl = psd_lower_win_pos / kwargs["diffractometer_radius"]
dthu = psd_upper_win_pos / kwargs["diffractometer_radius"]
alpha = math.radians(kwargs["equatorial_divergence_deg"])
argscale = alpha / (2.0 * tan(kwargs["twotheta0"] / 2))
# WARNING si(x)=integral(sin(x)/x), not integral(sin(pi x)/(pi x))
# i.e. they sinc function is not consistent with the si function
# whence the missing pi in the denominator of argscale
rb1 = self._rb1
rb2 = self._rb2
rb3 = self._rb3
rb1[:] = self.omega_vals
rb1 *= argscale * dthu
sici(rb1, conv, rb3) # gets both sine and cosine integral, si in conv
if dthl: # no need to do this if the lower bound is 0
rb1[:] = self.omega_vals
rb1 *= argscale * dthl
sici(rb1, rb2, rb3) # gets sine and cosine integral, si in rb2
conv -= rb2
conv[1:] /= self.omega_vals[1:]
conv *= 1 / argscale
conv[0] = dthu - dthl # fix 0/0 with proper area
return conv
[docs]
def conv_smoother(self):
"""
compute the convolver to smooth the final result with a Gaussian before
downsampling.
Returns
-------
array-like
the convolver
"""
# create a smoother for output result, independent of real physics, if
# wanted
me = self.get_function_name() # the name of the convolver, as a string
if not self.gaussian_smoother_bins_sigma:
return None # no smoothing
flag, buf = self.get_conv(me, self.gaussian_smoother_bins_sigma,
format=float)
if flag:
return buf # already computed
buf[:] = self.omega_vals
buf *= (self.gaussian_smoother_bins_sigma * (
self.twothetasamples[1] - self.twothetasamples[0]))
buf *= buf
buf *= -0.5
numpy.exp(buf, buf)
return buf
[docs]
def compute_line_profile(self, convolver_names=None,
compute_derivative=False,
return_convolver=False):
"""
execute all the convolutions; if convolver_names is None, use
everything we have, otherwise, use named convolutions.
Parameters
----------
convolver_names: list
a list of convolvers to select. If *None*, use all found
convolvers.
compute_derivative: bool
if *True*, also return d/dx(function) for peak position fitting
Returns
-------
object
a profile_data object with much information about the peak
"""
# create a function which is the Fourier transform of the
# combined convolutions of all the factors
# ="d" if we are using 'd' space, "twotheta" if using twotheta
anglemode = self.anglemode
# the rough center of the spectrum, used for things which need it.
# Copied from global convolver.
self.dominant_wavelength = dominant_wavelength = self.param_dicts[
"conv_global"].get("dominant_wavelength", None)
if anglemode == "twotheta":
twotheta0_deg = self.param_dicts["conv_global"]["twotheta0_deg"]
twotheta0 = math.radians(twotheta0_deg)
d = dominant_wavelength / (2 * sin(twotheta0 / 2.0))
else:
d = self.param_dicts["conv_global"]["d"]
twotheta0 = 2 * math.asin(dominant_wavelength / (2.0 * d))
twotheta0_deg = math.degrees(twotheta0)
# set these in global namespace
self.set_parameters(d=d, twotheta0=twotheta0,
twotheta0_deg=twotheta0_deg)
if convolver_names is None:
convolver_names = self.convolver_funcs.keys() # get all names
# run through the name list, and call the convolver to harvest its
# result
conv_list = [self.convolver_funcs[x]() for x in convolver_names]
# now, multiply everything together
convolver = self._cb1 # get a complex scratch buffer
convolver[:] = 1 # initialize
for c in conv_list: # accumulate product
if c is not None:
convolver *= c
if convolver[1].real > 0: # recenter peak!
convolver[1::2] *= -1
peak = best_irfft(convolver)
# now, use the trick from Mendenhall JQSRT Voigt paper to remove
# periodic function correction
# JQSRT 105 number 3 July 2007 p. 519 eq. 7
# total lor widths, created by the various colvolvers
correction_width = 2 * sum(self.lor_widths.values())
d2p = 2.0 * pi / self.window_fullwidth
alpha = correction_width / 2.0 # be consistent with convolver
mu = (peak * self.twothetasamples).sum() / peak.sum() # centroid
dx = self.twothetasamples - mu
eps_corr1 = (math.sinh(d2p * alpha) / self.window_fullwidth) / \
(math.cosh(d2p * alpha) - ncos(d2p * dx))
eps_corr2 = (alpha / pi) / (dx * dx + alpha * alpha)
corr = (convolver[0].real / numpy.sum(eps_corr2)) * \
(eps_corr1 - eps_corr2)
peak -= corr
peak *= self.window_fullwidth / \
(self.twotheta_output_points / self.oversampling) # scale to area
if compute_derivative:
# this is useful
convolver *= self.omega_vals
convolver *= complex(0, 1)
deriv = best_irfft(convolver)
deriv *= self.window_fullwidth / \
(self.twotheta_output_points / self.oversampling)
deriv = deriv[::self.oversampling]
else:
deriv = None
result = profile_data(twotheta0_deg=math.degrees(twotheta0),
twotheta=self.twothetasamples[
::self.oversampling],
omega_inv_deg=self.omega_inv_deg[
:self.twotheta_output_points // 2 + 1],
twotheta_deg=self.twothetasamples_deg[
::self.oversampling],
peak=peak[::self.oversampling],
derivative=deriv
)
if return_convolver:
result.add_symbol(
convolver=convolver[:self.twotheta_output_points//2+1])
return result
[docs]
def self_clean(self):
"""
do some cleanup to make us more compact;
Instance can no longer be used after doing this, but can be pickled.
"""
clean = self._clean_on_pickle
pd = dict()
pd.update(self.__dict__)
for thing in pd:
x = getattr(self, thing)
if id(x) in clean:
delattr(self, thing)
# delete extra attributes cautiously, in case we have already been
# cleaned
for k in ('convolver_funcs', 'convolvers',
'factors', 'convolution_history'):
if pd.pop(k, None) is not None:
delattr(self, k)
def __getstate__(self):
"""
return information for pickling. Removes transient data from cache of
shadow copy so resulting object is fairly compact. This does not
affect the state of the actual instance.
Returns
-------
dict
dictionary of sufficient information to reanimate instance.
"""
# do some cleanup on state before we get pickled
# (even if main class not cleaned)
clean = self._clean_on_pickle
pd = dict()
pd.update(self.__dict__)
for thing in pd:
x = getattr(self, thing)
if id(x) in clean:
del pd[thing]
# delete extra attributes cautiously, in case we have already been
# cleaned
for k in ('convolver_funcs', 'convolvers',
'factors', 'convolution_history'):
pd.pop(k, None)
return pd
def __setstate__(self, setdict):
"""
reconstruct class from pickled information
This rebuilds the class instance so it is ready to use on unpickling.
Parameters
----------
self : FP_Profile
an empty class instance
setdict : dict
dictionary from FP_profile.__getstate__()
"""
self.__init__(anglemode=setdict["anglemode"],
gaussian_smoother_bins_sigma=setdict[
"gaussian_smoother_bins_sigma"],
oversampling=setdict["oversampling"]
)
for k, v in setdict.items():
setattr(self, k, v)
try:
s = self
self.set_window(
twotheta_window_center_deg=s.twotheta_window_center_deg,
twotheta_window_fullwidth_deg=s.twotheta_window_fullwidth_deg,
twotheta_output_points=s.twotheta_output_points
)
# override clearing of this by set_window
self.lor_widths = setdict["lor_widths"]
except AttributeError:
pass
[docs]
class convolver_handler:
"""
manage the convolvers of on process
"""
[docs]
def __init__(self):
self.convolvers = []
[docs]
def add_convolver(self, convolver):
self.convolvers.append(convolver)
[docs]
def update_parameters(self, parameters):
for c in self.convolvers:
for k, v in parameters.items():
if k == 'classoptions':
continue
c.set_parameters(convolver=k, **v)
[docs]
def set_windows(self, centers, npoints, flag, width):
for c, cen, np, f in zip(self.convolvers, centers, npoints, flag):
if f:
c.set_window(twotheta_output_points=np,
twotheta_window_center_deg=cen,
twotheta_window_fullwidth_deg=width)
[docs]
def calc(self, run, ttpeaks):
"""
calculate profile function for selected convolvers
Parameters
----------
run : list
list of flags of length of convolvers to tell which convolver needs
to be run
ttpeaks : array-like
peak positions for the convolvers
Returns
-------
list
list of profile_data result objects
"""
results = []
for c, flag, tt in zip(self.convolvers, run, ttpeaks):
if flag:
c.set_parameters(twotheta0_deg=tt)
res = c.compute_line_profile()
del res.twotheta, res.omega_inv_deg
del res.dictionary, res.derivative
results.append(res)
else:
results.append(None)
return results
[docs]
def chunkify(lst, n):
return [lst[i::n] for i in range(n)]
[docs]
class manager(BaseManager):
pass
[docs]
class PowderDiffraction(PowderExperiment):
"""
Experimental class for powder diffraction. This class calculates the
structure factors of powder diffraction lines and uses instances of
FP_profile to perform the convolution with experimental resolution function
calculated by the fundamental parameters approach. This class uses
multiprocessing to speed up calculation. Set config.NTHREADS=1 to restrict
this to one worker process.
"""
_valid_init_kwargs = copy.copy(PowderExperiment._valid_init_kwargs)
_valid_init_kwargs['tt_cutoff'] = '2Theta cut off value in degree'
_valid_init_kwargs['fpclass'] = 'FP_profile derived class'
_valid_init_kwargs['fpsettings'] = 'settings dictionaries'
_valid_init_kwargs['enable_simulation'] = 'flag to enable simulation mode'
del _valid_init_kwargs['sampleor']
[docs]
def __init__(self, mat, **kwargs):
"""
the class is initialized with a xrayutilities.materials.Crystal
instance and calculates the powder intensity and peak positions of the
Crystal up to an angle of tt_cutoff. Results are stored in
data .... array with intensities
ang ..... Bragg angles of the peaks (Theta!)
qpos .... reciprocal space position of intensities
If `enable_simulation` is set to True the `Convolve` and `Calculate`
methods can be used to calculated a powder pattern. Upon such
initialization it is strongly recommend to call the `close` method to
clean up the multiprocessing threads when no further calculations will
be performed.
Parameters
----------
mat : Crystal or Powder
material for the powder calculation
kwargs : dict
optional keyword arguments same as for the Experiment base class
and:
tt_cutoff : float, optional
Powder peaks are calculated up to an scattering angle of tt_cutoff
(deg)
enable_simulation: False, optional
enables the initialization of the convolvers. Call `close()` after
you are finished with your instance!
fpclass : FP_profile
FP_profile derived class with possible convolver mixins.
(default: FP_profile)
fpsettings : dict
settings dictionaries for the convolvers. Default settings are
loaded from the config file.
"""
utilities.check_kwargs(kwargs, self._valid_init_kwargs,
self.__class__.__name__)
if isinstance(mat, materials.Crystal):
self.mat = Powder(mat, 1)
elif isinstance(mat, Powder):
self.mat = mat
else:
raise TypeError("mat must be an instance of class "
"xrayutilities.materials.Crystal or "
"xrayutilities.simpack.Powder")
self.data = dict()
self._tt_cutoff = kwargs.pop('tt_cutoff', 180)
self.fpclass = kwargs.pop('fpclass', FP_profile)
self.settings = self.load_settings_from_config(
kwargs.pop('fpsettings', {}))
self.set_sample_parameters() # initialize local settings
self._enable_sim = kwargs.pop('enable_simulation', False)
PowderExperiment.__init__(self, **kwargs)
# number of significant digits, needed to identify equal floats
self.digits = 5
# determine if convolvers are isotropic
self.isotropic = self.fpclass.isotropic
# calculate powder lines position and intensities
self.init_powder_lines(self._tt_cutoff)
# initialize FP_profile instances (add field to the data dictionary)
for h in self.data:
self.data[h]['conv'] = self._init_fpprofile(self.fpclass)
# set settings in all convolvers
self.update_settings(self.settings)
self.set_sample_parameters()
# set some other class properties
self.__tt = None
self.__ww = None
# initialize multiprocessing
if self._enable_sim:
self._init_multiprocessing()
# set wavelength from class constructor
if 'wl' in kwargs:
self._set_wavelength_pd(kwargs['wl'])
if 'en' in kwargs:
self._set_energy_pd(kwargs['en'])
if self.settings['global']['geometry'] != 'symmetric':
warnings.warn("PowderDiffraction: geometry '%s' not fully "
"supported, proceed with caution!"
% self.settings['global']['geometry'])
def _init_multiprocessing(self):
"""
initialize multiprocessing for powder pattern calculation
"""
# The structure of the multiprocessing code is as follows:
# There are nproc "manager"s which handle the actual convolver code.
# Additionally there are 4 daemon threads which listen for work to be
# distributed to the managers.
np = config.NTHREADS
self.nproc = np if np != 0 else multiprocessing.cpu_count()
self.chunks = chunkify(list(self.data), self.nproc)
self.next_proc = len(self.data) % self.nproc
manager.register("conv", convolver_handler)
self.managers = [manager() for idx in range(self.nproc)]
self.conv_handlers = []
self.threads = []
self.output_queue = queue.Queue()
for idx, mg in enumerate(self.managers):
mg.start()
m = mg.conv()
for h in self.chunks[idx]:
m.add_convolver(self.data[h]['conv'])
self.conv_handlers.append(m)
self.threads.append((
threading.Thread(target=self._send_work, args=(idx, )),
queue.Queue(), self.output_queue))
self._running = True
for th, _, _ in self.threads:
th.daemon = True
th.start()
atexit.register(self.__stop__)
[docs]
def close(self):
self.__stop__()
def __stop__(self):
self._running = False
try: # try/except needed only for python2 compatibility
# end daemon threads which distribute the work load
for th, q1, _ in self.threads:
q1.put(None)
th.join()
delattr(self, 'threads')
# end managers which handle the convolvers
delattr(self, 'conv_handlers')
for m in self.managers:
m.shutdown()
delattr(self, 'managers')
except AttributeError:
pass
atexit.unregister(self.__stop__)
[docs]
def load_settings_from_config(self, settings):
"""
load parameters from the config and update these settings with the
options from the settings parameter
"""
params = dict()
for k in config.POWDER:
params[k] = dict()
params[k].update(config.POWDER[k])
if k in settings:
params[k].update(settings[k])
for k in settings:
if k not in config.POWDER:
params[k] = dict()
params[k].update(settings[k])
return params
def _init_fpprofile(self, fpclass):
"""
configure the default parameters of the FP_profile class and return an
instance with these settings
Parameters
----------
fpclass : FP_profile
class with possible mixins which implement more convolvers
Returns
-------
fpclass
instance of fpclass
"""
classparams = dict()
classparams.update(self.settings['classoptions'])
classparams.pop('window_width')
p = fpclass(**classparams)
p.debug_cache = False
return p
def _set_wavelength_pd(self, wl):
PowderExperiment._set_wavelength(self, wl)
s = {'emission': {'emiss_wavelengths': self.wavelength*1e-10}}
self.update_settings(s)
def _set_energy_pd(self, energy):
PowderExperiment._set_energy(self, energy)
s = {'emission': {'emiss_wavelengths': self.wavelength*1e-10}}
self.update_settings(s)
energy = property(PowderExperiment._get_energy, _set_energy_pd)
wavelength = property(PowderExperiment._get_wavelength, _set_wavelength_pd)
[docs]
def set_wavelength_from_params(self):
"""
sets the wavelenth in the base class from the settings dictionary of
the FP_profile classes and also set it in the 'global' part of the
parameters
"""
if 'emission' in self.settings:
pem = self.settings['emission']
if 'emiss_wavelengths' in pem:
wl = pem['emiss_wavelengths'][0]
self.settings['global']['dominant_wavelength'] = wl
for d in self.data.values():
fp = d['conv']
fp.set_parameters(convolver='global',
**self.settings['global'])
# set wavelength in base class
PowderExperiment._set_wavelength(self, wl*1e10)
[docs]
def set_sample_parameters(self):
"""
load sample parameters from the Powder class and use them in all
FP_profile instances of this object
"""
samplesettings = {}
for prop, default in (('crystallite_size_lor', 1e10),
('crystallite_size_gauss', 1e10),
('strain_lor', 0),
('strain_gauss', 0),
('preferred_orientation', (0, 0, 0)),
('preferred_orientation_factor', 1)):
samplesettings[prop] = getattr(self.mat, prop, default)
self.settings['emission'].update(samplesettings)
for d in self.data.values():
fp = d['conv']
fp.set_parameters(convolver='emission', **samplesettings)
[docs]
def update_settings(self, newsettings=None):
"""
update settings of all instances of FP_profile
Parameters
----------
newsettings : dict, optional
dictionary with new settings. It has to include one subdictionary
for every convolver which should have its settings changed.
"""
if newsettings is None:
return
if 'global' in newsettings:
if 'dominant_wavelength' in newsettings['global']:
print('PowderDiffraction: dominant wavelength is a read only'
'setting \n -> use emission: emiss_wavelength instead')
if 'emission' in newsettings:
nem = newsettings['emission']
for k in ('emiss_wavelengths', 'emiss_intensities',
'emiss_gauss_widths', 'emiss_lor_widths'):
if k in nem:
if isinstance(nem[k], numbers.Number):
nem[k] = (nem[k], )
for k in newsettings:
if k == 'classoptions':
continue
for d in self.data.values():
fp = d['conv']
fp.set_parameters(convolver=k, **newsettings[k])
if k not in self.settings:
self.settings[k] = dict()
self.settings[k].update(newsettings[k])
self.set_wavelength_from_params()
@property
def twotheta(self):
return self.__tt
@twotheta.setter
def twotheta(self, tt):
oldtt = self.__tt
self.__tt = tt
if oldtt is None:
self.set_window()
elif len(oldtt) != len(self.__tt):
self.set_window(force=True)
elif not numpy.all(numpy.equal(oldtt, self.__tt)):
self.set_window(force=True)
@property
def window_width(self):
return self.__ww
@window_width.setter
def window_width(self, ww):
oldww = self.__ww
if ww == 'config':
self.__ww = config.POWDER['classoptions']['window_width']
else:
self.__ww = ww
if oldww != self.__ww:
self.set_window(force=True)
[docs]
def set_window(self, force=False):
"""
sets the calcultion window for all convolvers
"""
ww = self.window_width
tt = self.twotheta
if not ww or tt is None: # not all necessary information is set up
return
npoints = dict()
nset = dict()
ttmax = tt.max()
ttmin = tt.min()
for h, d in self.data.items():
ttpeak = 2 * d['ang']
if ttpeak - ww/2 > ttmax or ttpeak + ww/2 < ttmin:
continue
idx = numpy.argwhere(numpy.logical_and(tt > ttpeak - ww/2,
tt < ttpeak + ww/2))
try:
np = int(math.ceil(len(idx) /
(tt[idx[-1, 0]]-tt[idx[0, 0]]) * ww))
except OverflowError:
np = 1
npoints[h] = np
if hasattr(d['conv'], 'twotheta_window_center_deg'):
fptt = d['conv'].twotheta_window_center_deg
if abs(ttpeak-fptt) / ww < 0.25 and not force:
continue
nset[h] = True
else:
nset[h] = True
# set window in local instances
d['conv'].set_window(twotheta_output_points=np,
twotheta_window_center_deg=ttpeak,
twotheta_window_fullwidth_deg=ww)
# set multiprocessing instances
for chunk, handler in zip(self.chunks, self.conv_handlers):
handler.set_windows([2 * self.data[h]['ang'] for h in chunk],
[npoints.get(h, 0) for h in chunk],
[nset.get(h, False) for h in chunk], ww)
def _send_work(self, idx):
"""
a threaded block which watches for data and runs computation
"""
_, qinput, qoutput = self.threads[idx]
while self._running:
try:
settings, run, ttpeaks = qinput.get(True)
except TypeError:
break
handler = self.conv_handlers[idx]
handler.update_parameters(settings)
results = handler.calc(run, ttpeaks)
qoutput.put((idx, results)) # put results on output queue
self._running = False
[docs]
def reflection_strength(self, tt_cutoff):
"""
determine structure factors/reflection strength of all Bragg peaks up
to tt_cutoff. This function also implements the March-Dollase model for
preferred orientation in the symmetric reflection mode. Note that
although this means the sample has anisotropic properties the various
lines can still be merged together since at the moment no anisotropic
crystal shape is supported.
Parameters
----------
tt_cutoff : float
upper cutoff value of 2theta until which the reflection strength
are calculated
Returns
-------
ndarray
numpy array with field for 'hkl' (Miller indices of the peaks), 'q'
(q-position), and 'r' (reflection strength) of the Bragg peaks
"""
mat = self.mat.material
pref_or = self.settings['emission']['preferred_orientation']
r = self.settings['emission']['preferred_orientation_factor']
mode = self.settings['global']['geometry']
ai = self.settings['global']['geometry_incidence_angle']
qmax = 2 * self.k0 * sin(math.radians(tt_cutoff/2))
# get allowed Bragg peaks
hkl = tuple(mat.lattice.get_allowed_hkl(qmax))
q = mat.Q(hkl)
qnorm = numpy.linalg.norm(q, axis=1)
# March-Dollase model for preferred orientation
# see http://www.crl.nitech.ac.jp/ar/2013/0711_acrc_ar2013_review.pdf
def fdsum(alpha, delta, r, N=8):
alpha[alpha == 0] += config.EPSILON # alpha = 0 unstable
alpha[alpha == pi] -= config.EPSILON # alpha = pi unstable
r3m1 = r**3 - 1
xi0 = -ncos(alpha-delta)/nsqrt(1+(r3m1)*ncos(alpha-delta)**2)
xi1 = -ncos(alpha+delta)/nsqrt(1+(r3m1)*ncos(alpha+delta)**2)
sad2 = (nsin(alpha) * nsin(delta))**2
cad = ncos(alpha) * ncos(delta)
def h(xi):
return 1 / nsqrt(sad2 - (cad + xi/nsqrt(1-(r3m1)*xi**2))**2)
def w(j, N):
return pi/(2*N)*sin((j+0.5)*pi/N)
def xi(j, N):
return (xi1+xi0)/2 + (xi1-xi0)/2*cos((j+0.5)*pi/N)
s = numpy.sum([w(j, N)*h(xi(j, N)) for j in range(N)], axis=0)
return r**(3/2) / pi * (xi1-xi0) * s
if numpy.all(pref_or == (0, 0, 0)) or r == 1:
f = 1
else:
alpha = VecAngle(q, mat.Q(pref_or))
if mode == 'symmetric':
f = ((r * ncos(alpha))**2 + nsin(alpha)**2/r)**(-3/2)
elif mode == 'capillary':
f = fdsum(alpha, pi/2, r)
elif mode == 'asymmetric':
if not isinstance(ai, numbers.Number):
raise ValueError("'geometry_incidence_angle' must be a "
"number")
th = self.Q2Ang(qnorm, deg=False)
f = fdsum(alpha, nabs(th-math.radians(ai)), r)
else:
raise ValueError("xu.simpack.PowderDiffraction: invalid "
"geometry mode (%s)" % mode)
# assemble return value
data = numpy.zeros(len(hkl), dtype=[('q', numpy.double),
('r', numpy.double),
('hkl', numpy.ndarray)])
data['q'] = qnorm
data['r'] = nabs(mat.StructureFactorForQ(q, self.energy)) ** 2 * f
data['hkl'] = hkl
return data
[docs]
def merge_lines(self, data):
"""
if calculation is isotropic lines at the same q-position can be merged
to one line to reduce the calculational effort
Parameters
----------
data : ndarray
numpy field array with values of 'hkl' (Miller indices of the
peaks), 'q' (q-position), and 'r' (reflection strength) as produced
by the `reflection_strength` method
Returns
-------
hkl, q, ang, r : array-like
Miller indices, q-position, diffraction angle (Theta), and
reflection strength of the material
"""
data.sort(order=['q', 'hkl'])
qpos = []
refstrength = []
hkl = []
def add_lines(q, ref, chkl):
for R, m in zip(ref, chkl):
qpos.append(q)
refstrength.append(R)
hkl.append(m)
currq = -1
curref = []
currhkl = []
for r in data:
if not numpy.isclose(r[0] - currq, 0):
add_lines(currq, curref, currhkl)
currq = r[0]
curref = [r[1], ]
currhkl = [r[2], ]
else:
if self.isotropic:
curref[-1] += r[1]
currhkl[-1] = r[2]
else:
# merge lines which are equal according to the crystal
# and convolver symmetries
added = False
for i, m in enumerate(currhkl):
if self.mat.material.lattice.isequivalent(m, r[2]):
if self.fpclass.isequivalent(
m, r[2],
self.mat.material.lattice.crystal_system):
curref[i] += r[1]
added = True
if not added:
curref.append(r[1])
currhkl.append(r[2])
# add remaining lines
add_lines(currq, curref, currhkl)
return (hkl,
numpy.array(qpos, dtype=numpy.double),
self.Q2Ang(qpos),
numpy.array(refstrength, dtype=numpy.double))
[docs]
def correction_factor(self, ang):
"""
calculate the correction factor for the diffracted intensities. This
contains the polarization effects and the Lorentz factor
Parameters
----------
ang : aray-like
theta diffraction angles for which the correction should be
calculated
Returns
-------
f : array-like
array of the same shape as ang containing the correction factors
"""
# correct data for polarization and lorentzfactor and unit cell volume
# see L.S. Zevin : Quantitative X-Ray Diffractometry
# page 18ff
polarization_factor = (1 +
ncos(numpy.radians(2 * ang)) ** 2) / 2
lorentz_factor = 1. / (nsin(numpy.radians(ang)) ** 2 *
ncos(numpy.radians(ang)))
unitcellvol = self.mat.material.lattice.UnitCellVolume()
return polarization_factor * lorentz_factor / unitcellvol ** 2
[docs]
def init_powder_lines(self, tt_cutoff):
"""
calculates the powder intensity and positions up to an angle of
tt_cutoff (deg) and stores the result in the data dictionary whose
structure is as follows:
The data dictionary has one entry per line with a unique identifier
as key of the entry. The entries themself are dictionaries which
have the following entries:
* hkl : (h, k, l), Miller indices of the Bragg peak
* r : reflection strength of the line
* ang : Bragg angle of the peak (theta = 2theta/2!)
* qpos : reciprocal space position
"""
tmp_data = self.reflection_strength(tt_cutoff)
hkl, qpos, ang, rs = self.merge_lines(tmp_data)
corrfact = self.correction_factor(ang)
rs *= corrfact
ids = [tuple(idx) for idx in hkl]
for i, q, a, r in zip(ids, qpos, ang, rs):
active = True if r/rs.max() > config.EPSILON else False
self.data[i] = {'qpos': q, 'ang': a, 'r': r,
'active': active}
[docs]
def update_powder_lines(self, tt_cutoff):
"""
calculates the powder intensity and positions up to an angle of
tt_cutoff (deg) and updates the values in:
* ids: list of unique identifiers of the powder line
* data: array with intensities
* ang: bragg angles of the peaks (theta=2theta/2!)
* qpos: reciprocal space position of intensities
"""
tmp_data = self.reflection_strength(tt_cutoff)
hkl, qpos, ang, rs = self.merge_lines(tmp_data)
corrfact = self.correction_factor(ang)
rs *= corrfact
ids = [tuple(idx) for idx in hkl]
rsmax = rs.max()
for h, q, a, r in zip(ids, qpos, ang, rs):
active = True if r/rsmax > config.EPSILON else False
if h in self.data:
self.data[h]['qpos'] = q
self.data[h]['ang'] = a
self.data[h]['r'] = r
self.data[h]['active'] = active
else:
# new peak needs a new convolver
fp = self._init_fpprofile(self.fpclass)
for k, v in self.settings.items():
if k == 'classoptions':
continue
fp.set_parameters(convolver=k, **v)
self.data[h] = {'qpos': q, 'ang': a, 'r': r,
'conv': fp, 'active': active}
if self._enable_sim:
self.conv_handlers[self.next_proc].add_convolver(fp)
self.chunks[self.next_proc].append(h)
self.next_proc = (self.next_proc + 1) % self.nproc
for h in self.data:
if h not in ids:
# make entry inactive
self.data[h]['active'] = False
[docs]
def Convolve(self, twotheta, window_width='config', mode='multi'):
"""
convolute the powder lines with the resolution function and map them
onto the twotheta positions. This calculates the powder pattern
excluding any background contribution
Parameters
----------
twotheta : array-like
two theta values at which the powder pattern should be calculated.
window_width : float, optional
width of the calculation window of a single peak
mode : {'multi, 'local'}, optional
multiprocessing mode, either 'multi' to use multiple processes or
'local' to restrict the calculation to a single process
Note:
Bragg peaks are only included up to tt_cutoff set in the class
constructor!
Returns
-------
output intensity values for the twotheta values given in the input
"""
if self._enable_sim:
t_start = time.time()
out = numpy.zeros_like(twotheta)
tt = self.twotheta = twotheta
self.window_width = window_width
ww = self.window_width
ttmax = tt.max()
ttmin = tt.min()
# check if twotheta range extends above tt_cutoff
if ttmax > self._tt_cutoff:
warnings.warn('twotheta range is larger then tt_cutoff. '
'Possibly Bragg peaks in the convolution range '
'are not considered!')
if mode == 'local':
for h, d in self.data.items():
if not d['active']:
continue
ttpeak = 2 * d['ang']
# check if peak is in data range to be calculated
if ttpeak - ww/2 > ttmax or ttpeak + ww/2 < ttmin:
continue
idx = numpy.argwhere(numpy.logical_and(tt > ttpeak - ww/2,
tt < ttpeak + ww/2))
d['conv'].set_parameters(twotheta0_deg=ttpeak)
result = d['conv'].compute_line_profile()
out[idx] += numpy.interp(tt[idx], result.twotheta_deg,
result.peak*d['r'], left=0,
right=0)
else:
# prepare multiprocess calculation
for idx, chunk in enumerate(self.chunks):
run = []
ttpeaks = []
for h in chunk:
ttpeak = 2 * self.data[h]['ang']
ttpeaks.append(ttpeak)
if (ttpeak - ww/2 > ttmax or
ttpeak + ww/2 < ttmin):
run.append(False)
else:
run.append(True)
if not self.data[h]['active']:
run[-1] = False
# start calculation in other processes
self.threads[idx][1].put((self.settings, run, ttpeaks))
gotit = set(range(self.nproc))
while gotit:
# receive ready calculations
idx, res = self.output_queue.get(True)
chunk = self.chunks[idx]
for h, r in zip(chunk, res):
if r is None:
continue
ttpeak = 2 * self.data[h]['ang']
mask = numpy.argwhere(
numpy.logical_and(tt > ttpeak - ww/2,
tt < ttpeak + ww/2))
out[mask] += numpy.interp(tt[mask], r.twotheta_deg,
r.peak*self.data[h]['r'],
left=0, right=0)
gotit.discard(idx) # got that result, don't expect more
if config.VERBOSITY >= config.INFO_ALL:
print("XU.Powder.Convolute: exec time=", time.time() - t_start)
return out
print("XU.Powder: not initialized for calculation -> exiting!")
return None
[docs]
def Calculate(self, twotheta, **kwargs):
"""
calculate the powder diffraction pattern including convolution with the
resolution function and map them onto the twotheta positions. This also
performs the calculation of the peak intensities from the internal
material object
Parameters
----------
twotheta : array-like
two theta values at which the powder pattern should be calculated.
kwargs : dict
additional keyword arguments are passed to the Convolve function
Returns
-------
array-like
output intensity values for the twotheta values given in the input
Notes
-----
Bragg peaks are only included up to tt_cutoff set in the class
constructor!
"""
if self._enable_sim:
self.set_sample_parameters()
self.update_powder_lines(self._tt_cutoff)
self.set_window()
return self.Convolve(twotheta, **kwargs)
print("XU.Powder: not initialized for calculation -> exiting!")
return None
def __str__(self):
"""
Prints out available information about the material and reflections
"""
ostr = "Powder diffraction object\n"
ostr += "-------------------------\n"
ostr += self.mat.__repr__() + "\n"
ostr += "Lattice:\n" + self.mat.material.lattice.__str__()
rmax = 0
for d in self.data.values():
if d['r'] > rmax:
rmax = d['r']
ostr += "\nReflections:\n"
ostr += "------------\n"
ostr += (" h k l | tth | |Q| |"
"Int | Int (%)\n")
ostr += (" ------------------------------------"
"---------------------------\n")
for h, d in sorted(self.data.items(), key=lambda t: t[1]['ang']):
if d['active']:
ostr += ("%15s %8.4f %8.3f %10.2f %10.2f\n"
% (h.__str__(), 2 * d['ang'],
d['qpos'], d['r'], d['r'] / rmax * 100.))
if self._enable_sim:
ostr += "Settings: " + str(self.settings)
return ostr