#!/usr/bin/python3
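"""jarupdater-metadata-template/release.py

Release helper: publishes a version to a release channel by writing a
meta-v1.json (download URL, sha256, incrementing id, timestamp) under
data/<channel>/<version>/ and data/<channel>/latest/, then optionally
commits and pushes the result with git. The jar is either fetched from a
remote Maven repository (default) or taken from a local file (-l/--local).

Example invocations (the version and channel below are illustrative; valid
channels are whatever data/channels.txt lists):

    ./release.py 1.2.3 -c stable
    ./release.py 1.2.3 --local
"""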
import json, os, time, tempfile, shutil
import argparse, urllib.request
from hashlib import sha256
META_FILENAME = 'meta-v1.json'
BASE_DIR = 'data'
WAITING_DIR = 'pending'
TEMP_DIR = tempfile.TemporaryDirectory()
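# The script expects a config.json next to it. The keys below are the ones read
# elsewhere in this file; the values shown are placeholders, not real endpoints:
#
#   {
#     "defaultChannel": "stable",
#     "baseUrl": "https://example.com",
#     "maven": {
#       "repo": "https://repo.example.com/releases",
#       "package": "com.example:myartifact"
#     }
#   }
#
# data/channels.txt holds one channel name per line and drives the --channel choices.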
# file scanner functions
def match_name(filename: str, extension: str=None, exact_name: str=None):
    if exact_name is not None:
        return filename == exact_name
    elif extension is not None:
        return filename.lower().endswith(extension.lower())
    return True
def scan_for_file(ask: bool=False, extension: str=None, exact_name: str=None) -> tuple[str, str]:
    for file in os.scandir():
        if file.is_dir(): continue
        if not match_name(file.name, extension, exact_name): continue
        if ask:
            if not confirm(f"Found {file.name} in the current directory, do you want to proceed with it?"):
                return (None, None)
        return (file.path, file.name)
    return (None, None)
def wait_for_file(waiting_dir: str, extension: str=None) -> tuple[str, str]:
    print(f"Please put a {extension} file in {waiting_dir}")
    while True:
        files = [i for i in os.scandir(waiting_dir)]
        if len(files) == 0:
            time.sleep(0.5)
            continue
        file = files[0]
        filepath = file.path
        filename = file.name
        if match_name(filename, extension):
            break
        else:
            os.remove(filepath)
            print(f"Not a {extension} file: {filename}")
    return (filepath, filename)
def just_find_file(name: str) -> tuple[str, str]:
    spl = name.split('.')
    extension = spl[-1]
    # With a full filename (e.g. 'changelog.txt') match on the exact name,
    # otherwise treat the argument as a bare extension (e.g. 'jar').
    exact_name = name if len(spl) > 1 else None
    filepath, filename = scan_for_file(True, extension, exact_name)
    if filepath is None:
        try:
            os.makedirs(WAITING_DIR, exist_ok=True)
            filepath, filename = wait_for_file(WAITING_DIR, extension)
        except KeyboardInterrupt:
            # Clean up the waiting directory before letting Ctrl+C abort the run.
            os.rmdir(WAITING_DIR)
            raise
    if filepath is not None:
        tpath = os.path.join(TEMP_DIR.name, filename)
        shutil.move(filepath, tpath)
        filepath = tpath
    return (filepath, filename)
# directory util functions
def make_path(channel: str, version: str=None, filename: str=None) -> str:
    args = [channel, version, filename]
    args = [i for i in args if i is not None]
    return os.path.join(BASE_DIR, *args)
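# e.g. make_path('stable', '1.2.3', META_FILENAME) -> 'data/stable/1.2.3/meta-v1.json'
# (channel and version here are hypothetical example values)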
# metadata functions
def load_latest_data(channel: str) -> dict:
    path = make_path(channel, 'latest', META_FILENAME)
    if os.path.isfile(path):
        return json.loads(open(path).read())
    return {'id': 0}
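# write_metadata stores the same JSON document twice: once under the version
# directory and once under 'latest'. With illustrative values it looks like:
#   {"label": "1.2.3", "id": 7, "timestamp": 1729771777,
#    "file": "https://example.com/data/stable/1.2.3/app.jar",
#    "sha256": "<hex digest of the jar>"}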
def write_metadata(channel: str, metadata: dict):
    version = metadata['label']
    metadata = json.dumps(metadata)
    for filepath in [make_path(channel, version, META_FILENAME), make_path(channel, 'latest', META_FILENAME)]:
        with open(filepath, 'w') as file:
            file.write(metadata)
def commit_and_push(channel: str, version: str):
    os.system(f'git add data/{channel}/latest')
    os.system(f'git add data/{channel}/{version}')
    os.system(f'git commit -m "[releaser] Release {version} on {channel}"')
    os.system('git push')
# other
def confirm(prompt: str) -> bool:
    confirmed = input(prompt + ' (Y/N) ')
    return confirmed.lower() == 'y'
def hash_file(filepath: str) -> str:
    with open(filepath, 'rb') as file:
        return sha256(file.read()).hexdigest()
def download_and_hash(url: str) -> str:
    sha256_hash = sha256()
    try:
        print(f"Downloading and hashing {url}...")
        with urllib.request.urlopen(url) as response:
            # Read and hash in chunks to handle large files efficiently
            while True:
                chunk = response.read(4096)  # 4KB chunks
                if not chunk:
                    break
                sha256_hash.update(chunk)
        return sha256_hash.hexdigest()
    except Exception as e:
        raise Exception(f"Download or hashing failed: {str(e)}")
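# In the non-local path, main() feeds this function a standard Maven-layout
# artifact URL built from config["maven"], e.g. for the hypothetical package
# com.example:mylib at version 1.2.3:
#   <repo>/com/example/mylib/1.2.3/mylib-1.2.3.jar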
# main
def main(config: dict, version: str, channel: str, local: bool):
    file_url = None
    file_hash = None

    changelog = confirm('Do you want to include a changelog?')
    if changelog:
        chlog_filepath, chlog_filename = just_find_file('changelog.txt')

    latest_data = load_latest_data(channel)
    version_dir = make_path(channel, version)

    os.makedirs(make_path(channel, 'latest'), exist_ok=True)
    os.mkdir(version_dir)
    if changelog: shutil.move(chlog_filepath, os.path.join(version_dir, 'changelog.txt'))

    if local:
        jar_filepath, jar_filename = just_find_file('jar')

        file_hash = hash_file(jar_filepath)
        file_url = f'{config["baseUrl"]}/{BASE_DIR}/{channel}/{version}/{jar_filename}'

        shutil.move(jar_filepath, os.path.join(version_dir, jar_filename))
    else:
        groupId, artifactId = config["maven"]["package"].split(':')
        file_url = config["maven"]["repo"] + f"/{groupId.replace('.', '/')}/{artifactId}/{version}/{artifactId}-{version}.jar"
        file_hash = download_and_hash(file_url)

    metadata = {
        'label': version,
        'id': latest_data['id'] + 1,
        'timestamp': int(time.time()),
        'file': file_url,
        'sha256': file_hash
    }

    write_metadata(channel, metadata)

    try:
        os.rmdir(WAITING_DIR)
    except FileNotFoundError:
        pass

    if confirm("Commit and push?"):
        commit_and_push(channel, version)

    print("Done")
if __name__ == "__main__":
    config = json.loads(open('config.json', 'r').read())
    channels = [c.strip() for c in open(os.path.join('data', 'channels.txt'))]

    parser = argparse.ArgumentParser(
        description='Release',
        formatter_class=argparse.ArgumentDefaultsHelpFormatter
    )
    parser.add_argument('version',
                        help='The version to release')
    parser.add_argument('-c', '--channel',
                        default=config["defaultChannel"],
                        choices=channels,
                        help='Release channel')
    parser.add_argument('-l', '--local',
                        action='store_true',
                        help='From local file instead of repo')
    args = parser.parse_args()

    version = args.version
    channel = args.channel
    local = args.local

    if version == "latest":
        print("Version can't be \"latest\"")
        exit()

    def check_version_exists(version: str) -> str | None:
        """
        Check if the version already exists in any channel.
        If yes, returns that channel, otherwise None.
        """
        for c in channels:
            if os.path.isdir(make_path(c, version)):
                return c
        return None

    existing = [c for c in channels if os.path.isdir(make_path(c, version))]
    if channel in existing:
        print(f"Version {version} already exists in channel {channel}.")
        exit()
    if len(existing) > 0:
        if not confirm(f"Version {version} already exists in \"{', '.join(existing)}\". Do you still want to proceed?"):
            exit()

    main(config, version, channel, local)