290 lines
8.1 KiB
Python
290 lines
8.1 KiB
Python
from contextlib import contextmanager
|
|
import ics
|
|
import time
|
|
import argparse
|
|
|
|
|
|
def serial_base36enc(ser_no):
|
|
"""
|
|
Encode serial as base36 if needed and return the string representation of the serial number
|
|
|
|
Args:
|
|
ser_no: Serial number integer
|
|
"""
|
|
if int("AA0000", 36) < ser_no < int("ZZZZZZ", 36):
|
|
return ics.base36enc(ser_no)
|
|
else:
|
|
return str(ser_no) # Old devices don't do base36
|
|
|
|
|
|
def serial_base36dec(ser_no):
|
|
"""
|
|
Decode serial as base36 if needed and return the integer representation of the serial number
|
|
|
|
Args:
|
|
ser_no: Serial number string
|
|
"""
|
|
serial36 = int(ser_no, 36)
|
|
if int("AA0000", 36) < serial36 < int("ZZZZZZ", 36):
|
|
return serial36
|
|
else:
|
|
return int(ser_no, 10) # Old devices don't do base36
|
|
|
|
|
|
def filter_by_netid(msgs, netid):
|
|
"""
|
|
Filters the list of messages by netid
|
|
|
|
Args:
|
|
msgs: A list of spy messages, likely from ics.get_messages
|
|
netid: The desired netid
|
|
|
|
Returns:
|
|
The filtered message list
|
|
|
|
"""
|
|
netids = [netid]
|
|
return filter_by_netids(msgs, netids)
|
|
|
|
|
|
def filter_by_netids(msgs, netids):
|
|
"""
|
|
Filters the provided messages by those that match the provided netids
|
|
|
|
Args:
|
|
msgs: The messages to filter
|
|
netids: The valid netids to keep
|
|
|
|
Returns:
|
|
All messages with the provided netids
|
|
|
|
"""
|
|
ret = [msg for msg in msgs if get_netid(msg) in netids]
|
|
return ret
|
|
|
|
|
|
def get_netid(msg):
|
|
"""
|
|
Gets the netid for the message
|
|
|
|
Args:
|
|
msg: The message to check
|
|
|
|
Returns:
|
|
The netid (the number) for the message
|
|
|
|
"""
|
|
netid = (msg.NetworkID2 << 8) | (msg.NetworkID & 0xFF)
|
|
return netid
|
|
|
|
|
|
@contextmanager
|
|
def open_device(ser_no, tries=10, delay=1.0):
|
|
"""
|
|
Context manager for a neovi device. Opens the device, then auto-closes
|
|
once the context manager falls out of scope
|
|
|
|
Args:
|
|
ser_no: Serial number string of a connected device, will be validated
|
|
|
|
Yields:
|
|
an open device
|
|
|
|
Examples:
|
|
with open("GS0137") as device:
|
|
ics_do_stuff(device)
|
|
|
|
"""
|
|
device = None
|
|
serial = serial_base36dec(ser_no)
|
|
|
|
for i in range(tries):
|
|
try:
|
|
found = False
|
|
# work around for supporting neovi server connections
|
|
# ics.open_device will not work if already open in Vspy with server
|
|
devices = ics.find_devices()
|
|
for d in devices:
|
|
if d.SerialNumber == serial:
|
|
device = ics.open_device(d)
|
|
found = True
|
|
break
|
|
if found:
|
|
# successfully opened
|
|
break
|
|
else:
|
|
raise Exception(f"Could not find device to open {ser_no}")
|
|
except Exception:
|
|
device = None
|
|
print(f"Failed to Open {ser_no}, Trying again... ({i+1}/{tries})")
|
|
time.sleep(delay)
|
|
|
|
if device is None:
|
|
# could not find device and multiple retries
|
|
devices = ics.find_devices()
|
|
print("ERROR: Device not found. Known devices are:")
|
|
print([serial_base36enc(dev.SerialNumber) for dev in devices])
|
|
exit(1)
|
|
|
|
try:
|
|
yield device
|
|
except Exception as e:
|
|
print("ERROR: Open device succeeded, but yielding failed?")
|
|
raise e
|
|
finally:
|
|
if device is not None:
|
|
ics.close_device(device)
|
|
|
|
|
|
def get_hwnetid(name):
|
|
"""
|
|
Get hardware network ID from string name in one of the following forms:
|
|
COREMINI_NETWORK_ETHERNET, NETID_ETHERNET, or ETHERNET
|
|
|
|
Args:
|
|
name: Network name
|
|
|
|
Returns:
|
|
Hardware network ID
|
|
"""
|
|
# NETID_XXX, use as is
|
|
if name.startswith("NETID_"):
|
|
if hasattr(ics, name):
|
|
return getattr(ics, name)
|
|
|
|
# COREMINI_NETWORK_XXX, convert
|
|
if name.startswith("COREMINI_NETWORK_"):
|
|
newname = "NETID_" + name[len("COREMINI_NETWORK_") :]
|
|
if hasattr(ics, newname):
|
|
return getattr(ics, newname)
|
|
|
|
# assume no prefix, so try adding it
|
|
newname = "NETID_" + name
|
|
if hasattr(ics, newname):
|
|
return getattr(ics, newname)
|
|
|
|
raise AttributeError(f"Could not match network for {name}")
|
|
|
|
|
|
class MyArgParseHelpFormatter(argparse.ArgumentDefaultsHelpFormatter):
|
|
def _split_lines(self, text, width):
|
|
if text.startswith("R|"):
|
|
lines = text.splitlines()[1:]
|
|
return lines
|
|
return super()._split_lines(text, width)
|
|
|
|
|
|
def set_process_priority(pid=None, priority=2):
|
|
"""
|
|
Set The Priority of a Process. Priority is a value between 0-5 where
|
|
2 is normal priority, 5 is highest priority. Default sets the priority of the current
|
|
python process but can take any valid process ID.
|
|
"""
|
|
import sys
|
|
|
|
try:
|
|
sys.getwindowsversion()
|
|
except AttributeError:
|
|
isWindows = False
|
|
else:
|
|
isWindows = True
|
|
|
|
if isWindows:
|
|
# Based on:
|
|
# "Recipe 496767: Set Process Priority In Windows" on ActiveState
|
|
# http://code.activestate.com/recipes/496767/
|
|
import win32api
|
|
import win32process
|
|
import win32con
|
|
|
|
priorityclasses = [
|
|
win32process.IDLE_PRIORITY_CLASS,
|
|
win32process.BELOW_NORMAL_PRIORITY_CLASS,
|
|
win32process.NORMAL_PRIORITY_CLASS,
|
|
win32process.ABOVE_NORMAL_PRIORITY_CLASS,
|
|
win32process.HIGH_PRIORITY_CLASS,
|
|
win32process.REALTIME_PRIORITY_CLASS,
|
|
]
|
|
if pid is None:
|
|
pid = win32api.GetCurrentProcessId()
|
|
handle = win32api.OpenProcess(win32con.PROCESS_ALL_ACCESS, True, pid)
|
|
win32process.SetPriorityClass(handle, priorityclasses[priority])
|
|
else:
|
|
# untested!
|
|
import os
|
|
|
|
nice_levels = [19, 9, 0, -7, -14, -20]
|
|
os.nice(nice_levels[priority])
|
|
|
|
|
|
def dict_align_str(d, sort=False, nz=False):
|
|
"""
|
|
Prints a dict nicely and key value pairs in aligned rows
|
|
Primarily limited to simple "a": "b" dicts, might not look nice for complex value types
|
|
|
|
Args:
|
|
d: Some dictionary
|
|
sort: Sort dictionary items
|
|
nz: Excludes key value pairs with int value = 0
|
|
|
|
Returns:
|
|
A string that looks like this:
|
|
aardvark: banana
|
|
monkey: apple
|
|
ant: pear
|
|
"""
|
|
strings = []
|
|
longest_key_len = len(max(d.keys(), key=len))
|
|
if sort:
|
|
d = dict(sorted(d.items()))
|
|
for k, v in d.items():
|
|
key_len = len(k)
|
|
diff = longest_key_len - key_len
|
|
spaces = " " * diff
|
|
string = f"{k}:{spaces} {v}"
|
|
if nz:
|
|
if not isinstance(v, int) or v > 0:
|
|
strings.append(string)
|
|
else:
|
|
strings.append(string)
|
|
ret = "\n".join(strings) + "\n"
|
|
return ret
|
|
|
|
|
|
# Print iterations progress
|
|
# https://stackoverflow.com/questions/3173320/text-progress-bar-in-terminal-with-block-characters
|
|
def print_progress_bar(
|
|
iteration,
|
|
total,
|
|
prefix="",
|
|
suffix="",
|
|
decimals=1,
|
|
length=100,
|
|
fill="█",
|
|
printEnd="\r",
|
|
start=None,
|
|
):
|
|
"""
|
|
Call in a loop to create terminal progress bar
|
|
@params:
|
|
iteration - Required : current iteration (Int)
|
|
total - Required : total iterations (Int)
|
|
prefix - Optional : prefix string (Str)
|
|
suffix - Optional : suffix string (Str)
|
|
decimals - Optional : positive number of decimals in percent complete (Int)
|
|
length - Optional : character length of bar (Int)
|
|
fill - Optional : bar fill character (Str)
|
|
printEnd - Optional : end character (e.g. "\r", "\r\n") (Str)
|
|
"""
|
|
percent = ("{0:." + str(decimals) + "f}").format(100 * (iteration / float(total)))
|
|
filledLength = int(length * iteration // total)
|
|
bar = fill * filledLength + "-" * (length - filledLength)
|
|
if iteration == total:
|
|
if start is not None:
|
|
timestr = str(timedelta(seconds=time.time() - start))
|
|
suffix = f"{suffix} -- {timestr}"
|
|
print(f"\r{prefix} |{bar}| {percent}% {suffix}", end=printEnd)
|
|
# Print New Line on Complete
|
|
if iteration == total:
|
|
print()
|