import csv
import json
import logging
import os
import signal
import ssl
import sys
import time
import uuid
from configparser import ConfigParser
from datetime import datetime

import pika
import redis
from dotenv import load_dotenv

load_dotenv()


class RedisManager:
    """Manage the Redis connection and the keys used for job deduplication.

    Every operation is best-effort: when Redis is unavailable the manager
    behaves as if no job has been seen and records nothing, so the sender
    keeps publishing instead of crashing.
    """

    # Per-URL "already sent" markers expire after 7 days.
    SENT_JOB_TTL_SECONDS = 7 * 24 * 3600
    # Running total of jobs sent from jobs.csv; the TTL (30 days, 2592000 s)
    # is refreshed on every increment so stale counters eventually disappear.
    JOBS_CSV_COUNT_KEY = "jobs_csv_sent_count"
    JOBS_CSV_COUNT_TTL_SECONDS = 30 * 24 * 3600

    def __init__(self):
        self.redis_host = os.getenv('REDIS_HOST')
        self.redis_port = int(os.getenv('REDIS_PORT', '6380'))
        self.redis_password = os.getenv('REDIS_PASSWORD')
        self.redis_ssl_enabled = os.getenv('REDIS_SSL_ENABLED', 'true').lower() == 'true'
        self.redis_client = None
        self._connect()

    def _connect(self):
        """Create the Redis client and verify it with PING.

        On any failure ``redis_client`` is left as ``None``, which switches all
        other methods into their no-op fallbacks.
        """
        if not self.redis_password:
            print("Warning: REDIS_PASSWORD not found in environment.")
        try:
            self.redis_client = redis.Redis(
                host=self.redis_host,
                port=self.redis_port,
                password=self.redis_password,
                ssl=self.redis_ssl_enabled,
                # NOTE(review): server certificate verification is disabled here.
                ssl_cert_reqs=None,
                socket_connect_timeout=10,
                socket_timeout=30,
                decode_responses=True,
            )
            response = self.redis_client.ping()
            print(f"Connected to Redis at {self.redis_host}:{self.redis_port}! Response: {response}")
        except Exception as e:
            print(f"Failed to connect to Redis: {e}")
            self.redis_client = None

    @staticmethod
    def _sent_job_key(job_url):
        """Return the Redis key that marks ``job_url`` as already sent."""
        return f"sent_job:{job_url}"

    def is_job_seen(self, job_url):
        """Return True if ``job_url`` was marked sent within the TTL window.

        Returns False when Redis is unavailable or errors, so jobs are re-sent
        rather than silently dropped.
        """
        if not self.redis_client:
            return False
        try:
            return bool(self.redis_client.exists(self._sent_job_key(job_url)))
        except Exception as e:
            print(f"Warning: Redis lookup failed for {job_url}: {e}")
            return False

    def mark_job_sent(self, job_url):
        """Record ``job_url`` as sent for SENT_JOB_TTL_SECONDS (no-op without Redis)."""
        if not self.redis_client:
            return
        try:
            self.redis_client.setex(self._sent_job_key(job_url), self.SENT_JOB_TTL_SECONDS, "1")
        except Exception as e:
            print(f"Warning: failed to mark job as sent in Redis ({job_url}): {e}")

    def get_jobs_csv_sent_count(self):
        """Return how many jobs from jobs.csv have been sent (0 if unknown)."""
        if not self.redis_client:
            return 0
        try:
            count = self.redis_client.get(self.JOBS_CSV_COUNT_KEY)
            return int(count) if count else 0
        except Exception as e:
            print(f"Warning: failed to read {self.JOBS_CSV_COUNT_KEY} from Redis: {e}")
            return 0

    def increment_jobs_csv_sent_count(self):
        """Increment the jobs.csv counter and refresh its expiry (no-op without Redis)."""
        if not self.redis_client:
            return
        try:
            # Run INCR and EXPIRE in one MULTI/EXEC transaction so the counter
            # can never be left without a TTL if the connection drops between them.
            pipe = self.redis_client.pipeline()
            pipe.incr(self.JOBS_CSV_COUNT_KEY)
            pipe.expire(self.JOBS_CSV_COUNT_KEY, self.JOBS_CSV_COUNT_TTL_SECONDS)
            pipe.execute()
        except Exception as e:
            print(f"Warning: failed to increment {self.JOBS_CSV_COUNT_KEY} in Redis: {e}")


class Sender:
    """Publish job rows from CSV files in a watched directory to a RabbitMQ queue.

    Each valid row (absolute http(s) ``url`` plus non-empty ``company``) becomes
    a persistent JSON message ``{"job_link": ..., "company_name": ...}``.
    URLs are de-duplicated through ``RedisManager``. A fully processed file is
    renamed to ``<name>.processed``; if that rename fails a sibling
    ``<name>.processed_marker`` is written instead, and either way the file
    is not picked up again.
    """

    # Dedup is bypassed for this file until this many of its jobs have been sent.
    JOBS_CSV_FILENAME = "jobs.csv"
    JOBS_CSV_DEDUP_THRESHOLD = 6000
    PROCESSED_SUFFIX = '.processed'
    MARKER_SUFFIX = '.processed_marker'

    def __init__(self, config_file='config.ini'):
        self.config = ConfigParser()
        self.config.read(config_file)

        self.rabbitmq_host = os.getenv("RABBITMQ_HOST")
        self.username = os.getenv("RABBITMQ_USER")
        self.password = os.getenv("RABBITMQ_PASS")
        self.use_ssl = os.getenv("RABBITMQ_SSL_ENABLED", "false").lower() == "true"
        # Without an explicit port, use the AMQPS port for SSL and plain AMQP otherwise.
        port_env = os.getenv("RABBITMQ_PORT")
        self.rabbitmq_port = int(port_env) if port_env else (5671 if self.use_ssl else 5672)

        self.queue_name = self.config.get('rabbitmq', 'queue_name', fallback='job_queue')
        # 'virtual_host' is the intended option name; the misspelled legacy key
        # 'virtual_hash' is still honoured so existing config files keep working.
        self.virtual_host = self.config.get(
            'rabbitmq', 'virtual_host',
            fallback=self.config.get('rabbitmq', 'virtual_hash', fallback='/'),
        )
        self.directory = self.config.get(
            'files', 'directory',
            fallback=os.path.join(os.path.expanduser("~"), "jobs", "csv"),
        )

        default_log_dir = os.path.join(os.path.expanduser("~"), ".web_scraping_project", "logs")
        os.makedirs(default_log_dir, exist_ok=True)
        default_log_file = os.path.join(default_log_dir, "sender.log")
        self.log_file = self.config.get('logging', 'log_file', fallback=default_log_file)

        self.batch_size = 500  # not used anywhere in this module
        self.retry_attempts = 5
        self.retry_sleep = 2  # seconds; base of the exponential publish backoff
        self.check_interval = 30  # seconds to wait between directory scans

        self.redis_manager = RedisManager()
        self.logger = self._setup_logger()

        self.connection = None
        self.channel = None
        self.running = True
        signal.signal(signal.SIGTERM, self.graceful_shutdown)
        signal.signal(signal.SIGINT, self.graceful_shutdown)

    def _setup_logger(self):
        """Configure and return the 'sender' logger (file at ``self.log_file`` + stdout)."""
        log_dir = os.path.dirname(self.log_file)
        if log_dir:  # a bare filename has no directory component to create
            os.makedirs(log_dir, exist_ok=True)

        logger = logging.getLogger('sender')
        logger.setLevel(logging.INFO)
        # Remove and close handlers from a previous Sender so output is not
        # duplicated and the old log file handle is released.
        for handler in list(logger.handlers):
            logger.removeHandler(handler)
            handler.close()

        file_handler = logging.FileHandler(self.log_file, encoding='utf-8')
        file_handler.setFormatter(logging.Formatter('%(asctime)s - %(levelname)s - %(message)s'))
        logger.addHandler(file_handler)

        console_handler = logging.StreamHandler(sys.stdout)
        console_handler.setFormatter(logging.Formatter('%(levelname)s: %(message)s'))
        logger.addHandler(console_handler)
        return logger

    def _create_ssl_options(self):
        """Return pika SSL options, or None when SSL is disabled.

        Certificate and hostname verification are only enabled when
        RABBITMQ_SSL_VERIFY=true; by default they are off.
        """
        if not self.use_ssl:
            return None
        context = ssl.create_default_context()
        verify_ssl = os.getenv("RABBITMQ_SSL_VERIFY", "false").lower() == "true"
        if verify_ssl:
            context.check_hostname = True
            context.verify_mode = ssl.CERT_REQUIRED
        else:
            context.check_hostname = False
            context.verify_mode = ssl.CERT_NONE
        return pika.SSLOptions(context, self.rabbitmq_host)

    def connect(self):
        """Open a blocking connection and channel, and declare the durable queue.

        Returns True on success, False (after logging) on any failure.
        """
        try:
            if not self.rabbitmq_host:
                self.logger.error("RABBITMQ_HOST environment variable is not set")
                return False
            if not self.username:
                self.logger.error("RABBITMQ_USER environment variable is not set")
                return False
            if not self.password:
                self.logger.error("RABBITMQ_PASS environment variable is not set")
                return False

            self.logger.info(f"Attempting to connect with host={self.rabbitmq_host}, port={self.rabbitmq_port}, user={self.username}")
            credentials = pika.PlainCredentials(self.username, self.password)
            params = {
                'host': self.rabbitmq_host,
                'port': self.rabbitmq_port,
                'virtual_host': self.virtual_host,
                'credentials': credentials,
                'heartbeat': 600,
                'blocked_connection_timeout': 300,
            }
            if self.use_ssl:
                params['ssl_options'] = self._create_ssl_options()
                self.logger.info(f"Connecting to RabbitMQ over SSL at {self.rabbitmq_host}:{self.rabbitmq_port}")
            else:
                self.logger.info(f"Connecting to RabbitMQ at {self.rabbitmq_host}:{self.rabbitmq_port}")

            parameters = pika.ConnectionParameters(**params)
            self.connection = pika.BlockingConnection(parameters)
            self.channel = self.connection.channel()
            self.channel.queue_declare(queue=self.queue_name, durable=True)
            self.logger.info("Connected to RabbitMQ successfully")
            return True
        except Exception as e:
            self.logger.error(f"Failed to connect to RabbitMQ: {str(e)}")
            return False

    def reconnect(self):
        """Close any open connection, wait ``retry_sleep`` seconds, then connect again."""
        if self.connection and self.connection.is_open:
            try:
                self.connection.close()
            except Exception as e:
                # A half-dead connection may not close cleanly; it is being replaced anyway.
                self.logger.warning(f"Error while closing RabbitMQ connection: {e}")
        time.sleep(self.retry_sleep)
        return self.connect()

    def send_message(self, message, message_id):
        """Publish ``message`` as a persistent message to ``queue_name``.

        Retries up to ``retry_attempts`` times with exponential backoff,
        reconnecting between attempts. Returns True on success, else False.
        """
        for attempt in range(self.retry_attempts):
            try:
                self.channel.basic_publish(
                    exchange='',
                    routing_key=self.queue_name,
                    body=message,
                    properties=pika.BasicProperties(
                        delivery_mode=2,  # persistent
                        message_id=message_id,
                    ),
                )
                return True
            except Exception as e:
                self.logger.error(f"Failed to send message (attempt {attempt+1}): {str(e)}")
                if attempt < self.retry_attempts - 1:
                    time.sleep(self.retry_sleep * (2 ** attempt))
                    # Keep retrying even if this reconnect fails: the broker may
                    # come back before the remaining attempts are exhausted.
                    if not self.reconnect():
                        self.logger.warning("Reconnect failed; will retry")
        return False

    def is_job_seen(self, job_url, filename):
        """Return True if ``job_url`` should be skipped as a duplicate.

        Dedup is deliberately bypassed for jobs.csv until
        JOBS_CSV_DEDUP_THRESHOLD of its jobs have been sent (count kept in Redis).
        """
        if filename == self.JOBS_CSV_FILENAME:
            if self.redis_manager.get_jobs_csv_sent_count() < self.JOBS_CSV_DEDUP_THRESHOLD:
                return False  # always resend
        return self.redis_manager.is_job_seen(job_url)

    def mark_job_sent(self, job_url, filename):
        """Record ``job_url`` as sent and, for jobs.csv, bump its running total."""
        self.redis_manager.mark_job_sent(job_url)
        if filename == self.JOBS_CSV_FILENAME:
            self.redis_manager.increment_jobs_csv_sent_count()

    def _extract_job(self, row, row_num):
        """Return ``(url, company)`` for a valid row, or None after logging why it is skipped."""
        if 'url' not in row or 'company' not in row:
            self.logger.warning(f"Skipping row {row_num}: missing 'url' or 'company' field. Row: {row}")
            return None
        # DictReader fills missing trailing fields with None, so guard before .strip().
        url = (row['url'] or '').strip()
        company = (row['company'] or '').strip()
        if not url:
            self.logger.warning(f"Skipping row {row_num}: empty URL. Company: '{company}'")
            return None
        if not company:
            self.logger.warning(f"Skipping row {row_num}: empty company. URL: {url}")
            return None
        if not url.startswith(('http://', 'https://')):
            self.logger.warning(f"Skipping row {row_num}: invalid URL format. URL: {url}")
            return None
        return url, company

    def _mark_processed(self, file_path):
        """Rename ``file_path`` to ``*.processed``; on failure write a ``*.processed_marker`` file."""
        processed_path = file_path + self.PROCESSED_SUFFIX
        try:
            os.rename(file_path, processed_path)
            self.logger.info(f"Processed and renamed {file_path} to {file_path}.processed")
        except Exception as rename_error:
            self.logger.error(f"Failed to rename {file_path}: {str(rename_error)}")
            marker_file = file_path + self.MARKER_SUFFIX
            try:
                with open(marker_file, 'w', encoding='utf-8') as f:
                    f.write(f"Processed at {datetime.now().isoformat()}")
                self.logger.info(f"Created marker file: {marker_file}")
            except OSError as marker_error:
                self.logger.error(f"Failed to create marker file {marker_file}: {marker_error}")

    def process_csv(self, file_path):
        """Publish every new, valid job row of one CSV file.

        Returns the number of messages sent. If shutdown is requested mid-file
        the count so far is returned and the file is left in place. On an
        unexpected error, 0 is returned (after logging).
        """
        filename = os.path.basename(file_path)
        try:
            sent_count = 0
            skipped_count = 0
            # utf-8-sig strips a leading BOM (common in Excel exports) that would
            # otherwise turn the first header into '\ufeffurl'; newline='' is
            # required by the csv module so quoted fields with newlines parse.
            with open(file_path, 'r', encoding='utf-8-sig', newline='') as csvfile:
                reader = csv.DictReader(csvfile)
                self.logger.info(f"CSV headers found: {reader.fieldnames}")
                for row_num, row in enumerate(reader, start=1):
                    if not self.running:
                        self.logger.info("Shutdown requested during CSV processing. Exiting...")
                        return sent_count

                    job = self._extract_job(row, row_num)
                    if job is None:
                        skipped_count += 1
                        continue
                    url, company = job

                    if self.is_job_seen(url, filename):
                        self.logger.info(f"Skipping row {row_num}: job already sent (deduplicated). URL: {url}")
                        skipped_count += 1
                        continue

                    message = json.dumps({'job_link': url, 'company_name': company})
                    if self.send_message(message, str(uuid.uuid4())):
                        sent_count += 1
                        self.mark_job_sent(url, filename)
                    else:
                        self.logger.error(f"Failed to send job (row {row_num}): {url}")
                        skipped_count += 1

                    if (sent_count + skipped_count) % 100 == 0:
                        current_total = (
                            self.redis_manager.get_jobs_csv_sent_count()
                            if filename == self.JOBS_CSV_FILENAME else "N/A"
                        )
                        self.logger.info(f"Progress: {sent_count} sent, {skipped_count} skipped from {file_path} (jobs.csv total: {current_total})")

            self.logger.info(f"Completed {file_path}: {sent_count} sent, {skipped_count} skipped")
            # Rename only after the file is closed; an open handle makes the
            # rename fail on some platforms (e.g. Windows).
            self._mark_processed(file_path)
            return sent_count
        except Exception as e:
            self.logger.error(f"Error processing {file_path}: {str(e)}")
            return 0

    def find_new_csvs(self):
        """Return sorted paths of ``*.csv`` files in ``directory`` that still need processing.

        Files with a ``.processed_marker`` sibling (written when a rename
        failed) are excluded so they are not re-sent on every scan.
        """
        if not self.running or not os.path.isdir(self.directory):
            return []
        try:
            entries = os.listdir(self.directory)
        except OSError as e:
            self.logger.error(f"Cannot list directory {self.directory}: {e}")
            return []
        names = sorted(
            name for name in entries
            if name.endswith('.csv')
            and not os.path.exists(os.path.join(self.directory, name + self.MARKER_SUFFIX))
        )
        return [os.path.join(self.directory, name) for name in names]

    def _idle_wait(self, seconds):
        """Wait up to ``seconds`` seconds, returning early once shutdown is requested.

        While the connection is open, pika's ``BlockingConnection.sleep`` is
        used so heartbeats keep being serviced during long idle periods.
        """
        for _ in range(seconds):
            if not self.running:
                break
            if self.connection and self.connection.is_open:
                try:
                    self.connection.sleep(1)
                    continue
                except Exception as e:
                    # Connection died while idle; the next publish will reconnect.
                    self.logger.warning(f"RabbitMQ connection error while idle: {e}")
            time.sleep(1)

    def run(self):
        """Connect, then scan ``directory`` every ``check_interval`` seconds until shutdown."""
        if not self.connect():
            self.logger.error("RabbitMQ connection failed, exiting")
            sys.exit(1)
        try:
            while self.running:
                new_files = self.find_new_csvs()
                if new_files:
                    for file_path in new_files:
                        if not self.running:
                            break
                        self.logger.info(f"Processing {file_path}")
                        sent = self.process_csv(file_path)
                        self.logger.info(f"Sent {sent} jobs from {file_path}")
                else:
                    self.logger.info("No new CSV files found")
                # Wait after every scan so a file that keeps failing cannot spin a hot loop.
                self._idle_wait(self.check_interval)
        except KeyboardInterrupt:
            pass
        finally:
            if self.connection and self.connection.is_open:
                self.logger.info("Closing RabbitMQ connection...")
                try:
                    self.connection.close()
                except Exception as e:
                    self.logger.warning(f"Error while closing RabbitMQ connection: {e}")

    def graceful_shutdown(self, signum, frame):
        """Signal handler: ask the main loop and CSV processing to stop."""
        self.logger.info("Received shutdown signal. Initiating graceful shutdown...")
        self.running = False


if __name__ == '__main__':
    required_vars = ['RABBITMQ_HOST', 'RABBITMQ_PORT', 'RABBITMQ_USER', 'RABBITMQ_PASS']
    missing_vars = [var for var in required_vars if not os.getenv(var)]
    if missing_vars:
        print(f"Missing environment variables: {missing_vars}")
        print("Check your .env file and ensure load_dotenv() is working")
        sys.exit(1)

    sender = Sender()
    print(f"Using directory: {sender.directory}")
    print(f"Directory exists: {os.path.exists(sender.directory)}")
    if os.path.exists(sender.directory):
        print(f"Files: {os.listdir(sender.directory)}")
    try:
        sender.run()
    except KeyboardInterrupt:
        sender.logger.info("KeyboardInterrupt caught in main. Exiting.")
        sys.exit(0)