Move admin routes

This commit is contained in:
Minecon724 2025-04-02 19:32:07 +02:00
commit 3d4229d334
Signed by: Minecon724
GPG key ID: A02E6E67AB961189
3 changed files with 147 additions and 136 deletions

View file

@ -110,6 +110,7 @@ def ratelimit_handler(e):
# Import routes
from . import routes
from . import admin_routes
# Setup scheduler jobs
def setup_scheduler_jobs():

View file

@ -0,0 +1,137 @@
from flask import request, jsonify, render_template, redirect, url_for, flash, session
from functools import wraps
from . import app, db, limiter, csrf
from .models import Inquiry, Message, Settings, Admin
from .webhooks import WebhookError, inquiry_created, inquiry_closed, inquiry_reopened
def is_admin():
return 'admin_authenticated' in session and session['admin_authenticated']
# Admin authentication middleware
def admin_required(f):
@wraps(f)
def decorated_function(*args, **kwargs):
if not is_admin():
return redirect(url_for('admin_login', next=request.url))
return f(*args, **kwargs)
return decorated_function
@app.route('/admin', methods=['GET'])
def admin_login():
return render_template('admin_login.html')
@app.route('/admin', methods=['POST'])
@limiter.limit("1 per minute", deduct_when=lambda response: not is_admin())
@limiter.limit("10 per hour")
def admin_login_post():
username = request.form.get('username')
password = request.form.get('password')
# Get admin user from database
admin = Admin.query.filter_by(username=username).first()
# Verify credentials
if admin and admin.verify_password(password):
session['admin_authenticated'] = True
session['admin_username'] = admin.username
# Redirect to next page if provided, otherwise to dashboard
next_page = request.args.get('next')
if next_page:
return redirect(next_page)
return redirect(url_for('admin_dashboard'))
else:
flash('Invalid username or password. Try again in 1 minute.', 'error')
return redirect(url_for('admin_login'))
@app.route('/admin/logout')
def admin_logout():
session.pop('admin_authenticated', None)
session.pop('admin_username', None)
flash('You have been logged out')
return redirect(url_for('admin_login'))
@app.route('/admin/dashboard')
@admin_required
def admin_dashboard():
# Get all inquiries, ordered by the latest message timestamp
inquiries = Inquiry.query.all()
# For each inquiry, get the latest message timestamp
inquiries_with_data = []
for inquiry in inquiries:
latest_message = Message.query.filter_by(inquiry_id=inquiry.id).order_by(Message.message_number.desc()).first()
message_count = Message.query.filter_by(inquiry_id=inquiry.id).count()
inquiries_with_data.append({
'inquiry': inquiry,
'latest_message': latest_message,
'message_count': message_count
})
# Sort by latest message timestamp (newest first)
inquiries_with_data.sort(key=lambda x: x['latest_message'].timestamp if x['latest_message'] else None, reverse=True)
return render_template('admin_dashboard.html', inquiries_with_data=inquiries_with_data)
@app.route('/admin/webhook', methods=['GET', 'POST'])
@admin_required
def admin_webhook():
settings = Settings.query.first()
if not settings:
settings = Settings()
db.session.add(settings)
db.session.commit()
if request.method == 'POST':
# Update webhook settings
settings.webhook_enabled = request.form.get('webhook_enabled') == 'on'
settings.webhook_url = request.form.get('webhook_url', '')
settings.webhook_secret = request.form.get('webhook_secret', '')
db.session.commit()
# Test webhook if enabled and URL is provided
if settings.webhook_enabled and settings.webhook_url:
try:
inquiry_created('1234abcd1234abcd', 'This is a test message')
flash('Webhook settings saved and test sent successfully')
except WebhookError as e:
flash(f'Webhook test failed: {str(e)}')
else:
flash('Webhook settings saved')
return redirect(url_for('admin_webhook'))
return render_template('admin_webhook.html', settings=settings)
@app.route('/admin/settings', methods=['GET', 'POST'])
@admin_required
def admin_settings():
admin = Admin.query.filter_by(username=session['admin_username']).first()
if request.method == 'POST':
current_password = request.form.get('current_password')
new_password = request.form.get('new_password')
confirm_password = request.form.get('confirm_password')
# Verify current password
if not admin.verify_password(current_password):
flash('Current password is incorrect', 'error')
return redirect(url_for('admin_settings'))
# Validate new password
if new_password != confirm_password:
flash('New passwords do not match', 'error')
return redirect(url_for('admin_settings'))
if len(new_password) < 8:
flash('Password must be at least 8 characters', 'error')
return redirect(url_for('admin_settings'))
# Update password
admin.password_hash = Admin.hash_password(new_password)
db.session.commit()
flash('Password updated successfully', 'success')
return redirect(url_for('admin_settings'))
return render_template('admin_settings.html')

View file

@ -1,22 +1,15 @@
from flask import request, jsonify, render_template, redirect, url_for, flash, session
from functools import wraps
from . import app, db, limiter, csrf, webhooks
from .webhooks import WebhookError
from .models import Inquiry, Message, Settings, Admin
import os
from datetime import datetime, timedelta
def is_admin():
return 'admin_authenticated' in session and session['admin_authenticated']
# Import admin routes (these will be registered with the app)
from . import admin_routes
# Admin authentication middleware
def admin_required(f):
@wraps(f)
def decorated_function(*args, **kwargs):
if not is_admin():
return redirect(url_for('admin_login', next=request.url))
return f(*args, **kwargs)
return decorated_function
# Use is_admin from admin_routes to check admin status
from .admin_routes import is_admin
@app.route('/', methods=['GET'])
def index():
@ -54,7 +47,7 @@ def create_inquiry():
@limiter.limit("10 per hour", deduct_when=lambda response: response.status_code != 200)
def inquiry(inquiry_id):
inquiry = Inquiry.query.get_or_404(inquiry_id)
is_admin = 'admin_authenticated' in session and session['admin_authenticated']
is_admin_user = is_admin()
if request.method == 'POST':
# Handle closed inquiry differently for admin vs non-admin
@ -77,12 +70,12 @@ def inquiry(inquiry_id):
message = Message(
content=message_content,
inquiry_id=inquiry_id,
is_admin=is_admin
is_admin=is_admin_user
)
db.session.add(message)
db.session.commit()
if not is_admin: # Admins don't need to be notified of their own messages
if not is_admin_user: # Admins don't need to be notified of their own messages
try:
webhooks.inquiry_message(inquiry_id, message_content)
except WebhookError as e:
@ -91,127 +84,7 @@ def inquiry(inquiry_id):
return redirect(url_for('inquiry', inquiry_id=inquiry_id))
messages = Message.query.filter_by(inquiry_id=inquiry_id).order_by(Message.message_number).all()
return render_template('inquiry.html', inquiry=inquiry, messages=messages, is_admin=is_admin)
@app.route('/admin', methods=['GET'])
def admin_login():
return render_template('admin_login.html')
@app.route('/admin', methods=['POST'])
@limiter.limit("1 per minute", deduct_when=lambda response: not is_admin())
@limiter.limit("10 per hour")
def admin_login_post():
username = request.form.get('username')
password = request.form.get('password')
# Get admin user from database
admin = Admin.query.filter_by(username=username).first()
# Verify credentials
if admin and admin.verify_password(password):
session['admin_authenticated'] = True
session['admin_username'] = admin.username
# Redirect to next page if provided, otherwise to dashboard
next_page = request.args.get('next')
if next_page:
return redirect(next_page)
return redirect(url_for('admin_dashboard'))
else:
flash('Invalid username or password. Try again in 1 minute.', 'error')
return redirect(url_for('admin_login'))
@app.route('/admin/logout')
def admin_logout():
session.pop('admin_authenticated', None)
session.pop('admin_username', None)
flash('You have been logged out')
return redirect(url_for('admin_login'))
@app.route('/admin/dashboard')
@admin_required
def admin_dashboard():
# Get all inquiries, ordered by the latest message timestamp
inquiries = Inquiry.query.all()
# For each inquiry, get the latest message timestamp
inquiries_with_data = []
for inquiry in inquiries:
latest_message = Message.query.filter_by(inquiry_id=inquiry.id).order_by(Message.message_number.desc()).first()
message_count = Message.query.filter_by(inquiry_id=inquiry.id).count()
inquiries_with_data.append({
'inquiry': inquiry,
'latest_message': latest_message,
'message_count': message_count
})
# Sort by latest message timestamp (newest first)
inquiries_with_data.sort(key=lambda x: x['latest_message'].timestamp if x['latest_message'] else None, reverse=True)
return render_template('admin_dashboard.html', inquiries_with_data=inquiries_with_data)
@app.route('/admin/webhook', methods=['GET', 'POST'])
@admin_required
def admin_webhook():
settings = Settings.query.first()
if not settings:
settings = Settings()
db.session.add(settings)
db.session.commit()
if request.method == 'POST':
# Update webhook settings
settings.webhook_enabled = request.form.get('webhook_enabled') == 'on'
settings.webhook_url = request.form.get('webhook_url', '')
settings.webhook_secret = request.form.get('webhook_secret', '')
db.session.commit()
# Test webhook if enabled and URL is provided
if settings.webhook_enabled and settings.webhook_url:
try:
webhooks.inquiry_created('1234abcd1234abcd', 'This is a test message')
flash('Webhook settings saved and test sent successfully')
except WebhookError as e:
flash(f'Webhook test failed: {str(e)}')
else:
flash('Webhook settings saved')
return redirect(url_for('admin_webhook'))
return render_template('admin_webhook.html', settings=settings)
@app.route('/admin/settings', methods=['GET', 'POST'])
@admin_required
def admin_settings():
admin = Admin.query.filter_by(username=session['admin_username']).first()
if request.method == 'POST':
current_password = request.form.get('current_password')
new_password = request.form.get('new_password')
confirm_password = request.form.get('confirm_password')
# Verify current password
if not admin.verify_password(current_password):
flash('Current password is incorrect', 'error')
return redirect(url_for('admin_settings'))
# Validate new password
if new_password != confirm_password:
flash('New passwords do not match', 'error')
return redirect(url_for('admin_settings'))
if len(new_password) < 8:
flash('Password must be at least 8 characters', 'error')
return redirect(url_for('admin_settings'))
# Update password
admin.password_hash = Admin.hash_password(new_password)
db.session.commit()
flash('Password updated successfully', 'success')
return redirect(url_for('admin_settings'))
return render_template('admin_settings.html')
return render_template('inquiry.html', inquiry=inquiry, messages=messages, is_admin=is_admin_user)
@app.route('/inquiry/<inquiry_id>/delete', methods=['POST'])
@limiter.limit("10 per hour", deduct_when=lambda response: response.status_code != 200)
@ -223,7 +96,7 @@ def delete_inquiry(inquiry_id):
db.session.commit()
# Check if user is admin and redirect accordingly
if 'admin_authenticated' in session and session['admin_authenticated']:
if is_admin():
flash('Inquiry deleted successfully')
return redirect(url_for('admin_dashboard'))
else: