Source code for openmdao.gui.filemanager

import os.path
import shutil
import traceback
import zipfile
import re
import mimetypes
import logging

from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler

from openmdao.gui.util import filedict
from openmdao.main.publisher import Publisher
from openmdao.util.log import logger
from openmdao.util.fileutil import onerror


[docs]class FilesPublisher(FileSystemEventHandler): ''' Publishes file collection when ANY file system event occurs. ''' def __init__(self, files): self.files = files # watchdog has a PatternMatchingEventHandler but it was causing # random errors, so we will just filter which events we publish if os.sep == "\\": self.ignore_patterns = [ r".*\\_macros$", r".*\\_macros\\.*", r".*\\.git.*", r".*\.pyc$", r".*\.pyd$" ] else: self.ignore_patterns = [ r".*/_macros$", r".*/_macros/.*", r".*/\.git.*", r".*\.pyc$", r".*\.pyd$" ]
[docs] def dispatch(self, event): ''' Just publish the updated file collection. ''' for pattern in self.ignore_patterns: if re.match(pattern, event.src_path): return try: self.files.publish_files() except Exception: traceback.print_exc()
[docs]class FileManager(object): ''' Object that keeps track of a collection of files (i.e., a directory) and optionally publishes an update when the collection is modified. ''' def __init__(self, name, path, publish_updates=False): self.name = name self.orig_dir = os.getcwd() self.root_dir = path self.publish_updates = publish_updates self.publisher = None if self.publish_updates: self.observer = Observer() self.observer.daemon = True self.observer.schedule(FilesPublisher(self), path=self.root_dir, recursive=True) self.observer.start() else: self.observer = None
[docs] def publish_files(self): ''' Publish the current file collection. ''' if not self.publisher: try: self.publisher = Publisher.get_instance() except Exception, err: print 'Error getting publisher:', err self.publisher = None if self.publisher: self.publisher.publish(self.name, self.get_files())
[docs] def cleanup(self): ''' Stop observer and cleanup the file directory. ''' if self.observer: self.observer.unschedule_all() self.observer.stop() self.observer.join() os.chdir(self.orig_dir)
[docs] def get_files(self, root=None): ''' Get a nested dictionary of files in the working directory. ''' if root is None: cwd = self.root_dir else: cwd = root return filedict(cwd)
def _get_abs_path(self, name): '''Return the absolute pathname of the given file/dir. ''' return os.path.join(self.root_dir, str(name).lstrip('/'))
[docs] def get_file(self, filename): ''' Get contents of file in working directory. Returns None if file was not found. ''' filepath = self._get_abs_path(filename) if os.path.exists(filepath): try: (mimetype, encoding) = mimetypes.guess_type(filepath) except Exception, err: logging.warn("In %s: %r", __file__, err) mimetype = 'application/octet-stream' encoding = 'binary' else: if mimetype and mimetype.lower() == 'image/x-png': # image/x-png is a legacy MIME type from the days before # it got its official name, image/png, in 1996. mimetype = 'image/png' contents = open(filepath, 'rb').read() return (contents, mimetype, encoding) else: return (None, None, None)
[docs] def ensure_dir(self, dirname): ''' Create directory in working directory. (Does nothing if directory already exists.) ''' try: dirpath = self._get_abs_path(dirname) if not os.path.isdir(dirpath): os.makedirs(dirpath) return str(True) except Exception, err: return str(err)
[docs] def write_file(self, filename, contents): ''' Write contents to file in working directory. ''' try: filename = str(filename) fpath = self._get_abs_path(filename) if filename.endswith('.py'): files = os.listdir(os.path.dirname(fpath)) # FIXME: This is a bit of a kludge, but for now we only create # an __init__.py file if it's the very first file in the # directory where a new file is being added. initpath = os.path.join(os.path.dirname(fpath), '__init__.py') if not files and not os.path.isfile(initpath): with open(initpath, 'w') as f: f.write(' ') with open(fpath, 'wb') as fout: fout.write(contents) return True except Exception, err: logger.error(str(err)) return err
[docs] def add_file(self, filename, contents): ''' Add file to working directory. If it's a zip file, unzip it. ''' self.write_file(filename, contents) fpath = self._get_abs_path(filename) if zipfile.is_zipfile(fpath): userdir = self.root_dir zfile = zipfile.ZipFile(fpath, "r") zfile.printdir() for fname in zfile.namelist(): if fname.endswith('/'): dirname = userdir + '/' + fname if not os.path.exists(dirname): os.makedirs(dirname) for fname in zfile.namelist(): if not fname.endswith('/'): data = zfile.read(fname) fname = userdir + '/' + fname fname = fname.replace('\\', '/') fout = open(fname, "wb") fout.write(data) fout.close() zfile.close() os.remove(fpath)
[docs] def delete_file(self, filename): ''' Delete file in working directory. Returns False if file was not found; otherwise, returns True. ''' filepath = self._get_abs_path(filename) if os.path.exists(filepath): if os.path.isdir(filepath): shutil.rmtree(filepath, onerror=onerror) else: os.remove(filepath) return True else: return False
[docs] def rename_file(self, oldpath, newname): ''' Rename last component of `oldpath` to `newname`. ''' filepath = self._get_abs_path(oldpath) if os.path.exists(filepath): newpath = os.path.join(os.path.dirname(filepath), newname) os.rename(filepath, newpath) return True else: return False
OpenMDAO Home