python code i have known and loved

For almost all of my research I use python for the higher level work. Below is a collection of python code I have written for myself and find myself reusing across most of my projects. My hope is that you can use it too! There are examples of how to use each piece of code wherever applicable, and the sections are sorted in alphabetical order.

I mostly use the standard library and the numpy package; wherever there is an import, it is specified at the top of the code block. Most of the code is rather version agnostic for interpreters 3.10 and newer, since it relies on the built-in X | Y union type hints (3.10+) and the walrus operator (3.8+) that were not available in earlier versions. For my personal work I use numpy version 1.26 for most things, but am warming up to 2.x for some of the crazier CUDA work that is coming out on it. Any recently supported numpy version should work swimmingly for the code below.

ENJOY AND COPY AWAY!
(I am not in any way responsible or liable for your usage or derivatives of the code below, it is all only intended to be free reference material)

sections

  1. __main__
  2. AutomatedLoggingParent
  3. csv_get
  4. csv_to_hfd5
  5. dict_filter
  6. dict_transform
  7. flattener
  8. get_arr_size
  9. get_multithread_chunksize
  10. gts
  11. hfd5_read
  12. strecord

__main__

# start each project off right! 

if __name__ == '__main__':
    print(f"running {__file__}")

AutomatedLoggingParent

import csv
import inspect
import os
import time

class AutomatedLoggingParent:
    """
    Python support for automated logging has historically been extremely
    inflexible, use case restricted, and not easily portable. Folks will
    often write their own logging decorators and manually place them onto
    all of their own functions, these are often rigid in practice and
    require lots of time to write. Even then it has proven very difficult
    for folks to write logging functions/decorators that can safely and
    reliably permeate class structures and their methods. By simply
    inheriting this class as the parent of any class, it will automatically
    have full scope logging functionality working. There are 5 benefits that
    this class offers:
    1. zero time requirements; simply adding this as a parent is all the work
        that is required to automatically log everything in the target class
    2. automatically logs every call to every 'public' method, including the
        class+name of the method, all of its inputs, all of its outputs, and
        the time it returned, generating a log entry every single time it
        is used
    3. automatically shares logs between all classes that inherit this class
        by use of each class's __call__ method; once a child class calls
        another child class's __call__ method, the called child enters all
        of its logs into the caller child's logs
    4. allows users to easily enter their own bespoke log messages,
        containing whatever they want, anywhere they want, and invoked
        however they want
    5. allows users to easily look up whatever log entries they are looking
        for by invoking a simple user-defined labeling system
    Logs can also be directly saved to a csv file at any point. The csv file
    has three columns: Label, Timestamp, and Message. The Label acts as a
    keyword filter to look up any specific type of log, the Timestamp is the
    unix nanosecond the log was taken at, and the Message is a combination
    of information from both the automated logging capabilities and the
    bespoke user log entries.

    While all child instances retain their own logs (available directly as
    child_instance.log), this parent class also retains its own log that
    chronologically collects every log from every child instance. To access
    the parent's log (the chronological collection of every log from every
    instance) just call the master log attribute from any instance, e.g.
    child_instance.master_log

    Limitations: Child method tracebacks will differ from what they would be
    if the class did not inherit this parent. Each of their function calls
    passes through the included logging decorator and this will be reflected
    in their tracebacks. If a form of automated programmatic traceback
    handling is not robust then it may not yield the expected results.

    To use this, simply inherit and initialize it like any other parent:
    >>> class MyClass(AutomatedLoggingParent):
    ...     def __init__(self): super().__init__()
    """

    # define a master log of the parent class
    master_log = []

    def __init__(self):
        # define internal log attribute
        self._log = []

    def __init_subclass__(cls, *a: any, **k: any) -> None:
        """
        Puts logging functionality onto callable children attributes.

        :param a: positional argument pass through
        :param k: keyword argument pass through
        :return: None
        """

        super().__init_subclass__(*a, **k)

        # loop through all of child's accessible attributes
        for i in dir(cls):

            # decorate non-private callables and init + call and ensure
            # none of the logging tools get recursively captured
            if ((callable(getattr(cls, i)) and not
               (i.startswith('_' + cls.mro()[-2].__name__ + '__') or
               i.startswith('__'))) or i in ['__init__', '__call__']) and (
                    i not in ['log', '__make_log', 'save_log']):

                # record the original attr attrs
                _tmp = getattr(cls, i)
                _original_attr_dict = {j: getattr(_tmp, j) for j in dir(_tmp)}

                # add flag for defined @staticmethods
                getattr(cls, i).__dict__['_logref_istaticmeth'] = (
                    isinstance(inspect.getattr_static(cls, i), staticmethod))

                # set attr into child as a logged attr and rerecord attr attrs
                setattr(cls, i, cls._AutomatedLoggingParent__make_log(
                    getattr(cls, i)))
                getattr(cls, i).__dict__.update(_original_attr_dict)

    @property
    def log(self): return self._log

    @log.setter
    def log(self, _in: str | tuple[str]) -> None:
        """
        Appends message and optional label into instance's log

        :param _in: information to append into log; if _in has one string
            element then it is considered as the message, if _in has two
            string elements then the first is a label and the second is
            the message.
        :return: None
        """

        # create a generic label if not already present
        _l, _m = ('Log Entry', _in) if isinstance(_in, str) else _in

        # for data that is meant to just be inserted
        if _l == '__log_insert':
            self._log.extend(_m)
            self.master_log.extend(_m)
        # for data that is being logged as is
        else:
            self._log.append([_l, time.time_ns(), _m])
            self.master_log.append([_l, time.time_ns(), _m])

    @staticmethod
    def __make_log(method_in: callable) -> any:
        """
        Log 'decorator' itself, this appends non-private callable attributes
        into log automatically and will pass logs between children of this
        class via a __call__ method call in one another.

        :param method_in: callable attribute (and its inputs)
        :return: output from the callable attribute
        """

        def internal_wrapper(self, *a: any, **k: any) -> any:
            """
            Wrapper function for callable attribute logging.

            :param self: assumed self variable
            :param a: positional argument pass through
            :param k: keyword argument pass through
            :return: callable output
            """

            # execute the callable with its inputs and collect the outputs
            # defined staticmethods are called without self, while normal
            # methods and classmethods are called with a self/cls
            _out = method_in(*a, **k) if method_in._logref_istaticmeth else (
                method_in(self, *a, **k))

            # logging message in format: function ---> [inputs] ---> [outputs]
            _m = (f"{'.'.join(method_in.__qualname__.split('.')[-2:])} "
                  f"---> [{a}] && [{k}] ---> {_out}")

            # when one child is called from another with logs then the calling
            # one gets a copy of the called one's current log
            if method_in.__qualname__.split('.')[-1] == '__call__':
                try:
                    # pass back contents of current instance log + current call
                    inspect.stack()[1].frame.f_locals["self"].log.extend(
                        self.log)
                    inspect.stack()[1].frame.f_locals["self"].log = (
                        'Method Call', _m)

                # if there is an issue passing back a log then just continue
                except: pass

            # append message into log and return callable attribute's output
            self.log = 'Method Call', _m
            return _out

        # hand off the original attributes onto the wrapper where possible
        for i in dir(method_in):

            # hand off each attribute if possible
            try: setattr(internal_wrapper, i, getattr(method_in, i))
            except: pass

        # return the fully wrapped attribute
        return internal_wrapper

    def save_log(self, master: bool = False, path: str | None = None) -> str:
        """
        Saves the current child's (or all children's) log to a csv file

        :param master: True: save logs of all children, False: only current
        :param path: (optional) user defined save path: path, dir, or name

        :return: path which log is saved to
        """

        # generate unique name by using this current files
        # name _ current instance hash _ unix nanosecond _ random int
        _tmp_name = (f"_RuntimeLog_{os.path.basename(__file__)}_"
                     f"{hash(self)}_{time.time_ns()}_"
                     f"{str(int.from_bytes(os.urandom(2))).zfill(5)}.csv")

        # if no path is provided then save unique name to cwd
        if path is None: _p = os.path.join(os.getcwd(), _tmp_name)
        # if provided path is dir then save unique name to dir
        elif os.path.isdir(path): _p = os.path.join(path, _tmp_name)
        # if provided path has no dir then save name to cwd
        elif os.path.dirname(path) == '': _p = os.path.join(os.getcwd(), path)
        # else it is assumed to be a relative/complete path
        else: _p = os.path.abspath(path)

        # write file with added headers
        with open(_p, 'w', newline='') as f:
            csv.writer(f).writerows(
                [['Label', 'Timestamp', 'Message']] +
                (self.master_log if master else self.log))

        # notify and return
        print(f"{self.__class__.__name__} current " 
              f"{'parent' if master else 'instance'}'s "
              f"runtime log saved to: \n{_p}")
        return _p

# ---------------- example ----------------


class TestClass_1(AutomatedLoggingParent):
    def __init__(self, *a, **k): super().__init__()

    def __call__(self, example_func, *a, **k):
        example_func(a)
        self.testfunc_1(a)
        return a

    def testfunc_1(self, *a):
        self.log = 'MyOwnLabel_1', 123
        return a

    @staticmethod
    def staticmethod(*a):
        return f"static inputs: {a}"

    @classmethod
    def classmethod(cls, *a):
        return f"class inputs: {cls} : {a}"


class TestClass_2(AutomatedLoggingParent):
    def __init__(self, *a, **k):
        super().__init__()
        self.other_log_child = TestClass_1(456, some_kwarg='parameter')
        self.other_log_child.staticmethod('static arg 1')
        self.other_log_child.classmethod('class arg 1')

    def __call__(self, *a, **k):
        self.testfunc_2(789)
        self.other_log_child(self.testfunc_2, efgh='ijkl')
        return a

    def testfunc_2(self, *a):
        self.log = 'MyOwnLabel_2', 'abcd'
        self.log = 'bespoke message here'
        return a

class TestClass_3(AutomatedLoggingParent):
    def __init__(self):
        super().__init__()
        self.log = 'MASTER TEST', 'custom message for master log'

logging_test = TestClass_2()
logging_test()
TestClass_3()
print(f"Current log is as follows: \n{logging_test.log}")
log_path = logging_test.save_log()
print(f"Master log is as follows: \n{logging_test.master_log}")
master_log_path = logging_test.save_log(master=True)
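# note: os.startfile is Windows-only; on other platforms open the csv manually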
os.startfile(log_path)

csv_get

import csv
import os
import numpy as np
def csv_get(*csv_paths: str | list[str] | list,
            lck: bool = False,
            dne_crash: bool = False,
            only_str: bool = False,
            ):
    """
    Reads a list of csv paths into a list of corresponding dicts,
    whose values may come from columns of potentially ragged lengths.
    CSV headers (row 0) are used for key assignment. CSVs can also be
    returned as raw string rows.
    ** if only_str is False, ragged csvs are truncated to the shortest row **

    :param csv_paths: csv paths as str|path or arbitrary iterables of str|path
    :param lck: enforce all keys to be lowercase
    :param dne_crash: ends program if csv does not exist, or continues
    :param only_str: if True, return each csv as its raw list of string rows
    :return: list of csv data, either dicts of float|str arrays, or string rows
    """

    # define internal help functions
    def __flattener(_in) -> list:
        """
        Internal helper function, flattens arbitrarily nested iterables
        :param _in: Any level of arbitrarily nested lists/tuples/sets
        :return: A flattened list
        """

        # verify that incoming object is of desired type, else return it
        if not isinstance(_in, (list, set, tuple, np.ndarray)):
            return [_in]

        # initiate the outgoing list and loop through elements of incoming list
        __out = []
        for _i in _in:
            # make recursive call if iterable else append item to __out
            __out += __flattener(_i) if (
                isinstance(_i, (list, set, tuple, np.ndarray))
            ) else [_i]

        # return flattened list
        return __out

    def __arr_get(_in):
        """
        Internal helper function, converts object to float or string array
        :param _in: iterable object
        :return: float or string array
        """

        # attempt to return numerical array else string array
        try: return np.squeeze(np.array(_in, dtype=float))
        except: return np.squeeze(np.array(_in, dtype=str))

    # initialize list of outgoing csv data and loop through paths
    _out = []
    for i in __flattener(csv_paths):
        # for files that do not exist, crash or pass accordingly
        if not os.path.isfile(str(i)):
            print(_m := f"attempted to read csv file, but path not found: {i}")
            if dne_crash: raise Exception(_m)
            else: continue

        # read in data
        with open(i, newline='') as f: reader = list(map(list, csv.reader(f)))

        # return in desired structure, string or dict of arrays
        if only_str: _out.append(reader)
        else:
            _out.append({str(k) if not lck else str(k).lower(): __arr_get(v)
                         for k, v in
                         zip(reader.pop(0), map(list, zip(*reader)))})
    # return list of read in csv data
    return _out

# ---------------- example ----------------
# create 2 random csv paths
csvpath_1 = os.path.join(os.getcwd(), f"csvExample_{hash(os.urandom(9))}.csv")
csvpath_2 = os.path.join(os.getcwd(), f"csvExample_{hash(os.urandom(9))}.csv")
# create 2 random csv datasets of floats and strings
csvdata_1 = [['numbers_1', *[int.from_bytes(os.urandom(1), 'big') for _ in range(22)]],
             ['strings_1', *[chr(33 + (int.from_bytes(os.urandom(1), 'big') % 77)) for _ in range(22)]]]
csvdata_2 = [['numbers_2', *[int.from_bytes(os.urandom(1), 'big') for _ in range(66)]],
             ['strings_2', *[chr(33 + (int.from_bytes(os.urandom(1), 'big') % 77)) for _ in range(66)]]]
# write these datasets to these paths
with open(csvpath_1, 'w+', newline='') as csvf:
    csv.writer(csvf).writerows(map(list, zip(*csvdata_1)))
with open(csvpath_2, 'w+', newline='') as csvf:
    csv.writer(csvf).writerows(map(list, zip(*csvdata_2)))
# read them back in
csvdata_read = csv_get(csvpath_1, csvpath_2)
print(f"CSV data from {csvpath_1}: \n\n {csvdata_read[0]} \n\n\n"
      f"CSV data from {csvpath_2}: \n\n {csvdata_read[1]} \n\n\n")
# remove random data
os.remove(csvpath_1), os.remove(csvpath_2)

csv_to_hfd5

*NOTE* this function has two additional dependencies: (1) the csv_get function found on this page, and (2) the third-party h5py package

import csv
import os
import h5py
import numpy as np
def csv_to_hfd5(*paths_in: str | list[str] | list,
                paths_out: None | list[str] = None,
                dir_out: str | None = None,
                ) -> list[str]:
    """
    Copies data from csv files into HDF5 files, assumes headers as keys

    :param paths_in: csv paths as str|path or arbitrary iterables of str|path
    :param paths_out: (optional) list of outgoing relative paths or filenames
    :param dir_out: (optional) str of root directory of outgoing paths
    :return: list of paths to the respective hdf5 files
    """

    # define internal help functions
    def __flattener(_in) -> list:
        """
        Internal helper function, flattens arbitrarily nested iterables
        :param _in: Any level of arbitrarily nested lists/tuples/sets
        :return: A flattened list
        """

        # verify that incoming object is of desired type, else return it
        if not isinstance(_in, (list, set, tuple, np.ndarray)):
            return [_in]

        # initiate the outgoing list and loop through elements of incoming list
        __out = []
        for _i in _in:
            # make recursive call if iterable else append item to __out
            __out += __flattener(_i) if (
                isinstance(_i, (list, set, tuple, np.ndarray))
            ) else [_i]

        # return flattened list
        return __out

    # read all csv data into list
    _paths_in = __flattener(paths_in)
    data_in = csv_get(_paths_in, dne_crash=True)

    # make outgoing filenames, depending on paths_out and dir_out permutations
    if dir_out is None:
        _out = [os.path.splitext(i)[0] + '.hdf5' for i in _paths_in
                ] if (paths_out is None) else ([
            os.path.join(os.path.split(i)[0], j) for i, j in zip(
                _paths_in, paths_out)])
    else:
        _out = [os.path.join(
            dir_out, os.path.splitext(os.path.basename(i))[0] + '.hdf5')
            for i in _paths_in] if (paths_out is None) else ([
            os.path.join(dir_out, i) for i in paths_out])

    # loop through all datasets and columns
    for i, j in zip(data_in, _out):
        with h5py.File(j, 'w') as f:
            for k, l in i.items(): f.create_dataset(k, data=l)

    # return the hdf5 paths
    return _out
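
A minimal usage sketch, assuming csv_get from above is defined alongside this function and h5py is installed; the file name is illustrative and the columns are kept numeric:

# ---------------- example ----------------
example_csv = os.path.join(os.getcwd(), "csv_to_hfd5_example.csv")
with open(example_csv, 'w', newline='') as f:
    csv.writer(f).writerows([['col_a', 'col_b'], [1, 10], [2, 20], [3, 30]])
# copy the csv into an hdf5 file saved next to it
hdf5_path = csv_to_hfd5(example_csv)[0]
# read the copy back with h5py to confirm the datasets
with h5py.File(hdf5_path, 'r') as f:
    print({k: f[k][()] for k in f.keys()})
# clean up the example files
os.remove(example_csv), os.remove(hdf5_path)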

dict_filter

import operator
import numpy as np
def dict_filter(dict_in: dict[str, np.ndarray],
                operations: list[list[object, callable, object]]
                ) -> dict[str, np.ndarray]:
    """
    Quick way to filter all values of all keys of a dictionary by
    applying any number of operators to the same number of keys. Dict
    values are assumed to be equilength 1D numpy arrays, operators are
    assumed to be operator objects, and values are assumed to be of the
    same type as the array being operated on. If an operation's value is
    given as a list, the operator is applied to each element of that list
    and the resulting masks are OR'd together. Returns a copy of the
    filtered dict.

    :param dict_in: dict of equilength 1D numpy arrays
    :param operations: list of lists of operations, each sublist has 3 parts:
        [key, operator, value]
            example format for filtering down to where key 'a' is above 1
            and key 'b' is equal to 'c' would be:
                [['a', operator.gt, 1], ['b', operator.eq, 'c'], ]
    :return: copy of filtered dict
    """

    # build a mask for each key operation
    single_masks = [
        np.sum(
            [i[1](dict_in[i[0]], j) for j in i[2]],
            axis=0).astype(bool) if
        isinstance(i[2], list) else
        i[1](dict_in[i[0]], i[2]) for i in operations
    ]

    # merge all single masks into a mask across all keys
    mask_all = np.prod(np.vstack(single_masks), axis=0).astype(bool)

    # apply the mask to all keys and return
    return {k: v for k, v in zip(
        dict_in.keys(),
        map(lambda x: np.array(x)[mask_all],
            map(lambda x: dict_in[x], dict_in.keys())))}

# ---------------- example ----------------
foo = {'a': np.arange(5), 'b': np.array(['c', 'c', 'c', 'c', 'z']), }
bar = [['a', operator.gt, 1], ['b', operator.eq, 'c'], ]
print(dict_filter(foo, bar))
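# a list value ORs the operator across its elements (per the docstring above),
# e.g. keep only the rows where key 'a' equals 0 or 4
print(dict_filter(foo, [['a', operator.eq, [0, 4]]]))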

dict_transform


def dict_transform(dict_in: dict,
                   user_func: callable,
                   keys_out: list[str] | None = None,
                   ) -> dict:
    """
    Applies a user defined function to the (optionally) user provided keys
    of a dict. If the keys are not specified then the function is applied
    over all keys. Returns a copy of all the used keys in a single dict.
    :param dict_in: dictionary to apply function to
    :param user_func: function which to apply
    :param keys_out: optional, apply func_in to this list of keys, else all
    :return: dict_in copy, only containing keys to which function was applied
    """

    # check if there are outgoing keys provided, else use all incoming keys
    keys_out = dict_in.keys() if keys_out is None else keys_out

    # return specific keys with function applied
    return {k: v for k, v in zip(
        keys_out,
        map(user_func,
            map(lambda x: dict_in[x], keys_out)))}

# ---------------- example ----------------
foo = {'a': 'to me', 'b': 'to you', 'c': 'world', }
bar = lambda x: f"Hello {x}!!"
print(dict_transform(foo, bar, ['b', 'c']))

flattener

import collections.abc


def flattener(in_: collections.abc.Iterable,
              type_tuple: tuple[type, ...] = (list, set, tuple,)) -> list:
    """
    Recursive function provides ordered flattening of arbitrarily
    nested permutations of user defined iterables into a single
    unnested outgoing list.

    :param in_: Any level of arbitrarily nested iterable objects
    :param type_tuple: a tuple of types that should be unpacked
    :return: A flattened list
    """
    # verify that incoming object is of desired type, else return it
    if not isinstance(in_, type_tuple): return [in_]

    # initiate the outgoing list and loop through elements of incoming list
    _out = []
    # make recursive call to flatten, passing type_tuple down each level
    for i in in_:
        _out += flattener(i, type_tuple) if isinstance(i, type_tuple) else [i]

    # return flattened list
    return _out
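
A quick usage sketch (the commented line is the output I would expect):

# ---------------- example ----------------
print(flattener([1, (2, 3), [{4}, [5, [6]]], 'seven']))
# -> [1, 2, 3, 4, 5, 6, 'seven']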

get_arr_size

import sys
import numpy as np

def get_arr_size(x: np.ndarray, disp: bool = True) -> int:
    """
    Quick way to see the amount of memory a np array is using
    :param x: array in question
    :param disp: bool to print the array size or not
    :return: int of array size in bytes
    """
    # ensure it's a np array
    assert isinstance(x, np.ndarray)

    # get byte size from sys, or from the array attrs if sys isn't imported
    try:
        bsize = sys.getsizeof(x)
    except NameError:
        # add 128 bytes for the np array structure overhead
        bsize = x.nbytes+128

    if disp:
        # get magnitude and prefixes
        mag = int((len(str(round(bsize))) - 1)/3)
        pre = ['', 'K', 'M', 'G', 'T']
        print(f"Array is {round(bsize/(10**(mag*3)), 2)} {pre[mag]}b")

    return bsize
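
A quick usage sketch; the array here is arbitrary, and a 1000 x 1000 float64 array should report roughly 8 MB:

# ---------------- example ----------------
foo = np.zeros((1000, 1000))
foo_bytes = get_arr_size(foo)
print(foo_bytes)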

get_multithread_chunksize

# -- std
import collections.abc
import functools
import multiprocessing
import os
import time
import threading
# -- third party
import psutil

def get_multithread_chunksize(
        func: collections.abc.Callable,
        iterable: collections.abc.Iterable,
        loops: int = 3,
        base: int = 5,
        step: int = 30,
        refine: int = 4,
        smooth: int = 3,
        disp: int = 1,
        poolobj: str = "multiprocessing.Pool(os.cpu_count()-4)",
        poolattr: str = "map",
        is_main: bool = True,
        fixed_iter: collections.abc.Iterable | None = None) -> int:
    """
    When running multithreaded/multiprocess codes, getting the optimal
        "chunksize" can be the difference between extremely fast runtimes or
        code that may take years to complete. If chunksize is not set as the
        input argument then most 'pool.map' process/threaded like objects
        use a heuristic to set this variable. However, this is often just
        'good enough', but never optimal. This function empirically tests
        what the optimal chunksize is at this current moment; this can
        fluctuate wildly depending on architecture, how much compute
        resource is tied up, and other system settings. With this function,
        one can empirically test what the best chunksize is for their
        current application at this current moment. Ideally a subset of data
        is fed to the target function, and its runtimes are measured. This
        function performs a refined grid-search to find the optimal chunksize.
        Returns an int that is intended to be given to the target
        map-like object in the form of (return x) --> .map(..., chunksize=x).
            * NOTE: CPU UTILIZATION is measured as the difference of
            * sleeping cpus versus cpus during runtime. If some
            * background action was occurring when the sleep
            * utilization was measured then it is possible that the
            * cpu utilization values could be negative, in which case
            * they can safely be ignored since the sleep measurement
            * that they are being compared against is not correct.

    :param func: The function that is being provided to the map
    :param iterable: The iterable that is being provided to the map; too many
        iterable elements will increase this function's run time, too few
        iterable elements may not capture the correct optimal chunksize
        if the map is not being fully utilized
    :param loops: How many times to repeat the testing, more loops
        increases the runtime of this function but may yield more
        optimal results
    :param base: The minimum chunksize to start with
    :param step: The coarsest step size to start with, this will be
        linearly decreased as the grid search gets refined. If step
        is too small then a local minima may be returned instead of a
        global optima, if step is too large then it may widely step
        over the global optima
    :param refine: How many times the grid-search should be refined into
        a smaller targeted area. Too many refinements may not prove useful
        as they just sweep the chunksize by 1 and needlessly increase
        runtime. Too few refinements may not prove useful to actually
        converging on the global optima
    :param smooth: How many chunksize results the grid-search should average
        over to determine when it has converged. Setting smooth too small will
        only consider a tiny amount of the most recent grid-search results
        and some amount of accidental noise may force it to converge on
        noise. Setting smooth too large will increase the runtime of the
        function because it will be considering a larger amount of points
        before it has determined convergence and thus requires more testing
    :param disp: int:[0, 1, 2], level of messages to print to console
        0 = print no messages to console
        1 = print only headlines to console
        2 = print runtime measurements and headlines to console
    :param poolobj: String of the multiprocess/multithread pool object. This
        will be directly initialized with an eval statement: eval(poolobj).
        Example 1: "multiprocessing.Pool(os.cpu_count()-3)"
        Example 2: "multiprocessing.pool.ThreadPool(4)"
        Example 3: "multiprocessing.Pool(processes=None, initializer=None)"
    :param poolattr: String of the multiprocess/multithread pool object
        attribute that will be executing the runtime. Must be attr of poolobj.
        Examples: "map", "imap_unordered", "map_async", "starmap"
    :param is_main: bool, if True the function will only run when called
        from within __main__; if False it will also run outside of __main__
    :param fixed_iter: If there are other fixed iterables that need to
        be fed to the func, they can be set here. If iterables are provided
        to this argument they are initialized with a partial function
        which will set these as the first positional inputs and the
        iterable provided in the iter argument will be given as the
        last position. Note that tuples and lists are assumed to contain
        multiple input arguments and are unpacked; if you want to give just
        one argument "a" that is a list as input, set this to fixed_iter=[a].
    :return: int: optimal chunksize
    """

    # ensure that this function is not recursively called by multiple processes
    if is_main:
        if __name__ == "__main__": pass
        else:
            print('get_multithread_chunksize not running outside "__main__", '
                  'if it should then set the "is_main" argument to False; '
                  'if you see this message multiple times it is likely that '
                  'you called the get_multithread_chunksize in a global '
                  'context, this is a safeguard against that.')
            return None

    # enforce strong type checking
    assert callable(func)
    assert isinstance(iterable, collections.abc.Iterable)
    for i in (loops, base, step, refine, smooth, disp):
        assert isinstance(i, int)
    assert disp in [0, 1, 2]
    for i in (poolobj, poolattr): assert isinstance(i, str)
    assert (isinstance(fixed_iter, collections.abc.Iterable) or
            fixed_iter is None)

    # define helper functions for mean and measuring resource utilization
    def __mean(x): return sum(x)/len(x)

    def __resource(_event, _cpu, _ram):
        if "psutil" in globals().keys():
            while not _event.is_set():
                _cpu += [psutil.cpu_percent()]
                _ram += [psutil.virtual_memory().percent]
                time.sleep(0.1)
        else:
            _cpu += [0, 0]
            _ram += [0, 0]


    # measure background utilization to compare to runtime utilization;
    # initialize mutables (_cpu, _ram) here so they need not be returned
    _cpu, _ram = [], []
    _e = threading.Event()
    _t = threading.Thread(target=__resource, args=(_e, _cpu, _ram))
    _t.start(); time.sleep(1); _e.set(); _t.join()
    _bg_cpu, _bg_ram = round(__mean(_cpu[1:]), 2), round(__mean(_ram), 2)
    if disp > 1: print(f"background resource usage :: background memory "
                       f"utilization is {_bg_ram}%, background cpu "
                       f"utilization is {_bg_cpu}%")

    # if there are some iters that are fixed then init them as a wrapper
    if fixed_iter is None: _f = func
    else: _f = functools.partial(func, *fixed_iter)

    # initialize pool before running tests so test times do not include init
    with eval(poolobj) as p:

        # verify pool object has the requested attribute, and set it
        if poolattr not in dir(p):
            raise Exception(f"pool object (poolobj): {p} does not have "
                            f"the requested pool attribute (poolattr): "
                            f"'{poolattr}'. \nHere is a selection of "
                            f"attributes that the provided poolobj has: "
                            f"{[i for i in dir(p) if not i.startswith('_')]}")
        else: mapper = getattr(p, poolattr)

        # get function name
        try: __fn = func.__name__
        except AttributeError: __fn = "'func'"

        if disp > 0: print(f"{time.strftime('%H:%M:%S', time.localtime())} ::"
                           f" Starting chunksize testing on :: "
                           f"{poolobj}.{poolattr}({__fn}, chunksize = ?, ...)")
        best_chunks = []

        for loop_num in range(loops):
            # initialize runtime base to provided one
            rt_base = base + 1
            for i in range(1, refine+1):

                # initialize chunk testing variables
                chunk_list, _cpu_utilization, _ram_utilization = [], [], []
                step_size, _rts, k = int(step/i) + 1, [9e9] * (smooth + 1), 0
                rt_means = [__mean(_rts[:smooth]), __mean(_rts[smooth:])]
                if disp > 1: print(f"--base is {rt_base}; step is {step_size}")

                # continue to run while the mean is the same or decreasing
                while (rt_means[-1] - rt_means[-2]) <= 0:
                    # calculate chunksize to test and record it
                    tmp_chunk = rt_base + (k * step_size)
                    chunk_list += [tmp_chunk]

                    # begin measuring cpu and ram usage on separate thread,
                    # initialize mutables (_cpu, _ram) here so they don't
                    # return
                    _cpu, _ram = [], []
                    _e = threading.Event()
                    _t = threading.Thread(
                        target=__resource, args=(_e, _cpu, _ram))
                    _t.start()

                    # start timing and run mapper
                    _st = time.time()
                    list(mapper(
                        func=_f, iterable=iterable, chunksize=tmp_chunk))
                    _runtime = time.time() - _st

                    # stop measuring cpu and ram usage on separate thread
                    _e.set(); _t.join()

                    # record times & resources and update running mean
                    _rts += [_runtime]
                    rt_means += [__mean(_rts[-smooth:])]
                    _cpu_utilization += [__mean(_cpu[1:]) - _bg_cpu]
                    _ram_utilization += [__mean(_ram) - _bg_ram]
                    k += 1
                    if disp > 1: print(
                        f"chunksize is {tmp_chunk}, "
                        f"runtime is {round(_runtime, 2)}s, "
                        f"memory util is {round(_ram_utilization[-1], 2)}%, "
                        f"cpu util is {round(_cpu_utilization[-1], 2)}%")

                # update the base for the next loop's grid-search refinement
                rt_base = chunk_list[_rts.index(min(_rts)) - 2 * (smooth + 1)]

            # retrieve best results
            _best_ind = _rts.index(min(_rts)) - (smooth + 1)
            _best_size = chunk_list[_best_ind]
            _best_cpu_util = round(_cpu_utilization[_best_ind], 2)
            _best_ram_util = round(_ram_utilization[_best_ind], 2)
            if disp > 0: print((f"{time.strftime('%H:%M:%S', time.localtime())}"
                               f" :: Optimal chunksize for loop {loop_num+1} "
                                f"of {loops} is {_best_size}; it utilized "
                                f"roughly {_best_cpu_util}% of all available "
                                f"cpu resources, and {_best_ram_util}% of all "
                                f"available memory resources").replace(
                " 0.0% ", " N/A "))

            # record the best chunk of the current outer loop iteration
            best_chunks += [_best_size]

    # take the mean of the best chunksizes
    optimal_chunk = int(__mean(best_chunks))
    if disp > 0: print(f"{time.strftime('%H:%M:%S', time.localtime())} :: "
                       f"Optimal chunksize found to be {optimal_chunk} --> \n"
                       f"{poolobj}.{poolattr}({__fn}, iterable, chunksize = "
                       f"{optimal_chunk})")

    return optimal_chunk


# ---------------------------------- EXAMPLE ----------------------------------

# example test, randomly generate a test array with 50**3 random rows
# and randomly generate 100 rows. Test to see if any of the 50*50*50 rows
# is in the randomly generated 100 rows
import numpy as np
test_rows = np.array(np.random.rand(50**3, 5)*1e2, dtype=int)
ref_rows = [i.tolist() for i in
            np.array(np.random.rand(100, test_rows.shape[-1])*1e2, dtype=int)]


# (e)xample (f)unc for pool map
def _ef(reference_rows, test_row):
    if test_row.tolist() in reference_rows:
        print("FOUND TEST ROW IN SET OF REFERENCE ROWS")


# run test
if __name__ == '__main__':
    get_multithread_chunksize(_ef, test_rows, fixed_iter=[ref_rows]); quit()

gts

import time
def gts() -> str:
    """
    get current local time timestamp

    :return: a string of numbers from yyyy down to seconds, then an
        underscore and the current sub-second fraction
    """

    return (f"{time.strftime('%Y%m%d%H%M%S', time.localtime())}_"
            f"{str(time.time()).split('.')[1]}")

# ---------------- example ----------------
print(gts())

hfd5_read

*NOTE* this function has one additional dependency: the third-party h5py package

import csv
import os
import h5py
import numpy as np
def hfd5_read(*paths_in: str | list[str] | list,
              dne_crash: bool = False,
              ) -> list[dict[str, np.ndarray]]:
    """
    Reads in data from HDF5 files

    :param paths_in: hdf5 paths as str|path or arbitrary iterables of str|path
    :param dne_crash: ends program if path does not exist, or continues
    :return: list of dicts of data read from the respective hdf5 files
    """

    # define internal help functions
    def __flattener(_in) -> list:
        """
        Internal helper function, flattens arbitrarily nested iterables
        :param _in: Any level of arbitrarily nested lists/tuples/sets
        :return: A flattened list
        """

        # verify that incoming object is of desired type, else return it
        if not isinstance(_in, (list, set, tuple, np.ndarray)):
            return [_in]

        # initiate the outgoing list and loop through elements of incoming list
        __out = []
        for _i in _in:
            # make recursive call if iterable else append item to __out
            __out += __flattener(_i) if (
                isinstance(_i, (list, set, tuple, np.ndarray))
            ) else [_i]

        # return flattened list
        return __out

    # pull out paths and verify they all exist, raise exception if user wants
    _paths = __flattener(paths_in)
    if (not all(os.path.isfile(i) for i in _paths)) and dne_crash:
        raise Exception("Hdf5 file(s) provided for reading, but not found")

    # initialize list of outgoing data and loop through hdf5 paths
    _out = []
    for i in _paths:
        # verify the current path exists
        if not os.path.isfile(i): continue

        # read into outgoing object
        _tmp_dict = {}
        with h5py.File(i, "r") as f:
            # move all key's values from hdf5 object to _out
            for j in f.keys():
                _tmp_dict[j] = f[j][()]
        _out.append(_tmp_dict)

    # return read in hdf5 data
    return _out
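
A minimal usage sketch, assuming h5py is installed; the file name and datasets below are illustrative only:

# ---------------- example ----------------
example_h5 = os.path.join(os.getcwd(), "hfd5_read_example.hdf5")
with h5py.File(example_h5, 'w') as f:
    f.create_dataset('col_a', data=np.arange(5))
    f.create_dataset('col_b', data=np.arange(5) * 10.0)
print(hfd5_read(example_h5))
os.remove(example_h5)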

strecord

import csv
import json
import os
import time

global_status_path = os.path.join(
    os.path.expanduser("~"),
    "Desktop",
    f"Runtime_status_{time.strftime('%Y%M%d_%H%M%S', time.localtime())}.txt")


def strecord(*m: any, disp: bool = True, new_path: None | str = None) -> None:
    """
    Quick way to show status messages and write them out to a csv/json/text file
    :param m: status messages to be appended into file, multiple messages
        will become comma separated
    :param disp: bool, if status message should be printed to console
    :param new_path: optional path to write to, global used if not provided
    :return: None
    """

    # cache the path and update it if needed
    if new_path or "pcache" not in dir(strecord):
        if new_path: setattr(strecord, "pcache", new_path)
        else:
            global global_status_path
            setattr(strecord, "pcache", global_status_path)

    # parse out message
    _m = m if isinstance(m, str) else ", ".join(str(i) for i in m)

    if bool(disp): print(_m)

    file_type = os.path.splitext(strecord.pcache)[-1].lower()
    _ts = time.strftime('%Y%m%d_%H%M%S', time.localtime())

    # infer file type and write message with timestamp and linebreaks
    with open(strecord.pcache, mode='+at') as f:
        if 'json' in file_type: json.dump({_ts: _m}, f, indent=1)
        elif 'csv' in file_type: csv.writer(f).writerow((_ts,
                                                         *[str(i) for i in m]))
        else: f.write(f"{_ts} :: {_m}\n\n")

# ---------------------------------- EXAMPLE ----------------------------------
# save to csv
new_path_1 = os.path.join(
    os.path.expanduser("~"),
    "Desktop",
    f"Runtime_status_{time.strftime('%Y%M%d_%H%M%S', time.localtime())}.csv")

# save to json
new_path_2 = os.path.join(
    os.path.expanduser("~"),
    "Desktop",
    f"Runtime_status_{time.strftime('%Y%M%d_%H%M%S', time.localtime())}.json")

for i in None, new_path_1, new_path_2:
    strecord("some words", "test1", new_path=i)
    strecord("some words", "test2")
    strecord("some words", "test3")