diff --git a/.gitignore b/.gitignore index 6544fa8b..241994ef 100644 --- a/.gitignore +++ b/.gitignore @@ -118,5 +118,9 @@ ENV/ # Etc *.local +<<<<<<< HEAD +# TF Profiler +======= # TF Profiler output +>>>>>>> master *.json \ No newline at end of file diff --git a/data/signals.py b/data/signals.py index ff069552..386c3659 100644 --- a/data/signals.py +++ b/data/signals.py @@ -4,7 +4,7 @@ import sys from plasma.primitives.data import ( - Signal, ProfileSignal, ChannelSignal, Machine + Signal, ProfileSignal, ChannelSignal, Signal2D, Machine ) @@ -17,7 +17,7 @@ def create_missing_value_filler(): def get_tree_and_tag(path): if '/' not in path: return None, '\\' + path - + spl = path.split('/') tree = spl[0] tag = '\\' + spl[1] @@ -27,7 +27,7 @@ def get_tree_and_tag(path): def get_tree_and_tag_no_backslash(path): if '/' not in path: return None, path - + spl = path.split('/') tree = spl[0] tag = spl[1] @@ -298,6 +298,16 @@ def fetch_nstx_data(signal_path, shot_num, c): "q95 safety factor", ['ppf/efit/q95', "EFIT01/RESULTS.AEQDSK.Q95"], [jet, d3d], causal_shifts=[15, 10], normalize=False, data_avail_tolerances=[0.03, 0.02]) + +q95_EFITRT1 = Signal( + "q95 safety factor in real time", ['ppf/efit/q95', "EFITRT1/RESULTS.AEQDSK.Q95"], + [jet, d3d], causal_shifts=[0, 0], normalize=False, + data_avail_tolerances=[0.03, 0.029]) +vd = Signal( + "vertical displacement change", [ "d3d/vpsdfz"], + [d3d], causal_shifts=[ 0], normalize=False, + data_avail_tolerances=[ 0.029]) + q95t = Signal( "q95 safety factor tol", ['ppf/efit/q95', "EFIT01/RESULTS.AEQDSK.Q95"], [jet, d3d], causal_shifts=[15, 10], normalize=False, @@ -391,6 +401,12 @@ def fetch_nstx_data(signal_path, shot_num, c): ipdirectt = Signal("plasma current direction tol", ["iptdirect"], [d3d], data_avail_tolerances=[0.029]) +ecei = Signal2D("ECEi", ['ECEI_kHz'], [d3d], (20,8), + is_ecei = True, miss_chan_threshold = 80) + +eceit = Signal2D("ECEi", ['ECEI_kHz'], [d3d], (20,8), + is_ecei = True, miss_chan_threshold = 159) + # for downloading, modify this to preprocess shots with only a subset of # signals. This may produce more shots # since only those shots that contain all_signals contained here are used. @@ -400,11 +416,24 @@ def fetch_nstx_data(signal_path, shot_num, c): # dataset. all_signals = { - 'q95': q95, 'li': li, 'ip': ip, 'betan': betan, 'energy': energy, 'lm': lm, - 'dens': dens, 'pradcore': pradcore, - 'pradedge': pradedge, 'pradtot': pradtot, 'pin': pin, + 'q95_EFITRT1': q95_EFITRT1, + #'q95': q95, + 'li': li, + 'ip': ip, + 'betan': betan, + 'energy': energy, + 'lm': lm, + 'dens': dens, + # KGF: Ge leaves these uncommented in all_signals_real_time_0D + #'pradcore': pradcore, + #'pradedge': pradedge, + #'pradtot': pradtot, + #'energydt': energydt, + # 'vd': vd, + 'pin': pin, 'torquein': torquein, - 'energydt': energydt, 'ipdirect': ipdirect, 'iptarget': iptarget, + 'ipdirect': ipdirect, + 'iptarget': iptarget, 'iperr': iperr, # 'tmamp1':tmamp1, 'tmamp2':tmamp2, 'tmfreq1':tmfreq1, 'tmfreq2':tmfreq2, # 'pechin':pechin, @@ -414,14 +443,55 @@ def fetch_nstx_data(signal_path, shot_num, c): # IMPORTANT: must comment-out the following line when preprocessing for # training on JET CW and testing on JET ILW (FRNN 0D). # Otherwise 1K+ CW shots are excluded due to missing profile data - 'etemp_profile': etemp_profile, 'edens_profile': edens_profile, + #'etemp_profile': etemp_profile, 'edens_profile': edens_profile, # 'itemp_profile':itemp_profile, 'zdens_profile':zdens_profile, # 'trot_profile':trot_profile, 'pthm_profile':pthm_profile, # 'neut_profile':neut_profile, 'q_profile':q_profile, # 'bootstrap_current_profile':bootstrap_current_profile, # 'q_psi_profile':q_psi_profile} + + # KGF(2021-12-15): exclude ecei by default, for now: + # 'ecei': ecei, } +# --------------------- +# KGF: from Ge's 2019-2020 code +all_signals_real_time={ + 'q95_EFITRT1': q95_EFITRT1, 'li': li, 'ip': ip, 'betan': betan, 'energy': energy, 'lm': lm, + 'dens': dens, 'pradcore': pradcore, + 'pradedge': pradedge, 'pradtot': pradtot, 'pin': pin, + 'torquein': torquein, + 'energydt': energydt, 'ipdirect': ipdirect, 'iptarget': iptarget, + 'iperr': iperr, + # 'tmamp1':tmamp1, 'tmamp2':tmamp2, 'tmfreq1':tmfreq1, 'tmfreq2':tmfreq2, + # 'pechin':pechin, + # 'rho_profile_spatial':rho_profile_spatial, 'etemp':etemp, + 'etemp_profile': etemp_profile, 'edens_profile': edens_profile, +} +all_signals_real_time_0D={ + 'q95_EFITRT1': q95_EFITRT1, + 'li': li, + 'ip': ip, + 'betan': betan, + 'energy': energy, + 'lm': lm, + 'dens': dens, + 'pradcore': pradcore, + 'pradedge': pradedge, + 'pradtot': pradtot, + 'pin': pin, + 'torquein': torquein, + 'vd': vd, + 'energydt': energydt, + 'iperr':iperr, + 'ipdirect': ipdirect, + 'iptarget': iptarget + # 'tmamp1':tmamp1, 'tmamp2':tmamp2, 'tmfreq1':tmfreq1, 'tmfreq2':tmfreq2, + # 'pechin':pechin, + # 'rho_profile_spatial':rho_profile_spatial, 'etemp':etemp, +} +# --------------------- + all_signals_max_tol = { 'q95t': q95t, 'lit': lit, 'ipt': ipt, 'betant': betant, 'energyt': energyt, 'lmt': lmt, @@ -431,6 +501,11 @@ def fetch_nstx_data(signal_path, shot_num, c): 'ipdirectt': ipdirectt, 'iptargett': iptargett, 'iperrt': iperrt, 'etemp_profilet': etemp_profilet, 'edens_profilet': edens_profilet, + 'ecei': eceit, +} + +ecei_test = { + 'ecei': ecei, } # for actual data analysis, use: @@ -451,26 +526,14 @@ def fetch_nstx_data(signal_path, shot_num, c): sig_name: sig for (sig_name, sig) in all_signals_restricted.items() if ( sig.is_defined_on_machines(all_machines) and sig.num_channels == 1) } +# NOTE(JAR): The check sig.num_channels > 1 to determine if a signal is 1D will +# now include 2D signals. Need to add something like sig.num_channels < 160 to +# exclude ecei, for example, or come up with another Signal attribute that can +# be used as a reliable way to distiguish between 0D, 1D, and 2D signals fully_defined_signals_1D = { sig_name: sig for (sig_name, sig) in all_signals_restricted.items() if ( sig.is_defined_on_machines(all_machines) and sig.num_channels > 1) } -d3d_signals = { - sig_name: sig for (sig_name, sig) in all_signals_restricted.items() if ( - sig.is_defined_on_machine(d3d)) -} -d3d_signals_max_tol = { - sig_name: sig for (sig_name, sig) in all_signals_max_tol.items() if ( - sig.is_defined_on_machine(d3d)) -} -d3d_signals_0D = { - sig_name: sig for (sig_name, sig) in all_signals_restricted.items() if ( - (sig.is_defined_on_machine(d3d) and sig.num_channels == 1)) -} -d3d_signals_1D = { - sig_name: sig for (sig_name, sig) in all_signals_restricted.items() if ( - (sig.is_defined_on_machine(d3d) and sig.num_channels > 1)) -} jet_signals = { sig_name: sig for (sig_name, sig) in all_signals_restricted.items() if ( diff --git a/envs/requirements-cpu.yaml b/envs/requirements-cpu.yaml index 4687a03a..c54f63af 100644 --- a/envs/requirements-cpu.yaml +++ b/envs/requirements-cpu.yaml @@ -5,4 +5,4 @@ h5py pyparsing pyyaml pytorch>1.3 -tensorflow>=1.3,<2.0.0 +tensorflow>2.1.0 diff --git a/envs/requirements-linux-64-gpu.yaml b/envs/requirements-linux-64-gpu.yaml index 4f106b9b..204eaeb2 100644 --- a/envs/requirements-linux-64-gpu.yaml +++ b/envs/requirements-linux-64-gpu.yaml @@ -13,7 +13,7 @@ dependencies: - pyparsing - pyyaml - pytorch>1.3 - - tensorflow-gpu>=1.3,<2.0.0 + - tensorflow>2.1.0 - pip: - pathos - matplotlib>=2.0.2 diff --git a/envs/requirements-traverse.yaml b/envs/requirements-traverse.yaml index 09a136a0..aad550db 100644 --- a/envs/requirements-traverse.yaml +++ b/envs/requirements-traverse.yaml @@ -11,16 +11,19 @@ dependencies: - scipy - pandas - flake8 - - h5py + - h5py<3.0.0 - pyparsing - pyyaml - - pytorch>1.3 - - tensorflow-gpu>=1.3,<2.0.0 + #- pytorch>1.3 + - tensorflow>2.1.0 # limited to 2.1.3 on IBM WMLCE 1.7.0 (2020-02-12) as of mid-2021 +# WML CE 1.7.0 is built for CUDA 10.2 and requires version 440 of the NVIDIA GPU driver. +# Which GPU device driver does Traverse have? - pip: - pathos - matplotlib>=2.0.2 - - hyperopt # TODO(KGF): remove + - h5py<3.0.0 + # - hyperopt # TODO(KGF): remove # - mpi4py # must reload MPI library modules before installing via pip - - xgboost - - scikit-learn - - joblib + # - xgboost + # - scikit-learn + # - joblib diff --git a/envs/traverse.cmd b/envs/traverse.cmd index 7708dbc0..e472969d 100644 --- a/envs/traverse.cmd +++ b/envs/traverse.cmd @@ -3,8 +3,8 @@ module load anaconda3 conda activate frnn -module load cudatoolkit/11.0 -module load cudnn/cuda-10.1/7.6.1 +module load cudatoolkit/11.3 +module load cudnn/cuda-11.x/8.2.0 # after RHEL 8 upgrade module load openmpi/gcc/4.0.4/64 diff --git a/examples/conf.yaml b/examples/conf.yaml index 0b996b79..fbeb1066 100644 --- a/examples/conf.yaml +++ b/examples/conf.yaml @@ -2,13 +2,21 @@ # note, the YAML parser will NOT evaluate expressions in the value fields. # e.g. "validation_frac: 1.0/3.0" will result in str value "1.0/3.0" -# will do stuff in fs_path / [username] / signal_data | shot_lists | processed shots, etc. +# will read and write (normalization, etc.) shot data +# in fs_path / [username] / signal_data | shot_lists | processed shots, etc. +# (username is automatically added as first subdir if user_subdir==True) -fs_path: '/tigress' +# will output csvlog, trained model checkpoints, etc. +# in fs_path_output / [username] / results | csv_logs | model_checkpoints | Graph, etc. + +fs_path: '/lus/theta-fs0/projects/fusiondl_aesp/' +user_subdir: True +fs_path_output: '/lus/theta-fs0/projects/fusiondl_aesp/' +user_subdir_output: True target: 'hinge' # 'maxhinge' # 'maxhinge' # 'binary' # 'hinge' -num_gpus: 4 # per node +num_gpus: 1 # per node paths: - signal_prepath: '/signal_data/' # /signal_data/jet/ + signal_prepath: ['/signal_data/', '/signal_data_new_nov2019/'] # /signal_data/jet/ shot_list_dir: '/shot_lists/' tensorboard_save_path: '/Graph/' data: d3d_0D @@ -87,7 +95,7 @@ model: # size 100 slight overfitting, size 20 no overfitting. 200 is not better than 100. # Prediction is much better with size 100, size 20 cannot capture the data. rnn_size: 200 - rnn_type: 'LSTM' + rnn_type: 'CuDNNLSTM' # TODO(KGF): optimize number of RNN layers rnn_layers: 2 num_conv_filters: 128 @@ -106,7 +114,7 @@ model: # lr=1e-4 also works well if we decay a lot (i.e ~0.7 or more) lr: 0.00002 # 0.00001 # 0.0005 # for adam plots 0.0000001 lr_decay: 0.97 # 0.98 # 0.9 - stateful: True + stateful: False return_sequences: True dropout_prob: 0.1 # only relevant if we want to do MPI training. The number of steps with a single replica @@ -120,14 +128,14 @@ training: # used iff 1) test & 2) (train U validate) are both sampled from the same distribution/source lists of shots: train_frac: 0.75 validation_frac: 0.3333333333333333 - batch_size: 128 # 256 + batch_size: 1024 # THE MAX_PATCH_LENGTH WAS THE CULPRIT FOR NO TRAINING! Lower than 1000 performs very poorly max_patch_length: 100000 # How many shots are we loading at once? num_shots_at_once: 200 # large number = maximum number of epochs. # Early stopping will occur if loss does not decrease, after some patience # of epochs - num_epochs: 1000 + num_epochs: 1 use_mock_data: False data_parallel: False hyperparam_tuning: False @@ -136,7 +144,7 @@ training: num_batches_minimum: 20 # minimum number of batches per epoch ranking_difficulty_fac: 1.0 # how much to upweight incorrectly classified shots during training timeline_prof: False - step_limit: 50 + step_limit: 0 no_validation: True callbacks: list: ['earlystop'] diff --git a/examples/generate_onnx.sh b/examples/generate_onnx.sh new file mode 100755 index 00000000..1510302e --- /dev/null +++ b/examples/generate_onnx.sh @@ -0,0 +1,26 @@ +#!/usr/bin/bash -l + +module load conda/2021-11-30; conda activate + +length=(32) +rnn_size=(300) +rnn_layers=(6) + +for l in ${length[*]} +do + for size in ${rnn_size[*]} + do + for nlayer in ${rnn_layers[*]} + do + cd /home/felker/plasma-python/examples + echo "STARTING bs_dynamic_layers${nlayer}_length${l}_size${size}.onnx" + sed -i "91 c\ length: ${l}" conf.yaml + sed -i "97 c\ rnn_size: ${size}" conf.yaml + sed -i "100 c\ rnn_layers: ${nlayer}" conf.yaml + python mpi_learn.py + mv /lus/theta-fs0/projects/fusiondl_aesp/felker/model_checkpoints/*.onnx ~/bs_dynamic_layers${nlayer}_length${l}_size${size}.onnx + rm -rfd /lus/theta-fs0/projects/fusiondl_aesp/felker/model_checkpoints/* + echo "FINISHED bs_dynamic_layers${nlayer}_length${l}_size${size}.onnx" + done + done +done diff --git a/examples/mpi_learn.py b/examples/mpi_learn.py index e06a64ad..d00eae75 100644 --- a/examples/mpi_learn.py +++ b/examples/mpi_learn.py @@ -1,8 +1,8 @@ import plasma.global_vars as g -g.init_MPI() - +import argparse import os.path +g.init_MPI() # TODO(KGF): replace this workaround for the "from plasma.conf import conf" def is_valid_file(parser, arg): @@ -11,7 +11,6 @@ def is_valid_file(parser, arg): else: return arg -import argparse parser = argparse.ArgumentParser(prog='mpi_learn', description='FusionDL TensorFlow 1.x + mpi4py') parser.add_argument("--input_file", "-i", # type=str, required=False, dest="conf_file", @@ -23,7 +22,6 @@ def is_valid_file(parser, arg): from plasma.conf import conf - from plasma.models.mpi_runner import ( mpi_train, mpi_make_predictions_and_evaluate ) diff --git a/plasma/conf_parser.py b/plasma/conf_parser.py index 29b96ece..cd86386f 100644 --- a/plasma/conf_parser.py +++ b/plasma/conf_parser.py @@ -25,13 +25,11 @@ def parameters(input_file): params = yaml.load(yaml_file, Loader=yaml.SafeLoader) params['user_name'] = getpass.getuser() base_path = params['fs_path'] - output_path = os.path.join(base_path, params['user_name']) - # TODO(KGF): this next line should be deleted at some pt, breaking BC - base_path = output_path - print(output_path) - # TODO(KGF): allow for completely indpendent save/output_path vs. base_path - # configured in conf.yaml. don't assume username subdirectory, or pwd - # save_path = os.environ.get("PWD") + if params['user_subdir']: + base_path = os.path.join(base_path, params['user_name']) + output_path = params['fs_path_output'] + if params['user_subdir_output']: + output_path = os.path.join(output_path, params['user_name']) params['paths']['base_path'] = base_path params['paths']['output_path'] = output_path @@ -64,7 +62,7 @@ def parameters(input_file): h = myhash_signals(sig.all_signals.values()) params['paths']['global_normalizer_path'] = ( - output_path + base_path + '/normalization/normalization_signal_group_{}.npz'.format(h)) if params['training']['hyperparam_tuning']: # params['paths']['saved_shotlist_path'] = @@ -90,7 +88,7 @@ def parameters(input_file): + params['paths']['data'] + '/shot_lists_signal_group_{}.npz'.format(h)) params['paths']['processed_prepath'] = ( - output_path + '/processed_shots/' + 'signal_group_{}/'.format(h)) + base_path + '/processed_shots/' + 'signal_group_{}/'.format(h)) # ensure shallow model has +1 -1 target. if params['model']['shallow'] or params['target'] == 'hinge': params['data']['target'] = HingeTarget @@ -320,6 +318,28 @@ def parameters(input_file): 'etemp_profile': sig.etemp_profile, 'edens_profile': sig.edens_profile, } + elif params['paths']['data'] == 'd3d_2019_all_dims': + params['paths']['shot_files'] = [d3d_full_2019] + params['paths']['shot_files_test'] = [] + params['paths']['use_signals_dict'] = { + 'q95': sig.q95, + 'li': sig.li, + 'ip': sig.ip, + 'lm': sig.lm, + 'betan': sig.betan, + 'energy': sig.energy, + 'dens': sig.dens, + 'pradcore': sig.pradcore, + 'pradedge': sig.pradedge, + 'pin': sig.pin, + 'torquein': sig.torquein, + 'ipdirect': sig.ipdirect, + 'iptarget': sig.iptarget, + 'iperr': sig.iperr, + 'etemp_profile': sig.etemp_profile, + 'edens_profile': sig.edens_profile, + 'ecei': sig.ecei, + } elif params['paths']['data'] == 'd3d_1D': params['paths']['shot_files'] = [d3d_full] params['paths']['shot_files_test'] = [] @@ -328,6 +348,12 @@ def parameters(input_file): 'etemp_profile': sig.etemp_profile, 'edens_profile': sig.edens_profile, } + elif params['paths']['data'] == 'd3d_2D': + params['paths']['shot_files'] = [d3d_full_2019] + params['paths']['shot_files_test'] = [] + params['paths']['use_signals_dict'] = { + 'ecei': sig.ecei, + } elif params['paths']['data'] == 'd3d_all_profiles': params['paths']['shot_files'] = [d3d_full] params['paths']['shot_files_test'] = [] @@ -345,19 +371,24 @@ def parameters(input_file): 'q_psi_profile': sig.q_psi_profile, } elif params['paths']['data'] == 'd3d_0D': - params['paths']['shot_files'] = [d3d_full] + #params['paths']['shot_files'] = [d3d_full] + params['paths']['shot_files'] = [d3d_full_2019] params['paths']['shot_files_test'] = [] params['paths']['use_signals_dict'] = { - 'q95': sig.q95, + # KGF: edits for real-time signals + #'q95': sig.q95, + 'q95_EFITRT1': sig.q95_EFITRT1, 'li': sig.li, 'ip': sig.ip, 'lm': sig.lm, 'betan': sig.betan, 'energy': sig.energy, 'dens': sig.dens, - 'pradcore': sig.pradcore, - 'pradedge': sig.pradedge, + # 'pradcore': sig.pradcore, + # 'pradedge': sig.pradedge, 'pin': sig.pin, + # KGF: added but commented out in Ge's version + # 'vd': sig.vd, 'torquein': sig.torquein, 'ipdirect': sig.ipdirect, 'iptarget': sig.iptarget, diff --git a/plasma/global_vars.py b/plasma/global_vars.py index 5f7e1275..db0967ee 100644 --- a/plasma/global_vars.py +++ b/plasma/global_vars.py @@ -9,6 +9,8 @@ MY_GPU = 0 # TODO(KGF): remove this (and all?) references to Keras backend backend = '' +backendpackage = '' +bfloat16= '' tf_ver = None conf_file = None @@ -22,11 +24,23 @@ def init_MPI(): def init_GPU_backend(conf): - global NUM_GPUS, MY_GPU, backend + global NUM_GPUS, MY_GPU, backend, backendpackage, bfloat16 NUM_GPUS = conf['num_gpus'] MY_GPU = task_index % NUM_GPUS backend = conf['model']['backend'] + # KGF: added via Subrata patch in April 2021 specific to tf2 branch + # (neither of the following options are in the default conf.yaml) + try: + backendpackage = conf['model']['backendpackage'] + except KeyError as ex: + backendpackage = backend + + try: + bfloat16 = conf['model']['bfloat16'] + except KeyError as ex: + bfloat16 = '' + def pprint_unique(obj): from pprint import pprint diff --git a/plasma/models/builder.py b/plasma/models/builder.py index 83dbe8f5..d62355f4 100644 --- a/plasma/models/builder.py +++ b/plasma/models/builder.py @@ -14,7 +14,8 @@ Convolution1D, MaxPooling1D, TimeDistributed, Concatenate ) -from tensorflow.compat.v1.keras.layers import CuDNNLSTM +CuDNNLSTM = LSTM +# from tensorflow.compat.v1.keras.layers import CuDNNLSTM from tensorflow.keras.callbacks import Callback from tensorflow.keras.regularizers import l2 # l1, l1_l2 @@ -23,19 +24,19 @@ import sys import numpy as np from copy import deepcopy +from packaging import version from plasma.utils.downloading import makedirs_process_safe from plasma.utils.hashing import general_object_hash from plasma.models.tcn import TCN # TODO(KGF): consider using importlib.util.find_spec() instead (Py>3.4) try: - import keras2onnx + import tf2onnx import onnx + # CLI: python -m tf2onnx.convert --saved-model model.97765202633820900403308121179367157713._epoch_.0 --output frnn-1D.onnx except ImportError: # as e: - _has_onnx = False - # onnx = None - # keras2onnx = None + _has_tf2onnx = False else: - _has_onnx = True + _has_tf2onnx = True # Synchronize 2x stderr msg from TensorFlow initialization via Keras backend # "Succesfully opened dynamic library... libcudart" "Using TensorFlow backend." @@ -146,7 +147,18 @@ def build_model(self, predict, custom_batch_size=None): print('Unkown Model Type, exiting.') exit(1) + # KGF: key line batch_input_shape = (batch_size, length, num_signals) + + # TODO(KGF): need a more substantial redesign of conf to support stateful=False + # variable training batch size, since even in that case, we need + # conf[training][batch_size] to be fixed to form the inputs to the variable model + + # For now, just reset batch_input_shape=(None, ...) at tf2onnx export time below + #batch_input_shape = (None, length, num_signals) + + + # batch_shape_non_temporal = (batch_size, num_signals) indices_0d, indices_1d, num_0D, num_1D = self.get_0D_1D_indices() @@ -287,10 +299,10 @@ def slicer_output_shape(input_shape, indices): # pre_rnn_model.summary() # sys.stdout = ori # fr.close() - # pre_rnn_model.summary() + + # if g.task_index == 0: + # pre_rnn_model.summary() x_input = Input(batch_shape=batch_input_shape) - # TODO(KGF): Ge moved this inside a new conditional in Dec 2019. check - # x_in = TimeDistributed(pre_rnn_model)(x_input) if (num_1D > 0 or ( 'extra_dense_input' in model_conf.keys() and model_conf['extra_dense_input'])): @@ -333,9 +345,10 @@ def slicer_output_shape(input_shape, indices): bias_regularizer=l2(regularization), ) if rnn_type != 'CuDNNLSTM': - # Dropout is unsupported in CuDNN library - model_kwargs['dropout'] = dropout_prob - model_kwargs['recurrent_dropout'] = dropout_prob + # Dropout (on linear transformation of recurrent state) is unsupported + # in cuDNN library + model_kwargs['recurrent_dropout'] = dropout_prob # recurrent states + model_kwargs['dropout'] = dropout_prob # input states for _ in range(model_conf['rnn_layers']): x_in = rnn_model(rnn_size, **model_kwargs)(x_in) x_in = Dropout(dropout_prob)(x_in) @@ -373,35 +386,34 @@ def save_model_weights(self, model, epoch): signatures=None, # applicable to 'tf' SavedModel format only ) # TensorFlow SavedModel format (full directory) - full_model_save_dir = full_model_save_path.rsplit('.',1)[0] + full_model_save_dir = full_model_save_path.rsplit('.', 1)[0] # TODO(KGF): model.save(..., save_format='tf') disabled in r1.15 # Same with tf.keras.models.save_model(..., save_format="tf"). # Need to use experimental API until r2.x - # model.save(full_model_save_dir, overwrite=True, save_format='tf') - tf.keras.experimental.export_saved_model(model, full_model_save_dir, - custom_objects=None, - as_text=False, - input_signature=None, - serving_only=False - ) - # WARNING:tensorflow:Export includes no default signature! - - # KGF: is the above comment accurate? 1.15.0 on macOS marks export_saved_model() - # as deprecated, but below save() call errors out - # model.save(full_model_save_dir, - # overwrite=True, - # include_optimizer=True, - # save_format='tf', - # signatures=None, - # ) - + if (version.parse(g.tf_ver) > version.parse('2.1.0') + and version.parse(g.tf_ver).minor != 1): + # errors out in TF 2.1.3 on Traverse (latest version on IBM WMLCE 1.7.0) + model.save(full_model_save_dir, overwrite=True, save_format='tf') # try: - if _has_onnx: + if _has_tf2onnx: save_path = self.get_save_path(epoch, ext='onnx') - onnx_model = keras2onnx.convert_keras(model, model.name, - target_opset=10) - onnx.save_model(onnx_model, save_path) + # TODO(KGF): eliminate this repeated def of x_input shape from build_model() + model_conf = self.conf['model'] + length = model_conf['length'] + batch_size = self.conf['training']['batch_size'] + use_signals = self.conf['paths']['use_signals'] + num_signals = sum([sig.num_channels for sig in use_signals]) + batch_input_shape = (None, length, num_signals) + ########print(f"batch_input_shape = {batch_input_shape}") + # ValueError: Input 0 of node model_1/lstm/AssignVariableOp was passed float + # from model_1/lstm/lstm_cell/ones_like_1/ReadVariableOp/resource:0 + # incompatible with expected resource. + model_proto, external_tensor_storage = tf2onnx.convert.from_keras( + model, input_signature=[tf.TensorSpec(batch_input_shape)], + opset=10, output_path=save_path) + # KGF: error likely due to the splitting of pre_rnn_model and rnn_model, since + # the latter expects a trivially-wrapped TimeDistributed(Input()) for 0D model # except Exception as e: # print(e) return diff --git a/plasma/models/mpi_runner.py b/plasma/models/mpi_runner.py index 9a3c5d0c..4b01b296 100644 --- a/plasma/models/mpi_runner.py +++ b/plasma/models/mpi_runner.py @@ -12,8 +12,10 @@ # Keras "Using TensorFlow backend" stderr messages do not interfere in stdout from plasma.conf import conf from mpi4py import MPI -from pkg_resources import parse_version, get_distribution import random +import itertools +from packaging import version +import contextlib ''' ######################################################### This file trains a deep learning model to predict @@ -32,7 +34,6 @@ import datetime import numpy as np -from tensorflow.python.client import timeline from functools import partial from copy import deepcopy # import socket @@ -57,7 +58,6 @@ os.environ['CUDA_VISIBLE_DEVICES'] = '{}'.format(g.MY_GPU) # ,mode=NanGuardMode' os.environ['KERAS_BACKEND'] = 'tensorflow' # default setting - g.tf_ver = parse_version(get_distribution('tensorflow').version) # compat/compat.py first committed on 2018-06-29 for Py 2 vs 3 # (around, but not present in, the release of v1.9.0) # v2 compatiblity code added, then moved from compat.py in Nov and Dec 2018 @@ -65,30 +65,53 @@ # But many TF deprecation warnings in 1.14.0, e.g.: # "The name tf.GPUOptions is deprecated. Please use tf.compat.v1.GPUOptions # instead". See tf_export.py - if g.tf_ver >= parse_version('1.14.0'): - import tensorflow.compat.v1 as tf - else: - import tensorflow as tf + # if g.tf_ver >= parse_version('1.14.0'): + # import tensorflow.compat.v1 as tf + # else: + import tensorflow as tf + g.tf_ver = tf.__version__ # setting g.tf_ver moved after the import Summer 2021 # TODO(KGF): above, builder.py (bug workaround), mpi_launch_tensorflow.py, # and runner.py are the only files that import tensorflow directly - from tensorflow.keras.backend import set_session + # from tensorflow.keras.backend import set_session # KGF: next 3 lines dump many TensorFlow diagnostics to stderr. # All MPI ranks first "Successfully opened dynamic library libcuda" # then, one by one: ID GPU, libcudart, libcublas, libcufft, ... # Finally, "Device interconnect StreamExecutor with strength 1 edge matrix" - gpu_options = tf.GPUOptions(per_process_gpu_memory_fraction=0.95, - allow_growth=True) - config = tf.ConfigProto(gpu_options=gpu_options) - set_session(tf.Session(config=config)) + # gpu_options = tf.GPUOptions(per_process_gpu_memory_fraction=0.95, + # allow_growth=True) + # config = tf.ConfigProto(gpu_options=gpu_options) + # set_session(tf.Session(config=config)) g.flush_all_inorder() else: - sys.exit('Invalid Keras backend specified') + sys.exit('Invalid Keras backend specified !! {}'.format(g.backend)) for i in range(g.num_workers): g.comm.Barrier() if i == g.task_index: print('[{}] importing Keras'.format(g.task_index)) - import tensorflow.keras.backend as K + # bf16 + if g.bfloat16 == 'keras': + print('Running in BFloat16 Via Keras') + from tensorflow.keras.mixed_precision import experimental as mixed_precision + policy = mixed_precision.Policy('mixed_bfloat16') + mixed_precision.set_policy(policy) + elif g.bfloat16 == 'amp': + print('Running in BFloat16 via AutoMixedPrecisionMkl') + import tensorflow as tf + from tensorflow.core.protobuf import rewriter_config_pb2 + from tensorflow.python.keras import backend as K + + graph_options = tf.compat.v1.GraphOptions( + rewrite_options=rewriter_config_pb2.RewriterConfig( + auto_mixed_precision_mkl=rewriter_config_pb2.RewriterConfig.ON)) + + session_conf = tf.compat.v1.ConfigProto(graph_options = graph_options) + sess = tf.compat.v1.Session(config=session_conf) + K.set_session(sess) + else: + import tensorflow.keras.backend as K + + from tensorflow.keras.utils import Progbar # TODO(KGF): instead of tensorflow.keras.callbacks.CallbackList() # until API added in tf-nightly in v2.2.0 @@ -256,9 +279,9 @@ def compile(self, optimizer, clipnorm, loss='mse'): SGD, Adam, RMSprop, Nadam ) if optimizer == 'sgd': - optimizer_class = SGD(lr=self.DUMMY_LR, clipnorm=clipnorm) + optimizer_class = SGD(learning_rate=self.DUMMY_LR, clipnorm=clipnorm) elif optimizer == 'momentum_sgd': - optimizer_class = SGD(lr=self.DUMMY_LR, clipnorm=clipnorm, + optimizer_class = SGD(learning_rate=self.DUMMY_LR, clipnorm=clipnorm, decay=1e-6, momentum=0.9) elif optimizer == 'tf_momentum_sgd': # TODO(KGF): removed TFOptimizer wrapper from here and below @@ -267,27 +290,19 @@ def compile(self, optimizer, clipnorm, loss='mse'): optimizer_class = tf.train.MomentumOptimizer( learning_rate=self.DUMMY_LR, momentum=0.9) elif optimizer == 'adam': - optimizer_class = Adam(lr=self.DUMMY_LR, clipnorm=clipnorm) + optimizer_class = Adam(learning_rate=self.DUMMY_LR, clipnorm=clipnorm) elif optimizer == 'tf_adam': optimizer_class = tf.train.AdamOptimizer( learning_rate=self.DUMMY_LR) elif optimizer == 'rmsprop': - optimizer_class = RMSprop(lr=self.DUMMY_LR, clipnorm=clipnorm) + optimizer_class = RMSprop(learning_rate=self.DUMMY_LR, clipnorm=clipnorm) elif optimizer == 'nadam': - optimizer_class = Nadam(lr=self.DUMMY_LR, clipnorm=clipnorm) + optimizer_class = Nadam(learning_rate=self.DUMMY_LR, clipnorm=clipnorm) else: print("Optimizer not implemented yet") exit(1) - # Timeline profiler - if (self.conf is not None - and conf['training']['timeline_prof']): - self.run_options = tf.RunOptions(trace_level=tf.RunOptions.FULL_TRACE) - self.run_metadata= tf.RunMetadata() - self.model.compile(optimizer=optimizer_class, loss=loss, - options=self.run_options, run_metadata=self.run_metadata) - else: - self.model.compile(optimizer=optimizer_class, loss=loss) + self.model.compile(optimizer=optimizer_class, loss=loss) self.ensure_equal_weights() @@ -474,6 +489,11 @@ def build_callbacks(self, conf, callbacks_list): patience=patience, monitor=monitor, mode=mode)] if "lr_scheduler" in callbacks_list: pass + # if conf['training']['timeline_prof']: + # tb_callback = tf.keras.callbacks.TensorBoard( + # log_dir="./logs", profile_batch=(10, 15), + # update_freq=1,) + # callbacks += [tb_callback] return cbks.CallbackList(callbacks) @@ -527,10 +547,6 @@ def train_epoch(self): loss_averager = Averager() t_start = time.time() - timeline_prof = False - if (self.conf is not None - and conf['training']['timeline_prof']): - timeline_prof = True step_limit = 0 if (self.conf is not None and conf['training']['step_limit'] > 0): @@ -546,88 +562,87 @@ def train_epoch(self): while ((self.num_so_far - self.epoch * num_total) < num_total or step < self.num_batches_minimum): - if step_limit > 0 and step > step_limit: - print('reached step limit') - break - try: - (batch_xs, batch_ys, batches_to_reset, num_so_far_curr, - num_total, is_warmup_period) = next(batch_iterator_func) - except StopIteration: - g.print_unique("Resetting batch iterator.") - self.num_so_far_accum = self.num_so_far_indiv - self.set_batch_iterator_func() - batch_iterator_func = self.batch_iterator_func - (batch_xs, batch_ys, batches_to_reset, num_so_far_curr, - num_total, is_warmup_period) = next(batch_iterator_func) - self.num_so_far_indiv = self.num_so_far_accum + num_so_far_curr - - # if batches_to_reset: - # self.model.reset_states(batches_to_reset) - - warmup_phase = (step < self.warmup_steps and self.epoch == 0) - num_replicas = 1 if warmup_phase else self.num_replicas - - self.num_so_far = self.mpi_sum_scalars( - self.num_so_far_indiv, num_replicas) - - # run the model once to force compilation. Don't actually use these - # values. - if first_run: - first_run = False - t0_comp = time.time() - # print('input_dimension:',batch_xs.shape) - # print('output_dimension:',batch_ys.shape) - _, _ = self.train_on_batch_and_get_deltas( + # TODO(KGF): this is still not correctly tracing the steps on CPU + if version.parse(g.tf_ver) >= version.parse('2.2.0'): + # TensorFlow profiler added in April 2020, TF 2.2.0 + cm = tf.profiler.experimental.Trace('train', step_num=step, _r=1) + else: + cm = contextlib.nullcontext() + with cm: + if step_limit > 0 and step > step_limit: + print('reached step limit') + break + try: + (batch_xs, batch_ys, batches_to_reset, num_so_far_curr, + num_total, is_warmup_period) = next(batch_iterator_func) + except StopIteration: + g.print_unique("Resetting batch iterator.") + self.num_so_far_accum = self.num_so_far_indiv + self.set_batch_iterator_func() + batch_iterator_func = self.batch_iterator_func + (batch_xs, batch_ys, batches_to_reset, num_so_far_curr, + num_total, is_warmup_period) = next(batch_iterator_func) + self.num_so_far_indiv = self.num_so_far_accum + num_so_far_curr + + # if batches_to_reset: + # self.model.reset_states(batches_to_reset) + + warmup_phase = (step < self.warmup_steps and self.epoch == 0) + num_replicas = 1 if warmup_phase else self.num_replicas + + self.num_so_far = self.mpi_sum_scalars( + self.num_so_far_indiv, num_replicas) + + # run the model once to force compilation. Don't actually use these + # values. + if first_run: + first_run = False + t0_comp = time.time() + # print('input_dimension:',batch_xs.shape) + # print('output_dimension:',batch_ys.shape) + _, _ = self.train_on_batch_and_get_deltas( + batch_xs, batch_ys, verbose) + self.comm.Barrier() + sys.stdout.flush() + # TODO(KGF): check line feed/carriage returns around this + g.print_unique('\nCompilation finished in {:.2f}s'.format( + time.time() - t0_comp)) + t_start = time.time() + sys.stdout.flush() + + if np.any(batches_to_reset) and self.conf['model']['stateful']: + print(f"KGF batches_to_reset = {batches_to_reset}") + reset_states(self.model, batches_to_reset) + if ('noise' in self.conf['training'].keys() + and self.conf['training']['noise'] is not False): + batch_xs = self.add_noise(batch_xs) + t0 = time.time() + deltas, loss = self.train_on_batch_and_get_deltas( batch_xs, batch_ys, verbose) - self.comm.Barrier() - sys.stdout.flush() - # TODO(KGF): check line feed/carriage returns around this - g.print_unique('\nCompilation finished in {:.2f}s'.format( - time.time() - t0_comp)) - t_start = time.time() - sys.stdout.flush() - - if np.any(batches_to_reset): - reset_states(self.model, batches_to_reset) - if ('noise' in self.conf['training'].keys() - and self.conf['training']['noise'] is not False): - batch_xs = self.add_noise(batch_xs) - t0 = time.time() - deltas, loss = self.train_on_batch_and_get_deltas( - batch_xs, batch_ys, verbose) - t1 = time.time() - if not is_warmup_period: - self.set_new_weights(deltas, num_replicas) - t2 = time.time() - write_str_0 = self.calculate_speed(t0, t1, t2, num_replicas) - curr_loss = self.mpi_average_scalars(1.0*loss, num_replicas) - # g.print_unique(self.model.get_weights()[0][0][:4]) - loss_averager.add_val(curr_loss) - ave_loss = loss_averager.get_ave() - eta = self.estimate_remaining_time( - t0 - t_start, self.num_so_far - self.epoch*num_total, - num_total) - write_str = ( - '\r[{}] step: {} [ETA: {:.2f}s] [{:.2f}/{}], '.format( - self.task_index, step, eta, 1.0*self.num_so_far, + t1 = time.time() + if not is_warmup_period: + self.set_new_weights(deltas, num_replicas) + t2 = time.time() + write_str_0 = self.calculate_speed(t0, t1, t2, num_replicas) + curr_loss = self.mpi_average_scalars(1.0*loss, num_replicas) + # g.print_unique(self.model.get_weights()[0][0][:4]) + loss_averager.add_val(curr_loss) + ave_loss = loss_averager.get_ave() + eta = self.estimate_remaining_time( + t0 - t_start, self.num_so_far - self.epoch*num_total, num_total) - + 'loss: {:.5f} [{:.5f}] | '.format(ave_loss, curr_loss) - + 'walltime: {:.4f} | '.format( - time.time() - self.start_time)) - g.write_unique(write_str + write_str_0) - - if timeline_prof: - # dump profile - tl = timeline.Timeline(self.run_metadata.step_stats) - ctf = tl.generate_chrome_trace_format() - # dump file per iteration - with open('timeline_%s.json' % step, 'w') as f: - f.write(ctf) - - step += 1 - else: - g.write_unique('\r[{}] warmup phase, num so far: {}'.format( - self.task_index, self.num_so_far)) + write_str = ( + '\r[{}] step: {} [ETA: {:.2f}s] [{:.2f}/{}], '.format( + self.task_index, step, eta, 1.0*self.num_so_far, + num_total) + + 'loss: {:.5f} [{:.5f}] | '.format(ave_loss, curr_loss) + + 'walltime: {:.4f} | '.format( + time.time() - self.start_time)) + g.write_unique(write_str + write_str_0) + step += 1 + else: + g.write_unique('\r[{}] warmup phase, num so far: {}'.format( + self.task_index, self.num_so_far)) effective_epochs = 1.0*self.num_so_far/num_total epoch_previous = self.epoch @@ -757,8 +772,27 @@ def mpi_make_predictions(conf, shot_list, loader, custom_path=None): if g.task_index != 0: loader.verbose = False + # MPI loop works by predicting in batches of the + # largest possible multiple of len(shot_sublists) < num_workers + # i.e. if there are 9 shot_sublists and 4 workers, + # worker 0 will predict shot_sublist 0, 4, and 8 + # workers 1-3 will predict shot_sublist 1-3 and 5-7 respectively + # each makes their prediction on the ith iteration of the loop + # (i.e. worker 0 predicts shot_sublist 0 on loop iteration i=0) + # and then skips through loop iterations unless it has to predict again (i=4) + # or aggregate predictions with other workers, after each worker has made a prediction + # which happens every num_workers iterations in the for loop + # (i.e. worker 0 will aggregate predictions with workers 1-3 at the end of i=3) + # During the aggregation step, each worker is uses its color (which denotes whether it was + # predicting or not predicting during the last few runs of the for loop) to split the main comm + # The predictors (color = 1) share their predictions with first each other, and then to everyone + # the nonpredictors (color = 2) only recieve the global predictions from the predictors + color = 2 for (i, shot_sublist) in enumerate(shot_sublists): + shpz = [] + max_length = -1 # So non shot predictive workers don't have a real length if i % g.num_workers == g.task_index: + color = 1 X, y, shot_lengths, disr = loader.load_as_X_y_pred(shot_sublist) # load data and fit on data @@ -774,19 +808,64 @@ def mpi_make_predictions(conf, shot_list, loader, custom_path=None): y_prime += y_p y_gold += y disruptive += disr - # print_all('\nFinished with i = {}'.format(i)) if (i % g.num_workers == g.num_workers - 1 or i == len(shot_sublists) - 1): + # Create numpy block from y list which is used in MPI + # Pads y_prime and y_gold with zeros to maximum shot length within block being transferred + if color ==1: + shpz = [max(y.shape) for y in y_prime] + max_length = max([max(y.shape) for y in y_p]) + max_length = g.comm.allreduce(max_length, MPI.MAX) + if color == 1: + y_prime_numpy = np.stack([np.pad(sublist, pad_width=((0,max_length-max(sublist.shape)),(0,0))) for sublist in y_prime]) + y_gold_numpy = np.stack([np.pad(sublist, pad_width=((0,max_length-max(sublist.shape)),(0,0))) for sublist in y_gold]) + + temp_predictor_only_comm = MPI.Comm.Split(g.comm, color, i) + # Create numpy array to store all processors output, then aggregate and unpad using MPI gathered shape list + shpzg = g.comm.allgather(shpz) + shpzg = list(itertools.chain(*shpzg)) + shpzg = [s for s in shpzg if s != []] + max_length = g.comm.allreduce(max_length, MPI.MAX) + if color == 1: + num_pred = temp_predictor_only_comm.size + else: + num_pred = g.comm.size - temp_predictor_only_comm.size + y_primeg = np.zeros((num_pred*conf['model']['pred_batch_size'],max_length,1), dtype=conf['data']['floatx']) + y_goldg = np.zeros((num_pred*conf['model']['pred_batch_size'],max_length,1), dtype=conf['data']['floatx']) + y_primeg_flattend = np.zeros(y_primeg.flatten().shape) + y_goldg_flattend = np.zeros(y_goldg.flatten().shape) + if color == 1: + # Ensure that numpy arrays have correct dimensions before gathering them + assert num_pred*max(y_prime_numpy.flatten().shape) == max(y_primeg_flattend.shape) + assert num_pred*max(y_gold_numpy.flatten().shape) == max(y_goldg_flattend.shape) + temp_predictor_only_comm.Allgather(y_prime_numpy.flatten(), y_primeg_flattend) + temp_predictor_only_comm.Allgather(y_gold_numpy.flatten(), y_goldg_flattend) + # Process 0 broadcast y_primeg and y_goldg to all processors, including ones + # not involved in calculating predictions so they can each create their own + # y_prime_global and y_gold_global g.comm.Barrier() - y_prime_global += concatenate_sublists(g.comm.allgather(y_prime)) - y_gold_global += concatenate_sublists(g.comm.allgather(y_gold)) + g.comm.Bcast(y_primeg_flattend, root=0) + g.comm.Bcast(y_goldg_flattend, root=0) + y_primeg_flattend = np.split(y_primeg_flattend, num_pred) + y_goldg_flattend = np.split(y_goldg_flattend, num_pred) + y_primeg = [y.reshape((conf['model']['pred_batch_size'], max_length, 1)) for y in y_primeg_flattend] + y_goldg = [y.reshape((conf['model']['pred_batch_size'], max_length, 1)) for y in y_goldg_flattend] + y_primeg = np.concatenate(y_primeg, axis=0) + y_goldg = np.concatenate(y_goldg, axis=0) + # Unpad each shot to its true length + for idx, s in enumerate(shpzg): + trim = lambda nparry, s: nparry[0:int(s),:] + y_prime_global.append(trim(y_primeg[idx],s)) + y_gold_global.append(trim(y_goldg[idx], s)) + disruptive_global += concatenate_sublists( g.comm.allgather(disruptive)) - g.comm.Barrier() y_prime = [] y_gold = [] disruptive = [] + color = 2 + temp_predictor_only_comm.Free() if g.task_index == 0: pbar.add(1.0*len(shot_sublist)) @@ -843,7 +922,7 @@ def mpi_train(conf, shot_list_train, shot_list_validate, loader, conf['num_workers'] = g.comm.Get_size() specific_builder = builder.ModelBuilder(conf) - if g.tf_ver >= parse_version('1.14.0'): + if version.parse(g.tf_ver) >= version.parse('1.14.0'): # Internal TensorFlow flags, subject to change (v1.14.0+ only?) try: from tensorflow.python.util import module_wrapper as depr @@ -911,7 +990,7 @@ def mpi_train(conf, shot_list_train, shot_list_validate, loader, tensorboard_save_path = conf['paths']['tensorboard_save_path'] write_grads = conf['callbacks']['write_grads'] tensorboard = TensorBoard(log_dir=tensorboard_save_path, - histogram_freq=1, write_graph=True, + histogram_freq=1, # write_graph=True, write_grads=write_grads) tensorboard.set_model(mpi_model.model) # TODO(KGF): check addition of TF model summary write added from fork @@ -938,6 +1017,9 @@ def mpi_train(conf, shot_list_train, shot_list_validate, loader, best_so_far = np.inf cmp_fn = min + # if conf['training']['timeline_prof']: + # tf.profiler.experimental.start('./logs') + while e < num_epochs: g.write_unique('\nBegin training from epoch {:.2f}/{}'.format( e, num_epochs)) @@ -1044,6 +1126,8 @@ def mpi_train(conf, shot_list_train, shot_list_validate, loader, if stop_training: g.write_unique("Stopping training due to early stopping") break + # if conf['training']['timeline_prof']: + # tf.profiler.experimental.stop() if g.task_index == 0: callbacks.on_train_end() @@ -1064,25 +1148,24 @@ def get_stop_training(callbacks): class TensorBoard(object): def __init__(self, log_dir='./logs', histogram_freq=0, validation_steps=0, - write_graph=True, write_grads=False): + # write_graph=True, + write_grads=False): if K.backend() != 'tensorflow': raise RuntimeError('TensorBoard callback only works ' 'with the TensorFlow backend.') self.log_dir = log_dir self.histogram_freq = histogram_freq - self.merged = None self.writer = None - self.write_graph = write_graph + # self.write_graph = write_graph self.write_grads = write_grads self.validation_steps = validation_steps - self.sess = None self.model = None def set_model(self, model): self.model = model - self.sess = K.get_session() - if self.histogram_freq and self.merged is None: + # TODO(KGF): check removal of cond "&& self.merged is None" + if self.histogram_freq: for layer in self.model.layers: for weight in layer.weights: mapped_weight_name = weight.name.replace(':', '_') @@ -1099,82 +1182,22 @@ def is_indexed_slices(grad): tf.summary.histogram( '{}_grad'.format(mapped_weight_name), grad) + # KGF: Skip writing layer output histograms in TF 2.x, for now? if hasattr(layer, 'output'): tf.summary.histogram('{}_out'.format(layer.name), layer.output) - self.merged = tf.summary.merge_all() - if self.write_graph: - self.writer = tf.summary.FileWriter(self.log_dir, - self.sess.graph) - else: - self.writer = tf.summary.FileWriter(self.log_dir) + self.writer = tf.summary.create_file_writer(self.log_dir) def on_epoch_end(self, val_generator, val_steps, epoch, logs=None): logs = logs or {} + # KGF: val_roc, val_loss, train_loss for name, value in logs.items(): if name in ['batch', 'size']: continue - summary = tf.Summary() - summary_value = summary.value.add() - summary_value.simple_value = value.item() - summary_value.tag = name - self.writer.add_summary(summary, epoch) + tf.summary.scalar(name, value, step=epoch) self.writer.flush() - # KGF: targets attribute of Model class moved to private in tf.keras - tensors = (self.model.inputs + self.model._targets - ) # + self.model.sample_weights) - # KGF: tf.keras results in sample_weights = None. Dont pass it - # since we use equal weights, anyway - - # KGF: former external Keras API returns the following - # print(type(self.model.uses_learning_phase)) - # - # print(self.model.uses_learning_phase) - # True - # "True if the layer has a different behavior in training mode and - # test mode" - - # No longer necessary to check backend-dependent flag (eliminated) - # https://stackoverflow.com/questions/52295852/what-is-uses-learning-phase-in-keras - # if self.model.uses_learning_phase: - # print(type(K.learning_phase())) - # - # print(K.learning_phase()) - # Tensor("keras_learning_phase:0", shape=(), dtype=bool) - # KGF: This indicates that TensorFlow is set to training at this point, - # but we zip K.learning_phase() as the key with '1' as the value below - # when building our feed_dict, which indicates testing (appropriate for - # this TensorBoard evaluation of the validation set accuracy - tensors += [K.learning_phase()] - - self.sess = K.get_session() - - for val_data in val_generator: - batch_val = [] - # sh = val_data[0].shape[0] - # - # 3x numpy arrays matching input, targets, sample_weights tensors - # + 1x bool flag if any layer in model takes a train vs. test flag - batch_val.append(val_data[0]) - batch_val.append(val_data[1]) - # batch_val.append(np.ones(sh)) # equal weights - - # TODO(KGF): confirm that this flag check can be skipped. See above - # if self.model.uses_learning_phase: - batch_val.append(1) - # Things may break if there is no layer in model that uses this flg - # E.g. if all Dropout, BatchNorm layers are missing - - feed_dict = dict(zip(tensors, batch_val)) - result = self.sess.run([self.merged], feed_dict=feed_dict) - summary_str = result[0] - self.writer.add_summary(summary_str, int(round(epoch))) - val_steps -= 1 - if val_steps <= 0: - break - def on_train_end(self): self.writer.close() diff --git a/plasma/primitives/data.py b/plasma/primitives/data.py index 91a2aed0..ec80eb72 100644 --- a/plasma/primitives/data.py +++ b/plasma/primitives/data.py @@ -3,11 +3,13 @@ import sys import os import re +import h5py from scipy.interpolate import UnivariateSpline from plasma.utils.processing import get_individual_shot_file from plasma.utils.downloading import get_missing_value_array from plasma.utils.hashing import myhash +from plasma.utils.ECEI import ECEI # class SignalCollection: # """GA Data Obj""" @@ -22,7 +24,33 @@ pass +############################################################################### +# Parent Signal Class +############################################################################### class Signal(object): + """ + A Signal object is a wrapper for a single signal. It is used to fetch data + remotely and load locally stored data, as well as to return useful + information about the data which is used during processing and model + training. + + Attributes: + description: str, full name of signal, e.g. "Plasma density" + paths: list of str, MDSplus point-names, must correspond in index to + the Machine objects in self.machines + machines: list of Machine objects that the signal is defined on. + causal_shifts: list of floats; the causal shift needed to be sure the + signal is not utilizing future data. + is_ip: bool, True if signal is plasma current + num_channels: int, number of data collection channels. Used in profile + signals and two dimensional signals + normalize: bool, True if signal is to be normalized (?) + data_avail_tolerances: list of floats, value in s of the maximum + allowable time between cessation of data + collection and t_disrupt for each machine + is_strictly_positive: bool, True if signal is strictly positive + mapping_paths: list of str, MDSplus mapping paths + """ def __init__(self, description, paths, machines, tex_label=None, causal_shifts=None, is_ip=False, normalize=True, data_avail_tolerances=None, is_strictly_positive=False, @@ -30,10 +58,12 @@ def __init__(self, description, paths, machines, tex_label=None, assert len(paths) == len(machines) self.description = description self.paths = paths - self.machines = machines # on which machines is the signal defined + self.machines = machines if causal_shifts is None: causal_shifts = [0 for m in machines] - self.causal_shifts = causal_shifts # causal shift in ms + self.causal_shifts = causal_shifts # causal shift in ms -> NOTE(JAR): + # the causal shifts appear to be supplied + # in s in signals.py, NOT ms self.is_ip = is_ip self.num_channels = 1 self.normalize = normalize @@ -213,6 +243,9 @@ def __repr__(self): return self.description +############################################################################### +# Profile (1D) Signal Class +############################################################################### class ProfileSignal(Signal): def __init__(self, description, paths, machines, tex_label=None, causal_shifts=None, mapping_range=(0, 1), num_channels=32, @@ -309,6 +342,9 @@ def fetch_data(self, machine, shot_num, c): return time, data, mapping, success +############################################################################### +# Channel Signal Class +############################################################################### class ChannelSignal(Signal): def __init__(self, description, paths, machines, tex_label=None, causal_shifts=None, data_avail_tolerances=None, @@ -369,6 +405,217 @@ def get_file_path(self, prepath, machine, shot_number): raw_signal=True) +############################################################################### +# 2-Dimensional Signal Class +############################################################################### +class Signal2D(Signal): + """ + Signal2D is a signal class specifically tailored for two-dimensional + signals, such as ECEi data + + Non-inherited Attributes: + dims: tuple of ints, dimensions of 2d signal; ((20, 8) for ECEi) + is_ecei: bool, True if data is ECEi data + miss_chan_threshold: int, number of channels that can be + missing in order for a shot to be included + """ + def __init__(self, description, paths, machines, dims, is_ecei = False, + miss_chan_threshold = 80, tex_label=None, causal_shifts=None, + is_ip=False, normalize=True, data_avail_tolerances=None, + is_strictly_positive=False, mapping_paths=None): + super(Signal2D, self).__init__( + description, paths, machines, + tex_label=tex_label, causal_shifts=causal_shifts, + is_ip=False, normalize=normalize, + data_avail_tolerances=data_avail_tolerances, + is_strictly_positive=is_strictly_positive, + mapping_paths=mapping_paths) + self.dims = dims + self.num_channels = dims[0]*dims[1] + self.is_ecei = is_ecei + self.miss_chan_threshold = miss_chan_threshold + + + def get_file_path(self, prepath, machine, shot_number): + """ + Returns file path. + + Args: + prepath: str, file prepath + machine: Machine object, machine that signal is defined on + shot_number: int, shot number + """ + signal_dirname = self.get_path(machine) + dirname = os.path.join(prepath, machine.name, signal_dirname) + if self.is_ecei: + return get_individual_shot_file(dirname, machine.name, shot_number,\ + raw_signal=True, ext = '.hdf5') + else: + return get_individual_shot_file(dirname, machine.name, shot_number,\ + raw_signal=True) + + + def load_data_from_hdf5_safe(self, prepath, shot): + """ + Loads 2D data from hdf5 file where each database in the file contains + data from a single channel. Stacks data into 2D numpy array with shape + (time_steps, num_channels+1) (where time series is first column) and + returns it. Pads missing channel data with 0's up to the missing channel + threshold. + + Args: + prepath: str, location of data + shot: Shot object for shot number of interest + """ + file_path = self.get_file_path(prepath, shot.machine, shot.number) + if not self.is_saved(prepath, shot): + print('Signal {}, shot {} was never downloaded at {} [omit]'.format( + self.description, shot.number, file_path)) + return None, False + + if os.path.getsize(file_path) == 0: + print('Signal {}, shot {} '.format(self.description, shot.number), + 'was downloaded incorrectly (empty file) [omit]') + os.remove(file_path) + return None, False + + if self.is_ecei: + try: + E = ECEI() + f = h5py.File(file_path, 'r') + miss_count = 0 + missing = [] + for key in f.keys(): + if key.startswith('missing'): + miss_count += 1 + missing.append(key) + if miss_count == 160: + print('Signal {}, shot {} contains no data [omit]'.format( + self.description, shot.number)) + return None, False + if miss_count > self.miss_chan_threshold: + print('Signal {}, shot {} is missing too many channels \ + [omit]'.format(self.description, shot.number)) + return None, False + + no_time_series = True + idx = 0 + while no_time_series: + chan = E.ecei_channels[idx] + if chan not in missing: + data = (np.asarray(f.get(chan))[:,0])/1000 #units of raw data are ms + data = data.reshape((data.shape[0],1)) + no_time_series = False + idx += 1 + + for channel in E.ecei_channels: + if channel in missing: + chan = np.zeros((data.shape[0],1)) + data = np.append(data, chan, axis = 1) + else: + chan = np.asarray(f.get(channel)) + data = np.append(data, chan[:,1].reshape((chan.shape[0],1)),\ + axis = 1) + except Exception as e: + print(e) + print('Cannot load signal {} shot {} [omit]'.format( + file_path, shot.number)) + os.remove(file_path) + return None, False + assert data.shape[1] == 161 + + else: + print('Non-ECEi 2D hdf5 data not yet supported.') + return None, False + + return data, True + + + def load_data_from_txt_safe(self, prepath, shot, dtype='float32'): + file_path = self.get_file_path(prepath, shot.machine, shot.number) + if not self.is_saved(prepath, shot): + print('Signal {}, shot {} was never downloaded [omit]'.format( + self.description, shot.number)) + return None, False + + if os.path.getsize(file_path) == 0: + print('Signal {}, shot {} '.format(self.description, shot.number), + 'was downloaded incorrectly (empty file) [omit]') + os.remove(file_path) + return None, False + try: + data = np.loadtxt(file_path, dtype=dtype) + if np.all(data == get_missing_value_array()): + print('Signal {}, shot {} contains no data [omit]'.format( + self.description, shot.number)) + return None, False + except Exception as e: + print(e) + print('Cannot load signal {} shot {} [omit]'.format( + file_path, shot.number)) + os.remove(file_path) + return None, False + + return data, True + + def load_data(self, prepath, shot, dtype='float32'): + if self.is_ecei: + data, succ = self.load_data_from_hdf5_safe(prepath, shot) + else: + data, succ = self.load_data_from_txt_safe(prepath, shot) + + if not succ: + return None, None, False + + if np.ndim(data) == 1: + data = np.expand_dims(data, axis=0) + + t = data[:, 0] + sig = data[:, 1:] + + # make sure shot is not garbage data + if len(t) <= 1 or (np.max(sig) == 0.0 and np.min(sig) == 0.0): + if self.is_ip: + print('Shot {} has no current [omit]'.format(shot.number)) + else: + print('Signal {}, shot {} contains no data [omit]'.format( + self.description, shot.number)) + return None, sig.shape, False + + # make sure data doesn't contain NaN values + if np.any(np.isnan(t)) or np.any(np.isnan(sig)): + print('Signal {}, shot {} contains NaN [omit]'.format( + self.description, shot.number)) + return None, sig.shape, False + + return t, sig, True + + def fetch_data_basic(self, machine, shot_num, c, path=None): + success = False + if self.is_ecei: + E = ECEI() + time, data, mapping, success = E.Fetch_Shot(shot_num) + else: + if path is None: + path = self.get_path(machine) + mapping = None + try: + time, data, mapping, success = machine.fetch_data_fn( + path, shot_num, c) + except Exception as e: + print(e) + sys.stdout.flush() + + if not success: + return None, None, None, False + + time = np.array(time) + 1e-3*self.get_causal_shift(machine) + return time, np.array(data), mapping, success + + def fetch_data(self, machine, shot_num, c): + return self.fetch_data_basic(machine, shot_num, c) + + class Machine(object): def __init__(self, name, server, fetch_data_fn, max_cores=8, current_threshold=0): diff --git a/plasma/primitives/shots.py b/plasma/primitives/shots.py index 221e7d72..2a9bcba5 100644 --- a/plasma/primitives/shots.py +++ b/plasma/primitives/shots.py @@ -408,8 +408,8 @@ def get_signals_and_times_from_file(self, conf): for prepath in signal_prepath: t, sig, valid_signal = signal.load_data( prepath, self, conf['data']['floatx']) - if valid_signal: - break + if valid_signal: + break else: t, sig, valid_signal = signal.load_data( signal_prepath, self, conf['data']['floatx']) diff --git a/plasma/utils/ECEI.py b/plasma/utils/ECEI.py new file mode 100644 index 00000000..5916646f --- /dev/null +++ b/plasma/utils/ECEI.py @@ -0,0 +1,1040 @@ +""" +The module composed in this file is designed to handle the processing/handling +and incorporation of electron cyclotron emission imaging data into the FRNN +disruption prediction software suite. It contains snippets from the rest of +the FRNN codebase, and therefore is partially redundant, particularly in the +utility functions at the top of the file. +Jesse A Rodriguez, 06/28/2021 +""" + +import numpy as np +import matplotlib as mpl +#mpl.rcParams['figure.dpi']=10 +import matplotlib.pyplot as plt +#plt.rc('font', family='tahoma') +#font = 1 +#plt.rc('xtick', labelsize=font) +#plt.rc('ytick', labelsize=font) +import time +import sys +import os +import multiprocessing as mp +from functools import partial +import h5py +import scipy.signal +import math +try: + import MDSplus as MDS +except ImportError: + pass + +################################################################################ +## Utility Functions and Globals +################################################################################ +def Fetch_ECEI_d3d(channel_path, shot_number, c = None, verbose = False): + """ + Basic fetch ecei data function, uses MDSplus Connection objects and looks + for data in all the locations we know of. + + Args: + channel_path: str, path to save .txt file (channel folder, format + "LFSxxxx") + shot_number: int, DIII-D shot number + c: MDSplus.Connection object. None by default + verbose: bool, suppress print statements + """ + channel = channel_path + shot = str(int(shot_number)) + mds_fail_pd = False + mds_fail_pd2 = False + mds_fail_p = False + mds_fail_t = False + + #ptdata2 method (seems to be most reliable) + try: + x_pd2 = c.get('dim_of(_s = ptdata2('+channel+','+shot+'))') + y_pd2 = c.get('_s = ptdata2('+channel+','+shot+')') + except Exception as e: + if verbose: + print(e) + mds_fail_pd2 = True + pass + if not mds_fail_pd2: + if x_pd2.shape[0] > 1: + print('Data exists for shot '+shot+' in channel '+channel[-5:-1]+'.') + return x_pd2, y_pd2, None, True + + #psuedo method + try: + x_p = c.get('dim_of(_s = psuedo('+channel+','+shot+'))') + y_p = c.get('_s = psuedo('+channel+','+shot+')') + except Exception as e: + if verbose: + print(e) + mds_fail_p = True + pass + if not mds_fail_p: + if x_p.shape[0] > 1: + print('Data exists for shot '+shot+' in channel '+channel[-5:-1]+'.') + return x_p, y_p, None, True + + #ptdata method + try: + x_pd = c.get('dim_of(_s = ptdata('+channel+','+shot+'))') + y_pd = c.get('_s = ptdata('+channel+','+shot+')') + except Exception as e: + if verbose: + print(e) + mds_fail_pd = True + pass + if not mds_fail_pd: + if x_pd.shape[0] > 1: + print('Data exists for shot '+shot+' in channel '+channel[-5:-1]+'.') + return x_pd, y_pd, None, True + + #tree method + try: + c.openTree(channel, shot) + x_t = c.get('dim_of(_s = '+shot+')').data() + y_t = c.get('_s = '+shot).data() + except Exception as e: + if verbose: + print(e) + mds_fail_t = True + pass + if not mds_fail_t: + if x_t.shape[0] > 1: + print('Data exists for shot '+shot+' in channel '+channel[-5:-1]+'.') + return x_t, y_t, None, True + + print('Data DOES NOT exist for shot '+shot+' in channel '+channel[-5:-1]+'.') + return None, None, None, False + + +def Download_Shot(shot_num_queue, c, n_shots, n_procs, channel_paths,\ + sentinel = -1, verbose = False, d_sample = 1,\ + try_again = False): + """ + Accepts a multiprocessor queue of shot numbers and downloads/saves data for + a single shot off the front of the queue. + + Args: + shot_num_queue: multiprocessing queue object containing shot numbers + c: MDSplus.Connection object + n_shots: int, total number of shots to be processed + n_proc: int, number of processes + channel_paths: list containing savepaths to channel folders + sentinel: sentinel value; -1 by default. Serves as the mechanism for + terminating the parallel program. + verbose: bool, suppress print statements + d_sample: int, downsample factor, MUST BE IN FORM 10^y + try_again: bool, tells script to try and download signals that were + found to be missing in a prior run. + """ + missing_shots = 0 + while True: + shot_num = shot_num_queue.get() + shots_left = shot_num_queue.qsize() - n_procs + shots_progress = 100*(n_shots - shots_left)/n_shots + shots_progress_next = 100*(n_shots + 1 - shots_left)/n_shots + if shot_num == sentinel: + break + shot_complete = True + chan_done = 0 + for channel_path in channel_paths: + save_path = channel_path[:-9]+'{}.hdf5'.format(int(shot_num)) + channel = channel_path[-9:] + + success = False + if os.path.isfile(save_path): + if os.path.getsize(save_path) > 0: + f = h5py.File(save_path, 'r') + for key in f.keys(): + if key == channel: + success = True + if key.startswith('missing') and key.endswith(channel)\ + and not try_again: + success = True + f.close() + else: + print('Shot {} '.format(int(shot_num)),'was downloaded \ + incorrectly (empty file). Redownloading.') + + else: + f = h5py.File(save_path, 'w') + f.close() + + if not success: + try: + try: + time, data, mapping, success = Fetch_ECEI_d3d(\ + channel_path[-9:], shot_num, c,\ + verbose) + except Exception as e: + print(e) + sys.stdout.flush() + print('Channel {}, shot {} missing, all mds commands \ + failed.'.format(channel_path[-5:-1], shot_num)) + success = False + + if success: + data_two_column = np.vstack((time, data)).transpose() + if d_sample > 1: + n = int(math.log10(d_sample)) + for _ in range(n): + data_two_column = scipy.signal.decimate(\ + data_two_column, 10, axis = 0) + f = h5py.File(save_path, 'r+') + for key in f.keys(): + if key.startswith('missing'): + if key[8:] == channel: + del f[key] + dset = f.create_dataset(channel, data = data_two_column) + f.close() + else: + f = h5py.File(save_path, 'r+') + dsetk = 'missing_'+channel + already = False + for key in f.keys(): + if key == dsetk: + f.close() + already = True + if not already: + dset = f.create_dataset(dsetk,\ + data = np.array([-1.0])) + f.close() + + except BaseException: + print('Could not save channel {}, shot {}.'.format(\ + channel_path[-5:-1], shot_num)) + print('Warning: Incomplete!!!') + raise + else: + print('Channel {}, shot {} '.format(channel_path[-5:-1],\ + int(shot_num)),'has already been downloaded.') + sys.stdout.flush() + if not success: + missing_shots += 1 + chan_done += 1 + shot_prog = chan_done/160 + overall_prog = shots_progress +\ + (shots_progress_next-shots_progress)*shot_prog + print('Approximate download progress: {:.2f}%.'\ + .format(overall_prog)) + + print('Finished with {} channel signals missing.'.format(missing_shots)) + return + + +def Download_Shot_List(shot_numbers, channel_paths, max_cores = 8,\ + server = 'atlas.gat.com', verbose = False,\ + d_sample = 1, try_again = False): + """ + Accepts list of shots and downloads them in parallel + + Args: + shot_numbers: list of integer shot numbers + channel_paths: list of channel save path folders + max_cores: int, max number of cores for parallelization + server: MDSplus server, str. D3D server by default + verbose: bool, suppress print statements + d_sample: int, downsample factor, MUST BE IN FORM 10^y + try_again: bool, tells script to try and download signals that were + found to be missing in a prior run. + """ + sentinel = -1 + num_cores = min(mp.cpu_count(), max_cores) + fn = partial(Download_Shot, n_shots = len(shot_numbers),\ + n_procs = num_cores, channel_paths = channel_paths,\ + sentinel = sentinel, verbose = verbose,\ + d_sample = d_sample, try_again = try_again) + queue = mp.Queue() + assert len(shot_numbers) < 32000 + for shot_num in shot_numbers: + queue.put(shot_num) + for i in range(num_cores): + queue.put(sentinel) + + connections = [MDS.Connection(server) for _ in range(num_cores)] + processes = [mp.Process(target=fn, args = (queue, connections[i]))\ + for i in range(num_cores)] + print('Running in parallel on {} processes.'.format(num_cores)) + for p in processes: + p.start() + for p in processes: + p.join() + + +def Count_Missing(shot_list, shot_path, missing_path): + """ + Accepts a shot list and a path to the shot files and produces an up-to-date + list of all missing data and places it in missing_path. Automatically + called after a download operation. + + Args: + shot_list: 1-D numpy array of DIII-D shot numbers + shot_path: path to folder containing shot files + missing_path: folder for missing shot reports + """ + min_shot = np.argmin(shot_list) + max_shot = np.argmax(shot_list) + num_shots = np.shape(shot_list)[0]*160 + num_shots_miss = 0 + print("Generating report for {} shots between shots {} and {}".format(\ + np.shape(shot_list)[0], int(shot_list[min_shot]),\ + int(shot_list[max_shot]))) + report = open(missing_path+'/missing_report_'+str(int(shot_list[min_shot]))\ + +'-'+str(int(shot_list[max_shot]))+'.txt', mode = 'w',\ + encoding='utf-8') + report.write('Missing channel signals for download from shot {} to shot '\ + '{}:\n'.format(int(shot_list[min_shot]),\ + int(shot_list[max_shot]))) + for filename in os.listdir(shot_path): + if filename.endswith('hdf5'): + if int(filename[:-5]) >= shot_list[min_shot]\ + and int(filename[:-5]) <= shot_list[max_shot]: + f = h5py.File(shot_path+'/'+filename, 'r') + count = 0 + for key in f.keys(): + if key.startswith('missing'): + count += 1 + report.write('Channel '+key[-5:-1]+', shot #'+\ + filename[:-5]+'\n') + num_shots_miss +=1 + if count > 160: + print('Shot #'+filename[:-5]+' has more than 160 channels '\ + 'missing!') + + report.write('Missing channel signals for {} out of {} signals.'.\ + format(num_shots_miss, num_shots)) + report.close() + + return (num_shots_miss, num_shots) + + +############################################################################### +## ECEI Class +############################################################################### +class ECEI: + def __init__(self): + """ + Initialize ECEI object by creating an internal list of channel keys. + + Args: + """ + self.ecei_channels = [] + for i in range(20): + for j in range(8): + self.ecei_channels.append('"LFS{:02d}{:02d}"'.format(i+3,j+1)) + + ########################################################################### + ## Data Processing + ########################################################################### + def Generate_Missing_Report(self, shots, shot_1, clear_file, disrupt_file,\ + save_path = os.getcwd()): + """ + Accept a start shot and a number of clear shots and generate a verbose + missing shot report for all shots in that range of the shot list files. + + Args: + shots: int, number of non-disruptive shots you want to download + shot_1: int, the shot number you want to start with + clear_file: The path to the clear shot list + disrupt_file: The path to the disruptive shot list + save_path: location where the shot hdf5 files will be stored, + current directory by default + """ + clear_shots = np.loadtxt(clear_file) + disrupt_shots = np.loadtxt(disrupt_file) + + first_c = False + first_d = False + i = 0 + while not first_c: + if clear_shots[i,0] >= shot_1: + start_c = i + first_c = True + i += 1 + i = 0 + while not first_d: + if disrupt_shots[i,0] >= shot_1: + start_d = i + first_d = True + i += 1 + + if start_c + shots > clear_shots.shape[0]-1: + shots = clear_shots.shape[0] - start_c - 1 + + shot_list = np.array([clear_shots[start_c,0]]) + for i in range(shots-1): + shot_list = np.append(shot_list, [clear_shots[i+start_c+1,0]]) + + last = False + no_disrupt = False + i = start_d + while not last: + if disrupt_shots[i,0] >= clear_shots[start_c+shots-1,0]: + end_d = i + last = True + i += 1 + if i >= disrupt_shots.shape[0]: + no_disrupt = True + last = True + + if not no_disrupt: + for i in range(end_d - start_d + 1): + shot_list = np.append(shot_list, [disrupt_shots[i+start_d,0]]) + + channel_paths = [] + for i in range(len(self.ecei_channels)): + channel_path = os.path.join(save_path, self.ecei_channels[i]) + channel_paths.append(channel_path) + #Missing shots directory + missing_path = os.path.join(save_path, 'missing_shot_info') + if not os.path.exists(missing_path): + os.mkdir(missing_path) + + missed = Count_Missing(shot_list, save_path, missing_path) + + return + + + def Clean_Signals(self, save_path = os.getcwd()): + """ + Removes all signal files in the save_path directory. + """ + check = input("WARNING: this function will delete ALL signal files in \ + the "+"designated save path. Type 'yes' to continue, anything \ + else to cancel.\n") + if check == 'yes': + for filename in os.listdir(save_path): + if filename.endswith('hdf5'): + shot = os.path.join(save_path, filename) + os.remove(shot) + + + def Clean_Missing_Signals(self, missing_path, save_path = os.getcwd()): + """ + Removes all signal files with all channels missing in the save_path + directory. + """ + check = input("WARNING: this function will delete ALL signal files in "\ + "the designated save path which have all channel signals"\ + " missing. Type 'yes' to continue, anything else to "\ + "cancel.\n") + report = open(missing_path+'/AllChannelsMissing.txt', mode = 'w',\ + encoding='utf-8') + if check == 'yes': + for filename in os.listdir(save_path): + if filename.endswith('hdf5'): + shot = os.path.join(save_path, filename) + f = h5py.File(shot, 'r') + count = 0 + for key in f.keys(): + if key.startswith('missing'): + count += 1 + if count == 160: + f.close() + if os.path.getsize(shot) <= 342289: + report.write(filename[:-5]+"\n") + os.remove(shot) + else: + f.close() + + report.close() + + + + def Clean_Missing_Signal(self, shot_file): + """ + Removes a single shot file if it has all channel signals missing. + """ + shot = os.path.join(os.getcwd(), shot_file) + f = h5py.File(shot, 'r') + count = 0 + for key in f.keys(): + if key.startswith('missing'): + count += 1 + if count == 160: + f.close() + check = input("You are about to delete "+shot+". Are "+\ + "you sure about that?\n") + if check == 'yes': + os.remove(shot) + else: + f.close() + + + def Clean_Missing_Signal_List(self, shots): + """ + Removes shot files in a list if they have all channel signals missing. + """ + for s in shots: + shot_file = str(s)+".hdf5" + shot = os.path.join(os.getcwd(), shot_file) + f = h5py.File(shot, 'r') + count = 0 + for key in f.keys(): + if key.startswith('missing'): + count += 1 + if count == 160: + f.close() + check = input("You are about to delete "+shot+". Are "+\ + "you sure about that?\n") + if check == 'yes': + os.remove(shot) + else: + f.close() + + + def Generate_Missing_Report_Concise(self, todays_date,\ + data_path = os.getcwd(), output_path = os.getcwd()): + """ + Creates a report of missing data in a more readable format. + + Args: + todays_date: str, todays date in a readable, filename-friendly + format, like "MM-DD-YYYY" + data_path: str, path where data files are stored + output_path: str, path where the report will go + """ + # Collect necessary information. + shot_count = 0 + none_missing = 0 + all_missing = 0 + one_missing = 0 + eight_missing = 0 + sixteen_missing = 0 + sixteen_to_all_missing = 0 + one_to_sixteen_missing = 0 + missing_by_chan = {} + all_missing_list = [] + some_missing_list = [] + full_shot_list = [] + file_list = os.listdir(data_path) + num_shots = len(file_list) + print("Generating concise report for the {} shots in "\ + .format(int(num_shots))+data_path) + t_b = time.time() + for filename in file_list: + if filename.endswith('hdf5'): + f = h5py.File(data_path+'/'+filename, 'r') + miss_count = 0 + for key in f.keys(): + if key[-9:] not in missing_by_chan: + missing_by_chan[key[-9:]] = 0 + if key.startswith('missing'): + miss_count += 1 + missing_by_chan[key[-9:]] += 1 + if miss_count == 160: + all_missing += 1 + for key in f.keys(): + missing_by_chan[key[-9:]] -= 1 + all_missing_list.append(int(filename[:-5])) + elif miss_count == 1: + one_missing += 1 + elif miss_count == 8: + eight_missing += 1 + elif miss_count == 16: + sixteen_missing += 1 + elif miss_count > 0 and miss_count <= 16: + one_to_sixteen_missing += 1 + some_missing_list.append(int(filename[:-5])) + elif miss_count > 16 and miss_count < 160: + sixteen_to_all_missing += 1 + some_missing_list.append(int(filename[:-5])) + elif miss_count == 0: + none_missing += 1 + full_shot_list.append(int(filename[:-5])) + shot_count += 1 + f.close() + if shot_count%10 == 0: + print("{:.2f}% of the way through collecting missing shot "\ + "info.".format(shot_count/num_shots*100)) + + t_e = time.time() + T = t_e-t_b + + print("Finished collecting info in {} seconds.".format(T)) + + # Write report + report = open(output_path+'/missing_signal_report_'+todays_date+'.txt',\ + 'w') + report.write('This missing shot report was generated using the \ + contents of '+output_path+' on '+todays_date+'.\n\n') + report.write('Number of shots with NO channels missing: {}\n'.format(\ + int(none_missing))) + report.write('Number of shots with ALL channels missing: {}\n'.format(\ + int(all_missing))) + report.write('Number of shots with just one channel missing: {}\n'\ + .format(int(one_missing))) + report.write('Number of shots with 8 channels missing: {}\n'.format(\ + int(eight_missing))) + report.write('Number of shots with 16 channels missing: {}\n'.format(\ + int(sixteen_missing))) + report.write('Number of shots with 2 to 15 channels missing: {}\n'\ + .format(int(one_to_sixteen_missing))) + report.write('Number of shots with 17 to 159 channels missing: {}\n\n'\ + .format(int(sixteen_to_all_missing))) + report.write('Missing signal distribution by channel in shots with '+\ + 'fewer than 160 channels missing:\n') + missing_chan_tot = 0 + most_miss = 0 + for key in missing_by_chan: + missing_chan_tot += missing_by_chan[key] + if missing_by_chan[key] > most_miss: + most_miss = missing_by_chan[key] + + for i in range(20): + for j in range(8): + key = '"LFS{:02d}{:02d}"'.format(i+3, j+1) + bar_length = int(missing_by_chan[key]/most_miss*50) + bar = '█'*bar_length + report.write('Channel {:02d}{:02d}: '.format(i+3, j+1)+\ + str(int(missing_by_chan[key]))+' | '+bar+'\n') + + report.close() + + all_missing_list = np.sort(all_missing_list) + some_missing_list = np.sort(some_missing_list) + full_shot_list = np.sort(full_shot_list) + + np.savetxt(output_path+'/all_channels_missing_list.txt',\ + all_missing_list, fmt='%i') + np.savetxt(output_path+'/some_channels_missing_list.txt',\ + some_missing_list, fmt='%i') + np.savetxt(output_path+'/no_channels_missing_list.txt', full_shot_list,\ + fmt='%i') + + + def Generate_Quality_Report(self, todays_date, data_path, disrupt_list,\ + shots_of_interest, shotlist_name, output_path =\ + os.getcwd()): + """ + Create a report that checks shots in shots_of_interest for NaNs, as + well as for cases in which the data collection ceases before t_disrupt. + If the shots are missing channels, the report will give the number of + channels missed per shot. + + Args: + todays_date: str, todays date in a readable, filename-friendly + format, like "MM-DD-YYYY" + data_path: str, path where data files are stored. + disrupt_list: numpy array, shot list that contains the disruptive + shots of interest. + shots_of_interest: numpy array, shot list that contains the shots + you would like to check. + shotlist_name: str, name describing the shotlist of interest. + output_path: str, path where the report will go. + """ + print("Generating a data quality report for {} shots of interest in "\ + .format(int(len(shots_of_interest)))+data_path) + t_b = time.time() + + contains_NaN = {} + ends_before_t_disrupt = {} + missing_chans = {} + low_std_dev = {} + + count = 0 + files = os.listdir(data_path) + num_files = len(files) + for filename in files: + if filename.endswith('hdf5'): + count += 1 + shot_no = int(filename[:-5]) + if shot_no in shots_of_interest: + f = h5py.File(data_path+'/'+filename, 'r') + keys = f.keys() + # First we check for NaNs + for key in keys: + data = np.asarray(f.get(key)) + if np.any(np.isnan(data)): + if shot_no not in contains_NaN: + contains_NaN[shot_no] = [] + contains_NaN[shot_no].append(key[-5:-1]) + # Check to make sure 'something' happens + if not key.startswith('missing'): + sig = np.sqrt(np.var(data[:,1])) + if sig < 0.001: + if shot_no not in low_std_dev: + low_std_dev[shot_no] = [] + low_std_dev[shot_no].append(key[-5:-1]) + # Next, missing channels + if key.startswith('missing'): + if shot_no not in missing_chans: + missing_chans[shot_no] = [] + missing_chans[shot_no].append(key[-5:-1]) + # Now we check if data is collected up to t_disrupt + if shot_no in disrupt_list[:,0] and\ + not key.startswith('missing'): + i_disrupt=np.where(disrupt_list[:,0]==shot_no)[0][0] + t_max = np.max(data[:,0]) + t_disrupt = disrupt_list[i_disrupt,1] + if t_max < t_disrupt: + if shot_no not in ends_before_t_disrupt: + ends_before_t_disrupt[shot_no] = [] + ends_before_t_disrupt[shot_no].append(key[-5:-1]) + print("{:2f}% of the way through shot files"\ + .format(count/num_files*100)) + f.close() + + t_e = time.time() + T = t_e-t_b + + print("Finished collecting info in {} seconds.".format(T)) + + # Write report + report = open(output_path+'/data_quality_report_'+todays_date+'.txt',\ + 'w') + report.write('This data quality report was generated using the '\ + 'contents of '+output_path+'\non '+todays_date+', using '\ + 'a shotlist named "'+shotlist_name+'".\n\n') + + report.write('Number of shots with NaNs present: {}\n'.format(\ + int(len(contains_NaN)))) + if len(contains_NaN) > 0: + for shot in contains_NaN: + report.write('Shot {} Contains NaNs in the following '\ + 'channels:\n'.format(shot)) + count = 0 + for i in range(len(contains_NaN[shot])): + count += 1 + if count%10 == 0: + report.write(contains_NaN[shot][i]+',\n') + else: + report.write(contains_NaN[shot][i]+', ') + report.write('\n') + + report.write('_'*80) + report.write('\n\n') + report.write('Number of shots that cease data collection before '\ + 't_disrupt: {}\n'.format(int(len(ends_before_t_disrupt)))) + if len(ends_before_t_disrupt) > 0: + for shot in ends_before_t_disrupt: + report.write('Shot {} stops short of t_disrupt in the '\ + 'following channels:\n'.format(shot)) + count = 0 + for i in range(len(ends_before_t_disrupt[shot])): + count += 1 + if count%10 == 0: + report.write(ends_before_t_disrupt[shot][i]+',\n') + else: + report.write(ends_before_t_disrupt[shot][i]+', ') + report.write('\n') + + report.write('_'*80) + report.write('\n\n') + report.write('Number of shots that have a standard deviation which is '\ + 'smaller than 1 mV: {}\n'.format(int(len(low_std_dev)))) + if len(low_std_dev) > 0: + for shot in low_std_dev: + report.write('Shot {} has a std. dev less than 1 mV in the '\ + 'following channels:\n'.format(shot)) + count = 0 + for i in range(len(low_std_dev[shot])): + count += 1 + if count%10 == 0: + report.write(low_std_dev[shot][i]+',\n') + else: + report.write(low_std_dev[shot][i]+', ') + report.write('\n') + + report.write('_'*80) + report.write('\n\n') + report.write('Number of shots with missing channels: {}\n'.format(\ + int(len(missing_chans)))) + if len(missing_chans) > 0: + for shot in missing_chans: + report.write('Shot {} is missing data in the following '\ + 'channels:\n'.format(shot)) + count = 0 + for i in range(len(missing_chans[shot])): + count += 1 + if count%10 == 0: + report.write(missing_chans[shot][i]+',\n') + else: + report.write(missing_chans[shot][i]+', ') + report.write('\n') + + report.close() + + + ########################################################################### + ## Visualization + ########################################################################### + def Single_Shot_Plot(self, shot, data_path, save_dir = os.getcwd(),\ + show = False): + """ + Plot voltage traces for a single shot, saves plot as a .pdf + + Args: + shot: int, shot number + data_path: str, path to ECEI data + save_dir: str, directory for output plot image + shot: bool, determines whether output is shown right away + """ + shot_file = data_path+'/'+str(int(shot))+'.hdf5' + f = h5py.File(shot_file, 'r') + fig = plt.figure() + gs = fig.add_gridspec(4, 5, hspace=0.35, wspace=0) + ax = gs.subplots(sharex='col') + count = 0 + plot_no = 0 + for channel in self.ecei_channels: + count += 1 + row = plot_no//5 + col = plot_no%5 + if channel in f.keys(): + data = f.get(channel) + ax[row,col].plot(data[:,0], data[:,1],\ + label = 'YY = '+channel[-3:-1],\ + linewidth = 0.4, alpha = 0.8) + if count%8 == 0: + plot_no += 1 + XX = count//8 + 2 + title = 'XX = {:02d}'.format(XX) + ax[row,col].set_title(title, fontsize = 5) + #ax[row,col].legend(prop={'size': 2.75}) + ax[row,col].tick_params(width = 0.3) + + fig.suptitle('Shot #{}'.format(int(shot)), fontsize = 10) + for axs in ax.flat: + axs.set_xlabel('Time (ms)', fontsize = 5) + axs.set_ylabel('ECEi Voltage (V)', fontsize = 5) + + # Hide x labels and tick labels for top plots and y ticks for right plots. + for axs in ax.flat: + axs.label_outer() + axs.tick_params(axis='x', labelsize = 5) + axs.tick_params(axis='y', labelsize = 5) + + labels = [] + for i in range(8): + labels.append('YY = {:2d}'.format(i+1)) + fig.legend(labels=labels, loc="lower center", ncol=8, prop={'size': 5.5}) + + if show: + fig.show() + + fig.savefig(save_dir+'/Shot_{}.pdf'.format(int(shot))) + + + def Generate_Txt(self, shot, channel, save_dir = os.getcwd()): + """ + Get a .txt file out for signal data in a readable format for a single + channel. + + Args: + shot: int, shot number + channel: str, format "XXYY", 03<=XX<=22, 01<=YY<=08, designates + channel + save_dir: str, directory where shot files are stored + """ + shot_s = str(shot) + f = h5py.File(save_dir+'/'+shot_s+'.hdf5', 'r') + data = np.asarray(f.get('"LFS'+channel+'"')) + np.savetxt(save_dir+'/'+shot_s+'_chan'+channel+'.txt', data) + f.close() + + return + + + def Generate_Txt_Interactive(self, save_dir = os.getcwd()): + """ + Get a .txt file out for reading signal data, accepts input from command + line. + + Args: + save_dir: str, directory where shot files are stored + """ + shot = int(input("Which shot? Enter an integer.\n")) + channel = input("Which channel? format 'XXYY', 03<=XX<=22, 01<=YY<=08"\ + ".\n") + + self.Generate_Txt(shot, channel, save_dir) + + return + + + ########################################################################### + ## Data Acquisition + ########################################################################### + def Acquire_Shots_D3D(self, shot_numbers, save_path = os.getcwd(),\ + max_cores = 8, verbose = False, chan_lowlim = 3,\ + chan_uplim = 22, d_sample = 1, try_again = False): + """ + Accepts a list of shot numbers and downloads the data, saving them into + folders corresponding to the individual channels. Returns nothing. + Shots are saved in hdf5 format, downsampling is done BEFORE saving. + Each channel is labelled within its own dataset in the hdf5 file, where + the label is the channel name/MDS point name, e.g. '"LFSXXYY"'. If data + is not found, labels are 'missing_"LFSXXYY"' with [-1.0] as the dataset. + + Args: + shot_numbers: 1-D numpy array of integers, DIII-D shot numbers + save_path: location where the channel folders will be stored, + current directory by default + max_cores: int, max # of cores to carry out download tasks + verbose: bool, suppress most print statements + chan_lowlim: int, lower limit of subset of channels to download + chan_uplim: int, upper limit of subset of channels to download + d_sample: int, downsample factor, MUST BE IN FORM 10^y + try_again: bool, tells script to try and download signals that were + found to be missing in a prior run. + """ + t_b = time.time() + # Construct channel save paths and create them if needed. + channel_paths = [] + for i in range(len(self.ecei_channels)): + XX = int(self.ecei_channels[i][-5:-3]) + if XX >= chan_lowlim and XX <= chan_uplim: + channel_path = os.path.join(save_path, self.ecei_channels[i]) + channel_paths.append(channel_path) + #Missing shots directory + missing_path = os.path.join(save_path, 'missing_shot_info') + if not os.path.exists(missing_path): + os.mkdir(missing_path) + + try: + c = MDS.Connection('atlas.gat.com') + except Exception as e: + print(e) + return False + + Download_Shot_List(shot_numbers, channel_paths, max_cores = max_cores,\ + server = 'atlas.gat.com', verbose = verbose,\ + d_sample = d_sample, try_again = try_again) + + missed = Count_Missing(shot_numbers, save_path, missing_path) + + t_e = time.time() + T = t_e-t_b + + print("Downloaded {} out of {} signals in {} seconds."\ + .format(missed[1]-missed[0], missed[1], T)) + + return + + + def Acquire_Shot_Sequence_D3D(self, shots, shot_1, clear_file,\ + disrupt_file, save_path = os.getcwd(),\ + max_cores = 8, verbose = False,\ + chan_lowlim = 3, chan_uplim = 22,\ + d_sample = 1, try_again = False): + """ + Accepts a desired number of non-disruptive shots, then downloads all + shots in our labelled database up to the last non-disruptive shot. + Returns nothing. Shots are saved in hdf5 format, downsampling is done + BEFORE saving. Each channel is labelled within its own dataset in the + hdf5 file, where the label is the channel name/MDS point name, e.g. + '"LFSXXYY"'. If data is not found, labels are 'missing_"LFSXXYY"' with + [-1.0] as the dataset. + + Args: + shots: int, number of non-disruptive shots you want to download + shot_1: int, the shot number you want to start with + clear_file: The path to the clear shot list + disrupt_file: The path to the disruptive shot list + save_path: location where the channel folders will be stored, + current directory by default + max_cores: int, max # of cores to carry out download tasks + verbose: bool, suppress some exception info + chan_lowlim: int, lower limit of subset of channels to download + chan_uplim: int, upper limit of subset of channels to download + d_sample: int, downsample factor, MUST BE IN FORM 10^y + try_again: bool, tells script to try and download signals that were + found to be missing in a prior run. + """ + clear_shots = np.loadtxt(clear_file) + disrupt_shots = np.loadtxt(disrupt_file) + + first_c = False + first_d = False + i = 0 + while not first_c: + if clear_shots[i,0] >= shot_1: + start_c = i + first_c = True + i += 1 + i = 0 + while not first_d: + if disrupt_shots[i,0] >= shot_1: + start_d = i + first_d = True + i += 1 + + if start_c + shots > clear_shots.shape[0]-1: + shots = clear_shots.shape[0] - start_c - 1 + + shot_list = np.array([clear_shots[start_c,0]]) + for i in range(shots-1): + shot_list = np.append(shot_list, [clear_shots[i+start_c+1,0]]) + + last = False + no_disrupt = False + i = start_d + while not last: + if disrupt_shots[i,0] >= clear_shots[start_c+shots-1,0]: + end_d = i + last = True + i += 1 + if i >= disrupt_shots.shape[0]: + no_disrupt = True + last = True + + if not no_disrupt: + for i in range(end_d - start_d + 1): + shot_list = np.append(shot_list, [disrupt_shots[i+start_d,0]]) + + self.Acquire_Shots_D3D(shot_list, save_path, max_cores, verbose,\ + chan_lowlim, chan_uplim, d_sample, try_again) + + return + + + def Fetch_Shot(self, shot_number, verbose = False): + """ + Fetch shot data from MDSplus server directly as numpy arrays. Returns + a 1D time array and a 2D data array with shape (t_steps, 160), where + each column is the signal data from one channel, along with None in + place of a mapping and True to indicate success. Missing channels are + padded with zeros. + + Args: + shot_number: int, shot number + verbose: bool, determines if MDS exceptions are printed + """ + no_time = True + idx = 0 + while no_time: + t, d, mapping, success = Fetch_ECEI_d3d(self.ecei_channels[idx],\ + shot_number, verbose = verbose) + if success: + time = np.asarray(t) + no_time = False + idx += 1 + if idx >= 160: + return None, None, None, False + + no_data = True + for channel in self.ecei_channels: + t, d, mapping, success = Fetch_ECEI_d3d(channel, shot_number,\ + verbose = verbose) + if success: + if no_data: + data = np.asarray(d).reshape((time.shape[0],1)) + no_data = False + else: + data = np.append(data, d, axis = 1) + else: + if no_data: + data = np.zeros((time.shape[0],1)) + no_data = False + else: + d = np.zeros((time.shape[0],1)) + data = np.append(data, d, axis = 1) + + return time, data, None, True + + diff --git a/plasma/utils/hashing.py b/plasma/utils/hashing.py index 6628cc50..5903999b 100644 --- a/plasma/utils/hashing.py +++ b/plasma/utils/hashing.py @@ -67,6 +67,23 @@ def myhash_obj(x): Serialize a generic Python object using dill, decode the bytes obj, then pass the Unicode string to the particular hash function. ''' + + # KGF: Python 3.8 made Pickle serialization protocol version 4 the default + # Dill (v0.3.3) wraps Pickle, and Pickle now returns an invalid utf-8 + # escape code when serializing the conf dictionary and nested objs + # Works totally fine in Python 3.7 with protocol=3 + # See PEP 3154 + + # protocol=0 in ANSI readable, and I suspect that protocol=3 produces valid utf-8, + # but I can't find any documentation of that. + # https://stackoverflow.com/questions/30469575/how-to-pickle-and-unpickle-to-portable-string-in-python-3 + # "pickle.dumps() produces a bytes object. Expecting these arbitrary bytes to be valid + # UTF-8 text (the assumption you are making by trying to decode it to a string from + # UTF-8) is pretty optimistic." + + # return myhash(pickle.dumps(x).decode('unicode_escape')) + # return myhash(dill.dumps(x).decode('raw_unicode_escape')) + return myhash(dill.dumps(x, protocol=3).decode('unicode_escape'))