Demo_MACsec/MACsec_Utilitie/util.py

290 lines
8.1 KiB
Python

from contextlib import contextmanager
import ics
import time
import argparse
def serial_base36enc(ser_no):
"""
Encode serial as base36 if needed and return the string representation of the serial number
Args:
ser_no: Serial number integer
"""
if int("AA0000", 36) < ser_no < int("ZZZZZZ", 36):
return ics.base36enc(ser_no)
else:
return str(ser_no) # Old devices don't do base36
def serial_base36dec(ser_no):
"""
Decode serial as base36 if needed and return the integer representation of the serial number
Args:
ser_no: Serial number string
"""
serial36 = int(ser_no, 36)
if int("AA0000", 36) < serial36 < int("ZZZZZZ", 36):
return serial36
else:
return int(ser_no, 10) # Old devices don't do base36
def filter_by_netid(msgs, netid):
"""
Filters the list of messages by netid
Args:
msgs: A list of spy messages, likely from ics.get_messages
netid: The desired netid
Returns:
The filtered message list
"""
netids = [netid]
return filter_by_netids(msgs, netids)
def filter_by_netids(msgs, netids):
"""
Filters the provided messages by those that match the provided netids
Args:
msgs: The messages to filter
netids: The valid netids to keep
Returns:
All messages with the provided netids
"""
ret = [msg for msg in msgs if get_netid(msg) in netids]
return ret
def get_netid(msg):
"""
Gets the netid for the message
Args:
msg: The message to check
Returns:
The netid (the number) for the message
"""
netid = (msg.NetworkID2 << 8) | (msg.NetworkID & 0xFF)
return netid
@contextmanager
def open_device(ser_no, tries=10, delay=1.0):
"""
Context manager for a neovi device. Opens the device, then auto-closes
once the context manager falls out of scope
Args:
ser_no: Serial number string of a connected device, will be validated
Yields:
an open device
Examples:
with open("GS0137") as device:
ics_do_stuff(device)
"""
device = None
serial = serial_base36dec(ser_no)
for i in range(tries):
try:
found = False
# work around for supporting neovi server connections
# ics.open_device will not work if already open in Vspy with server
devices = ics.find_devices()
for d in devices:
if d.SerialNumber == serial:
device = ics.open_device(d)
found = True
break
if found:
# successfully opened
break
else:
raise Exception(f"Could not find device to open {ser_no}")
except Exception:
device = None
print(f"Failed to Open {ser_no}, Trying again... ({i+1}/{tries})")
time.sleep(delay)
if device is None:
# could not find device and multiple retries
devices = ics.find_devices()
print("ERROR: Device not found. Known devices are:")
print([serial_base36enc(dev.SerialNumber) for dev in devices])
exit(1)
try:
yield device
except Exception as e:
print("ERROR: Open device succeeded, but yielding failed?")
raise e
finally:
if device is not None:
ics.close_device(device)
def get_hwnetid(name):
"""
Get hardware network ID from string name in one of the following forms:
COREMINI_NETWORK_ETHERNET, NETID_ETHERNET, or ETHERNET
Args:
name: Network name
Returns:
Hardware network ID
"""
# NETID_XXX, use as is
if name.startswith("NETID_"):
if hasattr(ics, name):
return getattr(ics, name)
# COREMINI_NETWORK_XXX, convert
if name.startswith("COREMINI_NETWORK_"):
newname = "NETID_" + name[len("COREMINI_NETWORK_") :]
if hasattr(ics, newname):
return getattr(ics, newname)
# assume no prefix, so try adding it
newname = "NETID_" + name
if hasattr(ics, newname):
return getattr(ics, newname)
raise AttributeError(f"Could not match network for {name}")
class MyArgParseHelpFormatter(argparse.ArgumentDefaultsHelpFormatter):
def _split_lines(self, text, width):
if text.startswith("R|"):
lines = text.splitlines()[1:]
return lines
return super()._split_lines(text, width)
def set_process_priority(pid=None, priority=2):
"""
Set The Priority of a Process. Priority is a value between 0-5 where
2 is normal priority, 5 is highest priority. Default sets the priority of the current
python process but can take any valid process ID.
"""
import sys
try:
sys.getwindowsversion()
except AttributeError:
isWindows = False
else:
isWindows = True
if isWindows:
# Based on:
# "Recipe 496767: Set Process Priority In Windows" on ActiveState
# http://code.activestate.com/recipes/496767/
import win32api
import win32process
import win32con
priorityclasses = [
win32process.IDLE_PRIORITY_CLASS,
win32process.BELOW_NORMAL_PRIORITY_CLASS,
win32process.NORMAL_PRIORITY_CLASS,
win32process.ABOVE_NORMAL_PRIORITY_CLASS,
win32process.HIGH_PRIORITY_CLASS,
win32process.REALTIME_PRIORITY_CLASS,
]
if pid is None:
pid = win32api.GetCurrentProcessId()
handle = win32api.OpenProcess(win32con.PROCESS_ALL_ACCESS, True, pid)
win32process.SetPriorityClass(handle, priorityclasses[priority])
else:
# untested!
import os
nice_levels = [19, 9, 0, -7, -14, -20]
os.nice(nice_levels[priority])
def dict_align_str(d, sort=False, nz=False):
"""
Prints a dict nicely and key value pairs in aligned rows
Primarily limited to simple "a": "b" dicts, might not look nice for complex value types
Args:
d: Some dictionary
sort: Sort dictionary items
nz: Excludes key value pairs with int value = 0
Returns:
A string that looks like this:
aardvark: banana
monkey: apple
ant: pear
"""
strings = []
longest_key_len = len(max(d.keys(), key=len))
if sort:
d = dict(sorted(d.items()))
for k, v in d.items():
key_len = len(k)
diff = longest_key_len - key_len
spaces = " " * diff
string = f"{k}:{spaces} {v}"
if nz:
if not isinstance(v, int) or v > 0:
strings.append(string)
else:
strings.append(string)
ret = "\n".join(strings) + "\n"
return ret
# Print iterations progress
# https://stackoverflow.com/questions/3173320/text-progress-bar-in-terminal-with-block-characters
def print_progress_bar(
iteration,
total,
prefix="",
suffix="",
decimals=1,
length=100,
fill="",
printEnd="\r",
start=None,
):
"""
Call in a loop to create terminal progress bar
@params:
iteration - Required : current iteration (Int)
total - Required : total iterations (Int)
prefix - Optional : prefix string (Str)
suffix - Optional : suffix string (Str)
decimals - Optional : positive number of decimals in percent complete (Int)
length - Optional : character length of bar (Int)
fill - Optional : bar fill character (Str)
printEnd - Optional : end character (e.g. "\r", "\r\n") (Str)
"""
percent = ("{0:." + str(decimals) + "f}").format(100 * (iteration / float(total)))
filledLength = int(length * iteration // total)
bar = fill * filledLength + "-" * (length - filledLength)
if iteration == total:
if start is not None:
timestr = str(timedelta(seconds=time.time() - start))
suffix = f"{suffix} -- {timestr}"
print(f"\r{prefix} |{bar}| {percent}% {suffix}", end=printEnd)
# Print New Line on Complete
if iteration == total:
print()