# This file is part of xrayutilities.
# xrayutilities is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# GNU General Public License for more details.
# You should have received a copy of the GNU General Public License
# along with this program; if not, see <>.
# Copyright (C) 2009-2010 Eugen Wintersberger <>
# Copyright (C) 2010-2012,2014-2019
#               Dominik Kriegner <>
# Copyright (C) 2012 Tanja Etzelstorfer <>

# module for handling files stored in the EDF data format developed by the ESRF

import os.path
import re
import struct

import numpy

from .. import config, utilities
from .filedir import FileDirectory
from .helper import xu_h5open, xu_open

edf_kv_split = re.compile(r"\s*=\s*")  # key value sepeartor for header data
edf_eokv = re.compile(r";")  # end of line for a header
# regular expressions for several ASCII representations of numbers
edf_integer_value = re.compile(r"\d+")
edf_float_value = re.compile(r"[+-]*\d+\.*\d*")
edf_float_e_value = re.compile(r"[+-]*\d+\.\d*e[+-]*\d*")
edf_name_start_num = re.compile(r"^\d")

# dictionary mapping EDF data type keywords onto struct data types
DataTypeDict = {"SignedByte": "b",
                "SignedShort": "h",
                "SignedInteger": "i",
                "SignedLong": "i",
                "FloatValue": "f",
                "DoubleValue": "d",
                "UnsignedByte": "B",
                "UnsignedShort": "H",
                "UnsignedInt": "I",
                "UnsignedLong": "L"}
# SignedLong is only 4byte, on my 64bit machine using SignedLong:"l" caused
# troubles
# UnsignedLong is only 4byte, on my 64bit machine using UnsignedLong:"L"
# caused troubles ("I" works)

[docs] class EDFFile:
[docs] def __init__(self, fname, nxkey="Dim_1", nykey="Dim_2", dtkey="DataType", path="", header=True, keep_open=False): """ Parameters ---------- fname : str name of the EDF file of type .edf or .edf.gz nxkey : str, optional name of the header key that holds the number of points in x-direction nykey : str, optional name of the header key that holds the number of points in y-direction dtkey : str, optional name of the header key that holds the datatype for the binary data path : str, optional path to the EDF file header : bool, optional has header (default true) keep_open : bool, optional if True the file handle is kept open between multiple calls which can cause significant speed-ups """ self.filename = fname self.full_filename = os.path.join(path, fname) # evaluate keyword arguments self.nxkey = nxkey self.nykey = nykey self.dtkey = dtkey self.headerflag = header # create attributes for holding data self._data = {} self._headers = [] self._data_offsets = [] self._data_read = False self._dimx = [] self._dimy = [] self._byte_order = [] self._fmt_str = [] self._dtype = [] self.Parse() if keep_open: self.fid = xu_open(self.full_filename, 'rb') else: self.fid = None self.nimages = len(self._data_offsets) self.header = self._headers[0]
[docs] def Parse(self): """ Parse file to find the number of entries and read the respective header information """ header = {} offset = 0 with xu_open(self.full_filename, 'rb') as fid: if config.VERBOSITY >= config.INFO_ALL: print(f" file: {self.full_filename}") if self.headerflag: while True: # until end of file hdr_flag = False ml_value_flag = False # marks a multiline header for line in fid: # until end of header linelength = len(line) offset += linelength line = line.decode('ascii', 'ignore') if config.VERBOSITY >= config.DEBUG: print(line) if line == "": break # remove leading and trailing whitespace symbols line = line.strip() if line == "{" and not hdr_flag: # start with header hdr_flag = True header = {} continue if hdr_flag: # stop reading when the end of the header # is reached if line == "}": # place offset reading here - here we get the # real starting position of the binary data!! break # continue if the line has no content if line == "": continue # split key and value of the header entry if not ml_value_flag: try: key, value = edf_kv_split.split(line, 1) except ValueError: print(f" line: {line}") key = key.strip() value = value.strip() # if the value extends over multiple lines set # the multiline value flag if value[-1] != ";": ml_value_flag = True else: value = value[:-1] value = value.strip() header[key] = value else: value = value + line if value[-1] == ";": ml_value_flag = False value = value[:-1] value = value.strip() header[key] = value else: break # append header to class variables self._byte_order.append(header["ByteOrder"]) self._fmt_str.append(DataTypeDict[header[self.dtkey]]) self._dimx.append(int(header[self.nxkey])) self._dimy.append(int(header[self.nykey])) self._dtype.append(header[self.dtkey]) self._headers.append(header) self._data_offsets.append(offset) # jump over data block tot_nofp = self._dimx[-1] * self._dimy[-1] dsize = tot_nofp * struct.calcsize(self._fmt_str[-1]) + dsize, 0) offset += dsize else: # in case of no header also save one set of defaults self._byte_order.append('LowByteFirst') self._fmt_str.append(DataTypeDict['UnsignedShort']) self._dimx.append(516) self._dimy.append(516) self._dtype.append('UnsignedShort') self._headers.append(header) self._data_offsets.append(offset) # try to parse motor positions and counters from last found header # into separate dictionary if 'motor_mne' in header: tkeys = header['motor_mne'].split() try: tval = numpy.array(header['motor_pos'].split(), dtype=numpy.double) self.motors = dict(zip(tkeys, tval)) except ValueError: print(" Warning: header conversion " "of motor positions failed") if 'counter_mne' in header: tkeys = header['counter_mne'].split() try: tval = numpy.array(header['counter_pos'].split(), dtype=numpy.double) self.counters = dict(zip(tkeys, tval)) except ValueError: print(" Warning: header conversion " "of counter values failed")
[docs] def ReadData(self, nimg=0): """ Read the CCD data of the specified image and return the data this function is called automatically when the 'data' property is accessed, but can also be called manually when only a certain image from the file is needed. Parameters ---------- nimg : int, optional number of the image which should be read (starts with 0) """ if self.fid: binfid = self.fid # move to the data section - jump over the header[nimg], 0) # read the data tot_nofp = self._dimx[nimg] * self._dimy[nimg] fmt_str = self._fmt_str[nimg] bindata = * struct.calcsize(fmt_str)) else: with xu_open(self.full_filename, 'rb') as binfid: # move to the data section - jump over the header[nimg], 0) # read the data tot_nofp = self._dimx[nimg] * self._dimy[nimg] fmt_str = self._fmt_str[nimg] bindata = * struct.calcsize(fmt_str)) if config.VERBOSITY >= config.DEBUG: print(" read binary data: nofp: %d len: %d" % (tot_nofp, len(bindata))) print(f" format: {fmt_str}") try: data = numpy.frombuffer(bindata, count=tot_nofp, dtype=fmt_str) except ValueError: if fmt_str == 'L': fmt_str = 'I' try: data = numpy.frombuffer(bindata, count=tot_nofp, dtype=fmt_str) except ValueError: raise IOError(" data format (%s) has " "different byte-length, from amount of data " "one expects %d bytes per entry" % (fmt_str, len(bindata) / tot_nofp)) else: raise IOError(" data format (%s) has different " "byte-length, from amount of data one expects " "%d bytes per entry" % (fmt_str, len(bindata) / tot_nofp)) data.shape = (self._dimy[nimg], self._dimx[nimg]) if self._byte_order[nimg] != "LowByteFirst": # data = data.byteswap() print(" check byte order - " "not low byte first") return data
@property def data(self): if not self._data_read: for i in range(self.nimages): self._data[i] = self.ReadData(i) self._data_read = True if self.nimages == 1: return self._data[0] return self._data
[docs] def Save2HDF5(self, h5f, group="/", comp=True): """ Saves the data stored in the EDF file in a HDF5 file as a HDF5 array. By default the data is stored in the root group of the HDF5 file - this can be changed by passing the name of a target group or a path to the target group via the "group" keyword argument. Parameters ---------- h5f : file-handle or str a HDF5 file object or name group : str, optional group where to store the data (default to the root of the file) comp : bool, optional activate compression - true by default """ with xu_h5open(h5f, 'a') as h5: if isinstance(group, str): if group == '/': g = h5 else: if group in h5: del h5[group] g = h5.create_group(group) else: g = group # create the array name ca_name = os.path.split(self.filename)[-1] ca_name = os.path.splitext(ca_name)[0] # perform a second time for case of .edf.gz files ca_name = os.path.splitext(ca_name)[0] ca_name = utilities.makeNaturalName(ca_name) if edf_name_start_num.match(ca_name): ca_name = "ccd_" + ca_name if config.VERBOSITY >= config.INFO_ALL: print(ca_name) # create the array description ca_desc = f"EDF CCD data from file {self.filename} " kwds = {'fletcher32': True} if comp: kwds['compression'] = 'gzip' if self.nimages != 1: ca_name += '_{n:04d}' for n in range(self.nimages): d = self.ReadData(n) name = ca_name.format(n=n) try: ca = g.create_dataset(name, data=d, **kwds) except ValueError: del g[name] ca = g.create_dataset(name, data=d, **kwds) ca.attrs['TITLE'] = ca_desc # finally we have to append the attributes for k in self.header: ca.attrs[utilities.makeNaturalName(k)] = self.header[k]
[docs] class EDFDirectory(FileDirectory): """ Parses a directory for EDF files, which can be stored to a HDF5 file for further usage """
[docs] def __init__(self, datapath, ext="edf", **keyargs): """ Parameters ---------- datapath : str directory of the EDF file ext : str, optional extension of the ccd files in the datapath (default: "edf") keyargs : dict, optional further keyword arguments are passed to EDFFile """ super().__init__(datapath, ext, EDFFile, **keyargs)