Ofure Ikheloa c370de83d5 Refactor scraper and sender modules for improved Redis management and SSL connection handling
- Introduced RedisManager class in scraper.py for centralized Redis operations including job tracking and caching.
- Enhanced job scraping logic in MultiPlatformJobScraper to support multiple platforms (Ashby, Lever, Greenhouse).
- Updated browser initialization and context management to ensure better resource handling.
- Improved error handling and logging throughout the scraping process.
- Added SSL connection parameters management in a new ssl_connection.py module for RabbitMQ connections (see the sketch below the commit message).
- Refactored sender.py to utilize RedisManager for job deduplication and improved logging mechanisms.
- Enhanced CSV processing logic in sender.py with better validation and error handling.
- Updated connection parameters for RabbitMQ to support SSL configurations based on environment variables.
2025-12-12 13:48:26 +01:00
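
The commit message references a new ssl_connection.py module that is not shown on this page. Below is a minimal sketch of what such a helper might contain, assuming it simply centralizes the same RABBITMQ_SSL_ENABLED / RABBITMQ_SSL_VERIFY handling that Sender._create_ssl_options implements in sender.py further down; the function name create_ssl_options is an assumption, not taken from the commit.

# ssl_connection.py -- hypothetical sketch, not the actual module from this commit.
import os
import ssl

import pika


def create_ssl_options(host):
    """Return pika.SSLOptions for RabbitMQ, or None when SSL is disabled."""
    if os.getenv("RABBITMQ_SSL_ENABLED", "false").lower() != "true":
        return None
    context = ssl.create_default_context()
    if os.getenv("RABBITMQ_SSL_VERIFY", "false").lower() == "true":
        context.check_hostname = True
        context.verify_mode = ssl.CERT_REQUIRED
    else:
        # Accept self-signed broker certificates without verification.
        context.check_hostname = False
        context.verify_mode = ssl.CERT_NONE
    return pika.SSLOptions(context, host)

sender.py below keeps its own inline copy of this logic, so a shared module like this would presumably be consumed by scraper.py.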

sender.py · 349 lines · 14 KiB · Python

import csv
import json
import logging
import os
import sys
import time
import signal
import uuid
from configparser import ConfigParser
import pika
import redis
import ssl
from dotenv import load_dotenv
from datetime import datetime
load_dotenv()
class RedisManager:
    """Manages the Redis connection and operations for job deduplication.

    Sent jobs are tracked under keys of the form "sent_job:<url>" with a 7-day TTL.
    """

    def __init__(self):
        self.redis_host = os.getenv('REDIS_HOST', 'redis-scrape.thejobhub.xyz')
        self.redis_port = int(os.getenv('REDIS_PORT', '6380'))
        self.redis_password = os.getenv('REDIS_PASSWORD')
        self.redis_ssl_enabled = os.getenv('REDIS_SSL_ENABLED', 'true').lower() == 'true'
        self.redis_client = None
        self._connect()

    def _connect(self):
        if not self.redis_password:
            print("Warning: REDIS_PASSWORD not found in environment.")
        try:
            self.redis_client = redis.Redis(
                host=self.redis_host,
                port=self.redis_port,
                password=self.redis_password,
                ssl=self.redis_ssl_enabled,
                ssl_cert_reqs=None,
                socket_connect_timeout=10,
                socket_timeout=30,
                decode_responses=True
            )
            response = self.redis_client.ping()
            print(f"Connected to Redis at {self.redis_host}:{self.redis_port}! Response: {response}")
        except Exception as e:
            print(f"Failed to connect to Redis: {e}")
            self.redis_client = None

    def is_job_seen(self, job_url):
        if not self.redis_client:
            return False
        try:
            return bool(self.redis_client.exists(f"sent_job:{job_url}"))
        except Exception:
            return False

    def mark_job_sent(self, job_url):
        if not self.redis_client:
            return
        try:
            self.redis_client.setex(f"sent_job:{job_url}", 7 * 24 * 3600, "1")
        except Exception:
            pass

class Sender:
    def __init__(self, config_file='config.ini'):
        self.config = ConfigParser()
        self.config.read(config_file)
        self.rabbitmq_host = os.getenv("RABBITMQ_HOST")
        # Keep the raw value here; the default port depends on whether SSL is enabled (see below).
        self.rabbitmq_port = os.getenv("RABBITMQ_PORT")
        self.username = os.getenv("RABBITMQ_USER")
        self.password = os.getenv("RABBITMQ_PASS")
        self.queue_name = self.config.get('rabbitmq', 'queue_name', fallback='job_queue')
        self.directory = self.config.get('files', 'directory', fallback=os.path.join(os.path.expanduser("~"), "jobs", "csv"))
        default_log_dir = os.path.join(os.path.expanduser("~"), ".web_scraping_project", "logs")
        os.makedirs(default_log_dir, exist_ok=True)
        default_log_file = os.path.join(default_log_dir, "sender.log")
        self.log_file = self.config.get('logging', 'log_file', fallback=default_log_file)
        self.virtual_host = self.config.get('rabbitmq', 'virtual_host', fallback='/')
        self.batch_size = 500
        self.retry_attempts = 5
        self.retry_sleep = 2
        self.check_interval = 30
        self.use_ssl = os.getenv("RABBITMQ_SSL_ENABLED", "false").lower() == "true"
        if self.rabbitmq_port is None:
            self.rabbitmq_port = 5671 if self.use_ssl else 5672
        else:
            self.rabbitmq_port = int(self.rabbitmq_port)
        self.redis_manager = RedisManager()
        log_dir = os.path.dirname(self.log_file)
        os.makedirs(log_dir, exist_ok=True)
        self.logger = logging.getLogger('sender')
        self.logger.setLevel(logging.INFO)
        self.logger.handlers.clear()
        file_handler = logging.FileHandler(self.log_file, encoding='utf-8')
        file_formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
        file_handler.setFormatter(file_formatter)
        self.logger.addHandler(file_handler)
        console_handler = logging.StreamHandler(sys.stdout)
        console_formatter = logging.Formatter('%(levelname)s: %(message)s')
        console_handler.setFormatter(console_formatter)
        self.logger.addHandler(console_handler)
        self.connection = None
        self.channel = None
        self.running = True
        signal.signal(signal.SIGTERM, self.graceful_shutdown)
        signal.signal(signal.SIGINT, self.graceful_shutdown)
    def _create_ssl_options(self):
        if not self.use_ssl:
            return None
        # Certificate verification is opt-in via RABBITMQ_SSL_VERIFY; by default the broker
        # certificate is accepted without verification (e.g. self-signed certificates).
        context = ssl.create_default_context()
        verify_ssl = os.getenv("RABBITMQ_SSL_VERIFY", "false").lower() == "true"
        if verify_ssl:
            context.check_hostname = True
            context.verify_mode = ssl.CERT_REQUIRED
        else:
            context.check_hostname = False
            context.verify_mode = ssl.CERT_NONE
        return pika.SSLOptions(context, self.rabbitmq_host)
    def connect(self):
        try:
            if not self.rabbitmq_host:
                self.logger.error("RABBITMQ_HOST environment variable is not set")
                return False
            if not self.username:
                self.logger.error("RABBITMQ_USER environment variable is not set")
                return False
            if not self.password:
                self.logger.error("RABBITMQ_PASS environment variable is not set")
                return False
            self.logger.info(f"Attempting to connect with host={self.rabbitmq_host}, port={self.rabbitmq_port}, user={self.username}")
            credentials = pika.PlainCredentials(self.username, self.password)
            params = {
                'host': self.rabbitmq_host,
                'port': self.rabbitmq_port,
                'virtual_host': self.virtual_host,
                'credentials': credentials,
                'heartbeat': 600,
                'blocked_connection_timeout': 300
            }
            if self.use_ssl:
                params['ssl_options'] = self._create_ssl_options()
                self.logger.info(f"Connecting to RabbitMQ over SSL at {self.rabbitmq_host}:{self.rabbitmq_port}")
            else:
                self.logger.info(f"Connecting to RabbitMQ at {self.rabbitmq_host}:{self.rabbitmq_port}")
            parameters = pika.ConnectionParameters(**params)
            self.connection = pika.BlockingConnection(parameters)
            self.channel = self.connection.channel()
            self.channel.queue_declare(queue=self.queue_name, durable=True)
            self.logger.info("Connected to RabbitMQ successfully")
            return True
        except Exception as e:
            self.logger.error(f"Failed to connect to RabbitMQ: {str(e)}")
            return False
    def reconnect(self):
        if self.connection and self.connection.is_open:
            self.connection.close()
        time.sleep(self.retry_sleep)
        return self.connect()

    def send_message(self, message, message_id):
        # Publishes persistently (delivery_mode=2), retrying with exponential backoff and
        # reconnecting between attempts.
        for attempt in range(self.retry_attempts):
            try:
                self.channel.basic_publish(
                    exchange='',
                    routing_key=self.queue_name,
                    body=message,
                    properties=pika.BasicProperties(
                        delivery_mode=2,
                        message_id=message_id
                    )
                )
                return True
            except Exception as e:
                self.logger.error(f"Failed to send message (attempt {attempt+1}): {str(e)}")
                if attempt < self.retry_attempts - 1:
                    time.sleep(self.retry_sleep * (2 ** attempt))
                    if not self.reconnect():
                        return False
        return False

    def is_job_seen(self, job_url):
        return self.redis_manager.is_job_seen(job_url)

    def mark_job_sent(self, job_url):
        self.redis_manager.mark_job_sent(job_url)
    def process_csv(self, file_path):
        # Expects CSV files with at least 'url' and 'company' columns; rows are validated,
        # deduplicated against Redis, and published to RabbitMQ one message per job.
        try:
            with open(file_path, 'r', encoding='utf-8') as csvfile:
                reader = csv.DictReader(csvfile)
                sent_count = 0
                skipped_count = 0
                self.logger.info(f"CSV headers found: {reader.fieldnames}")
                for row_num, row in enumerate(reader, start=1):
                    # ✅ IMMEDIATE EXIT CHECK
                    if not self.running:
                        self.logger.info("Shutdown requested during CSV processing. Exiting...")
                        return sent_count
                    if 'url' not in row or 'company' not in row:
                        self.logger.warning(f"Skipping row {row_num}: missing 'url' or 'company' field. Row: {row}")
                        skipped_count += 1
                        continue
                    url = row['url'].strip()
                    company = row['company'].strip()
                    if not url:
                        self.logger.warning(f"Skipping row {row_num}: empty URL. Company: '{company}'")
                        skipped_count += 1
                        continue
                    if not company:
                        self.logger.warning(f"Skipping row {row_num}: empty company. URL: {url}")
                        skipped_count += 1
                        continue
                    if not url.startswith(('http://', 'https://')):
                        self.logger.warning(f"Skipping row {row_num}: invalid URL format. URL: {url}")
                        skipped_count += 1
                        continue
                    if self.is_job_seen(url):
                        self.logger.info(f"Skipping row {row_num}: job already sent (deduplicated). URL: {url}")
                        skipped_count += 1
                        continue
                    job_data = {'job_link': url, 'company_name': company}
                    message_id = str(uuid.uuid4())
                    message = json.dumps(job_data)
                    if self.send_message(message, message_id):
                        sent_count += 1
                        self.mark_job_sent(url)
                    else:
                        self.logger.error(f"Failed to send job (row {row_num}): {url}")
                        skipped_count += 1
                    if (sent_count + skipped_count) % 100 == 0:
                        self.logger.info(f"Progress: {sent_count} sent, {skipped_count} skipped from {file_path}")
                self.logger.info(f"Completed {file_path}: {sent_count} sent, {skipped_count} skipped")
                try:
                    os.rename(file_path, file_path + '.processed')
                    self.logger.info(f"Processed and renamed {file_path} to {file_path}.processed")
                except Exception as rename_error:
                    self.logger.error(f"Failed to rename {file_path}: {str(rename_error)}")
                    # Fall back to a marker file recording that this CSV was already processed.
                    marker_file = file_path + '.processed_marker'
                    with open(marker_file, 'w') as f:
                        f.write(f"Processed at {datetime.now().isoformat()}")
                    self.logger.info(f"Created marker file: {marker_file}")
                return sent_count
        except Exception as e:
            self.logger.error(f"Error processing {file_path}: {str(e)}")
            return 0
    def find_new_csvs(self):
        if not self.running:  # ✅ IMMEDIATE EXIT CHECK
            return []
        if not os.path.exists(self.directory):
            return []
        files = [f for f in os.listdir(self.directory) if f.endswith('.csv') and not f.endswith('.processed')]
        files.sort()
        return [os.path.join(self.directory, f) for f in files]
    def run(self):
        if not self.connect():
            self.logger.error("RabbitMQ connection failed, exiting")
            sys.exit(1)
        try:
            while self.running:
                new_files = self.find_new_csvs()
                if new_files:
                    for file_path in new_files:
                        if not self.running:  # ✅ IMMEDIATE EXIT CHECK
                            break
                        self.logger.info(f"Processing {file_path}")
                        sent = self.process_csv(file_path)
                        self.logger.info(f"Sent {sent} jobs from {file_path}")
                else:
                    self.logger.info("No new CSV files found")
                # Replace blocking sleep with interruptible sleep
                for _ in range(self.check_interval):
                    if not self.running:
                        break
                    time.sleep(1)
        except KeyboardInterrupt:
            # This should not normally be reached due to signal handler, but added for safety
            pass
        finally:
            if self.connection and self.connection.is_open:
                self.logger.info("Closing RabbitMQ connection...")
                self.connection.close()

    def graceful_shutdown(self, signum, frame):
        self.logger.info("Received shutdown signal. Initiating graceful shutdown...")
        self.running = False  # This will break all loops

if __name__ == '__main__':
    required_vars = ['RABBITMQ_HOST', 'RABBITMQ_PORT', 'RABBITMQ_USER', 'RABBITMQ_PASS']
    missing_vars = [var for var in required_vars if not os.getenv(var)]
    if missing_vars:
        print(f"Missing environment variables: {missing_vars}")
        print("Check your .env file and ensure load_dotenv() is working")
        sys.exit(1)
    sender = Sender()
    print(f"Using directory: {sender.directory}")
    print(f"Directory exists: {os.path.exists(sender.directory)}")
    if os.path.exists(sender.directory):
        print(f"Files: {os.listdir(sender.directory)}")
    try:
        sender.run()
    except KeyboardInterrupt:
        # Fallback in case signal handler doesn't catch it
        sender.logger.info("KeyboardInterrupt caught in main. Exiting.")
        sys.exit(0)
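
The script expects RABBITMQ_HOST, RABBITMQ_PORT, RABBITMQ_USER and RABBITMQ_PASS in the environment (validated in the __main__ block above), plus the optional RABBITMQ_SSL_ENABLED / RABBITMQ_SSL_VERIFY flags and the REDIS_HOST, REDIS_PORT, REDIS_PASSWORD and REDIS_SSL_ENABLED settings for the deduplication cache. As a quick way to see what the deduplication layer stores, the snippet below reuses RedisManager directly; it assumes the file is importable as sender and that the Redis environment variables are set, and the example URL is purely illustrative.

# dedup_check.py -- illustrative only; not part of this commit.
from sender import RedisManager

rm = RedisManager()
url = "https://jobs.example.com/posting/123"  # placeholder URL
rm.mark_job_sent(url)                         # stores sent_job:<url> with a 7-day TTL
print(rm.is_job_seen(url))                    # True while the key has not expired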