"""
Object encapsulation of directory/file trees
"""
import glob
import json
import os
from copy import deepcopy
from pathlib import Path
from . import utils
from .objchain import ObjChain, ChainList
#TODO: touch with hidden file that indicates a managed directory or file
[docs]class OPath(ObjChain):
""" A generic path """
def __init__(self, name, push_up=True, check_attr=True):
"""
<<<<<<< HEAD:magicdir/magic_dir.py
Initializer for MagicPath
=======
Initializer for OPath
>>>>>>> refactoring:opath/opath.py
:param name: basename of the path
:type name: str
:param push_up: default of whether to 'push' access of this path to the root path node
:type push_up: boolean
:param check_attr: default to validate attributes. For example 'this is not valid' is not a valid
attribute since it contains spaces but 'this_is_a_valid_attribute' is a valid attribute.
:type check_attr: boolean
"""
super().__init__(push_up=push_up, check_attr=check_attr)
self.name = name
self._parent_dir = ''
@property
def dir(self):
""" The parent directory of this location"""
return str(self.root._parent_dir)
@property
def relpath(self):
""" Relative path of this location """
return Path(*self.ancestors(include_self=True).name)
@property
def path(self):
""" Path of this location"""
return Path(str(self.dir), *self.ancestors(include_self=True).name)
@property
def abspath(self):
""" Absolute path of this location """
return self.path.absolute()
[docs] def set_dir(self, path):
""" Set the parent directory """
self.root._parent_dir = str(path)
return path
[docs] def remove_parent(self):
""" Remove parent from this path """
new_parent = self.abspath.parent
self._parent_dir = new_parent
return super().remove_parent()
def __repr__(self):
return "<{}(\"{}\")>".format(self.__class__.__name__, self.name, self.relpath)
def print(self, print_files=False, indent=4, max_level=None, level=0, list_missing=True):
print(self.show(print_files=print_files, indent=indent,
max_level=max_level, level=0, list_missing=True))
[docs] def show(self, print_files=False, indent=4, max_level=None, level=0, list_missing=True):
"""
Recursively print the file structure
:param print_files: whether to print files
:type print_files: boolean
:param indent: number of spaces per indent
:type indent: int
:param max_level: max level depth to print
:type max_level: int
:param level: start at level
:type level: int
:param list_missing: whether to annotate files that do not exist
:type list_missing: boolean
:return: None
:rtype: None
"""
padding = '| ' * level
name = self.name
if self.attr and name != self.attr:
name += " (\"{}\")".format(self.attr)
missing_tag = ''
if list_missing and not self.exists():
missing_tag = "*"
s = "{padding}{missing}{name}".format(missing=missing_tag, padding=padding, name=name)
s += '\n'
level += 1
for name, child in self._children.items():
s += child.show(print_files, indent, max_level, level, list_missing)
return s
[docs]class OFile(OPath):
""" A file object """
[docs] def write(self, data, mode='w', **kwargs):
""" Write data to a file """
return self.parent.write_file(self.name, mode, data, **kwargs)
[docs] def read(self, mode='r', **kwargs):
""" Read data from a file """
return self.parent.read_file(self.name, mode, **kwargs)
[docs] def open(self, mode='r', **kwargs):
""" Opens a file for reading or writing """
return self.parent.open_file(self.name, mode, **kwargs)
[docs] def dump_json(self, data, mode='w', **json_kwargs):
"""Dump data as a json"""
return self.parent.dump_json(self.name, mode, data, **json_kwargs)
[docs] def load_json(self, mode='r', **json_kwargs):
"""Load data from json"""
return self.parent.load_json(self.name, mode, **json_kwargs)
[docs] def exists(self):
""" Whether the file exists """
return Path(self.abspath).is_file()
[docs] def rm(self):
""" Removes file if it exists. """
if self.exists():
os.remove(str(self.abspath))
[docs]class ODir(OPath):
""" A directory object """
# TODO: this doesn't take into account files not in descendents
@property
def files(self):
"""
Recursively returns all files :class:`OFile` of this directory. Does not include parent directories.
:return: list of OFiles
:rtype: list
"""
desc = self.descendents(include_self=True)
return ChainList([d for d in desc if isinstance(d, OFile)])
@property
def dirs(self):
"""
Recursively returns all directories :class:`ODir` of this directory. Does not include parent directories.
:return: list of ODir
:rtype: list
"""
desc = self.descendents(include_self=True)
return ChainList([d for d in desc if isinstance(d, ODir)])
[docs] def list_dirs(self):
"""List immediate directories in this directory"""
return ChainList([c for c in self.children if isinstance(c, ODir)])
[docs] def list_files(self):
"""List immediate files in this directory"""
return ChainList([c for c in self.children if isinstance(c, OFile)])
@property
def relpaths(self):
"""
Returns all paths to all directories (includes parent_dir).
:return: directory paths in this directory (inclusive)
:rtype: list of PosixPath
"""
return self.dirs.relpath
@property
def paths(self):
"""
Returns all paths to all directories (includes parent_dir).
:return: directory paths in this directory (inclusive)
:rtype: list of PosixPath
"""
return self.dirs.path
@property
def abspaths(self):
"""
Returns all absolute paths to all directories (includes parent_dir).
:return: directory absolute paths in this directory (inclusive)
:rtype: list of PosixPath
"""
return self.paths.absolute()
[docs] def all_exists(self):
"""
Whether all directories in the tree exist.
:return: directory tree exists
:rtype: boolean
"""
return all(self.abspaths.is_dir())
[docs] def mkdirs(self):
"""
Creates all directories in the directory tree. Existing directories are ignored.
:return: self
:rtype: ODir
"""
for p in self.abspaths:
utils.makedirs(p, exist_ok=True)
return self
[docs] def rmdirs(self):
"""
Recursively removes all files and directories of this directory (inclusive)
:return: self
:rtype: ODir
"""
if self.abspath.is_dir():
utils.rmtree(self.abspath)
return self
[docs] def cpdirs(self, new_parent):
"""
Copies the directory tree to a new location. Returns the root of the newly copied tree.
:param new_parent: path of new parent directory
:type new_parent: basestring or PosixPath or Path object
:return: copied directory
:rtype: ODir
"""
utils.copytree(self.abspath, Path(new_parent, self.name))
copied_dirs = deepcopy(self)
copied_dirs.remove_parent()
copied_dirs.set_dir(new_parent)
return copied_dirs
[docs] def mvdirs(self, new_parent):
""" Moves the directory. If this directory has a parent connection, the connection will be broken and this
directory will become a root directory. The original root will be left in place but will no longer have
access to the moved directories. """
oldpath = self.abspath
self.remove_parent()
if self.exists():
utils.copytree(oldpath, Path(new_parent, self.name))
self.set_dir(new_parent)
if self.exists():
utils.rmtree(oldpath)
return self
[docs] def exists(self):
""" Whether the directory exists """
return self.abspath.is_dir()
[docs] def ls(self):
""" Lists the files that exist in directory """
return utils.listdir(self.abspath)
def glob(self, pattern):
return glob.glob(str(Path(self.abspath, pattern)))
# def collect(self):
# """ collects new directories that exist on the local machine and add to tree """
def _get_if_exists(self, name, attr):
if self.has(attr):
c = self.get(attr)
if hasattr(c, 'name') and c.name == name:
return c
def _validate_add(self, name, attr, expected_type, blacklisted_names):
e = self._get_if_exists(name, attr)
if e and issubclass(e.__class__, expected_type):
return e
if name in blacklisted_names:
raise AttributeError(
"{} name \"{}\" already exists for {}. Existing dirnames: {}".format(
expected_type.__name__, name, self, ', '.join(blacklisted_names)))
# TODO: exist_ok kwarg
[docs] def add(self, name, attr=None, push_up=None, check_attr=None):
"""
Adds a new directory to the directory tree.
:param name: name of the new directory
:type name: basestring
:param attr: attribute to use to access this directory. Defaults to name.
:type attr: basestring
:param push_up: whether to 'push' attribute to the root, where it can be accessed
:type push_up: boolean
:param check_attr: if True, will raise exception if attr is not a valid attribute. If None, value will
default to defaults defined on initialization
:type check_attr: boolean|None
:return: new directory
:rtype: ODir
"""
if attr is None:
attr = name
existing = self._validate_add(name, attr, ODir, self.children.name)
if existing:
return existing
return self._create_and_add_child(attr, with_attributes={"name": name}, push_up=push_up, check_attr=check_attr)
[docs] def add_file(self, name, attr=None, push_up=None, check_attr=False):
"""
Adds a new file to the directory tree.
:param name: name of the new file
:type name: basestring
:param attr: attribute to use to access this directory. Defaults to name.
:type attr: basestring
:param push_up: whether to 'push' attribute to the root, where it can be accessed
:type push_up: boolean
:param check_attr: if True, will raise exception if attr is not a valid attribute. If None, value will
default to defaults defined on initialization
:type check_attr: boolean|None
:return: new directory
:rtype: ODir
"""
if attr is None:
attr = name
existing = self._validate_add(name, attr, OFile, self.files.name)
if existing:
return existing
file = OFile(name)
self._add(attr, file, push_up=push_up, check_attr=check_attr)
return file
[docs] def write_file(self, filename, mode, data, **kwargs):
""" Write a file at this location """
utils.makedirs(self.abspath)
with self.open_file(str(Path(self.abspath, filename)), mode, **kwargs) as f:
f.write(data)
[docs] def read_file(self, filename, mode, **kwargs):
""" Read a file at this location """
with self.open_file(str(Path(self.abspath, filename)), mode, **kwargs) as f:
return f.read()
[docs] def open_file(self, filename, mode, **kwargs):
""" Open a file at this location """
utils.makedirs(self.abspath)
return utils.fopen(str(Path(self.abspath, filename)), mode, **kwargs)
[docs] def dump_json(self, filename, mode, data, *args, **json_kwargs):
"""Dump data to json"""
utils.makedirs(self.abspath)
with self.open_file(str(Path(self.abspath, filename)), mode) as f:
json.dump(data, f, *args, **json_kwargs)
[docs] def load_json(self, filename, mode, **kwargs):
"""Load data from a json"""
with self.open_file(str(Path(self.abspath, filename)), mode, **kwargs) as f:
return json.load(f)