import asyncio import random import os import json import time from typing import Optional, Dict from playwright.async_api import async_playwright from browserforge.injectors.playwright import AsyncNewContext from llm_agent import LLMJobRefiner from fetcher import StealthyFetcher from datetime import datetime import pika import logging from tenacity import retry, stop_after_attempt, wait_exponential from scraping_engine import FingerprintScrapingEngine from dotenv import load_dotenv from ssl_connection import create_ssl_connection_parameters # Import from ssl.py import redis load_dotenv() # Configure logging logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') logger = logging.getLogger(__name__) # Environment variables RABBITMQ_HOST = os.getenv("RABBITMQ_HOST") RABBITMQ_PORT = int(os.getenv("RABBITMQ_PORT", "5671")) RABBITMQ_SSL_ENABLED = os.getenv("RABBITMQ_SSL_ENABLED", "false").lower() == "true" # Redis configuration REDIS_HOST = os.getenv('REDIS_HOST') REDIS_PORT = int(os.getenv('REDIS_PORT', '6380')) REDIS_PASSWORD = os.getenv('REDIS_PASSWORD') REDIS_SSL_ENABLED = os.getenv('REDIS_SSL_ENABLED', 'true').lower() == 'true' class RedisManager: """Manages Redis connection and operations for job tracking and caching.""" def __init__(self): self.redis_client = None self._connect() def _connect(self): """Establish connection to Redis server.""" if not REDIS_PASSWORD: logger.warning("Warning: REDIS_PASSWORD not found in environment.") try: self.redis_client = redis.Redis( host=REDIS_HOST, port=REDIS_PORT, password=REDIS_PASSWORD, ssl=REDIS_SSL_ENABLED, ssl_cert_reqs=None, socket_connect_timeout=10, socket_timeout=30, retry_on_timeout=True ) response = self.redis_client.ping() logger.info(f"Connected to Redis at {REDIS_HOST}:{REDIS_PORT}! Response: {response}") except Exception as e: logger.error(f"Failed to connect to Redis: {e}") self.redis_client = None def is_job_seen(self, job_id: str) -> bool: if not self.redis_client: return False try: return bool(self.redis_client.exists(f"job_seen:{job_id}")) except Exception as e: logger.error(f"Redis error checking job_seen: {e}") return False def mark_job_seen(self, job_id: str): if not self.redis_client: return try: self.redis_client.setex(f"job_seen:{job_id}", 2592000, "1") except Exception as e: logger.error(f"Redis error marking job_seen: {e}") def get_cached_llm_result(self, job_url: str) -> Optional[Dict]: if not self.redis_client: return None try: cached_data = self.redis_client.get(f"llm_cache:{job_url}") if cached_data: return json.loads(cached_data) return None except Exception as e: logger.error(f"Redis error getting LLM cache: {e}") return None def cache_llm_result(self, job_url: str, result: Dict): if not self.redis_client: return try: self.redis_client.setex(f"llm_cache:{job_url}", 604800, json.dumps(result)) except Exception as e: logger.error(f"Redis error caching LLM result: {e}") def add_job_to_error_cache(self, job_url: str, job_id: str, error_type: str): if not self.redis_client: return try: error_data = { "job_url": job_url, "job_id": job_id, "error_type": error_type, "timestamp": datetime.now().isoformat() } self.redis_client.setex(f"error_cache:{job_id}", 3600, json.dumps(error_data)) except Exception as e: logger.error(f"Redis error adding to error cache: {e}") class MultiPlatformJobScraper: def __init__( self, engine: FingerprintScrapingEngine, user_request: str = "Extract all standard job details" ): self.engine = engine self.user_request = user_request self.llm_agent = LLMJobRefiner() self.browser = None self.pw = None self.redis_manager = RedisManager() async def init_browser(self): if self.browser is not None: try: await self.browser.new_page() await self.close_browser() except: await self.close_browser() if self.browser is None: try: profile = self.engine._select_profile() renderer = random.choice(self.engine.common_renderers[self.engine.os]) vendor = random.choice(self.engine.common_vendors) spoof_script = self.engine._get_spoof_script(renderer, vendor) self.pw = await async_playwright().start() self.browser = await self.pw.chromium.launch( headless=True, args=[ '--disable-blink-features=AutomationControlled', '--no-sandbox', '--disable-dev-shm-usage', '--disable-gpu' ] ) logger.info("✅ Browser launched (will reuse for all jobs)") except Exception as e: logger.error(f"💥 Failed to launch browser: {e}") raise async def create_fresh_context(self): if self.browser is None: await self.init_browser() try: await self.browser.new_page() except Exception: logger.warning("Browser appears dead. Reinitializing...") await self.close_browser() await self.init_browser() profile = self.engine._select_profile() context = await AsyncNewContext(self.browser, fingerprint=profile) await context.add_init_script(f""" Object.defineProperty(navigator, 'hardwareConcurrency', {{ get: () => {profile.navigator.hardwareConcurrency} }}); Object.defineProperty(navigator, 'deviceMemory', {{ get: () => {profile.navigator.deviceMemory} }}); Object.defineProperty(navigator, 'platform', {{ get: () => '{profile.navigator.platform}' }}); """) spoof_script = self.engine._get_spoof_script( random.choice(self.engine.common_renderers[self.engine.os]), random.choice(self.engine.common_vendors) ) await context.add_init_script(spoof_script) return context async def close_browser(self): if self.browser: try: await self.browser.close() except: pass self.browser = None if self.pw: try: await self.pw.stop() except: pass self.pw = None async def _safe_inner_text(self, element): if not element: return "Unknown" try: return await element.text_content() except: return "Unknown" async def _human_click(self, page, element, wait_after: bool = True): if not element: return False await element.scroll_into_view_if_needed() speed = self.engine.optimization_params.get("base_delay", 2.0) / 2 await asyncio.sleep(random.uniform(0.3, 0.8) * (speed / 2)) try: await element.click() if wait_after: await asyncio.sleep(random.uniform(2, 4) * (speed / 2)) return True except: return False async def _extract_page_content_for_llm(self, page) -> str: speed = self.engine.optimization_params.get("base_delay", 2.0) await asyncio.sleep(2 * (speed / 2)) if "lever.co" not in page.url: await self.engine._human_like_scroll(page) await asyncio.sleep(2 * (speed / 2)) return await page.content() async def _is_job_seen(self, job_id: str) -> bool: return self.redis_manager.is_job_seen(job_id) async def _mark_job_seen(self, job_id: str): self.redis_manager.mark_job_seen(job_id) async def _get_cached_llm_result(self, job_url: str) -> Optional[Dict]: return self.redis_manager.get_cached_llm_result(job_url) async def _cache_llm_result(self, job_url: str, result: Dict): self.redis_manager.cache_llm_result(job_url, result) async def _add_job_to_redis_cache(self, job_url: str, job_id: str, error_type: str): logger.info(f" 📦 Adding failed job to Redis cache: {job_id} (Error: {error_type})") self.redis_manager.add_job_to_error_cache(job_url, job_id, error_type) def _get_platform(self, url: str) -> str: if "ashbyhq.com" in url: return "ashby" elif "lever.co" in url: return "lever" elif "greenhouse.io" in url: return "greenhouse" else: return "unknown" @retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=4, max=10)) async def scrape_job( self, job_url: str, company_name: str, message_id: str ): platform = self._get_platform(job_url) if platform == "unknown": logger.info(f"⏭️ Skipping unsupported platform: {job_url}") return True job_id = job_url.strip("/").split("/")[-1] if await self._is_job_seen(job_id): logger.info(f"⏭️ Skipping already processed job: {job_id}") return True cached_result = await self._get_cached_llm_result(job_url) if cached_result: logger.info(f"📦 Using cached LLM result for: {job_url}") await self.llm_agent.save_job_data(cached_result, company_name) await self._mark_job_seen(job_id) return True context = None page = None start_time = time.time() try: context = await self.create_fresh_context() page = await context.new_page() timeout_ms = self.engine.optimization_params.get("request_timeout", 120000) temp_fetcher = StealthyFetcher(self.engine, self.browser, context) fetch_timeout = 60000 if platform == "lever" else timeout_ms job_page = await asyncio.wait_for( temp_fetcher.fetch_url(job_url, wait_for_selector="h1", timeout=fetch_timeout), timeout=fetch_timeout / 1000.0 ) # Check if job still exists (minimal content validation) page_content = await job_page.content() if len(page_content.strip()) < 500: # Arbitrary threshold for "page exists" logger.error(f"❌ Job no longer exists (empty/deleted): {job_url}") await self._add_job_to_redis_cache(job_url, job_id, "job_not_found") self.engine.report_outcome("job_not_found", url=job_url) return False if platform == "ashby": try: await job_page.wait_for_selector("div[class*='job-posting'], article, main", timeout=60000) except Exception: logger.warning(f"⚠️ Ashby page didn't load properly: {job_url}") return False elif platform == "lever": pass elif platform == "greenhouse": try: await job_page.wait_for_selector("div.job-desc, section", timeout=60000) except Exception: pass # Extract page content for initial validation page_content = await self._extract_page_content_for_llm(job_page) # Check for job expiration or unavailability indicators page_text_lower = page_content.lower() job_unavailable_indicators = [ "job no longer available", "position has been filled", "this job has expired", "job posting has expired", "no longer accepting applications", "position is closed", "job is no longer active", "this position is no longer open" ] if any(indicator in page_text_lower for indicator in job_unavailable_indicators): logger.error(f"❌ Job no longer available/expired: {job_url}") await self._add_job_to_redis_cache(job_url, job_id, "job_not_found") self.engine.report_outcome("job_not_found", url=job_url) return False # 🔑 APPLY TYPE LOGIC if platform in ["ashby", "lever", "greenhouse"]: apply_type = 'AI' # Always AI for these platforms else: # For other platforms: check if form is accessible without login apply_btn = await job_page.query_selector("button:has-text('Apply for this job'), button:has-text('Apply now')") apply_type = 'signup' # default if apply_btn: await self._human_click(job_page, apply_btn) speed = self.engine.optimization_params.get("base_delay", 2.0) await asyncio.sleep(2 * (speed / 2)) form = await job_page.query_selector("form, div[class*='application-form']") if form: # Check for login prompts in form login_indicators = await job_page.query_selector("input[type='email'], input[type='password'], text='sign in', text='log in'") if not login_indicators: apply_type = 'AI' else: apply_type = 'signup' else: apply_type = 'signup' final_url = job_url # Hardcode posted_date to Dec 1st 2025 posted_date = "12/01/25" raw_data = { "page_content": page_content, "url": final_url, "job_id": job_id, "search_keywords": company_name, "posted_date": posted_date } llm_timeout = max(60, self.engine.feedback.get("avg_response_time", 10) * 2) refined_data = await asyncio.wait_for( self.llm_agent.refine_job_data(raw_data, self.user_request), timeout=llm_timeout ) success = False if refined_data and refined_data.get("title", "N/A") != "N/A": # Define all compulsory fields that must be present and valid compulsory_fields = ['company_name', 'job_id', 'url', 'title', 'description'] # Validate all compulsory fields missing_fields = [] for field in compulsory_fields: field_value = refined_data.get(field, "").strip() if not field_value or field_value in ["N/A", "Unknown", ""]: missing_fields.append(field) # If any compulsory field or description is missing, discard the job if missing_fields: logger.error(f"❌ Job discarded - missing compulsory fields {missing_fields}: {final_url}") await self._add_job_to_redis_cache(final_url, job_id, "job_not_found") self.engine.report_outcome("job_not_found", url=final_url) return False # If we get here, all compulsory fields are present and valid # Update with additional metadata refined_data.update({ 'apply_type': apply_type, 'scraped_at': datetime.now().isoformat(), 'category': company_name, 'posted_date': posted_date, 'message_id': message_id, 'platform': platform }) await self.llm_agent.save_job_data(refined_data, company_name) await self._cache_llm_result(job_url, refined_data) await self._mark_job_seen(job_id) response_time = time.time() - start_time self.engine.report_outcome("success", url=final_url, response_time=response_time) logger.info(f"✅ Scraped ({platform}): {refined_data['title'][:50]}... (Apply Type: {apply_type})") success = True else: logger.warning(f"🟡 LLM failed to refine: {final_url}") await self._add_job_to_redis_cache(final_url, job_id, "llm_failure") self.engine.report_outcome("llm_failure", url=final_url) return success except asyncio.TimeoutError: logger.error(f"⏰ Timeout processing job ({platform}): {job_url}") await self._add_job_to_redis_cache(job_url, job_id, "timeout") self.engine.report_outcome("timeout", url=job_url) return False except Exception as e: error_msg = str(e) if "NoneType" in error_msg or "disconnected" in error_msg or "Browser" in error_msg: logger.warning("Browser connection lost. Forcing reinitialization.") await self.close_browser() # 🔍 Distinguish job-not-found vs other errors if "page.goto: net::ERR_ABORTED" in error_msg or "page.goto: net::ERR_FAILED" in error_msg: logger.error(f"❌ Job no longer exists (404/network error): {job_url}") await self._add_job_to_redis_cache(job_url, job_id, "job_not_found") self.engine.report_outcome("job_not_found", url=job_url) else: # Categorize other errors error_type = "exception" if "timeout" in error_msg.lower(): error_type = "timeout" elif "llm" in error_msg.lower() or "refine" in error_msg.lower(): error_type = "llm_failure" else: error_type = "scraping_error" logger.error(f"💥 Error processing job ({platform}) {job_url}: {error_msg}") await self._add_job_to_redis_cache(job_url, job_id, error_type) self.engine.report_outcome(error_type, url=job_url) return False finally: if context: try: await context.close() except Exception: pass # Global metrics METRICS = { "processed": 0, "success": 0, "failed": 0, "skipped": 0, "start_time": time.time() } async def process_message_async(scraper: MultiPlatformJobScraper, ch, method, properties, body): try: job_data = json.loads(body) job_link = job_data['job_link'] company_name = job_data['company_name'] message_id = properties.message_id or f"msg_{int(time.time()*1000)}" logger.info(f"📥 Processing job: {job_link} (ID: {message_id})") success = await scraper.scrape_job(job_link, company_name, message_id) METRICS["processed"] += 1 if success: METRICS["success"] += 1 else: METRICS["failed"] += 1 except json.JSONDecodeError: logger.error("❌ Invalid JSON in message") METRICS["failed"] += 1 except Exception as e: logger.error(f"💥 Unexpected error: {str(e)}") METRICS["failed"] += 1 finally: ch.basic_ack(delivery_tag=method.delivery_tag) def callback_wrapper(scraper: MultiPlatformJobScraper): def callback(ch, method, properties, body): asyncio.run(process_message_async(scraper, ch, method, properties, body)) return callback def start_consumer(): engine = FingerprintScrapingEngine( seed="multiplatform_scraper", target_os="windows", num_variations=10 ) scraper = MultiPlatformJobScraper(engine) connection = None for attempt in range(5): try: parameters = create_ssl_connection_parameters() if RABBITMQ_SSL_ENABLED: logger.info(f"Connecting to RabbitMQ over SSL at {RABBITMQ_HOST}:{RABBITMQ_PORT}") else: logger.info(f"Connecting to RabbitMQ at {RABBITMQ_HOST}:{RABBITMQ_PORT}") connection = pika.BlockingConnection(parameters) break except Exception as e: logger.error(f"RabbitMQ connection attempt {attempt + 1} failed: {e}") time.sleep(2 ** attempt) if not connection: logger.error("Failed to connect to RabbitMQ after retries") return channel = connection.channel() channel.queue_declare(queue='job_queue', durable=True) channel.basic_qos(prefetch_count=1) channel.basic_consume(queue='job_queue', on_message_callback=callback_wrapper(scraper)) logger.info('Waiting for messages (Ashby, Lever, Greenhouse). To exit press CTRL+C') try: channel.start_consuming() except KeyboardInterrupt: logger.info("Shutting down...") channel.stop_consuming() connection.close() asyncio.run(scraper.close_browser()) if __name__ == "__main__": start_consumer()