Move login to auth0

This commit is contained in:
Minecon724 2025-04-03 09:03:39 +02:00
commit 40da75e2d7
Signed by: Minecon724
GPG key ID: A02E6E67AB961189
10 changed files with 106 additions and 179 deletions

View file

@ -19,8 +19,9 @@ dependencies = [
"psycopg2-binary (>=2.9.9,<3.0.0)",
"redis (>=5.0.0,<6.0.0)",
"flask-session (>=0.5.0,<1.0.0)",
"argon2-cffi (>=23.0.0,<24.0.0)",
"flask-apscheduler (>=1.13.1,<2.0.0)"
"flask-apscheduler (>=1.13.1,<2.0.0)",
"authlib (>=1.5.2,<2.0.0)",
"requests (>=2.32.3,<3.0.0)"
]
[tool.poetry]

View file

@ -9,8 +9,8 @@ from flask_wtf.csrf import CSRFProtect
from flask_session import Session
import redis
from werkzeug.middleware.proxy_fix import ProxyFix
from argon2 import PasswordHasher
from flask_apscheduler import APScheduler
from authlib.integrations.flask_client import OAuth
# Load environment variables from .env file
load_dotenv()
@ -56,14 +56,6 @@ app.config['SCHEDULER_TIMEZONE'] = 'UTC'
if app.config['BEHIND_PROXY']:
app.wsgi_app = ProxyFix(app.wsgi_app, x_for=2)
# Initialize password hasher
# Parameters source: https://cheatsheetseries.owasp.org/cheatsheets/Password_Storage_Cheat_Sheet.html#argon2id
password_hasher = PasswordHasher(
time_cost=2,
memory_cost=19456,
parallelism=1
)
# Initialize session with Redis storage
Session(app)
@ -80,8 +72,20 @@ db = SQLAlchemy(app)
scheduler = APScheduler()
scheduler.init_app(app)
oauth = OAuth(app)
oauth.register(
"auth0",
client_id=os.environ.get('AUTH0_CLIENT_ID'),
client_secret=os.environ.get('AUTH0_CLIENT_SECRET'),
client_kwargs={
"scope": "openid profile email",
},
server_metadata_url=f'https://{os.environ.get("AUTH0_DOMAIN")}/.well-known/openid-configuration'
)
# Import models
from .models import Inquiry, Message, Settings, Admin
from .models import Inquiry, Message, Settings
# Ensure tables are created when app is loaded by Gunicorn
with app.app_context():
@ -150,7 +154,6 @@ def run_migrations_command():
def init_db_command():
"""Create database tables and initialize default data."""
# Explicitly import models here to ensure they are registered before create_all
from .models import Inquiry, Message, Settings, Admin
with app.app_context():
print("Creating database tables...")
db.create_all() # Should now create all tables including Inquiry
@ -175,27 +178,6 @@ def init_db_command():
print("Default settings initialized.")
else:
print("Settings already exist.")
print("Initializing admin user...")
# Initialize admin user if it doesn't exist
# Note: Models already imported above
admin_user = Admin.query.filter_by(username=app.config['ADMIN_USERNAME']).first()
if not admin_user and app.config['ADMIN_PASSWORD'] == None:
print("Admin user not found and no password provided. Skipping admin user initialization.")
elif app.config['ADMIN_FORCE_RESET'] or not admin_user:
if admin_user:
admin_user.password_hash = Admin.hash_password(app.config['ADMIN_PASSWORD'])
print("Admin user password reset.")
else:
admin_user = Admin(
username=app.config['ADMIN_USERNAME'],
password_hash=Admin.hash_password(app.config['ADMIN_PASSWORD'])
)
db.session.add(admin_user)
db.session.commit()
print("Admin user initialized.")
else:
print("Admin user already exists.")
print("Database initialization complete.")
def run() -> None:
@ -206,7 +188,7 @@ def run() -> None:
# Consider if you still need the initialization logic within run().
with app.app_context():
# Explicitly import all models to ensure they're registered before db.create_all()
from .models import Inquiry, Message, Settings, Admin
from .models import Inquiry, Message, Settings
db.create_all()
# Initialize settings if they don't exist
@ -219,13 +201,4 @@ def run() -> None:
db.session.add(default_settings)
db.session.commit()
# Initialize admin user if it doesn't exist
if not Admin.query.filter_by(username=app.config['ADMIN_USERNAME']).first():
admin_user = Admin(
username=app.config['ADMIN_USERNAME'],
password_hash=Admin.hash_password(app.config['ADMIN_PASSWORD'])
)
db.session.add(admin_user)
db.session.commit()
app.run(debug=True)

View file

@ -1,11 +1,18 @@
from flask import request, jsonify, render_template, redirect, url_for, flash, session
from functools import wraps
from . import app, db, limiter, csrf
from .models import Inquiry, Message, Settings, Admin
from . import app, db, limiter, csrf, oauth
from .models import Inquiry, Message, Settings
from .webhooks import WebhookError, inquiry_created, inquiry_closed, inquiry_reopened
import os
from urllib.parse import urlencode, quote_plus
import time
def is_admin():
return 'admin_authenticated' in session and session['admin_authenticated']
if 'user' in session and session['user'] is not None:
if session['user']['userinfo']['exp'] > time.time():
return True
else:
return False
# Admin authentication middleware
def admin_required(f):
@ -16,43 +23,50 @@ def admin_required(f):
return f(*args, **kwargs)
return decorated_function
@app.route('/admin', methods=['GET'])
def admin_login():
return render_template('admin_login.html')
@app.route('/admin', methods=['POST'])
@limiter.limit("1 per minute", deduct_when=lambda response: not is_admin())
@limiter.limit("10 per hour")
def admin_login_post():
username = request.form.get('username')
password = request.form.get('password')
# Get admin user from database
admin = Admin.query.filter_by(username=username).first()
# Verify credentials
if admin and admin.verify_password(password):
session['admin_authenticated'] = True
session['admin_username'] = admin.username
# Redirect to next page if provided, otherwise to dashboard
next_page = request.args.get('next')
if next_page:
return redirect(next_page)
return redirect(url_for('admin_dashboard'))
else:
flash('Invalid username or password. Try again in 1 minute.', 'error')
return redirect(url_for('admin_login'))
@app.route('/admin/login', methods=['GET'])
def admin_login():
next_page = request.args.get('next') or url_for('admin_dashboard')
if is_admin():
return redirect(next_page)
return render_template('admin_login.html', next=next_page)
@app.route('/admin/logout')
def admin_logout():
session.pop('admin_authenticated', None)
session.pop('admin_username', None)
flash('You have been logged out')
"""
Logs out the user from Auth0 and redirects to the admin login page.
This endpoint clears the session and redirects to Auth0's logout endpoint.
"""
session.clear()
params = {
'returnTo': url_for('admin_oauth_logout', _external=True),
'client_id': os.environ.get('AUTH0_CLIENT_ID')
}
return redirect(
f"https://{os.environ.get('AUTH0_DOMAIN')}/v2/logout?{urlencode(params, quote_via=quote_plus)}"
)
@app.route('/admin/oauth/login', methods=['GET'])
def admin_oauth_login():
return oauth.auth0.authorize_redirect(
redirect_uri=url_for("admin_oauth_callback", _external=True, next=request.args.get('next'))
)
@app.route('/admin/oauth/callback', methods=['GET', 'POST'])
@limiter.limit("10 per hour")
def admin_oauth_callback():
token = oauth.auth0.authorize_access_token()
print(token)
session["user"] = token
next_page = request.args.get('next') or url_for('admin_dashboard')
return redirect(next_page)
@app.route('/admin/oauth/logout')
def admin_oauth_logout():
return redirect(url_for('admin_login'))
@app.route('/admin/dashboard')
@app.route('/admin')
@admin_required
def admin_dashboard():
# Get all inquiries, ordered by the latest message timestamp
@ -103,35 +117,8 @@ def admin_webhook():
return render_template('admin_webhook.html', settings=settings)
@app.route('/admin/settings', methods=['GET', 'POST'])
@app.route('/admin/settings', methods=['GET'])
@admin_required
def admin_settings():
admin = Admin.query.filter_by(username=session['admin_username']).first()
if request.method == 'POST':
current_password = request.form.get('current_password')
new_password = request.form.get('new_password')
confirm_password = request.form.get('confirm_password')
# Verify current password
if not admin.verify_password(current_password):
flash('Current password is incorrect', 'error')
return redirect(url_for('admin_settings'))
# Validate new password
if new_password != confirm_password:
flash('New passwords do not match', 'error')
return redirect(url_for('admin_settings'))
if len(new_password) < 8:
flash('Password must be at least 8 characters', 'error')
return redirect(url_for('admin_settings'))
# Update password
admin.password_hash = Admin.hash_password(new_password)
db.session.commit()
flash('Password updated successfully', 'success')
return redirect(url_for('admin_settings'))
# The admin settings page now only shows OAuth info since we removed the Admin model
return render_template('admin_settings.html')

View file

@ -0,0 +1,31 @@
from sqlalchemy import inspect
from flask import current_app
def run_migration(db):
"""Remove the admin table from the database if it exists."""
current_app.logger.info("Running migration: remove_admin_table")
# Check if the admin table exists
inspector = inspect(db.engine)
tables = inspector.get_table_names()
# Track if we need to commit changes
changes_made = False
# Drop the admin table if it exists
if 'admin' in tables:
current_app.logger.info("Dropping admin table")
with db.engine.connect() as conn:
conn.execute(db.text("DROP TABLE admin"))
# We need to commit within the connection context for some database types
conn.commit()
changes_made = True
else:
current_app.logger.info("admin table does not exist, no action needed")
if changes_made:
current_app.logger.info("Migration completed successfully")
else:
current_app.logger.info("No changes needed")
return changes_made

View file

@ -1,6 +1,5 @@
from .message import Message
from .inquiry import Inquiry
from .settings import Settings
from .admin import Admin
__all__ = ['Message', 'Inquiry', 'Settings', 'Admin']
__all__ = ['Message', 'Inquiry', 'Settings']

View file

@ -1,29 +0,0 @@
from .. import db
import argon2
from .. import password_hasher
class Admin(db.Model):
id = db.Column(db.Integer, primary_key=True)
username = db.Column(db.String(64), unique=True, nullable=False)
password_hash = db.Column(db.String(255), nullable=False)
@classmethod
def hash_password(cls, password):
"""Hash a password using Argon2id"""
return password_hasher.hash(password)
def rehash_password(self, password):
"""Rehash a password using Argon2id"""
self.password_hash = self.hash_password(password)
db.session.add(self)
db.session.commit()
def verify_password(self, password):
"""Verify a password against the stored hash"""
try:
password_hasher.verify(self.password_hash, password)
if password_hasher.check_needs_rehash(self.password_hash):
self.rehash_password(password)
return True
except argon2.exceptions.VerifyMismatchError:
return False

View file

@ -1,12 +1,8 @@
from flask import request, jsonify, render_template, redirect, url_for, flash, session
from . import app, db, limiter, csrf, webhooks
from . import app, db, limiter, webhooks
from .webhooks import WebhookError
from .models import Inquiry, Message, Settings, Admin
from .models import Inquiry, Message
import os
from datetime import datetime, timedelta
# Import admin routes (these will be registered with the app)
from . import admin_routes
# Use is_admin from admin_routes to check admin status
from .admin_routes import is_admin

View file

@ -7,16 +7,7 @@
{{ flash_messages() }}
<form method="POST">
<input type="hidden" name="csrf_token" value="{{ csrf_token() }}"/>
<div>
<label for="username">Username:</label>
<input type="text" id="username" name="username" required>
</div>
<div>
<label for="password">Password:</label>
<input type="password" id="password" name="password" required>
</div>
<button type="submit">Login</button>
</form>
<div style="text-align: center; margin-top: 20px;">
<button><a href="{{ url_for('admin_oauth_login') }}" style="color: white; text-decoration: none;">Login with Auth0</a></button>
</div>
{% endblock %}

View file

@ -10,29 +10,6 @@
{{ admin_header() }}
{{ flash_messages() }}
<div class="settings-form">
<h3>Change Admin Password</h3>
<form method="POST">
<input type="hidden" name="csrf_token" value="{{ csrf_token() }}"/>
<div>
<label for="current_password">Current Password:</label>
<input type="password" id="current_password" name="current_password" required>
</div>
<div>
<label for="new_password">New Password:</label>
<input type="password" id="new_password" name="new_password" required>
<small style="display: block; margin-top: 0.25rem; color: #6c757d;">Password must be at least 8 characters</small>
</div>
<div>
<label for="confirm_password">Confirm New Password:</label>
<input type="password" id="confirm_password" name="confirm_password" required>
</div>
<button type="submit">Update Password</button>
</form>
</div>
<br>
<div class="settings-info">
<h3>System Configuration</h3>
<p>The following settings are configured through environment variables:</p>

View file

@ -1,5 +1,6 @@
{% macro admin_header() %}
<div style="margin-bottom: 1rem;">
<span style="margin-right: 1rem;">{{ session.get('user')['userinfo']['name'] }}</span>
<a href="{{ url_for('admin_dashboard') }}" style="margin-right: 1rem;">Admin Dashboard</a>
<a href="{{ url_for('admin_settings') }}" style="margin-right: 1rem;">Admin Settings</a>
<a href="{{ url_for('admin_webhook') }}" style="margin-right: 1rem;">Webhook Settings</a>