diff --git a/src/anonchat/__init__.py b/src/anonchat/__init__.py index 197801a..21b1223 100644 --- a/src/anonchat/__init__.py +++ b/src/anonchat/__init__.py @@ -110,6 +110,7 @@ def ratelimit_handler(e): # Import routes from . import routes +from . import admin_routes # Setup scheduler jobs def setup_scheduler_jobs(): diff --git a/src/anonchat/admin_routes.py b/src/anonchat/admin_routes.py new file mode 100644 index 0000000..ec16e15 --- /dev/null +++ b/src/anonchat/admin_routes.py @@ -0,0 +1,137 @@ +from flask import request, jsonify, render_template, redirect, url_for, flash, session +from functools import wraps +from . import app, db, limiter, csrf +from .models import Inquiry, Message, Settings, Admin +from .webhooks import WebhookError, inquiry_created, inquiry_closed, inquiry_reopened + +def is_admin(): + return 'admin_authenticated' in session and session['admin_authenticated'] + +# Admin authentication middleware +def admin_required(f): + @wraps(f) + def decorated_function(*args, **kwargs): + if not is_admin(): + return redirect(url_for('admin_login', next=request.url)) + return f(*args, **kwargs) + return decorated_function + +@app.route('/admin', methods=['GET']) +def admin_login(): + return render_template('admin_login.html') + +@app.route('/admin', methods=['POST']) +@limiter.limit("1 per minute", deduct_when=lambda response: not is_admin()) +@limiter.limit("10 per hour") +def admin_login_post(): + username = request.form.get('username') + password = request.form.get('password') + + # Get admin user from database + admin = Admin.query.filter_by(username=username).first() + + # Verify credentials + if admin and admin.verify_password(password): + session['admin_authenticated'] = True + session['admin_username'] = admin.username + + # Redirect to next page if provided, otherwise to dashboard + next_page = request.args.get('next') + if next_page: + return redirect(next_page) + return redirect(url_for('admin_dashboard')) + else: + flash('Invalid username or password. Try again in 1 minute.', 'error') + + return redirect(url_for('admin_login')) + +@app.route('/admin/logout') +def admin_logout(): + session.pop('admin_authenticated', None) + session.pop('admin_username', None) + flash('You have been logged out') + return redirect(url_for('admin_login')) + +@app.route('/admin/dashboard') +@admin_required +def admin_dashboard(): + # Get all inquiries, ordered by the latest message timestamp + inquiries = Inquiry.query.all() + # For each inquiry, get the latest message timestamp + inquiries_with_data = [] + for inquiry in inquiries: + latest_message = Message.query.filter_by(inquiry_id=inquiry.id).order_by(Message.message_number.desc()).first() + message_count = Message.query.filter_by(inquiry_id=inquiry.id).count() + inquiries_with_data.append({ + 'inquiry': inquiry, + 'latest_message': latest_message, + 'message_count': message_count + }) + + # Sort by latest message timestamp (newest first) + inquiries_with_data.sort(key=lambda x: x['latest_message'].timestamp if x['latest_message'] else None, reverse=True) + + return render_template('admin_dashboard.html', inquiries_with_data=inquiries_with_data) + +@app.route('/admin/webhook', methods=['GET', 'POST']) +@admin_required +def admin_webhook(): + settings = Settings.query.first() + if not settings: + settings = Settings() + db.session.add(settings) + db.session.commit() + + if request.method == 'POST': + # Update webhook settings + settings.webhook_enabled = request.form.get('webhook_enabled') == 'on' + settings.webhook_url = request.form.get('webhook_url', '') + settings.webhook_secret = request.form.get('webhook_secret', '') + db.session.commit() + + # Test webhook if enabled and URL is provided + if settings.webhook_enabled and settings.webhook_url: + try: + inquiry_created('1234abcd1234abcd', 'This is a test message') + flash('Webhook settings saved and test sent successfully') + except WebhookError as e: + flash(f'Webhook test failed: {str(e)}') + else: + flash('Webhook settings saved') + + return redirect(url_for('admin_webhook')) + + return render_template('admin_webhook.html', settings=settings) + +@app.route('/admin/settings', methods=['GET', 'POST']) +@admin_required +def admin_settings(): + admin = Admin.query.filter_by(username=session['admin_username']).first() + + if request.method == 'POST': + current_password = request.form.get('current_password') + new_password = request.form.get('new_password') + confirm_password = request.form.get('confirm_password') + + # Verify current password + if not admin.verify_password(current_password): + flash('Current password is incorrect', 'error') + return redirect(url_for('admin_settings')) + + # Validate new password + if new_password != confirm_password: + flash('New passwords do not match', 'error') + return redirect(url_for('admin_settings')) + + if len(new_password) < 8: + flash('Password must be at least 8 characters', 'error') + return redirect(url_for('admin_settings')) + + # Update password + admin.password_hash = Admin.hash_password(new_password) + db.session.commit() + + flash('Password updated successfully', 'success') + return redirect(url_for('admin_settings')) + + return render_template('admin_settings.html') \ No newline at end of file diff --git a/src/anonchat/routes.py b/src/anonchat/routes.py index 6eec876..db4fd3f 100644 --- a/src/anonchat/routes.py +++ b/src/anonchat/routes.py @@ -1,22 +1,15 @@ from flask import request, jsonify, render_template, redirect, url_for, flash, session -from functools import wraps from . import app, db, limiter, csrf, webhooks from .webhooks import WebhookError from .models import Inquiry, Message, Settings, Admin import os from datetime import datetime, timedelta -def is_admin(): - return 'admin_authenticated' in session and session['admin_authenticated'] +# Import admin routes (these will be registered with the app) +from . import admin_routes -# Admin authentication middleware -def admin_required(f): - @wraps(f) - def decorated_function(*args, **kwargs): - if not is_admin(): - return redirect(url_for('admin_login', next=request.url)) - return f(*args, **kwargs) - return decorated_function +# Use is_admin from admin_routes to check admin status +from .admin_routes import is_admin @app.route('/', methods=['GET']) def index(): @@ -54,7 +47,7 @@ def create_inquiry(): @limiter.limit("10 per hour", deduct_when=lambda response: response.status_code != 200) def inquiry(inquiry_id): inquiry = Inquiry.query.get_or_404(inquiry_id) - is_admin = 'admin_authenticated' in session and session['admin_authenticated'] + is_admin_user = is_admin() if request.method == 'POST': # Handle closed inquiry differently for admin vs non-admin @@ -77,12 +70,12 @@ def inquiry(inquiry_id): message = Message( content=message_content, inquiry_id=inquiry_id, - is_admin=is_admin + is_admin=is_admin_user ) db.session.add(message) db.session.commit() - if not is_admin: # Admins don't need to be notified of their own messages + if not is_admin_user: # Admins don't need to be notified of their own messages try: webhooks.inquiry_message(inquiry_id, message_content) except WebhookError as e: @@ -91,127 +84,7 @@ def inquiry(inquiry_id): return redirect(url_for('inquiry', inquiry_id=inquiry_id)) messages = Message.query.filter_by(inquiry_id=inquiry_id).order_by(Message.message_number).all() - return render_template('inquiry.html', inquiry=inquiry, messages=messages, is_admin=is_admin) - -@app.route('/admin', methods=['GET']) -def admin_login(): - return render_template('admin_login.html') - -@app.route('/admin', methods=['POST']) -@limiter.limit("1 per minute", deduct_when=lambda response: not is_admin()) -@limiter.limit("10 per hour") -def admin_login_post(): - username = request.form.get('username') - password = request.form.get('password') - - # Get admin user from database - admin = Admin.query.filter_by(username=username).first() - - # Verify credentials - if admin and admin.verify_password(password): - session['admin_authenticated'] = True - session['admin_username'] = admin.username - - # Redirect to next page if provided, otherwise to dashboard - next_page = request.args.get('next') - if next_page: - return redirect(next_page) - return redirect(url_for('admin_dashboard')) - else: - flash('Invalid username or password. Try again in 1 minute.', 'error') - - return redirect(url_for('admin_login')) - -@app.route('/admin/logout') -def admin_logout(): - session.pop('admin_authenticated', None) - session.pop('admin_username', None) - flash('You have been logged out') - return redirect(url_for('admin_login')) - -@app.route('/admin/dashboard') -@admin_required -def admin_dashboard(): - # Get all inquiries, ordered by the latest message timestamp - inquiries = Inquiry.query.all() - # For each inquiry, get the latest message timestamp - inquiries_with_data = [] - for inquiry in inquiries: - latest_message = Message.query.filter_by(inquiry_id=inquiry.id).order_by(Message.message_number.desc()).first() - message_count = Message.query.filter_by(inquiry_id=inquiry.id).count() - inquiries_with_data.append({ - 'inquiry': inquiry, - 'latest_message': latest_message, - 'message_count': message_count - }) - - # Sort by latest message timestamp (newest first) - inquiries_with_data.sort(key=lambda x: x['latest_message'].timestamp if x['latest_message'] else None, reverse=True) - - return render_template('admin_dashboard.html', inquiries_with_data=inquiries_with_data) - -@app.route('/admin/webhook', methods=['GET', 'POST']) -@admin_required -def admin_webhook(): - settings = Settings.query.first() - if not settings: - settings = Settings() - db.session.add(settings) - db.session.commit() - - if request.method == 'POST': - # Update webhook settings - settings.webhook_enabled = request.form.get('webhook_enabled') == 'on' - settings.webhook_url = request.form.get('webhook_url', '') - settings.webhook_secret = request.form.get('webhook_secret', '') - db.session.commit() - - # Test webhook if enabled and URL is provided - if settings.webhook_enabled and settings.webhook_url: - try: - webhooks.inquiry_created('1234abcd1234abcd', 'This is a test message') - flash('Webhook settings saved and test sent successfully') - except WebhookError as e: - flash(f'Webhook test failed: {str(e)}') - else: - flash('Webhook settings saved') - - return redirect(url_for('admin_webhook')) - - return render_template('admin_webhook.html', settings=settings) - -@app.route('/admin/settings', methods=['GET', 'POST']) -@admin_required -def admin_settings(): - admin = Admin.query.filter_by(username=session['admin_username']).first() - - if request.method == 'POST': - current_password = request.form.get('current_password') - new_password = request.form.get('new_password') - confirm_password = request.form.get('confirm_password') - - # Verify current password - if not admin.verify_password(current_password): - flash('Current password is incorrect', 'error') - return redirect(url_for('admin_settings')) - - # Validate new password - if new_password != confirm_password: - flash('New passwords do not match', 'error') - return redirect(url_for('admin_settings')) - - if len(new_password) < 8: - flash('Password must be at least 8 characters', 'error') - return redirect(url_for('admin_settings')) - - # Update password - admin.password_hash = Admin.hash_password(new_password) - db.session.commit() - - flash('Password updated successfully', 'success') - return redirect(url_for('admin_settings')) - - return render_template('admin_settings.html') + return render_template('inquiry.html', inquiry=inquiry, messages=messages, is_admin=is_admin_user) @app.route('/inquiry//delete', methods=['POST']) @limiter.limit("10 per hour", deduct_when=lambda response: response.status_code != 200) @@ -223,7 +96,7 @@ def delete_inquiry(inquiry_id): db.session.commit() # Check if user is admin and redirect accordingly - if 'admin_authenticated' in session and session['admin_authenticated']: + if is_admin(): flash('Inquiry deleted successfully') return redirect(url_for('admin_dashboard')) else: