import csv
import json
import logging
import os
import signal
import sys
import time
import uuid
from configparser import ConfigParser

import pika
import redis


class Sender:
    def __init__(self, config_file='config.ini'):
        self.config = ConfigParser()
        self.config.read(config_file)

        # RabbitMQ from env vars with fallbacks
        # (defaults assume a local broker with stock credentials)
        self.rabbitmq_host = os.getenv("RABBITMQ_HOST", "localhost")
        self.rabbitmq_port = int(os.getenv("RABBITMQ_PORT", "5672"))
        self.username = os.getenv("RABBITMQ_USER", "guest")
        self.password = os.getenv("RABBITMQ_PASS", "guest")
        self.queue_name = self.config.get('rabbitmq', 'queue_name', fallback='job_queue')
        self.directory = self.config.get(
            'files', 'directory',
            fallback=os.path.join(os.path.expanduser("~"), "jobs", "csv"))

        # Cross-platform log path: use the user's home directory
        default_log_dir = os.path.join(os.path.expanduser("~"), ".web_scraping_project", "logs")
        os.makedirs(default_log_dir, exist_ok=True)
        default_log_file = os.path.join(default_log_dir, "sender.log")
        self.log_file = self.config.get('logging', 'log_file', fallback=default_log_file)

        self.virtual_host = self.config.get('rabbitmq', 'virtual_host', fallback='/')
        self.batch_size = 500
        self.retry_attempts = 5  # Increased for robustness
        self.retry_sleep = 2
        self.check_interval = 30  # More frequent polling

        # Redis for deduplication (defaults assume a local instance)
        redis_host = os.getenv("REDIS_HOST", "localhost")
        redis_port = int(os.getenv("REDIS_PORT", "6379"))
        self.redis_client = redis.Redis(host=redis_host, port=redis_port, db=1,
                                        decode_responses=True)

        # Ensure the log directory exists before configuring logging
        log_dir = os.path.dirname(self.log_file)
        if log_dir:
            os.makedirs(log_dir, exist_ok=True)
        logging.basicConfig(filename=self.log_file, level=logging.INFO,
                            format='%(asctime)s - %(levelname)s - %(message)s')
        self.logger = logging.getLogger(__name__)

        self.connection = None
        self.channel = None
        self.running = True
        signal.signal(signal.SIGTERM, self.graceful_shutdown)
        signal.signal(signal.SIGINT, self.graceful_shutdown)

    def connect(self):
        try:
            credentials = pika.PlainCredentials(self.username, self.password)
            parameters = pika.ConnectionParameters(
                host=self.rabbitmq_host,
                port=self.rabbitmq_port,
                virtual_host=self.virtual_host,
                credentials=credentials,
                heartbeat=600,
                blocked_connection_timeout=300
            )
            self.connection = pika.BlockingConnection(parameters)
            self.channel = self.connection.channel()
            self.channel.queue_declare(queue=self.queue_name, durable=True)
            self.logger.info("Connected to RabbitMQ")
            return True
        except Exception as e:
            self.logger.error(f"Failed to connect to RabbitMQ: {str(e)}")
            return False

    def reconnect(self):
        if self.connection and self.connection.is_open:
            self.connection.close()
        time.sleep(self.retry_sleep)
        return self.connect()

    def send_message(self, message, message_id):
        for attempt in range(self.retry_attempts):
            try:
                self.channel.basic_publish(
                    exchange='',
                    routing_key=self.queue_name,
                    body=message,
                    properties=pika.BasicProperties(
                        delivery_mode=2,  # Persist the message to disk
                        message_id=message_id
                    )
                )
                return True
            except Exception as e:
                self.logger.error(f"Failed to send message (attempt {attempt + 1}): {str(e)}")
                if attempt < self.retry_attempts - 1:
                    time.sleep(self.retry_sleep * (2 ** attempt))  # Exponential backoff
                    if not self.reconnect():
                        return False
        return False

    def is_job_seen(self, job_url):
        """Check whether the job was sent recently (within 7 days)."""
        return self.redis_client.get(f"sent_job:{job_url}") is not None

    def mark_job_sent(self, job_url):
        """Mark the job as sent, with a 7-day TTL."""
        self.redis_client.setex(f"sent_job:{job_url}", 7 * 24 * 3600, "1")

    def process_csv(self, file_path):
        try:
            with open(file_path, 'r', newline='') as csvfile:
                reader = csv.DictReader(csvfile)
                sent_count = 0
                skipped_count = 0
                for row in reader:
                    # Validate required fields
                    if 'job_link' not in row or 'company_name' not in row:
                        self.logger.warning(f"Skipping invalid row in {file_path}: {row}")
                        continue
                    job_link = row['job_link'].strip()
                    company_name = row['company_name'].strip()
                    if not job_link or not company_name:
                        self.logger.warning(f"Skipping empty row in {file_path}")
                        continue

                    # Deduplication
                    if self.is_job_seen(job_link):
                        skipped_count += 1
                        continue

                    job_data = {
                        'job_link': job_link,
                        'company_name': company_name
                    }
                    message_id = str(uuid.uuid4())
                    message = json.dumps(job_data)
                    if self.send_message(message, message_id):
                        sent_count += 1
                        self.mark_job_sent(job_link)
                    else:
                        self.logger.error(f"Failed to send job: {job_link}")

                    if (sent_count + skipped_count) % 100 == 0:
                        self.logger.info(
                            f"Progress: {sent_count} sent, {skipped_count} skipped "
                            f"from {file_path}")

            self.logger.info(f"Completed {file_path}: {sent_count} sent, "
                             f"{skipped_count} skipped")
            os.rename(file_path, file_path + '.processed')
            self.logger.info(f"Processed and renamed {file_path} to {file_path}.processed")
            return sent_count
        except Exception as e:
            self.logger.error(f"Error processing {file_path}: {str(e)}")
            return 0

    def find_new_csvs(self):
        # Renamed '.processed' files no longer end in '.csv', so they are excluded
        files = [f for f in os.listdir(self.directory) if f.endswith('.csv')]
        files.sort()
        return [os.path.join(self.directory, f) for f in files]

    def run(self):
        if not self.connect():
            sys.exit(1)
        while self.running:
            new_files = self.find_new_csvs()
            if new_files:
                for file_path in new_files:
                    self.logger.info(f"Processing {file_path}")
                    sent = self.process_csv(file_path)
                    self.logger.info(f"Sent {sent} jobs from {file_path}")
            else:
                self.logger.info("No new CSV files found")
            time.sleep(self.check_interval)
        if self.connection and self.connection.is_open:
            self.connection.close()

    def graceful_shutdown(self, signum, frame):
        self.logger.info("Received shutdown signal")
        self.running = False


if __name__ == '__main__':
    sender = Sender()
    sender.run()
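
# A minimal example config.ini covering the keys this script reads; the values
# below are illustrative assumptions, not part of the original project:
#
#   [rabbitmq]
#   queue_name = job_queue
#   virtual_host = /
#
#   [files]
#   directory = /home/user/jobs/csv
#
#   [logging]
#   log_file = /home/user/.web_scraping_project/logs/sender.log
#
# Input CSVs are expected to carry at least the two columns the row validation
# checks for (the sample row is hypothetical):
#
#   job_link,company_name
#   https://example.com/jobs/123,Example Corp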