Source code for tskit.util

# MIT License
#
# Copyright (c) 2018-2019 Tskit Developers
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
"""
Module responsible for various utility functions used in other modules.
"""
import json
import os

import numpy as np

from tskit import UNKNOWN_TIME


def canonical_json(obj):
    """
    Returns the JSON-encoded string for the given object, with keys sorted and
    whitespace removed, to enable byte-level comparison of encoded data.

    :param Any obj: Python object to encode
    :return: The encoded string
    :rtype: str
    """
    return json.dumps(obj, sort_keys=True, separators=(",", ":"))
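
# Illustrative usage (a sketch added for clarity, not part of the original module):
# sorting keys and stripping whitespace means semantically equal objects encode to
# byte-identical strings.
#
#   >>> canonical_json({"b": [1, 2], "a": None})
#   '{"a":null,"b":[1,2]}'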


def is_unknown_time(time):
    """
    As the default unknown mutation time is NaN, direct equality comparisons
    always fail. This method instead compares the underlying bit pattern so
    that unknown times can be detected. Either single floats or lists/arrays
    can be passed.

    :param float or array-like time: Value or array to check.
    :return: A single boolean or array of booleans the same shape as ``time``.
    :rtype: bool or np.array(dtype=bool)
    """
    return np.asarray(time, dtype=np.float64).view(np.uint64) == np.float64(
        UNKNOWN_TIME
    ).view(np.uint64)
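
# Illustrative usage (a sketch added for clarity, not part of the original module):
# NaN never compares equal to itself, so ``time == UNKNOWN_TIME`` is always False;
# comparing the bit pattern instead identifies the sentinel value.
#
#   >>> UNKNOWN_TIME == UNKNOWN_TIME
#   False
#   >>> bool(is_unknown_time(UNKNOWN_TIME))
#   True
#   >>> bool(is_unknown_time(0.5))
#   False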


def safe_np_int_cast(int_array, dtype, copy=False):
    """
    A few functions require arrays of certain dtypes (e.g. node indices are np.int32,
    genotypes are np.int8, etc.). Standard numpy integer arrays are of dtype=np.int64,
    so they need casting. This function casts but checks bounds to avoid wrap-around
    conversion errors. Strangely, numpy seems not to have this functionality built-in.

    If copy=False and the original array is a numpy array of exactly the dtype
    required, the original is simply returned rather than a copy (the same behaviour
    as numpy's .astype(copy=...)).
    """
    if not isinstance(int_array, np.ndarray):
        int_array = np.array(int_array)
        # Since this is a new numpy array anyway, it's always a copy, so economize by
        # setting copy=False
        copy = False
    if int_array.size == 0:
        return int_array.astype(dtype, copy=copy)  # Allow empty arrays of any type
    try:
        return int_array.astype(dtype, casting="safe", copy=copy)
    except TypeError:
        if int_array.dtype == np.dtype("O"):
            # this occurs e.g. if we're passed a list of lists of different lengths
            raise TypeError("Cannot convert to a rectangular array.")
        bounds = np.iinfo(dtype)
        if np.any(int_array < bounds.min) or np.any(int_array > bounds.max):
            raise OverflowError(f"Cannot convert safely to {dtype} type")
        if int_array.dtype.kind == "i" and np.dtype(dtype).kind == "u":
            # Allow casting from int to unsigned int, since we have checked bounds
            casting = "unsafe"
        else:
            # Raise a TypeError when we try to convert from, e.g., a float.
            casting = "same_kind"
        return int_array.astype(dtype, casting=casting, copy=copy)
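
# Illustrative usage (a sketch added for clarity, not part of the original module):
# values that fit are cast safely, while out-of-range values raise rather than
# silently wrapping around.
#
#   >>> safe_np_int_cast([1, 2, 3], np.int32)
#   array([1, 2, 3], dtype=int32)
#
# Passing a value outside the int32 range, e.g. safe_np_int_cast([2 ** 40], np.int32),
# raises OverflowError instead of silently wrapping.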


#
# Pack/unpack lists of data into flattened numpy arrays.
#


def pack_bytes(data):
    """
    Packs the specified list of bytes into a flattened numpy array of 8 bit integers
    and corresponding offsets. See :ref:`sec_encoding_ragged_columns` for details
    of this encoding.

    :param list[bytes] data: The list of bytes values to encode.
    :return: The tuple (packed, offset) of numpy arrays representing the flattened
        input data and offsets.
    :rtype: numpy.ndarray (dtype=np.int8), numpy.ndarray (dtype=np.uint32)
    """
    n = len(data)
    offsets = np.zeros(n + 1, dtype=np.uint32)
    for j in range(n):
        offsets[j + 1] = offsets[j] + len(data[j])
    column = np.zeros(offsets[-1], dtype=np.int8)
    for j, value in enumerate(data):
        column[offsets[j] : offsets[j + 1]] = bytearray(value)
    return column, offsets


def unpack_bytes(packed, offset):
    """
    Unpacks a list of bytes from the specified numpy arrays of packed byte data
    and corresponding offsets. See :ref:`sec_encoding_ragged_columns` for details
    of this encoding.

    :param numpy.ndarray packed: The flattened array of byte values.
    :param numpy.ndarray offset: The array of offsets into the ``packed`` array.
    :return: The list of bytes values unpacked from the parameter arrays.
    :rtype: list[bytes]
    """
    # This could be done a lot more efficiently...
    ret = []
    for j in range(offset.shape[0] - 1):
        raw = packed[offset[j] : offset[j + 1]].tobytes()
        ret.append(raw)
    return ret
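
# Round-trip example (a sketch added for clarity, not part of the original module):
# each value occupies the slice of ``packed`` between consecutive offsets.
#
#   >>> packed, offset = pack_bytes([b"ABC", b"", b"DE"])
#   >>> packed
#   array([65, 66, 67, 68, 69], dtype=int8)
#   >>> offset
#   array([0, 3, 3, 5], dtype=uint32)
#   >>> unpack_bytes(packed, offset)
#   [b'ABC', b'', b'DE']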


def pack_strings(strings, encoding="utf8"):
    """
    Packs the specified list of strings into a flattened numpy array of 8 bit
    integers and corresponding offsets using the specified text encoding.
    See :ref:`sec_encoding_ragged_columns` for details of this encoding of
    columns of variable length data.

    :param list[str] strings: The list of strings to encode.
    :param str encoding: The text encoding to use when converting string data
        to bytes. See the :mod:`codecs` module for information on available
        string encodings.
    :return: The tuple (packed, offset) of numpy arrays representing the flattened
        input data and offsets.
    :rtype: numpy.ndarray (dtype=np.int8), numpy.ndarray (dtype=np.uint32)
    """
    return pack_bytes([bytearray(s.encode(encoding)) for s in strings])


def unpack_strings(packed, offset, encoding="utf8"):
    """
    Unpacks a list of strings from the specified numpy arrays of packed byte data
    and corresponding offsets using the specified text encoding.
    See :ref:`sec_encoding_ragged_columns` for details of this encoding of
    columns of variable length data.

    :param numpy.ndarray packed: The flattened array of byte values.
    :param numpy.ndarray offset: The array of offsets into the ``packed`` array.
    :param str encoding: The text encoding to use when converting string data
        to bytes. See the :mod:`codecs` module for information on available
        string encodings.
    :return: The list of strings unpacked from the parameter arrays.
    :rtype: list[str]
    """
    return [b.decode(encoding) for b in unpack_bytes(packed, offset)]
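
# Round-trip example (a sketch added for clarity, not part of the original module):
# strings are encoded to bytes and packed into a single int8 column plus offsets.
#
#   >>> packed, offset = pack_strings(["hello", "hi"])
#   >>> unpack_strings(packed, offset)
#   ['hello', 'hi']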


def pack_arrays(list_of_lists):
    """
    Packs the specified list of numeric lists into a flattened numpy array of
    numpy float64 and corresponding offsets. See :ref:`sec_encoding_ragged_columns`
    for details of this encoding of columns of variable length data.

    :param list[list] list_of_lists: The list of numeric lists to encode.
    :return: The tuple (packed, offset) of numpy arrays representing the flattened
        input data and offsets.
    :rtype: numpy.array (dtype=np.float64), numpy.array (dtype=np.uint32)
    """
    # TODO must be possible to do this more efficiently with numpy
    n = len(list_of_lists)
    offset = np.zeros(n + 1, dtype=np.uint32)
    for j in range(n):
        offset[j + 1] = offset[j] + len(list_of_lists[j])
    data = np.empty(offset[-1], dtype=np.float64)
    for j in range(n):
        data[offset[j] : offset[j + 1]] = list_of_lists[j]
    return data, offset


def unpack_arrays(packed, offset):
    """
    Unpacks a list of arrays from the specified numpy arrays of packed data
    and corresponding offsets. See :ref:`sec_encoding_ragged_columns` for details
    of this encoding of columns of variable length data.

    :param numpy.ndarray packed: The flattened array of data.
    :param numpy.ndarray offset: The array of offsets into the ``packed`` array.
    :return: The list of numpy arrays unpacked from the parameter arrays.
    :rtype: list[numpy.ndarray]
    """
    ret = []
    for j in range(offset.shape[0] - 1):
        ret.append(packed[offset[j] : offset[j + 1]])
    return ret


#
# Interval utilities
#


def intervals_to_np_array(intervals, start, end):
    """
    Converts the specified intervals to a numpy array and checks for errors.
    """
    intervals = np.array(intervals, dtype=np.float64)
    # Special case the empty list of intervals
    if len(intervals) == 0:
        intervals = np.zeros((0, 2), dtype=np.float64)
    if len(intervals.shape) != 2:
        raise ValueError("Intervals must be a 2D numpy array")
    if intervals.shape[1] != 2:
        raise ValueError("Intervals array shape must be (N, 2)")
    # TODO do this with numpy operations.
    last_right = start
    for left, right in intervals:
        if left < start or right > end:
            raise ValueError(f"Intervals must be within {start} and {end}")
        if right <= left:
            raise ValueError("Bad interval: right <= left")
        if left < last_right:
            raise ValueError("Intervals must be disjoint.")
        last_right = right
    return intervals


def negate_intervals(intervals, start, end):
    """
    Returns the set of intervals *not* covered by the specified set of
    disjoint intervals in the specified range.
    """
    intervals = intervals_to_np_array(intervals, start, end)
    other_intervals = []
    last_right = start
    for left, right in intervals:
        if left != last_right:
            other_intervals.append((last_right, left))
        last_right = right
    if last_right != end:
        other_intervals.append((last_right, end))
    return np.array(other_intervals)


def naturalsize(value):
    """
    Format a number of bytes like a human readable filesize (e.g. 10 KiB).
    """
    # Taken from https://github.com/jmoiron/humanize
    suffix = ("KiB", "MiB", "GiB", "TiB", "PiB", "EiB", "ZiB", "YiB")
    base = 1024
    format_ = "%.1f"

    bytes_ = float(value)
    abs_bytes = abs(bytes_)

    if abs_bytes == 1:
        return "%d Byte" % bytes_
    elif abs_bytes < base:
        return "%d Bytes" % bytes_

    for i, s in enumerate(suffix):
        unit = base ** (i + 2)
        if abs_bytes < unit:
            return (format_ + " %s") % ((base * bytes_ / unit), s)
    return (format_ + " %s") % ((base * bytes_ / unit), s)


def obj_to_collapsed_html(d, name=None, open_depth=0):
    """
    Recursively make an HTML representation of python objects.

    :param str name: Name for this object
    :param int open_depth: By default sub-sections are collapsed. If this number
        is non-zero the first layers up to open_depth will be opened.
    :return: The HTML as a string
    :rtype: str
    """
    opened = "open" if open_depth > 0 else ""
    open_depth -= 1
    name = str(name) + ":" if name is not None else ""
    if type(d) == dict:
        return f"""
            <div>
              <span class="tskit-details-label">{name}</span>
              <details {opened}>
                <summary>dict</summary>
                {"".join(f"{obj_to_collapsed_html(val, key, open_depth)}<br/>" for key, val in d.items())}
              </details>
            </div>
            """
    elif type(d) == list:
        return f"""
            <div>
              <span class="tskit-details-label">{name}</span>
              <details {opened}>
                <summary>list</summary>
                {"".join(f"{obj_to_collapsed_html(val, None, open_depth)}<br/>" for val in d)}
              </details>
            </div>
            """
    else:
        return f"{name} {d}"


def unicode_table(rows, title=None, header=None):
    """
    Convert a table (list of lists) of strings to a unicode table.

    :param list[list[str]] rows: List of rows, each of which is a list of strings
        for each cell. The first column will be left justified, the others right.
        Each row must have the same number of cells.
    :param str title: If specified the first output row will be a single cell
        containing this string, left-justified. [optional]
    :param list[str] header: Specifies a row above the main rows which will be in
        double lined borders and left justified. Must be same length as each row.
        [optional]
    :return: The table as a string
    :rtype: str
    """
    if header is not None:
        all_rows = [header] + rows
    else:
        all_rows = rows
    widths = [
        max(len(row[i_col]) for row in all_rows) for i_col in range(len(all_rows[0]))
    ]
    out = []
    if title is not None:
        w = sum(widths) + len(rows[1]) - 1
        out += [
            f"╔{'═' * w}\n" f"║{title.ljust(w)}\n",
            f"╠{'╤'.join('═' * w for w in widths)}\n",
        ]
    if header is not None:
        out += [
            f"╔{'╤'.join('═' * w for w in widths)}\n"
            f"║{'│'.join(cell.ljust(w) for cell, w in zip(header, widths))}\n",
            f"╠{'╪'.join('═' * w for w in widths)}\n",
        ]
    out += [
        f"╟{'┼'.join('─' * w for w in widths)}\n".join(
            f"║{row[0].ljust(widths[0])}│"
            + f"{'│'.join(cell.rjust(w) for cell, w in zip(row[1:], widths[1:]))}\n"
            for row in rows
        ),
        f"╚{'╧'.join('═' * w for w in widths)}\n",
    ]
    return "".join(out)


def tree_sequence_html(ts):
    table_rows = "".join(
        f"""
            <tr>
              <td>{name.capitalize()}</td>
              <td>{table.num_rows}</td>
              <td>{naturalsize(table.nbytes)}</td>
              <td style="text-align: center;">
                {'✅' if hasattr(table, "metadata") and len(table.metadata) > 0 else ''}
              </td>
            </tr>
        """
        for name, table in ts.tables.name_map.items()
    )
    return f"""
        <div>
          <style>
            .tskit-table thead tr th {{text-align: left;}}
            .tskit-table tbody tr td:first-of-type {{text-align: left;}}
            .tskit-details-label {{vertical-align: top; padding-right: 5px;}}
            .tskit-table-set {{display: inline-flex; flex-wrap: wrap; margin: -12px 0 0 -12px; width: calc(100% + 12px);}}
            .tskit-table-set-table {{margin: 12px 0 0 12px;}}
            details {{display: inline-block;}}
            summary {{cursor: pointer; outline: 0; display: list-item;}}
          </style>
          <div class="tskit-table-set">
            <div class="tskit-table-set-table">
              <table class="tskit-table">
                <thead>
                  <tr>
                    <th style="padding:0;line-height:21px;">
                      <img style="height: 32px;display: inline-block;padding: 3px 5px 3px 0;" src="https://raw.githubusercontent.com/tskit-dev/administrative/main/tskit_logo.svg"/>
                      <a target="_blank" href="https://tskit.readthedocs.io/en/latest/python-api.html#the-treesequence-class">
                        Tree Sequence
                      </a>
                    </th>
                  </tr>
                </thead>
                <tbody>
                  <tr><td>Trees</td><td>{ts.num_trees}</td></tr>
                  <tr><td>Sequence Length</td><td>{ts.sequence_length}</td></tr>
                  <tr><td>Sample Nodes</td><td>{ts.num_samples}</td></tr>
                  <tr><td>Total Size</td><td>{naturalsize(ts.nbytes)}</td></tr>
                  <tr>
                    <td>Metadata</td><td style="text-align: left;">{obj_to_collapsed_html(ts.metadata, None, 1) if len(ts.tables.metadata_bytes) > 0 else "No Metadata"}</td>
                  </tr>
                </tbody>
              </table>
            </div>
            <div class="tskit-table-set-table">
              <table class="tskit-table">
                <thead>
                  <tr>
                    <th style="line-height:21px;">Table</th>
                    <th>Rows</th>
                    <th>Size</th>
                    <th>Has Metadata</th>
                  </tr>
                </thead>
                <tbody>
                  {table_rows}
                </tbody>
              </table>
            </div>
          </div>
        </div>
        """  # noqa: B950


def convert_file_like_to_open_file(file_like, mode):
    # Get ourselves a local version of the file. The semantics here are complex
    # because we need to support a range of inputs, and the required cleanup
    # behaviour is slightly different for each.
    _file = None
    local_file = True
    try:
        # First, see if we can interpret the argument as a pathlike object.
        path = os.fspath(file_like)
        _file = open(path, mode)
    except TypeError:
        pass
    if _file is None:
        # Now we try to open file. If it's not a pathlike object, it could be
        # an integer fd or object with a fileno method. In this case we
        # must make sure that close is **not** called on the fd.
        try:
            _file = open(file_like, mode, closefd=False, buffering=0)
        except TypeError:
            pass
    if _file is None:
        # Assume that this is a file **but** we haven't opened it, so we must
        # not close it.
        if mode == "wb" and not hasattr(file_like, "write"):
            raise TypeError("file object must have a write method")
        _file = file_like
        local_file = False
    return _file, local_file
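

# Illustrative usage of the interval and formatting helpers (a sketch added for
# clarity, not part of the original module):
#
#   >>> negate_intervals([[2, 4], [6, 7]], 0, 10).tolist()
#   [[0.0, 2.0], [4.0, 6.0], [7.0, 10.0]]
#   >>> naturalsize(1000)
#   '1000 Bytes'
#   >>> naturalsize(2048)
#   '2.0 KiB'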