import asyncio
import random
import os
import json
import time
from typing import Optional, Dict
from playwright.async_api import async_playwright
from browserforge.injectors.playwright import AsyncNewContext
from llm_agent import LLMJobRefiner
from fetcher import StealthyFetcher
from datetime import datetime
import pika
import logging
from tenacity import retry, stop_after_attempt, wait_exponential
from scraping_engine import FingerprintScrapingEngine
from dotenv import load_dotenv
from ssl_connection import create_ssl_connection_parameters  # Import from ssl_connection.py
import redis

load_dotenv()

# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

# Environment variables
RABBITMQ_HOST = os.getenv("RABBITMQ_HOST")
RABBITMQ_PORT = int(os.getenv("RABBITMQ_PORT", "5671"))
RABBITMQ_SSL_ENABLED = os.getenv("RABBITMQ_SSL_ENABLED", "false").lower() == "true"

# Redis configuration
REDIS_HOST = os.getenv('REDIS_HOST')
REDIS_PORT = int(os.getenv('REDIS_PORT', '6380'))
REDIS_PASSWORD = os.getenv('REDIS_PASSWORD')
REDIS_SSL_ENABLED = os.getenv('REDIS_SSL_ENABLED', 'true').lower() == 'true'

# TTL constants (in seconds)
JOB_SEEN_TTL = 2592000  # 30 days (1 month)


class RedisManager:
    """Manages Redis connection and operations for job tracking and caching."""

    def __init__(self):
        self.redis_client = None
        self._connect()

    def _connect(self):
        """Establish connection to Redis server."""
        if not REDIS_PASSWORD:
            logger.warning("Warning: REDIS_PASSWORD not found in environment.")
        try:
            self.redis_client = redis.Redis(
                host=REDIS_HOST,
                port=REDIS_PORT,
                password=REDIS_PASSWORD,
                ssl=REDIS_SSL_ENABLED,
                ssl_cert_reqs=None,
                socket_connect_timeout=10,
                socket_timeout=30,
                retry_on_timeout=True
            )
            response = self.redis_client.ping()
            logger.info(f"Connected to Redis at {REDIS_HOST}:{REDIS_PORT}! Response: {response}")
        except Exception as e:
            logger.error(f"Failed to connect to Redis: {e}")
            self.redis_client = None

    def is_job_seen(self, job_id: str) -> bool:
        if not self.redis_client:
            return False
        try:
            return bool(self.redis_client.exists(f"job_seen:{job_id}"))
        except Exception as e:
            logger.error(f"Redis error checking job_seen: {e}")
            return False

    def get_cached_llm_result(self, job_url: str) -> Optional[Dict]:
        if not self.redis_client:
            return None
        try:
            cached_data = self.redis_client.get(f"llm_cache:{job_url}")
            if cached_data:
                return json.loads(cached_data)
            return None
        except Exception as e:
            logger.error(f"Redis error getting LLM cache: {e}")
            return None

    def mark_job_seen(self, job_id: str):
        if not self.redis_client:
            return
        try:
            # Set with TTL of 1 month
            self.redis_client.setex(f"job_seen:{job_id}", JOB_SEEN_TTL, "1")
        except Exception as e:
            logger.error(f"Redis error marking job_seen: {e}")

    def cache_llm_result(self, job_url: str, result: Dict):
        if not self.redis_client:
            return
        try:
            self.redis_client.set(f"llm_cache:{job_url}", json.dumps(result))  # No TTL for LLM cache
        except Exception as e:
            logger.error(f"Redis error caching LLM result: {e}")

    def add_job_to_error_cache(self, job_url: str, job_id: str, error_type: str):
        if not self.redis_client:
            return
        try:
            error_data = {
                "job_url": job_url,
                "job_id": job_id,
                "error_type": error_type,
                "timestamp": datetime.now().isoformat()
            }
            self.redis_client.set(f"error_cache:{job_id}", json.dumps(error_data))  # No TTL for error cache
        except Exception as e:
            logger.error(f"Redis error adding to error cache: {e}")

    def remove_job_from_error_cache(self, job_id: str):
        """Remove job from error cache when successfully processed later."""
        if not self.redis_client:
            return
        try:
            deleted = self.redis_client.delete(f"error_cache:{job_id}")
            if deleted:
                logger.info(f"✅ Removed job {job_id} from error cache after successful processing")
        except Exception as e:
            logger.error(f"Redis error removing from error cache: {e}")

    def add_job_to_sent_cache(self, job_id: str):
        """Mark job as sent for processing."""
        if not self.redis_client:
            return
        try:
            self.redis_client.set(f"sent_job:{job_id}", "1")  # No TTL
        except Exception as e:
            logger.error(f"Redis error adding to sent cache: {e}")

    def remove_job_from_sent_cache(self, job_id: str):
        """Remove job from sent cache upon successful or failed completion."""
        if not self.redis_client:
            return
        try:
            deleted = self.redis_client.delete(f"sent_job:{job_id}")
            if deleted:
                logger.debug(f"🧹 Removed job {job_id} from sent cache")
        except Exception as e:
            logger.error(f"Redis error removing from sent cache: {e}")

    def is_job_in_sent_cache(self, job_id: str) -> bool:
        """Check if job is already in sent cache."""
        if not self.redis_client:
            return False
        try:
            return bool(self.redis_client.exists(f"sent_job:{job_id}"))
        except Exception as e:
            logger.error(f"Redis error checking sent cache: {e}")
            return False


class MultiPlatformJobScraper:
    def __init__(
        self,
        engine: FingerprintScrapingEngine,
        user_request: str = "Extract all standard job details"
    ):
        self.engine = engine
        self.user_request = user_request
        self.llm_agent = LLMJobRefiner()
        self.browser = None
        self.pw = None
        self.redis_manager = RedisManager()

    async def init_browser(self):
        if self.browser is not None:
            try:
                await self.browser.new_page()
                await self.close_browser()
            except:
                await self.close_browser()
        if self.browser is None:
            try:
                profile = self.engine._select_profile()
                renderer = random.choice(self.engine.common_renderers[self.engine.os])
                vendor = random.choice(self.engine.common_vendors)
                spoof_script = self.engine._get_spoof_script(renderer, vendor)
                self.pw = await async_playwright().start()
                self.browser = await self.pw.chromium.launch(
                    headless=True,
                    args=[
                        '--disable-blink-features=AutomationControlled',
                        '--no-sandbox',
                        '--disable-dev-shm-usage',
                        '--disable-gpu'
                    ]
                )
                logger.info("✅ Browser launched (will reuse for all jobs)")
            except Exception as e:
                logger.error(f"💥 Failed to launch browser: {e}")
                raise

    async def create_fresh_context(self):
        if self.browser is None:
            await self.init_browser()
        try:
            await self.browser.new_page()
        except Exception:
            logger.warning("Browser appears dead. Reinitializing...")
            await self.close_browser()
            await self.init_browser()
        profile = self.engine._select_profile()
        context = await AsyncNewContext(self.browser, fingerprint=profile)
        await context.add_init_script(f"""
            Object.defineProperty(navigator, 'hardwareConcurrency', {{
                get: () => {profile.navigator.hardwareConcurrency}
            }});
            Object.defineProperty(navigator, 'deviceMemory', {{
                get: () => {profile.navigator.deviceMemory}
            }});
            Object.defineProperty(navigator, 'platform', {{
                get: () => '{profile.navigator.platform}'
            }});
        """)
        spoof_script = self.engine._get_spoof_script(
            random.choice(self.engine.common_renderers[self.engine.os]),
            random.choice(self.engine.common_vendors)
        )
        await context.add_init_script(spoof_script)
        return context

    async def close_browser(self):
        if self.browser:
            try:
                await self.browser.close()
            except:
                pass
            self.browser = None
        if self.pw:
            try:
                await self.pw.stop()
            except:
                pass
            self.pw = None

    async def _safe_inner_text(self, element):
        if not element:
            return "Unknown"
        try:
            return await element.text_content()
        except:
            return "Unknown"

    async def _human_click(self, page, element, wait_after: bool = True):
        if not element:
            return False
        await element.scroll_into_view_if_needed()
        speed = self.engine.optimization_params.get("base_delay", 2.0) / 2
        await asyncio.sleep(random.uniform(0.3, 0.8) * (speed / 2))
        try:
            await element.click()
            if wait_after:
                await asyncio.sleep(random.uniform(2, 4) * (speed / 2))
            return True
        except:
            return False

    async def _extract_page_content_for_llm(self, page) -> str:
        speed = self.engine.optimization_params.get("base_delay", 2.0)
        await asyncio.sleep(2 * (speed / 2))
        if "lever.co" not in page.url:
            await self.engine._human_like_scroll(page)
            await asyncio.sleep(2 * (speed / 2))
        return await page.content()

    async def _is_job_seen(self, job_id: str) -> bool:
        return self.redis_manager.is_job_seen(job_id)

    async def _mark_job_seen(self, job_id: str):
        self.redis_manager.mark_job_seen(job_id)

    async def _get_cached_llm_result(self, job_url: str) -> Optional[Dict]:
        return self.redis_manager.get_cached_llm_result(job_url)

    async def _cache_llm_result(self, job_url: str, result: Dict):
        self.redis_manager.cache_llm_result(job_url, result)

    async def _add_job_to_redis_cache(self, job_url: str, job_id: str, error_type: str):
        logger.info(f"📦 Adding failed job to Redis cache: {job_id} (Error: {error_type})")
        self.redis_manager.add_job_to_error_cache(job_url, job_id, error_type)

    async def _remove_job_from_error_cache(self, job_id: str):
        """Remove job from error cache when successfully processed."""
        self.redis_manager.remove_job_from_error_cache(job_id)

    async def _add_job_to_sent_cache(self, job_id: str):
        """Add job to sent cache when processing begins."""
        self.redis_manager.add_job_to_sent_cache(job_id)

    async def _remove_job_from_sent_cache(self, job_id: str):
        """Remove job from sent cache when processing completes."""
        self.redis_manager.remove_job_from_sent_cache(job_id)

    def _get_platform(self, url: str) -> str:
        if "ashbyhq.com" in url:
            return "ashby"
        elif "lever.co" in url:
            return "lever"
        elif "greenhouse.io" in url:
            return "greenhouse"
        else:
            return "unknown"

    @retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=4, max=10))
    async def scrape_job(
        self,
        job_url: str,
        company_name: str,
        message_id: str
    ):
        platform = self._get_platform(job_url)
        if platform == "unknown":
            logger.info(f"⏭️ Skipping unsupported platform: {job_url}")
            return True

        job_id = job_url.strip("/").split("/")[-1]

        # Add job to sent cache at the beginning of processing
        await self._add_job_to_sent_cache(job_id)

        if await self._is_job_seen(job_id):
            logger.info(f"⏭️ Skipping already processed job: {job_id}")
            await self._remove_job_from_sent_cache(job_id)
            return True

        cached_result = await self._get_cached_llm_result(job_url)
        if cached_result:
            logger.info(f"📦 Using cached LLM result for: {job_url}")
            await self.llm_agent.save_job_data(cached_result, company_name)
            await self._mark_job_seen(job_id)
            await self._remove_job_from_sent_cache(job_id)
            return True

        context = None
        page = None
        start_time = time.time()
        try:
            context = await self.create_fresh_context()
            page = await context.new_page()
            timeout_ms = self.engine.optimization_params.get("request_timeout", 120000)
            temp_fetcher = StealthyFetcher(self.engine, self.browser, context)
            fetch_timeout = 60000 if platform == "lever" else timeout_ms
            job_page = await asyncio.wait_for(
                temp_fetcher.fetch_url(job_url, wait_for_selector="h1", timeout=fetch_timeout),
                timeout=fetch_timeout / 1000.0
            )

            # Check if job still exists (minimal content validation)
            page_content = await job_page.content()
            if len(page_content.strip()) < 500:  # Arbitrary threshold for "page exists"
                logger.error(f"❌ Job no longer exists (empty/deleted): {job_url}")
                await self._add_job_to_redis_cache(job_url, job_id, "job_not_found")
                self.engine.report_outcome("job_not_found", url=job_url)
                await self._remove_job_from_sent_cache(job_id)
                return False

            if platform == "ashby":
                try:
                    await job_page.wait_for_selector("div[class*='job-posting'], article, main", timeout=60000)
                except Exception:
                    logger.warning(f"⚠️ Ashby page didn't load properly: {job_url}")
                    await self._remove_job_from_sent_cache(job_id)
                    return False
            elif platform == "lever":
                pass
            elif platform == "greenhouse":
                try:
                    await job_page.wait_for_selector("div.job-desc, section", timeout=60000)
                except Exception:
                    pass

            # Extract page content for initial validation
            page_content = await self._extract_page_content_for_llm(job_page)

            # Check for job expiration or unavailability indicators
            page_text_lower = page_content.lower()
            job_unavailable_indicators = [
                "job no longer available",
                "position has been filled",
                "this job has expired",
                "job posting has expired",
                "no longer accepting applications",
                "position is closed",
                "job is no longer active",
                "this position is no longer open"
            ]

            if any(indicator in page_text_lower for indicator in job_unavailable_indicators):
                logger.error(f"❌ Job no longer available/expired: {job_url}")
                await self._add_job_to_redis_cache(job_url, job_id, "job_not_found")
                self.engine.report_outcome("job_not_found", url=job_url)
                await self._remove_job_from_sent_cache(job_id)
                return False

            # 🔑 APPLY TYPE LOGIC
            if platform in ["ashby", "lever", "greenhouse"]:
                apply_type = 'AI'  # Always AI for these platforms
            else:
                # For other platforms: check if form is accessible without login
                apply_btn = await job_page.query_selector("button:has-text('Apply for this job'), button:has-text('Apply now')")
                apply_type = 'signup'  # default
                if apply_btn:
                    await self._human_click(job_page, apply_btn)
                    speed = self.engine.optimization_params.get("base_delay", 2.0)
                    await asyncio.sleep(2 * (speed / 2))
                    form = await job_page.query_selector("form, div[class*='application-form']")
                    if form:
                        # Check for login prompts in form
                        login_indicators = await job_page.query_selector("input[type='email'], input[type='password'], text='sign in', text='log in'")
                        if not login_indicators:
                            apply_type = 'AI'
                        else:
                            apply_type = 'signup'
                    else:
                        apply_type = 'signup'

            final_url = job_url

            # Hardcode posted_date to Dec 1st 2025
            posted_date = "12/01/25"

            raw_data = {
                "page_content": page_content,
                "url": final_url,
                "job_id": job_id,
                "search_keywords": company_name,
                "posted_date": posted_date
            }

            llm_timeout = max(60, self.engine.feedback.get("avg_response_time", 10) * 2)
            refined_data = await asyncio.wait_for(
                self.llm_agent.refine_job_data(raw_data, self.user_request),
                timeout=llm_timeout
            )

            success = False
            if refined_data:
                # Define compulsory fields that must be present and valid
                compulsory_fields = ['title', 'company_name', 'description', 'job_id', 'url']

                # Check if ALL compulsory fields are present and valid BEFORE any processing
                missing_fields = []
                for field in compulsory_fields:
                    field_value = refined_data.get(field, "")
                    if not field_value or str(field_value).strip() in ["", "N/A", "Unknown", "Not provided", "Not available", "Company", "Job"]:
                        missing_fields.append(field)

                # If any compulsory field is missing, discard the job immediately
                if missing_fields:
                    logger.error(f"❌ Job discarded - missing compulsory fields: {', '.join(missing_fields)} : {final_url}")
                    error_type = "missing_compulsory_fields"
                    await self._add_job_to_redis_cache(final_url, job_id, error_type)
                    self.engine.report_outcome(error_type, url=final_url)
                    await self._remove_job_from_sent_cache(job_id)
                    return False

                # If we get here, all compulsory fields are valid - now add additional metadata
                refined_data.update({
                    'apply_type': apply_type,
                    'scraped_at': datetime.now().isoformat(),
                    'category': company_name,
                    'posted_date': posted_date,
                    'message_id': message_id,
                    'platform': platform
                })

                # Save to database and markdown
                await self.llm_agent.save_job_data(refined_data, company_name)
                await self._cache_llm_result(job_url, refined_data)
                await self._mark_job_seen(job_id)

                # Remove from error cache if it was previously failed
                await self._remove_job_from_error_cache(job_id)

                response_time = time.time() - start_time
                self.engine.report_outcome("success", url=final_url, response_time=response_time)
                logger.info(f"✅ Scraped ({platform}): {refined_data['title'][:50]}... (Apply Type: {apply_type})")
                success = True
                await self._remove_job_from_sent_cache(job_id)
            else:
                logger.warning(f"🟡 LLM failed to refine: {final_url}")
                await self._add_job_to_redis_cache(final_url, job_id, "llm_failure")
                self.engine.report_outcome("llm_failure", url=final_url)
                await self._remove_job_from_sent_cache(job_id)

            return success

        except asyncio.TimeoutError:
            logger.error(f"⏰ Timeout processing job ({platform}): {job_url}")
            await self._add_job_to_redis_cache(job_url, job_id, "timeout")
            self.engine.report_outcome("timeout", url=job_url)
            await self._remove_job_from_sent_cache(job_id)
            return False
        except Exception as e:
            error_msg = str(e)
            if "NoneType" in error_msg or "disconnected" in error_msg or "Browser" in error_msg:
                logger.warning("Browser connection lost. Forcing reinitialization.")
                await self.close_browser()

            # 🔍 Distinguish job-not-found vs other errors
            if "page.goto: net::ERR_ABORTED" in error_msg or "page.goto: net::ERR_FAILED" in error_msg:
                logger.error(f"❌ Job no longer exists (404/network error): {job_url}")
                await self._add_job_to_redis_cache(job_url, job_id, "job_not_found")
                self.engine.report_outcome("job_not_found", url=job_url)
            else:
                # Categorize other errors
                error_type = "exception"
                if "timeout" in error_msg.lower():
                    error_type = "timeout"
                elif "llm" in error_msg.lower() or "refine" in error_msg.lower():
                    error_type = "llm_failure"
                else:
                    error_type = "scraping_error"
                logger.error(f"💥 Error processing job ({platform}) {job_url}: {error_msg}")
                await self._add_job_to_redis_cache(job_url, job_id, error_type)
                self.engine.report_outcome(error_type, url=job_url)

            await self._remove_job_from_sent_cache(job_id)
            return False
        finally:
            if context:
                try:
                    await context.close()
                except Exception:
                    pass


# Global metrics
METRICS = {
    "processed": 0,
    "success": 0,
    "failed": 0,
    "skipped": 0,
    "start_time": time.time()
}


async def process_message_async(scraper: MultiPlatformJobScraper, ch, method, properties, body):
    try:
        job_data = json.loads(body)
        job_link = job_data['job_link']
        company_name = job_data['company_name']
        message_id = properties.message_id or f"msg_{int(time.time()*1000)}"

        logger.info(f"📥 Processing job: {job_link} (ID: {message_id})")
        success = await scraper.scrape_job(job_link, company_name, message_id)

        METRICS["processed"] += 1
        if success:
            METRICS["success"] += 1
        else:
            METRICS["failed"] += 1
    except json.JSONDecodeError:
        logger.error("❌ Invalid JSON in message")
        METRICS["failed"] += 1
    except Exception as e:
        logger.error(f"💥 Unexpected error: {str(e)}")
        METRICS["failed"] += 1
    finally:
        ch.basic_ack(delivery_tag=method.delivery_tag)


def callback_wrapper(scraper: MultiPlatformJobScraper):
    def callback(ch, method, properties, body):
        asyncio.run(process_message_async(scraper, ch, method, properties, body))
    return callback


def start_consumer():
    engine = FingerprintScrapingEngine(
        # Other env vars...
        seed=os.getenv("SEED_NAME", "multiplatform_scraper"),
        target_os="windows",
        num_variations=10
    )
    scraper = MultiPlatformJobScraper(engine)

    connection = None
    for attempt in range(5):
        try:
            parameters = create_ssl_connection_parameters()
            if RABBITMQ_SSL_ENABLED:
                logger.info(f"Connecting to RabbitMQ over SSL at {RABBITMQ_HOST}:{RABBITMQ_PORT}")
            else:
                logger.info(f"Connecting to RabbitMQ at {RABBITMQ_HOST}:{RABBITMQ_PORT}")
            connection = pika.BlockingConnection(parameters)
            break
        except Exception as e:
            logger.error(f"RabbitMQ connection attempt {attempt + 1} failed: {e}")
            time.sleep(2 ** attempt)

    if not connection:
        logger.error("Failed to connect to RabbitMQ after retries")
        return

    channel = connection.channel()
    channel.queue_declare(queue='job_queue', durable=True)
    channel.basic_qos(prefetch_count=1)
    channel.basic_consume(queue='job_queue', on_message_callback=callback_wrapper(scraper))

    logger.info('Waiting for messages (Ashby, Lever, Greenhouse). To exit press CTRL+C')
    try:
        channel.start_consuming()
    except KeyboardInterrupt:
        logger.info("Shutting down...")
        channel.stop_consuming()
        connection.close()
        asyncio.run(scraper.close_browser())


if __name__ == "__main__":
    start_consumer()