Source code for openmdao.recorders.sqlite_recorder

"""
Class definition for SqliteRecorder, which provides a dictionary backed by SQLite.
"""

from copy import deepcopy
from io import BytesIO
from collections import OrderedDict

import os
import sqlite3
from itertools import chain

import json
import numpy as np

from six.moves import cPickle as pickle
from six import iteritems

from openmdao.recorders.case_recorder import CaseRecorder
from openmdao.utils.mpi import MPI
from openmdao.utils.record_util import dict_to_structured_array
from openmdao.utils.options_dictionary import OptionsDictionary
from openmdao.utils.general_utils import simple_warning, make_serializable
from openmdao.core.driver import Driver
from openmdao.core.system import System
from openmdao.core.problem import Problem
from openmdao.solvers.solver import Solver


"""
SQL case database version history.
----------------------------------
5 -- OpenMDAO 2.5
     Added source column (driver name, system/solver pathname) to global iterations table.
4 -- OpenMDAO 2.4
     Added variable settings metadata that contains scaling info.
3 -- OpenMDAO 2.4
     Storing most data as JSON rather than binary numpy arrays.
2 -- OpenMDAO 2.4, merged 20 July 2018.
     Added support for recording derivatives from driver, resulting in a new table.
1 -- Through OpenMDAO 2.3
     Original implementation.
"""
format_version = 5
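# The version stamp above is written into the metadata table of every case file, so an
# existing database can be checked for compatibility without going through OpenMDAO.
# A minimal sketch, assuming a hypothetical case file named 'cases.sql':
#
#     import sqlite3
#
#     with sqlite3.connect('cases.sql') as con:
#         version = con.execute("SELECT format_version FROM metadata").fetchone()[0]
#         print("case file format version:", version)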


def array_to_blob(array):
    """
    Make a numpy array into a BLOB type.

    Convert a numpy array to something that can be written to a BLOB field in sqlite.

    TODO: move this to a util file?

    Parameters
    ----------
    array : array
        The array that will be converted to a blob.

    Returns
    -------
    blob :
        The blob created from the array.
    """
    out = BytesIO()
    np.save(out, array)
    out.seek(0)
    return sqlite3.Binary(out.read())

def blob_to_array(blob):
    """
    Convert sqlite BLOB to numpy array.

    TODO: move this to a util file?

    Parameters
    ----------
    blob : blob
        The blob that will be converted to an array.

    Returns
    -------
    array :
        The array created from the blob.
    """
    out = BytesIO(blob)
    out.seek(0)
    return np.load(out, allow_pickle=True)

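# The two helpers above are inverses of each other. A minimal round-trip sketch
# (the array contents are arbitrary; np is the numpy import at the top of this module):
#
#     arr = np.array([1.0, 2.0, 3.0])
#     blob = array_to_blob(arr)        # bytes wrapped in sqlite3.Binary, ready for a BLOB column
#     restored = blob_to_array(blob)   # numpy array equal to the original
#     assert np.array_equal(arr, restored)
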
class SqliteRecorder(CaseRecorder):
    """
    Recorder that saves cases in a sqlite db.

    Attributes
    ----------
    _record_viewer_data : bool
        Flag indicating whether to record data needed to generate N2 diagram.
    connection : sqlite connection object
        Connection to the sqlite3 database.
    _abs2prom : {'input': dict, 'output': dict}
        Dictionary mapping absolute names to promoted names.
    _prom2abs : {'input': dict, 'output': dict}
        Dictionary mapping promoted names to absolute names.
    _abs2meta : {'name': {}}
        Dictionary mapping absolute variable names to their metadata including units,
        bounds, and scaling.
    _pickle_version : int
        The pickle protocol version to use when pickling metadata.
    _filepath : str
        Path to the recorder file.
    _database_initialized : bool
        Flag indicating whether or not the database has been initialized.
    _record_on_proc : bool
        Flag indicating whether to record on this processor when running in parallel.
    """

    def __init__(self, filepath, append=False, pickle_version=2, record_viewer_data=True):
        """
        Initialize the SqliteRecorder.

        Parameters
        ----------
        filepath : str
            Path to the recorder file.
        append : bool, optional
            If True, append to an existing case recorder file.
        pickle_version : int, optional
            The pickle protocol version to use when pickling metadata.
        record_viewer_data : bool, optional
            If True, record data needed for visualization.
        """
        if append:
            raise NotImplementedError("Append feature not implemented for SqliteRecorder")

        self.connection = None
        self._record_viewer_data = record_viewer_data

        self._abs2prom = {'input': {}, 'output': {}}
        self._prom2abs = {'input': {}, 'output': {}}
        self._abs2meta = {}
        self._pickle_version = pickle_version
        self._filepath = filepath
        self._database_initialized = False

        # default to record on all procs when running in parallel
        self._record_on_proc = True

        super(SqliteRecorder, self).__init__(record_viewer_data)

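    # A minimal usage sketch (not part of the recorder source), assuming a trivial model
    # built from standard OpenMDAO components; the recorder is attached to the driver and
    # the file name 'cases.sql' is arbitrary:
    #
    #     from openmdao.api import Problem, IndepVarComp, ExecComp, SqliteRecorder
    #
    #     prob = Problem()
    #     prob.model.add_subsystem('ivc', IndepVarComp('x', 3.0))
    #     prob.model.add_subsystem('comp', ExecComp('y = x ** 2'))
    #     prob.model.connect('ivc.x', 'comp.x')
    #
    #     prob.driver.add_recorder(SqliteRecorder('cases.sql'))  # record driver iterations
    #
    #     prob.setup()
    #     prob.run_driver()
    #     prob.cleanup()  # flushes and closes the recorder via shutdown()
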
    def _initialize_database(self):
        """
        Initialize the database.
        """
        if MPI:
            rank = MPI.COMM_WORLD.rank
            if self._parallel and self._record_on_proc:
                filepath = '%s_%d' % (self._filepath, rank)
                print("Note: SqliteRecorder is running on multiple processors. "
                      "Cases from rank %d are being written to %s." % (rank, filepath))
            elif rank == 0:
                filepath = self._filepath
            else:
                filepath = None
        else:
            filepath = self._filepath

        if filepath:
            try:
                os.remove(filepath)
            except OSError:
                pass

            self.connection = sqlite3.connect(filepath)
            with self.connection as c:
                c.execute("CREATE TABLE metadata(format_version INT, "
                          "abs2prom TEXT, prom2abs TEXT, abs2meta TEXT, var_settings TEXT)")
                c.execute("INSERT INTO metadata(format_version, abs2prom, prom2abs) "
                          "VALUES(?,?,?)", (format_version, None, None))

                # used to keep track of the order of the case records across all case tables
                c.execute("CREATE TABLE global_iterations(id INTEGER PRIMARY KEY, "
                          "record_type TEXT, rowid INT, source TEXT)")
                c.execute("CREATE TABLE driver_iterations(id INTEGER PRIMARY KEY, "
                          "counter INT, iteration_coordinate TEXT, timestamp REAL, "
                          "success INT, msg TEXT, inputs TEXT, outputs TEXT)")
                c.execute("CREATE TABLE driver_derivatives(id INTEGER PRIMARY KEY, "
                          "counter INT, iteration_coordinate TEXT, timestamp REAL, "
                          "success INT, msg TEXT, derivatives BLOB)")
                c.execute("CREATE INDEX driv_iter_ind on driver_iterations(iteration_coordinate)")
                c.execute("CREATE TABLE problem_cases(id INTEGER PRIMARY KEY, "
                          "counter INT, case_name TEXT, timestamp REAL, "
                          "success INT, msg TEXT, outputs TEXT)")
                c.execute("CREATE INDEX prob_name_ind on problem_cases(case_name)")
                c.execute("CREATE TABLE system_iterations(id INTEGER PRIMARY KEY, "
                          "counter INT, iteration_coordinate TEXT, timestamp REAL, "
                          "success INT, msg TEXT, inputs TEXT, outputs TEXT, residuals TEXT)")
                c.execute("CREATE INDEX sys_iter_ind on system_iterations(iteration_coordinate)")
                c.execute("CREATE TABLE solver_iterations(id INTEGER PRIMARY KEY, "
                          "counter INT, iteration_coordinate TEXT, timestamp REAL, "
                          "success INT, msg TEXT, abs_err REAL, rel_err REAL, "
                          "solver_inputs TEXT, solver_output TEXT, solver_residuals TEXT)")
                c.execute("CREATE INDEX solv_iter_ind on solver_iterations(iteration_coordinate)")

                c.execute("CREATE TABLE driver_metadata(id TEXT PRIMARY KEY, "
                          "model_viewer_data TEXT)")
                c.execute("CREATE TABLE system_metadata(id TEXT PRIMARY KEY, "
                          "scaling_factors BLOB, component_metadata BLOB)")
                c.execute("CREATE TABLE solver_metadata(id TEXT PRIMARY KEY, "
                          "solver_options BLOB, solver_class TEXT)")

        self._database_initialized = True

    def _cleanup_abs2meta(self):
        """
        Convert all abs2meta variable properties to a form that can be dumped as JSON.
        """
        for name in self._abs2meta:
            for prop in self._abs2meta[name]:
                self._abs2meta[name][prop] = make_serializable(self._abs2meta[name][prop])

    def _cleanup_var_settings(self, var_settings):
        """
        Convert all var_settings variable properties to a form that can be dumped as JSON.

        Parameters
        ----------
        var_settings : dict
            Dictionary mapping absolute variable names to variable settings.

        Returns
        -------
        var_settings : dict
            Dictionary mapping absolute variable names to var settings that are JSON compatible.
        """
        # otherwise we trample on values that are used elsewhere
        var_settings = deepcopy(var_settings)
        for name in var_settings:
            for prop in var_settings[name]:
                var_settings[name][prop] = make_serializable(var_settings[name][prop])

        return var_settings

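    # Since the schema above is plain SQLite, the tables can be inspected with nothing but
    # the standard library. A sketch against a hypothetical 'cases.sql' written by this recorder:
    #
    #     import sqlite3
    #
    #     with sqlite3.connect('cases.sql') as con:
    #         rows = con.execute("SELECT name FROM sqlite_master WHERE type='table'").fetchall()
    #         print(sorted(name for (name,) in rows))
    #         # expected to include: metadata, global_iterations, driver_iterations,
    #         # driver_derivatives, problem_cases, system_iterations, solver_iterations,
    #         # driver_metadata, system_metadata, solver_metadata
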
    def startup(self, recording_requester):
        """
        Prepare for a new run and create/update the abs2prom and prom2abs variables.

        Parameters
        ----------
        recording_requester : object
            Object to which this recorder is attached.
        """
        super(SqliteRecorder, self).startup(recording_requester)

        if not self._database_initialized:
            self._initialize_database()

        driver = None

        # grab the system
        if isinstance(recording_requester, Driver):
            system = recording_requester._problem.model
            driver = recording_requester
        elif isinstance(recording_requester, System):
            system = recording_requester
        elif isinstance(recording_requester, Problem):
            system = recording_requester.model
            driver = recording_requester.driver
        elif isinstance(recording_requester, Solver):
            system = recording_requester._system
        else:
            raise ValueError('Driver encountered a recording_requester it cannot handle'
                             ': {0}'.format(recording_requester))

        states = system._list_states_allprocs()

        if self.connection:
            if driver is None:
                desvars = system.get_design_vars(True, get_sizes=False)
                responses = system.get_responses(True, get_sizes=False)
                objectives = OrderedDict()
                constraints = OrderedDict()
                for name, data in iteritems(responses):
                    if data['type'] == 'con':
                        constraints[name] = data
                    else:
                        objectives[name] = data
            else:
                desvars = driver._designvars
                constraints = driver._cons
                objectives = driver._objs
                responses = driver._responses

            inputs = system._var_allprocs_abs_names['input'] + \
                system._var_allprocs_abs_names_discrete['input']
            outputs = system._var_allprocs_abs_names['output'] + \
                system._var_allprocs_abs_names_discrete['output']

            var_order = system._get_vars_exec_order(inputs=True, outputs=True)

            full_var_set = [(outputs, 'output'),
                            (desvars, 'desvar'), (responses, 'response'),
                            (objectives, 'objective'), (constraints, 'constraint')]

            # merge current abs2prom and prom2abs with this system's version
            self._abs2prom['input'].update(system._var_abs2prom['input'])
            self._abs2prom['output'].update(system._var_abs2prom['output'])
            for v, abs_names in iteritems(system._var_allprocs_prom2abs_list['input']):
                if v not in self._prom2abs['input']:
                    self._prom2abs['input'][v] = abs_names
                else:
                    self._prom2abs['input'][v] = list(set(chain(self._prom2abs['input'][v],
                                                                abs_names)))

            # for outputs, there can be only one abs name per promoted name
            for v, abs_names in iteritems(system._var_allprocs_prom2abs_list['output']):
                self._prom2abs['output'][v] = abs_names

            # absolute pathname to metadata mappings for continuous & discrete variables
            # discrete mapping is sub-keyed on 'output' & 'input'
            real_meta = system._var_allprocs_abs2meta
            disc_meta = system._var_allprocs_discrete

            for var_set, var_type in full_var_set:
                for name in var_set:
                    if name not in self._abs2meta:
                        try:
                            self._abs2meta[name] = real_meta[name].copy()
                        except KeyError:
                            self._abs2meta[name] = disc_meta['output'][name].copy()
                        self._abs2meta[name]['type'] = []
                        self._abs2meta[name]['explicit'] = name not in states

                    if var_type not in self._abs2meta[name]['type']:
                        self._abs2meta[name]['type'].append(var_type)

            for name in inputs:
                try:
                    self._abs2meta[name] = real_meta[name].copy()
                except KeyError:
                    self._abs2meta[name] = disc_meta['input'][name].copy()
                self._abs2meta[name]['type'] = ['input']
                self._abs2meta[name]['explicit'] = True

            self._cleanup_abs2meta()

            # store the updated abs2prom and prom2abs
            abs2prom = json.dumps(self._abs2prom)
            prom2abs = json.dumps(self._prom2abs)
            abs2meta = json.dumps(self._abs2meta)

            var_settings = {}
            var_settings.update(desvars)
            var_settings.update(objectives)
            var_settings.update(constraints)
            var_settings = self._cleanup_var_settings(var_settings)
            var_settings['execution_order'] = var_order
            var_settings_json = json.dumps(var_settings)

            with self.connection as c:
                c.execute("UPDATE metadata SET abs2prom=?, prom2abs=?, abs2meta=?, "
                          "var_settings=?",
                          (abs2prom, prom2abs, abs2meta, var_settings_json))

    def record_iteration_driver(self, recording_requester, data, metadata):
        """
        Record data and metadata from a Driver.

        Parameters
        ----------
        recording_requester : object
            Driver in need of recording.
        data : dict
            Dictionary containing desvars, objectives, constraints, responses, and System vars.
        metadata : dict
            Dictionary containing execution metadata.
        """
        if self.connection:
            outputs = data['out']
            inputs = data['in']

            # convert to list so this can be dumped as JSON
            for in_out in (inputs, outputs):
                if in_out is None:
                    continue
                for var in in_out:
                    in_out[var] = make_serializable(in_out[var])

            outputs_text = json.dumps(outputs)
            inputs_text = json.dumps(inputs)

            with self.connection as c:
                c = c.cursor()  # need a real cursor for lastrowid

                c.execute("INSERT INTO driver_iterations(counter, iteration_coordinate, "
                          "timestamp, success, msg, inputs, outputs) VALUES(?,?,?,?,?,?,?)",
                          (self._counter, self._iteration_coordinate,
                           metadata['timestamp'], metadata['success'], metadata['msg'],
                           inputs_text, outputs_text))

                c.execute("INSERT INTO global_iterations(record_type, rowid, source) "
                          "VALUES(?,?,?)",
                          ('driver', c.lastrowid, recording_requester._get_name()))

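    # Because inputs and outputs are stored as JSON text, a recorded driver iteration can be
    # decoded without OpenMDAO-specific tooling. The supported route is the CaseReader; this
    # raw-access sketch just illustrates the layout, again against a hypothetical 'cases.sql':
    #
    #     import json
    #     import sqlite3
    #
    #     with sqlite3.connect('cases.sql') as con:
    #         coord, outputs_json = con.execute(
    #             "SELECT iteration_coordinate, outputs FROM driver_iterations "
    #             "ORDER BY id DESC LIMIT 1").fetchone()
    #         outputs = json.loads(outputs_json)   # {absolute variable name: value}
    #         print(coord, sorted(outputs))
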
    def record_iteration_problem(self, recording_requester, data, metadata):
        """
        Record data and metadata from a Problem.

        Parameters
        ----------
        recording_requester : object
            Problem in need of recording.
        data : dict
            Dictionary containing desvars, objectives, and constraints.
        metadata : dict
            Dictionary containing execution metadata.
        """
        if self.connection:
            outputs = data['out']

            # convert to list so this can be dumped as JSON
            if outputs is not None:
                for var in outputs:
                    outputs[var] = make_serializable(outputs[var])

            outputs_text = json.dumps(outputs)

            with self.connection as c:
                c = c.cursor()  # need a real cursor for lastrowid
                c.execute("INSERT INTO problem_cases(counter, case_name, "
                          "timestamp, success, msg, outputs) VALUES(?,?,?,?,?,?)",
                          (self._counter, metadata['name'],
                           metadata['timestamp'], metadata['success'], metadata['msg'],
                           outputs_text))

    def record_iteration_system(self, recording_requester, data, metadata):
        """
        Record data and metadata from a System.

        Parameters
        ----------
        recording_requester : System
            System in need of recording.
        data : dict
            Dictionary containing inputs, outputs, and residuals.
        metadata : dict
            Dictionary containing execution metadata.
        """
        if self.connection:
            inputs = data['i']
            outputs = data['o']
            residuals = data['r']

            # convert to list so this can be dumped as JSON
            for i_o_r in (inputs, outputs, residuals):
                if i_o_r is None:
                    continue
                for var in i_o_r:
                    i_o_r[var] = make_serializable(i_o_r[var])

            outputs_text = json.dumps(outputs)
            inputs_text = json.dumps(inputs)
            residuals_text = json.dumps(residuals)

            with self.connection as c:
                c = c.cursor()  # need a real cursor for lastrowid

                c.execute("INSERT INTO system_iterations(counter, iteration_coordinate, "
                          "timestamp, success, msg, inputs, outputs, residuals) "
                          "VALUES(?,?,?,?,?,?,?,?)",
                          (self._counter, self._iteration_coordinate,
                           metadata['timestamp'], metadata['success'], metadata['msg'],
                           inputs_text, outputs_text, residuals_text))

                # get the pathname of the source system
                source_system = recording_requester.pathname
                if source_system == '':
                    source_system = 'root'

                c.execute("INSERT INTO global_iterations(record_type, rowid, source) "
                          "VALUES(?,?,?)",
                          ('system', c.lastrowid, source_system))

    def record_iteration_solver(self, recording_requester, data, metadata):
        """
        Record data and metadata from a Solver.

        Parameters
        ----------
        recording_requester : Solver
            Solver in need of recording.
        data : dict
            Dictionary containing outputs, residuals, and errors.
        metadata : dict
            Dictionary containing execution metadata.
        """
        if self.connection:
            abs = data['abs']
            rel = data['rel']
            inputs = data['i']
            outputs = data['o']
            residuals = data['r']

            # convert to list so this can be dumped as JSON
            for i_o_r in (inputs, outputs, residuals):
                if i_o_r is None:
                    continue
                for var in i_o_r:
                    i_o_r[var] = make_serializable(i_o_r[var])

            outputs_text = json.dumps(outputs)
            inputs_text = json.dumps(inputs)
            residuals_text = json.dumps(residuals)

            with self.connection as c:
                c = c.cursor()  # need a real cursor for lastrowid

                c.execute("INSERT INTO solver_iterations(counter, iteration_coordinate, "
                          "timestamp, success, msg, abs_err, rel_err, "
                          "solver_inputs, solver_output, solver_residuals) "
                          "VALUES(?,?,?,?,?,?,?,?,?,?)",
                          (self._counter, self._iteration_coordinate,
                           metadata['timestamp'], metadata['success'], metadata['msg'],
                           abs, rel, inputs_text, outputs_text, residuals_text))

                # get the pathname of the source system
                source_system = recording_requester._system.pathname
                if source_system == '':
                    source_system = 'root'

                # get solver type from SOLVER class attribute to determine the solver pathname
                solver_type = recording_requester.SOLVER[0:2]
                if solver_type == 'NL':
                    source_solver = source_system + '.nonlinear_solver'
                elif solver_type == 'LS':
                    source_solver = source_system + '.nonlinear_solver.linesearch'
                else:
                    raise RuntimeError("Solver type '%s' not recognized during recording. "
                                       "Expecting NL or LS" % recording_requester.SOLVER)

                c.execute("INSERT INTO global_iterations(record_type, rowid, source) "
                          "VALUES(?,?,?)",
                          ('solver', c.lastrowid, source_solver))

    def record_viewer_data(self, model_viewer_data, key='Driver'):
        """
        Record model viewer data.

        Parameters
        ----------
        model_viewer_data : dict
            Data required to visualize the model.
        key : str, optional
            The unique ID to use for this data in the table.
        """
        if self.connection:
            json_data = json.dumps(model_viewer_data, default=make_serializable)

            # Note: recorded to 'driver_metadata' table for legacy/compatibility reasons.
            try:
                with self.connection as c:
                    c.execute("INSERT INTO driver_metadata(id, model_viewer_data) VALUES(?,?)",
                              (key, json_data))
            except sqlite3.IntegrityError:
                print("Model viewer data has already been recorded for %s." % key)

    def record_metadata_system(self, recording_requester):
        """
        Record system metadata.

        Parameters
        ----------
        recording_requester : System
            The System that would like to record its metadata.
        """
        if self.connection:
            scaling_vecs, user_options = self._get_metadata_system(recording_requester)

            if scaling_vecs is None:
                return

            scaling_factors = pickle.dumps(scaling_vecs, self._pickle_version)

            # try to pickle the metadata, report if it failed
            try:
                pickled_metadata = pickle.dumps(user_options, self._pickle_version)
            except Exception:
                pickled_metadata = pickle.dumps(OptionsDictionary(), self._pickle_version)
                simple_warning("Trying to record options which cannot be pickled "
                               "on system with name: %s. Use the 'options_excludes' "
                               "recording option on system objects to avoid attempting "
                               "to record options which cannot be pickled. Skipping "
                               "recording options for this system." % recording_requester.name,
                               RuntimeWarning)

            path = recording_requester.pathname
            if not path:
                path = 'root'

            scaling_factors = sqlite3.Binary(scaling_factors)
            pickled_metadata = sqlite3.Binary(pickled_metadata)

            with self.connection as c:
                # Because we can have a recorder attached to multiple Systems,
                # and because we are now recording System metadata recursively,
                # we can store System metadata multiple times. Need to ignore
                # when that happens so we don't get database errors. So use OR IGNORE.
                c.execute("INSERT OR IGNORE INTO system_metadata"
                          "(id, scaling_factors, component_metadata) "
                          "VALUES(?,?,?)", (path, scaling_factors, pickled_metadata))

    def record_metadata_solver(self, recording_requester):
        """
        Record solver metadata.

        Parameters
        ----------
        recording_requester : Solver
            The Solver that would like to record its metadata.
        """
        if self.connection:
            path = recording_requester._system.pathname
            solver_class = type(recording_requester).__name__

            if not path:
                path = 'root'

            id = "{}.{}".format(path, solver_class)

            solver_options = pickle.dumps(recording_requester.options, self._pickle_version)

            with self.connection as c:
                c.execute("INSERT INTO solver_metadata(id, solver_options, solver_class) "
                          "VALUES(?,?,?)", (id, sqlite3.Binary(solver_options), solver_class))

    def record_derivatives_driver(self, recording_requester, data, metadata):
        """
        Record derivatives data from a Driver.

        Parameters
        ----------
        recording_requester : object
            Driver in need of recording.
        data : dict
            Dictionary containing derivatives keyed by 'of,wrt' to be recorded.
        metadata : dict
            Dictionary containing execution metadata.
        """
        if self.connection:
            data_array = dict_to_structured_array(data)
            data_blob = array_to_blob(data_array)

            with self.connection as c:
                c = c.cursor()  # need a real cursor for lastrowid

                c.execute("INSERT INTO driver_derivatives(counter, iteration_coordinate, "
                          "timestamp, success, msg, derivatives) VALUES(?,?,?,?,?,?)",
                          (self._counter, self._iteration_coordinate,
                           metadata['timestamp'], metadata['success'], metadata['msg'],
                           data_blob))

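    # Unlike the iteration tables, derivatives are stored as a numpy structured array
    # serialized with np.save in a BLOB column. A sketch of reading the most recent record
    # back with the module-level blob_to_array helper (hypothetical 'cases.sql' again):
    #
    #     import sqlite3
    #
    #     with sqlite3.connect('cases.sql') as con:
    #         (blob,) = con.execute("SELECT derivatives FROM driver_derivatives "
    #                               "ORDER BY id DESC LIMIT 1").fetchone()
    #         derivs = blob_to_array(blob)      # structured array with 'of,wrt' field names
    #         print(derivs.dtype.names)
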
    def shutdown(self):
        """
        Shut down the recorder.
        """
        # close database connection
        if self.connection:
            self.connection.close()