import os
import sys
import threading
import traceback
import socket

from openmdao.main.zmqrpc import ZMQ_RPC
from import get_unused_ip_port
from openmdao.gui.zmqserver import ZMQServer
from openmdao.gui.zmqstreamserver import ZMQStreamServer

debug = True

[docs]def DEBUG(msg): if debug: print '<<<' + str(os.getpid()) + '>>> ZMQServerManager --', msg sys.stdout.flush()
[docs]class ZMQServerManager(object): ''' Creates and keeps track of ZMQ servers for the given class. ''' def __init__(self, classpath, external=False): self.server_dict = {} self.classpath = classpath self.external = external if (external): self.address = socket.gethostbyaddr(socket.gethostname())[0] else: self.address = 'localhost'
[docs] def server(self, server_id): ''' Get server associated with an id; create one if none exists. ''' try: if server_id in self.server_dict: server_info = self.server_dict[server_id] return server_info['proxy'] else: url_fmt = "tcp://" rep_url = url_fmt % get_unused_ip_port() pub_url = url_fmt % get_unused_ip_port() out_url = url_fmt % get_unused_ip_port() DEBUG("%s \n\t RPC on %s \n\t pub on %s \n\t out on %s" % (server_id, rep_url, pub_url, out_url)) server = ZMQServer.spawn_server(self.classpath, rep_url, pub_url, out_url) proxy = ZMQ_RPC(rep_url) self.server_dict[server_id] = { 'server': server, 'proxy': proxy, 'rep_url': rep_url, 'pub_url': pub_url, 'out_url': out_url } return proxy except Exception as err: print 'Error getting server', server_id print str(err.__class__.__name__), ":", err exc_type, exc_value, exc_traceback = sys.exc_info() traceback.print_exception(exc_type, exc_value, exc_traceback) traceback.print_tb(exc_traceback, limit=30) return None
[docs] def delete_server(self, server_id): ''' Delete the server(s) associated with an id. ''' if server_id in self.server_dict: server_info = self.server_dict[server_id] del self.server_dict[server_id] self._terminate(server_info, 'out_ws_server') self._terminate(server_info, 'pub_ws_server') # Call proxy.cleanup() in a separate thread so we can # recover if it hangs (happens sometimes on Windows/EC2). proxy = server_info['proxy'] cleaner = threading.Thread(target=self._cleanup_proxy, args=(proxy,), name='Proxy Cleaner') cleaner.daemon = True cleaner.start() cleaner.join(10) if cleaner.is_alive(): print 'Timeout waiting for proxy cleaner' self._terminate(server_info, 'server')
@staticmethod def _terminate(server_info, name): ''' Terminate process `name` in `server_info`. ''' proc = server_info.get(name) if proc is not None: DEBUG('terminating %s' % name) try: proc.terminate() proc.wait() DEBUG(' terminated') except Exception as exc: print 'Error terminating', name, exc else: DEBUG("Can't terminate %s, no process" % name) @staticmethod def _cleanup_proxy(proxy): ''' Try to invoke proxy.cleanup(). This hangs sometimes on Windows. ''' try: proxy.cleanup() except Exception as exc: print 'Error cleaning up proxy', exc
[docs] def get_websocket_url(self, server_id, stream_name, target_url): ''' Get the url of the websocket for the specified stream of the specified server; if websocket server does not exist, start it. server_id: The id of the server. stream_name: The name of the stream ('out' or 'pub'). target_url: The relative url at which to serve the websocket. ''' if stream_name not in ['out', 'pub']: print >>sys.stderr, \ "ZMQServerManager - Invalid stream requested:", stream_name return None server_info = self.server_dict[server_id] ws_url_key = stream_name + '_ws_url' if ws_url_key in server_info: return server_info[ws_url_key] else: ws_port = get_unused_ip_port() ws_url = 'ws://%s:%d%s' % (self.address, ws_port, target_url) server_info[stream_name + '_ws_server'] = \ ZMQStreamServer.spawn_process(server_info[stream_name+'_url'], ws_port, target_url, self.external) server_info[ws_url_key] = ws_url return ws_url
[docs] def cleanup(self): ''' Delete all servers. ''' keys = self.server_dict.keys() for server_id in keys: self.delete_server(server_id)
