From 762846cb4aa3a9d1a4ac8b7577e4ba065c110e11 Mon Sep 17 00:00:00 2001 From: Ofure Ikheloa Date: Wed, 10 Dec 2025 12:02:43 +0100 Subject: [PATCH] Add AshbyJobScraper and Sender classes for job scraping and message sending; implement Redis caching and RabbitMQ integration. --- scraper.py | 339 +++++++++++++++++++++++++++++++++++++++++++++++++++++ sender.py | 181 ++++++++++++++++++++++++++++ 2 files changed, 520 insertions(+) create mode 100644 scraper.py create mode 100644 sender.py diff --git a/scraper.py b/scraper.py new file mode 100644 index 0000000..9a98cc8 --- /dev/null +++ b/scraper.py @@ -0,0 +1,339 @@ + +import asyncio +import random +import os +import json +import time +from typing import Optional, Dict +from playwright.async_api import async_playwright, TimeoutError as PlaywrightTimeoutError +from browserforge.injectors.playwright import AsyncNewContext +from llm_agent import LLMJobRefiner +from fetcher import StealthyFetcher +from datetime import datetime +import redis +import pika +from tenacity import retry, stop_after_attempt, wait_exponential +import logging + +# Import your engine +from scraping_engine import FingerprintScrapingEngine + +# Configure logging +logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') +logger = logging.getLogger(__name__) + +# Environment variables +RABBITMQ_HOST = os.getenv("RABBITMQ_HOST", "rabbitq.thejobhub.xyz") +RABBITMQ_PORT = int(os.getenv("RABBITMQ_PORT", "5672")) +RABBITMQ_USER = os.getenv("RABBITMQ_USER", "guest") +RABBITMQ_PASS = os.getenv("RABBITMQ_PASS", "guest") +REDIS_HOST = os.getenv("REDIS_HOST", "redis-scrape.thejobhub.xyz") +REDIS_PORT = int(os.getenv("REDIS_PORT", "6379")) + +class AshbyJobScraper: + def __init__( + self, + engine: FingerprintScrapingEngine, + user_request: str = "Extract all standard job details" + ): + self.engine = engine + self.user_request = user_request + self.llm_agent = LLMJobRefiner() + self.redis_client = redis.Redis(host=REDIS_HOST, port=REDIS_PORT, db=0, decode_responses=True) + self.browser = None + self.context = None + + async def init_browser(self): + """Initialize browser once using engine's fingerprint""" + if self.browser is None: + profile = self.engine._select_profile() + renderer = random.choice(self.engine.common_renderers[self.engine.os]) + vendor = random.choice(self.engine.common_vendors) + spoof_script = self.engine._get_spoof_script(renderer, vendor) + + pw = await async_playwright().start() + self.browser = await pw.chromium.launch( + headless=True, + args=['--disable-blink-features=AutomationControlled'] + ) + self.context = await AsyncNewContext(self.browser, fingerprint=profile) + await self.context.add_init_script(f""" + Object.defineProperty(navigator, 'hardwareConcurrency', {{ get: () => {profile.navigator.hardwareConcurrency} }}); + Object.defineProperty(navigator, 'deviceMemory', {{ get: () => {profile.navigator.deviceMemory} }}); + Object.defineProperty(navigator, 'platform', {{ get: () => '{profile.navigator.platform}' }}); + """) + await self.context.add_init_script(spoof_script) + + async def close_browser(self): + if self.browser: + await self.browser.close() + self.browser = None + + async def _safe_inner_text(self, element): + if not element: + return "Unknown" + try: + return await element.text_content() + except: + return "Unknown" + + async def _human_click(self, page, element, wait_after: bool = True): + if not element: + return False + await element.scroll_into_view_if_needed() + speed = 
self.engine.optimization_params.get("base_delay", 2.0) / 2 + await asyncio.sleep(random.uniform(0.3, 0.8) * (speed / 2)) + try: + await element.click() + if wait_after: + await asyncio.sleep(random.uniform(2, 4) * (speed / 2)) + return True + except: + return False + + async def _extract_page_content_for_llm(self, page) -> str: + speed = self.engine.optimization_params.get("base_delay", 2.0) + await asyncio.sleep(2 * (speed / 2)) + await self.engine._human_like_scroll(page) + await asyncio.sleep(2 * (speed / 2)) + return await page.content() + + async def _is_job_seen(self, job_id: str) -> bool: + return self.redis_client.get(f"seen_job:{job_id}") is not None + + async def _mark_job_seen(self, job_id: str): + self.redis_client.setex(f"seen_job:{job_id}", 7 * 24 * 3600, "1") + + async def _get_cached_llm_result(self, job_url: str) -> Optional[Dict]: + cached = self.redis_client.get(f"llm_cache:{job_url}") + if cached: + return json.loads(cached) + return None + + async def _cache_llm_result(self, job_url: str, result: Dict): + self.redis_client.setex(f"llm_cache:{job_url}", 7 * 24 * 3600, json.dumps(result)) + + async def _add_job_to_redis_cache(self, job_url: str, job_id: str, error_type: str): + try: + job_data = { + "job_url": job_url, + "job_id": job_id, + "error_type": error_type, + "timestamp": datetime.now().isoformat() + } + self.redis_client.hset("failed_jobs", job_id, json.dumps(job_data)) + logger.info(f"📦 Added failed job to Redis cache: {job_id} (Error: {error_type})") + except Exception as e: + logger.error(f"❌ Failed to add to Redis: {str(e)}") + + @retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=2, max=10)) + async def scrape_job( + self, + job_url: str, + company_name: str, + message_id: str + ): + job_id = job_url.strip("/").split("/")[-1] + + if await self._is_job_seen(job_id): + logger.info(f"⏭️ Skipping already processed job: {job_id}") + return True + + cached_result = await self._get_cached_llm_result(job_url) + if cached_result: + logger.info(f"📦 Using cached LLM result for: {job_url}") + await self.llm_agent.save_job_data(cached_result, company_name) + await self._mark_job_seen(job_id) + return True + + page = None + start_time = time.time() + try: + await self.init_browser() + page = await self.context.new_page() + + # Fetch with timeout from engine config + timeout_ms = self.engine.optimization_params.get("request_timeout", 120000) + temp_fetcher = StealthyFetcher(self.engine, self.browser, self.context) + job_page = await asyncio.wait_for( + temp_fetcher.fetch_url(job_url, wait_for_selector="h1"), + timeout=timeout_ms / 1000.0 + ) + + if not job_page: + await self._add_job_to_redis_cache(job_url, job_id, "fetch_failure") + self.engine.report_outcome("fetch_failure", url=job_url) + return False + + # Handle Cloudflare if detected + if await self.engine._detect_cloudflare(job_page): + success = await self.engine._handle_cloudflare(job_page) + if not success: + await self._add_job_to_redis_cache(job_url, job_id, "cloudflare") + self.engine.report_outcome("cloudflare", url=job_url) + return False + + apply_btn = await job_page.query_selector("button:has-text('Apply for this job'), button:has-text('Apply now')") + apply_type = 'signup' + if apply_btn: + await self._human_click(job_page, apply_btn) + speed = self.engine.optimization_params.get("base_delay", 2.0) + await asyncio.sleep(2 * (speed / 2)) + form = await job_page.query_selector("form, div[class*='application-form']") + if form: + apply_type = 'AI' + + final_url = job_url + 
page_content = await self._extract_page_content_for_llm(job_page) + posted_date = datetime.now().strftime("%m/%d/%y") + + raw_data = { + "page_content": page_content, + "url": final_url, + "job_id": job_id, + "search_keywords": company_name, + "posted_date": posted_date + } + + # LLM call with timeout + llm_timeout = max(30, self.engine.feedback.get("avg_response_time", 10) * 2) + refined_data = await asyncio.wait_for( + self.llm_agent.refine_job_data(raw_data, self.user_request), + timeout=llm_timeout + ) + + success = False + if refined_data and refined_data.get("title", "N/A") != "N/A": + compulsory_fields = ['company_name', 'job_id', 'url'] + for field in compulsory_fields: + if not refined_data.get(field) or refined_data[field] in ["N/A", "", "Unknown"]: + if field == 'job_id': + refined_data[field] = job_id + elif field == 'url': + refined_data[field] = final_url + elif field == 'company_name': + refined_data[field] = company_name + + refined_data['apply_type'] = apply_type + refined_data['scraped_at'] = datetime.now().isoformat() + refined_data['category'] = company_name + refined_data['posted_date'] = posted_date + refined_data['message_id'] = message_id + + await self.llm_agent.save_job_data(refined_data, company_name) + await self._cache_llm_result(job_url, refined_data) + await self._mark_job_seen(job_id) + + response_time = time.time() - start_time + self.engine.report_outcome("success", url=final_url, response_time=response_time) + logger.info(f"✅ Scraped: {refined_data['title'][:50]}...") + success = True + else: + logger.warning(f"🟡 LLM failed to refine: {final_url}") + await self._add_job_to_redis_cache(final_url, job_id, "llm_failure") + self.engine.report_outcome("llm_failure", url=final_url) + + return success + + except asyncio.TimeoutError: + logger.error(f"⏰ Timeout processing job: {job_url}") + await self._add_job_to_redis_cache(job_url, job_id, "timeout") + self.engine.report_outcome("timeout", url=job_url) + return False + except Exception as e: + logger.error(f"💥 Error processing job {job_url}: {str(e)}") + await self._add_job_to_redis_cache(job_url, job_id, "exception") + self.engine.report_outcome("exception", url=job_url) + return False + finally: + if page: + await page.close() + +# Global metrics +METRICS = { + "processed": 0, + "success": 0, + "failed": 0, + "skipped": 0, + "start_time": time.time() +} + +async def process_message_async(scraper: AshbyJobScraper, ch, method, properties, body): + try: + job_data = json.loads(body) + job_link = job_data['job_link'] + company_name = job_data['company_name'] + message_id = properties.message_id or f"msg_{int(time.time()*1000)}" + + logger.info(f"📥 Processing job: {job_link} (ID: {message_id})") + + success = await scraper.scrape_job(job_link, company_name, message_id) + + METRICS["processed"] += 1 + if success: + METRICS["success"] += 1 + else: + METRICS["failed"] += 1 + + except json.JSONDecodeError: + logger.error("❌ Invalid JSON in message") + METRICS["failed"] += 1 + except Exception as e: + logger.error(f"💥 Unexpected error: {str(e)}") + METRICS["failed"] += 1 + finally: + ch.basic_ack(delivery_tag=method.delivery_tag) + +def callback_wrapper(scraper: AshbyJobScraper): + def callback(ch, method, properties, body): + asyncio.run(process_message_async(scraper, ch, method, properties, body)) + return callback + +def start_consumer(): + # Initialize REAL engine + engine = FingerprintScrapingEngine( + seed="ashby_scraper", + target_os="windows", + num_variations=10 + ) + scraper = AshbyJobScraper(engine) + + # 
RabbitMQ connection with retries + connection = None + for attempt in range(5): + try: + credentials = pika.PlainCredentials(RABBITMQ_USER, RABBITMQ_PASS) + parameters = pika.ConnectionParameters( + host=RABBITMQ_HOST, + port=RABBITMQ_PORT, + virtual_host='/', + credentials=credentials, + heartbeat=600, + blocked_connection_timeout=300 + ) + connection = pika.BlockingConnection(parameters) + break + except Exception as e: + logger.error(f"RabbitMQ connection attempt {attempt + 1} failed: {e}") + time.sleep(2 ** attempt) + + if not connection: + logger.error("Failed to connect to RabbitMQ after retries") + return + + channel = connection.channel() + channel.queue_declare(queue='job_queue', durable=True) + channel.basic_qos(prefetch_count=1) + channel.basic_consume(queue='job_queue', on_message_callback=callback_wrapper(scraper)) + + logger.info('Waiting for messages. To exit press CTRL+C') + try: + channel.start_consuming() + except KeyboardInterrupt: + logger.info("Shutting down...") + channel.stop_consuming() + connection.close() + asyncio.run(scraper.close_browser()) + +if __name__ == "__main__": + start_consumer() \ No newline at end of file diff --git a/sender.py b/sender.py new file mode 100644 index 0000000..c836ba1 --- /dev/null +++ b/sender.py @@ -0,0 +1,181 @@ +import csv +import json +import logging +import os +import sys +import time +import signal +import uuid +from configparser import ConfigParser +import pika +import redis +import os + +class Sender: + def __init__(self, config_file='config.ini'): + self.config = ConfigParser() + self.config.read(config_file) + + # RabbitMQ from env vars with fallbacks + self.rabbitmq_host = os.getenv("RABBITMQ_HOST", self.config.get('rabbitmq', 'url', fallback='rabbitq.thejobhub.xyz')) + self.rabbitmq_port = int(os.getenv("RABBITMQ_PORT", self.config.get('rabbitmq', 'port', fallback='5672'))) + self.username = os.getenv("RABBITMQ_USER", self.config.get('rabbitmq', 'username', fallback='guest')) + self.password = os.getenv("RABBITMQ_PASS", self.config.get('rabbitmq', 'password', fallback='guest')) + self.queue_name = self.config.get('rabbitmq', 'queue_name', fallback='job_queue') + self.directory = self.config.get('files', 'directory', fallback='/var/jobs/csv') + self.log_file = self.config.get('logging', 'log_file', fallback='/var/logs/sender.log') + self.virtual_host = self.config.get('rabbitmq', 'virtual_hash', fallback='/') + self.batch_size = 500 + self.retry_attempts = 5 # Increased for robustness + self.retry_sleep = 2 + self.check_interval = 30 # More frequent polling + + # Redis for deduplication + redis_host = os.getenv("REDIS_HOST", "redis-scrape.thejobhub.xyz") + redis_port = int(os.getenv("REDIS_PORT", "6379")) + self.redis_client = redis.Redis(host=redis_host, port=redis_port, db=1, decode_responses=True) + + logging.basicConfig(filename=self.log_file, level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') + self.logger = logging.getLogger(__name__) + + self.connection = None + self.channel = None + self.running = True + + signal.signal(signal.SIGTERM, self.graceful_shutdown) + signal.signal(signal.SIGINT, self.graceful_shutdown) + + def connect(self): + try: + credentials = pika.PlainCredentials(self.username, self.password) + parameters = pika.ConnectionParameters( + host=self.rabbitmq_host, + port=self.rabbitmq_port, + virtual_host=self.virtual_host, + credentials=credentials, + heartbeat=600, + blocked_connection_timeout=300 + ) + self.connection = pika.BlockingConnection(parameters) + self.channel = 
self.connection.channel() + self.channel.queue_declare(queue=self.queue_name, durable=True) + self.logger.info("Connected to RabbitMQ") + return True + except Exception as e: + self.logger.error(f"Failed to connect to RabbitMQ: {str(e)}") + return False + + def reconnect(self): + if self.connection and self.connection.is_open: + self.connection.close() + time.sleep(self.retry_sleep) + return self.connect() + + def send_message(self, message, message_id): + for attempt in range(self.retry_attempts): + try: + self.channel.basic_publish( + exchange='', + routing_key=self.queue_name, + body=message, + properties=pika.BasicProperties( + delivery_mode=2, + message_id=message_id + ) + ) + return True + except Exception as e: + self.logger.error(f"Failed to send message (attempt {attempt+1}): {str(e)}") + if attempt < self.retry_attempts - 1: + time.sleep(self.retry_sleep * (2 ** attempt)) # Exponential backoff + if not self.reconnect(): + return False + return False + + def is_job_seen(self, job_url): + """Check if job was sent recently (7 days)""" + return self.redis_client.get(f"sent_job:{job_url}") is not None + + def mark_job_sent(self, job_url): + """Mark job as sent with 7-day TTL""" + self.redis_client.setex(f"sent_job:{job_url}", 7 * 24 * 3600, "1") + + def process_csv(self, file_path): + try: + with open(file_path, 'r') as csvfile: + reader = csv.DictReader(csvfile) + batch = [] + sent_count = 0 + skipped_count = 0 + + for row in reader: + # Validate required fields + if 'job_link' not in row or 'company_name' not in row: + self.logger.warning(f"Skipping invalid row in {file_path}: {row}") + continue + + job_link = row['job_link'].strip() + company_name = row['company_name'].strip() + + if not job_link or not company_name: + self.logger.warning(f"Skipping empty row in {file_path}") + continue + + # Deduplication + if self.is_job_seen(job_link): + skipped_count += 1 + continue + + job_data = { + 'job_link': job_link, + 'company_name': company_name + } + + message_id = str(uuid.uuid4()) + message = json.dumps(job_data) + + if self.send_message(message, message_id): + sent_count += 1 + self.mark_job_sent(job_link) + else: + self.logger.error(f"Failed to send job: {job_link}") + + if (sent_count + skipped_count) % 100 == 0: + self.logger.info(f"Progress: {sent_count} sent, {skipped_count} skipped from {file_path}") + + self.logger.info(f"Completed {file_path}: {sent_count} sent, {skipped_count} skipped") + os.rename(file_path, file_path + '.processed') + self.logger.info(f"Processed and renamed {file_path} to {file_path}.processed") + return sent_count + except Exception as e: + self.logger.error(f"Error processing {file_path}: {str(e)}") + return 0 + + def find_new_csvs(self): + files = [f for f in os.listdir(self.directory) if f.endswith('.csv') and not f.endswith('.processed')] + files.sort() + return [os.path.join(self.directory, f) for f in files] + + def run(self): + if not self.connect(): + sys.exit(1) + while self.running: + new_files = self.find_new_csvs() + if new_files: + for file_path in new_files: + self.logger.info(f"Processing {file_path}") + sent = self.process_csv(file_path) + self.logger.info(f"Sent {sent} jobs from {file_path}") + else: + self.logger.info("No new CSV files found") + time.sleep(self.check_interval) + if self.connection and self.connection.is_open: + self.connection.close() + + def graceful_shutdown(self, signum, frame): + self.logger.info("Received shutdown signal") + self.running = False + +if __name__ == '__main__': + sender = Sender() + sender.run() \ No 
newline at end of file
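
A note on configuration for sender.py: the RabbitMQ host, port, and credentials are taken from the RABBITMQ_* environment variables first, with config.ini as a fallback, while the queue name, CSV directory, log file, and virtual host come from config.ini only. The virtual host is read under the key "virtual_hash" (possibly a typo for "virtual_host"), so an entry named virtual_host would be ignored and the "/" fallback used. A minimal config.ini covering the keys the class actually reads, using the in-code fallback values as placeholders, would look roughly like this:

[rabbitmq]
url = rabbitq.thejobhub.xyz
port = 5672
username = guest
password = guest
queue_name = job_queue
virtual_hash = /

[files]
directory = /var/jobs/csv

[logging]
log_file = /var/logs/sender.log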
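
For a quick end-to-end check without preparing CSV files, a message can be pushed onto job_queue by hand. The consumer in scraper.py expects a persistent JSON message with job_link and company_name fields and a message_id property, which is what sender.py publishes; the sketch below mirrors that contract (the job URL and company name are placeholders, and the connection settings reuse the same environment variables and defaults as the patch):

import json
import os
import uuid

import pika

# Connection settings mirror the defaults in scraper.py / sender.py.
credentials = pika.PlainCredentials(os.getenv("RABBITMQ_USER", "guest"),
                                    os.getenv("RABBITMQ_PASS", "guest"))
parameters = pika.ConnectionParameters(
    host=os.getenv("RABBITMQ_HOST", "rabbitq.thejobhub.xyz"),
    port=int(os.getenv("RABBITMQ_PORT", "5672")),
    virtual_host="/",
    credentials=credentials,
)

connection = pika.BlockingConnection(parameters)
channel = connection.channel()
channel.queue_declare(queue="job_queue", durable=True)  # same declaration as consumer and sender

# Placeholder job; the consumer derives job_id from the last URL segment.
message = json.dumps({
    "job_link": "https://jobs.ashbyhq.com/example-company/00000000-0000-0000-0000-000000000000",
    "company_name": "Example Company",
})
channel.basic_publish(
    exchange="",
    routing_key="job_queue",
    body=message,
    properties=pika.BasicProperties(delivery_mode=2, message_id=str(uuid.uuid4())),
)
connection.close()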
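
One behaviour worth verifying before deploying the consumer: callback_wrapper runs each message through asyncio.run(), which creates and tears down a fresh event loop per delivery, while AshbyJobScraper deliberately keeps its Playwright browser and context alive between messages. Playwright's async objects are bound to the loop they were created on, so reusing the cached browser from a later loop is likely to fail with "attached to a different loop" errors after the first message. If that shows up in testing, one option is to drive every message through a single long-lived loop; a minimal sketch, keeping the function names from the patch:

import asyncio

# Drop-in variant of callback_wrapper from scraper.py; only the loop handling changes.
def callback_wrapper(scraper: AshbyJobScraper):
    # One loop for the lifetime of the consumer, so the browser/context created
    # while handling the first message can be reused by all later messages.
    loop = asyncio.new_event_loop()

    def callback(ch, method, properties, body):
        loop.run_until_complete(
            process_message_async(scraper, ch, method, properties, body)
        )

    return callback

For a clean shutdown, the same loop would also need to be reachable from start_consumer (for example by creating it there and passing it into callback_wrapper), so that scraper.close_browser() runs on the loop that owns the browser rather than through a separate asyncio.run() call.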
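
Operationally, the Redis state is split across two databases: db 0 holds the scraper's seen_job:<job_id> and llm_cache:<job_url> keys (both with a 7-day TTL) plus the failed_jobs hash of jobs that errored out, and db 1 holds the sender's sent_job:<job_url> deduplication keys. A throwaway script along these lines can help when checking why a job was skipped or retried (host and port fall back to the same defaults as the patch):

import json
import os

import redis

host = os.getenv("REDIS_HOST", "redis-scrape.thejobhub.xyz")
port = int(os.getenv("REDIS_PORT", "6379"))

scraper_db = redis.Redis(host=host, port=port, db=0, decode_responses=True)
sender_db = redis.Redis(host=host, port=port, db=1, decode_responses=True)

# Jobs the scraper gave up on, keyed by job_id in the "failed_jobs" hash.
for job_id, payload in scraper_db.hgetall("failed_jobs").items():
    entry = json.loads(payload)
    print(f"{job_id}: {entry['error_type']} at {entry['timestamp']} -> {entry['job_url']}")

# Rough counts of the deduplication keys (scan_iter avoids blocking Redis with KEYS).
seen = sum(1 for _ in scraper_db.scan_iter("seen_job:*"))
sent = sum(1 for _ in sender_db.scan_iter("sent_job:*"))
print(f"{seen} jobs marked seen by the scraper, {sent} jobs marked sent by the sender")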