#!/usr/bin/python3 import json, os, time, tempfile, shutil import argparse, urllib.request from hashlib import sha256 META_FILENAME = 'meta-v1.json' BASE_DIR = 'data' WAITING_DIR = 'pending' TEMP_DIR = tempfile.TemporaryDirectory() # file scanner functions def match_name(filename: str, extension: str=None, exact_name: str=None): if exact_name is not None: return filename == exact_name elif extension is not None: return filename.lower().endswith(extension.lower()) return True def scan_for_file(ask: bool=False, extension: str=None, exact_name: str=None) -> tuple[str]: for file in os.scandir(): if file.is_dir(): continue if not match_name(file.name, extension, exact_name): continue if ask: if not confirm(f"Found {file.name} in the current directory, do you want to proceed with it?"): return (None, None) return (file.path, file.name) return (None, None) def wait_for_file(waiting_dir: str, extension: str=None) -> tuple[str]: print(f"Please put a {extension} file in {waiting_dir}") while True: files = [i for i in os.scandir(waiting_dir)] if len(files) == 0: time.sleep(0.5) continue file = files[0] filepath = file.path filename = file.name if match_name(filename, extension): break else: os.remove(filepath) print(f"Not a {extension} file: {filename}") return (filepath, filename) def just_find_file(name: str) -> tuple[str]: spl = name.split('.') extension = spl[-1] exact_name = name[:-len(extension)-1] if len(spl) > 1 else None filepath, filename = scan_for_file(True, extension, exact_name) if filepath is None: try: os.makedirs(WAITING_DIR, exist_ok=True) filepath, filename = wait_for_file(WAITING_DIR, extension) except KeyboardInterrupt: os.rmdir(WAITING_DIR) return if filepath is not None: tpath = os.path.join(TEMP_DIR.name, filename) shutil.move(filepath, tpath) filepath = tpath return (filepath, filename) # directory util fnctions def make_path(channel: str, version: str=None, filename: str=None) -> str: args = [channel, version, filename] args = [i for i in args if i is not None] return os.path.join(BASE_DIR, *args) # metadata functions def load_latest_data(channel: str) -> dict: path = make_path(channel, 'latest', META_FILENAME) if os.path.isfile(path): return json.loads(open(path).read()) return {'id': 0} def write_metadata(channel: str, metadata: dict): version = metadata['label'] metadata = json.dumps(metadata) for filepath in [make_path(channel, version, META_FILENAME), make_path(channel, 'latest', META_FILENAME)]: with open(filepath, 'w') as file: file.write(metadata) def commit_and_push(channel: str, version: str): os.system(f'git add data/{channel}/latest') os.system(f'git add data/{channel}/{version}') os.system(f'git commit -m "[releaser] Release {version} on {channel}"') os.system('git push') # other def confirm(prompt: str) -> bool: confirmed = input(prompt + ' (Y/N) ') return confirmed.lower() == 'y' def hash_file(filepath: str) -> str: with open(filepath, 'rb') as file: return sha256(file.read()).hexdigest() def download_and_hash(url: str) -> str: sha256_hash = sha256() try: print(f"Downloading and hashing {url}...") with urllib.request.urlopen(url) as response: # Read and hash in chunks to handle large files efficiently while True: chunk = response.read(4096) # 4KB chunks if not chunk: break sha256_hash.update(chunk) return sha256_hash.hexdigest() except Exception as e: raise Exception(f"Download or hashing failed: {str(e)}") # main def main(config: dict, version: str, channel: str, local: bool): file_url = None file_hash = None changelog = confirm('Do you want to include a changelog?') changelog_url = None if changelog: chlog_filepath, chlog_filename = just_find_file('changelog.txt') changelog_url = f'{config["baseUrl"]}/{BASE_DIR}/{channel}/{version}/changelog.txt' latest_data = load_latest_data(channel) version_dir = make_path(channel, version) os.makedirs(make_path(channel, 'latest'), exist_ok=True) os.mkdir(version_dir) if changelog: shutil.move(chlog_filepath, os.path.join(version_dir, 'changelog.txt')) if local: jar_filepath, jar_filename = just_find_file('jar') file_hash = hash_file(jar_filepath) file_url = f'{config["baseUrl"]}/{BASE_DIR}/{channel}/{version}/{jar_filename}' shutil.move(jar_filepath, os.path.join(version_dir, jar_filename)) else: groupId, artifactId = config["maven"]["package"].split(':') file_url = config["maven"]["repo"] + f"/{groupId.replace('.', '/')}/{artifactId}/{version}/{artifactId}-{version}.jar" file_hash = download_and_hash(file_url) metadata = { 'label': version, 'id': latest_data['id'] + 1, 'timestamp': int(time.time()), 'file': file_url, 'changelog': changelog_url, 'sha256': file_hash } write_metadata(channel, metadata) try: os.rmdir(WAITING_DIR) except FileNotFoundError: pass if confirm("Commit and push?"): commit_and_push(channel, version) print("Done") if __name__ == "__main__": config = json.loads(open('config.json', 'r').read()) channels = [c.strip() for c in open(os.path.join('data', 'channels.txt'))] parser = argparse.ArgumentParser( description='Release', formatter_class=argparse.ArgumentDefaultsHelpFormatter ) parser.add_argument('version', help='The version to release') parser.add_argument('-c', '--channel', default=config["defaultChannel"], choices=channels, help='Release channel') parser.add_argument('-l', '--local', action='store_true', help='From local file instead of repo') args = parser.parse_args() version = args.version channel = args.channel local = args.local if version == "latest": print("Version can't be \"latest\"") exit() def check_version_exists(version: str) -> str | None: """ Check if version exists in any channel. If yes, returns that channel, otherwise None """ for channel in list_channels(): if os.path.isdir(make_path(channel, version)): return channel existing = [c for c in channels if os.path.isdir(make_path(c, version))] if channel in existing: print(f"Version {version} already exists in channel {channel}.") exit() if len(existing) > 0: if not confirm(f"Version {version} already exists in \"{', '.join(existing)}\". Do you still want to proceed?"): exit() main(config, version, channel, local)