"""
This module contains:

Parsing functions used to extract information from experimental files.

Classes used to help make parsing more modular. These include the NexusBase
class and its children.
"""

# We've gotta access the _value attribute on some NXobjects.
# pylint: disable=protected-access

import json
import os
from typing import List
from abc import abstractmethod

import as nx
from import nxload
import pandas as pd
import numpy as np
import h5py

from .scan import Scan2D
from .image import Image
from .data import Data
from .region import Region
from .debug import debug
from .metadata import Metadata

class NexusBase(Metadata):
    """
    This class contains *mostly* beamline agnostic nexus parsing convenience
    stuff. It's worth noting that this class still makes a series of
    assumptions about how data is laid out in a nexus file that can be broken.
    Instead of striving for some impossible perfection, this class is
    practical in its assumptions of how data is laid out in a .nxs file, and
    will raise if an assumption is violated. All instrument-specific
    assumptions that one must inevitably make to extract truly meaningful
    information from a nexus file are made in children of this class.

    Attrs:
        local_path:
            The path to the file on the local filesystem. This is handed to
            the Metadata initialiser (presumably stored there under this
            name, since child classes read self.local_path - confirm against
            Metadata).
        nxfile:
            The object produced by loading the file at local_path with nxload.
    """

    def __init__(self, local_path: str):
        super().__init__(local_path)
        self.nxfile = nxload(local_path)

    @property
    def src_path(self):
        """
        The name of this nexus file, as it was recorded when the nexus file
        was written.
        """
        return self.nxfile.file_name

    @property
    def detector(self):
        """
        Returns the NXdetector instance stored in this NexusFile.

        Raises:
            ValueError if the instrument does not contain exactly one
            NXdetector (the single-element unpacking below enforces this).
        """
        det, = self.instrument.NXdetector
        return det

    @property
    def instrument(self):
        """
        Returns the NXinstrument instance stored in this NexusFile.

        Raises:
            ValueError if the entry does not contain exactly one
            NXinstrument.
        """
        instrument, = self.entry.NXinstrument
        return instrument

    @property
    def entry(self) -> nx.NXentry:
        """
        Returns this nexusfile's entry.

        Raises:
            ValueError if the file does not contain exactly one entry.
        """
        entry, = self.nxfile.NXentry
        return entry

    @property
    def default_signal(self) -> np.ndarray:
        """
        The numpy array of intensities pointed to by the signal attribute in
        the nexus file.
        """
        return self.default_nxdata[self.default_signal_name].nxdata

    @property
    def default_axis(self) -> np.ndarray:
        """
        Returns the nxdata associated with the default axis.
        """
        return self.default_nxdata[self.default_axis_name].nxdata

    @property
    def default_signal_name(self):
        """
        Returns the name of the default signal.
        """
        return self.default_nxdata.signal

    @property
    def default_axis_name(self) -> str:
        """
        Returns the name of the default axis.
        """
        # default_nxdata is exactly self.entry[self.entry.default]; reuse it
        # so the default group is resolved in a single place.
        return self.default_nxdata.axes

    @property
    def default_nxdata_name(self):
        """
        Returns the name of the default nxdata.
        """
        return self.entry.default

    @property
    def default_nxdata(self) -> nx.NXdata:
        """
        Returns the default NXdata group of this file's entry. This is the
        group whose signal/axes attributes name the default signal and axis;
        it is not itself a numpy array.
        """
        return self.entry[self.default_nxdata_name]

    # A hack to tell pylint that this class is still meant to be abstract.
    @property
    @abstractmethod
    def default_axis_type(self) -> str:
        """
        The type of the default axis. Children of this class must override
        this property.
        """
        return super().default_axis_type()
class I07Nexus(NexusBase):
    """
    This class extends NexusBase with methods useful for scraping information
    from nexus files produced at the I07 beamline at Diamond.
    """
    # Names under which the excalibur detector shows up in the entry. Which
    # one is present depends on when the file was recorded.
    excalibur_detector_2021 = "excroi"
    excalibur_04_2022 = "exr"

    @property
    def local_data_path(self) -> str:
        """
        The local path to the data (.h5) file. Note that this isn't in the
        NexusBase class because it need not be reasonably expected to point at
        a .h5 file.

        Raises:
            FileNotFoundError if the data file cant be found.
        """
        file = _try_to_find_files(
            [self._src_data_path], [self.local_path])[0]
        return file

    @property
    def detector_name(self) -> str:
        """
        Returns the name of the detector that we're using. Because life sucks,
        this is a function of time.

        Raises:
            NotImplementedError if neither known detector name is found in
            the entry.
        """
        if "excroi" in self.entry:
            return I07Nexus.excalibur_detector_2021
        if "exr" in self.entry:
            return I07Nexus.excalibur_04_2022
        # Couldn't recognise the detector.
        raise NotImplementedError()

    @property
    def default_axis_name(self) -> str:
        """
        Returns the name of the default axis.
        """
        return self.entry[self.entry.default].axes

    @property
    def default_axis_type(self) -> str:
        """
        Returns the type of our default axis: 'q' when the default axis is
        'qdcd', 'tth' when it is 'diff1delta' (or when diff1delta holds an
        array). Returns None when none of these checks match.
        """
        if self.default_axis_name == 'qdcd':
            return 'q'
        if self.default_axis_name == 'diff1delta':
            return 'tth'
        # It's also possible that self.default_axis_name isn't recorded in some
        # nexus files. Just in case, let's check the length of diff1delta.
        if isinstance(self.instrument["diff1delta"].value.nxdata, np.ndarray):
            return 'tth'
        # Nothing matched; callers treat None as an unsupported axis type.
        return None

    def _get_ith_region(self, i: int):
        """
        Returns the ith region of interest found in the .nxs file.

        Args:
            i:
                The region of interest number to return. This number should
                match the ROI name as found in the .nxs file (generally not 0
                indexed).

        Returns:
            The ith region of interest found in the .nxs file.
        """
        # The file stores a start and an extent; Region wants start and end.
        x_1 = self.detector[self._get_region_bounds_key(i, 'x_1')][0]
        x_2 = self.detector[self._get_region_bounds_key(i, 'Width')][0] + x_1
        y_1 = self.detector[self._get_region_bounds_key(i, 'y_1')][0]
        y_2 = self.detector[self._get_region_bounds_key(i, 'Height')][0] + y_1
        return Region(x_1, x_2, y_1, y_2)

    @staticmethod
    def _parse_roi_string(roi_str: str) -> dict:
        """
        Turns the text stored in "ex_rois/excalibur_ROIs" into a dictionary.

        The text is often not valid json: it can look like the repr of a
        python 2 dictionary, with single quotes and u'' string prefixes. Three
        strategies are tried in turn: plain json, a python literal, and
        finally the historical clean-up (delete every 'u', swap quotes) so
        that anything that used to parse still does.

        Args:
            roi_str:
                The (already decoded) ROI description.

        Returns:
            The parsed dictionary of regions of interest.
        """
        # Local import so that this block does not depend on the module's
        # import list.
        # pylint: disable=import-outside-toplevel
        import ast

        try:
            return json.loads(roi_str)
        except ValueError:
            pass
        try:
            # literal_eval understands u'' prefixes and single quotes, so no
            # characters need to be stripped out of keys or values.
            return ast.literal_eval(roi_str)
        except (ValueError, SyntaxError):
            legacy_str = roi_str.replace('u', '').replace("'", '"')
            return json.loads(legacy_str)

    def _excalibur_roi_dict(self) -> dict:
        """
        Reads and parses the ROI dictionary written by the 04/2022 excalibur
        detector configuration.
        """
        roi_str = self.instrument["ex_rois/excalibur_ROIs"]._value
        # Make sure our code executes for bytes and strings.
        try:
            roi_str = roi_str.decode("utf-8")
        except AttributeError:
            pass
        return self._parse_roi_string(roi_str)

    @property
    def signal_regions(self) -> List[Region]:
        """
        Returns a list of region objects that define the location of the
        signal. Currently there is nothing better to do than assume that this
        is a list of length 1.

        Raises:
            NotImplementedError if the detector isn't recognised.
        """
        if self.detector_name == I07Nexus.excalibur_detector_2021:
            return [self._get_ith_region(i=1)]
        if self.detector_name == I07Nexus.excalibur_04_2022:
            roi_dict = self._excalibur_roi_dict()
            return [Region.from_dict(roi_dict['Region_1'])]
        raise NotImplementedError()

    @property
    def background_regions(self) -> List[Region]:
        """
        Returns a list of region objects that define the location of
        background. Currently we just skip the first region (the signal) and
        call the rest of them background regions.

        Raises:
            NotImplementedError if the detector isn't recognised.
        """
        if self.detector_name == I07Nexus.excalibur_detector_2021:
            return [self._get_ith_region(i)
                    for i in range(2, self._number_of_regions+1)]
        if self.detector_name == I07Nexus.excalibur_04_2022:
            roi_dict = self._excalibur_roi_dict()
            # Relies on the dictionary keeping the order in which the regions
            # were written; the first one is the signal region.
            bkg_roi_list = list(roi_dict.values())[1:]
            return [Region.from_dict(x) for x in bkg_roi_list]
        raise NotImplementedError()

    @property
    def probe_energy(self):
        """
        Returns the energy of the probe particle parsed from this NexusFile.
        """
        return float(self.instrument.dcm1energy.value)

    @property
    def transmission(self):
        """
        Proportional to the fraction of probe particles allowed by an
        attenuator to strike the sample.
        """
        return float(self.instrument.filterset.transmission)

    @property
    def detector_distance(self):
        """
        Returns the distance between sample and detector.
        """
        return float(self.instrument.diff1detdist.value)

    @property
    def _src_data_path(self):
        """
        Returns the raw path to the data file. This is useless if you aren't
        on site, but used by islatu to guess where you've stored the data file
        locally.

        Raises:
            IndexError if no reference to a .h5 file is found in the file.
        """
        # This is far from ideal; there currently seems to be no standard way
        # to refer to point at information stored outside of the nexus file.
        # If you're a human, it's easy enough to find, but with code this is
        # a pretty rubbish task. Here I just grab the first .h5 file I find
        # and run with it.
        found_h5_files = []

        def recurse_over_nxgroups(nx_object, found_h5_files):
            """
            Recursively looks for nxgroups in nx_object that, when cast to a
            string, end in .h5.
            """
            for key in nx_object:
                new_obj = nx_object[key]
                if key == "data":
                    # NOTE(review): the [8:-9] slice presumably trims fixed
                    # decoration around the file name in the tree text -
                    # confirm against a real file.
                    if new_obj.tree[8:-9].endswith(".h5"):
                        found_h5_files.append(new_obj.tree[8:-9])
                if str(new_obj).endswith(".h5"):
                    found_h5_files.append(str(new_obj))
                if str(new_obj).endswith(".h5['/data']"):
                    # Drop the trailing "['/data']" (9 characters).
                    found_h5_files.append(str(new_obj)[:-9])
                if isinstance(new_obj, nx.NXgroup):
                    recurse_over_nxgroups(new_obj, found_h5_files)

        recurse_over_nxgroups(self.nxfile, found_h5_files)
        return found_h5_files[0]

    @property
    def _region_keys(self) -> List[str]:
        """
        Parses all of the detector's dictionary keys and returns all keys
        relating to regions of interest.
        """
        return [key for key in self.detector.keys() if key.startswith("Region")]

    @property
    def _number_of_regions(self) -> int:
        """
        Returns the number of regions of interest described by this nexus
        file. This *assumes* that the region keys take the form
        f'Region_{an_int}_...', so that the second '_'-separated field is the
        region number.
        """
        split_keys = [key.split('_') for key in self._region_keys]
        return max([int(split_key[1]) for split_key in split_keys])

    def _get_region_bounds_key(self, region_no: int, kind: str) -> str:
        """
        Returns the detector key relating to the bounds of the region of
        interest corresponding to region_no.

        Args:
            region_no:
                An integer corresponding the the particular region of interest
                we're interested in generating a key for.
            kind:
                The kind of region bounds keys we're interested in. This can
                take the values:
                    'x_1', 'width', 'y_1', 'height'
                where '1' can be replaced with 'start' and with/without caps on
                first letter of width/height.

        Raises:
            ValueError if 'kind' argument is not one of the above.

        Returns:
            A single key of the form f"Region_{region_no}_{X|Width|Y|Height}".
        """
        # Note that the x, y swapping is a quirk of the nexus standard, and is
        # related to which axis on the detector varies most rapidly in memory.
        if kind in ('x_1', 'x_start'):
            insert = 'X'
        elif kind in ('width', 'Width'):
            insert = 'Width'
        elif kind in ('y_1', 'y_start'):
            insert = 'Y'
        elif kind in ('height', 'Height'):
            insert = 'Height'
        else:
            raise ValueError(
                "Didn't recognise 'kind' argument.")

        return f"Region_{region_no}_{insert}"
def _float_or_raw(text):
    """Returns text converted to a float, or text itself if that fails."""
    try:
        return float(text)
    except ValueError:
        return text


def i07_dat_to_dict_dataframe(file_path):
    """
    Parses a .dat file recorded by I07, returning a [now mostly obsolete] tuple
    containing a metadata dictionary and a pandas dataframe of the data.

    Though outdated, this is still a handy way to parse the DCD normalization
    .dat file.

    Metadata is read from the lines between "<MetaDataAtStart>" and
    "</MetaDataAtStart>" that contain an "=". The data section starts at the
    line containing " &END": the next line holds the column titles and every
    line after that is a row of data. Columns that have no title are named
    "0", "1", ... in the order in which they appear.

    Args:
        file_path (:py:attr:`str`): The ``.dat`` file to be read.

    Returns:
        :py:attr:`tuple`: Containing:
            - :py:attr:`dict`: The metadata from the ``.dat`` file. Each value
              is a list, with one element per "="-separated field after the
              key.
            - :py:class:`pandas.DataFrame`: The data from the ``.dat`` file.
              This is empty if the file contains no rows of data.
    """
    # Neither the data nor the metadata are being read yet.
    data_reading = False
    metadata_reading = False
    # Line counter for the data section; (re)set when " &END" is found.
    count = 0

    metadata_dict = {}
    titles = []
    # To be filled with one list of fields per row of data.
    data_lines = []

    # The context manager closes the file even if parsing raises.
    with open(file_path, "r", encoding='utf-8') as f_open:
        for line in f_open:
            # This string incidates the start of the metadata.
            if "<MetaDataAtStart>" in line:
                metadata_reading = True
            # This string indicates the end of the metadata.
            if "</MetaDataAtStart>" in line:
                metadata_reading = False
            # This string indicates the start of the data.
            if " &END" in line:
                data_reading = True
                # Set counter to minus two, such that when it reaches the
                # titles it is 0.
                count = -2

            # When the metadata section is being read populate metadata_dict.
            if metadata_reading and "=" in line:
                key, *raw_values = line.split("=")
                metadata_dict[key] = [_float_or_raw(x) for x in raw_values]

            # When the data section is being read, the zeroth line holds the
            # titles and everything after it is data.
            if data_reading:
                count += 1
                if count == 0:
                    titles = line.split()
                elif count > 0:
                    fields = line.split()
                    # Blank lines carry no data; keeping them would break the
                    # transposition below.
                    if fields:
                        data_lines.append(fields)

    # Transpose the rows into columns, keyed by title.
    data_dict = {}
    if data_lines:
        # Counts the untitled columns. This must live outside the loop,
        # otherwise every untitled column ends up stored under "0".
        untitled_count = 0
        for j in range(len(data_lines[0])):
            column = [_float_or_raw(row[j]) for row in data_lines]
            if j < len(titles):
                data_dict[titles[j]] = column
            else:
                data_dict[str(untitled_count)] = column
                untitled_count += 1

    return metadata_dict, pd.DataFrame(data_dict)
def load_images_from_h5(h5_file_path, transpose=False):
    """
    Loads every image stored under the 'data' key of a .h5 file.

    Args:
        h5_file_path:
            Path to the h5 file from which we're loading images.
        transpose:
            Should we take the transpose of these images? Defaults to False.

    Returns:
        A list containing one Image per frame, in the order in which the
        frames are stored in the file.
    """
    internal_data_path = 'data'
    loaded = []
    debug.log("Loading images from file " + h5_file_path, unimportance=0)

    with h5py.File(h5_file_path, "r") as h5_handle:
        # [()] reads the whole dataset into memory in one go.
        frames = h5_handle[internal_data_path][()]
        frame_count = frames.shape[0]

        # Prepare to show a progress bar for image loading.
        debug.log(f"Loading {frame_count} images.", unimportance=2)
        for number, frame in enumerate(frames, start=1):
            debug.log(f"Currently loaded {number} images.", end="\r")
            loaded.append(Image(frame, transpose=transpose))

        # This line is necessary to prevent overwriting due to end="\r".
        debug.log("")
        debug.log(f"Loaded all {frame_count} images.", unimportance=2)

    return loaded
def i07_nxs_parser(file_path: str):
    """
    Parses a .nxs file acquired from the I07 beamline at diamond, returning an
    instance of Scan2D. This process involves loading the images contained in
    the .h5 file pointed at by the .nxs file, as well as retrieving the
    metadata from the .nxs file that is relevant for XRR reduction.

    Args:
        file_path:
            Path to the .nxs file.

    Raises:
        NotImplementedError if the type of the default axis is not one of
        'q', 'th' or 'tth'.

    Returns:
        An initialized Scan2D object containing all loaded detector frames, as
        well as the relevant metadata from the .nxs file.
    """
    # Use the magical parser class that does everything for us.
    nexus = I07Nexus(file_path)

    # Load the images, taking a transpose if necessary (because which axis is
    # x and which is y is determined by fast vs slow detector axes in memory).
    transposed_detectors = [
        I07Nexus.excalibur_detector_2021,
        I07Nexus.excalibur_04_2022,
    ]
    if nexus.detector_name in transposed_detectors:
        images = load_images_from_h5(nexus.local_data_path, transpose=True)

    # The dependent variable, with its error estimated as its square root.
    intensity = nexus.default_signal
    intensity_e = np.sqrt(intensity)

    # The independent variable.
    axis = nexus.default_axis

    # We have to load the Data according to what our independent variable is.
    axis_type = nexus.default_axis_type
    if axis_type == 'q':
        axis_kwargs = {'q_vectors': axis}
    elif axis_type == 'th':
        axis_kwargs = {'theta': axis}
    elif axis_type == 'tth':
        # Two theta was recorded; Data wants theta.
        axis_kwargs = {'theta': axis/2}
    else:
        raise NotImplementedError(
            f"{axis_type} is not a supported axis type.")

    data = Data(intensity, intensity_e, nexus.probe_energy, **axis_kwargs)

    # Returns the Scan2D object
    return Scan2D(data, nexus, images)
def _try_to_find_files(filenames: List[str], additional_search_paths: List[str]): """ Check that data files exist if the file parsed by parser pointed to a separate file containing intensity information. If the intensity data file could not be found in its original location, check a series of probable locations for the data file. If the data file is found in one of these locations, update file's entry in Returns: :py:attr:`list` of :py:attr:`str`: List of the corrected, actual paths to the files. """ found_files = [] # If we had only one file, make a list out of it. if not hasattr(filenames, "__iter__"): filenames = [filenames] cwd = os.getcwd() start_dirs = [ cwd, # maybe file is stored near the current working dir # To search additional directories, add them in here manually. ] start_dirs.extend(additional_search_paths) local_start_directories = [x.replace('\\', '/') for x in start_dirs] num_start_directories = len(local_start_directories) # Now extend the additional search paths. for i in range(num_start_directories): search_path = local_start_directories[i] split_srch_path = search_path.split('/') for j in range(len(split_srch_path)): extra_path_list = split_srch_path[:-(j+1)] extra_path = '/'.join(extra_path_list) local_start_directories.append(extra_path) # This line allows for a loading bar to show as we check the file. for i, _ in enumerate(filenames): # Better to be safe... Note: windows is happy with / even though it # defaults to \ filenames[i] = str(filenames[i]).replace('\\', '/') # Maybe we can see the file in its original storage location? if os.path.isfile(filenames[i]): found_files.append(filenames[i]) continue # If not, maybe it's stored locally? If the file was stored at # location /a1/a2/.../aN/file originally, for a local directory LD, # check locations LD/aj/aj+1/.../aN for all j<N and all LD's of # interest. This algorithm is a generalization of Andrew McCluskey's # original approach. 
# now generate a list of all directories that we'd like to check candidate_paths = [] split_file_path = str(filenames[i]).split('/') for j in range(len(split_file_path)): local_guess = '/'.join(split_file_path[j:]) for start_dir in local_start_directories: candidate_paths.append( os.path.join(start_dir, local_guess)) # Iterate over each of the candidate paths to see if any of them contain # the data file we're looking for. found_file = False for candidate_path in candidate_paths: if os.path.isfile(candidate_path): # File found - add the correct file location to found_files found_files.append(candidate_path) found_file = not found_file debug.log("Data file found at " + candidate_path + ".") break # If we didn't find the file, tell the user. if not found_file: raise FileNotFoundError( "The data file with the name " + filenames[i] + " could " "not be found. The following paths were searched:\n" + "\n".join(candidate_paths) ) return found_files