Skip to content
Snippets Groups Projects
Unverified Commit 9f42f45b authored by kmittman's avatar kmittman Committed by GitHub
Browse files

Merge pull request #13 from amilonenv/black-reformat

parse_redist.py: reformat according to PEP8 using Black
parents 6f28f6b4 18f7a003
Branches main
No related tags found
No related merge requests found
import tarfile
import zipfile
import sys

import requests

__version__ = "0.3.0"

# Global state shared by the parse/fetch/extract stages below.
ARCHIVES = {}
UNROLLED = True
COLLAPSE = True
def err(msg):
    """Report a fatal error on stdout and terminate with exit status 1."""
    sys.stdout.write("ERROR: " + msg + "\n")
    sys.exit(1)
def fetch_file(full_path, filename):
    """Download *full_path* and write it to *filename* on disk.

    A non-200 response is reported and no file is written; on success the
    progress and destination are printed.
    """
    # Bound the request so a stalled server cannot hang the script forever.
    download = requests.get(full_path, timeout=60)
    if download.status_code != 200:
        print(" -> Failed: " + filename)
    else:
        print(":: Fetching: " + full_path)
        with open(filename, "wb") as file:
            file.write(download.content)
        print(" -> Wrote: " + filename)
def get_hash(filename):
    """Return the SHA-256 hex digest of the file at *filename*."""
    digest = hashlib.sha256()
    with open(filename, "rb") as handle:
        # Stream the file in 64 KiB chunks to keep memory bounded.
        for block in iter(lambda: handle.read(65536), b""):
            digest.update(block)
    return digest.hexdigest()
def check_hash(filename, checksum):
    """Compare *filename*'s SHA-256 digest against the expected *checksum*.

    Prints a verification line on match, or both values on mismatch.
    """
    actual = get_hash(filename)
    if actual == checksum:
        print(" Verified sha256sum: " + actual)
    else:
        print(" => Mismatch sha256sum:")
        print(" -> Calculation: " + actual)
        print(" -> Expectation: " + checksum)
def flatten_tree(src, dest, tag=None):
    """Merge the directory hierarchy at *src* into *dest*, then delete *src*.

    If *tag* is given, contents are merged into the ``dest/tag`` subdirectory.
    """
    if tag:
        dest += "/" + tag
    try:
        # dirs_exist_ok lets repeated merges layer into the same destination.
        shutil.copytree(
            src, dest, symlinks=True, dirs_exist_ok=True, ignore_dangling_symlinks=True
        )
    except FileExistsError:
        pass
    shutil.rmtree(src)
def parse_artifact(parent, MANIFEST, component, platform, variant=None):
    """Locate or download one artifact entry and optionally verify its checksum.

    Appends the resolved archive path to the global ARCHIVES[platform] list.
    """
    entry = MANIFEST[component][platform]
    if variant:
        entry = entry[variant]
    full_path = parent + entry["relative_path"]
    filename = os.path.basename(full_path)
    here = os.path.exists(filename)
    beside = os.path.exists(parent + filename)
    if RETRIEVE and not here and not beside:
        # Download archive
        fetch_file(full_path, filename)
        ARCHIVES[platform].append(filename)
    elif here:
        print(" -> Found: " + filename)
        ARCHIVES[platform].append(filename)
    elif beside:
        print(" -> Found: " + parent + filename)
        ARCHIVES[platform].append(parent + filename)
    else:
        print(" -> Artifact: " + filename)
    if VALIDATE and os.path.exists(filename):
        # Compare checksum against the manifest's expected digest.
        check_hash(filename, entry["sha256"])
def fetch_action(parent):
    """Walk the global MANIFEST and process every matching artifact.

    *parent* is the URL or directory prefix that relative paths resolve
    against. Honors the COMPONENT and PLATFORM filters when they are set.
    """
    for component in MANIFEST.keys():
        # Entries without a "name" key are manifest metadata, not components.
        if "name" not in MANIFEST[component]:
            continue
        if COMPONENT is not None and component != COMPONENT:
            continue
        print(
            "\n" + MANIFEST[component]["name"] + ": " + MANIFEST[component]["version"]
        )
        for platform in MANIFEST[component].keys():
            if "variant" in platform:
                continue
            if platform not in ARCHIVES:
                ARCHIVES[platform] = []
            # String values (e.g. name/version fields) are not platform dicts.
            if not isinstance(MANIFEST[component][platform], str):
                if PLATFORM is not None and platform != PLATFORM:
                    print(" -> Skipping platform: " + platform)
                    continue
                if "relative_path" not in MANIFEST[component][platform]:
                    # Entry fans out into per-variant artifacts.
                    for variant in MANIFEST[component][platform].keys():
                        parse_artifact(parent, MANIFEST, component, platform, variant)
                else:
                    parse_artifact(parent, MANIFEST, component, platform)
def post_action():
    """Extract downloaded archives and merge their contents into OUTPUT."""
    if len(ARCHIVES) == 0:
        return
    print("\nArchives:")
    if not os.path.exists(OUTPUT):
        os.makedirs(OUTPUT)
    for platform in ARCHIVES:
        for archive in ARCHIVES[platform]:
            try:
                # presumably the 4th dash-field carries an "os_tag" pair;
                # TODO confirm against real archive names
                binTag = archive.split("-")[3].split("_")[1]
                print(platform, binTag)
            except IndexError:
                # Filename does not follow the tagged naming scheme.
                binTag = None
            # Tar files
            if UNROLLED and re.search(r"\.tar\.", archive):
                print(":: tar: " + archive)
                with tarfile.open(archive) as tarball:
                    topdir = os.path.commonprefix(tarball.getnames())
                    # NOTE(review): extractall() trusts archive member paths;
                    # pass filter="data" (Python 3.12+) for untrusted archives.
                    tarball.extractall()
                print(" -> Extracted: " + topdir + "/")
                if COLLAPSE:
                    flatten_tree(topdir, OUTPUT + "/" + platform, binTag)
            # Zip files
            elif UNROLLED and re.search(r"\.zip", archive):
                print(":: zip: " + archive)
                with zipfile.ZipFile(archive) as zippy:
                    topdir = os.path.commonprefix(zippy.namelist())
                    zippy.extractall()
                print(" -> Extracted: " + topdir)
                if COLLAPSE:
                    flatten_tree(topdir, OUTPUT + "/" + platform, binTag)
    print("\nOutput: " + OUTPUT + "/")
    for item in sorted(os.listdir(OUTPUT)):
        if os.path.isdir(OUTPUT + "/" + item):
            print(" - " + item + "/")
        elif os.path.isfile(OUTPUT + "/" + item):
            print(" - " + item)
# If running standalone
if __name__ == "__main__":
    # Parse CLI arguments
    PARSER = argparse.ArgumentParser()
    # Input options: exactly one manifest source is required
    PARSER_GROUP = PARSER.add_mutually_exclusive_group(required=True)
    PARSER_GROUP.add_argument("-u", "--url", dest="url", help="URL to manifest")
    PARSER_GROUP.add_argument("-l", "--label", dest="label",
                              help="Release label version")
    PARSER.add_argument("-p", "--product", dest="product", help="Product name")
    PARSER.add_argument("-o", "--output", dest="output", help="Output directory")
    # Filter options
    PARSER.add_argument("--component", dest="component", help="Component name")
    PARSER.add_argument("--os", dest="os", help="Operating System")
    PARSER.add_argument("--arch", dest="arch", help="Architecture")
    # Toggle actions: each feature defaults on and has a matching "no-" switch
    PARSER.add_argument("-w", "--download", dest="retrieve", action="store_true",
                        help="Download archives", default=True)
    PARSER.add_argument("-W", "--no-download", dest="retrieve", action="store_false",
                        help="Parse manifest without downloads")
    PARSER.add_argument("-s", "--checksum", dest="validate", action="store_true",
                        help="Verify SHA256 checksum", default=True)
    PARSER.add_argument("-S", "--no-checksum", dest="validate", action="store_false",
                        help="Skip SHA256 checksum validation")
    PARSER.add_argument("-x", "--extract", dest="unrolled", action="store_true",
                        help="Extract archives", default=True)
    PARSER.add_argument("-X", "--no-extract", dest="unrolled", action="store_false",
                        help="Do not extract archives")
    PARSER.add_argument("-f", "--flatten", dest="collapse", action="store_true",
                        help="Collapse directories", default=True)
    PARSER.add_argument("-F", "--no-flatten", dest="collapse", action="store_false",
                        help="Do not collapse directories")
    ARGS = PARSER.parse_args()
    # print(ARGS)
    RETRIEVE = ARGS.retrieve
    VALIDATE = ARGS.validate
    UNROLLED = ARGS.unrolled
    COLLAPSE = ARGS.collapse
    # Define variables only for the options that were actually supplied
    if ARGS.label is not None:
        LABEL = ARGS.label
    if ARGS.product is not None:
        PRODUCT = ARGS.product
    if ARGS.url is not None:
        URL = ARGS.url
    if ARGS.output is not None:
        OUTPUT = ARGS.output
    if ARGS.component is not None:
        COMPONENT = ARGS.component
    if ARGS.os is not None:
        OS = ARGS.os
    if ARGS.arch is not None:
        ARCH = ARGS.arch
#
# Sanity check
if not UNROLLED:
    # Collapsing requires extracted archives, so it is forced off here.
    COLLAPSE = False

# Short-hand
if LABEL:
    if PRODUCT:
        URL = f"{DOMAIN}/compute/{PRODUCT}/redist/redistrib_{LABEL}.json"
    else:
        err("Must pass --product argument")

# Concatenate
if ARCH is not None and OS is not None:
    PLATFORM = f"{OS}-{ARCH}"
elif ARCH is not None and OS is None:
    err("Must pass --os argument")
elif OS is not None and ARCH is None:
    err("Must pass --arch argument")
#
# Run
# Parse JSON: URL may be a local manifest file or a remote address
if os.path.isfile(URL):
    with open(URL, "rb") as f:
        MANIFEST = json.load(f)
else:
    try:
        MANIFEST = requests.get(URL).json()
    except json.decoder.JSONDecodeError:
        err("redistrib JSON manifest file not found")
print(":: Parsing JSON: " + URL)

# Do stuff
fetch_action(os.path.dirname(URL) + "/")
if UNROLLED:
    post_action()

### END ###
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment