Commit d7ad5b04 by Sébastien Eustace

Improve TOML file management

parent 8b195ed1
......@@ -5,10 +5,12 @@
### Added
- Add `show` command.
- Added the `--dry-run` option to the `add` command.
### Changed
- The dependencies of each package are now stored in `poetry.lock`.
- Improved TOML file management.
### Fixed
......
......@@ -302,6 +302,7 @@ poetry add requests pendulum
* `--D|dev`: Add package as development dependency.
* `--optional` : Add as an optional dependency.
* `--dry-run` : Outputs the operations but will not execute anything (implicitly enables --verbose).
### remove
......
......@@ -25,9 +25,9 @@ include = ['poetry/**/*', 'LICENSE']
[dependencies]
cleo = "^0.6"
requests = "^2.18"
toml = "^0.9"
cachy = "^0.1.0"
pip-tools = "^1.11"
toml = "^0.9.4"
[dev-dependencies]
pytest = "~3.4"
......
import re
import toml
from typing import List
from typing import Tuple
......@@ -20,6 +18,8 @@ class AddCommand(Command):
{ name* : Packages to add. }
{--D|dev : Add package as development dependency. }
{--optional : Add as an optional dependency. }
{--dry-run : Outputs the operations but will not execute anything
(implicitly enables --verbose). }
"""
help = """The add command adds required packages to your <comment>poetry.toml</> and installs them.
......@@ -28,10 +28,22 @@ If you do not specify a version constraint, poetry will choose a suitable one ba
"""
def handle(self):
names = self.argument('name')
packages = self.argument('name')
is_dev = self.option('dev')
requirements = self._determine_requirements(names)
section = 'dependencies'
if is_dev:
section = 'dev-dependencies'
original_content = self.poetry.locker.original.read()
content = self.poetry.locker.original.read()
for name in packages:
for key in content[section]:
if key.lower() == name.lower():
raise ValueError(f'Package {name} is already present')
requirements = self._determine_requirements(packages)
requirements = self._format_requirements(requirements)
# validate requirements format
......@@ -39,53 +51,11 @@ If you do not specify a version constraint, poetry will choose a suitable one ba
for constraint in requirements.values():
parser.parse_constraints(constraint)
# Trying to figure out where to add our dependencies
# If we find a toml library that keeps comments
# We could remove this whole section
section = '[dependencies]'
if is_dev:
section = '[dev-dependencies]'
new_content = None
with self.poetry.locker.original.path.open() as fd:
content = fd.read().split('\n')
in_section = False
index = None
for i, line in enumerate(content):
line = line.strip()
if line == section:
in_section = True
continue
if in_section and not line:
index = i
break
if index is not None:
for i, require in enumerate(requirements.items()):
name, version = require
if '.' in name:
name = f'"{name}"'
content.insert(
index + i,
f'{name} = "{version}"'
)
new_content = '\n'.join(content)
if new_content is not None:
with self.poetry.locker.original.path.open('w') as fd:
fd.write(new_content)
else:
# We could not find where to put the dependencies
# We raise an warning
self.warning('Unable to automatically add dependencies')
self.warning('Add them manually to your poetry.toml')
for name, constraint in requirements.items():
content[section][name] = constraint
return 1
# Write new content
self.poetry.locker.original.write(content)
# Cosmetic new line
self.line('')
......@@ -100,10 +70,29 @@ If you do not specify a version constraint, poetry will choose a suitable one ba
self.poetry.pool
)
installer.dry_run(self.option('dry-run'))
installer.update(True)
installer.whitelist(requirements)
installer.run()
try:
status = installer.run()
except Exception:
self.poetry.locker.original.write(original_content)
raise
if status != 0 or self.option('dry-run'):
# Revert changes
if not self.option('dry-run'):
self.error(
'\n'
'Addition failed, reverting poetry.toml '
'to its original content.'
)
self.poetry.locker.original.write(original_content)
return status
def _determine_requirements(self, requires: List[str]) -> List[str]:
if not requires:
......
import toml
from poetry.installation import Installer
from .command import Command
......@@ -22,59 +20,33 @@ list of installed packages
<info>poetry remove</info>"""
def handle(self):
packages = [p.lower() for p in self.argument('packages')]
packages = self.argument('packages')
is_dev = self.option('dev')
with self.poetry.locker.original.path.open() as fd:
content = fd.read().split('\n')
# Trying to figure out where are our dependencies
# If we find a toml library that keeps comments
# We could remove this whole section
section = '[dependencies]'
original_content = self.poetry.locker.original.read()
content = self.poetry.locker.original.read()
section = 'dependencies'
if is_dev:
section = '[dev-dependencies]'
section = 'dev-dependencies'
# Searching for package in
in_section = False
indices = []
# Deleting entries
requirements = {}
for i, line in enumerate(content):
line = line.strip()
if line == section:
in_section = True
continue
if in_section:
if not line:
# End of section
for name in packages:
found = False
for key in content[section]:
if key.lower() == name.lower():
found = True
requirements[name] = content[section][name]
break
requirement = toml.loads(line)
name = list(requirement.keys())[0].lower()
version = requirement[name]
if name in packages:
requirements[name] = version
indices.append(i)
break
if not indices or len(indices) != len(packages):
raise RuntimeError(
'Packages are not present in your poetry.toml file'
)
new_content = []
for i, line in enumerate(content):
if i in indices:
continue
if not found:
raise ValueError(f'Package {name} not found')
new_content.append(line)
for key in requirements:
del content[section][key]
new_content = '\n'.join(new_content)
with self.poetry.locker.original.path.open('w') as fd:
fd.write(new_content)
# Write the new content back
self.poetry.locker.original.write(content)
# Update packages
self.reset_poetry()
......@@ -93,8 +65,7 @@ list of installed packages
try:
status = installer.run()
except Exception:
with self.poetry.locker.original.path.open('w') as fd:
fd.write('\n'.join(content))
self.poetry.locker.original.write(original_content)
raise
......@@ -107,7 +78,6 @@ list of installed packages
'to its original content.'
)
with self.poetry.locker.original.path.open('w') as fd:
fd.write('\n'.join(content))
self.poetry.locker.original.write(original_content)
return status
......@@ -2,8 +2,8 @@
import re
import toml
from poetry.toml.toml_file import TOMLFile
from poetry.utils.toml_file import TomlFile
from poetry.vcs.git import Git
_canonicalize_regex = re.compile(r"[-_.]+")
......@@ -89,29 +89,20 @@ class Layout(object):
)
def _write_poetry(self, path):
output = {
'package': {
'name': self._project,
'version': self._version,
'authors': [self._author],
}
toml = TOMLFile([])
toml['package'] = {
'name': self._project,
'version': self._version,
'authors': [self._author],
}
content = toml.dumps(output, preserve=True)
output = {
'dependencies': {},
'dev-dependencies': {
'pytest': '^3.4'
}
toml['dependencies'] = {}
toml['dev-dependencies'] = {
'pytest': '^3.4',
}
content += '\n' + toml.dumps(output, preserve=True)
poetry = path / 'poetry.toml'
poetry = TomlFile(path / 'poetry.toml')
with poetry.open('w') as f:
f.write(content)
poetry.write(toml)
def _canonicalize_name(self, name: str) -> str:
return _canonicalize_regex.sub("-", name).lower()
......@@ -21,8 +21,6 @@ class DetachVertexNamed(Action):
if self._name not in graph.vertices:
return []
print('Detaching', graph.vertices[self._name])
self._vertex = graph.vertices[self._name]
del graph.vertices[self._name]
removed_vertices = [self._vertex]
......
......@@ -62,3 +62,6 @@ class VCSDependency(Dependency):
def is_vcs(self) -> bool:
return True
def accepts_prereleases(self):
return True
import os
import shutil
import toml
from functools import cmp_to_key
from pathlib import Path
from tempfile import mkdtemp
......@@ -22,6 +20,7 @@ from poetry.repositories import Pool
from poetry.semver import less_than
from poetry.semver.constraints import Constraint
from poetry.utils.toml_file import TomlFile
from poetry.utils.venv import Venv
from poetry.vcs.git import Git
......@@ -100,11 +99,11 @@ class Provider(SpecificationProvider):
if dependency.tag or dependency.rev:
revision = dependency.reference
if (tmp_dir / 'poetry.toml').exists():
poetry = TomlFile(tmp_dir / 'poetry.toml')
if poetry.exists():
# If a poetry.toml file exists
# We use it to get the information we need
with (tmp_dir / 'poetry.toml').open() as fd:
info = toml.loads(fd.read())
info = poetry.read()
name = info['package']['name']
version = info['package']['version']
......
"""
This toml module is a port with changes and fixes
of [contoml](https://github.com/jumpscale7/python-consistent-toml).
"""
from .toml_file import TOMLFile
from .prettify.lexer import tokenize as lexer
from .prettify.parser import parse_tokens
def loads(text):
    """
    Parses TOML source text and returns it as a dict-like TOMLFile object.
    """
    token_stream = tuple(lexer(text, is_top_level=True))
    parsed_elements = parse_tokens(token_stream)
    return TOMLFile(parsed_elements)
def load(file_path):
    """
    Parses the TOML file at file_path and returns it as a dict-like object.
    """
    with open(file_path) as fd:
        source = fd.read()
    return loads(source)
def dumps(value):
    """
    Serializes the given TOMLFile instance back to TOML source code.

    Raises:
        RuntimeError: if value is not a TOMLFile produced by load()/loads().
    """
    if isinstance(value, TOMLFile):
        return value.dumps()
    raise RuntimeError(
        'Can only dump a TOMLFile instance loaded by load() or loads()'
    )
def dump(obj, file_path, prettify=False):
    """
    Serializes a TOMLFile instance to the filesystem as TOML.

    NOTE(review): the `prettify` flag is accepted but never read in this
    implementation — confirm whether it should route through the prettifier.
    """
    serialized = dumps(obj)
    with open(file_path, 'w') as fp:
        fp.write(serialized)
from .prettify.errors import InvalidValueError
from .freshtable import FreshTable
from .prettify import util
class ArrayOfTables(list):
    """
    A list of tables ([[name]] entries) bound to an owning TOML file.

    Appending a dict creates a fresh table element on the owning file and
    copies the dict's flattened key/value pairs into it.
    """

    def __init__(self, toml_file, name, iterable=None):
        # toml_file: the owning TOML file object; name: this array's table name.
        if iterable:
            list.__init__(self, iterable)
        self._name = name
        self._toml_file = toml_file

    def append(self, value):
        """
        Appends a dict as a new table in this array of tables.

        Raises:
            InvalidValueError: if value is not a dict.
        """
        if isinstance(value, dict):
            table = FreshTable(parent=self, name=self._name, is_array=True)
            # Persist the (still empty) table first so the index computed
            # below refers to the newly added entry.
            table._append_to_parent()
            index = len(self._toml_file[self._name]) - 1
            # The loop variable reuses the name `value`; the original dict has
            # already been consumed by flatten_nested() at this point.
            for key_seq, value in util.flatten_nested(value).items():
                # self._toml_file._setitem_with_key_seq((self._name, index) + key_seq, value)
                self._toml_file._array_setitem_with_key_seq(self._name, index, key_seq, value)
            # for k, v in value.items():
            #     table[k] = v
        else:
            raise InvalidValueError('Can only append a dict to an array of tables')

    def __getitem__(self, item):
        # Indexing exactly one past the end yields a fresh, not-yet-persisted
        # table; any other out-of-range index raises IndexError as usual.
        try:
            return list.__getitem__(self, item)
        except IndexError:
            if item == len(self):
                return FreshTable(parent=self, name=self._name, is_array=True)
            else:
                raise

    def append_fresh_table(self, fresh_table):
        # Record the table locally and propagate it to the owning file, if any.
        list.append(self, fresh_table)
        if self._toml_file:
            self._toml_file.append_fresh_table(fresh_table)
import operator
from functools import reduce
from . import raw
class CascadeDict:
    """
    A dict-like object made up of one or more other dict-like objects where querying for an item cascade-gets
    it from all the internal dicts in order of their listing, and setting an item sets it on the first dict listed.
    """

    def __init__(self, *internal_dicts):
        assert internal_dicts, 'internal_dicts cannot be empty'
        self._internal_dicts = tuple(internal_dicts)

    def cascaded_with(self, one_more_dict):
        """
        Returns another instance with one more dict cascaded at the end.
        """
        # Bug fix: the internal dicts must be unpacked individually; passing
        # the tuple itself made the new instance's first "dict" a tuple.
        return CascadeDict(*self._internal_dicts, one_more_dict)

    def __getitem__(self, item):
        # Return the first hit, scanning internal dicts in listing order.
        for d in self._internal_dicts:
            try:
                return d[item]
            except KeyError:
                pass
        raise KeyError(item)

    def __setitem__(self, key, value):
        # Writes always target the first internal dict.
        self._internal_dicts[0][key] = value

    def get(self, item, default=None):
        try:
            return self[item]
        except KeyError:
            return default

    def keys(self):
        # Union of all internal dicts' keys.
        return set(reduce(operator.or_, (set(d.keys()) for d in self._internal_dicts)))

    def items(self):
        # Build from the reversed listing so earlier dicts win when the same
        # key appears in several internal dicts.
        all_items = reduce(operator.add, (list(d.items()) for d in reversed(self._internal_dicts)))
        unique_items = {k: v for k, v in all_items}.items()
        return tuple(unique_items)

    def __contains__(self, item):
        return any(item in d for d in self._internal_dicts)

    def __len__(self):
        return len(self.keys())

    @property
    def neutralized(self):
        # Primitive mirror of the cascaded content (values unwrapped by raw.to_raw).
        return {k: raw.to_raw(v) for k, v in self.items()}

    @property
    def primitive_value(self):
        return self.neutralized

    def __repr__(self):
        return repr(self.primitive_value)
from .prettify.elements.table import TableElement
class FreshTable(TableElement):
    """
    A TableElement that persists itself onto its parent the first time it is
    written to, and at most once.

    The parent must provide an append_fresh_table(TableElement) method.
    """

    def __init__(self, parent, name, is_array=False):
        TableElement.__init__(self, sub_elements=[])
        self._parent = parent
        self._name = name
        self._is_array = is_array
        # Once True, further writes no longer (re-)append this table.
        self.__appended = False

    @property
    def name(self):
        return self._name

    @property
    def is_array(self):
        return self._is_array

    def _append_to_parent(self):
        """
        Causes this ephemeral table to be persisted on the TOMLFile.
        """
        if not self.__appended:
            if self._parent is not None:
                self._parent.append_fresh_table(self)
            self.__appended = True

    def __setitem__(self, key, value):
        TableElement.__setitem__(self, key, value)
        self._append_to_parent()
import itertools
class PeekableIterator:
    # Returned by peek() when the iterator is exhausted. Truthiness is False.
    Nothing = tuple()

    def __init__(self, iter):
        self._iter = iter

    def __iter__(self):
        return self

    def __next__(self):
        return next(self._iter)

    def next(self):
        # Python-2-style alias for __next__().
        return self.__next__()

    def peek(self):
        """
        Returns the next value without consuming it, or
        PeekableIterator.Nothing when the iterator is exhausted.
        """
        try:
            upcoming = next(self._iter)
        except StopIteration:
            return PeekableIterator.Nothing
        # Push the value back by chaining it in front of the iterator.
        self._iter = itertools.chain((upcoming,), self._iter)
        return upcoming
from ._version import VERSION
__version__ = VERSION
def prettify(toml_text):
    """
    Prettifies and returns the TOML file content provided.
    """
    from .parser import parse_tokens
    from .lexer import tokenize
    from .prettifier import prettify as element_prettify

    token_stream = tokenize(toml_text, is_top_level=True)
    parsed_elements = parse_tokens(token_stream)
    pretty_elements = element_prettify(parsed_elements)
    serialized_parts = [element.serialized() for element in pretty_elements]
    return ''.join(serialized_parts)
def prettify_from_file(file_path):
    """
    Reads, prettifies and returns the TOML file specified by the file_path.
    """
    with open(file_path, 'r') as fp:
        source = fp.read()
    return prettify(source)
"""
TOML file elements (a higher abstraction layer than individual lexical tokens).
"""
from .traversal import TraversalMixin
from .errors import InvalidElementError
from .table import TableElement
from .tableheader import TableHeaderElement
from .common import TYPE_METADATA, TYPE_ATOMIC, TYPE_CONTAINER, TYPE_MARKUP
from . import traversal
from . import factory
from .common import ContainerElement
from . import traversal
class AbstractTable(ContainerElement, traversal.TraversalMixin, dict):
    """
    Common code for handling tables as key-value pairs
    with metadata elements sprinkled all over.
    Assumes input sub_elements are correct.
    """

    def __init__(self, sub_elements):
        ContainerElement.__init__(self, sub_elements)
        # Optional dict-like used to resolve keys missing from this table.
        self._fallback = None

    def _enumerate_items(self):
        """
        Yields ((key_index, key_element), (value_index, value_element))
        for all the element key-value pairs.
        """
        non_metadata = self._enumerate_non_metadata_sub_elements()
        while True:
            try:
                yield next(non_metadata), next(non_metadata)
            except StopIteration:
                # PEP 479: on Python 3.7+ a StopIteration escaping a generator
                # body becomes a RuntimeError, so end iteration explicitly.
                # A dangling key with no value element is silently dropped.
                return

    def items(self):
        for (key_i, key), (value_i, value) in self._enumerate_items():
            yield key.value, value.value
        if self._fallback:
            for key, value in self._fallback.items():
                yield key, value

    def keys(self):
        return tuple(key for (key, _) in self.items())

    def values(self):
        return tuple(value for (_, value) in self.items())

    def __len__(self):
        # Counts only this table's own pairs; the fallback is not included.
        return len(tuple(self._enumerate_items()))

    def __contains__(self, item):
        return item in self.keys()

    def _find_key_and_value(self, key):
        """
        Returns (key_i, value_i) corresponding to the given key value.
        Raises KeyError if no matching key found.
        """
        for (key_i, key_element), (value_i, value_element) in self._enumerate_items():
            if key_element.value == key:
                return key_i, value_i
        raise KeyError

    def __getitem__(self, item):
        for key, value in self.items():
            if key == item:
                return value
        raise KeyError

    def get(self, key, default=None):
        try:
            return self[key]
        except KeyError:
            return default

    def set_fallback(self, fallback):
        """
        Sets a fallback dict-like instance to be used to look up values after they are not found
        in this instance.
        """
        self._fallback = fallback

    @property
    def primitive_value(self):
        """
        Returns a primitive Python value without any formatting or markup metadata.
        """
        return {
            key:
            value.primitive_value if hasattr(value, 'primitive_value') else value for key, value in self.items()
        }
from . import factory, traversal
from .common import Element, ContainerElement
from .factory import create_element
from .metadata import NewlineElement
from .errors import InvalidElementError
class ArrayElement(ContainerElement, traversal.TraversalMixin, list):
    """
    A sequence-like container element containing other atomic elements or other containers.
    Implements list-like interface.
    Assumes input sub_elements are correct for an array element.
    Raises an InvalidElementError if contains heterogeneous values.
    """

    def __init__(self, sub_elements):
        super(ArrayElement, self).__init__(sub_elements)
        self._check_homogeneity()

    def _check_homogeneity(self):
        # TOML arrays must be homogeneous: more than one distinct primitive
        # type among the values is rejected.
        if len(set(type(v) for v in self.primitive_value)) > 1:
            raise InvalidElementError('Array should be homogeneous')

    def __len__(self):
        # Only non-metadata sub-elements (the actual values) count.
        return len(tuple(self._enumerate_non_metadata_sub_elements()))

    def __getitem__(self, i):
        """
        Returns the ith entry, which can be a primitive value, a seq-like, or a dict-like object.
        """
        return self._find_value(i)[1].value

    def __setitem__(self, i, value):
        value_i, _ = self._find_value(i)
        # Wrap raw Python values; Element instances are inserted as-is.
        new_element = value if isinstance(value, Element) else factory.create_element(value)
        self._sub_elements = self.sub_elements[:value_i] + [new_element] + self.sub_elements[value_i+1:]

    @property
    def value(self):
        return self     # self is a sequence-like value

    @property
    def primitive_value(self):
        """
        Returns a primitive Python value without any formatting or markup metadata.
        """
        return list(
            self[i].primitive_value if hasattr(self[i], 'primitive_value')
            else self[i]
            for i in range(len(self)))

    def __str__(self):
        return "{}".format(self.primitive_value)

    def __repr__(self):
        return "Array{}".format(str(self))

    def append(self, v):
        new_entry = [create_element(v)]
        if self:    # If not empty, we need a comma and whitespace prefix!
            new_entry = [
                factory.create_operator_element(','),
                factory.create_whitespace_element(),
            ] + new_entry
        # Insert just before the closing ']' of the array.
        insertion_index = self._find_closing_square_bracket()
        self._sub_elements = self._sub_elements[:insertion_index] + new_entry + \
            self._sub_elements[insertion_index:]

    def _find_value(self, i):
        """
        Returns (value_index, value) of ith value in this sequence.
        Raises IndexError if not found.
        """
        return tuple(self._enumerate_non_metadata_sub_elements())[i]

    def __delitem__(self, i):
        value_i, value = self._find_value(i)
        begin, end = value_i, value_i+1
        # Rules:
        # 1. begin should be index to the preceding comma to the value
        # 2. end should be index to the following comma, or the closing bracket
        # 3. If no preceding comma found but following comma found then end should be the index of the following value
        preceding_comma = self._find_preceding_comma(value_i)
        found_preceding_comma = preceding_comma >= 0
        if found_preceding_comma:
            begin = preceding_comma
        following_comma = self._find_following_comma(value_i)
        if following_comma >= 0:
            if not found_preceding_comma:
                end = self._find_following_non_metadata(following_comma)
            else:
                end = following_comma
        else:
            # NOTE(review): searches for the closing bracket from index 0
            # rather than from value_i — confirm this is intentional (compare
            # InlineTableElement.__delitem__, which uses a bracket search
            # without an offset).
            end = self._find_following_closing_square_bracket(0)
        self._sub_elements = self.sub_elements[:begin] + self._sub_elements[end:]

    @property
    def is_multiline(self):
        # True when any newline element appears among the sub-elements.
        return any(isinstance(e, (NewlineElement)) for e in self.elements)

    def turn_into_multiline(self):
        """
        Turns this array into a multi-line array with each element lying on its own line.
        """
        if self.is_multiline:
            return
        i = self._find_following_comma(-1)
        # Helper lookups are deliberately re-evaluated each pass since `i`
        # advances through the loop below.
        def next_entry_i():
            return self._find_following_non_metadata(i)
        def next_newline_i():
            return self._find_following_newline(i)
        def next_closing_bracket_i():
            return self._find_following_closing_square_bracket(i)
        def next_comma_i():
            return self._find_following_comma(i)
        while i < len(self.elements)-1:
            # Insert a newline after position i unless one already precedes
            # the next entry.
            if next_newline_i() < next_entry_i():
                self.elements.insert(i+1, factory.create_newline_element())
            if float('-inf') < next_comma_i() < next_closing_bracket_i():
                i = next_comma_i()
            else:
                i = next_closing_bracket_i()
from ..tokens import py2toml, toml2py
from ..util import is_dict_like, is_sequence_like
from . import common
from .errors import InvalidElementError
class AtomicElement(common.TokenElement):
    """
    An element containing a sequence of tokens representing a single atomic value that can be updated in place.
    Raises:
        InvalidElementError: when passed an invalid sequence of tokens.
    """

    def __init__(self, _tokens):
        common.TokenElement.__init__(self, _tokens, common.TYPE_ATOMIC)

    def _validate_tokens(self, _tokens):
        # Exactly one non-metadata token must be present.
        non_metadata_count = sum(1 for token in _tokens if not token.type.is_metadata)
        if non_metadata_count != 1:
            raise InvalidElementError('Tokens making up an AtomicElement must contain only one non-metadata token')

    def serialized(self):
        return ''.join(token.source_substring for token in self.tokens)

    def _value_token_index(self):
        """
        Finds the token where the value is stored.
        """
        # TODO: memoize this value
        for index, token in enumerate(self.tokens):
            if not token.type.is_metadata:
                return index
        raise RuntimeError('could not find a value token')

    @property
    def value(self):
        """
        Returns a Python value contained in this atomic element.
        """
        return toml2py.deserialize(self._tokens[self._value_token_index()])

    @property
    def primitive_value(self):
        return self.value

    def set(self, value):
        """
        Sets the contained value to the given one.
        """
        assert (not is_sequence_like(value)) and (not is_dict_like(value)), 'the value must be an atomic primitive'
        self._tokens[self._value_token_index()] = py2toml.create_primitive_token(value)
from abc import abstractmethod
TYPE_METADATA = 'element-metadata'
TYPE_ATOMIC = 'element-atomic'
TYPE_CONTAINER = 'element-container'
TYPE_MARKUP = 'element-markup'
class Element:
    """
    Base class for all TOML file elements.

    An Element:
    - wraps one or more Token instances, or one or more other Element
      instances — never both.
    - knows how to serialize its value back to valid TOML code.

    A non-metadata Element additionally knows how to deserialize its content
    into a usable Python primitive, seq-like, or dict-like value, and how to
    update its content from such a value while keeping its formatting.
    """

    def __init__(self, _type):
        self._type = _type

    @property
    def type(self):
        return self._type

    @abstractmethod
    def serialized(self):
        """
        TOML serialization of this element as str.
        """
        raise NotImplementedError
class TokenElement(Element):
    """
    An Element whose content is a sequence of lexical tokens.
    """

    def __init__(self, _tokens, _type):
        Element.__init__(self, _type)
        # Subclasses validate before the tokens are stored.
        self._validate_tokens(_tokens)
        self._tokens = list(_tokens)

    @property
    def tokens(self):
        return self._tokens

    @property
    def first_token(self):
        return self._tokens[0]

    @abstractmethod
    def _validate_tokens(self, _tokens):
        raise NotImplementedError

    def serialized(self):
        return ''.join(token.source_substring for token in self._tokens)

    def __repr__(self):
        return repr(self.tokens)

    @property
    def primitive_value(self):
        """
        Returns a primitive Python value without any formatting or markup metadata.
        """
        raise NotImplementedError
class ContainerElement(Element):
    """
    An Element containing exclusively other elements.
    """

    def __init__(self, sub_elements):
        Element.__init__(self, TYPE_CONTAINER)
        self._sub_elements = list(sub_elements)

    @property
    def sub_elements(self):
        return self._sub_elements

    @property
    def elements(self):
        # Alias for sub_elements.
        return self.sub_elements

    def serialized(self):
        # Concatenation of the serializations of all children.
        return ''.join(element.serialized() for element in self.sub_elements)

    def __eq__(self, other):
        # Compares by primitive (unwrapped) value.
        return self.primitive_value == other

    def __repr__(self):
        return repr(self.primitive_value)

    @property
    def primitive_value(self):
        """
        Returns a primitive Python value without any formatting or markup metadata.
        """
        raise NotImplementedError
class InvalidElementError(Exception):
    """
    Raised by Element factories when a sequence of tokens or sub-elements is
    not valid for the specific type of Element being created.
    """

    def __init__(self, message):
        self.message = message

    def __repr__(self):
        return "InvalidElementError: {}".format(self.message)
import datetime
import six
from .. import tokens
from ..tokens import py2toml
from ..util import join_with
from .atomic import AtomicElement
from .metadata import PunctuationElement, WhitespaceElement, NewlineElement
from .tableheader import TableHeaderElement
def create_element(value, multiline_strings_allowed=True):
    """
    Creates and returns the appropriate elements.Element instance from the given Python primitive, sequence-like,
    or dict-like value.

    Raises:
        RuntimeError: if value's type is not one of the supported kinds.
    """
    # Local import to avoid a circular dependency with the array module.
    from .array import ArrayElement
    # Primitives (and None) become single-token atomic elements.
    if isinstance(value, (int, float, bool, datetime.datetime, datetime.date) + six.string_types) or value is None:
        primitive_token = py2toml.create_primitive_token(value, multiline_strings_allowed=multiline_strings_allowed)
        return AtomicElement((primitive_token,))
    elif isinstance(value, (list, tuple)):
        # Sequences become "[v1, v2, ...]" with comma+space separators.
        preamble = [create_operator_element('[')]
        postable = [create_operator_element(']')]
        stuffing_elements = [create_element(v) for v in value]
        spaced_stuffing = join_with(stuffing_elements,
                                    separator=[create_operator_element(','), create_whitespace_element()])
        return ArrayElement(preamble + spaced_stuffing + postable)
    elif isinstance(value, dict):
        # Dicts become single-line inline tables.
        return create_inline_table(value, multiline_table=False, multiline_strings_allowed=multiline_strings_allowed)
    else:
        raise RuntimeError('Value type unaccounted for: {} of type {}'.format(value, type(value)))
def create_inline_table(from_dict, multiline_table=False, multiline_strings_allowed=True):
    """
    Creates an InlineTable element from the given dict instance.
    """
    # Local import to avoid a circular dependency with the inlinetable module.
    from .inlinetable import InlineTableElement
    preamble = [create_operator_element('{')]
    postable = [create_operator_element('}')]
    # One (key, ws, '=', ws, value) group per pair; values inside an inline
    # table never use multi-line strings.
    stuffing_elements = (
        (
            create_string_element(k, bare_allowed=True),
            create_whitespace_element(),
            create_operator_element('='),
            create_whitespace_element(),
            create_element(v, multiline_strings_allowed=False)
        ) for (k, v) in from_dict.items())
    # Pairs are separated by a comma plus either a newline or a space.
    pair_separator = [create_operator_element(','),
                      create_newline_element() if multiline_table else create_whitespace_element()]
    spaced_elements = join_with(stuffing_elements, separator=pair_separator)
    return InlineTableElement(preamble + spaced_elements + postable)
def create_string_element(value, bare_allowed=False):
    """
    Creates and returns an AtomicElement wrapping a string value.
    """
    string_token = py2toml.create_string_token(value, bare_allowed)
    return AtomicElement((string_token,))
def create_operator_element(operator):
    """
    Creates a PunctuationElement instance containing an operator token of the specified type. The operator
    should be a TOML source str.
    """
    operator_type_map = {
        ',': tokens.TYPE_OP_COMMA,
        '=': tokens.TYPE_OP_ASSIGNMENT,
        '[': tokens.TYPE_OP_SQUARE_LEFT_BRACKET,
        ']': tokens.TYPE_OP_SQUARE_RIGHT_BRACKET,
        '[[': tokens.TYPE_OP_DOUBLE_SQUARE_LEFT_BRACKET,
        ']]': tokens.TYPE_OP_DOUBLE_SQUARE_RIGHT_BRACKET,
        '{': tokens.TYPE_OP_CURLY_LEFT_BRACKET,
        '}': tokens.TYPE_OP_CURLY_RIGHT_BRACKET,
    }
    operator_token = tokens.Token(operator_type_map[operator], operator)
    return PunctuationElement((operator_token,))
def create_newline_element():
    """
    Creates and returns a single NewlineElement.
    """
    newline_token = tokens.Token(tokens.TYPE_NEWLINE, '\n')
    return NewlineElement((newline_token,))
def create_whitespace_element(length=1, char=' '):
    """
    Creates and returns a WhitespaceElement made of `length` repetitions of `char`.
    """
    whitespace_tokens = (tokens.Token(tokens.TYPE_WHITESPACE, char),) * length
    return WhitespaceElement(whitespace_tokens)
def create_table_header_element(names):
    """
    Creates a TableHeaderElement ([a.b.c]) from a single name or a sequence of names.
    """
    name_elements = []
    if isinstance(names, six.string_types):
        # Single name: one bare-string token, no dots.
        name_elements = [py2toml.create_string_token(names, bare_string_allowed=True)]
    else:
        for (i, name) in enumerate(names):
            name_elements.append(py2toml.create_string_token(name, bare_string_allowed=True))
            if i < (len(names)-1):
                # NOTE(review): `TYPE_OPT_DOT` looks like a typo for
                # `TYPE_OP_DOT` — confirm against the tokens module.
                name_elements.append(py2toml.operator_token(tokens.TYPE_OPT_DOT))
    return TableHeaderElement(
        [py2toml.operator_token(tokens.TYPE_OP_SQUARE_LEFT_BRACKET)] + name_elements +
        [py2toml.operator_token(tokens.TYPE_OP_SQUARE_RIGHT_BRACKET), py2toml.operator_token(tokens.TYPE_NEWLINE)],
    )
def create_array_of_tables_header_element(name):
    """
    Creates a TableHeaderElement for an array-of-tables header: [[name]].
    """
    header_tokens = (
        py2toml.operator_token(tokens.TYPE_OP_DOUBLE_SQUARE_LEFT_BRACKET),
        py2toml.create_string_token(name, bare_string_allowed=True),
        py2toml.operator_token(tokens.TYPE_OP_DOUBLE_SQUARE_RIGHT_BRACKET),
        py2toml.operator_token(tokens.TYPE_NEWLINE),
    )
    return TableHeaderElement(header_tokens)
def create_table(dict_value):
    """
    Creates a TableElement out of a dict instance.

    Raises:
        ValueError: if dict_value is not a dict.
    """
    from .table import TableElement
    if not isinstance(dict_value, dict):
        raise ValueError('input must be a dict instance.')
    # Start the table with a leading newline, then assign each pair in turn.
    table_element = TableElement([create_newline_element()])
    for key, value in dict_value.items():
        table_element[key] = create_element(value)
    return table_element
def create_multiline_string(text, maximum_line_length):
    """
    Creates an AtomicElement wrapping a multi-line string token.
    """
    string_token = py2toml.create_multiline_string(text, maximum_line_length)
    return AtomicElement(_tokens=[string_token])
from . import factory, abstracttable
from .common import Element
class InlineTableElement(abstracttable.AbstractTable):
    """
    An Element containing key-value pairs, representing an inline table.
    Implements dict-like interface.
    Assumes input sub_elements are correct for an inline table element.
    """

    def __init__(self, sub_elements):
        abstracttable.AbstractTable.__init__(self, sub_elements)

    def __setitem__(self, key, value):
        # Wrap raw Python values; Element instances are inserted as-is.
        new_element = value if isinstance(value, Element) else factory.create_element(value)
        try:
            key_i, value_i = self._find_key_and_value(key)
            # Found, then replace the value element with a new one
            self._sub_elements = self.sub_elements[:value_i] + [new_element] + self.sub_elements[value_i+1:]
        except KeyError:    # Key does not exist, adding anew!
            # A full "key = value" run of elements.
            new_entry = [
                factory.create_string_element(key, bare_allowed=True),
                factory.create_whitespace_element(),
                factory.create_operator_element('='),
                factory.create_whitespace_element(),
                new_element,
            ]
            if self:    # If not empty
                # Existing pairs need a separating ", " before the new entry.
                new_entry = [
                    factory.create_operator_element(','),
                    factory.create_whitespace_element(),
                ] + new_entry
            # Insert just before the closing '}'.
            insertion_index = self._find_closing_curly_bracket()
            self._sub_elements = self.sub_elements[:insertion_index] + new_entry + self.sub_elements[insertion_index:]

    def __delitem__(self, key):
        key_i, value_i = self._find_key_and_value(key)
        begin, end = key_i, value_i+1
        # Rules:
        # 1. begin should be index to the preceding comma to the key
        # 2. end should be index to the following comma, or the closing bracket
        # 3. If no preceding comma found but following comma found then end should be the index of the following key
        preceding_comma = self._find_preceding_comma(begin)
        found_preceding_comma = preceding_comma >= 0
        if found_preceding_comma:
            begin = preceding_comma
        following_comma = self._find_following_comma(value_i)
        if following_comma >= 0:
            if not found_preceding_comma:
                end = self._find_following_non_metadata(following_comma)
            else:
                end = following_comma
        else:
            end = self._find_closing_curly_bracket()
        # Splice out everything between begin (inclusive) and end (exclusive).
        self._sub_elements = self.sub_elements[:begin] + self.sub_elements[end:]

    def multiline_equivalent(self):
        # Rebuilds this table as a multi-line inline table from its primitive value.
        return factory.create_inline_table(self.primitive_value, multiline_table=True, multiline_strings_allowed=True)

    @property
    def value(self):
        return self     # self is a dict-like value that is perfectly usable
from .. import tokens
from . import common
from .errors import InvalidElementError
class WhitespaceElement(common.TokenElement):
    """
    An element made up solely of whitespace tokens.
    """

    def __init__(self, _tokens):
        common.TokenElement.__init__(self, _tokens, common.TYPE_METADATA)

    def _validate_tokens(self, _tokens):
        if any(token.type != tokens.TYPE_WHITESPACE for token in _tokens):
            raise InvalidElementError('Tokens making up a WhitespaceElement must all be whitespace')

    @property
    def length(self):
        """
        The whitespace length of this element (one unit per token).
        """
        return len(self.tokens)
class NewlineElement(common.TokenElement):
    """
    An element containing newline tokens

    Raises:
        InvalidElementError: when passed an invalid sequence of tokens.
    """

    def __init__(self, _tokens):
        common.TokenElement.__init__(self, _tokens, common.TYPE_METADATA)

    def _validate_tokens(self, _tokens):
        # Every token must be a newline.
        if any(t.type != tokens.TYPE_NEWLINE for t in _tokens):
            raise InvalidElementError('Tokens making a NewlineElement must all be newlines')
class CommentElement(common.TokenElement):
    """
    An element containing a single comment token followed by a newline.

    Raises:
        InvalidElementError: when passed an invalid sequence of tokens.
    """

    def __init__(self, _tokens):
        common.TokenElement.__init__(self, _tokens, common.TYPE_METADATA)

    def _validate_tokens(self, _tokens):
        well_formed = (
            len(_tokens) == 2
            and _tokens[0].type == tokens.TYPE_COMMENT
            and _tokens[1].type == tokens.TYPE_NEWLINE
        )
        if not well_formed:
            raise InvalidElementError('CommentElement needs one comment token followed by one newline token')
class PunctuationElement(common.TokenElement):
    """
    An element containing a single punctuation (operator) token.

    Raises:
        InvalidElementError: when passed an invalid sequence of tokens.
    """

    def __init__(self, _tokens):
        common.TokenElement.__init__(self, _tokens, common.TYPE_METADATA)

    @property
    def token(self):
        """
        The single operator token wrapped by this element.
        """
        return self.tokens[0]

    def _validate_tokens(self, _tokens):
        if not (_tokens and tokens.is_operator(_tokens[0])):
            raise InvalidElementError('PunctuationElement must be made of only a single operator token')
from . import abstracttable, common, factory
from .errors import InvalidElementError
from .common import Element
from .metadata import CommentElement, NewlineElement, WhitespaceElement
class TableElement(abstracttable.AbstractTable):
    """
    An Element containing an unnamed top-level table.
    Implements dict-like interface.
    Assumes input sub_elements are correct.
    Raises InvalidElementError on duplicate keys.
    """

    def __init__(self, sub_elements):
        abstracttable.AbstractTable.__init__(self, sub_elements)
        self._check_for_duplicate_keys()

    def _check_for_duplicate_keys(self):
        # A duplicate key collapses in the set, making it smaller than the key list.
        if len(set(self.keys())) < len(self.keys()):
            raise InvalidElementError('Duplicate keys found')

    def __setitem__(self, key, value):
        # Update in place when the key exists, otherwise append a new entry.
        if key in self:
            self._update(key, value)
        else:
            self._insert(key, value)

    def _update(self, key, value):
        # Replace only the value element; key and surrounding markup are kept.
        _, value_i = self._find_key_and_value(key)
        self._sub_elements[value_i] = value if isinstance(value, Element) else factory.create_element(value)

    def _find_insertion_index(self):
        """
        Returns the self.sub_elements index in which new entries should be inserted.
        """
        non_metadata_elements = tuple(self._enumerate_non_metadata_sub_elements())
        if not non_metadata_elements:
            return 0
        # Insert right after the line terminator following the last real entry.
        last_entry_i = non_metadata_elements[-1][0]
        following_newline_i = self._find_following_line_terminator(last_entry_i)
        return following_newline_i + 1

    def _detect_indentation_size(self):
        """
        Detects the level of indentation used in this table.
        """
        def lines():
            # Returns a sequence of sequences of elements belonging to each line
            start = 0
            for i, element in enumerate(self.elements):
                if isinstance(element, (CommentElement, NewlineElement)):
                    yield self.elements[start:i+1]
                    start = i+1

        def indentation(line):
            # Counts the number of whitespace tokens at the beginning of this line
            try:
                first_non_whitespace_i = next(i for (i, e) in enumerate(line) if not isinstance(e, WhitespaceElement))
                return sum(space.length for space in line[:first_non_whitespace_i])
            except StopIteration:
                return 0

        def is_empty_line(line):
            return all(e.type == common.TYPE_METADATA for e in line)

        try:
            return min(indentation(line) for line in lines() if len(line) > 1 and not is_empty_line(line))
        except ValueError:  # min() raises ValueError when no matching lines are found
            return 0

    def _insert(self, key, value):
        # Builds the element sequence "key = value\n" (with optional leading
        # indentation) and splices it at the computed insertion index.
        value_element = value if isinstance(value, Element) else factory.create_element(value)
        indentation_size = self._detect_indentation_size()
        indentation = [factory.create_whitespace_element(self._detect_indentation_size())] if indentation_size else []
        inserted_elements = indentation + [
            factory.create_string_element(key, bare_allowed=True),
            factory.create_whitespace_element(),
            factory.create_operator_element('='),
            factory.create_whitespace_element(),
            value_element,
            factory.create_newline_element(),
        ]
        insertion_index = self._find_insertion_index()
        self._sub_elements = \
            self.sub_elements[:insertion_index] + inserted_elements + self.sub_elements[insertion_index:]

    def __delitem__(self, key):
        # Deletes the whole line holding the key: from the preceding newline
        # (when present) up to and including the following newline.
        begin, _ = self._find_key_and_value(key)
        preceding_newline = self._find_preceding_newline(begin)
        if preceding_newline >= 0:
            begin = preceding_newline
        end = self._find_following_newline(begin)
        if end < 0:
            end = len(tuple(self._sub_elements))
        self._sub_elements = self.sub_elements[:begin] + self.sub_elements[end:]

    def value(self):
        # NOTE(review): plain method here, while the inline-table element
        # exposes `value` as a property — confirm callers expect to call this.
        return self

    def __iter__(self):
        return iter(self.keys())

    def __str__(self):
        return str(self.primitive_value)
from .. import tokens
from ..tokens import toml2py
from . import common
from .common import TokenElement
from .errors import InvalidElementError
# Token types that may open a table header: '[' or '[['.
_opening_bracket_types = (tokens.TYPE_OP_SQUARE_LEFT_BRACKET, tokens.TYPE_OP_DOUBLE_SQUARE_LEFT_BRACKET)
# Token types that may close a table header: ']' or ']]'.
_closing_bracket_types = (tokens.TYPE_OP_SQUARE_RIGHT_BRACKET, tokens.TYPE_OP_DOUBLE_SQUARE_RIGHT_BRACKET)
# Token types allowed as a name segment inside a table header.
_name_types = (
    tokens.TYPE_BARE_STRING,
    tokens.TYPE_LITERAL_STRING,
    tokens.TYPE_STRING,
)
class TableHeaderElement(TokenElement):
    """
    An element containing opening and closing single and double square brackets, strings and dots and ending with
    a newline.
    Raises InvalidElementError.
    """

    def __init__(self, _tokens):
        TokenElement.__init__(self, _tokens, common.TYPE_MARKUP)
        # Cache the dotted-name segments deserialized into Python strings.
        self._names = tuple(toml2py.deserialize(token) for token in self._tokens if token.type in _name_types)

    @property
    def is_array_of_tables(self):
        # An array-of-tables header opens with '[[' rather than '['.
        opening_bracket = next(token for i, token in enumerate(self._tokens) if token.type in _opening_bracket_types)
        return opening_bracket.type == tokens.TYPE_OP_DOUBLE_SQUARE_LEFT_BRACKET

    @property
    def names(self):
        """
        Returns a sequence of string names making up this table header name.
        """
        return self._names

    def has_name_prefix(self, names):
        """
        Returns True if the header names is prefixed by the given sequence of names.

        NOTE(review): raises IndexError when `names` is longer than self.names —
        confirm callers only ever pass prefixes of equal or shorter length.
        """
        for i, name in enumerate(names):
            if self.names[i] != name:
                return False
        return True

    def serialized(self):
        # Reconstructs the exact source text of this header.
        return ''.join(token.source_substring for token in self._tokens)

    def is_named(self, names):
        """
        Returns True if the given name sequence matches the full name of this header.
        """
        return tuple(names) == self.names

    def _validate_tokens(self, _tokens):
        # Each next(...) scan uses -Infinity as a "not found" sentinel so the
        # comparison below can reject a malformed header.
        opening_bracket_i = next((i for i, token in enumerate(_tokens)
                                  if token.type in _opening_bracket_types), float('-inf'))
        if opening_bracket_i < 0:
            raise InvalidElementError('Expected an opening bracket')
        _tokens = _tokens[opening_bracket_i+1:]
        first_name_i = next((i for i, token in enumerate(_tokens) if token.type in _name_types), float('-inf'))
        if first_name_i < 0:
            raise InvalidElementError('Expected a table header name')
        _tokens = _tokens[first_name_i+1:]
        # Consume any remaining ".name" pairs of the dotted header name.
        while True:
            next_dot_i = next((i for i, token in enumerate(_tokens) if token.type == tokens.TYPE_OPT_DOT),
                              float('-inf'))
            if next_dot_i < 0:
                break
            _tokens = _tokens[next_dot_i+1:]
            next_name_i = next((i for i, token in enumerate(_tokens) if token.type in _name_types), float('-inf'))
            if next_name_i < 0:
                raise InvalidElementError('Expected a name after the dot')
            _tokens = _tokens[next_name_i+1:]
        closing_bracket_i = next((i for i, token in enumerate(_tokens) if token.type in _closing_bracket_types),
                                 float('-inf'))
        if closing_bracket_i < 0:
            raise InvalidElementError('Expected a closing bracket')
        if _tokens[-1].type != tokens.TYPE_NEWLINE:
            raise InvalidElementError('Must end with a newline')
from ...tokens import TYPE_OP_COMMA
from ...tokens import TYPE_OP_CURLY_RIGHT_BRACKET
from ..common import TYPE_METADATA
from ..metadata import PunctuationElement, NewlineElement
from . import predicates
class TraversalMixin:
    """
    A mix-in that provides convenient sub-element traversal to any class with
    an `elements` member that is a sequence of Element instances
    """

    def __find_following_element(self, index, predicate):
        """
        Finds and returns the index of element in self.elements that evaluates the given predicate to True
        and whose index is higher than the given index, or returns -Infinity on failure.
        """
        return find_following(self.elements, predicate, index)

    def __find_preceding_element(self, index, predicate):
        """
        Finds and returns the index of the element in self.elements that evaluates the given predicate to True
        and whose index is lower than the given index, or -Infinity on failure.
        """
        i = find_previous(self.elements, predicate, index)
        if i == float('inf'):
            return float('-inf')
        return i

    def __must_find_following_element(self, predicate):
        """
        Finds and returns the index to the element in self.elements that evaluates the predicate to True, or raises
        an error.
        """
        i = self.__find_following_element(-1, predicate)
        if i < 0:
            raise RuntimeError('Could not find non-optional element')
        return i

    def _enumerate_non_metadata_sub_elements(self):
        """
        Returns a sequence of (index, sub_element) of the non-metadata sub-elements.
        """
        return ((i, element) for i, element in enumerate(self.elements) if element.type != TYPE_METADATA)

    def _find_preceding_comma(self, index):
        """
        Returns the index of the preceding comma element to the given index, or -Infinity.
        """
        return self.__find_preceding_element(index, predicates.op_comma)

    def _find_following_comma(self, index):
        """
        Returns the index of the following comma element after the given index, or -Infinity.
        """
        def predicate(element):
            return isinstance(element, PunctuationElement) and element.token.type == TYPE_OP_COMMA
        return self.__find_following_element(index, predicate)

    def _find_following_newline(self, index):
        """
        Returns the index of the following newline element after the given index, or -Infinity.
        """
        return self.__find_following_element(index, lambda e: isinstance(e, NewlineElement))

    def _find_following_comment(self, index):
        """
        Returns the index of the following comment element after the given index, or -Infinity.
        """
        return self.__find_following_element(index, predicates.comment)

    def _find_following_line_terminator(self, index):
        """
        Returns the index of the following comment or newline element after the given index, or -Infinity.
        """
        following_comment = self._find_following_comment(index)
        following_newline = self._find_following_newline(index)
        # Fix: the previous code compared following_newline against +Infinity,
        # a value the finders never return (failure is -Infinity). A trailing
        # comment with no separate newline element was therefore reported as
        # "no terminator found". Treat each -Infinity as a miss instead.
        if following_comment == float('-inf'):
            return following_newline
        if following_newline == float('-inf'):
            return following_comment
        # Both found: the terminator is whichever comes first.
        return min(following_newline, following_comment)

    def _find_preceding_newline(self, index):
        """
        Returns the index of the preceding newline element to the given index, or -Infinity.
        """
        return self.__find_preceding_element(index, predicates.newline)

    def _find_following_non_metadata(self, index):
        """
        Returns the index to the following non-metadata element after the given index, or -Infinity.
        """
        return self.__find_following_element(index, predicates.non_metadata)

    def _find_closing_square_bracket(self):
        """
        Returns the index to the closing square bracket, or raises an Error.
        """
        return self.__must_find_following_element(predicates.closing_square_bracket)

    def _find_following_opening_square_bracket(self, index):
        """
        Returns the index to the opening square bracket, or -Infinity.
        """
        return self.__find_following_element(index, predicates.opening_square_bracket)

    def _find_following_closing_square_bracket(self, index):
        """
        Returns the index to the closing square bracket, or -Infinity.
        """
        return self.__find_following_element(index, predicates.closing_square_bracket)

    def _find_following_table(self, index):
        """
        Returns the index to the next TableElement after the specified index, or -Infinity.
        """
        return self.__find_following_element(index, predicates.table)

    def _find_preceding_table(self, index):
        """
        Returns the index to the preceding TableElement to the specified index, or -Infinity.
        """
        return self.__find_preceding_element(index, predicates.table)

    def _find_closing_curly_bracket(self):
        """
        Returns the index to the closing curly bracket, or raises an Error.
        """
        def predicate(element):
            return isinstance(element, PunctuationElement) and element.token.type == TYPE_OP_CURLY_RIGHT_BRACKET
        return self.__must_find_following_element(predicate)

    def _find_following_table_header(self, index):
        """
        Returns the index to the table header after the given element index, or -Infinity.
        """
        return self.__find_following_element(index, predicates.table_header)
def find_following(element_seq, predicate, index=None):
    """
    Finds and returns the index of the next element fulfilling the specified predicate after the specified
    index, or -Infinity when no such element exists.
    A negative (or missing) index means the search covers the whole sequence.
    """
    if isinstance(index, (int, float)) and index < 0:
        index = None
    indexed = tuple(enumerate(element_seq))
    start = None if index is None else index + 1
    for position, element in indexed[start:]:
        if predicate(element):
            return position
    return float('-inf')
def find_previous(element_seq, predicate, index=None):
    """
    Finds and returns the index of the previous element fulfilling the specified predicate preceding the
    specified index, or +Infinity when no such element exists.
    An out-of-range (or missing) index means the search covers the whole sequence.
    """
    if isinstance(index, (int, float)) and index >= len(element_seq):
        index = None
    indexed = tuple(enumerate(element_seq))
    for position, element in reversed(indexed[:index]):
        if predicate(element):
            return position
    return float('inf')
"""
The following predicates can be used in the traversal functions directly.
"""
from ...tokens import TYPE_OP_ASSIGNMENT
from ...tokens import TYPE_OP_COMMA
from ...tokens import TYPE_OP_SQUARE_LEFT_BRACKET
from ...tokens import TYPE_OP_SQUARE_RIGHT_BRACKET
from ..atomic import AtomicElement
from ..metadata import PunctuationElement, CommentElement, NewlineElement, WhitespaceElement
from .. import common
# PEP 8 (E731): named functions instead of lambda assignments; the public
# callables keep the same names and one-argument signatures.

def atomic(e):
    """True when e is an AtomicElement."""
    return isinstance(e, AtomicElement)


def op_assignment(e):
    """True when e is the '=' punctuation element."""
    return isinstance(e, PunctuationElement) and e.token.type == TYPE_OP_ASSIGNMENT


def op_comma(e):
    """True when e is the ',' punctuation element."""
    return isinstance(e, PunctuationElement) and e.token.type == TYPE_OP_COMMA


def comment(e):
    """True when e is a CommentElement."""
    return isinstance(e, CommentElement)


def newline(e):
    """True when e is a NewlineElement."""
    return isinstance(e, NewlineElement)


def non_metadata(e):
    """True when e is not a metadata element."""
    return e.type != common.TYPE_METADATA


def closing_square_bracket(e):
    """True when e is the ']' punctuation element."""
    return isinstance(e, PunctuationElement) and e.token.type == TYPE_OP_SQUARE_RIGHT_BRACKET


def opening_square_bracket(e):
    """True when e is the '[' punctuation element."""
    return isinstance(e, PunctuationElement) and e.token.type == TYPE_OP_SQUARE_LEFT_BRACKET
def table(e):
    """True when e is a TableElement."""
    # Local import: presumably avoids a circular import with ..table — confirm.
    from ..table import TableElement
    return isinstance(e, TableElement)
def table_header(e):
    """True when e is a TableHeaderElement."""
    # Local import: presumably avoids a circular import with ..tableheader — confirm.
    from ..tableheader import TableHeaderElement
    return isinstance(e, TableHeaderElement)
whitespace = lambda e: isinstance(e, WhitespaceElement)
class TOMLError(Exception):
    """
    All errors raised by this module are descendants of this type.
    """


class InvalidTOMLFileError(TOMLError):
    # Raised when the element sequence is not structurally-valid TOML
    # (see validate_sanitized).
    pass


class NoArrayFoundError(TOMLError):
    """
    An array of tables was requested but none exist by the given name.
    """


class InvalidValueError(TOMLError):
    # NOTE(review): presumably raised for values that cannot be represented —
    # confirm exact semantics at the call sites.
    pass


class DuplicateKeysError(TOMLError):
    """
    Duplicate keys detected in the parsed file.
    """


class DuplicateTablesError(TOMLError):
    """
    Duplicate tables detected in the parsed file.
    """
"""
A regular expression based Lexer/tokenizer for TOML.
"""
from collections import namedtuple
import re
from .. import tokens
from ..errors import TOMLError
# A lexical token specification: the token type plus the regex recognizing it.
TokenSpec = namedtuple('TokenSpec', ('type', 're'))

# Specs of all the valid tokens
# Every regex is anchored at the start of the remaining source and captures the
# token's text in group(1). When several specs match, the lexer picks the
# longest match, breaking ties by TokenType priority
# (see _choose_from_next_token_candidates).
_LEXICAL_SPECS = (
    TokenSpec(tokens.TYPE_COMMENT, re.compile(r'^(#.*)\n')),
    TokenSpec(tokens.TYPE_STRING, re.compile(r'^("(([^"]|\\")+?[^\\]|([^"]|\\")|)")')),  # Single line only
    TokenSpec(tokens.TYPE_MULTILINE_STRING, re.compile(r'^(""".*?""")', re.DOTALL)),
    TokenSpec(tokens.TYPE_LITERAL_STRING, re.compile(r"^('.*?')")),
    TokenSpec(tokens.TYPE_MULTILINE_LITERAL_STRING, re.compile(r"^('''.*?''')", re.DOTALL)),
    TokenSpec(tokens.TYPE_BARE_STRING, re.compile(r'^([A-Za-z0-9_-]+)')),
    TokenSpec(tokens.TYPE_DATE, re.compile(
        r'^([0-9]{4}-[0-9]{2}-[0-9]{2}(T[0-9]{2}:[0-9]{2}:[0-9]{2}(\.[0-9]*)?)?(([zZ])|((\+|-)[0-9]{2}:[0-9]{2}))?)')),
    TokenSpec(tokens.TYPE_WHITESPACE, re.compile(r'^( |\t)', re.DOTALL)),
    TokenSpec(tokens.TYPE_INTEGER, re.compile(r'^(((\+|-)[0-9_]+)|([0-9][0-9_]*))')),
    TokenSpec(tokens.TYPE_FLOAT,
              re.compile(r'^((((\+|-)[0-9_]+)|([1-9][0-9_]*))(\.[0-9_]+)?([eE](\+|-)?[0-9_]+)?)')),
    TokenSpec(tokens.TYPE_BOOLEAN, re.compile(r'^(true|false)')),
    TokenSpec(tokens.TYPE_OP_SQUARE_LEFT_BRACKET, re.compile(r'^(\[)')),
    TokenSpec(tokens.TYPE_OP_SQUARE_RIGHT_BRACKET, re.compile(r'^(\])')),
    TokenSpec(tokens.TYPE_OP_CURLY_LEFT_BRACKET, re.compile(r'^(\{)')),
    TokenSpec(tokens.TYPE_OP_CURLY_RIGHT_BRACKET, re.compile(r'^(\})')),
    TokenSpec(tokens.TYPE_OP_ASSIGNMENT, re.compile(r'^(=)')),
    TokenSpec(tokens.TYPE_OP_COMMA, re.compile(r'^(,)')),
    TokenSpec(tokens.TYPE_OP_DOUBLE_SQUARE_LEFT_BRACKET, re.compile(r'^(\[\[)')),
    TokenSpec(tokens.TYPE_OP_DOUBLE_SQUARE_RIGHT_BRACKET, re.compile(r'^(\]\])')),
    TokenSpec(tokens.TYPE_OPT_DOT, re.compile(r'^(\.)')),
    TokenSpec(tokens.TYPE_NEWLINE, re.compile('^(\n|\r\n)')),
)
def _next_token_candidates(source):
    """Return a Token for every lexical spec matching the beginning of source."""
    attempts = ((spec, spec.re.search(source)) for spec in _LEXICAL_SPECS)
    return [tokens.Token(spec.type, hit.group(1)) for spec, hit in attempts if hit]
def _choose_from_next_token_candidates(candidates):
if len(candidates) == 1:
return candidates[0]
elif len(candidates) > 1:
# Return the maximal-munch with ties broken by natural order of token type.
maximal_munch_length = max(len(token.source_substring) for token in candidates)
maximal_munches = [token for token in candidates if len(token.source_substring) == maximal_munch_length]
return sorted(maximal_munches)[0] # Return the first in sorting by priority
def _munch_a_token(source):
    """
    Munches a single Token instance if it could recognize one at the beginning of the
    given source text, or None if no token type could be recognized.
    """
    return _choose_from_next_token_candidates(_next_token_candidates(source))
class LexerError(TOMLError):
    """Raised when the lexer cannot recognize the next token in the source."""

    def __init__(self, message):
        self._message = message

    def __str__(self):
        return self._message

    def __repr__(self):
        return self._message
def tokenize(source, is_top_level=False):
    """
    Tokenizes the input TOML source into a stream (generator) of tokens.
    If is_top_level is set to True, will make sure that the input source has a trailing newline character
    before it is tokenized.
    Raises a LexerError when it fails to recognize another token while not at the end of the source.
    """
    # Newlines are going to be normalized to UNIX newlines.
    source = source.replace('\r\n', '\n')
    if is_top_level and source and source[-1] != '\n':
        source += '\n'
    # 1-indexed position of the next token, plus its character offset.
    next_row = 1
    next_col = 1
    next_index = 0
    while next_index < len(source):
        new_token = _munch_a_token(source[next_index:])
        if not new_token:
            raise LexerError("failed to read the next token at ({}, {}): {}".format(
                next_row, next_col, source[next_index:]))
        # Set the col and row on the new token
        new_token = tokens.Token(new_token.type, new_token.source_substring, next_col, next_row)
        # Advance the index, row and col count
        next_index += len(new_token.source_substring)
        for c in new_token.source_substring:
            if c == '\n':
                next_row += 1
                next_col = 1
            else:
                next_col += 1
        yield new_token
"""
A parser for TOML tokens into TOML elements.
"""
from .elementsanitizer import sanitize
from .errors import ParsingError
from .parser import toml_file_elements
from .tokenstream import TokenStream
def parse_tokens(tokens):
    """
    Parses the given token sequence into a sequence of top-level TOML elements.
    Raises ParserError on invalid TOML input.
    """
    stream = TokenStream(tokens)
    return _parse_token_stream(stream)
def _parse_token_stream(token_stream):
    """
    Parses the given token_stream into a sequence of top-level TOML elements.
    Raises ParserError on invalid input TOML.
    """
    elements, pending = toml_file_elements(token_stream)
    # Anything left over means the parser could not consume the whole file.
    if pending.at_end:
        return sanitize(elements)
    raise ParsingError('Failed to parse line {}'.format(pending.head.row))
from ..elements import TYPE_METADATA
from ..elements.table import TableElement
from ..elements.tableheader import TableHeaderElement
from ..errors import InvalidTOMLFileError
from ..util import PeekableIterator
def sanitize(_elements):
    """
    Finds TableHeader elements that are not followed by TableBody elements
    and inserts empty TableElement right after those.
    """
    output = list(_elements)

    def find_next_table_header(after=-1):
        # Index of the next TableHeaderElement after `after`, or -inf when none remain.
        return next((i for (i, element) in enumerate(output)
                     if i > after and isinstance(element, TableHeaderElement)), float('-inf'))

    def find_next_table_body(after=-1):
        # Index of the next TableElement after `after`, or -inf when none remain.
        return next((i for (i, element) in enumerate(output)
                     if i > after and isinstance(element, TableElement)), float('-inf'))

    next_table_header_i = find_next_table_header()
    while next_table_header_i >= 0:
        following_table_header_i = find_next_table_header(next_table_header_i)
        following_table_body_i = find_next_table_body(next_table_header_i)
        # A header with no body before the next header (or before end of file)
        # gets an empty table spliced in right after it.
        if (following_table_body_i < 0) or \
                (following_table_header_i >= 0 and (following_table_header_i < following_table_body_i)):
            output.insert(next_table_header_i+1, TableElement(tuple()))
        next_table_header_i = find_next_table_header(next_table_header_i)
    return output
def validate_sanitized(_elements):
    """
    Verifies that the element sequence alternates correctly between table
    headers and table bodies; raises InvalidTOMLFileError otherwise.
    """
    # Non-metadata elements must start with an optional TableElement,
    # followed by zero or more (TableHeaderElement, TableElement) pairs.
    if not _elements:
        return
    it = PeekableIterator(e for e in _elements if e.type != TYPE_METADATA)
    # Skip the optional anonymous top-level table.
    if isinstance(it.peek(), TableElement):
        it.next()
    while it.peek():
        if not isinstance(it.peek(), TableHeaderElement):
            raise InvalidTOMLFileError
        it.next()
        if not isinstance(it.peek(), TableElement):
            raise InvalidTOMLFileError
        it.next()
from ..errors import TOMLError
class ParsingError(TOMLError):
    """Raised when the parser cannot make progress on the token stream."""

    def __init__(self, message='', token=None):
        self.message = message
        self.token = token

    def __repr__(self):
        # Include the source location only when both a message and token exist.
        if not (self.message and self.token):
            return self.message
        return "{} at row {} and col {}".format(
            self.message, self.token.row, self.token.col
        )

    def __str__(self):
        return repr(self)
from ..elements.array import ArrayElement
from .errors import ParsingError
from .tokenstream import TokenStream
class Capturer:
    """
    Recursive-descent matching DSL. Yeah..

    Wraps a TokenStream plus the values found so far. Every matching step
    returns a fresh Capturer, so failed branches can be abandoned without side
    effects; failures are stored as a "dormant error" and only raised once
    value() is finally asked for.
    """

    def __init__(self, token_stream, value=tuple(), dormant_error=None):
        self._token_stream = token_stream
        self._value = value
        self._dormant_error = dormant_error

    def find(self, finder):
        """
        Searches the token stream using the given finder.
        `finder(ts)` is a function that accepts a `TokenStream` instance and returns `(element, pending_ts)`
        where `element` is the found "something" or a sequence of "somethings", and `pending_ts` the unconsumed
        `TokenStream`.
        `finder(ts)` can raise `ParsingError` to indicate that it couldn't find anything, or
        a `TokenStream.EndOfStream` to indicate a premature end of the TokenStream.
        This method returns a Capturer instance that can be further used to find more and more "somethings". The value
        at any given moment can be retrieved via the `Capturer.value()` method.
        """
        try:
            # Execute finder!
            element, pending_ts = finder(self._token_stream)
            # If result is not a sequence, make it so (ArrayElement is
            # sequence-like but must be stored as a single value).
            if isinstance(element, ArrayElement) or not isinstance(element, (tuple, list)):
                element = (element,)
            # Return a Capturer with accumulated findings
            return Capturer(pending_ts, value=self.value() + element)
        except ParsingError as e:
            # Failed to find, store error in returned value
            return Capturer(self._token_stream, dormant_error=e)
        except TokenStream.EndOfStream as e:
            # Premature end of stream, store error in returned value
            return Capturer(self._token_stream, dormant_error=e)

    def value(self, parsing_expectation_msg=None):
        """
        Returns the accumulated values found as a sequence of values, or raises an encountered dormant error.
        If parsing_expectation_msg is specified and a dormant_error is a ParsingError, the expectation message is used
        instead in it.
        """
        if self._dormant_error:
            if parsing_expectation_msg and isinstance(self._dormant_error, ParsingError):
                raise ParsingError(parsing_expectation_msg, token=self._token_stream.head)
            else:
                raise self._dormant_error
        return self._value

    @property
    def pending_tokens(self):
        """
        Returns a TokenStream with the pending tokens yet to be processed.
        """
        return self._token_stream

    def or_find(self, finder):
        """
        If a dormant_error is present, try this new finder instead. If not, does nothing.
        """
        if self._dormant_error:
            return Capturer(self._token_stream).find(finder)
        else:
            return self

    def or_end_of_file(self):
        """
        Discards any errors if at end of the stream.
        """
        if isinstance(self._dormant_error, TokenStream.EndOfStream):
            return Capturer(self.pending_tokens, value=self._value)
        else:
            return self

    def or_empty(self):
        """
        Discards any previously-encountered dormant error.
        """
        if self._dormant_error:
            return Capturer(self.pending_tokens, value=self._value)
        else:
            return self

    def and_find(self, finder):
        """
        Accumulate new "somethings" to the stored value using the given finder.
        """
        if self._dormant_error:
            # Propagate the stored failure; the finder is not even attempted.
            return Capturer(self.pending_tokens, dormant_error=self._dormant_error)
        return Capturer(self.pending_tokens, self.value()).find(finder)
def capture_from(token_stream):
    """Starts a Capturer matching chain over the given token stream."""
    return Capturer(token_stream)
class TokenStream:
    """
    An immutable subset of a token sequence.

    Advancing is done by taking `tail`, which shares the underlying tuple and
    only bumps the offset, so streams are cheap to branch on.
    """

    class EndOfStream(Exception):
        pass

    Nothing = tuple()

    def __init__(self, _tokens, offset=0):
        self._tokens = _tokens if isinstance(_tokens, tuple) else tuple(_tokens)
        self._head_index = offset

    def __len__(self):
        return len(self._tokens) - self.offset

    @property
    def head(self):
        """The first pending token; raises EndOfStream when exhausted."""
        try:
            return self._tokens[self._head_index]
        except IndexError:
            raise TokenStream.EndOfStream

    @property
    def tail(self):
        """A stream over everything after head."""
        return TokenStream(self._tokens, offset=self._head_index + 1)

    @property
    def offset(self):
        """How many tokens of the underlying sequence precede this view."""
        return self._head_index

    @property
    def at_end(self):
        """True when no tokens are pending."""
        return self.offset >= len(self._tokens)
"""
TOML lexical tokens.
"""
class TokenType:
    """
    A TokenType is a concrete type of a source token along with a defined priority and a higher-order kind.
    The priority will be used in determining the tokenization behaviour of the lexer in the following manner:
    whenever more than one token is recognizable as the next possible token and they are all of equal source
    length, this priority is going to be used to break the tie by favoring the token type of the lowest priority
    value. A TokenType instance is naturally ordered by its priority.
    """

    def __init__(self, name, priority, is_metadata):
        self._name = name
        self._priority = priority
        self._is_metadata = is_metadata

    @property
    def is_metadata(self):
        """Whether tokens of this type carry no TOML content (markup only)."""
        return self._is_metadata

    @property
    def priority(self):
        """Tie-breaking priority; lower wins."""
        return self._priority

    def __repr__(self):
        return "{}-{}".format(self.priority, self._name)

    def __lt__(self, other):
        if not isinstance(other, TokenType):
            return False
        return self._priority < other.priority
# Possible types of tokens
# The integer is the tie-breaking priority (lower wins when candidate tokens
# have equal length); is_metadata marks markup-only token types.
TYPE_BOOLEAN = TokenType('boolean', 0, is_metadata=False)
TYPE_INTEGER = TokenType('integer', 0, is_metadata=False)
TYPE_OP_COMMA = TokenType('comma', 0, is_metadata=True)
TYPE_OP_SQUARE_LEFT_BRACKET = TokenType('square_left_bracket', 0, is_metadata=True)
TYPE_OP_SQUARE_RIGHT_BRACKET = TokenType('square_right_bracket', 0, is_metadata=True)
TYPE_OP_CURLY_LEFT_BRACKET = TokenType('curly_left_bracket', 0, is_metadata=True)
TYPE_OP_CURLY_RIGHT_BRACKET = TokenType('curly_right_bracket', 0, is_metadata=True)
TYPE_OP_ASSIGNMENT = TokenType('assignment', 0, is_metadata=True)
TYPE_OP_DOUBLE_SQUARE_LEFT_BRACKET = TokenType('double_square_left_bracket', 0, is_metadata=True)
TYPE_OP_DOUBLE_SQUARE_RIGHT_BRACKET = TokenType('double_square_right_bracket', 0, is_metadata=True)
TYPE_FLOAT = TokenType('float', 1, is_metadata=False)
TYPE_DATE = TokenType('date', 40, is_metadata=False)
TYPE_OPT_DOT = TokenType('dot', 40, is_metadata=True)
TYPE_BARE_STRING = TokenType('bare_string', 50, is_metadata=False)
TYPE_STRING = TokenType('string', 90, is_metadata=False)
TYPE_MULTILINE_STRING = TokenType('multiline_string', 90, is_metadata=False)
TYPE_LITERAL_STRING = TokenType('literal_string', 90, is_metadata=False)
TYPE_MULTILINE_LITERAL_STRING = TokenType('multiline_literal_string', 90, is_metadata=False)
TYPE_NEWLINE = TokenType('newline', 91, is_metadata=True)
TYPE_WHITESPACE = TokenType('whitespace', 93, is_metadata=True)
TYPE_COMMENT = TokenType('comment', 95, is_metadata=True)
def is_operator(token):
    """
    Returns True if the given token is an operator token.
    """
    operator_types = (
        TYPE_OP_COMMA,
        TYPE_OP_SQUARE_LEFT_BRACKET,
        TYPE_OP_SQUARE_RIGHT_BRACKET,
        TYPE_OP_DOUBLE_SQUARE_LEFT_BRACKET,
        TYPE_OP_DOUBLE_SQUARE_RIGHT_BRACKET,
        TYPE_OP_CURLY_LEFT_BRACKET,
        TYPE_OP_CURLY_RIGHT_BRACKET,
        TYPE_OP_ASSIGNMENT,
        TYPE_OPT_DOT,
    )
    return token.type in operator_types
def is_string(token):
    """
    Returns True if the given token is any of the string token types.
    """
    string_types = (
        TYPE_STRING,
        TYPE_MULTILINE_STRING,
        TYPE_LITERAL_STRING,
        TYPE_BARE_STRING,
        TYPE_MULTILINE_LITERAL_STRING,
    )
    return token.type in string_types
class Token:
    """
    A token/lexeme in a TOML source file.

    A Token instance is naturally ordered by its type, and compares equal to
    another Token with the same type and source substring (col/row excluded).
    """

    def __init__(self, _type, source_substring, col=None, row=None):
        self._source_substring = source_substring
        self._type = _type
        self._col = col
        self._row = row

    def __eq__(self, other):
        if not isinstance(other, Token):
            return False
        return self.source_substring == other.source_substring and self.type == other.type

    def __hash__(self):
        # Fix: defining __eq__ without __hash__ makes instances unhashable in
        # Python 3. Hash on the same fields __eq__ compares so equal tokens
        # hash alike (col/row deliberately excluded, mirroring __eq__).
        return hash((self._type, self._source_substring))

    @property
    def col(self):
        """
        Column number (1-indexed).
        """
        return self._col

    @property
    def row(self):
        """
        Row number (1-indexed).
        """
        return self._row

    @property
    def type(self):
        """
        One of of the TOKEN_TYPE_* constants.
        """
        return self._type

    @property
    def source_substring(self):
        """
        The substring of the initial source file containing this token.
        """
        return self._source_substring

    def __lt__(self, other):
        return isinstance(other, Token) and self.type < other.type

    def __repr__(self):
        return "{}: {}".format(self.type, self.source_substring)
from ..errors import TOMLError
class DeserializationError(TOMLError):
    # Raised when a TOML token cannot be converted to a Python value.
    pass


class BadEscapeCharacter(TOMLError):
    # Raised when a string contains an unrecognized backslash escape.
    pass


class MalformedDateError(DeserializationError):
    # Raised when a date token cannot be deserialized.
    pass
"""
A converter of python values to TOML Token instances.
"""
import codecs
import datetime
import six
import re
from .. import tokens
from ..errors import TOMLError
from ..tokens import Token
from ..util import chunkate_string
class NotPrimitiveError(TOMLError):
    # Raised by create_primitive_token when the value is not a primitive atomic.
    pass
# Canonical, reusable Token instances for token types whose source text is
# fixed (brackets, comma, dot, newline).
_operator_tokens_by_type = {
    tokens.TYPE_OP_SQUARE_LEFT_BRACKET: tokens.Token(tokens.TYPE_OP_SQUARE_LEFT_BRACKET, u'['),
    tokens.TYPE_OP_SQUARE_RIGHT_BRACKET: tokens.Token(tokens.TYPE_OP_SQUARE_RIGHT_BRACKET, u']'),
    tokens.TYPE_OP_DOUBLE_SQUARE_LEFT_BRACKET: tokens.Token(tokens.TYPE_OP_DOUBLE_SQUARE_LEFT_BRACKET, u'[['),
    tokens.TYPE_OP_DOUBLE_SQUARE_RIGHT_BRACKET: tokens.Token(tokens.TYPE_OP_DOUBLE_SQUARE_RIGHT_BRACKET, u']]'),
    tokens.TYPE_OP_COMMA: tokens.Token(tokens.TYPE_OP_COMMA, u','),
    tokens.TYPE_NEWLINE: tokens.Token(tokens.TYPE_NEWLINE, u'\n'),
    tokens.TYPE_OPT_DOT: tokens.Token(tokens.TYPE_OPT_DOT, u'.'),
}
def operator_token(token_type):
    """
    Returns the canonical Token instance for the given fixed-text token type.
    Raises KeyError for token types without a canonical instance.
    """
    return _operator_tokens_by_type[token_type]
def create_primitive_token(value, multiline_strings_allowed=True):
    """
    Creates and returns a single token for the given primitive atomic value.

    None maps to an empty string token. Raises NotPrimitiveError when the
    given value is not a primitive atomic value.
    """
    if value is None:
        return create_primitive_token('')
    elif isinstance(value, bool):
        # bool must be checked before int: True/False are int instances.
        return tokens.Token(tokens.TYPE_BOOLEAN, u'true' if value else u'false')
    elif isinstance(value, int):
        return tokens.Token(tokens.TYPE_INTEGER, u'{}'.format(value))
    elif isinstance(value, float):
        return tokens.Token(tokens.TYPE_FLOAT, u'{}'.format(value))
    elif isinstance(value, (datetime.datetime, datetime.date, datetime.time)):
        # Fix: the previous code referenced the undefined names
        # `strict_rfc3339` and `ts`, raising NameError for any date/time
        # value. isoformat() emits the RFC 3339-style text TOML dates use.
        return tokens.Token(tokens.TYPE_DATE, u'{}'.format(value.isoformat()))
    elif isinstance(value, six.string_types):
        return create_string_token(value, multiline_strings_allowed=multiline_strings_allowed)
    raise NotPrimitiveError("{} of type {}".format(value, type(value)))
_bare_string_regex = re.compile('^[a-zA-Z0-9_-]*$')
def create_string_token(text, bare_string_allowed=False, multiline_strings_allowed=True):
    """
    Creates and returns a single string token.

    Raises ValueError on non-string input.
    """
    if not isinstance(text, six.string_types):
        raise ValueError('Given value must be a string')
    if text == '':
        # Nothing to escape in the empty string.
        return tokens.Token(tokens.TYPE_STRING, '""')
    if bare_string_allowed and _bare_string_regex.match(text):
        return tokens.Token(tokens.TYPE_BARE_STRING, text)
    newline_count = sum(1 for c in text if c == '\n')
    if multiline_strings_allowed and (newline_count >= 2 or len(text) > 80):
        # Two or more newlines, or more than 80 characters: use the
        # multiline string format.
        return _create_multiline_string_token(text)
    return tokens.Token(tokens.TYPE_STRING, '"{}"'.format(_escape_single_line_quoted_string(text)))
def _escape_single_line_quoted_string(text):
    """Escape *text* for embedding in a single-line double-quoted TOML string."""
    if six.PY2:
        # Python 2: bytes-level escaping, then fix up quotes.
        escaped = text.encode('unicode-escape').encode('string-escape')
        return escaped.replace('"', '\\"').replace("\\'", "'")
    return codecs.encode(text, 'unicode-escape').decode().replace('"', '\\"')
def _create_multiline_string_token(text):
    """Create a multiline-string Token for *text*.

    Embedded triple-quote delimiters are backslash-escaped so they cannot
    terminate the string early; long text is wrapped via _break_long_text().
    """
    # The previous replacement target u'\"\"\"' is byte-identical to u'"""'
    # (the backslashes are consumed by Python), making the call a no-op.
    # Escape the delimiter for real.
    escaped = text.replace(u'"""', u'\\"\\"\\"')
    if len(escaped) > 50:
        return tokens.Token(tokens.TYPE_MULTILINE_STRING, u'"""\n{}\\\n"""'.format(_break_long_text(escaped)))
    else:
        return tokens.Token(tokens.TYPE_MULTILINE_STRING, u'"""{}"""'.format(escaped))
def _break_long_text(text, maximum_length=75):
"""
Breaks into lines of 75 character maximum length that are terminated by a backslash.
"""
def next_line(remaining_text):
# Returns a line and the remaining text
if '\n' in remaining_text and remaining_text.index('\n') < maximum_length:
i = remaining_text.index('\n')
return remaining_text[:i+1], remaining_text[i+2:]
elif len(remaining_text) > maximum_length and ' ' in remaining_text:
i = remaining_text[:maximum_length].rfind(' ')
return remaining_text[:i+1] + '\\\n', remaining_text[i+2:]
else:
return remaining_text, ''
remaining_text = text
lines = []
while remaining_text:
line, remaining_text = next_line(remaining_text)
lines += [line]
return ''.join(lines)
def create_whitespace(source_substring):
    """Return a whitespace Token wrapping the given source text verbatim."""
    return Token(tokens.TYPE_WHITESPACE, source_substring)
def create_multiline_string(text, maximum_line_length=120):
    """Return a multiline-string Token for *text*, wrapped at the given line length."""

    def escape(t):
        # Escape embedded triple-quote delimiters.
        return t.replace(u'"""', six.u(r'\"\"\"'))

    chunks = chunkate_string(escape(text), maximum_line_length)
    body = u'\\\n'.join(chunks)
    return Token(tokens.TYPE_MULTILINE_STRING, u'"""\n{}"""'.format(body))
import codecs
import functools
import operator
import re
import string
from . import TYPE_BOOLEAN, TYPE_INTEGER, TYPE_FLOAT, TYPE_DATE, \
TYPE_MULTILINE_STRING, TYPE_BARE_STRING, TYPE_MULTILINE_LITERAL_STRING, TYPE_LITERAL_STRING, \
TYPE_STRING
from .errors import MalformedDateError
from .errors import BadEscapeCharacter
def deserialize(token):
    """
    Deserializes the value of a single tokens.Token instance based on its type.

    Raises DeserializationError when appropriate.
    """
    token_type = token.type
    if token_type == TYPE_BOOLEAN:
        return _to_boolean(token)
    if token_type == TYPE_INTEGER:
        return _to_int(token)
    if token_type == TYPE_FLOAT:
        return _to_float(token)
    if token_type == TYPE_DATE:
        return _to_date(token)
    string_types = (TYPE_STRING, TYPE_MULTILINE_STRING, TYPE_BARE_STRING,
                    TYPE_LITERAL_STRING, TYPE_MULTILINE_LITERAL_STRING)
    if token_type in string_types:
        return _to_string(token)
    raise Exception('This should never happen!')
def _unescape_str(text):
    """
    Unescapes a string according to the TOML spec. Raises BadEscapeCharacter when appropriate.
    """
    # A backslash that is not itself escaped must introduce a legal TOML
    # escape code (b, t, n, f, r, ", \, u or U).
    bad_escapes = re.compile(r'([^\\]|^)\\[^btnfr"\\uU]')
    if bad_escapes.findall(text):
        raise BadEscapeCharacter
    # All clear: let the codec perform the actual unescaping.
    return codecs.decode(_unicode_escaped_string(text), 'unicode-escape')
def _unicode_escaped_string(text):
"""
Escapes all unicode characters in the given string
"""
def is_unicode(c):
return c.lower() not in string.ascii_letters + string.whitespace + string.punctuation + string.digits
def escape_unicode_char(x):
return codecs.encode(x, 'unicode-escape')
if any(is_unicode(c) for c in text):
homogeneous_chars = tuple(escape_unicode_char(c) if is_unicode(c) else c.encode() for c in text)
homogeneous_bytes = functools.reduce(operator.add, homogeneous_chars)
return homogeneous_bytes.decode()
else:
return text
def _to_string(token):
    """Deserialize any of the TOML string token flavors to a Python string."""
    if token.type == TYPE_BARE_STRING:
        return token.source_substring
    elif token.type == TYPE_STRING:
        # Strip the surrounding double quotes, then unescape.
        escaped = token.source_substring[1:-1]
        return _unescape_str(escaped)
    elif token.type == TYPE_MULTILINE_STRING:
        escaped = token.source_substring[3:-3]
        # Drop the first newline if existed
        if escaped and escaped[0] == '\n':
            escaped = escaped[1:]
        # Remove all occurrences of a slash-newline-zero-or-more-whitespace patterns
        escaped = re.sub(r'\\\n\s*', repl='', string=escaped, flags=re.DOTALL)
        return _unescape_str(escaped)
    elif token.type == TYPE_LITERAL_STRING:
        return token.source_substring[1:-1]
    elif token.type == TYPE_MULTILINE_LITERAL_STRING:
        text = token.source_substring[3:-3]
        # Guard the empty literal '''''': the unguarded text[0] access
        # raised IndexError (the TYPE_MULTILINE_STRING branch above guards).
        if text and text[0] == '\n':
            text = text[1:]
        return text
    raise RuntimeError('Control should never reach here.')
def _to_int(token):
return int(token.source_substring.replace('_', ''))
def _to_float(token):
    """Deserialize a float token, ignoring TOML digit-group underscores."""
    assert token.type == TYPE_FLOAT
    # Local renamed from 'string', which shadowed the imported string module.
    cleaned = token.source_substring.replace('_', '')
    return float(cleaned)
def _to_boolean(token):
    # Presumably the lexer only emits 'true'/'false' source text for boolean
    # tokens — TODO confirm; any other text deserializes to False here.
    return token.source_substring == 'true'
# RFC 3339 date-time: YYYY-MM-DDTHH:MM:SS followed by 'Z' or a +HH:MM/-HH:MM
# offset. NOTE(review): fractional seconds are not accepted — confirm intended.
_correct_date_format = re.compile(
    r'(\d{4})-(\d{2})-(\d{2})T(\d{2}):(\d{2}):(\d{2})(Z|([+-])(\d{2}):(\d{2}))'
)
def _to_date(token):
    """Validate a date token against the RFC 3339 pattern and return its source text.

    Raises MalformedDateError when the text does not match.
    """
    text = token.source_substring
    if _correct_date_format.match(text) is None:
        raise MalformedDateError
    return text
import itertools
def is_sequence_like(x):
    """
    Returns True if x exposes a sequence-like interface
    (__len__ and __getitem__).
    """
    return hasattr(x, '__len__') and hasattr(x, '__getitem__')
def is_dict_like(x):
    """
    Returns True if x exposes a dict-like interface
    (__len__, __getitem__, keys and values).
    """
    dict_protocol = ('__len__', '__getitem__', 'keys', 'values')
    return all(hasattr(x, attr) for attr in dict_protocol)
def join_with(iterable, separator):
    """
    Joins elements from iterable with separator and returns the produced sequence as a list.

    Collection elements (list/tuple/set) are flattened into the output;
    separator must be addable to a list.
    """
    inputs = list(iterable)
    out = []
    last_index = len(inputs) - 1
    for index, element in enumerate(inputs):
        if isinstance(element, (list, tuple, set)):
            # Flatten nested collections into the output.
            out.extend(element)
        else:
            out.append(element)
        if index != last_index:
            out += separator
    return out
def chunkate_string(text, length):
    """
    Iterates over the given text in chunks of at maximally the given length. Will never break a whole word.
    """
    pos = 0

    def upcoming_newline():
        # Index of the first newline strictly after pos, or len(text).
        try:
            return next(i for (i, c) in enumerate(text) if i > pos and c == '\n')
        except StopIteration:
            return len(text)

    def last_break_opportunity():
        # Index of the last space/tab within the current window, or len(text).
        try:
            return next(i for (i, c) in reversed(tuple(enumerate(text)))
                        if pos <= i < pos + length and c in (' ', '\t'))
        except StopIteration:
            return len(text)

    while pos < len(text):
        chunk = text[pos:min(upcoming_newline(), last_break_opportunity() + 1)]
        pos += len(chunk)
        yield chunk
def flatten_nested(nested_dicts):
    """
    Flattens dicts and sequences into one dict with tuples of keys representing the nested keys.

    Example:
        flatten_nested({'a': {'b': 1}, 'seq': [{'c': 2}]})
        == {('a', 'b'): 1, ('seq', 0, 'c'): 2}
    """
    assert isinstance(nested_dicts, (dict, list, tuple)), 'Only works with a collection parameter'

    def iter_items(collection):
        # Uniform (key, value) iteration over dicts and sequences.
        if isinstance(collection, dict):
            return collection.items()
        if isinstance(collection, (list, tuple)):
            return enumerate(collection)
        raise RuntimeError('c must be a collection')

    def _flatten(node):
        flat = {}
        for key, value in iter_items(node):
            if isinstance(value, (dict, list, tuple)):
                # Prefix every nested key path with this key.
                for sub_key, sub_value in _flatten(value).items():
                    flat[(key,) + sub_key] = sub_value
            else:
                flat[(key,)] = value
        return flat

    return _flatten(nested_dicts)
class PeekableIterator:
    """An iterator wrapper that additionally supports peeking at the next value."""

    # Returned by peek() when the iterator is exhausted. Truthiness is False.
    Nothing = tuple()

    def __init__(self, iter):
        self._iter = iter

    def __iter__(self):
        return self

    def __next__(self):
        return next(self._iter)

    def next(self):
        # Python-2-style alias for __next__.
        return self.__next__()

    def peek(self):
        """
        Returns the next value without consuming it, or
        PeekableIterator.Nothing when the iterator is exhausted.
        """
        try:
            upcoming = next(self._iter)
        except StopIteration:
            return PeekableIterator.Nothing
        # Push the consumed value back onto the front of the iterator.
        self._iter = itertools.chain((upcoming,), self._iter)
        return upcoming
from .prettify.elements.abstracttable import AbstractTable
def to_raw(x):
from .cascadedict import CascadeDict
if isinstance(x, AbstractTable):
return x.primitive_value
elif isinstance(x, CascadeDict):
return x.neutralized
elif isinstance(x, (list, tuple)):
return [to_raw(y) for y in x]
elif isinstance(x, dict):
return {k: to_raw(v) for (k, v) in x.items()}
else:
return x
from . import toplevels
from .cascadedict import CascadeDict
class NamedDict(dict):
    """
    A dict that can use Name instances as keys.

    Assigning through a multi-segment Name creates intermediate NamedDicts
    along the path; when an intermediate value already exists it is merged
    with the new value via CascadeDict.
    """

    def __init__(self, other_dict=None):
        dict.__init__(self)
        if other_dict:
            # Route every pair through __setitem__ so Name keys get expanded.
            for k, v in other_dict.items():
                self[k] = v

    def __setitem__(self, key, value):
        """
        key can be an Name instance.

        When key is a path in the form of an Name instance, all the parents and grandparents of the value are
        created along the way as instances of NamedDict. If the parent of the value exists, it is replaced with a
        CascadeDict() that cascades the old parent value with a new NamedDict that contains the given child name
        and value.
        """
        if isinstance(key, toplevels.Name):
            if len(key.sub_names) == 1:
                name = key.sub_names[0]
                if name in self:
                    # Merge with the existing entry instead of replacing it.
                    self[name] = CascadeDict(self[name], value)
                else:
                    self[name] = value
            elif len(key.sub_names) > 1:
                name = key.sub_names[0]
                rest_of_key = key.drop(1)  # the Name minus its first segment
                if name in self:
                    named_dict = NamedDict()
                    named_dict[rest_of_key] = value
                    self[name] = CascadeDict(self[name], named_dict)
                else:
                    self[name] = NamedDict()
                    self[name][rest_of_key] = value
        else:
            # Plain (non-Name) keys behave exactly like a regular dict.
            return dict.__setitem__(self, key, value)

    def __contains__(self, item):
        # Expressed via __getitem__ so that Name paths are supported too.
        try:
            _ = self[item]
            return True
        except KeyError:
            return False

    def append(self, key, value):
        """
        Makes sure the value pointed to by key exists
        and is a list and appends the given value to it.
        """
        if key in self:
            self[key].append(value)
        else:
            self[key] = [value]

    def __getitem__(self, item):
        # A Name key is resolved by walking down its segments one dict at a time.
        if isinstance(item, toplevels.Name):
            d = self
            for name in item.sub_names:
                d = d[name]
            return d
        else:
            return dict.__getitem__(self, item)
def structure(table_toplevels):
    """
    Accepts an ordered sequence of TopLevel instances and returns a navigable object structure representation of the
    TOML file.
    """
    table_toplevels = tuple(table_toplevels)
    obj = NamedDict()
    last_array_of_tables = None  # The Name of the last array-of-tables header
    for toplevel in table_toplevels:
        if isinstance(toplevel, toplevels.AnonymousTable):
            # The headerless table lives under the empty-string key.
            obj[''] = toplevel.table_element
        elif isinstance(toplevel, toplevels.Table):
            if last_array_of_tables and toplevel.name.is_prefixed_with(last_array_of_tables):
                # A sub-table of the most recent array-of-tables entry:
                # merge it into that entry under its unprefixed name.
                seq = obj[last_array_of_tables]
                unprefixed_name = toplevel.name.without_prefix(last_array_of_tables)
                seq[-1] = CascadeDict(seq[-1], NamedDict({unprefixed_name: toplevel.table_element}))
            else:
                obj[toplevel.name] = toplevel.table_element
        else:  # It's an ArrayOfTables
            if last_array_of_tables and toplevel.name != last_array_of_tables and \
                    toplevel.name.is_prefixed_with(last_array_of_tables):
                # A nested array-of-tables belonging to the latest entry of
                # the enclosing array-of-tables.
                seq = obj[last_array_of_tables]
                unprefixed_name = toplevel.name.without_prefix(last_array_of_tables)
                if unprefixed_name in seq[-1]:
                    seq[-1][unprefixed_name].append(toplevel.table_element)
                else:
                    cascaded_with = NamedDict({unprefixed_name: [toplevel.table_element]})
                    seq[-1] = CascadeDict(seq[-1], cascaded_with)
            else:
                obj.append(toplevel.name, toplevel.table_element)
                last_array_of_tables = toplevel.name
    return obj
from .prettify.errors import NoArrayFoundError
from . import structurer, toplevels, raw
from .array import ArrayOfTables
from .freshtable import FreshTable
from .prettify.elements import factory as element_factory
from .prettify import util
class TOMLFile(dict):
    """
    A TOMLFile object that tries its best to preserve formatting and order of mappings of the input source.

    Raises InvalidTOMLFileError on invalid input elements.
    Raises DuplicateKeysError, DuplicateTableError when appropriate.
    """

    def __init__(self, _elements):
        self._elements = []    # ordered markup elements that make up the file
        self._navigable = {}   # name -> table (or list of tables), rebuilt on change
        self.append_elements(_elements)

    def __getitem__(self, item):
        try:
            value = self._navigable[item]
            if isinstance(value, (list, tuple)):
                return ArrayOfTables(toml_file=self, name=item, iterable=value)
            else:
                return value
        except KeyError:
            # Unknown name: hand back a FreshTable that only becomes part of
            # this file once it is written to (see append_fresh_table).
            return FreshTable(parent=self, name=item, is_array=False)

    def __contains__(self, item):
        return item in self.keys()

    def _setitem_with_key_seq(self, key_seq, value):
        """
        Sets the value in the TOML file located by the given key sequence.

        Example:
            self._setitem(('key1', 'key2', 'key3'), 'text_value')
        is equivalent to doing
            self['key1']['key2']['key3'] = 'text_value'
        """
        table = self
        key_so_far = tuple()
        for key in key_seq[:-1]:
            key_so_far += (key,)
            # Create intermediate tables along the way as needed.
            self._make_sure_table_exists(key_so_far)
            table = table[key]
        table[key_seq[-1]] = value

    def _array_setitem_with_key_seq(self, array_name, index, key_seq, value):
        """
        Sets the array value in the TOML file located by the given key sequence.

        Example:
            self._array_setitem(array_name, index, ('key1', 'key2', 'key3'), 'text_value')
        is equivalent to doing
            self.array(array_name)[index]['key1']['key2']['key3'] = 'text_value'
        """
        table = self.array(array_name)[index]
        key_so_far = tuple()
        for key in key_seq[:-1]:
            key_so_far += (key,)
            new_table = self._array_make_sure_table_exists(array_name, index, key_so_far)
            if new_table is not None:
                table = new_table
            else:
                table = table[key]
        table[key_seq[-1]] = value

    def _make_sure_table_exists(self, name_seq):
        """
        Makes sure the table with the full name comprising of name_seq exists.
        """
        t = self
        for key in name_seq[:-1]:
            t = t[key]
        name = name_seq[-1]
        if name not in t:
            self.append_elements([element_factory.create_table_header_element(name_seq),
                                  element_factory.create_table({})])

    def _array_make_sure_table_exists(self, array_name, index, name_seq):
        """
        Makes sure the table with the full name comprising of name_seq exists
        under the given entry of the named array of tables.

        Returns the newly-created table element, or None if it already existed.
        """
        t = self[array_name][index]
        for key in name_seq[:-1]:
            t = t[key]
        name = name_seq[-1]
        if name not in t:
            new_table = element_factory.create_table({})
            self.append_elements([element_factory.create_table_header_element((array_name,) + name_seq), new_table])
            return new_table

    def __delitem__(self, key):
        # The table element is replaced with an empty table rather than
        # removed, so the indices of the other elements remain valid.
        table_element_index = self._elements.index(self._navigable[key])
        self._elements[table_element_index] = element_factory.create_table({})
        self._on_element_change()

    def __setitem__(self, key, value):
        # Setting an array-of-tables
        if key and isinstance(value, (tuple, list)) and value and all(isinstance(v, dict) for v in value):
            for table in value:
                self.array(key).append(table)
        # Or setting a whole single table
        elif isinstance(value, dict):
            if key and key in self:
                del self[key]
            for key_seq, child_value in util.flatten_nested({key: value}).items():
                self._setitem_with_key_seq(key_seq, child_value)
        # Or updating the anonymous section table
        else:
            # A plain (non-table) value goes into the anonymous '' table.
            self[''][key] = value
        self._on_element_change()

    def _detect_toplevels(self):
        """
        Returns a sequence of TopLevel instances for the current state of this table.
        """
        # Only regular Table toplevels are of interest here.
        return tuple(e for e in toplevels.identify(self.elements) if isinstance(e, toplevels.Table))

    def _update_table_fallbacks(self, table_toplevels):
        """
        Updates the fallbacks on all the table elements to make relative table access possible.

        Raises DuplicateKeysError if appropriate.
        """
        if len(self.elements) <= 1:
            return

        def parent_of(toplevel):
            # Returns an TopLevel parent of the given entry, or None.
            for parent_toplevel in table_toplevels:
                if toplevel.name.sub_names[:-1] == parent_toplevel.name.sub_names:
                    return parent_toplevel

        for entry in table_toplevels:
            if entry.name.is_qualified:
                parent = parent_of(entry)
                if parent:
                    child_name = entry.name.without_prefix(parent.name)
                    parent.table_element.set_fallback({child_name.sub_names[0]: entry.table_element})

    def _recreate_navigable(self):
        # Rebuild the navigable name -> value structure from the elements.
        if self._elements:
            self._navigable = structurer.structure(toplevels.identify(self._elements))

    def array(self, name):
        """
        Returns the array of tables with the given name.

        Raises NoArrayFoundError when the name maps to a non-array value.
        """
        if name in self._navigable:
            if isinstance(self._navigable[name], (list, tuple)):
                return self[name]
            else:
                raise NoArrayFoundError
        else:
            # A fresh, empty array that attaches itself to this file later.
            return ArrayOfTables(toml_file=self, name=name)

    def _on_element_change(self):
        # Keep the navigable structure and table fallbacks in sync with the
        # element list after every mutation.
        self._recreate_navigable()
        table_toplevels = self._detect_toplevels()
        self._update_table_fallbacks(table_toplevels)

    def append_elements(self, elements):
        """
        Appends more elements to the contained internal elements.
        """
        self._elements = self._elements + list(elements)
        self._on_element_change()

    def prepend_elements(self, elements):
        """
        Prepends more elements to the contained internal elements.
        """
        self._elements = list(elements) + self._elements
        self._on_element_change()

    def dumps(self):
        """
        Returns the TOML file serialized back to str.
        """
        return ''.join(element.serialized() for element in self._elements)

    def dump(self, file_path):
        # Serialize this file and write it to the given path.
        with open(file_path, mode='w') as fp:
            fp.write(self.dumps())

    def keys(self):
        # The anonymous table '' is always reported, even when absent.
        return set(self._navigable.keys()) | {''}

    def values(self):
        return self._navigable.values()

    def items(self):
        items = list(self._navigable.items())

        def has_anonymous_entry():
            return any(key == '' for (key, _) in items)

        if has_anonymous_entry():
            return items
        else:
            # Synthesize the anonymous entry so callers can rely on it.
            return items + [('', self[''])]

    def get(self, item, default=None):
        return self._navigable.get(item, default)

    @property
    def primitive(self):
        """
        Returns a primitive object representation for this container (which is a dict).

        WARNING: The returned container does not contain any markup or formatting metadata.
        """
        raw_container = raw.to_raw(self._navigable)
        # Collapse the anonymous table onto the top-level container if present
        if '' in raw_container:
            raw_container.update(raw_container[''])
            del raw_container['']
        return raw_container

    def append_fresh_table(self, fresh_table):
        """
        Gets called by FreshTable instances when they get written to.
        """
        if fresh_table.name:
            elements = []
            if fresh_table.is_array:
                elements += [element_factory.create_array_of_tables_header_element(fresh_table.name)]
            else:
                elements += [element_factory.create_table_header_element(fresh_table.name)]
            elements += [fresh_table, element_factory.create_newline_element()]
            self.append_elements(elements)
        else:
            # It's an anonymous table: it goes to the top of the file.
            self.prepend_elements([fresh_table, element_factory.create_newline_element()])

    @property
    def elements(self):
        return self._elements

    def __str__(self):
        is_empty = (not self['']) and (not tuple(k for k in self.keys() if k))

        def key_name(key):
            return '[ANONYMOUS]' if not key else key

        def pair(key, value):
            return '%s = %s' % (key_name(key), str(value))

        content_text = '' if is_empty else \
            '\n\t' + ',\n\t'.join(pair(k, v) for (k, v) in self.items() if v) + '\n'
        return "TOMLFile{%s}" % content_text

    def __repr__(self):
        return str(self)
"""
Top-level entries in a TOML file.
"""
from .prettify import elements
from .prettify.elements import TableElement, TableHeaderElement
from .peekableit import PeekableIterator
class TopLevel:
    """
    An abstract top-level entry of a TOML file: a table element together
    with the Name under which it appears.
    """

    def __init__(self, names, table_element):
        self._table_element = table_element
        self._names = Name(names)

    @property
    def name(self):
        """
        The distinct name of a table entry as an Name instance.
        """
        return self._names

    @property
    def table_element(self):
        return self._table_element
class Name:
    """A dotted TOML name, stored as a sequence of its segments."""

    def __init__(self, names):
        self._names = names

    @property
    def sub_names(self):
        return self._names

    def drop(self, n=0):
        """
        Returns the name after dropping the first n entries of it.
        """
        return Name(names=self._names[n:])

    def is_prefixed_with(self, names):
        """Return True when this name starts with the given segment sequence.

        A candidate longer than this name is never a prefix; the previous
        implementation raised IndexError in that case.
        """
        if isinstance(names, Name):
            return self.is_prefixed_with(names.sub_names)
        if len(names) > len(self._names):
            return False
        for i, name in enumerate(names):
            if self._names[i] != name:
                return False
        return True

    def without_prefix(self, names):
        """Return this name with the given leading segments removed.

        If names is not actually a prefix, the name is returned starting at
        the first mismatching segment.
        """
        if isinstance(names, Name):
            return self.without_prefix(names.sub_names)
        for i, name in enumerate(names):
            if name != self._names[i]:
                return Name(self._names[i:])
        return Name(names=self.sub_names[len(names):])

    @property
    def is_qualified(self):
        # True for dotted names such as 'a.b'.
        return len(self._names) > 1

    def __str__(self):
        return '.'.join(self.sub_names)

    def __hash__(self):
        return hash(str(self))

    def __eq__(self, other):
        return str(self) == str(other)

    def __ne__(self, other):
        return not self.__eq__(other)
class AnonymousTable(TopLevel):
    """The headerless table at the top of a TOML file, named by the empty string."""

    def __init__(self, table_element):
        super().__init__(('',), table_element)
class Table(TopLevel):
    """A regular [table] top-level entry."""

    def __init__(self, names, table_element):
        super().__init__(names=names, table_element=table_element)
class ArrayOfTables(TopLevel):
    """An [[array-of-tables]] top-level entry."""

    def __init__(self, names, table_element):
        super().__init__(names=names, table_element=table_element)
def _validate_file_elements(file_elements):
    # Validation hook for the parsed file elements; currently a no-op.
    pass
def identify(file_elements):
    """
    Outputs an ordered sequence of instances of TopLevel types.

    Elements start with an optional TableElement, followed by zero or more pairs of (TableHeaderElement, TableElement).
    """
    if not file_elements:
        return
    _validate_file_elements(file_elements)

    # An iterator over enumerate(the non-metadata) elements
    iterator = PeekableIterator((element_i, element) for (element_i, element) in enumerate(file_elements)
                                if element.type != elements.TYPE_METADATA)

    # peek() returns the falsy PeekableIterator.Nothing when exhausted; the
    # previous code unpacked it blindly, raising ValueError (not the caught
    # KeyError/StopIteration) on metadata-only input.
    peeked = iterator.peek()
    if not peeked:
        return
    _, first_element = peeked
    if isinstance(first_element, TableElement):
        iterator.next()
        yield AnonymousTable(first_element)

    for _, element in iterator:
        if not isinstance(element, TableHeaderElement):
            continue
        # A header must be followed by its table element. A bare next() on a
        # trailing header would leak StopIteration out of this generator,
        # which PEP 479 turns into a RuntimeError — stop gracefully instead.
        pair = next(iterator, None)
        if pair is None:
            return
        _, table_element = pair
        if not element.is_array_of_tables:
            # If TableHeader of a regular table, return Table following it
            yield Table(names=element.names, table_element=table_element)
        else:
            # If TableHeader of an array of tables, do your thing
            yield ArrayOfTables(names=element.names, table_element=table_element)
import toml
from pathlib import Path
from toml import dumps
from toml import loads
from poetry.toml import dumps
from poetry.toml import loads
from poetry.toml import TOMLFile
class TomlFile:
......@@ -17,7 +20,12 @@ class TomlFile:
return loads(self._path.read_text())
def write(self, data) -> None:
    """Serialize *data* to TOML text and write it to this file's path.

    A markup-preserving TOMLFile document is serialized with poetry's own
    dumps(); any other mapping falls back to the plain toml library.
    """
    # A stray leftover line from a previous revision wrote the file a second
    # time with the wrong serializer; it has been removed.
    if not isinstance(data, TOMLFile):
        data = toml.dumps(data)
    else:
        data = dumps(data)

    self._path.write_text(data)
def exists(self) -> bool:
    # Delegates to pathlib.Path.exists() for this file's path.
    return self._path.exists()
from poetry.toml import dumps
from poetry.toml import loads
from poetry.toml.prettify.errors import TOMLError
from poetry.toml.prettify.errors import DuplicateKeysError
from poetry.toml.prettify.errors import DuplicateTablesError
from poetry.toml.prettify.errors import InvalidTOMLFileError
def test_loading_toml_without_trailing_newline():
    # A final line without a newline terminator must still parse.
    parsed = loads('[main]\nname = "azmy"')
    assert parsed['main']['name'] == 'azmy'
def test_array_edge_cases():
    # Parsing an empty inline array value.
    source = '[section]\nkey = []'
    parsed = loads(source)
    assert 'section' in parsed
    assert len(parsed['section']['key']) == 0
def test_loading_an_empty_toml_source():
    # Parsing the empty string should simply not fail.
    loads('')
def test_parsing_section_with_indentation_and_comment_lines():
    """Adding a key to a nested section preserves the surrounding layout and comments."""
    toml = """[main]
listen = ":8966"
redis_host = "localhost:6379"
redis_password = ""
[influxdb]
host = "localhost:8086"
db = "agentcontroller"
user = "ac"
password = "acctrl"
[handlers]
binary = "python2.7"
cwd = "./handlers"
[handlers.env]
PYTHONPATH = "/opt/jumpscale7/lib:../client"
SYNCTHING_URL = "http://localhost:8384/"
SYNCTHING_SHARED_FOLDER_ID = "jumpscripts"
#SYNCTHING_API_KEY = ""
REDIS_ADDRESS = "localhost"
REDIS_PORT = "6379"
#REDIS_PASSWORD = ""
"""
    f = loads(toml)
    assert f['handlers']['env']['REDIS_ADDRESS'] == 'localhost'
    assert 'REDIS_PASSWORD' not in f['handlers']['env']
    # Insert a new key and verify the rest of the source is untouched.
    f['handlers']['env']['REDIS_PASSWORD'] = 'MYPASSWORD'
    expected = """[main]
listen = ":8966"
redis_host = "localhost:6379"
redis_password = ""
[influxdb]
host = "localhost:8086"
db = "agentcontroller"
user = "ac"
password = "acctrl"
[handlers]
binary = "python2.7"
cwd = "./handlers"
[handlers.env]
PYTHONPATH = "/opt/jumpscale7/lib:../client"
SYNCTHING_URL = "http://localhost:8384/"
SYNCTHING_SHARED_FOLDER_ID = "jumpscripts"
#SYNCTHING_API_KEY = ""
REDIS_ADDRESS = "localhost"
REDIS_PORT = "6379"
REDIS_PASSWORD = "MYPASSWORD"
#REDIS_PASSWORD = ""
"""
    assert expected == f.dumps()
def test_loading_complex_file_1():
    """A large real-world config with nested tables, arrays and comments parses without error."""
    toml = """
[main]
gid = 1
nid = 10
max_jobs = 100
message_id_file = "./.mid"
history_file = "./.history"
agent_controllers = ["http://localhost:8966/"]
[cmds]
[cmds.execute_js_py]
binary = "python2.7"
cwd = "./jumpscripts"
script = "{domain}/{name}.py"
[cmds.sync]
#syncthing extension
binary = "python2.7"
cwd = "./extensions/sync"
script = "{name}.py"
[cmds.sync.env]
PYTHONPATH = "../"
JUMPSCRIPTS_HOME = "../../jumpscripts"
SYNCTHING_URL = "http://localhost:8384"
[channel]
cmds = [0] # long polling from agent 0
[logging]
[logging.db]
type = "DB"
log_dir = "./logs"
levels = [2, 4, 7, 8, 9] # (all error messages) empty for all
[logging.ac]
type = "AC"
flush_int = 300 # seconds (5min)
batch_size = 1000 # max batch size, force flush if reached this count.
agent_controllers = [] # to all agents
levels = [2, 4, 7, 8, 9] # (all error messages) empty for all
[logging.console]
type = "console"
levels = [2, 4, 7, 8, 9]
[stats]
interval = 60 # seconds
agent_controllers = []
"""
    loads(toml)
def test_weird_edge_case_1():
    # Top-level (headerless) keys land in the anonymous '' table.
    source = 'l = "t"\ncreativity = "on vacation"\n'
    parsed = loads(source)
    assert parsed['']['l'] == 't'
def test_accessing_deeply_nested_dicts():
    """A deeply nested key can be read and overwritten while preserving the rest of the source."""
    t = """[cmds]
[cmds.sync]
#syncthing extension
binary = "python2.7"
cwd = "./extensions/sync"
script = "{name}.py"
[cmds.sync.env]
PYTHONPATH = "../"
JUMPSCRIPTS_HOME = "../../jumpscripts"
SYNCTHING_URL = "http://localhost:8384"
"""
    f = loads(t)
    assert f['cmds']['sync']['env']['SYNCTHING_URL'] == 'http://localhost:8384'
    # Overwrite a nested value; only that value should change on dump.
    f['cmds']['sync']['env']['SYNCTHING_URL'] = 'Nowhere'
    expected_toml = """[cmds]
[cmds.sync]
#syncthing extension
binary = "python2.7"
cwd = "./extensions/sync"
script = "{name}.py"
[cmds.sync.env]
PYTHONPATH = "../"
JUMPSCRIPTS_HOME = "../../jumpscripts"
SYNCTHING_URL = "Nowhere"
"""
    assert expected_toml == f.dumps()
def test_table_with_pound_in_title():
    # A quoted table name may contain '#' without it starting a comment.
    source = '["key#group"]\nanswer = 42'
    parsed = loads(source)
    assert parsed.primitive['key#group']['answer'] == 42
def test_fails_to_parse_bad_escape_characters():
    # '\a' is not a legal TOML escape; parsing must raise a TOMLError.
    source = r"""
invalid-escape = r"This string has a bad \a escape character."
"""
    try:
        loads(source)
    except TOMLError:
        pass  # expected
    else:
        assert False, "Should raise an exception before getting here"
def test_parsing_multiline_strings_correctly():
    """All the empty multiline-string spellings must parse to the same value."""
    toml = r'''multiline_empty_one = """"""
multiline_empty_two = """
"""
multiline_empty_three = """\
"""
multiline_empty_four = """\
\
\
"""
equivalent_one = "The quick brown fox jumps over the lazy dog."
equivalent_two = """
The quick brown \
fox jumps over \
the lazy dog."""
equivalent_three = """\
The quick brown \
fox jumps over \
the lazy dog.\
"""
'''
    parsed = loads(toml)
    assert parsed['']['multiline_empty_one'] == parsed['']['multiline_empty_two'] == \
        parsed['']['multiline_empty_three'] == parsed['']['multiline_empty_four']
def test_unicode_string_literals():
    # Non-ASCII characters must survive parsing intact.
    parsed = loads(u'answer = "δ"\n')
    assert parsed['']['answer'] == u"δ"
def test_one_entry_array_of_tables():
    # A single [[...]] header still yields a one-entry array of tables.
    source = '''[[people]]
first_name = "Bruce"
last_name = "Springsteen"
'''
    people = loads(source)['people']
    assert people[0]['first_name'] == 'Bruce'
    assert people[0]['last_name'] == 'Springsteen'
def non_empty(iterable):
    """Return a tuple of only the truthy items of *iterable*."""
    return tuple(item for item in iterable if item)
# This is a TOML document.
title = "TOML Example"
[owner]
name = "Tom Preston-Werner"
dob = 1979-05-27T07:32:00-08:00 # First class dates
[database]
server = "192.168.1.1"
ports = [ 8001, 8001, 8002 ]
connection_max = 5000
enabled = true
[servers]
# Indentation (tabs and/or spaces) is allowed but not required
[servers.alpha]
ip = "10.0.0.1"
dc = "eqdc10"
[servers.beta]
ip = "10.0.0.2"
dc = "eqdc10"
[clients]
data = [ ["gamma", "delta"], [1, 2] ]
# Line breaks are OK when inside arrays
hosts = [
"alpha",
"omega"
]
str_multiline = """
Roses are red
Violets are blue"""
str_quoted = "I'm a string. \"You can quote me\". Name\tJos\u00E9\nLocation\tSF."
str2 = """
The quick brown \
fox jumps over \
the lazy dog."""
key3 = """\
The quick brown \
fox jumps over \
the lazy dog.\
"""
# What you see is what you get.
winpath = 'C:\Users\nodejs\templates'
winpath2 = '\\ServerX\admin$\system32\'
quoted = 'Tom "Dubs" Preston-Werner'
regex = '<\i\c*\s*>'
regex2 = '''I [dw]on't need \d{2} apples'''
lines = '''
The first newline is
trimmed in raw strings.
All other whitespace
is preserved.
'''
[[fruit]]
name = "apple"
[fruit.physical]
color = "red"
shape = "round"
[[fruit.variety]]
name = "red delicious"
[[fruit.variety]]
name = "granny smith"
[[fruit]]
name = "banana"
[[fruit.variety]]
name = "plantain"
points = [ { x = 1, y = 2, z = 3 }, # This value is so special to me
{ x = 7, y = 8, z = 9 },
{ x = 2, y = 4, z = 8 } ]
import pytest
from pathlib import Path
from poetry.utils.toml_file import TomlFile
@pytest.fixture()
def fixture():
    # Path to the sample TOML document shipped alongside the tests.
    fixtures_dir = Path(__file__).parent / 'fixtures'
    return fixtures_dir / 'test.toml'
def test_toml_file(fixture):
    """End-to-end read of the fixture document: scalars, nested tables,
    arrays, multiline strings and arrays of tables."""
    f = TomlFile(fixture)
    content = f.read()
    # Anonymous top-level key and simple tables
    assert content['']['title'] == 'TOML Example'
    assert content['owner']['name'] == 'Tom Preston-Werner'
    assert isinstance(content['owner'], dict)
    assert isinstance(content['database']['ports'], list)
    assert content['database']['ports'] == [8001, 8001, 8002]
    assert content['database']['connection_max'] == 5000
    assert content['database']['enabled']
    # Nested [servers.*] tables
    servers = content['servers']
    assert len(servers) == 2
    alpha = servers['alpha']
    assert len(alpha) == 2
    assert alpha['ip'] == '10.0.0.1'
    assert alpha['dc'] == 'eqdc10'
    beta = servers['beta']
    assert len(beta) == 2
    assert beta['ip'] == '10.0.0.2'
    assert beta['dc'] == 'eqdc10'
    # Arrays and multiline strings
    clients = content['clients']
    assert len(clients['data']) == 2
    assert clients['data'] == [['gamma', 'delta'], [1, 2]]
    assert clients['hosts'] == ['alpha', 'omega']
    assert clients['str_multiline'] == 'Roses are red\nViolets are blue'
    # [[fruit]] arrays of tables with nested [[fruit.variety]]
    fruits = content['fruit']
    assert len(fruits) == 2
    apple = fruits[0]
    assert len(apple) == 3
    banana = fruits[1]
    assert len(banana['variety'][0]['points']) == 3
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment