
import csv
import json
import logging
import os
import signal
import sys
import time
import uuid
from configparser import ConfigParser

import pika
import redis


class Sender:
    def __init__(self, config_file='config.ini'):
        self.config = ConfigParser()
        self.config.read(config_file)

        # RabbitMQ settings: environment variables take precedence,
        # then config.ini, then hard-coded fallbacks.
        self.rabbitmq_host = os.getenv("RABBITMQ_HOST", self.config.get('rabbitmq', 'url', fallback='rabbitq.thejobhub.xyz'))
        self.rabbitmq_port = int(os.getenv("RABBITMQ_PORT", self.config.get('rabbitmq', 'port', fallback='5672')))
        self.username = os.getenv("RABBITMQ_USER", self.config.get('rabbitmq', 'username', fallback='guest'))
        self.password = os.getenv("RABBITMQ_PASS", self.config.get('rabbitmq', 'password', fallback='guest'))
        self.queue_name = self.config.get('rabbitmq', 'queue_name', fallback='job_queue')
        self.virtual_host = self.config.get('rabbitmq', 'virtual_host', fallback='/')

        self.directory = self.config.get('files', 'directory', fallback='/var/jobs/csv')
        self.log_file = self.config.get('logging', 'log_file', fallback='/var/logs/sender.log')

        self.batch_size = 500  # Currently unused; messages are published one at a time
        self.retry_attempts = 5  # Increased for robustness
        self.retry_sleep = 2
        self.check_interval = 30  # More frequent polling

        # Redis connection used to deduplicate already-sent jobs
        redis_host = os.getenv("REDIS_HOST", "redis-scrape.thejobhub.xyz")
        redis_port = int(os.getenv("REDIS_PORT", "6379"))
        self.redis_client = redis.Redis(host=redis_host, port=redis_port, db=1, decode_responses=True)

        logging.basicConfig(filename=self.log_file, level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
        self.logger = logging.getLogger(__name__)

        self.connection = None
        self.channel = None
        self.running = True

        # Shut down cleanly on SIGTERM (e.g. from systemd/Docker) and on Ctrl-C
        signal.signal(signal.SIGTERM, self.graceful_shutdown)
        signal.signal(signal.SIGINT, self.graceful_shutdown)
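    # Illustrative config.ini matching the lookups above. The values shown
    # are the same placeholders used as fallbacks, not real credentials:
    #
    #   [rabbitmq]
    #   url = rabbitq.thejobhub.xyz
    #   port = 5672
    #   username = guest
    #   password = guest
    #   queue_name = job_queue
    #   virtual_host = /
    #
    #   [files]
    #   directory = /var/jobs/csv
    #
    #   [logging]
    #   log_file = /var/logs/sender.log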
    def connect(self):
        """Open a blocking connection to RabbitMQ and declare the queue."""
        try:
            credentials = pika.PlainCredentials(self.username, self.password)
            parameters = pika.ConnectionParameters(
                host=self.rabbitmq_host,
                port=self.rabbitmq_port,
                virtual_host=self.virtual_host,
                credentials=credentials,
                heartbeat=600,
                blocked_connection_timeout=300
            )
            self.connection = pika.BlockingConnection(parameters)
            self.channel = self.connection.channel()
            # Durable queue so pending jobs survive a broker restart
            self.channel.queue_declare(queue=self.queue_name, durable=True)
            self.logger.info("Connected to RabbitMQ")
            return True
        except Exception as e:
            self.logger.error(f"Failed to connect to RabbitMQ: {str(e)}")
            return False
    def reconnect(self):
        # The old connection may already be dead; ignore errors on close.
        try:
            if self.connection and self.connection.is_open:
                self.connection.close()
        except Exception:
            pass
        time.sleep(self.retry_sleep)
        return self.connect()
    def send_message(self, message, message_id):
        for attempt in range(self.retry_attempts):
            try:
                self.channel.basic_publish(
                    exchange='',
                    routing_key=self.queue_name,
                    body=message,
                    properties=pika.BasicProperties(
                        delivery_mode=2,  # Persistent message
                        message_id=message_id
                    )
                )
                return True
            except Exception as e:
                self.logger.error(f"Failed to send message (attempt {attempt + 1}): {str(e)}")
                if attempt < self.retry_attempts - 1:
                    time.sleep(self.retry_sleep * (2 ** attempt))  # Exponential backoff
                    if not self.reconnect():
                        return False
        return False
    def is_job_seen(self, job_url):
        """Check whether the job was sent within the last 7 days."""
        return self.redis_client.get(f"sent_job:{job_url}") is not None

    def mark_job_sent(self, job_url):
        """Mark the job as sent with a 7-day TTL (7 * 24 * 3600 = 604800 s)."""
        self.redis_client.setex(f"sent_job:{job_url}", 7 * 24 * 3600, "1")
    def process_csv(self, file_path):
        try:
            with open(file_path, 'r', newline='') as csvfile:
                reader = csv.DictReader(csvfile)
                sent_count = 0
                skipped_count = 0
                for row in reader:
                    # Validate required fields
                    if 'job_link' not in row or 'company_name' not in row:
                        self.logger.warning(f"Skipping invalid row in {file_path}: {row}")
                        continue
                    job_link = row['job_link'].strip()
                    company_name = row['company_name'].strip()
                    if not job_link or not company_name:
                        self.logger.warning(f"Skipping empty row in {file_path}")
                        continue
                    # Deduplication: skip jobs already sent in the last 7 days
                    if self.is_job_seen(job_link):
                        skipped_count += 1
                        continue
                    job_data = {
                        'job_link': job_link,
                        'company_name': company_name
                    }
                    message_id = str(uuid.uuid4())
                    message = json.dumps(job_data)
                    if self.send_message(message, message_id):
                        sent_count += 1
                        self.mark_job_sent(job_link)
                    else:
                        self.logger.error(f"Failed to send job: {job_link}")
                    if (sent_count + skipped_count) % 100 == 0:
                        self.logger.info(f"Progress: {sent_count} sent, {skipped_count} skipped from {file_path}")
            self.logger.info(f"Completed {file_path}: {sent_count} sent, {skipped_count} skipped")
            # Rename so the file is not picked up again on the next scan
            os.rename(file_path, file_path + '.processed')
            self.logger.info(f"Processed and renamed {file_path} to {file_path}.processed")
            return sent_count
        except Exception as e:
            self.logger.error(f"Error processing {file_path}: {str(e)}")
            return 0
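    # Expected CSV layout (illustrative): only the two columns validated
    # above are required; any other columns are simply not read here.
    #
    #   job_link,company_name
    #   https://example.com/jobs/123,Acme Corp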
    def find_new_csvs(self):
        # Processed files are renamed to *.csv.processed, so matching on the
        # .csv suffix alone is enough to exclude them.
        files = [f for f in os.listdir(self.directory) if f.endswith('.csv')]
        files.sort()  # Deterministic order: oldest-named files first
        return [os.path.join(self.directory, f) for f in files]
    def run(self):
        if not self.connect():
            sys.exit(1)
        while self.running:
            new_files = self.find_new_csvs()
            if new_files:
                for file_path in new_files:
                    self.logger.info(f"Processing {file_path}")
                    sent = self.process_csv(file_path)
                    self.logger.info(f"Sent {sent} jobs from {file_path}")
            else:
                self.logger.info("No new CSV files found")
            time.sleep(self.check_interval)
        # Loop exited via graceful_shutdown; close the connection cleanly
        if self.connection and self.connection.is_open:
            self.connection.close()

    def graceful_shutdown(self, signum, frame):
        self.logger.info("Received shutdown signal")
        self.running = False

if __name__ == '__main__':
    sender = Sender()
    sender.run()
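# Example invocation (illustrative, assuming this file is saved as sender.py;
# the environment variables override the corresponding config.ini values,
# see __init__):
#
#   RABBITMQ_HOST=localhost RABBITMQ_USER=app RABBITMQ_PASS=secret \
#   REDIS_HOST=localhost python sender.py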