"""RabbitMQ connection helpers configured through environment variables."""

import os
import ssl
import traceback

import pika
from dotenv import load_dotenv

# Load environment variables
load_dotenv()

# Port used as the default for RABBITMQ_PORT, and the plain port substituted
# for it when SSL is disabled.
_DEFAULT_SSL_PORT = 5671
_DEFAULT_PLAIN_PORT = 5672


def _env_flag(name: str, default: str) -> bool:
    """Return True when env var *name* (or *default*) is 'true', ignoring case."""
    return os.getenv(name, default).lower() == 'true'


def _env_port(name: str, default: int) -> int:
    """Return env var *name* parsed as an integer, or *default* when unset.

    Raises:
        ValueError: If the variable is set to something that is not an integer.
    """
    raw = os.getenv(name, str(default))
    try:
        return int(raw)
    except ValueError:
        # Same exception type as a bare int() call, but naming the variable.
        raise ValueError(
            f"{name} must be an integer port number, got {raw!r}"
        ) from None


def create_ssl_connection_parameters() -> pika.ConnectionParameters:
    """Create and return RabbitMQ connection parameters.

    Handles both SSL and non-SSL connections based on environment variables:

    * ``RABBITMQ_HOST`` / ``RABBITMQ_USER`` -- no fallback; passed to pika
      as ``None`` when unset.
    * ``RABBITMQ_PORT`` -- defaults to 5671.
    * ``RABBITMQ_PASS`` -- see the review note in the body.
    * ``RABBITMQ_SSL_ENABLED`` -- ``'true'`` (default) enables SSL.
    * ``RABBITMQ_SSL_VERIFY`` -- ``'true'`` enables certificate and hostname
      verification; defaults to ``'false'``.

    Returns:
        The ``pika.ConnectionParameters`` for the configured broker.

    Raises:
        ValueError: If ``RABBITMQ_PORT`` is not an integer.
    """
    host = os.getenv('RABBITMQ_HOST')
    port = _env_port('RABBITMQ_PORT', _DEFAULT_SSL_PORT)
    user = os.getenv('RABBITMQ_USER')
    # NOTE(review): hard-coded fallback password kept only for backward
    # compatibility. Because of it the warning below cannot fire when
    # RABBITMQ_PASS is simply unset -- consider removing the fallback.
    password = os.getenv('RABBITMQ_PASS', 'ofure-scrape')
    ssl_enabled = _env_flag('RABBITMQ_SSL_ENABLED', 'true')
    ssl_verify = _env_flag('RABBITMQ_SSL_VERIFY', 'false')

    # Validate credentials
    if not password or password == 'YOUR_STRONG_PASSWORD':
        print("Warning: Using placeholder or empty password. Please check .env file.")

    # Settings shared by the SSL and non-SSL variants.
    connection_kwargs = {
        'host': host,
        'port': port,
        'credentials': pika.PlainCredentials(user, password),
        'heartbeat': 600,
        'blocked_connection_timeout': 300,
        'virtual_host': '/',
    }

    if ssl_enabled:
        context = ssl.create_default_context()
        # check_hostname must be assigned before verify_mode: the ssl module
        # refuses CERT_NONE while hostname checking is still enabled.
        context.check_hostname = ssl_verify
        context.verify_mode = ssl.CERT_REQUIRED if ssl_verify else ssl.CERT_NONE
        connection_kwargs['ssl_options'] = pika.SSLOptions(context, host)
    elif port == _DEFAULT_SSL_PORT:
        # Non-SSL connection: 5671 is swapped for the default non-SSL port.
        # NOTE(review): this also overrides an explicitly configured 5671.
        connection_kwargs['port'] = _DEFAULT_PLAIN_PORT

    return pika.ConnectionParameters(**connection_kwargs)


def test_connection() -> bool:
    """Test function to verify the RabbitMQ connection.

    Builds the connection parameters, opens a blocking connection and a
    channel, then closes the connection again.

    Returns:
        True when the connection and channel opened successfully; False on
        any exception, which is printed together with its traceback.
    """
    try:
        params = create_ssl_connection_parameters()
        connection = pika.BlockingConnection(params)
        try:
            # Opening a channel is part of the check; the object is not needed.
            connection.channel()
            print("Connected to Secure RabbitMQ!")
        finally:
            # Close even when channel() fails so the socket is not leaked;
            # skip it if the failure already closed the connection.
            if connection.is_open:
                connection.close()
        return True
    except Exception as e:
        print(f"Failed to connect: {e!r}")
        traceback.print_exc()
        return False


# Keep the original test functionality when run directly
if __name__ == "__main__":
    test_connection()