Source code for openmdao.gui.zmqstreamserver

import sys
import os
import traceback
import subprocess

from optparse import OptionParser

import zmq
from zmq.eventloop import ioloop
from zmq.eventloop.zmqstream import ZMQStream

from tornado import httpserver, web, websocket

# When true, DEBUG() prints its diagnostic messages to stdout.
debug = True
# Length that the topic of a [topic, binary_value] message is zero padded to
# before it is sent over the websocket.
NAME_SIZE = 256  # this must agree with NAME_SIZE in Model.js


def DEBUG(msg):
    ''' Print *msg* to stdout, tagged with the id of this process, when
        the module-level ``debug`` flag is set; otherwise do nothing.
    '''
    if not debug:
        return

    # a single pre-formatted string keeps the output identical to printing
    # the tag and the message as two space-separated items
    print('<<<%s>>> ZMQStreamServer -- %s' % (os.getpid(), msg))
    sys.stdout.flush()
def make_unicode(content):
    ''' Return *content* as a unicode string.

        Byte strings are decoded as UTF-8; bytes that are not valid UTF-8
        (or broken marker bytes) are dropped instead of raising an error.
        Any other object is converted with ``unicode()``, so it is assumed
        to have a proper ``__unicode__()`` method.
    '''
    # isinstance() rather than an exact type comparison, so that subclasses
    # of the byte-string type are decoded leniently as well instead of
    # going through the strict default-codec conversion below.
    if isinstance(content, bytes):
        return content.decode('utf-8', 'ignore')

    return unicode(content)
class ZMQStreamHandler(websocket.WebSocketHandler):
    ''' A handler that forwards output from a ZMQStream to a WebSocket.
    '''

    def initialize(self, addr):
        ''' Remember the ZMQ address to subscribe to.

            addr
                the address that ``open()`` connects its SUB socket to
        '''
        self.addr = addr
        self.websocket_closed = False
        # the ZMQStream feeding this websocket; set by open(), released
        # by on_close()
        self.stream = None

    def open(self):
        ''' Connect a SUB socket to ``self.addr`` with an empty subscription
            prefix and forward everything it receives to the websocket.

            Failures are reported on stdout/stderr and not re-raised.
        '''
        socket = None
        try:
            context = zmq.Context()
            socket = context.socket(zmq.SUB)
            socket.connect(self.addr)
            socket.setsockopt(zmq.SUBSCRIBE, b'')
            stream = ZMQStream(socket)
        except Exception as err:
            print('ZMQStreamHandler ERROR getting ZMQ stream: %s' % (err,))
            traceback.print_exc()
            # the stream is the last thing created, so on failure it never
            # exists; the socket is the resource that has to be released
            if socket is not None:
                socket.close()
        else:
            # keep a reference so that on_close() can shut the stream down
            self.stream = stream
            stream.on_recv(self._write_message)

    def _write_message(self, message):
        ''' Forward one multipart ZMQ message to the websocket.

            message
                list of frames: either ``[json_string]``, which is sent as
                is, or ``[topic, binary_value]``, which is sent as a binary
                message made of the topic zero padded to NAME_SIZE followed
                by the value

            Malformed messages and write failures are reported and dropped.
        '''
        if self.websocket_closed:
            print('ZMQStreamHandler message received after websocket closed')
            return

        nframes = len(message)
        if nframes == 1:
            # assume message[0] is a json string
            binary = False
            try:
                payload = message[0]
            except Exception as err:
                print('ZMQStreamHandler ERROR parsing message: %s %s'
                      % (str(message), err))
                traceback.print_exc()
                return
        elif nframes == 2:
            # it's a msg of the form [topic, binary_value]
            binary = True
            try:
                topic, value = message
                if len(topic) > NAME_SIZE:
                    raise RuntimeError("topic field of message is longer than %d characters" % NAME_SIZE)
                if not isinstance(value, bytes):
                    raise TypeError("message value must be of type 'bytes', not type '%s'" % str(type(value)))
                # 0 padded object name followed by the value
                payload = topic.ljust(NAME_SIZE, b'\0') + value  # FIXME: message is copied here
            except Exception as err:
                print('ZMQStreamHandler ERROR parsing binary message: %s %s'
                      % (str(message), err))
                traceback.print_exc()
                return
        else:
            # neither of the two known layouts; without this branch the
            # failure would surface as a misleading websocket write error
            print('ZMQStreamHandler ERROR unexpected message with %d frames: %s'
                  % (nframes, str(message)))
            return

        try:
            self.write_message(payload, binary=binary)
        except Exception as err:
            print('ZMQStreamHandler ERROR writing message to websocket: %s' % (err,))

    def on_message(self, message):
        ''' Ignore anything the client sends; nothing is done with it.
        '''
        pass

    def on_close(self):
        ''' Mark the websocket as closed and release the ZMQ stream.
        '''
        self.websocket_closed = True
        stream = getattr(self, 'stream', None)
        self.stream = None
        # closing the stream stops further _write_message callbacks and
        # frees the SUB socket that open() created for this connection
        if stream is not None and not stream.closed():
            stream.close()
class ZMQStreamApp(web.Application):
    ''' Web application with a single route that serves a ZMQStream over
        a WebSocket.
    '''

    def __init__(self, zmqstream_addr, websocket_url):
        ''' zmqstream_addr
                address handed to ZMQStreamHandler as its ``addr`` argument

            websocket_url
                url pattern that is routed to ZMQStreamHandler
        '''
        route = (websocket_url, ZMQStreamHandler, {'addr': zmqstream_addr})
        super(ZMQStreamApp, self).__init__([route],
                                           login_url='/login',
                                           debug=True)
class ZMQStreamServer(object):
    ''' Runs an http server that serves a ZMQStream over a WebSocket.
    '''

    def __init__(self, options):
        ''' options
                parsed command-line options (see ``get_options_parser``);
                ``addr`` and ``url`` are used here, ``port`` and
                ``external`` by ``serve()``
        '''
        self.options = options
        self.web_app = ZMQStreamApp(options.addr, options.url)
        self.http_server = httpserver.HTTPServer(self.web_app)

    def serve(self):
        ''' Start server listening on port & start the ioloop.

            The server only listens on localhost unless the ``external``
            option is set. If listening fails, the error is printed and the
            process exits with status 1.
        '''
        DEBUG('serve %s' % self.options.port)
        try:
            if self.options.external:
                self.http_server.listen(self.options.port)
            else:
                self.http_server.listen(self.options.port, address='localhost')
        except Exception as exc:
            print('<<<%s>>> ZMQStreamServer -- listen on %s failed: %s'
                  % (os.getpid(), self.options.port, exc))
            sys.exit(1)

        try:
            ioloop.IOLoop.instance().start()
        except KeyboardInterrupt:
            DEBUG('interrupt received, shutting down.')

    @staticmethod
    def get_options_parser():
        ''' Create a parser for command-line arguments.
        '''
        parser = OptionParser()
        parser.add_option("-z", "--zmqstream",
                          dest="addr", default=0,
                          help="the address of the zmqstream")
        parser.add_option("-p", "--port",
                          dest="port", type="int", default=0,
                          help="the port to run websocket server on")
        # default matches spawn_process(); without it an omitted --url
        # leaves the url as None
        parser.add_option("-u", "--url",
                          dest="url", default='/',
                          help="the url to expose for the websocket")
        parser.add_option("-x", "--external",
                          dest="external", action="store_true",
                          help="allow access to the server from external clients")
        return parser

    @staticmethod
    def spawn_process(zmq_url, ws_port, ws_url='/', external=False):
        ''' Run zmqstreamserver in its own process, mapping a zmq stream
            to a websocket.

            args:
                zmq_url     the url of the ZMQStream
                ws_port     the port to serve the WebSocket on
                ws_url      the url to map to the WebSocket
                external    if True, allow access from external clients

            Returns the ``subprocess.Popen`` object of the new process.
        '''
        file_path = os.path.abspath(__file__)
        # run the child with the interpreter running this process rather
        # than whichever 'python' happens to be first on PATH; fall back to
        # the old behavior if the interpreter path is not known
        python = sys.executable or 'python'
        cmd = [python, file_path,
               '-z', str(zmq_url),
               '-p', str(ws_port),
               '-u', str(ws_url)]
        if external:
            cmd.append('-x')
        return subprocess.Popen(cmd)
def main():
    ''' Read the command line, then build the server and run it.
    '''
    # the zmq ioloop has to be installed before any tornado object exists
    ioloop.install()

    # build the server from the parsed options and start serving
    options, _ = ZMQStreamServer.get_options_parser().parse_args()
    ZMQStreamServer(options).serve()
if __name__ == '__main__':
    # dont run main() if this is a forked windows process
    entry_module = sys.modules['__main__']
    if entry_module.__file__ == __file__:
        main()
OpenMDAO Home