Source code for openmdao.main.objserverfactory

"""
Support for an OpenMDAO 'object service', a factory that can create servers
which support various operations such as creating objects, loading models via
egg files, remote execution, and remote file access.
"""

import logging
import optparse
import os.path
import pkg_resources
import platform
import shutil
import signal
import socket
import sys
import time

from multiprocessing import current_process

from openmdao.main.component import SimulationRoot
from openmdao.main.container import Container
from openmdao.main.factory import Factory
from openmdao.main.factorymanager import create, get_available_types
from openmdao.main.filevar import RemoteFile
from openmdao.main.mp_support import OpenMDAO_Manager, OpenMDAO_Proxy, register
from openmdao.main.mp_util import keytype, read_allowed_hosts, setup_tunnel, \
                                  read_server_config, write_server_config
from openmdao.main.rbac import get_credentials, set_credentials, \
                               rbac, RoleError
from openmdao.main.releaseinfo import __version__

from openmdao.util.filexfer import pack_zipfile, unpack_zipfile
from openmdao.util.log import install_remote_handler, remove_remote_handlers, \
                              logging_port, LOG_DEBUG2
from openmdao.util.publickey import make_private, read_authorized_keys, \
                                    write_authorized_keys, HAVE_PYWIN32
from openmdao.util.shellproc import ShellProc, STDOUT, DEV_NULL

_PROXIES = {}


class ObjServerFactory(Factory):
    """
    An :class:`ObjServerFactory` creates :class:`ObjServers` and objects
    within those servers.

    name: string
        Name of factory, used in log messages, etc.

    authkey: string
        Passed to created :class:`ObjServer` servers.

    allow_shell: bool
        Passed to created :class:`ObjServer` servers.

    allowed_types: list(string)
        Passed to created :class:`ObjServer` servers.

    address: tuple or string
        A :mod:`multiprocessing` address specifying an Internet address or
        a pipe (default). Created :class:`ObjServer` servers will use the
        same form of address.

    The environment variable ``OPENMDAO_KEEPDIRS`` can be used to avoid
    having server directory trees removed when servers are shut down.
    """

    # These are used to propagate selections from main().
    # There isn't a good way to propagate them through a manager.
    _address = None
    _allow_shell = False
    _allowed_types = None
    _allow_tunneling = False

    def __init__(self, name='ObjServerFactory', authkey=None,
                 allow_shell=False, allowed_types=None, address=None):
        super(ObjServerFactory, self).__init__()
        self._name = name
        self._authkey = authkey
        self._address = address or ObjServerFactory._address
        self._allow_shell = allow_shell or ObjServerFactory._allow_shell
        self._allowed_types = allowed_types or ObjServerFactory._allowed_types
        self._managers = {}
        self._logger = logging.getLogger(name)
        self._logger.setLevel(logging.DEBUG)
        self._logger.info('PID: %d, %r, allow_shell %s',
                          os.getpid(), keytype(self._authkey), allow_shell)
        self.host = platform.node()
        self.pid = os.getpid()
        self.version = __version__
        self.manager_class = _ServerManager
        self.server_classname = 'openmdao_main_objserverfactory_ObjServer'

    @rbac('*', proxy_types=[object])  # ResourceAllocationManager import loop.
    def get_ram(self):
        """
        Returns the :class:`ResourceAllocationManager` instance.
        Used by :meth:`ResourceAllocationManager.add_remotes`.
        """
        self._logger.debug('get_ram')
        from openmdao.main.resource import ResourceAllocationManager
        return ResourceAllocationManager._get_instance()

    @rbac('*')
    def echo(self, *args):
        """
        Simply return the arguments. This can be useful for latency/throughput
        measurements, connectivity testing, firewall keepalives, etc.
        """
        return args

    @rbac(('owner', 'user'))
    def release(self, server):
        """
        Shut down :class:`ObjServer` `server`.

        server: :class:`ObjServer`
            Server to be shut down.
        """
        try:
            address = server._token.address
        except AttributeError:
            address = 'not-a-proxy'
        self._logger.debug('release %r', server)
        self._logger.debug(' at %r', address)
        try:
            manager, root_dir, owner = self._managers[server]
        except KeyError:
            # Not identical to any of our proxies.
            # Could still be a reference to the same remote object.
            try:
                server_host = server.host
                server_pid = server.pid
            except Exception:
                self._logger.error("release: can't identify server at %r",
                                   address)
                raise ValueError("can't identify server at %r" % (address,))

            for key in self._managers.keys():
                if key.host == server_host and key.pid == server_pid:
                    manager, root_dir, owner = self._managers[key]
                    server = key
                    break
            else:
                self._logger.error('release: server %r not found', server)
                for key in self._managers.keys():
                    self._logger.debug(' %r', key)
                    self._logger.debug(' at %r', key._token.address)
                raise ValueError('server %r not found' % server)

        if get_credentials().user != owner.user:
            raise RoleError('only the owner can release')

        manager.shutdown()
        server._close.cancel()
        del self._managers[server]

        keep_dirs = int(os.environ.get('OPENMDAO_KEEPDIRS', '0'))
        if not keep_dirs and os.path.exists(root_dir):
            shutil.rmtree(root_dir)

    @rbac('owner')
    def cleanup(self):
        """ Shut down all remaining :class:`ObjServers`. """
        self._logger.debug('cleanup')
        cleanup_creds = get_credentials()
        servers = self._managers.keys()
        for server in servers:
            # Cleanup overrides release() 'owner' protection.
            set_credentials(self._managers[server][2])
            try:
                self.release(server)
            finally:
                set_credentials(cleanup_creds)
        self._managers = {}

    @rbac('*')
    def get_available_types(self, groups=None):
        """
        Returns a set of tuples of the form ``(typename, metadata)``,
        one for each available plugin type in the given entry point groups.
        If groups is *None*, return the set for all openmdao entry point
        groups.
        """
        self._logger.debug('get_available_types %s', groups)
        types = get_available_types(groups)
        for typname, version in types:
            self._logger.log(LOG_DEBUG2, ' %s %s', typname, version)
        return types

    @rbac(('owner', 'user'))
    def create(self, typname, version=None, server=None,
               res_desc=None, **ctor_args):
        """
        Create a new `typname` object in `server` or a new
        :class:`ObjectServer`. Returns a proxy for the new object.
        Starts servers in a subdirectory of the current directory.

        typname: string
            Type of object to create. If null, then a proxy for the new
            :class:`ObjServer` is returned.

        version: string or None
            Version of `typname` to create.

        server: proxy
            :class:`ObjServer` on which to create `typname`.
            If None, then a new server is created.

        res_desc: dict or None
            Required resources. Currently not used.

        ctor_args: dict
            Other constructor arguments.
            If `name` or `allowed_users` are specified, they are used when
            creating the :class:`ObjServer`. If no `allowed_users` are
            specified, the server is private to the current user.
        """
        self._logger.info('create typname %r, version %r server %s,'
                          ' res_desc %s, args %s', typname, version, server,
                          res_desc, ctor_args)

        if server is None:
            name = ctor_args.get('name', '')
            if not name:
                name = 'Server_%d' % (len(self._managers) + 1)

            allowed_users = ctor_args.get('allowed_users')
            if not allowed_users:
                credentials = get_credentials()
                allowed_users = {credentials.user: credentials.public_key}
            else:
                del ctor_args['allowed_users']

            if self._address is None or \
               isinstance(self._address, basestring) or \
               self._allow_tunneling:
                # Local access only via pipe if factory accessed by pipe
                # or factory is accessed via tunnel.
                address = None
            else:
                # Network access via same IP as factory, system-selected port.
                address = (self._address[0], 0)

            manager = self.manager_class(address, self._authkey, name=name,
                                         allowed_users=allowed_users)

            root_dir = name
            count = 1
            while os.path.exists(root_dir):
                count += 1
                root_dir = '%s_%d' % (name, count)
            os.mkdir(root_dir)

            # On Windows, when running the full test suite under Nose,
            # starting the process starts a new Nose test session, which
            # will eventually get here and start a new Nose session, which...
            orig_main = None
            if sys.platform == 'win32':  #pragma no cover
                scripts = ('openmdao-script.py', 'openmdao_test-script.py')
                if sys.modules['__main__'].__file__.endswith(scripts):
                    orig_main = sys.modules['__main__'].__file__
                    sys.modules['__main__'].__file__ = \
                        pkg_resources.resource_filename('openmdao.main',
                                                        'objserverfactory.py')
            owner = get_credentials()
            self._logger.log(LOG_DEBUG2, '%s starting server %r in dir %s',
                             owner, name, root_dir)
            try:
                manager.start(cwd=root_dir,
                              log_level=self._logger.getEffectiveLevel())
            finally:
                if orig_main is not None:  #pragma no cover
                    sys.modules['__main__'].__file__ = orig_main

            self._logger.info('new server %r for %s', name, owner)
            self._logger.info(' in dir %s', root_dir)
            self._logger.info(' listening on %s', manager.address)
            server_class = getattr(manager, self.server_classname)
            server = server_class(name=name, allow_shell=self._allow_shell,
                                  allowed_types=self._allowed_types)
            self._managers[server] = (manager, root_dir, owner)

        if typname:
            obj = server.create(typname, version, None, res_desc, **ctor_args)
        else:
            obj = server

        self._logger.log(LOG_DEBUG2, 'create returning %r at %r',
                         obj, obj._token.address)
        return obj
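
# Example (illustrative sketch only, not part of the module): using a factory
# proxy, such as one returned by connect() below, to start a new ObjServer and
# create an object in it. The host, port, and type name are placeholders, not
# values defined by this module.
#
#     factory = connect('localhost', 12345)
#     obj_server = factory.create('')   # empty typname -> proxy for a new ObjServer
#     comp = factory.create('some.plugin.TypeName', server=obj_server)
#     ...
#     factory.release(obj_server)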

class _FactoryManager(OpenMDAO_Manager):
    """
    A :class:`multiprocessing.Manager` which manages :class:`ObjServerFactory`.
    """
    pass

register(ObjServerFactory, _FactoryManager, 'openmdao.main.objserverfactory')

class ObjServer(object):
    """
    An object which knows how to create other objects, load a model, etc.
    All remote file accesses must be within the tree rooted in the current
    directory at startup.

    name: string
        Name of server, used in log messages, etc.

    allow_shell: bool
        If True, :meth:`execute_command` and :meth:`load_model` are allowed.
        Use with caution!

    allowed_types: list(string)
        Names of types which may be created. If None, then allow types listed
        by :meth:`factorymanager.get_available_types`. If empty, no types are
        allowed.
    """

    def __init__(self, name='', allow_shell=False, allowed_types=None):
        self._allow_shell = allow_shell
        if allowed_types is None:
            allowed_types = [typname for typname, version
                                         in get_available_types()]
        self._allowed_types = allowed_types
        self.host = platform.node()
        self.pid = os.getpid()
        self.name = name or ('sim-%d' % self.pid)
        self.version = __version__

        self._root_dir = os.getcwd()
        self._logger = logging.getLogger(self.name)
        self._logger.info('PID: %d, allow_shell %s',
                          os.getpid(), self._allow_shell)
        print '%s %r PID: %d, allow_shell %s' \
              % (self.__class__.__name__, self.name, os.getpid(),
                 self._allow_shell)
        sys.stdout.flush()

        SimulationRoot.chroot(self._root_dir)
        self.tlo = None

        # Ensure Traits Array support is initialized. The code contains
        # globals for numpy symbols that are initialized within
        # AbstractArray.__init__() which won't be executed if we simply
        # load up egg state (or don't do a real fork, like Windows).
        try:
            import numpy
        except ImportError:
            pass
        else:
            from enthought.traits.trait_numeric import AbstractArray
            dummy = AbstractArray()

    @rbac(('owner', 'user'))
    def set_log_level(self, level):
        """ Set logging level to `level`. """
        self._logger.info('log_level %s', level)
        self._logger.setLevel(level)
        logging.getLogger().setLevel(level)

    @rbac('*')
    def echo(self, *args):
        """
        Simply return the arguments. This can be useful for latency/throughput
        measurements, connectivity testing, firewall keepalives, etc.
        """
        return args

    @rbac('owner', proxy_types=[object])
    def create(self, typname, version=None, server=None,
               res_desc=None, **ctor_args):
        """
        If *typname* is an allowed type, returns an object of type *typname,*
        using the specified package version, server location, and resource
        description. All arguments are passed to :meth:`factorymanager.create`.
        """
        self._logger.debug('create typname %r, version %r server %s,'
                           ' res_desc %s, args %s', typname, version, server,
                           res_desc, ctor_args)
        if typname in self._allowed_types:
            obj = create(typname, version, server, res_desc, **ctor_args)
            self._logger.log(LOG_DEBUG2, ' returning %s', obj)
            return obj
        else:
            raise TypeError('%r is not an allowed type' % typname)

    @rbac('owner')
    def execute_command(self, resource_desc):
        """
        Run command described by `resource_desc` in a subprocess if this
        server's `allow_shell` attribute is True.

        resource_desc: dict
            Contains job description.

        The current environment, along with any 'job_environment'
        specification, is in effect while running 'remote_command'.

        If 'input_path' is not specified, ``/dev/null`` or ``nul:`` is used.
        If 'output_path' is not specified, ``<remote_command>.stdout`` is used.
        If neither 'error_path' nor 'join_files' are specified,
        ``<remote_command>.stderr`` is used.

        If specified in the 'resource_limits' dictionary, 'wallclock_time' is
        used as a timeout.

        All other queuing resource keys are ignored. The ``HOME_DIRECTORY``
        and ``WORKING_DIRECTORY`` placeholders are ignored.
        """
        try:
            job_name = resource_desc['job_name']
        except KeyError:
            job_name = ''

        command = resource_desc['remote_command']
        self._check_path(command, 'execute_command')
        base = os.path.basename(command)
        command = [command]
        if 'args' in resource_desc:
            command.extend(resource_desc['args'])

        self._logger.debug('execute_command %s %r', job_name, command)
        if not self._allow_shell:
            self._logger.error('attempt to execute %r by %r', command,
                               get_credentials().user)
            raise RuntimeError('shell access is not allowed by this server')

        env_vars = resource_desc.get('job_environment')

        try:
            stdin = resource_desc['input_path']
            self._check_path(stdin, 'execute_command')
        except KeyError:
            stdin = DEV_NULL

        try:
            stdout = resource_desc['output_path']
            self._check_path(stdout, 'execute_command')
        except KeyError:
            stdout = base+'.stdout'

        try:
            stderr = resource_desc['error_path']
            self._check_path(stderr, 'execute_command')
        except KeyError:
            try:
                join_files = resource_desc['join_files']
            except KeyError:
                stderr = base+'.stderr'
            else:
                stderr = STDOUT if join_files else base+'.stderr'

        limits = resource_desc.get('resource_limits', {})
        timeout = limits.get('wallclock_time', 0)
        poll_delay = 1

        try:
            process = ShellProc(command, stdin, stdout, stderr, env_vars)
        except Exception as exc:
            self._logger.error('exception creating process: %s', exc)
            raise

        self._logger.debug(' PID = %d', process.pid)
        return_code, error_msg = process.wait(poll_delay, timeout)
        self._logger.debug(' returning %s', (return_code, error_msg))
        return (return_code, error_msg)
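
    # Example (sketch, not part of the module): a minimal resource description
    # accepted by execute_command() on a server proxy obtained from
    # ObjServerFactory.create(''). The command, file names, and time limit are
    # hypothetical; all paths must lie under the server's root directory, and
    # the wallclock limit is assumed here to be in seconds.
    #
    #     resource_desc = {
    #         'remote_command': 'myscript',
    #         'args': ['--verbose'],
    #         'output_path': 'myscript.out',
    #         'resource_limits': {'wallclock_time': 60},
    #     }
    #     return_code, error_msg = obj_server.execute_command(resource_desc)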

    @rbac('owner', proxy_types=[Container])
    def load_model(self, egg_filename):
        """
        Load model from egg and return top-level object if this server's
        `allow_shell` attribute is True.

        egg_filename: string
            Filename of egg to be loaded.
        """
        self._logger.debug('load_model %r', egg_filename)
        if not self._allow_shell:
            self._logger.error('attempt to load %r by %r', egg_filename,
                               get_credentials().user)
            raise RuntimeError('shell access is not allowed by this server')
        self._check_path(egg_filename, 'load_model')
        if self.tlo:
            self.tlo.pre_delete()
        self.tlo = Container.load_from_eggfile(egg_filename, log=self._logger)
        return self.tlo

    @rbac('owner')
    def pack_zipfile(self, patterns, filename):
        """
        Create ZipFile of files matching `patterns` if `filename` is legal.

        patterns: list
            List of :mod:`glob`-style patterns.

        filename: string
            Name of ZipFile to create.
        """
        self._logger.debug('pack_zipfile %r', filename)
        self._check_path(filename, 'pack_zipfile')
        return pack_zipfile(patterns, filename, self._logger)

    @rbac('owner')
    def unpack_zipfile(self, filename, textfiles=None):
        """
        Unpack ZipFile `filename` if `filename` is legal.

        filename: string
            Name of ZipFile to unpack.

        textfiles: list
            List of :mod:`fnmatch`-style patterns specifying which unpacked
            files are text files possibly needing newline translation.
            If not supplied, the first 4KB of each file is scanned for a zero
            byte. If none is found, then the file is assumed to be a text file.
        """
        self._logger.debug('unpack_zipfile %r', filename)
        self._check_path(filename, 'unpack_zipfile')
        return unpack_zipfile(filename, self._logger, textfiles)
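
    # Example (sketch, not part of the module): round-tripping files through
    # the zip helpers on a server proxy. The patterns and file names are
    # hypothetical and must resolve within the server's root directory.
    #
    #     obj_server.pack_zipfile(['*.csv', 'results/*.dat'], 'results.zip')
    #     obj_server.unpack_zipfile('results.zip', textfiles=['*.csv'])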

    @rbac('owner')
    def chmod(self, path, mode):
        """
        Returns ``os.chmod(path, mode)`` if `path` is legal.

        path: string
            Path to file to modify.

        mode: int
            New mode bits (permissions).
        """
        self._logger.debug('chmod %r %o', path, mode)
        self._check_path(path, 'chmod')
        try:
            return os.chmod(path, mode)
        except Exception as exc:
            self._logger.error('chmod %r %o in %s failed %s',
                               path, mode, os.getcwd(), exc)
            raise

    @rbac('owner')
    def isdir(self, path):
        """
        Returns ``os.path.isdir(path)`` if `path` is legal.

        path: string
            Path to check.
        """
        self._logger.debug('isdir %r', path)
        self._check_path(path, 'isdir')
        try:
            return os.path.isdir(path)
        except Exception as exc:
            self._logger.error('isdir %r in %s failed %s',
                               path, os.getcwd(), exc)
            raise

    @rbac('owner')
    def listdir(self, path):
        """
        Returns ``os.listdir(path)`` if `path` is legal.

        path: string
            Path to directory to list.
        """
        self._logger.debug('listdir %r', path)
        self._check_path(path, 'listdir')
        try:
            return os.listdir(path)
        except Exception as exc:
            self._logger.error('listdir %r in %s failed %s',
                               path, os.getcwd(), exc)
            raise

    @rbac('owner', proxy_types=[RemoteFile])
    def open(self, filename, mode='r', bufsize=-1):
        """
        Returns ``open(filename, mode, bufsize)`` if `filename` is legal.

        filename: string
            Name of file to open.

        mode: string
            Access mode.

        bufsize: int
            Size of buffer to use.
        """
        self._logger.debug('open %r %r %s', filename, mode, bufsize)
        self._check_path(filename, 'open')
        try:
            return RemoteFile(open(filename, mode, bufsize))
        except Exception as exc:
            self._logger.error('open %r %r %s in %s failed %s',
                               filename, mode, bufsize, os.getcwd(), exc)
            raise
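
    # Example (sketch, not part of the module): reading a remote file through
    # the proxy returned by open(). 'output.txt' is a hypothetical file name,
    # and this assumes the RemoteFile proxy forwards the usual file methods
    # (read, close).
    #
    #     remote = obj_server.open('output.txt', 'r')
    #     try:
    #         data = remote.read()
    #     finally:
    #         remote.close()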

    @rbac('owner')
    def remove(self, path):
        """
        Remove `path` if `path` is legal.

        path: string
            Path to file to remove.
        """
        self._logger.debug('remove %r', path)
        self._check_path(path, 'remove')
        try:
            return os.remove(path)
        except Exception as exc:
            self._logger.error('remove %r in %s failed %s',
                               path, os.getcwd(), exc)
            raise

    @rbac('owner')
    def stat(self, path):
        """
        Returns ``os.stat(path)`` if `path` is legal.

        path: string
            Path to file to interrogate.
        """
        self._logger.debug('stat %r', path)
        self._check_path(path, 'stat')
        try:
            return os.stat(path)
        except Exception as exc:
            self._logger.error('stat %r in %s failed %s',
                               path, os.getcwd(), exc)
            raise

    def _check_path(self, path, operation):
        """ Check if path is allowed to be used. """
        abspath = os.path.abspath(path)
        if not abspath.startswith(self._root_dir):
            raise RuntimeError("Can't %s %r, not within root %s"
                               % (operation, path, self._root_dir))

class _ServerManager(OpenMDAO_Manager):
    """
    A :class:`multiprocessing.Manager` which manages :class:`ObjServer`.
    """
    pass

register(ObjServer, _ServerManager, 'openmdao.main.objserverfactory')

def connect_to_server(config_filename):
    """
    Connects to the server specified by `config_filename` and returns a
    (shared) proxy for the associated :class:`ObjServerFactory`.

    config_filename: string
        Name of server configuration file.
    """
    cfg = read_server_config(config_filename)
    return connect(cfg['address'], cfg['port'], cfg['tunnel'],
                   pubkey=cfg['key'], logfile=cfg['logfile'])

def connect(address, port, tunnel=False, authkey='PublicKey', pubkey=None,
            logfile=None):
    """
    Connects to the server at `address` and `port` using `authkey` and
    returns a (shared) proxy for the associated :class:`ObjServerFactory`.

    address: string
        IP address for server, or pipe filename.

    port: int
        Server port. If < 0, `address` is a pipe filename.

    tunnel: bool
        Connect via SSH tunnel.

    authkey:
        Server authorization key.

    pubkey:
        Server public key, required if `authkey` is 'PublicKey'.

    logfile:
        Location of server's log file, if known.
    """
    if port < 0:
        key = address
    else:
        key = (address, port)
    try:
        return _PROXIES[key]
    except KeyError:
        # Requires ssh setup.
        if tunnel:  # pragma no cover
            location = setup_tunnel(address, port)
        else:
            location = key

        via = ' (via tunnel)' if tunnel else ''
        log = ' at %s' % logfile if logfile else ''
        if not OpenMDAO_Proxy.manager_is_alive(location):
            raise RuntimeError("Can't connect to server at %s:%s%s. It appears"
                               " to be offline. Please check the server log%s."
                               % (address, port, via, log))

        mgr = _FactoryManager(location, authkey, pubkey=pubkey)
        try:
            mgr.connect()
        except EOFError:
            raise RuntimeError("Can't connect to server at %s:%s%s. It appears"
                               " to be rejecting the connection. Please check"
                               " the server log%s." % (address, port, via, log))

        proxy = mgr.openmdao_main_objserverfactory_ObjServerFactory()
        if proxy.version != __version__:
            logging.warning('Server version %r different than local version %r',
                            proxy.version, __version__)
        _PROXIES[key] = proxy
        return proxy
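
# Example (sketch, not part of the module): a typical client-side lifecycle,
# combining connect_to_server() above with start_server() and stop_server()
# defined below. Defaults are used throughout; the type name passed to
# create() is a placeholder for any allowed type.
#
#     from openmdao.main.objserverfactory import \
#         start_server, connect_to_server, stop_server
#
#     server_proc, config_filename = start_server()
#     factory = connect_to_server(config_filename)
#     obj_server = factory.create('')   # empty typname -> new ObjServer proxy
#     ...
#     factory.release(obj_server)
#     stop_server(server_proc, config_filename)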

def start_server(authkey='PublicKey', address=None, port=0, prefix='server',
                 allowed_hosts=None, allowed_users=None, allow_shell=False,
                 allowed_types=None, timeout=None, tunnel=False,
                 resources=None, log_prefix=None):
    """
    Start an :class:`ObjServerFactory` service in a separate process
    in the current directory.

    authkey: string
        Authorization key; must be matched by clients.

    address: string
        IPv4 address, hostname, or pipe name.
        Default is the host's default IPv4 address.

    port: int
        Server port (default of 0 implies next available port).
        Note that ports below 1024 typically require special privileges.
        If port is negative, then a local pipe is used for communication.

    prefix: string
        Prefix for server config file and stdout/stderr file.

    allowed_hosts: list(string)
        Host address patterns to check against. Required if `port` >= 0.
        Ignored if `allowed_users` is specified.

    allowed_users: dict
        Dictionary of users and corresponding public keys allowed access.
        If None, *any* user may access. If empty, no user may access.
        The host portions of user strings are used for address patterns.

    allow_shell: bool
        If True, :meth:`execute_command` and :meth:`load_model` are allowed.
        Use with caution!

    allowed_types: list(string)
        Names of types which may be created. If None, then allow types listed
        by :meth:`get_available_types`. If empty, no types are allowed.

    timeout: int
        Seconds to wait for server to start. Note that public key generation
        can take a while. The default value of None will use an internally
        computed value based on host type (and for Windows, the availability
        of pyWin32).

    tunnel: bool
        If True, report host IP address but listen for connections from a
        local SSH tunnel.

    resources: string
        Filename for resource configuration.

    log_prefix: string
        Name used to identify remote logging messages from server.
        Implies that the local process will be receiving the messages.

    Returns ``(server_proc, config_filename)``.
    """
    if timeout is None:
        if sys.platform == 'win32' and not HAVE_PYWIN32:  #pragma no cover
            timeout = 120
        else:
            timeout = 30

    server_key = prefix+'.key'
    server_cfg = prefix+'.cfg'
    server_out = prefix+'.out'
    for path in (server_cfg, server_out):
        if os.path.exists(path):
            os.remove(path)

    with open(server_key, 'w') as out:
        out.write('%s\n' % authkey)

    factory_path = pkg_resources.resource_filename('openmdao.main',
                                                   'objserverfactory.py')

    args = ['python', factory_path,
            '--port', str(port), '--prefix', prefix]

    if address is not None:
        args.extend(['--address', address])

    if tunnel:
        args.append('--tunnel')

    if resources is not None:
        args.append('--resources')
        args.append(resources)

    if allowed_users is not None:
        write_authorized_keys(allowed_users, 'users.allow', logging.getLogger())
        args.extend(['--users', 'users.allow'])
    else:
        args.append('--allow-public')

    if port >= 0:
        if allowed_hosts is None:
            allowed_hosts = [socket.gethostbyname(socket.gethostname())]
        if allowed_hosts[0].startswith('127.') and \
           '127.0.0.1' not in allowed_hosts:
            allowed_hosts.append('127.0.0.1')
        with open('hosts.allow', 'w') as out:
            for pattern in allowed_hosts:
                out.write('%s\n' % pattern)
        if sys.platform != 'win32' or HAVE_PYWIN32:
            make_private('hosts.allow')
        else:  #pragma no cover
            logging.warning("Can't make hosts.allow private")

    if allow_shell:
        args.append('--allow-shell')

    if allowed_types is not None:
        with open('types.allow', 'w') as out:
            for typname in allowed_types:
                out.write('%s\n' % typname)
        if sys.platform != 'win32' or HAVE_PYWIN32:
            make_private('types.allow')
        else:  #pragma no cover
            logging.warning("Can't make types.allow private")
        args.extend(['--types', 'types.allow'])

    if log_prefix is not None:
        log_host = socket.gethostname()
        log_port = logging_port(log_host, log_host)
        args.extend(['--log-host', log_host, '--log-port', str(log_port)])
        if log_prefix:  # Could be null (for default).
            args.extend(['--log-prefix', log_prefix])

    proc = ShellProc(args, stdout=server_out, stderr=STDOUT)

    try:
        # Wait for valid server_cfg file.
        retry = 0
        while (not os.path.exists(server_cfg)) or \
              (os.path.getsize(server_cfg) == 0):
            return_code = proc.poll()
            if return_code:
                error_msg = proc.error_message(return_code)
                raise RuntimeError('Server startup failed %s' % error_msg)
            retry += 1
            if retry < 10*timeout:
                time.sleep(.1)
            # Hard to cause a startup timeout.
            else:  #pragma no cover
                proc.terminate(timeout)
                raise RuntimeError('Server startup timeout')
        return (proc, server_cfg)
    finally:
        if os.path.exists(server_key):
            os.remove(server_key)

def stop_server(server, config_filename):
    """
    Shut down the :class:`ObjServerFactory` specified by `config_filename`
    and terminate its process `server`.

    server: :class:`ShellProc`
        Server process returned by :meth:`start_server`.

    config_filename: string
        Name of server configuration file.
    """
    factory = connect_to_server(config_filename)
    factory.cleanup()
    server.terminate(timeout=10)


# Remote process code.
_SERVER_CFG = ''

def main():  #pragma no cover
    """
    OpenMDAO factory service process.

    Usage: python objserverfactory.py [--allow-public][--allow-shell][--hosts=filename][--types=filename][--users=filename][--address=address][--port=number][--prefix=name][--tunnel][--resources=filename][--log-host=hostname][--log-port=number][--log-prefix=string]

    --allow-public:
        Allows access by anyone from any allowed host. Use with care!

    --allow-shell:
        Allows access to :meth:`execute_command` and :meth:`load_model`.
        Use with care!

    --hosts: string
        Filename for allowed hosts specification. Default ``hosts.allow``.
        Ignored if '--users' is specified.
        The file should contain IPv4 host addresses, IPv4 domain addresses,
        or hostnames, one per line. Blank lines are ignored, and '#' marks
        the start of a comment which continues to the end of the line.
        For security reasons this file must be accessible only by the user
        running this server.

    --types: string
        Filename for allowed types specification. If not specified, then
        allow types listed by :meth:`factorymanager.get_available_types`.
        The file should contain one type name per line.

    --users: string
        Filename for allowed users specification.
        Ignored if '--allow-public' is specified.
        Default is ``~/.ssh/authorized_keys``; other files should be of the
        same format: each line has ``key-type public-key-data user@host``,
        where `user` is the username on `host`. `host` will be translated to
        an IPv4 address and included in the allowed hosts list.
        Note that this ``user@host`` form is not necessarily enforced by
        programs which generate keys.
        For security reasons this file must be accessible only by the user
        running this server.

    --address: string
        IPv4 address, hostname, or pipe name.
        Default is the host's default IPv4 address.

    --port: int
        Server port (default of 0 implies next available port).
        Note that ports below 1024 typically require special privileges.
        If port is negative, then a local pipe is used for communication.

    --prefix: string
        Prefix for configuration and stdout/stderr files (default ``server``).

    --tunnel:
        Report host IP address but listen for connections from a local
        SSH tunnel.

    --resources: string
        Filename for resource configuration. If not specified, then the
        default of ``~/.openmdao/resources.cfg`` will be used.

    --log-host: string
        Hostname to send remote log messages to.

    --log-port: int
        Port on `log-host` to send remote log messages to.

    --log-prefix: string
        Prefix to apply to remote log messages. Default is ``pid@host``.

    If ``prefix.key`` exists, it is read for an authorization key string.
    Otherwise public key authorization and encryption is used.

    Allowed hosts *must* be specified if `port` is >= 0. Only allowed hosts
    may connect to the server.

    Once initialized, ``prefix.cfg`` is written with address, port, and
    public key information.
    """
""" parser = optparse.OptionParser() parser.add_option('--address', action='store', type='str', help='Network address to serve.') parser.add_option('--allow-public', action='store_true', default=False, help='Allows access by any user, use with care!') parser.add_option('--allow-shell', action='store_true', default=False, help='Allows potential shell access, use with care!') parser.add_option('--hosts', action='store', type='str', default='hosts.allow', help='Filename for allowed hosts') parser.add_option('--types', action='store', type='str', help='Filename for allowed types') parser.add_option('--users', action='store', type='str', default='~/.ssh/authorized_keys', help='Filename for allowed users') parser.add_option('--port', action='store', type='int', default=0, help='Server port (0 implies next available port)') parser.add_option('--prefix', action='store', default='server', help='Prefix for config and stdout/stderr files') parser.add_option('--tunnel', action='store_true', default=False, help='Report host IP address but listen for connections' ' from a local SSH tunnel') parser.add_option('--resources', action='store', type='str', default=None, help='Filename for resource configuration') parser.add_option('--log-host', action='store', type='str', default=None, help='hostname for remote log messages') parser.add_option('--log-port', action='store', type='int', default=None, help='port for remote log messages') parser.add_option('--log-prefix', action='store', type='str', default=None, help='prefix for remote log messages') options, arguments = parser.parse_args() if arguments: parser.print_help() sys.exit(1) logger = logging.getLogger() logger.setLevel(logging.DEBUG) if options.log_host and options.log_port: install_remote_handler(options.log_host, int(options.log_port), options.log_prefix) server_key = options.prefix+'.key' server_cfg = options.prefix+'.cfg' global _SERVER_CFG _SERVER_CFG = server_cfg # Get authkey. authkey = 'PublicKey' try: with open(server_key, 'r') as inp: authkey = inp.readline().strip() os.remove(server_key) except IOError: pass if options.allow_shell: msg = 'Shell access is ALLOWED' logger.warning(msg) print msg allowed_users = None allowed_hosts = None # Get allowed_users. if options.allow_public: allowed_users = None msg = 'Public access is ALLOWED' logger.warning(msg) print msg if options.port >= 0: # Get allowed_hosts. if os.path.exists(options.hosts): try: allowed_hosts = read_allowed_hosts(options.hosts) except Exception as exc: msg = "Can't read allowed hosts file %r: %s" \ % (options.hosts, exc) logger.error(msg) print msg sys.exit(1) else: msg = 'Allowed hosts file %r does not exist.' % options.hosts logger.error(msg) print msg sys.exit(1) if not allowed_hosts: msg = 'No hosts in allowed hosts file %r.' % options.hosts logger.error(msg) print msg sys.exit(1) else: if os.path.exists(options.users): try: allowed_users = read_authorized_keys(options.users, logger) except Exception as exc: msg = "Can't read allowed users file %r: %s" \ % (options.users, exc) logger.error(msg) print msg sys.exit(1) else: msg = 'Allowed users file %r does not exist.' % options.users logger.error(msg) print msg sys.exit(1) if not allowed_users: msg = 'No users in allowed users file %r.' % options.users logger.error(msg) print msg sys.exit(1) # Get allowed_types. 
    allowed_types = None
    if options.types:
        if os.path.exists(options.types):
            allowed_types = []
            with open(options.types, 'r') as inp:
                line = inp.readline()
                while line:
                    line = line.strip()
                    if line:
                        allowed_types.append(line)
                    line = inp.readline()
        else:
            msg = 'Allowed types file %r does not exist.' % options.types
            logger.error(msg)
            print msg
            sys.exit(1)

    # Optionally configure resources.
    if options.resources:
        # Import here to avoid import loop.
        from openmdao.main.resource import ResourceAllocationManager as RAM
        RAM.configure(options.resources)

    # Get address and create manager.
    if options.port >= 0:
        if options.address:  # Specify IPv4/hostname.
            address = (options.address, options.port)
        else:
            address = (platform.node(), options.port)
    else:
        if options.address:  # Specify pipename.
            address = options.address
        else:
            address = None

    logger.info('Starting FactoryManager %s %r', address, keytype(authkey))
    current_process().authkey = authkey
    bind_address = ('127.0.0.1', options.port) if options.tunnel else address
    manager = _FactoryManager(bind_address, authkey, name='Factory',
                              allowed_hosts=allowed_hosts,
                              allowed_users=allowed_users,
                              allow_tunneling=options.tunnel)

    # Set defaults for created ObjServerFactories.
    # There isn't a good method to propagate these through the manager.
    ObjServerFactory._address = address
    ObjServerFactory._allow_shell = options.allow_shell
    ObjServerFactory._allowed_types = allowed_types
    ObjServerFactory._allow_tunneling = options.tunnel

    # Get server, retry if specified address is in use.
    server = None
    retries = 0
    while server is None:
        try:
            server = manager.get_server()
        except socket.error as exc:
            if str(exc).find('Address already in use') >= 0:
                if retries < 10:
                    msg = 'Address %s in use, retrying...' % (address,)
                    logger.debug(msg)
                    print msg
                    time.sleep(5)
                    retries += 1
                else:
                    msg = 'Address %s in use, too many retries.' % (address,)
                    logger.error(msg)
                    print msg
                    sys.exit(1)
            else:
                raise

    # Record configuration.
    real_ip = None if address is None else address[0]
    write_server_config(server, _SERVER_CFG, real_ip)
    msg = 'Serving on %s' % (server.address,)
    logger.info(msg)
    print msg
    sys.stdout.flush()

    # And away we go...
    signal.signal(signal.SIGTERM, _sigterm_handler)
    try:
        server.serve_forever()
    finally:
        _cleanup()
    sys.exit(0)

def _sigterm_handler(signum, frame):  #pragma no cover
    """ Try to go down gracefully. """
    logging.getLogger().info('sigterm_handler invoked')
    print 'sigterm_handler invoked'
    sys.stdout.flush()
    _cleanup()
    sys.exit(1)


def _cleanup():  #pragma no cover
    """ Cleanup in preparation to shut down. """
    remove_remote_handlers()
    keep_dirs = int(os.environ.get('OPENMDAO_KEEPDIRS', '0'))
    if not keep_dirs and os.path.exists(_SERVER_CFG):
        os.remove(_SERVER_CFG)


if __name__ == '__main__':  #pragma no cover
    main()