For almost all of my research I use python for the higher-level work. Below is a collection of python code that I have written for myself and find myself reaching for in most of my projects. My hope is that you can use it too! There are examples of how to use each piece of code wherever applicable, and everything is sorted in alphabetical order.
I mostly use the standard library and the numpy package; wherever there is an import it is specified at the top of the code block. Most of the code is fairly version agnostic for interpreters newer than 3.9, relying mainly on the built-in typing support (the X | Y union syntax) and the walrus operator that were not available in earlier versions. For my personal work I use numpy 1.26 for most things, but am warming up to 2.x for some of the crazier CUDA work that is coming out on it. Any recently supported numpy version should work swimmingly with the code below.
ENJOY AND COPY AWAY!
(I am not in any way responsible or liable for your usage or derivatives of the code below, it is all only intended to be free reference material)
__main__
# start each project off right!
if __name__ == '__main__':
print(f"running {__file__}")
AutomatedLoggingParent
import csv
import inspect
import os
import time
class AutomatedLoggingParent:
"""
Python support for automated logging has historically been extremely
inflexible, use case restricted, and not easily portable. Folks will
often write their own logging decorators and manually place them onto
all of their own functions; these are often rigid in practice and
require lots of time to write. Even then it has proven very difficult
for folks to write logging functions/decorators that can safely and
reliably permeate class structures and their methods. By simply
inheriting this class as the parent of any class, it will automatically
have full scope logging functionality working. There are 5 benefits that
this class offers:
1: zero time requirements, simply adding this as a parent is all the work
that is required to automatically log everything in the target class
2: automatically logs every call to every 'public' method, this includes
the class+name of method, all its inputs, all its outputs, what time
it returned, and generates a log entry every single time it is used
3: automatically shares logs between all classes that inherit this class
by use of each class's __call__ method, wherein once a child class
calls on another child class's __call__ method the called child
enters all its logs into the caller child's logs
4. allows users to easily enter their own bespoke log messages,
containing whatever they want, anywhere they want, and invoked however they want
5. allows users to easily look up whatever log entries they are looking
for by invoking a simple user-defined labeling system
Logs can also be directly saved to a csv file at any point. The csv file
has three columns: Label, Timestamp, and Message. The Label acts as a
keyword filter to look up any specific type of log, the Timestamp is the
unix nanosecond timestamp the log was taken at, and the Message is a combination
of information from both the automated logging capabilities and the
bespoke user log entries.
While all child instances retain their own logs (available directly via
child_instance.log), this parent class also retains its own
log that chronologically collects every log from every child instance. To
access the parent's log (the chronological collection of every log from
every instance) just call the master log attribute from any instance; E.G.
child_instance.master_log
Limitations: Child method tracebacks will be different than they would
be if they did not inherit this class. Each of their function calls
passes through the included logging decorator and this will be reflected
in their tracebacks. If a form of automated programmatic traceback
handling is not robust then it may not yield expected results.
To use this, simply just inherit and initialize it like any other parent:
>>> class MyClass(AutomatedLoggingParent):
... def __init__(self): super().__init__()
"""
# define a master log of the parent class
master_log = []
def __init__(self):
# define internal log attribute
self._log = []
def __init_subclass__(cls, *a: any, **k: any) -> None:
"""
Puts logging functionality onto callable children attributes.
:param a: positional argument pass through
:param k: keyword argument pass through
:return: None
"""
super().__init_subclass__(*a, **k)
# loop through all of child's accessible attributes
for i in dir(cls):
# decorate non-private callables and init + call and ensure
# none of the logging tools get recursively captured
if ((callable(getattr(cls, i)) and not
(i.startswith('_' + cls.mro()[-2].__name__ + '__') or
i.startswith('__'))) or i in ['__init__', '__call__']) and (
i not in ['log', '__make_log', 'save_log']):
# record the original attr attrs
_tmp = getattr(cls, i)
_original_attr_dict = {j: getattr(_tmp, j) for j in dir(_tmp)}
# add flag for defined @staticmethods
getattr(cls, i).__dict__['_logref_istaticmeth'] = (
isinstance(inspect.getattr_static(cls, i), staticmethod))
# set attr into child as a logged attr and rerecord attr attrs
setattr(cls, i, cls._AutomatedLoggingParent__make_log(
getattr(cls, i)))
getattr(cls, i).__dict__.update(_original_attr_dict)
@property
def log(self): return self._log
@log.setter
def log(self, _in: str | tuple[str]) -> None:
"""
Appends message and optional label into instance's log
:param _in: information to append into log; if _in has one string
element then it is considered as the message, if _in has two
string elements then the first is a label and the second is
the message.
:return: None
"""
# create a generic label if not already present
_l, _m = ('Log Entry', _in) if isinstance(_in, str) else _in
# for data that is meant to just be inserted
if _l == '__log_insert':
self._log.extend(_m)
self.master_log.extend(_m)
# for data that is being logged as is
else:
self._log.append([_l, time.time_ns(), _m])
self.master_log.append([_l, time.time_ns(), _m])
@staticmethod
def __make_log(method_in: callable) -> any:
"""
Log 'decorator' itself, this appends non-private callable attributes
into log automatically and will pass logs between children of this
class via a __call__ method call in one another.
:param method_in: callable attribute (and its inputs)
:return: output from the callable attribute
"""
def internal_wrapper(self, *a: any, **k: any) -> any:
"""
Wrapper function for callable attribute logging.
:param self: assumed self variable
:param a: positional argument pass through
:param k: keyword argument pass through
:return: callable output
"""
# execute the callable with its inputs and collect the outputs
# defined staticmethods are called without self, while normal
# methods and classmethods are called with a self/cls
_out = method_in(*a, **k) if method_in._logref_istaticmeth else (
method_in(self, *a, **k))
# logging message in format: function ---> [inputs] ---> [outputs]
_m = (f"{'.'.join(method_in.__qualname__.split('.')[-2:])} "
f"---> [{a}] && [{k}] ---> {_out}")
# when one child is called from another with logs then the calling
# one gets a copy of the called one's current log
if method_in.__qualname__.split('.')[-1] == '__call__':
try:
# pass back contents of current instance log + current call
inspect.stack()[1].frame.f_locals["self"].log.extend(
self.log)
inspect.stack()[1].frame.f_locals["self"].log = (
'Method Call', _m)
# if there is an issue passing back a log then just continue
except:pass
# append message into log and return callable attribute's output
self.log = 'Method Call', _m
# hand off existing attributes onto the wrapper if they need to be
# rereferenced (done after internal_wrapper exists so the setattr can work)
for i in [j for j in dir(method_in)]:
# hand off each attribute if possible
try: setattr(internal_wrapper, i, getattr(method_in, i))
except: pass
# execute and return the internal wrapper
return internal_wrapper
def save_log(self, master: bool = False, path: str | None = None) -> str:
"""
Saves the current (child's / all children's) log to a csv file
:param master: True: save logs of all children, False: only current
:param path: (optional) user defined save path: path, dir, or name
:return: path which log is saved to
"""
# generate unique name by using this current file's
# name _ current instance hash _ unix nanosecond _ random int
_tmp_name = (f"_RuntimeLog_{os.path.basename(__file__)}_"
f"{hash(self)}_{time.time_ns()}_"
f"{str(int.from_bytes(os.urandom(2))).zfill(5)}.csv")
# if no path is provided then save unique name to cwd
if path is None: _p = os.path.join(os.getcwd(), _tmp_name)
# if provided path is dir then save unique name to dir
elif os.path.isdir(path): _p = os.path.join(path, _tmp_name)
# if provided path has no dir then save name to cwd
elif os.path.dirname(path) == '': _p = os.path.join(os.getcwd(), path)
# else it is assumed to be a relative/complete path
else: _p = os.path.abspath(path)
# write file with added headers
with open(_p, 'w', newline='') as f:
csv.writer(f).writerows(
[['Label', 'Timestamp', 'Message']] +
(self.master_log if master else self.log))
# notify and return
print(f"{self.__class__.__name__} current "
f"{'parent' if master else 'instance'}'s "
f"runtime log saved to: \n{_p}")
return _p
# ---------------- example ----------------
class TestClass_1(AutomatedLoggingParent):
def __init__(self, *a, **k): super().__init__()
def __call__(self, example_func, *a, **k):
example_func(a)
self.testfunc_1(a)
return a
def testfunc_1(self, *a):
self.log = 'MyOwnLabel_1', 123
return a
@staticmethod
def staticmethod(*a):
return f"static inputs: {a}"
@classmethod
def classmethod(cls, *a):
return f"class inputs: {cls} : {a}"
class TestClass_2(AutomatedLoggingParent):
def __init__(self, *a, **k):
super().__init__()
self.other_log_child = TestClass_1(456, some_kwarg='parameter')
self.other_log_child.staticmethod('static arg 1')
self.other_log_child.classmethod('class arg 1')
def __call__(self, *a, **k):
self.testfunc_2(789)
self.other_log_child(self.testfunc_2, efgh='ijkl')
return a
def testfunc_2(self, *a):
self.log = 'MyOwnLabel_2', 'abcd'
self.log = 'bespoke message here'
return a
class TestClass_3(AutomatedLoggingParent):
def __init__(self):
super().__init__()
self.log = 'MASTER TEST', 'custom message for master log'
logging_test = TestClass_2()
logging_test()
TestClass_3()
print(f"Current log is as follows: \n{logging_test.log}")
log_path = logging_test.save_log()
print(f"Master log is as follows: \n{logging_test.master_log}")
master_log_path = logging_test.save_log(master=True)
os.startfile(log_path)
csv_get
import csv
import os
import numpy as np
def csv_get(*csv_paths: str | list[str] | list,
lck: bool = False,
dne_crash: bool = False,
only_str: bool = False,
):
"""
Read in a list of csv paths as a list of corresponding dicts,
whose values may come from columns of ragged (unequal) lengths.
CSV headers (row 0) are used for key assignment. CSVs can also be
read in as raw rows of strings.
** ragged csvs are truncated to the shortest row's length if only_str is False **
:param csv_paths: csv paths as str|path or arbitrary iterables of str|path
:param lck: enforce all keys to be lowercase
:param dne_crash: ends program if csv does not exist, or continues
:param only_str: if True, returns each csv as its raw rows (lists of strings)
:return: path data, either a list of dicts of float|str arrays, or raw string rows
"""
# define internal help functions
def __flattener(_in) -> list:
"""
Internal helper function, flattens arbitrarily nested iterables
:param _in: Any level of arbitrarily nested lists/tuples/sets
:return: A flattened list
"""
# verify that incoming object is of desired type, else return it
if not isinstance(_in, (list, set, tuple, np.ndarray)):
return [_in]
# initiate the outgoing list and loop through elements of incoming list
__out = []
for _i in _in:
# make recursive call if iterable else append item to __out
__out += __flattener(_i) if (
isinstance(_i, (list, set, tuple, np.ndarray))
) else [_i]
# return flattened list
return __out
def __arr_get(_in):
"""
Internal helper function, converts object to float or string array
:param _in: iterable object
:return: float or string array
"""
# attempt to return numerical array else string array
try: return np.squeeze(np.array(_in, dtype=float))
except: return np.squeeze(np.array(_in, dtype=str))
# initialize list of outgoing csv data and loop through paths
_out = []
for i in __flattener(csv_paths):
# for files that do not exist, crash or pass accordingly
if not os.path.isfile(str(i)):
print(_m := f"attempted to read csv file, but path not found: {i}")
if dne_crash: raise Exception(_m)
else: continue
# read in data
with open(i, newline='') as f: reader = list(map(list, csv.reader(f)))
# return in desired structure, string or dict of arrays
if only_str: _out.append(reader)
else:
_out.append({str(k) if not lck else str(k).lower(): __arr_get(v)
for k, v in
zip(reader.pop(0), map(list, zip(*reader)))})
# return list of read in csv data
return _out
# ---------------- example ----------------
# create 2 random csv paths
csvpath_1 = os.path.join(os.getcwd(), f"csvExample_{[hash(os.urandom(9))]*4}.csv")
csvpath_2 = os.path.join(os.getcwd(), f"csvExample_{[hash(os.urandom(9))]*4}.csv")
# create 2 random csv datasets of floats and strings
csvdata_1 = [['numbers_1', *[int.from_bytes(os.urandom(1)) for _ in range(22)]],
['strings_1', *[chr(33 + (int.from_bytes(os.urandom(1)) % 77)) for _ in range(22)]]]
csvdata_2 = [['numbers_2', *[int.from_bytes(os.urandom(1)) for _ in range(66)]],
['strings_2', *[chr(33 + (int.from_bytes(os.urandom(1)) % 77)) for _ in range(66)]]]
# write these datasets to these paths
with open(csvpath_1, 'w+', newline='') as csvf:
csv.writer(csvf).writerows(map(list, zip(*csvdata_1)))
with open(csvpath_2, 'w+', newline='') as csvf:
csv.writer(csvf).writerows(map(list, zip(*csvdata_2)))
# read them back in
csvdata_read = csv_get(csvpath_1, csvpath_2)
print(f"CSV data from {csvpath_1}: \n\n {csvdata_read[0]} \n\n\n"
f"CSV data from {csvpath_2}: \n\n {csvdata_read[1]} \n\n\n")
# remove random data
os.remove(csvpath_1), os.remove(csvpath_2)
csv_to_hfd5
*NOTE* this function has two additional dependencies: (1) the csv_get function found on this page, and (2) the third-party h5py package
import csv
import os
import h5py
import numpy as np
def csv_to_hfd5(*paths_in: str | list[str] | list,
paths_out: None | list[str] = None,
dir_out: str = None,
) -> list[str]:
"""
Copies data from csv files into HDF5 files, assumes headers as keys
:param paths_in: csv paths as str|path or arbitrary iterables of str|path
:param paths_out: (optional) list of outgoing relative paths or filenames
:param dir_out: (optional) str of root directory of outgoing paths
:return: list of paths to the respective hdf5 files
"""
# define internal help functions
def __flattener(_in) -> list:
"""
Internal helper function, flattens arbitrarily nested iterables
:param _in: Any level of arbitrarily nested lists/tuples/sets
:return: A flattened list
"""
# verify that incoming object is of desired type, else return it
if not isinstance(_in, (list, set, tuple, np.ndarray)):
return [_in]
# initiate the outgoing list and loop through elements of incoming list
__out = []
for _i in _in:
# make recursive call if iterable else append item to __out
__out += __flattener(_i) if (
isinstance(_i, (list, set, tuple, np.ndarray))
) else [_i]
# return flattened list
return __out
# read all csv data into list
_paths_in = __flattener(paths_in)
data_in = csv_get(_paths_in, dne_crash=True)
# make outgoing filenames, depending on paths_out and dir_out permutations
if dir_out is None:
_out = [os.path.splitext(i)[0] + '.hdf5' for i in _paths_in
] if (paths_out is None) else ([
os.path.join(os.path.split(i)[0], j) for i, j in zip(
_paths_in, paths_out)])
else:
_out = [os.path.join(
dir_out, os.path.splitext(os.path.basename(i))[0] + '.hdf5')
for i in _paths_in] if (paths_out is None) else ([
os.path.join(dir_out, i) for i in paths_out])
# loop through all datasets and columns
for i, j in zip(data_in, _out):
with h5py.File(j, 'w') as f:
for k, l in i.items(): f.create_dataset(k, data=l)
# return the hdf5 paths
return _out
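A minimal usage sketch (assuming the csv_get function from this page is already defined in the same session and h5py is installed; the csvExample_tmp.csv name below is just a throwaway placeholder):
# ---------------- example ----------------
# write a tiny throwaway csv with two numeric columns
tmp_csv = os.path.join(os.getcwd(), 'csvExample_tmp.csv')
with open(tmp_csv, 'w', newline='') as csvf:
    csv.writer(csvf).writerows([['numbers_a', 'numbers_b'], [1, 4], [2, 5], [3, 6]])
# convert it; the returned list holds the path of each new hdf5 file
hdf5_paths = csv_to_hfd5(tmp_csv)
print(f"hdf5 written to: {hdf5_paths}")
# clean up the throwaway files
os.remove(tmp_csv), os.remove(hdf5_paths[0])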
dict_filter
import operator
import numpy as np
def dict_filter(dict_in: dict[str, np.ndarray],
operations: list[list[object, callable, object]]
) -> dict[str, np.ndarray]:
"""
Quick way to filter all values of all keys of a dictionary based on
applying any number of operators on the same number of keys. Dict
values assumed to be equilength 1D numpy arrays, operators assumed to be
operator objects, and values assumed to be of the same type as array
that is being operated on. Returns copy of the filtered dict.
:param dict_in: dict of equilength 1D numpy arrays
:param operations: list of list of operations, each sublist has 3 parts:
[key, operator, value]
example format for filtering down to where key 'a' is above 1
and key 'b' is equal to 'c' would be:
[['a', operator.gt, 1], ['b', operator.eq, 'c'], ]
:return: copy of filtered dict
"""
# build a mask for each key operation
single_masks = [
np.sum(
[i[1](dict_in[i[0]], j) for j in i[2]],
axis=0).astype(bool) if
type(i[2]) == list else
i[1](dict_in[i[0]], i[2]) for i in operations
]
# merge all single masks into a mask across all keys
mask_all = np.prod(np.vstack(single_masks), axis=0).astype(bool)
# apply the mask to all keys and return
return {k: v for k, v in zip(
dict_in.keys(),
map(lambda x: np.array(x)[mask_all],
map(lambda x: dict_in[x], dict_in.keys())))}
# ---------------- example ----------------
foo = {'a': np.arange(5), 'b': np.array(['c', 'c', 'c', 'c', 'z']), }
bar = [['a', operator.gt, 1], ['b', operator.eq, 'c'], ]
print(dict_filter(foo, bar))
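The mask-building step above also accepts a list in the value slot of an operation, in which case the masks for each listed value are OR'd together ("match any of these"); a small follow-on sketch using the same imports as above:
# a value given as a list means "match any of these" (the per-value masks get OR'd)
foo = {'a': np.arange(5), 'b': np.array(['c', 'd', 'e', 'c', 'z']), }
bar = [['b', operator.eq, ['c', 'd']], ]
print(dict_filter(foo, bar))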
dict_transform
def dict_transform(dict_in: dict,
user_func: callable,
keys_out: list[str] | None = None,
) -> dict:
"""
Applies a user defined function to the (optionally) user provided keys
of a dict. If the keys are not specified then function is applied over
all keys. Returns a copy of all the used keys in a single dict.
:param dict_in: dictionary to apply function to
:param user_func: function which to apply
:param keys_out: optional, apply func_in to this list of keys, else all
:return: dict_in copy, only containing keys to which function was applied
"""
# check if there are outgoing keys provided, else use all incoming keys
keys_out = dict_in.keys() if keys_out is None else keys_out
# return specific keys with function applied
return {k: v for k, v in zip(
keys_out,
map(user_func,
map(lambda x: dict_in[x], keys_out)))}
# ---------------- example ----------------
foo = {'a': 'to me', 'b': 'to you', 'c': 'world', }
bar = lambda x: f"Hello {x}!!"
print(dict_transform(foo, bar, ['b', 'c']))
flattener
import collections.abc
def flattener(in_: collections.abc.Iterable,
type_tuple: tuple[type] = (list, set, tuple,)) -> list:
"""
Recursive function provides ordered flattening of arbitrarily
nested permutations of user defined iterables into a single
unnested outgoing list.
:param in_: Any level of arbitrarily nested iterable objects
:param type_tuple: a tuple of types that should be unpacked
:return: A flattened list
"""
# verify that incoming object is of desired type, else return it
if not isinstance(in_, type_tuple): return [in_]
# initiate the outgoing list and loop through elements of incoming list
_out = []
# make recursive call to flatten
for i in in_: _out += flattener(i, type_tuple) if (isinstance(i, type_tuple)) else [i]
# return flattened list
return _out
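A quick usage sketch (the nested foo below is just an arbitrary illustration):
# ---------------- example ----------------
foo = [1, [2, (3, 4)], {5}, [[6, [7]]]]
print(flattener(foo))                      # -> [1, 2, 3, 4, 5, 6, 7]
print(flattener(foo, type_tuple=(list,)))  # tuples and sets are left intact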
get_arr_size
import sys
import numpy as np
def get_arr_size(x: np.ndarray, disp: bool = True) -> int:
"""
Quick way to see the amount of memory a np array is using
:param x: array in question
:param disp: bool to print the array size or not
:return: int of array size in bytes
"""
# ensure it's a np array
assert(isinstance(x, np.ndarray))
# get byte size from sys, or from the array attrs if sys isn't imported
try:
bsize = sys.getsizeof(x)
except NameError:
# add 128 bytes for the np array structure overhead
bsize = x.nbytes+128
if disp:
# get magnitude and prefixes
mag = int((len(str(round(bsize))) - 1)/3)
pre = ['', 'K', 'M', 'G', 'T']
print(f"Array is {round(bsize/(10**(mag*3)), 2)} {pre[mag]}b")
return bsize
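A quick usage sketch (the array below is just an arbitrary illustration):
# ---------------- example ----------------
foo = np.zeros((1000, 1000))
print(get_arr_size(foo))  # prints something like "Array is 8.0 Mb", then the byte count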
get_multithread_chunksize
# -- std
import collections.abc
import functools
import multiprocessing
import os
import time
import threading
# -- third party
import psutil
def get_multithread_chunksize(
func: collections.abc.Callable,
iterable: collections.abc.Iterable,
loops: int = 3,
base: int = 5,
step: int = 30,
refine: int = 4,
smooth: int = 3,
disp: int = 1,
poolobj: str = "multiprocessing.Pool(os.cpu_count()-4)",
poolattr: str = "map",
is_main: bool = True,
fixed_iter: collections.abc.Iterable | None = None) -> int:
"""
When running multithreaded/multiprocess codes, getting the optimal
"chunksize" can be the difference between extremely fast runtimes or
code that may take years to complete. If chunksize is not set as the
input argument then most 'pool.map' process/threaded like objects
use a heuristic to set this variable. However, this is often just
'good enough', but never optimal. This function empirically tests
what the optimal chunksize is at this current moment; this can
fluctuate wildly depending on architecture, how much compute
resource is tied up, and other system settings. With this function,
one can empirically test what the best chunksize is for their
current application at this current moment. Ideally a subset of data
is fed to the target function, and its runtimes are measured. This
function performs a refined grid-search to find the optimal chunksize.
Returns an int that is intended to be given to the target
map-like object in the form of (return x) --> .map(..., chunksize=x).
* NOTE: CPU UTILIZATION is measured as the difference of
* sleeping cpus versus cpus during runtime. If some
* background action was occurring when the sleep
* utilization was measured then it is possible that the
* cpu utilization values could be negative, in which case
* they can safely be ignored since the sleep measurement
* that they are being compared against is not correct.
:param func: The function that is being provided to the map
:param iterable: The iterable that is being provided to the map; too many
iterable elements will increase this function's run time, too few
iterable elements may not capture the correct optimal chunksize
if the map is not being fully utilized
:param loops: How many times to repeat the testing, more loops
increases the runtime of this function but may yield more
optimal results
:param base: The minimum chunksize to start with
:param step: The coarsest step size to start with, this will be
linearly decreased as the grid search gets refined. If step
is too small then a local minima may be returned instead of a
global optima, if step is too large then it may widely step
over the global optima
:param refine: How many times the grid-search should be refined into
a smaller targeted area. Too many refinements may not prove useful
as they just sweep the chunksize by 1 and needlessly increase
runtime. Too few refinements may not prove useful to actually
converging on the global optima
:param smooth: How many chunksize results the grid-search should average
over to determine when its converged. Setting smooth too small will
only consider a tiny amount of the most recent grid-search results
and some amount of accidental noise may force it to converge on
noise. Setting smooth too large will increase the runtime of the
function because it will be considering a larger amount of points
before it has determined convergence and thus requires more testing
:param disp: int:[0, 1, 2], level of messages to print to console
0 = print no messages to console
1 = print only headlines to console
2 = print runtime measurements and headlines to console
:param poolobj: String of the multiprocess/multithread pool object. This
will be directly initialized with an eval statement: eval(poolobj).
Example 1: "multiprocessing.Pool(os.cpu_count()-3)"
Example 2: "multiprocessing.pool.ThreadPool(4)"
Example 3: "multiprocessing.Pool(processes=None, initializer=None)"
:param poolattr: String of the multiprocess/multithread pool object
attribute that will be executing the runtime. Must be attr of poolobj.
Examples: "map", "imap_unordered", "map_async", "starmap"
:param is_main: bool, if True it will run the function within __main__,
if False will run outside of __main__
:param fixed_iter: If there are other fixed iterables that need to
be fed to the func, they can set here. If iterables are provided
to this argument they are initialized with a partial function
which will set these as the first positional inputs and the
iterable provided in the iter argument will be given as the
last position. Note that tuples and lists are assumed to contain
multiple input arguments and are unpacked; if you want to give just
one argument "a" that is a list as input, set this to fixed_iter=[a].
:return: int: optimal chunksize
"""
# ensure that this function is not recursively called by multiple processes
if is_main:
if __name__ == "__main__": pass
else:
print('get_multithread_chunksize not running outside "__main__", '
'if it should then set the "is_main" argument to False; '
'if you see this message multiple times it is likely that '
'you called the get_multithread_chunksize in a global '
'context, this is a safeguard against that.')
return None
# enforce strong type checking
assert callable(func)
assert isinstance(iterable, collections.abc.Iterable)
for i in (loops, base, step, refine, smooth, disp):
assert isinstance(i, int)
assert disp in [0, 1, 2]
for i in (poolobj, poolattr): assert isinstance(i, str)
assert (isinstance(fixed_iter, collections.abc.Iterable) or
fixed_iter is None)
# define helper functions for mean and measuring resource utilization
def __mean(x): return sum(x)/len(x)
def __resource(_event, _cpu, _ram):
if "psutil" in globals().keys():
while not _event.is_set():
_cpu += [psutil.cpu_percent()]
_ram += [psutil.virtual_memory().percent]
time.sleep(0.1)
else:
_cpu += [0, 0]
_ram += [0, 0]
# measure background utilization to compare to runtime utilization
# initialize mutables (_cpu, _ram) here so they don't return
_cpu, _ram = [], []
_e = threading.Event()
_t = threading.Thread(target=__resource, args=(_e, _cpu, _ram))
_t.start(); time.sleep(1); _e.set(); _t.join()
_bg_cpu, _bg_ram = round(__mean(_cpu[1:]), 2), round(__mean(_ram), 2)
if disp > 1: print(f"background resource usage :: background memory "
f"utilization is {_bg_ram}%, background cpu "
f"utilization is {_bg_cpu}%")
# if there are some iters that are fixed then init them as a wrapper
if fixed_iter is None: _f = func
else: _f = functools.partial(func, *fixed_iter)
# initialize pool before running tests so test times do not include init
with eval(poolobj) as p:
# verify pool object has the requested attribute, and set it
if poolattr not in dir(p):
raise Exception(f"pool object (poolobj): {p} does not have "
f"the requested pool attribute (poolattr): "
f"'{poolattr}'. \nHere is a selection of "
f"attributes that the provided poolobj has: "
f"{[i for i in dir(p) if not i.startswith('_')]}")
else: mapper = getattr(p, poolattr)
# get function name
try: __fn = func.__name__
except AttributeError: __fn = "'func'"
if disp > 0: print(f"{time.strftime('%H:%M:%S', time.localtime())} ::"
f" Starting chunksize testing on :: "
f"{poolobj}.{poolattr}({__fn}, chunksize = ?, ...)")
best_chunks = []
for loop_num in range(loops):
# initialize runtime base to provided one
rt_base = base + 1
for i in range(1, refine+1):
# initialize chunk testing variables
chunk_list, _cpu_utilization, _ram_utilization = [], [], []
step_size, _rts, k = int(step/i) + 1, [9e9] * (smooth + 1), 0
rt_means = [__mean(_rts[:smooth]), __mean(_rts[smooth:])]
if disp > 1: print(f"--base is {rt_base}; step is {step_size}")
# continue to run while the mean is the same or decreasing
while (rt_means[-1] - rt_means[-2]) <= 0:
# calculate chunksize to test and record it
tmp_chunk = rt_base + (k * step_size)
chunk_list += [tmp_chunk]
# begin measuring cpu and ram usage on separate thread,
# initialize mutables (_cpu, _ram) here so they don't
# return
_cpu, _ram = [], []
_e = threading.Event()
_t = threading.Thread(
target=__resource, args=(_e, _cpu, _ram))
_t.start()
# start timing and run mapper
_st = time.time()
list(mapper(
func=_f, iterable=iterable, chunksize=tmp_chunk))
_runtime = time.time() - _st
# stop measuring cpu and ram usage on separate thread
_e.set(); _t.join()
# record times & resources and update running mean
_rts += [_runtime]
rt_means += [__mean(_rts[-smooth:])]
_cpu_utilization += [__mean(_cpu[1:]) - _bg_cpu]
_ram_utilization += [__mean(_ram) - _bg_ram]
k += 1
if disp > 1: print(
f"chunksize is {tmp_chunk}, "
f"runtime is {round(_runtime, 2)}s, "
f"memory util is {round(_ram_utilization[-1], 2)}%, "
f"cpu util is {round(_cpu_utilization[-1], 2)}%")
# update the base for the next loop's grid-search refinement
rt_base = chunk_list[_rts.index(min(_rts)) - 2 * (smooth + 1)]
# retrieve best results
_best_ind = _rts.index(min(_rts)) - (smooth + 1)
_best_size = chunk_list[_best_ind]
_best_cpu_util = round(_cpu_utilization[_best_ind], 2)
_best_ram_util = round(_ram_utilization[_best_ind], 2)
if disp > 0: print((f"{time.strftime('%H:%M:%S', time.localtime())}"
f" :: Optimal chunksize for loop {loop_num+1} "
f"of {loops} is {_best_size}; it utilized "
f"roughly {_best_cpu_util}% of all available "
f"cpu resources, and {_best_ram_util}% of all "
f"available memory resources").replace(
" 0.0% ", " N/A "))
# record the best chunk of the current outer loop iteration
best_chunks += [_best_size]
# take the mean of the best chunksizes
optimal_chunk = int(__mean(best_chunks))
if disp > 0: print(f"{time.strftime('%H:%M:%S', time.localtime())} :: "
f"Optimal chunksize found to be {optimal_chunk} --> \n"
f"{poolobj}.{poolattr}({__fn}, iterable, chunksize = "
f"{optimal_chunk})")
return optimal_chunk
# ---------------------------------- EXAMPLE ----------------------------------
# example test, randomly generate a test array with 50**3 random rows
# and randomly generate 100 rows. Test to see if any of the 50*50*50 rows
# is in the randomly generated 100 rows
import numpy as np
test_rows = np.array(np.random.rand(50**3, 5)*1e2, dtype=int)
ref_rows = [i.tolist() for i in
np.array(np.random.rand(100, test_rows.shape[-1])*1e2, dtype=int)]
# (e)xample (f)unc for pool map
def _ef(reference_rows, test_row):
if test_row.tolist() in reference_rows:
print("FOUND TEST ROW IN SET OF REFERENCE ROWS")
# run test
if __name__ == '__main__':
get_multithread_chunksize(_ef, test_rows, fixed_iter=[ref_rows]); quit()
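For completeness, a sketch of where the tuned value actually goes afterwards (same _ef, test_rows, and ref_rows as above; this would stand in place of the quit() call if the result is to be used right away):
# hand the tuned chunksize straight to the real map call
if __name__ == '__main__':
    best_chunksize = get_multithread_chunksize(_ef, test_rows, fixed_iter=[ref_rows])
    with multiprocessing.Pool(os.cpu_count() - 4) as pool:
        pool.map(functools.partial(_ef, ref_rows), test_rows, chunksize=best_chunksize)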
gts
import time
def gts() -> str:
"""
get current timezone timestamp
:return: a string of numbers from yyyy down to seconds, then an underscore and the current fractional seconds
"""
return (f"{time.strftime('%Y%M%d%H%M%S', time.localtime())}_"
f"{str(time.time()).split('.')[1]}")
# ---------------- example ----------------
print(gts())
hfd5_read
*NOTE* this function has one additional dependency: the third-party h5py package
import csv
import os
import h5py
import numpy as np
def hfd5_read(*paths_in: str | list[str] | list,
dne_crash: bool = False,
) -> list[dict[str, np.ndarray]]:
"""
Reads in data from HDF5 files
:param paths_in: hdf5 paths as str|path or arbitrary iterables of str|path
:param dne_crash: ends program if path does not exist, or continues
:return: list of dicts of the data read from each respective hdf5 file
"""
# define internal help functions
def __flattener(_in) -> list:
"""
Internal helper function, flattens arbitrarily nested iterables
:param _in: Any level of arbitrarily nested lists/tuples/sets
:return: A flattened list
"""
# verify that incoming object is of desired type, else return it
if not isinstance(_in, (list, set, tuple, np.ndarray)):
return [_in]
# initiate the outgoing list and loop through elements of incoming list
__out = []
for _i in _in:
# make recursive call if iterable else append item to __out
__out += __flattener(_i) if (
isinstance(_i, (list, set, tuple, np.ndarray))
) else [_i]
# return flattened list
return __out
# pull out paths and verify they all exist, raise exception if user wants
_paths = __flattener(paths_in)
if (not all(os.path.isfile(i) for i in _paths)) and dne_crash:
raise Exception("Hdf5 file(s) provided for reading, but not found")
# initialize list of outgoing data and loop through hdf5 paths
_out = []
for i in _paths:
# verify the current path exists
if not os.path.isfile(i): continue
# read into outgoing object
_tmp_dict = {}
with h5py.File(i, "r") as f:
# move all key's values from hdf5 object to _out
for j in f.keys():
_tmp_dict[j] = f[j][()]
_out.append(_tmp_dict)
# return read in hdf5 data
return _out
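No example was included above, so here is a minimal sketch (the hdf5Example_tmp.hdf5 name is just a throwaway placeholder):
# ---------------- example ----------------
# write a tiny throwaway hdf5 file, read it back, then clean up
tmp_h5 = os.path.join(os.getcwd(), 'hdf5Example_tmp.hdf5')
with h5py.File(tmp_h5, 'w') as f:
    f.create_dataset('numbers', data=np.arange(10))
print(f"HDF5 data from {tmp_h5}: \n\n {hfd5_read(tmp_h5)}")
os.remove(tmp_h5)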
strecord
import csv
import json
import os
import time
global_status_path = os.path.join(
os.path.expanduser("~"),
"Desktop",
f"Runtime_status_{time.strftime('%Y%M%d_%H%M%S', time.localtime())}.txt")
def strecord(*m: any, disp: bool = True, new_path: None | str = None) -> None:
"""
Quick way to show status messages and write them out to a csv/json/text file
:param m: status messages to be appended into file, multiple messages
will become comma separated
:param disp: bool, if status message should be printed to console
:param new_path: optional path to write to, global used if not provided
:return: None
"""
# cache the path and update it if needed
if new_path or "pcache" not in dir(strecord):
if new_path: setattr(strecord, "pcache", new_path)
else:
global global_status_path
setattr(strecord, "pcache", global_status_path)
# parse out message
_m = m if isinstance(m, str) else ", ".join(str(i) for i in m)
if bool(disp): print(_m)
file_type = os.path.splitext(strecord.pcache)[-1].lower()
_ts = time.strftime('%Y%m%d_%H%M%S', time.localtime())
# infer file type and write message with timestamp and linebreaks
with open(strecord.pcache, mode='+at') as f:
if 'json' in file_type: json.dump({_ts: _m}, f, indent=1)
elif 'csv' in file_type: csv.writer(f).writerow((_ts,
*[str(i) for i in m]))
else: f.write(f"{_ts} :: {_m}\n\n")
# ---------------------------------- EXAMPLE ----------------------------------
# save to csv
new_path_1 = os.path.join(
os.path.expanduser("~"),
"Desktop",
f"Runtime_status_{time.strftime('%Y%M%d_%H%M%S', time.localtime())}.csv")
# save to json
new_path_2 = os.path.join(
os.path.expanduser("~"),
"Desktop",
f"Runtime_status_{time.strftime('%Y%M%d_%H%M%S', time.localtime())}.json")
for i in None, new_path_1, new_path_2:
strecord("some words", "test1", new_path=i)
strecord("some words", "test2")
strecord("some words", "test3")
#tech