Source code for snowprofile.io._caaml6_xml_write

# -*- coding: utf-8 -*-

import logging
import datetime
import xml.etree.ElementTree as ET
import sys

import numpy as np

# Note to developpers:
# XML elements are prefixed by e_
# Snowprofile data is prefixes by s_

table_versions_uri = {
    '6.0.6': 'http://caaml.org/Schemas/SnowProfileIACS/v6.0.6',
    '6.0.5': 'http://caaml.org/Schemas/SnowProfileIACS/v6.0.4'}

uri_gml = 'http://www.opengis.net/gml'


[docs] def write_caaml6_xml(snowprofile, filename, version='6.0.5', indent=False): """ Write a SnowProfile object into a CAAML 6 XML-based IACS Snow Profile document. Currently supported versions: - 6.0.6 - 6.0.5 (default) :param snowprofile: A SnowProfile object to dump to a CAAML file :type snowprofile: SnowProfile :param filename: The filename to write into. If already exists, will be overwritten. :type filename: str :param indent: Visually indent the output (default: False, provide the more compact outut available) :type indent: bool or string (spaces for indentation) """ if version not in table_versions_uri: raise ValueError(f'Unsupported CAAML version {version}.') uri = table_versions_uri[version] ns = '{' + uri + '}' ns_gml = '{' + uri_gml + '}' # Namespaces ET.register_namespace('caaml', uri) ET.register_namespace('gml', uri_gml) # Id management id_list = [] def _gen_id(id, default=None): if id is None and default is not None: return _gen_id(default) elif id is None and default is None: return _gen_id('id') else: i = 1 id_test = id while id_test in id_list: id_test = f'{id}{i}' i += 1 id = id_test id_list.append(id) return id config = {'_gen_id': _gen_id, 'ns': ns, 'ns_gml': ns_gml, 'profile_depth': snowprofile.profile_depth if snowprofile.profile_depth is not None else 0, 'profile_swe': snowprofile.profile_swe, 'version': version} if snowprofile.profile_depth is None: logging.warning('Profile depth not set. Ensure this is expected !') # Main XML element root = ET.Element(f'{ns}SnowProfile', attrib={f'{ns_gml}id': _gen_id(snowprofile.id, 'snowprofile')}) # - Metadata (optional) if snowprofile.comment is not None: _ = ET.SubElement(root, f'{ns}metaData') _ = ET.SubElement(_, f'{ns}comment') _.text = snowprofile.comment # - timeRef time = ET.SubElement(root, f'{ns}timeRef') if snowprofile.time.comment is not None: _ = ET.SubElement(time, f'{ns}metaData') _ = ET.SubElement(_, f'{ns}comment') _.text = snowprofile.time.comment record_time = ET.SubElement(time, f'{ns}recordTime') if snowprofile.time.record_period[0] is not None and snowprofile.time.record_period[1] is not None: _ = ET.SubElement(record_time, f'{ns}TimePeriod') begin = ET.SubElement(_, f'{ns}beginPosition') begin.text = snowprofile.time.record_period[0].isoformat() end = ET.SubElement(_, f'{ns}endPosition') end.text = snowprofile.time.record_period[1].isoformat() elif snowprofile.time.record_time is not None: _ = ET.SubElement(record_time, f'{ns}TimeInstant') begin = ET.SubElement(_, f'{ns}timePosition') begin.text = snowprofile.time.record_time.isoformat() else: logging.error('Could not find a valid record time or time period. Use current time') _ = ET.SubElement(record_time, f'{ns}TimeInstant') begin = ET.SubElement(_, f'{ns}timePosition') begin.text = datetime.datetime.now().isoformat() if snowprofile.time.report_time is not None: _ = ET.SubElement(time, f'{ns}dateTimeReport') _.text = snowprofile.time.report_time.isoformat() if snowprofile.time.last_edition_time is not None: _ = ET.SubElement(time, f'{ns}dateTimeLastEdit') _.text = snowprofile.time.last_edition_time.isoformat() _append_additional_data(time, snowprofile.time.additional_data, ns=ns) # - srcRef src = ET.SubElement(root, f'{ns}srcRef') if snowprofile.observer.source_name is None: src = ET.SubElement(src, f'{ns}Person', attrib={f'{ns_gml}id': _gen_id(snowprofile.observer.contact_persons[0].id, 'person')}) if len(snowprofile.observer.contact_persons) > 1: logging.error('Observer: if you provide more than one contact person you need to provide a source name. ' 'Only the first contact person will be used.') if snowprofile.observer.contact_persons[0].comment is not None: _ = ET.SubElement(src, f'{ns}metaData') _ = ET.SubElement(_, f'{ns}comment') _.text = snowprofile.observer.contact_persons[0].comment _ = ET.SubElement(src, f'{ns}name') if snowprofile.observer.contact_persons[0].name is not None: _.text = snowprofile.observer.contact_persons[0].name _append_additional_data(src, snowprofile.observer.contact_persons[0].additional_data, ns=ns) else: op = ET.SubElement(src, f'{ns}Operation', attrib={f'{ns_gml}id': _gen_id(snowprofile.observer.source_id, 'operation')}) if snowprofile.observer.source_comment is not None: _ = ET.SubElement(op, f'{ns}metaData') _ = ET.SubElement(_, f'{ns}comment') _.text = snowprofile.observer.source_comment _ = ET.SubElement(op, f'{ns}name') _.text = snowprofile.observer.source_name for person in snowprofile.observer.contact_persons: p = ET.SubElement(op, f'{ns}contactPerson', attrib={f'{ns_gml}id': _gen_id(person.id, 'person')}) if person.comment is not None: _ = ET.SubElement(p, f'{ns}metaData') _ = ET.SubElement(_, f'{ns}comment') _.text = person.comment name = ET.SubElement(p, f'{ns}name') # Compulosry element (but no content is fine) if person.name is not None: name.text = person.name _append_additional_data(p, person.additional_data, ns=ns) _append_additional_data(op, snowprofile.observer.source_additional_data, ns=ns) # locRef src = ET.SubElement(root, f'{ns}locRef', attrib={f'{ns_gml}id': _gen_id(snowprofile.location.id, 'location')}) loc = snowprofile.location if loc.comment is not None: _ = ET.SubElement(src, f'{ns}metaData') _ = ET.SubElement(_, f'{ns}comment') _.text = loc.comment name = ET.SubElement(src, f'{ns}name') name.text = loc.name _ = ET.SubElement(src, f'{ns}obsPointSubType') if loc.point_type is not None: _.text = loc.point_type if loc.elevation is not None: _ = ET.SubElement(src, f'{ns}validElevation') _ = ET.SubElement(_, f'{ns}ElevationPosition', attrib={'uom': 'm'}) _ = ET.SubElement(_, f'{ns}position') _.text = str(int(loc.elevation)) if loc.aspect is not None: _ = ET.SubElement(src, f'{ns}validAspect') _ = ET.SubElement(_, f'{ns}AspectPosition') _ = ET.SubElement(_, f'{ns}position') _.text = str(int(loc.aspect)) if loc.slope is not None: _ = ET.SubElement(src, f'{ns}validSlopeAngle') _ = ET.SubElement(_, f'{ns}SlopeAnglePosition', attrib={'uom': 'deg'}) _ = ET.SubElement(_, f'{ns}position') _.text = str(int(loc.slope)) if loc.latitude is not None and loc.longitude is not None: _ = ET.SubElement(src, f'{ns}pointLocation') _ = ET.SubElement(_, f'{ns_gml}Point', attrib={f'{ns_gml}id': _gen_id('pointID'), 'srsName': "urn:ogc:def:crs:OGC:1.3:CRS84", 'srsDimension': "2"}) _ = ET.SubElement(_, f'{ns_gml}pos') _.text = f'{loc.longitude} {loc.latitude}' if loc.country is not None: _ = ET.SubElement(src, f'{ns}country') _.text = loc.country if loc.region is not None: _ = ET.SubElement(src, f'{ns}region') _.text = loc.region if version >= "6.0.6": env = snowprofile.environment # Solar Mask sm = snowprofile.environment.solar_mask if sm is not None: e_sm = ET.SubElement(src, f'{ns}solarMask') e_smm = ET.SubElement(e_sm, f'{ns}solarMaskMetaData') if env.solar_mask_comment is not None: _ = ET.SubElement(e_smm, f'{ns}comment') _.text = env.solar_mask_comment _ = ET.SubElement(e_smm, f'{ns}methodOfMeas') if env.solar_mask_method_of_measurement is None: _.text = 'other' else: _.text = env.solar_mask_method_of_measurement if env.solar_mask_uncertainty: _ = ET.SubElement(e_smm, f'{ns}uncertaintyOfMeas') _.text = "{:.12g}".format(env.solar_mask_uncertainty) if env.solar_mask_quality: _ = ET.SubElement(e_smm, f'{ns}qualityOfMeas') _.text = env.solar_mask_quality for _, dataline in sm.data.iterrows(): e_ = ET.SubElement(e_sm, f'{ns}Data', attrib={'uom': 'deg'}) _ = ET.SubElement(e_, f'{ns}azimuth') _.text = str(int(dataline.azimuth)) _ = ET.SubElement(e_, f'{ns}elevation') _.text = "{:.12g}".format(dataline.elevation) _append_additional_data(e_sm, snowprofile.environment.solar_mask_additional_data) # obsPointEnvironment e_ope = ET.SubElement(src, f'{ns}obsPointEnvironment') if env.bed_surface is not None: _ = ET.SubElement(e_ope, f'{ns}bedSurface') _.text = env.bed_surface if env.bed_surface_comment is not None: _ = ET.SubElement(e_ope, f'{ns}bedSurfaceComment') _.text = env.bed_surface_comment if env.litter_thickness is not None: _ = ET.SubElement(e_ope, f'{ns}litterThickness', attrib={'uom': 'm'}) _.text = "{:.12g}".format(env.litter_thickness) if env.ice_thickness is not None: _ = ET.SubElement(e_ope, f'{ns}iceThickness', attrib={'uom': 'm'}) _.text = "{:.12g}".format(env.ice_thickness) if env.low_vegetation_height is not None: _ = ET.SubElement(e_ope, f'{ns}lowVegetationHeight', attrib={'uom': 'm'}) _.text = "{:.12g}".format(env.low_vegetation_height) if env.LAI is not None: _ = ET.SubElement(e_ope, f'{ns}lai', attrib={'uom': '1'}) _.text = "{:.12g}".format(env.LAI) if env.forest_presence is not None: _ = ET.SubElement(e_ope, f'{ns}forestPresence') _.text = env.forest_presence if env.forest_presence_comment is not None: _ = ET.SubElement(e_ope, f'{ns}forestComment') _.text = env.forest_presence_comment if env.sky_view_factor is not None: _ = ET.SubElement(e_ope, f'{ns}skyViewFactor', attrib={'uom': '1'}) _.text = "{:.12g}".format(env.sky_view_factor) if env.tree_height is not None: _ = ET.SubElement(e_ope, f'{ns}treeHeight', attrib={'uom': 'm'}) _.text = "{:.12g}".format(env.tree_height) _append_additional_data(src, loc.additional_data, ns=ns) # snowProfileResultsOf e_r = ET.SubElement(root, f'{ns}snowProfileResultsOf') e_r = ET.SubElement(e_r, f'{ns}SnowProfileMeasurements', attrib={'dir': 'top down'}) if snowprofile.profile_comment is not None: _ = ET.SubElement(e_r, f'{ns}metaData') _ = ET.SubElement(_, f'{ns}comment') _.text = snowprofile.profile_comment # profileDepth seem to be designed to be the observed depth rather than the total depth # if snowprofile.profile_depth is not None: # _ = ET.SubElement(r, f'{ns}profileDepth', attrib={'uom': 'cm'}) # _.text = str(float(snowprofile.profile_depth) * 100) # - Weather e_weather = ET.SubElement(e_r, f'{ns}weatherCond') s_weather = snowprofile.weather e_weather_metadata = ET.SubElement(e_weather, f'{ns}metaData') e_weather_comment = ET.SubElement(e_weather_metadata, f'{ns}comment') comment = '' if s_weather.air_temperature_measurement_height is not None and version >= "6.0.6": _ = ET.SubElement(e_weather_metadata, f'{ns}airTempMeasurementHeight', attrib={'uom': 'm'}) _.text = "{:.10g}".format(s_weather.air_temperature_measurement_height) elif s_weather.air_temperature_measurement_height is not None: comment += f'Height of the temperature measurement: {s_weather.air_temperature_measurement_height}m\n' if s_weather.wind_measurement_height is not None and version >= "6.0.6": _ = ET.SubElement(e_weather_metadata, f'{ns}windMeasurementHeight', attrib={'uom': 'm'}) _.text = "{:.10g}".format(s_weather.wind_measurement_height) elif s_weather.wind_measurement_height is not None: comment += f'Height of the wind measurement: {s_weather.wind_measurement_height}m\n' if s_weather.comment is not None or len(comment) > 1: if s_weather.comment is not None and len(comment) == 0: comment = s_weather.comment else: comment = s_weather.comment + "\n\n" + comment e_weather_comment.text = comment if s_weather.cloudiness is not None: _ = ET.SubElement(e_weather, f'{ns}skyCond') _.text = s_weather.cloudiness if s_weather.precipitation is not None: _ = ET.SubElement(e_weather, f'{ns}precipTI') _.text = s_weather.precipitation if s_weather.air_temperature is not None: _ = ET.SubElement(e_weather, f'{ns}airTempPres', attrib={'uom': 'degC'}) _.text = "{:.10g}".format(s_weather.air_temperature) if s_weather.air_humidity is not None and version >= "6.0.6": _ = ET.SubElement(e_weather, f'{ns}airHumPres') _.text = "{:.10g}".format(s_weather.air_humidity) if s_weather.wind_speed is not None: _ = ET.SubElement(e_weather, f'{ns}windSpd', attrib={'uom': 'ms-1'}) _.text = "{:.10g}".format(s_weather.wind_speed) if s_weather.wind_direction is not None: _ = ET.SubElement(e_weather, f'{ns}windDir') _ = ET.SubElement(_, f'{ns}AspectPosition') _ = ET.SubElement(_, f'{ns}position') _.text = str(int(s_weather.wind_direction)) _append_additional_data(e_weather, s_weather.additional_data, ns=ns) # - Snowpack e_snowpack = ET.SubElement(e_r, f'{ns}snowPackCond') if snowprofile.profile_depth is not None or snowprofile.profile_swe is not None: hs = ET.SubElement(e_snowpack, f'{ns}hS') hsc = ET.SubElement(hs, f'{ns}Components') if snowprofile.profile_depth is not None: _ = ET.SubElement(hsc, f'{ns}height', attrib={'uom': 'cm'}) _.text = "{:.12g}".format(snowprofile.profile_depth * 100) if snowprofile.profile_swe is not None: _ = ET.SubElement(hsc, f'{ns}waterEquivalent', attrib={'uom': 'kgm-2'}) _.text = "{:.12g}".format(snowprofile.profile_swe) if (snowprofile.profile_depth_std is not None or snowprofile.profile_swe_std is not None): if version >= "6.0.6": hs = ET.SubElement(e_snowpack, f'{ns}hSVariability') hsc = ET.SubElement(hs, f'{ns}Components') if snowprofile.profile_depth_std is not None: _ = ET.SubElement(hsc, f'{ns}height', attrib={'uom': 'cm'}) _.text = "{:.12g}".format(snowprofile.profile_depth_std * 100) if snowprofile.profile_swe_std is not None: _ = ET.SubElement(hsc, f'{ns}waterEquivalent', attrib={'uom': 'kgm-2'}) _.text = "{:.12g}".format(snowprofile.profile_swe_std) else: logging.warning('Caaml 6 < 6.0.6 does not support profile_depth_std and profile_swe_std.') if snowprofile.new_snow_24_depth is not None or snowprofile.new_snow_24_swe is not None: hs = ET.SubElement(e_snowpack, f'{ns}hN24') hsc = ET.SubElement(hs, f'{ns}Components') if snowprofile.new_snow_24_depth is not None: _ = ET.SubElement(hsc, f'{ns}height', attrib={'uom': 'cm'}) _.text = "{:.12g}".format(snowprofile.new_snow_24_depth * 100) if snowprofile.new_snow_24_swe is not None: _ = ET.SubElement(hsc, f'{ns}waterEquivalent', attrib={'uom': 'kgm-2'}) _.text = "{:.12g}".format(snowprofile.new_snow_24_swe) if (snowprofile.new_snow_24_depth_std is not None or snowprofile.new_snow_24_swe_std is not None): hs = ET.SubElement(e_snowpack, f'{ns}hIN', attrib={'dateTimeCleared': snowprofile.time.record_time.isoformat(timespec='seconds')}) hsc = ET.SubElement(hs, f'{ns}Components') if snowprofile.new_snow_24_depth_std is not None: _ = ET.SubElement(hsc, f'{ns}height', attrib={'uom': 'cm'}) _.text = "{:.12g}".format(snowprofile.new_snow_24_depth_std * 100) if snowprofile.new_snow_24_swe_std is not None: _ = ET.SubElement(hsc, f'{ns}waterEquivalent', attrib={'uom': 'kgm-2'}) _.text = "{:.12g}".format(snowprofile.new_snow_24_swe_std) if snowprofile.snow_transport is not None: if version >= "6.0.6": _ = ET.SubElement(e_snowpack, f'{ns}snowTransport') _.text = snowprofile.snow_transport else: logging.warning('Caaml 6 < 6.0.6 does not support snow transport data.') if snowprofile.snow_transport_occurence_24 is not None: if version >= "6.0.6": _ = ET.SubElement(e_snowpack, f'{ns}snowTransportOccurrence24') _.text = "{:.12g}".format(snowprofile.snow_transport_occurence_24) else: logging.warning('Caaml 6 < 6.0.6 does not support snow transport data.') # - Surface characterization e_surf = ET.SubElement(e_r, f'{ns}surfCond') s_surf = snowprofile.surface_conditions comment = '' _ = ET.SubElement(e_surf, f'{ns}metaData') e_surf_comment = ET.SubElement(_, f'{ns}comment') if not (s_surf.surface_roughness is None and s_surf.surface_wind_features is None and s_surf.surface_melt_rain_features is None and s_surf.surface_features_amplitude is None and s_surf.surface_features_amplitude_min is None and s_surf.surface_features_amplitude_max is None and s_surf.surface_features_wavelength is None and s_surf.surface_features_wavelength_min is None and s_surf.surface_features_wavelength_max is None and s_surf.surface_features_aspect is None and s_surf.surface_temperature is None and s_surf.surface_albedo is None and s_surf.spectral_albedo is None): _ = ET.SubElement(e_surf, f'{ns}surfFeatures') e_surff = ET.SubElement(_, f'{ns}Components') _ = ET.SubElement(e_surff, f'{ns}surfRoughness') if s_surf.surface_roughness is not None: _.text = s_surf.surface_roughness else: _.text = 'unknown' if s_surf.surface_wind_features is not None: if version >= "6.0.6": _ = ET.SubElement(e_surff, f'{ns}surfWindFeatures') _.text = s_surf.surface_wind_features else: comment += f'Wind surface features: {s_surf.surface_wind_features}\n' if s_surf.surface_melt_rain_features is not None: if version >= "6.0.6": _ = ET.SubElement(e_surff, f'{ns}surfMeltRainFeatures') _.text = s_surf.surface_melt_rain_features else: comment += f'Melt and rain surface features: {s_surf.surface_melt_rain_features}\n' if s_surf.surface_features_amplitude is not None: if s_surf.surface_features_amplitude_min is not None or s_surf.surface_features_amplitude_max is not None: logging.warning('CAAML6 could not store both surface_feature amplitude and min/max of amplitude.') _ = ET.SubElement(e_surff, f'{ns}validAmplitude') _ = ET.SubElement(_, f'{ns}AmplitudePosition', attrib={'uom': 'cm'}) _ = ET.SubElement(_, f'{ns}position') _.text = "{:.12g}".format(s_surf.surface_features_amplitude * 100) elif s_surf.surface_features_amplitude_min is not None and s_surf.surface_features_amplitude_max is not None: _ = ET.SubElement(e_surff, f'{ns}validAmplitude') _r = ET.SubElement(_, f'{ns}AmplitudeRange', attrib={'uom': 'cm'}) _ = ET.SubElement(_r, f'{ns}beginPosition') _.text = "{:.12g}".format(s_surf.surface_features_amplitude_min * 100) _ = ET.SubElement(_r, f'{ns}endPosition') _.text = "{:.12g}".format(s_surf.surface_features_amplitude_max * 100) if s_surf.surface_features_wavelength is not None: if s_surf.surface_features_wavelength_min is not None or s_surf.surface_features_wavelength_max is not None: logging.warning('CAAML6 could not store both surface_feature wavelength and min/max of wavelength.') _ = ET.SubElement(e_surff, f'{ns}validWavelength') _ = ET.SubElement(_, f'{ns}WavelengthPosition', attrib={'uom': 'm'}) _ = ET.SubElement(_, f'{ns}position') _.text = "{:.12g}".format(s_surf.surface_features_wavelength) elif s_surf.surface_features_wavelength_min is not None and s_surf.surface_features_wavelength_max is not None: _ = ET.SubElement(e_surff, f'{ns}validWavelength') _r = ET.SubElement(_, f'{ns}WavelengthRange', attrib={'uom': 'm'}) _ = ET.SubElement(_r, f'{ns}beginPosition') _.text = "{:.12g}".format(s_surf.surface_features_wavelength_min) _ = ET.SubElement(_r, f'{ns}endPosition') _.text = "{:.12g}".format(s_surf.surface_features_wavelength_max) if s_surf.surface_features_aspect is not None: _ = ET.SubElement(e_surff, f'{ns}validAspect') _ = ET.SubElement(_, f'{ns}AspectPosition') _ = ET.SubElement(_, f'{ns}position') _.text = str(int(s_surf.surface_features_aspect)) if version >= "6.0.6": _ = ET.SubElement(e_surff, f'{ns}lapPresence') if s_surf.lap_presence is None: _.text = 'unknown' else: _.text = s_surf.lap_presence if s_surf.surface_temperature is not None: _surftemp = ET.SubElement(e_surff, f'{ns}surfTemp') if s_surf.surface_temperature_measurement_method is not None: _ = ET.SubElement(_surftemp, f'{ns}methodOfMeas') _.text = s_surf.surface_temperature_measurement_method _ = ET.SubElement(_surftemp, f'{ns}data', attrib={'uom': 'degC'}) _.text = str(s_surf.surface_temperature) else: if s_surf.lap_presence is not None: comment += f'LAP presence: {s_surf.lap_presence}\n' if s_surf.surface_temperature is not None: comment += f'Surface temperature: {s_surf.surface_temperature}\n' if s_surf.surface_temperature_measurement_method is not None: comment += f'Surface temperature measurement method: {s_surf.surface_temperature_measurement_method}\n' if (s_surf.surface_albedo is not None or s_surf.spectral_albedo is not None) and version >= '6.0.6': e_albedo = ET.SubElement(e_surff, f'{ns}surfAlbedo') if s_surf.surface_albedo is not None: e_albedo_broadband = ET.SubElement(e_albedo, f'{ns}albedo') _ = ET.SubElement(e_albedo_broadband, f'{ns}albedoMeasurement') _.text = "{:.12g}".format(s_surf.surface_albedo) if s_surf.surface_albedo_comment is not None: _ = ET.SubElement(e_albedo_broadband, f'{ns}metaData') _ = ET.SubElement(_, f'{ns}comment') _.text = s_surf.surface_albedo_comment if s_surf.spectral_albedo is not None: e_albedo_spectral = ET.SubElement(e_albedo, f'{ns}spectralAlbedo') logging.warning('Spectral albedo not yet implemented for CAAML6 output') for _, dataline in s_surf.spectral_albedo.data.iterrows(): e_sam = ET.SubElement(e_albedo_spectral, f'{ns}spectralAlbedoMeasurement') _ = ET.SubElement(e_sam, f'{ns}minWaveLength', attrib={'uom': 'nm'}) _.text = "{:.12g}".format(dataline.min_wavelength) _ = ET.SubElement(e_sam, f'{ns}maxWaveLength', attrib={'uom': 'nm'}) _.text = "{:.12g}".format(dataline.max_wavelength) attrib = {} if 'uncertainty' in dataline and not np.isnan(dataline.uncertainty) and version >= '6.0.6': attrib['uncertainty'] = "{:.12g}".format(dataline.uncertainty) if 'quality' in dataline and dataline.quality is not None and version >= '6.0.6': attrib['quality'] = dataline.quality _ = ET.SubElement(e_sam, f'{ns}albedo', attrib=attrib) _.text = "{:.12g}".format(dataline.albedo) if s_surf.spectral_albedo.comment is not None: _ = ET.SubElement(e_albedo_spectral, f'{ns}metaData') _ = ET.SubElement(_, f'{ns}comment') _.text = s_surf.spectral_albedo.comment if s_surf.comment is not None or len(comment) > 0: if s_surf.comment is not None and len(comment) == 0: comment = s_surf.comment elif s_surf.comment is not None: comment = s_surf.comment + "\n\n" + comment e_surf_comment.text = comment if s_surf.penetration_ram is not None: _ = ET.SubElement(e_surf, f'{ns}penetrationRam', attrib={'uom': 'cm'}) _.text = "{:.12g}".format(s_surf.penetration_ram * 100) if s_surf.penetration_foot is not None: _ = ET.SubElement(e_surf, f'{ns}penetrationFoot', attrib={'uom': 'cm'}) _.text = "{:.12g}".format(s_surf.penetration_foot * 100) if s_surf.penetration_ski is not None: _ = ET.SubElement(e_surf, f'{ns}penetrationSki', attrib={'uom': 'cm'}) _.text = "{:.12g}".format(s_surf.penetration_ski * 100) _append_additional_data(e_surf, s_surf.additional_data) # - Profiles if snowprofile.stratigraphy_profile is not None and len(snowprofile.stratigraphy_profile.data) > 0: _insert_stratigrpahy_profile(e_r, snowprofile.stratigraphy_profile, config=config) if version >= "6.0.6": for profile in snowprofile.temperature_profiles: _insert_temperature_profile(e_r, profile, config=config) else: if len(snowprofile.temperature_profiles) > 1: logging.error(f'Only one temperature profile acepted in CAAML v{version}.') if len(snowprofile.temperature_profiles) > 0: _insert_temperature_profile(e_r, snowprofile.temperature_profiles[0], config=config) for profile in snowprofile.density_profiles: if len(profile.data) > 0: _insert_density_profile(e_r, profile, config=config) for profile in snowprofile.lwc_profiles: if len(profile.data) > 0: _insert_lwc_profile(e_r, profile, config=config) for profile in snowprofile.ssa_profiles: if len(profile.data) > 0: _insert_ssa_profile(e_r, profile, config=config) for profile in snowprofile.hardness_profiles: if len(profile.data) > 0: _insert_hardness_profile(e_r, profile, config=config) for profile in snowprofile.strength_profiles: if len(profile.data) > 0: _insert_strength_profile(e_r, profile, config=config) for profile in snowprofile.impurity_profiles: if len(profile.data) > 0: _insert_impurity_profile(e_r, profile, config=config) if len(snowprofile.stability_tests) > 0: e_stb = ET.SubElement(e_r, f'{ns}stbTests') for stbt in snowprofile.stability_tests: _insert_stb_test(e_stb, stbt, config=config) for profile in snowprofile.other_scalar_profiles: if len(profile.data) > 0: _insert_otherscalar_profile(e_r, profile, config=config) for profile in snowprofile.other_vectorial_profiles: if len(profile.data) > 0: _insert_othervectorial_profile(e_r, profile, config=config) # - Additional data _append_additional_data(e_r, snowprofile.profile_additional_data) # application and application_version (optional) if snowprofile.application is not None: _ = ET.SubElement(root, f'{ns}application') _.text = snowprofile.application if snowprofile.application_version is not None: _ = ET.SubElement(root, f'{ns}applicationVersion') _.text = snowprofile.application_version _append_additional_data(root, snowprofile.additional_data, ns=ns) # # Generate Tree from mail element and write # tree = ET.ElementTree(root) if indent: if sys.version_info.major == 3 and sys.version_info.minor >= 9: if isinstance(indent, bool) and indent is True: indent = ' ' ET.indent(tree, space=indent) else: logging.error('CAAML6_XML write: Indentation is not available with python < 8.9. Will use indent=False.') tree.write(filename, encoding='utf-8', xml_declaration=True)
def _append_additional_data(element, data, ns=''): if data is None or data.data is None: return None # TODO: tbd <24-02-25, Léo Viallon-Galinier> # def _gen_common_attrib(s, config={}): attrib = {} if s.id is not None and config['version'] >= '6.0.6': attrib['id'] = config['_gen_id'](s.id) if len(s.related_profiles) > 0 and config['version'] >= '6.0.6': attrib['relatedProfiles'] = ' '.join(s.related_profiles) if s.name is not None and config['version'] >= '6.0.6': attrib['name'] = s.name return attrib def _gen_common_metadata(e, s, config={}, additional_metadata = [], name='metaData'): """ Metadata handler common to all profiles. """ ns = config['ns'] comment = '' e_md = ET.SubElement(e, f'{ns}{name}') e_comment = ET.SubElement(e_md, f'{ns}comment') if config['version'] >= "6.0.6": if s.record_period is not None and s.record_period[0] is not None and s.record_period[1] is not None: e_record_time = ET.SubElement(e_md, f'{ns}recordTime') _ = ET.SubElement(e_record_time, f'{ns}TimePeriod') begin = ET.SubElement(_, f'{ns}beginPosition') begin.text = s.record_period[0].isoformat() end = ET.SubElement(_, f'{ns}endPosition') end.text = s.record_period[1].isoformat() elif s.record_time is not None: e_record_time = ET.SubElement(e_md, f'{ns}recordTime') _ = ET.SubElement(e_record_time, f'{ns}TimeInstant') _ = ET.SubElement(_, f'{ns}timePosition') _.text = s.record_time.isoformat() else: if s.record_time is not None: comment += f"Record time: {s.record_time.isoformat()}\n" if s.record_period is not None and s.record_period[0] is not None and s.record_period[1] is not None: comment += f"Record period: {s.record_period[0].isoformat()}-{s.record_period[1].isoformat()}\n" e_hs = None if s.profile_depth is not None and s.profile_depth != config['profile_depth']: if config['version'] >= "6.0.6": e_hs = ET.SubElement(e_md, f"{ns}hS") e_hs = ET.SubElement(e_hs, f"{ns}Components") _ = ET.SubElement(e_hs, f"{ns}height", attrib={'uom': 'cm'}) _.text = str(s.profile_depth * 100) else: comment += f"Profile depth: {s.profile_depth}m\n" if s.profile_swe is not None and s.profile_swe != config['profile_swe']: if config['version'] >= "6.0.6": if e_hs is None: e_hs = ET.SubElement(e_md, f"{ns}hS") e_hs = ET.SubElement(e_hs, f"{ns}Components") _ = ET.SubElement(e_hs, f"{ns}waterEquivalent", attrib={'uom': 'kgm-2'}) _.text = str(s.profile_swe) else: comment += f"Profile SWE: {s.profile_swe}m\n" for elem in additional_metadata: value = elem['value'] key = elem['key'] # None values if value is None: if 'default_value' in elem and elem['default_value'] is not None: value = elem['default_value'] else: continue # Check version if 'min_version' in elem and elem['min_version'] > config['version']: if 'comment_title' in elem: comment += f'{elem["comment_title"]}: {value}\n' continue # Get the value and pre-process to get a string if 'values' in elem and elem['values'] is not None and value not in elem['values']: if 'default_value' in elem: value = elem['default_value'] else: # Invalid value and no default value logging.error(f'Value {value} not accepted in CAAML format for key {key}. ' 'May generate an invalid CAAML file.') continue if 'factor' in elem: value = value * elem['factor'] if isinstance(value, float): value = "{:.12g}".format(value) elif not isinstance(value, str): value = str(value) # Write the metadata _ = ET.SubElement(e_md, f'{ns}{key}', attrib=elem['attrib'] if 'attrib' in elem and elem['attrib'] is not None else {}) _.text = value if s.comment is not None or len(comment) > 0: if s.comment is not None and len(comment) == 0: comment = s.comment elif s.comment is not None: comment = s.comment + "\n\n" + comment e_comment.text = comment return e_md def _insert_stratigrpahy_profile(e_r, s_strat, config): if s_strat is None: return ns = config['ns'] profile_depth = s_strat.profile_depth if s_strat.profile_depth is not None else config['profile_depth'] e_s = ET.SubElement(e_r, f'{ns}stratProfile', attrib=_gen_common_attrib(s_strat, config=config)) e_md = _gen_common_metadata(e_s, s_strat, config=config, name='stratMetaData') # Layer loop for _, layer in s_strat.data.iterrows(): e_layer = ET.SubElement(e_s, f'{ns}Layer') if ('comment' in layer and layer.comment is not None and len(layer.comment) > 0): _md = ET.SubElement(e_layer, f'{ns}metaData') _ = ET.SubElement(_md, f'{ns}comment') _.text = str(layer.comment) if 'additional_data' in layer and layer.additional_data is not None: if _md is None: _md = ET.SubElement(e_layer, f'{ns}metaData') _append_additional_data(_md, layer.additional_data, ns=ns) _ = ET.SubElement(e_layer, f'{ns}depthTop', attrib={'uom': 'cm'}) if profile_depth - layer.top_height < 0: raise ValueError(f'Top height ({layer.top_height}m) > profile depth ({profile_depth}m) ' 'in statigraphy profile)') _.text = "{:.12g}".format((profile_depth - layer.top_height) * 100) if not np.isnan(layer.thickness): _ = ET.SubElement(e_layer, f'{ns}thickness', attrib={'uom': 'cm'}) _.text = "{:.12g}".format(layer.thickness * 100) if layer.grain_1 is not None: _ = ET.SubElement(e_layer, f'{ns}grainFormPrimary') _.text = layer.grain_1 if layer.grain_1 is not None and layer.grain_2 is not None: _ = ET.SubElement(e_layer, f'{ns}grainFormSecondary') _.text = layer.grain_2 if layer.grain_size is not None and not np.isnan(layer.grain_size): _ = ET.SubElement(e_layer, f'{ns}grainSize', attrib={'uom': 'mm'}) _c = ET.SubElement(_, f'{ns}Components') _ = ET.SubElement(_c, f'{ns}avg') _.text = "{:.12g}".format(layer.grain_size * 1e3) if 'grain_size_max' in layer and not np.isnan(layer.grain_size_max): _ = ET.SubElement(_c, f'{ns}avgMax') _.text = "{:.12g}".format(layer.grain_size_max * 1e3) if layer.hardness is not None: _ = ET.SubElement(e_layer, f'{ns}hardness', attrib={'uom': ''}) _.text = layer.hardness if layer.wetness is not None: _ = ET.SubElement(e_layer, f'{ns}wetness', attrib={'uom': ''}) _.text = layer.wetness if 'loc' in layer and layer.loc is not None: _ = ET.SubElement(e_layer, f'{ns}layerOfConcern') _.text = layer.loc _md = None if 'formation_time' in layer and layer.formation_time is not None: _ = ET.SubElement(e_layer, f'{ns}validFormationTime') _t = ET.SubElement(_, f'{ns}TimeInstant') _ = ET.SubElement(_t, f'{ns}timePosition') _.text = layer.formation_time.isoformat() elif ('formation_period_begin' in layer and 'formation_period_end' in layer and layer.formation_period_begin is not None and layer.formation_period_end is not None): _ = ET.SubElement(e_layer, f'{ns}validFormationTime') _t = ET.SubElement(_, f'{ns}TimePeriod') _ = ET.SubElement(_t, f'{ns}beginPosition') _.text = layer.formation_period_begin.isoformat() _ = ET.SubElement(_t, f'{ns}endPosition') _.text = layer.formation_period_end.isoformat() _append_additional_data(e_s, s_strat.additional_data, ns=ns) _density_mom = {'6.0.5': ['Snow Tube', 'Snow Cylinder', 'Snow Cutter', 'Denoth Probe', 'other']} def _insert_density_profile(e_r, s_p, config): if s_p is None: return ns = config['ns'] profile_depth = s_p.profile_depth if s_p.profile_depth is not None else config['profile_depth'] version = config['version'] e_p = ET.SubElement(e_r, f'{ns}densityProfile', attrib=_gen_common_attrib(s_p, config=config)) e_md = _gen_common_metadata( e_p, s_p, config=config, name = 'densityMetaData', additional_metadata = [ {'value': s_p.method_of_measurement, 'default_value': 'other', 'values': _density_mom[version] if version in _density_mom else None, 'key': 'methodOfMeas'}, {'value': s_p.uncertainty_of_measurement, 'key': 'uncertaintyOfMeas', 'attrib': {'uom': 'kgm-3'}}, {'value': s_p.quality_of_measurement, 'min_version': '6.0.6', 'comment_title': 'Quality of measurement', 'key': 'qualityOfMeas'}, {'value': s_p.probed_volume, 'key': 'probeVolume', 'factor': 1e6, 'attrib': {'uom': 'cm3'}}, {'value': s_p.probed_diameter, 'key': 'probeDiameter', 'factor': 100, 'attrib': {'uom': 'cm'}}, {'value': s_p.probed_length, 'key': 'probeLength', 'factor': 100, 'attrib': {'uom': 'cm'}}, {'value': s_p.probed_thickness, 'key': 'probedThickness', 'factor': 100, 'attrib': {'uom': 'cm'}}, ]) if s_p.profile_nr is not None: _ = ET.SubElement(e_p, f'{ns}profileNr') _.text = str(s_p.profile_nr) # Loop layers for _, layer in s_p.data.iterrows(): e_layer = ET.SubElement(e_p, f'{ns}Layer') _ = ET.SubElement(e_layer, f'{ns}depthTop', attrib={'uom': 'cm'}) if profile_depth - layer.top_height < 0: raise ValueError(f'Top height ({layer.top_height}m) > profile depth ({profile_depth}m) ' 'in density profile)') _.text = "{:.12g}".format((profile_depth - layer.top_height) * 100) if not np.isnan(layer.thickness): _ = ET.SubElement(e_layer, f'{ns}thickness', attrib={'uom': 'cm'}) _.text = "{:.12g}".format(layer.thickness * 100) attrib = {'uom': 'kgm-3'} if 'uncertainty' in layer and not np.isnan(layer.uncertainty) and version >= '6.0.6': attrib['uncertainty'] = "{:.12g}".format(layer.uncertainty) if 'quality' in layer and layer.quality is not None and version >= '6.0.6': attrib['quality'] = layer.quality _ = ET.SubElement(e_layer, f'{ns}density', attrib=attrib) _.text = "{:.12g}".format(layer.density) _append_additional_data(e_p, s_p.additional_data, ns=ns) def _insert_temperature_profile(e_r, s_p, config): if s_p is None: return ns = config['ns'] profile_depth = s_p.profile_depth if s_p.profile_depth is not None else config['profile_depth'] version = config['version'] e_p = ET.SubElement(e_r, f'{ns}tempProfile', attrib=_gen_common_attrib(s_p, config=config)) e_md = _gen_common_metadata( e_p, s_p, config=config, name = 'tempMetaData', additional_metadata = [ {'value': s_p.method_of_measurement, 'min_version': '6.0.6', 'comment_title': 'Measurement method', 'key': 'methodOfMeas'}, {'value': s_p.uncertainty_of_measurement, 'key': 'uncertaintyOfMeas', 'attrib': {'uom': 'degC'}, 'min_version': '6.0.6', 'comment_title': 'Uncertainty of measurement (degC)'}, {'value': s_p.quality_of_measurement, 'min_version': '6.0.6', 'comment_title': 'Quality of measurement', 'key': 'qualityOfMeas'}, ]) # Loop layers for _, layer in s_p.data.iterrows(): e_layer = ET.SubElement(e_p, f'{ns}Obs') _ = ET.SubElement(e_layer, f'{ns}depth', attrib={'uom': 'cm'}) if profile_depth - layer.height < 0: raise ValueError(f'Height ({layer.height}m) > profile depth ({profile_depth}m) ' 'in temperature profile)') _.text = "{:.12g}".format((profile_depth - layer.height) * 100) _ = ET.SubElement(e_layer, f'{ns}snowTemp', attrib={'uom': 'degC'}) _.text = "{:.12g}".format(layer.temperature) if 'uncertainty' in layer and not np.isnan(layer.uncertainty) and version >= '6.0.6': _ = "{:.12g}".format(layer.uncertainty) if 'quality' in layer and layer.quality is not None and version >= '6.0.6': _ = ET.SubElement(e_layer, f'{ns}qualityOfMeas') _.text = layer.quality if version >= "6.0.6" and s_p.profile_nr is not None: _ = ET.SubElement(e_p, f'{ns}profileNr') _.text = str(s_p.profile_nr) _append_additional_data(e_p, s_p.additional_data, ns=ns) _lwc_mom = {'6.0.5': ['Denoth Probe', 'Snow Fork', 'other']} def _insert_lwc_profile(e_r, s_p, config): if s_p is None: return ns = config['ns'] profile_depth = s_p.profile_depth if s_p.profile_depth is not None else config['profile_depth'] version = config['version'] e_p = ET.SubElement(e_r, f'{ns}lwcProfile', attrib=_gen_common_attrib(s_p, config=config)) e_md = _gen_common_metadata( e_p, s_p, config=config, name = 'lwcMetaData', additional_metadata = [ {'value': s_p.method_of_measurement, 'default_value': 'other', 'values': _lwc_mom[version] if version in _lwc_mom else None, 'key': 'methodOfMeas'}, {'value': s_p.uncertainty_of_measurement, 'key': 'uncertaintyOfMeas', 'attrib': {'uom': '% by Vol'}}, {'value': s_p.quality_of_measurement, 'min_version': '6.0.6', 'comment_title': 'Quality of measurement', 'key': 'qualityOfMeas'}, {'value': s_p.probed_thickness, 'key': 'probedThickness', 'factor': 100, 'attrib': {'uom': 'cm'}}, ]) if s_p.profile_nr is not None: _ = ET.SubElement(e_p, f'{ns}profileNr') _.text = str(s_p.profile_nr) # Loop layers for _, layer in s_p.data.iterrows(): e_layer = ET.SubElement(e_p, f'{ns}Layer') _ = ET.SubElement(e_layer, f'{ns}depthTop', attrib={'uom': 'cm'}) if profile_depth - layer.top_height < 0: raise ValueError(f'Top height ({layer.top_height}m) > profile depth ({profile_depth}m) ' 'in LWC profile)') _.text = "{:.12g}".format((profile_depth - layer.top_height) * 100) if not np.isnan(layer.thickness): _ = ET.SubElement(e_layer, f'{ns}thickness', attrib={'uom': 'cm'}) _.text = "{:.12g}".format(layer.thickness * 100) attrib = {'uom': '% by Vol'} if 'uncertainty' in layer and not np.isnan(layer.uncertainty) and version >= '6.0.6': attrib['uncertainty'] = "{:.12g}".format(layer.uncertainty) if 'quality' in layer and layer.quality is not None and version >= '6.0.6': attrib['quality'] = layer.quality _ = ET.SubElement(e_layer, f'{ns}lwc', attrib=attrib) _.text = "{:.12g}".format(layer.lwc) _append_additional_data(e_p, s_p.additional_data, ns=ns) _strength_mom = {'6.0.5': ['Shear Frame', 'other']} def _insert_strength_profile(e_r, s_p, config): if s_p is None: return ns = config['ns'] profile_depth = s_p.profile_depth if s_p.profile_depth is not None else config['profile_depth'] version = config['version'] e_p = ET.SubElement(e_r, f'{ns}strengthProfile', attrib=_gen_common_attrib(s_p, config=config)) e_md = _gen_common_metadata( e_p, s_p, config=config, name = 'strengthMetaData', additional_metadata = [ {'value': s_p.strength_type, 'key': 'strengthType', 'values': ['compressive', 'tensile', 'shear']}, {'value': s_p.method_of_measurement, 'default_value': 'other', 'values': _strength_mom[version] if version in _strength_mom else None, 'key': 'methodOfMeas'}, {'value': s_p.uncertainty_of_measurement, 'key': 'uncertaintyOfMeas', 'attrib': {'uom': 'Nm-2'}}, {'value': s_p.quality_of_measurement, 'min_version': '6.0.6', 'comment_title': 'Quality of measurement', 'key': 'qualityOfMeas'}, {'value': s_p.probed_area, 'key': 'probedArea', 'factor': 1e4, 'attrib': {'uom': 'cm2'}}, ]) if s_p.profile_nr is not None: _ = ET.SubElement(e_p, f'{ns}profileNr') _.text = str(s_p.profile_nr) # Loop layers for _, layer in s_p.data.iterrows(): e_layer = ET.SubElement(e_p, f'{ns}Layer') _ = ET.SubElement(e_layer, f'{ns}depthTop', attrib={'uom': 'cm'}) if profile_depth - layer.top_height < 0: raise ValueError(f'Top height ({layer.top_height}m) > profile depth ({profile_depth}m) ' 'in strength profile)') _.text = "{:.12g}".format((profile_depth - layer.top_height) * 100) if not np.isnan(layer.thickness): _ = ET.SubElement(e_layer, f'{ns}thickness', attrib={'uom': 'cm'}) _.text = "{:.12g}".format(layer.thickness * 100) attrib = {'uom': 'Nm-2'} if 'uncertainty' in layer and not np.isnan(layer.uncertainty) and version >= '6.0.6': attrib['uncertainty'] = "{:.12g}".format(layer.uncertainty) if 'quality' in layer and layer.quality is not None and version >= '6.0.6': attrib['quality'] = layer.quality _ = ET.SubElement(e_layer, f'{ns}strengthValue', attrib=attrib) _.text = "{:.12g}".format(layer.strength) if 'fracture_character' in layer and layer.fracture_character is not None: _ = ET.SubElement(e_layer, f'{ns}fractureCharacter') _.text = layer.fracture_character _append_additional_data(e_p, s_p.additional_data, ns=ns) _impurity_mom = {'6.0.5': ['other']} _impurity_type = {'6.0.5': ['Black Carbon', 'Dust', 'Isotopes']} def _insert_impurity_profile(e_r, s_p, config): if s_p is None: return ns = config['ns'] profile_depth = s_p.profile_depth if s_p.profile_depth is not None else config['profile_depth'] version = config['version'] e_p = ET.SubElement(e_r, f'{ns}impurityProfile', attrib=_gen_common_attrib(s_p, config=config)) e_md = _gen_common_metadata( e_p, s_p, config=config, name = 'impurityMetaData', additional_metadata = [ {'value': s_p.impurity_type, 'default_value': 'other', 'values': _impurity_type[version] if version in _impurity_type else None, 'key': 'impurity'}, {'value': s_p.method_of_measurement, 'default_value': 'other', 'values': _impurity_mom[version] if version in _impurity_mom else None, 'key': 'methodOfMeas'}, {'value': s_p.uncertainty_of_measurement, 'key': 'uncertaintyOfMeas', 'attrib': {'uom': '%'}}, {'value': s_p.quality_of_measurement, 'min_version': '6.0.6', 'comment_title': 'Quality of measurement', 'key': 'qualityOfMeas'}, {'value': s_p.probed_thickness, 'key': 'probedThickness', 'factor': 100, 'attrib': {'uom': 'cm'}}, {'value': s_p.probed_volume, 'key': 'probedVolume', 'factor': 1e6, 'attrib': {'uom': 'cm3'}, 'min_version': '6.0.6', 'comment_title': 'Probe thickness (cm3)'}, {'value': s_p.probed_diameter, 'key': 'probedDiameter', 'factor': 100, 'attrib': {'uom': 'cm'}, 'min_version': '6.0.6', 'comment_title': 'Probe thickness (cm3)'}, {'value': s_p.probed_length, 'key': 'probedLength', 'factor': 100, 'attrib': {'uom': 'cm'}, 'min_version': '6.0.6', 'comment_title': 'Probe thickness (cm3)'}, ]) if s_p.profile_nr is not None: _ = ET.SubElement(e_p, f'{ns}profileNr') _.text = str(s_p.profile_nr) # Loop layers for _, layer in s_p.data.iterrows(): if (('mass_fraction' in layer and not np.isnan(layer.mass_fraction)) or ('volume_fraction' in layer and not np.isnan(layer.volume_fraction))): e_layer = ET.SubElement(e_p, f'{ns}Layer') _ = ET.SubElement(e_layer, f'{ns}depthTop', attrib={'uom': 'cm'}) if profile_depth - layer.top_height < 0: raise ValueError(f'Top height ({layer.top_height}m) > profile depth ({profile_depth}m) ' 'in impurity profile)') _.text = "{:.12g}".format((profile_depth - layer.top_height) * 100) if not np.isnan(layer.thickness): _ = ET.SubElement(e_layer, f'{ns}thickness', attrib={'uom': 'cm'}) _.text = "{:.12g}".format(layer.thickness * 100) attrib = {'uom': '%'} if 'uncertainty' in layer and not np.isnan(layer.uncertainty) and version >= '6.0.6': attrib['uncertainty'] = "{:.12g}".format(layer.uncertainty) if 'quality' in layer and layer.quality is not None and version >= '6.0.6': attrib['quality'] = layer.quality if 'mass_fraction' in layer and not np.isnan(layer.mass_fraction): _ = ET.SubElement(e_layer, f'{ns}massFraction', attrib=attrib) _.text = "{:.12g}".format(layer.mass_fraction) else: _ = ET.SubElement(e_layer, f'{ns}volumeFraction', attrib=attrib) _.text = "{:.12g}".format(layer.volume_fraction) _append_additional_data(e_p, s_p.additional_data, ns=ns) def _insert_otherscalar_profile(e_r, s_p, config): if s_p is None: return ns = config['ns'] profile_depth = s_p.profile_depth if s_p.profile_depth is not None else config['profile_depth'] print('Profile depth in ScalarProfile', profile_depth) version = config['version'] if version < '6.0.6': logging.warning(f'Other scalar profile not stored in CAAML XML v{version}.') return e_p = ET.SubElement(e_r, f'{ns}otherScalarProfile', attrib=_gen_common_attrib(s_p, config=config)) e_md = _gen_common_metadata( e_p, s_p, config=config, name = 'otherScalarMetaData', additional_metadata = [ {'value': s_p.parameter, 'key': 'parameter'}, {'value': s_p.method_of_measurement, 'key': 'methodOfMeas'}, {'value': s_p.unit, 'key': 'uom'}, {'value': s_p.uncertainty_of_measurement, 'key': 'uncertaintyOfMeas'}, {'value': s_p.quality_of_measurement, 'key': 'qualityOfMeas'}, ]) if s_p.profile_nr is not None: _ = ET.SubElement(e_p, f'{ns}profileNr') _.text = str(s_p.profile_nr) # Loop layers for _, layer in s_p.data.iterrows(): e_layer = ET.SubElement(e_p, f'{ns}Layer') _ = ET.SubElement(e_layer, f'{ns}depthTop', attrib={'uom': 'cm'}) if profile_depth - layer.top_height < 0: raise ValueError(f'Top height ({layer.top_height}m) > profile depth ({profile_depth}m) ' 'in other scalar profile)') _.text = "{:.12g}".format((profile_depth - layer.top_height) * 100) if not np.isnan(layer.thickness): _ = ET.SubElement(e_layer, f'{ns}thickness', attrib={'uom': 'cm'}) _.text = "{:.12g}".format(layer.thickness * 100) attrib = {} if 'uncertainty' in layer and not np.isnan(layer.uncertainty): attrib['uncertainty'] = "{:.12g}".format(layer.uncertainty) if 'quality' in layer and layer.quality is not None: attrib['quality'] = layer.quality _ = ET.SubElement(e_layer, f'{ns}value', attrib=attrib) _.text = "{:.12g}".format(layer.data) _append_additional_data(e_p, s_p.additional_data, ns=ns) def _insert_othervectorial_profile(e_r, s_p, config): if s_p is None: return ns = config['ns'] profile_depth = s_p.profile_depth if s_p.profile_depth is not None else config['profile_depth'] version = config['version'] if version < '6.0.6': logging.warning(f'Other vectorial profile not stored in CAAML XML v{version}.') return e_p = ET.SubElement(e_r, f'{ns}otherVectorialProfile', attrib=_gen_common_attrib(s_p, config=config)) e_md = _gen_common_metadata( e_p, s_p, config=config, name = 'otherVectorialMetaData', additional_metadata = [ {'value': s_p.parameter, 'key': 'parameter'}, {'value': s_p.rank, 'key': 'rank'}, {'value': s_p.method_of_measurement, 'key': 'methodOfMeas'}, {'value': s_p.unit, 'key': 'uom'}, {'value': s_p.uncertainty_of_measurement, 'key': 'uncertaintyOfMeas'}, {'value': s_p.quality_of_measurement, 'key': 'qualityOfMeas'}, ]) if s_p.profile_nr is not None: _ = ET.SubElement(e_p, f'{ns}profileNr') _.text = str(s_p.profile_nr) # Loop layers for _, layer in s_p.data.iterrows(): e_layer = ET.SubElement(e_p, f'{ns}Layer') _ = ET.SubElement(e_layer, f'{ns}depthTop', attrib={'uom': 'cm'}) if profile_depth - layer.top_height < 0: raise ValueError(f'Top height ({layer.top_height}m) > profile depth ({profile_depth}m) ' 'in vectorial profile)') _.text = "{:.12g}".format((profile_depth - layer.top_height) * 100) if not np.isnan(layer.thickness): _ = ET.SubElement(e_layer, f'{ns}thickness', attrib={'uom': 'cm'}) _.text = "{:.12g}".format(layer.thickness * 100) attrib = {} if 'uncertainty' in layer and not np.isnan(layer.uncertainty): attrib['uncertainty'] = "{:.12g}".format(layer.uncertainty) if 'quality' in layer and layer.quality is not None: attrib['quality'] = layer.quality _ = ET.SubElement(e_layer, f'{ns}value', attrib=attrib) _.text = ' '.join(["{:.12g}".format(e) for e in layer.data]) _append_additional_data(e_p, s_p.additional_data, ns=ns) _ssa_mom = {'6.0.5': ['Ice Cube', 'other']} def _insert_ssa_profile(e_r, s_p, config): if s_p is None: return ns = config['ns'] profile_depth = s_p.profile_depth if s_p.profile_depth is not None else config['profile_depth'] version = config['version'] e_p = ET.SubElement(e_r, f'{ns}specSurfAreaProfile', attrib=_gen_common_attrib(s_p, config=config)) e_md = _gen_common_metadata( e_p, s_p, config=config, name = 'specSurfAreaMetaData', additional_metadata = [ {'value': s_p.method_of_measurement, 'default_value': 'other', 'values': _ssa_mom[version] if version in _ssa_mom else None, 'key': 'methodOfMeas'}, {'value': s_p.uncertainty_of_measurement, 'key': 'uncertaintyOfMeas', 'attrib': {'uom': 'm2kg-1'}}, {'value': s_p.quality_of_measurement, 'min_version': '6.0.6', 'comment_title': 'Quality of measurement', 'key': 'qualityOfMeas'}, {'value': s_p.probed_thickness, 'key': 'probedThickness', 'factor': 100, 'attrib': {'uom': 'cm'}}, ]) if s_p.profile_nr is not None: _ = ET.SubElement(e_p, f'{ns}profileNr') _.text = str(s_p.profile_nr) # Loop layers import snowprofile.profiles if isinstance(s_p, snowprofile.profiles.SSAProfile): for _, layer in s_p.data.iterrows(): e_layer = ET.SubElement(e_p, f'{ns}Layer') _ = ET.SubElement(e_layer, f'{ns}depthTop', attrib={'uom': 'cm'}) if profile_depth - layer.top_height < 0: raise ValueError(f'Top height ({layer.top_height}m) > profile depth ({profile_depth}m) ' 'in SSA profile)') _.text = "{:.12g}".format((profile_depth - layer.top_height) * 100) if not np.isnan(layer.thickness): _ = ET.SubElement(e_layer, f'{ns}thickness', attrib={'uom': 'cm'}) _.text = "{:.12g}".format(layer.thickness * 100) attrib = {'uom': 'm2kg-1'} if 'uncertainty' in layer and not np.isnan(layer.uncertainty) and version >= '6.0.6': attrib['uncertainty'] = "{:.12g}".format(layer.uncertainty) if 'quality' in layer and layer.quality is not None and version >= '6.0.6': attrib['quality'] = layer.quality _ = ET.SubElement(e_layer, f'{ns}specSurfArea', attrib=attrib) _.text = "{:.12g}".format(layer.ssa) elif isinstance(s_p, snowprofile.profiles.SSAPointProfile): e_mc = ET.SubElement(e_p, f'{ns}MeasurementComponents', attrib={ 'uomDepth': 'cm', 'uomSpecSurfArea': 'm2kg-1'}) _ = ET.SubElement(e_mc, f'{ns}depth') _.text = 'template' _ = ET.SubElement(e_mc, f'{ns}specSurfArea') _.text = 'template' e_m = ET.SubElement(e_p, f'{ns}Measurements') e_m = ET.SubElement(e_m, f'{ns}tupleList') tl = [] for _, layer in s_p.data.iterrows(): if profile_depth - layer.height < 0: raise ValueError(f'Top height ({layer.height}m) > profile depth ({profile_depth}m) ' 'in SSA profile)') _depth = (profile_depth - layer.height) * 100 tl.append(f'{_depth:.12g},{layer.ssa:.12g}') e_m.text = ' '.join(tl) _append_additional_data(e_p, s_p.additional_data, ns=ns) _hardness_mom = {'6.0.5': ['SnowMicroPen', 'Ram Sonde', 'Push-Pull Gauge', 'other']} def _insert_hardness_profile(e_r, s_p, config): if s_p is None: return ns = config['ns'] profile_depth = s_p.profile_depth if s_p.profile_depth is not None else config['profile_depth'] version = config['version'] e_p = ET.SubElement(e_r, f'{ns}hardnessProfile', attrib={ 'uomWeightHammer': 'kg', 'uomWeightTube': 'kg', 'uomDropHeight': 'cm', **_gen_common_attrib(s_p, config=config)}) e_md = _gen_common_metadata( e_p, s_p, config=config, name = 'hardnessMetaData', additional_metadata = [ {'value': s_p.method_of_measurement, 'default_value': 'other', 'values': _hardness_mom[version] if version in _hardness_mom else None, 'key': 'methodOfMeas'}, {'value': s_p.uncertainty_of_measurement, 'key': 'uncertaintyOfMeas', 'attrib': {'uom': 'N'}}, {'value': s_p.quality_of_measurement, 'min_version': '6.0.6', 'comment_title': 'Quality of measurement', 'key': 'qualityOfMeas'}, {'value': s_p.surface_of_indentation, 'key': 'surfOfIndentation', 'factor': 10000, 'attrib': {'uom': 'cm2'}}, {'value': s_p.penetration_speed, 'key': 'penetrationSpeed', 'attrib': {'uom': 'ms-1'}, 'min_version': '6.0.6', 'comment_title': 'Penetration speed'}, ]) if s_p.profile_nr is not None: _ = ET.SubElement(e_p, f'{ns}profileNr') _.text = str(s_p.profile_nr) # Loop layers import snowprofile.profiles if isinstance(s_p, snowprofile.profiles.HardnessProfile): for _, layer in s_p.data.iterrows(): e_layer = ET.SubElement(e_p, f'{ns}Layer') _ = ET.SubElement(e_layer, f'{ns}depthTop', attrib={'uom': 'cm'}) if profile_depth - layer.top_height < 0: raise ValueError(f'Top height ({layer.top_height}m) > profile depth ({profile_depth}m) ' 'in hardness profile)') _.text = "{:.12g}".format((profile_depth - layer.top_height) * 100) if not np.isnan(layer.thickness): _ = ET.SubElement(e_layer, f'{ns}thickness', attrib={'uom': 'cm'}) _.text = "{:.12g}".format(layer.thickness * 100) attrib = {'uom': 'N'} _ = ET.SubElement(e_layer, f'{ns}hardness', attrib=attrib) _.text = "{:.12g}".format(layer.hardness) if 'weight_hammer' in layer and not np.isnan(layer.weight_hammer): _ = ET.SubElement(e_layer, f'{ns}weightHammer') _.text = "{:.12g}".format(layer.weight_hammer) if 'weight_tube' in layer and not np.isnan(layer.weight_tube): _ = ET.SubElement(e_layer, f'{ns}weightTube') _.text = "{:.12g}".format(layer.weight_tube) if 'n_drops' in layer and not np.isnan(layer.n_drops): _ = ET.SubElement(e_layer, f'{ns}nDrops') _.text = "{:.12g}".format(layer.n_drops) if 'drop_height' in layer and not np.isnan(layer.drop_height): _ = ET.SubElement(e_layer, f'{ns}dropHeight') _.text = "{:.12g}".format(layer.drop_height * 100) elif isinstance(s_p, snowprofile.profiles.HardnessPointProfile): e_mc = ET.SubElement(e_p, f'{ns}MeasurementComponents', attrib={ 'uomDepth': 'cm', 'uomHardness': 'N'}) _ = ET.SubElement(e_mc, f'{ns}depth') _.text = 'template' _ = ET.SubElement(e_mc, f'{ns}penRes') _.text = 'template' e_m = ET.SubElement(e_p, f'{ns}Measurements') e_m = ET.SubElement(e_m, f'{ns}tupleList') tl = [] for _, layer in s_p.data.iterrows(): if profile_depth - layer.height < 0: raise ValueError(f'Top height ({layer.height}m) > profile depth ({profile_depth}m) ' 'in hardness profile)') _depth = (profile_depth - layer.height) * 100 tl.append(f'{_depth:.12g},{layer.hardness:.12g}') e_m.text = ' '.join(tl) _append_additional_data(e_p, s_p.additional_data, ns=ns) def _insert_stb_test(e_r, s_t, config): if s_t is None: return ns = config['ns'] profile_depth = config['profile_depth'] import snowprofile.stability_tests if isinstance(s_t, snowprofile.stability_tests.CTStabilityTest): e_t = ET.SubElement(e_r, f'{ns}ComprTest', attrib=_gen_common_attrib(s_t, config=config)) _stb_test_common(e_t, s_t, config=config) if len(s_t.results) == 0: # No failure _ = ET.SubElement(e_t, f'{ns}noFailure') else: # Positive result(s) for result in s_t.results: e_fail = ET.SubElement(e_t, f'{ns}failedOn') # Failure layer details _stb_test_layer_details(e_fail, result, ns=ns, profile_depth=profile_depth) # CT result e_resu = ET.SubElement(e_fail, f'{ns}Results') if result.fracture_character is not None: _ = ET.SubElement(e_resu, f'{ns}fractureCharacter') _.text = result.fracture_character _ = ET.SubElement(e_resu, f'{ns}testScore') _.text = '{:d}'.format(result.test_score) elif isinstance(s_t, snowprofile.stability_tests.ECTStabilityTest): e_t = ET.SubElement(e_r, f'{ns}ExtColumnTest', attrib=_gen_common_attrib(s_t, config=config)) _stb_test_common(e_t, s_t, config=config) if len(s_t.results) == 0: # No failure _ = ET.SubElement(e_t, f'{ns}noFailure') else: # Positive result(s) for result in s_t.results: e_fail = ET.SubElement(e_t, f'{ns}failedOn') # Failure layer details _stb_test_layer_details(e_fail, result, ns=ns, profile_depth=profile_depth) # ECT result e_resu = ET.SubElement(e_fail, f'{ns}Results') _ = ET.SubElement(e_resu, f'{ns}testScore') if result.test_score == 0 and result.propagation: _.text = 'ECTPV' elif result.test_score == 0: _.text = 'ECTN1' else: _.text = 'ECT{p}{n}'.format(p = 'P' if result.propagation else 'N', n = result.test_score) elif isinstance(s_t, snowprofile.stability_tests.RBStabilityTest): e_t = ET.SubElement(e_r, f'{ns}RBlockTest', attrib=_gen_common_attrib(s_t, config=config)) _stb_test_common(e_t, s_t, config=config) if len(s_t.results) == 0: # No failure _ = ET.SubElement(e_t, f'{ns}noFailure') else: # Positive result(s) for result in s_t.results: e_fail = ET.SubElement(e_t, f'{ns}failedOn') # Failure layer details _stb_test_layer_details(e_fail, result, ns=ns, profile_depth=profile_depth) # RB result e_resu = ET.SubElement(e_fail, f'{ns}Results') if result.fracture_character is not None: _ = ET.SubElement(e_resu, f'{ns}fractureCharacter') _.text = result.fracture_character if result.release_type is not None: _ = ET.SubElement(e_resu, f'{ns}releaseType') _.text = result.release_type _ = ET.SubElement(e_resu, f'{ns}testScore') _.text = f'RB{result.test_score}' elif isinstance(s_t, snowprofile.stability_tests.ShearFrameStabilityTest): e_t = ET.SubElement(e_r, f'{ns}ShearFrameTest', attrib=_gen_common_attrib(s_t, config=config)) _stb_test_common(e_t, s_t, config=config) if len(s_t.results) == 0: # No failure _ = ET.SubElement(e_t, f'{ns}noFailure') else: # Positive result(s) for result in s_t.results: e_fail = ET.SubElement(e_t, f'{ns}failedOn') # Failure layer details _stb_test_layer_details(e_fail, result, ns=ns, profile_depth=profile_depth) # SF result e_resu = ET.SubElement(e_fail, f'{ns}Results') if result.fracture_character is not None: _ = ET.SubElement(e_resu, f'{ns}fractureCharacter') _.text = result.fracture_character _ = ET.SubElement(e_resu, f'{ns}failureForce', attrib={'uom': 'N'}) _.text = "{:.12g}".format(result.force) elif isinstance(s_t, snowprofile.stability_tests.PSTStabilityTest): e_t = ET.SubElement(e_r, f'{ns}PropSawTest', attrib=_gen_common_attrib(s_t, config=config)) _stb_test_common(e_t, s_t, config=config) e_t = ET.SubElement(e_t, f'{ns}failedOn') # Failure layer details _stb_test_layer_details(e_t, s_t, ns=ns, profile_depth=profile_depth) # ECT result e_resu = ET.SubElement(e_t, f'{ns}Results') _ = ET.SubElement(e_resu, f'{ns}fracturePropagation') _.text = s_t.propagation _ = ET.SubElement(e_resu, f'{ns}cutLength', attrib={'uom': 'cm'}) _.text = "{:.12g}".format(s_t.cut_length * 100) _ = ET.SubElement(e_resu, f'{ns}columnLength', attrib={'uom': 'cm'}) if s_t.column_length is None: _.text = "150" else: _.text = "{:.12g}".format(s_t.column_length * 100) else: raise ValueError(f'Unknown stability test type {type(s_t)}.') def _stb_test_common(e_t, s_t, config={'ns': ''}): ns = config['ns'] if s_t.comment is not None: _ = ET.SubElement(e_t, f'{ns}metaData') _ = ET.SubElement(_, f'{ns}comment') _.text = s_t.comment if s_t.test_nr is not None and config['version'] >= '6.0.6': _ = ET.SubElement(e_t, f'{ns}testNr') _.text = str(s_t.test_nr) _append_additional_data(e_t, s_t.additional_data, ns=ns) def _stb_test_layer_details(e_fail, result, ns='', profile_depth=0): e_layer = ET.SubElement(e_fail, f'{ns}Layer') if result.layer_comment is not None and len(result.layer_comment) > 0: _md = ET.SubElement(e_layer, f'{ns}metaData') _ = ET.SubElement(_md, f'{ns}comment') _.text = str(result.layer_comment) _ = ET.SubElement(e_layer, f'{ns}depthTop', attrib={'uom': 'cm'}) _.text = "{:.12g}".format((profile_depth - result.height) * 100) if result.layer_thickness is not None: _ = ET.SubElement(e_layer, f'{ns}thickness', attrib={'uom': 'cm'}) _.text = "{:.12g}".format(result.layer_thickness * 100) if result.grain_1 is not None: _ = ET.SubElement(e_layer, f'{ns}grainFormPrimary') _.text = result.grain_1 if result.grain_2 is not None: _ = ET.SubElement(e_layer, f'{ns}grainFormSecondary') _.text = result.grain_2 if result.grain_size is not None: _ = ET.SubElement(e_layer, f'{ns}grainSize', attrib={'uom': 'mm'}) _c = ET.SubElement(_, f'{ns}Components') _ = ET.SubElement(_c, f'{ns}avg') _.text = "{:.12g}".format(result.grain_size * 1e3) if result.grain_size_max is not None: _ = ET.SubElement(_c, f'{ns}avgMax') _.text = "{:.12g}".format(result.grain_size_max * 1e3) _md = None if result.layer_formation_time is not None: _ = ET.SubElement(e_layer, f'{ns}validFormationTime') _t = ET.SubElement(_, f'{ns}TimeInstant') _ = ET.SubElement(_t, f'{ns}timePosition') _.text = result.layer_formation_time.isoformat() elif (result.layer_formation_period is not None and result.layer_formation_period[0] is not None and result.layer_formation_period[1] is not None): _ = ET.SubElement(e_layer, f'{ns}validFormationTime') _t = ET.SubElement(_, f'{ns}TimePeriod') _ = ET.SubElement(_t, f'{ns}beginPosition') _.text = result.layer_formation_period[0].isoformat() _ = ET.SubElement(_t, f'{ns}endPosition') _.text = result.layer_formation_period[1].isoformat() if result.layer_additional_data is not None: if _md is None: _md = ET.SubElement(e_layer, f'{ns}metaData') _append_additional_data(_md, result.additional_data, ns=ns)