- Update fetch timeout in StealthyFetcher for improved reliability. - Refactor LLMJobRefiner to create and manage Quelah Jobs table in PostgreSQL. - Modify RedisManager to track sent job counts for jobs.csv and adjust deduplication logic. - Implement job URL-based deduplication across scraper and sender.
376 lines
14 KiB
Python
376 lines
14 KiB
Python
|
|
import csv
|
|
import json
|
|
import logging
|
|
import os
|
|
import sys
|
|
import time
|
|
import signal
|
|
import uuid
|
|
from configparser import ConfigParser
|
|
import pika
|
|
import redis
|
|
import ssl
|
|
from dotenv import load_dotenv
|
|
from datetime import datetime
|
|
|
|
load_dotenv()
|
|
|
|
|
|
class RedisManager:
    """Manages Redis connection and operations for job deduplication.

    Sent jobs are tracked by URL under ``sent_job:<url>`` keys with a
    one-week TTL. A separate counter key ("jobs_csv_sent_count") tracks how
    many jobs from jobs.csv have been sent, so the sender can disable
    deduplication until a quota is reached. All operations are best-effort:
    when Redis is unreachable, dedup checks report "not seen" and writes are
    silently skipped.
    """

    # TTL for individual sent-job keys: one week, in seconds.
    SENT_JOB_TTL = 7 * 24 * 3600
    # TTL for the jobs.csv counter: 30 days, in seconds.
    COUNTER_TTL = 2592000

    def __init__(self):
        # Connection settings come from the environment (.env via load_dotenv).
        self.redis_host = os.getenv('REDIS_HOST')
        self.redis_port = int(os.getenv('REDIS_PORT', '6380'))
        self.redis_password = os.getenv('REDIS_PASSWORD')
        self.redis_ssl_enabled = os.getenv('REDIS_SSL_ENABLED', 'true').lower() == 'true'
        self.redis_client = None
        self._connect()

    def _connect(self):
        """Open the Redis connection; on failure leave ``redis_client`` as None."""
        if not self.redis_password:
            print("Warning: REDIS_PASSWORD not found in environment.")

        try:
            self.redis_client = redis.Redis(
                host=self.redis_host,
                port=self.redis_port,
                password=self.redis_password,
                ssl=self.redis_ssl_enabled,
                ssl_cert_reqs=None,  # no server-cert verification
                socket_connect_timeout=10,
                socket_timeout=30,
                decode_responses=True
            )
            # Round-trip a PING to verify the connection actually works.
            response = self.redis_client.ping()
            print(f"Connected to Redis at {self.redis_host}:{self.redis_port}! Response: {response}")
        except Exception as e:
            print(f"Failed to connect to Redis: {e}")
            self.redis_client = None

    def is_job_seen(self, job_url):
        """Return True if *job_url* was already sent; False when Redis is down."""
        if not self.redis_client:
            return False
        try:
            return bool(self.redis_client.exists(f"sent_job:{job_url}"))
        except Exception:
            return False

    def mark_job_sent(self, job_url):
        """Record *job_url* as sent for SENT_JOB_TTL seconds (best-effort)."""
        if not self.redis_client:
            return
        try:
            self.redis_client.setex(f"sent_job:{job_url}", self.SENT_JOB_TTL, "1")
        except Exception:
            pass

    def get_jobs_csv_sent_count(self):
        """Return the running total of jobs sent from jobs.csv (0 on any error)."""
        if not self.redis_client:
            return 0
        try:
            count = self.redis_client.get("jobs_csv_sent_count")
            return int(count) if count else 0
        except Exception:
            return 0

    def increment_jobs_csv_sent_count(self):
        """Increment the jobs.csv sent counter and refresh its 30-day expiry."""
        if not self.redis_client:
            return
        try:
            self.redis_client.incr("jobs_csv_sent_count")
            # Re-arm the expiry on every increment so the counter eventually
            # disappears if the feed goes quiet (avoids stale data).
            self.redis_client.expire("jobs_csv_sent_count", self.COUNTER_TTL)
        except Exception:
            pass
|
|
|
|
|
|
class Sender:
    """Reads job CSV files from a watched directory and publishes each row
    to a RabbitMQ queue, deduplicating by job URL through Redis.

    RabbitMQ credentials and host come from environment variables
    (RABBITMQ_*); everything else comes from ``config.ini``. Each processed
    CSV is renamed with a ``.processed`` suffix (or marked with a
    ``.processed_marker`` file if the rename fails) so it is not picked up
    again on subsequent scans.
    """

    # Deduplication for jobs.csv is disabled until this many rows have been
    # sent (tracked in Redis across restarts).
    JOBS_CSV_DEDUP_THRESHOLD = 6000

    def __init__(self, config_file='config.ini'):
        self.config = ConfigParser()
        self.config.read(config_file)

        # RabbitMQ connection settings (secrets come from the environment).
        self.rabbitmq_host = os.getenv("RABBITMQ_HOST")
        self.username = os.getenv("RABBITMQ_USER")
        self.password = os.getenv("RABBITMQ_PASS")
        self.queue_name = self.config.get('rabbitmq', 'queue_name', fallback='job_queue')
        self.directory = self.config.get('files', 'directory', fallback=os.path.join(os.path.expanduser("~"), "jobs", "csv"))

        default_log_dir = os.path.join(os.path.expanduser("~"), ".web_scraping_project", "logs")
        os.makedirs(default_log_dir, exist_ok=True)
        default_log_file = os.path.join(default_log_dir, "sender.log")
        self.log_file = self.config.get('logging', 'log_file', fallback=default_log_file)

        # FIX: the option name was misspelled 'virtual_hash'. Read the
        # correct 'virtual_host' key first, keeping the old misspelled key
        # as a fallback so existing config files still work.
        self.virtual_host = self.config.get(
            'rabbitmq', 'virtual_host',
            fallback=self.config.get('rabbitmq', 'virtual_hash', fallback='/'))
        self.batch_size = 500
        self.retry_attempts = 5
        self.retry_sleep = 2       # base backoff, seconds
        self.check_interval = 30   # seconds between directory scans

        self.use_ssl = os.getenv("RABBITMQ_SSL_ENABLED", "false").lower() == "true"
        # FIX: the port was previously coerced with a hard-coded 5672
        # default *before* use_ssl was known, which made the intended
        # "default to 5671 when SSL" branch dead code. Resolve the default
        # after reading use_ssl instead.
        raw_port = os.getenv("RABBITMQ_PORT")
        if raw_port:
            self.rabbitmq_port = int(raw_port)
        else:
            self.rabbitmq_port = 5671 if self.use_ssl else 5672

        self.redis_manager = RedisManager()

        # FIX: guard against a bare filename in log_file — os.makedirs('')
        # raises FileNotFoundError.
        log_dir = os.path.dirname(self.log_file)
        if log_dir:
            os.makedirs(log_dir, exist_ok=True)

        self.logger = logging.getLogger('sender')
        self.logger.setLevel(logging.INFO)
        self.logger.handlers.clear()  # avoid duplicate handlers on re-init

        file_handler = logging.FileHandler(self.log_file, encoding='utf-8')
        file_handler.setFormatter(logging.Formatter('%(asctime)s - %(levelname)s - %(message)s'))
        self.logger.addHandler(file_handler)

        console_handler = logging.StreamHandler(sys.stdout)
        console_handler.setFormatter(logging.Formatter('%(levelname)s: %(message)s'))
        self.logger.addHandler(console_handler)

        self.connection = None
        self.channel = None
        self.running = True  # cleared by graceful_shutdown

        signal.signal(signal.SIGTERM, self.graceful_shutdown)
        signal.signal(signal.SIGINT, self.graceful_shutdown)

    def _create_ssl_options(self):
        """Build pika SSL options, or None when SSL is disabled.

        Certificate verification is off by default; set
        RABBITMQ_SSL_VERIFY=true to enable hostname/cert checks.
        """
        if not self.use_ssl:
            return None
        context = ssl.create_default_context()
        verify_ssl = os.getenv("RABBITMQ_SSL_VERIFY", "false").lower() == "true"
        if verify_ssl:
            context.check_hostname = True
            context.verify_mode = ssl.CERT_REQUIRED
        else:
            context.check_hostname = False
            context.verify_mode = ssl.CERT_NONE
        return pika.SSLOptions(context, self.rabbitmq_host)

    def connect(self):
        """Open the RabbitMQ connection and declare the durable queue.

        Returns True on success, False otherwise (errors are logged, never
        raised to the caller).
        """
        try:
            if not self.rabbitmq_host:
                self.logger.error("RABBITMQ_HOST environment variable is not set")
                return False
            if not self.username:
                self.logger.error("RABBITMQ_USER environment variable is not set")
                return False
            if not self.password:
                self.logger.error("RABBITMQ_PASS environment variable is not set")
                return False

            self.logger.info(f"Attempting to connect with host={self.rabbitmq_host}, port={self.rabbitmq_port}, user={self.username}")

            credentials = pika.PlainCredentials(self.username, self.password)
            params = {
                'host': self.rabbitmq_host,
                'port': self.rabbitmq_port,
                'virtual_host': self.virtual_host,
                'credentials': credentials,
                'heartbeat': 600,
                'blocked_connection_timeout': 300
            }

            if self.use_ssl:
                params['ssl_options'] = self._create_ssl_options()
                self.logger.info(f"Connecting to RabbitMQ over SSL at {self.rabbitmq_host}:{self.rabbitmq_port}")
            else:
                self.logger.info(f"Connecting to RabbitMQ at {self.rabbitmq_host}:{self.rabbitmq_port}")

            parameters = pika.ConnectionParameters(**params)
            self.connection = pika.BlockingConnection(parameters)
            self.channel = self.connection.channel()
            # Durable queue + persistent messages survive broker restarts.
            self.channel.queue_declare(queue=self.queue_name, durable=True)
            self.logger.info("Connected to RabbitMQ successfully")
            return True
        except Exception as e:
            self.logger.error(f"Failed to connect to RabbitMQ: {str(e)}")
            return False

    def reconnect(self):
        """Close any open connection, wait briefly, then connect again."""
        if self.connection and self.connection.is_open:
            self.connection.close()
        time.sleep(self.retry_sleep)
        return self.connect()

    def send_message(self, message, message_id):
        """Publish one persistent message, retrying with exponential backoff.

        Returns True on success, False after all retry attempts fail or a
        reconnect attempt fails mid-retry.
        """
        for attempt in range(self.retry_attempts):
            try:
                self.channel.basic_publish(
                    exchange='',
                    routing_key=self.queue_name,
                    body=message,
                    properties=pika.BasicProperties(
                        delivery_mode=2,  # persistent
                        message_id=message_id
                    )
                )
                return True
            except Exception as e:
                self.logger.error(f"Failed to send message (attempt {attempt+1}): {str(e)}")
                if attempt < self.retry_attempts - 1:
                    time.sleep(self.retry_sleep * (2 ** attempt))
                    if not self.reconnect():
                        return False
        return False

    def is_job_seen(self, job_url, filename):
        """Dedup check: disabled for jobs.csv until the send quota is reached."""
        if filename == "jobs.csv":
            sent_count = self.redis_manager.get_jobs_csv_sent_count()
            if sent_count < self.JOBS_CSV_DEDUP_THRESHOLD:
                return False  # Always resend
        return self.redis_manager.is_job_seen(job_url)

    def mark_job_sent(self, job_url, filename):
        """Record the URL in Redis; for jobs.csv also bump the quota counter."""
        self.redis_manager.mark_job_sent(job_url)
        if filename == "jobs.csv":
            self.redis_manager.increment_jobs_csv_sent_count()

    def process_csv(self, file_path):
        """Send every valid, unseen row of *file_path* to the queue.

        Rows need a non-empty http(s) 'url' and a non-empty 'company'
        field. On completion the file is renamed with a ``.processed``
        suffix (or a ``.processed_marker`` file is written if the rename
        fails). Returns the number of rows sent; returns early, without
        renaming, when shutdown is requested mid-file.
        """
        filename = os.path.basename(file_path)
        try:
            with open(file_path, 'r', encoding='utf-8') as csvfile:
                reader = csv.DictReader(csvfile)
                sent_count = 0
                skipped_count = 0

                self.logger.info(f"CSV headers found: {reader.fieldnames}")

                for row_num, row in enumerate(reader, start=1):
                    if not self.running:
                        self.logger.info("Shutdown requested during CSV processing. Exiting...")
                        return sent_count

                    if 'url' not in row or 'company' not in row:
                        self.logger.warning(f"Skipping row {row_num}: missing 'url' or 'company' field. Row: {row}")
                        skipped_count += 1
                        continue

                    url = row['url'].strip()
                    company = row['company'].strip()

                    if not url:
                        self.logger.warning(f"Skipping row {row_num}: empty URL. Company: '{company}'")
                        skipped_count += 1
                        continue

                    if not company:
                        self.logger.warning(f"Skipping row {row_num}: empty company. URL: {url}")
                        skipped_count += 1
                        continue

                    if not url.startswith(('http://', 'https://')):
                        self.logger.warning(f"Skipping row {row_num}: invalid URL format. URL: {url}")
                        skipped_count += 1
                        continue

                    # Dedup is filename-aware (jobs.csv quota logic above).
                    if self.is_job_seen(url, filename):
                        self.logger.info(f"Skipping row {row_num}: job already sent (deduplicated). URL: {url}")
                        skipped_count += 1
                        continue

                    job_data = {'job_link': url, 'company_name': company}
                    message_id = str(uuid.uuid4())
                    message = json.dumps(job_data)

                    if self.send_message(message, message_id):
                        sent_count += 1
                        # Only mark as sent after a confirmed publish.
                        self.mark_job_sent(url, filename)
                    else:
                        self.logger.error(f"Failed to send job (row {row_num}): {url}")
                        skipped_count += 1

                    if (sent_count + skipped_count) % 100 == 0:
                        current_total = self.redis_manager.get_jobs_csv_sent_count() if filename == "jobs.csv" else "N/A"
                        self.logger.info(f"Progress: {sent_count} sent, {skipped_count} skipped from {file_path} (jobs.csv total: {current_total})")

            self.logger.info(f"Completed {file_path}: {sent_count} sent, {skipped_count} skipped")

            try:
                os.rename(file_path, file_path + '.processed')
                self.logger.info(f"Processed and renamed {file_path} to {file_path}.processed")
            except Exception as rename_error:
                self.logger.error(f"Failed to rename {file_path}: {str(rename_error)}")
                # Fall back to a marker file so find_new_csvs can still
                # recognize the file as processed.
                marker_file = file_path + '.processed_marker'
                with open(marker_file, 'w') as f:
                    f.write(f"Processed at {datetime.now().isoformat()}")
                self.logger.info(f"Created marker file: {marker_file}")

            return sent_count
        except Exception as e:
            self.logger.error(f"Error processing {file_path}: {str(e)}")
            return 0

    def find_new_csvs(self):
        """Return sorted paths of unprocessed .csv files in the watch directory."""
        if not self.running:
            return []
        if not os.path.exists(self.directory):
            return []
        names = sorted(
            f for f in os.listdir(self.directory)
            if f.endswith('.csv') and not f.endswith('.processed')
        )
        result = []
        for name in names:
            path = os.path.join(self.directory, name)
            # FIX: honor the .processed_marker fallback written by
            # process_csv when renaming fails; previously such files were
            # re-sent on every scan.
            if os.path.exists(path + '.processed_marker'):
                continue
            result.append(path)
        return result

    def run(self):
        """Main loop: scan for CSVs, send them, sleep, repeat until shutdown."""
        if not self.connect():
            self.logger.error("RabbitMQ connection failed, exiting")
            sys.exit(1)

        try:
            while self.running:
                new_files = self.find_new_csvs()
                if new_files:
                    for file_path in new_files:
                        if not self.running:
                            break
                        self.logger.info(f"Processing {file_path}")
                        sent = self.process_csv(file_path)
                        self.logger.info(f"Sent {sent} jobs from {file_path}")
                else:
                    self.logger.info("No new CSV files found")

                # Sleep in 1s slices so a shutdown signal is noticed quickly.
                for _ in range(self.check_interval):
                    if not self.running:
                        break
                    time.sleep(1)
        except KeyboardInterrupt:
            pass
        finally:
            if self.connection and self.connection.is_open:
                self.logger.info("Closing RabbitMQ connection...")
                self.connection.close()

    def graceful_shutdown(self, signum, frame):
        """Signal handler: ask the main loop to stop at the next check."""
        self.logger.info("Received shutdown signal. Initiating graceful shutdown...")
        self.running = False
|
|
|
|
|
|
if __name__ == '__main__':
    # Fail fast when any RabbitMQ setting is missing from the environment.
    required_vars = ['RABBITMQ_HOST', 'RABBITMQ_PORT', 'RABBITMQ_USER', 'RABBITMQ_PASS']
    missing_vars = [var for var in required_vars if not os.getenv(var)]

    if missing_vars:
        print(f"Missing environment variables: {missing_vars}")
        print("Check your .env file and ensure load_dotenv() is working")
        sys.exit(1)

    sender = Sender()

    # Echo the watch-directory state up front for easier debugging.
    print(f"Using directory: {sender.directory}")
    print(f"Directory exists: {os.path.exists(sender.directory)}")
    if os.path.exists(sender.directory):
        print(f"Files: {os.listdir(sender.directory)}")

    try:
        sender.run()
    except KeyboardInterrupt:
        sender.logger.info("KeyboardInterrupt caught in main. Exiting.")
        sys.exit(0)