diff --git a/scraper.py b/scraper.py
index c3db070..89800c2 100644
--- a/scraper.py
+++ b/scraper.py
@@ -15,40 +15,34 @@
 import logging
 from tenacity import retry, stop_after_attempt, wait_exponential
 from scraping_engine import FingerprintScrapingEngine
 from dotenv import load_dotenv
-from ssl_connection import create_ssl_connection_parameters # Import from ssl.py
+from ssl_connection import create_ssl_connection_parameters  # Import from ssl.py
 import redis
-
 load_dotenv()
-
 # Configure logging
 logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
 logger = logging.getLogger(__name__)
-
 # Environment variables
 RABBITMQ_HOST = os.getenv("RABBITMQ_HOST")
 RABBITMQ_PORT = int(os.getenv("RABBITMQ_PORT", "5671"))
 RABBITMQ_SSL_ENABLED = os.getenv("RABBITMQ_SSL_ENABLED", "false").lower() == "true"
-
 # Redis configuration
 REDIS_HOST = os.getenv('REDIS_HOST')
 REDIS_PORT = int(os.getenv('REDIS_PORT', '6380'))
 REDIS_PASSWORD = os.getenv('REDIS_PASSWORD')
 REDIS_SSL_ENABLED = os.getenv('REDIS_SSL_ENABLED', 'true').lower() == 'true'
-
-
 class RedisManager:
     """Manages Redis connection and operations for job tracking and caching."""
-
+
     def __init__(self):
         self.redis_client = None
         self._connect()
-
+
     def _connect(self):
         """Establish connection to Redis server."""
         if not REDIS_PASSWORD:
             logger.warning("Warning: REDIS_PASSWORD not found in environment.")
-
+
         try:
             self.redis_client = redis.Redis(
                 host=REDIS_HOST,
@@ -60,37 +54,37 @@ class RedisManager:
                 socket_timeout=30,
                 retry_on_timeout=True
             )
-
+
             response = self.redis_client.ping()
             logger.info(f"Connected to Redis at {REDIS_HOST}:{REDIS_PORT}! Response: {response}")
-
+
         except Exception as e:
             logger.error(f"Failed to connect to Redis: {e}")
             self.redis_client = None
-
+
     def is_job_seen(self, job_id: str) -> bool:
         if not self.redis_client:
             return False
-
+
         try:
             return bool(self.redis_client.exists(f"job_seen:{job_id}"))
         except Exception as e:
             logger.error(f"Redis error checking job_seen: {e}")
             return False
-
+
     def mark_job_seen(self, job_id: str):
         if not self.redis_client:
             return
-
+
         try:
             self.redis_client.setex(f"job_seen:{job_id}", 2592000, "1")
         except Exception as e:
             logger.error(f"Redis error marking job_seen: {e}")
-
+
     def get_cached_llm_result(self, job_url: str) -> Optional[Dict]:
         if not self.redis_client:
             return None
-
+
         try:
             cached_data = self.redis_client.get(f"llm_cache:{job_url}")
             if cached_data:
@@ -99,20 +93,20 @@ class RedisManager:
         except Exception as e:
             logger.error(f"Redis error getting LLM cache: {e}")
             return None
-
+
     def cache_llm_result(self, job_url: str, result: Dict):
         if not self.redis_client:
             return
-
+
         try:
             self.redis_client.setex(f"llm_cache:{job_url}", 604800, json.dumps(result))
         except Exception as e:
             logger.error(f"Redis error caching LLM result: {e}")
-
+
     def add_job_to_error_cache(self, job_url: str, job_id: str, error_type: str):
         if not self.redis_client:
             return
-
+
         try:
             error_data = {
                 "job_url": job_url,
@@ -145,14 +139,12 @@ class MultiPlatformJobScraper:
                 await self.close_browser()
             except:
                 await self.close_browser()
-
         if self.browser is None:
             try:
                 profile = self.engine._select_profile()
                 renderer = random.choice(self.engine.common_renderers[self.engine.os])
                 vendor = random.choice(self.engine.common_vendors)
                 spoof_script = self.engine._get_spoof_script(renderer, vendor)
-
                 self.pw = await async_playwright().start()
                 self.browser = await self.pw.chromium.launch(
                     headless=True,
@@ -171,14 +163,13 @@ class MultiPlatformJobScraper:
     async def create_fresh_context(self):
         if self.browser is None:
             await self.init_browser()
-
+
         try:
             await self.browser.new_page()
         except Exception:
             logger.warning("Browser appears dead. Reinitializing...")
             await self.close_browser()
             await self.init_browser()
-
         profile = self.engine._select_profile()
         context = await AsyncNewContext(self.browser, fingerprint=profile)
         await context.add_init_script(f"""
@@ -192,7 +183,7 @@ class MultiPlatformJobScraper:
         )
         await context.add_init_script(spoof_script)
         return context
-
+
     async def close_browser(self):
         if self.browser:
             try:
@@ -214,7 +205,7 @@ class MultiPlatformJobScraper:
             return await element.text_content()
         except:
             return "Unknown"
-
+
     async def _human_click(self, page, element, wait_after: bool = True):
         if not element:
             return False
@@ -228,7 +219,7 @@ class MultiPlatformJobScraper:
             return True
         except:
             return False
-
+
     async def _extract_page_content_for_llm(self, page) -> str:
         speed = self.engine.optimization_params.get("base_delay", 2.0)
         await asyncio.sleep(2 * (speed / 2))
@@ -236,23 +227,23 @@ class MultiPlatformJobScraper:
         await self.engine._human_like_scroll(page)
         await asyncio.sleep(2 * (speed / 2))
         return await page.content()
-
+
     async def _is_job_seen(self, job_id: str) -> bool:
         return self.redis_manager.is_job_seen(job_id)
-
+
     async def _mark_job_seen(self, job_id: str):
         self.redis_manager.mark_job_seen(job_id)
 
     async def _get_cached_llm_result(self, job_url: str) -> Optional[Dict]:
         return self.redis_manager.get_cached_llm_result(job_url)
-
+
     async def _cache_llm_result(self, job_url: str, result: Dict):
         self.redis_manager.cache_llm_result(job_url, result)
 
     async def _add_job_to_redis_cache(self, job_url: str, job_id: str, error_type: str):
         logger.info(f" 📦 Adding failed job to Redis cache: {job_id} (Error: {error_type})")
         self.redis_manager.add_job_to_error_cache(job_url, job_id, error_type)
-
+
     def _get_platform(self, url: str) -> str:
         if "ashbyhq.com" in url:
             return "ashby"
@@ -262,7 +253,7 @@ class MultiPlatformJobScraper:
             return "greenhouse"
         else:
             return "unknown"
-
+
     @retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=4, max=10))
     async def scrape_job(
         self,
@@ -274,43 +265,36 @@ class MultiPlatformJobScraper:
         if platform == "unknown":
             logger.info(f"⏭️ Skipping unsupported platform: {job_url}")
             return True
-
         job_id = job_url.strip("/").split("/")[-1]
         if await self._is_job_seen(job_id):
             logger.info(f"⏭️ Skipping already processed job: {job_id}")
             return True
-
         cached_result = await self._get_cached_llm_result(job_url)
         if cached_result:
             logger.info(f"📦 Using cached LLM result for: {job_url}")
             await self.llm_agent.save_job_data(cached_result, company_name)
             await self._mark_job_seen(job_id)
             return True
-
         context = None
         page = None
         start_time = time.time()
         try:
             context = await self.create_fresh_context()
             page = await context.new_page()
-
             timeout_ms = self.engine.optimization_params.get("request_timeout", 120000)
             temp_fetcher = StealthyFetcher(self.engine, self.browser, context)
-
             fetch_timeout = 60000 if platform == "lever" else timeout_ms
             job_page = await asyncio.wait_for(
                 temp_fetcher.fetch_url(job_url, wait_for_selector="h1", timeout=fetch_timeout),
                 timeout=fetch_timeout / 1000.0
             )
-
             # Check if job still exists (minimal content validation)
             page_content = await job_page.content()
-            if len(page_content.strip()) < 500: # Arbitrary threshold for "page exists"
+            if len(page_content.strip()) < 500:  # Arbitrary threshold for "page exists"
                 logger.error(f"❌ Job no longer exists (empty/deleted): {job_url}")
                 await self._add_job_to_redis_cache(job_url, job_id, "job_not_found")
                 self.engine.report_outcome("job_not_found", url=job_url)
                 return False
-
             if platform == "ashby":
                 try:
                     await job_page.wait_for_selector("div[class*='job-posting'], article, main", timeout=60000)
@@ -324,10 +308,9 @@ class MultiPlatformJobScraper:
                     await job_page.wait_for_selector("div.job-desc, section", timeout=60000)
                 except Exception:
                     pass
-
             # Extract page content for initial validation
             page_content = await self._extract_page_content_for_llm(job_page)
-
+
             # Check for job expiration or unavailability indicators
             page_text_lower = page_content.lower()
             job_unavailable_indicators = [
@@ -340,20 +323,19 @@ class MultiPlatformJobScraper:
                 "job is no longer active",
                 "this position is no longer open"
             ]
-
+
             if any(indicator in page_text_lower for indicator in job_unavailable_indicators):
                 logger.error(f"❌ Job no longer available/expired: {job_url}")
                 await self._add_job_to_redis_cache(job_url, job_id, "job_not_found")
                 self.engine.report_outcome("job_not_found", url=job_url)
                 return False
-
             # 🔑 APPLY TYPE LOGIC
             if platform in ["ashby", "lever", "greenhouse"]:
-                apply_type = 'AI' # Always AI for these platforms
+                apply_type = 'AI'  # Always AI for these platforms
             else:
                 # For other platforms: check if form is accessible without login
                 apply_btn = await job_page.query_selector("button:has-text('Apply for this job'), button:has-text('Apply now')")
-                apply_type = 'signup' # default
+                apply_type = 'signup'  # default
                 if apply_btn:
                     await self._human_click(job_page, apply_btn)
                     speed = self.engine.optimization_params.get("base_delay", 2.0)
@@ -368,11 +350,9 @@ class MultiPlatformJobScraper:
                         apply_type = 'signup'
                 else:
                     apply_type = 'signup'
-
             final_url = job_url
             # Hardcode posted_date to Dec 1st 2025
             posted_date = "12/01/25"
-
             raw_data = {
                 "page_content": page_content,
                 "url": final_url,
@@ -380,34 +360,32 @@ class MultiPlatformJobScraper:
                 "search_keywords": company_name,
                 "posted_date": posted_date
             }
-
             llm_timeout = max(60, self.engine.feedback.get("avg_response_time", 10) * 2)
             refined_data = await asyncio.wait_for(
                 self.llm_agent.refine_job_data(raw_data, self.user_request),
                 timeout=llm_timeout
             )
 
-            success = False
-            if refined_data and refined_data.get("title", "N/A") != "N/A":
-                # Define all compulsory fields that must be present and valid
-                compulsory_fields = ['company_name', 'job_id', 'url', 'title', 'description']
+            if refined_data:
+                # Define compulsory fields that must be present and valid
+                compulsory_fields = ['title', 'company_name', 'description', 'job_id', 'url']
 
-                # Validate all compulsory fields
+                # Check if ALL compulsory fields are present and valid BEFORE any processing
                 missing_fields = []
                 for field in compulsory_fields:
-                    field_value = refined_data.get(field, "").strip()
-                    if not field_value or field_value in ["N/A", "Unknown", ""]:
+                    field_value = refined_data.get(field, "")
+                    if not field_value or str(field_value).strip() in ["", "N/A", "Unknown", "Not provided", "Not available", "Company", "Job"]:
                         missing_fields.append(field)
 
-                # If any compulsory field or description is missing, discard the job
+                # If any compulsory field is missing, discard the job immediately
                 if missing_fields:
-                    logger.error(f"❌ Job discarded - missing compulsory fields {missing_fields}: {final_url}")
-                    await self._add_job_to_redis_cache(final_url, job_id, "job_not_found")
-                    self.engine.report_outcome("job_not_found", url=final_url)
+                    logger.error(f"❌ Job discarded - missing compulsory fields: {', '.join(missing_fields)} : {final_url}")
+                    error_type = "missing_compulsory_fields"
+                    await self._add_job_to_redis_cache(final_url, job_id, error_type)
+                    self.engine.report_outcome(error_type, url=final_url)
                     return False
-
-                # If we get here, all compulsory fields are present and valid
-                # Update with additional metadata
+
+                # If we get here, all compulsory fields are valid - now add additional metadata
                 refined_data.update({
                     'apply_type': apply_type,
                     'scraped_at': datetime.now().isoformat(),
@@ -416,11 +394,12 @@ class MultiPlatformJobScraper:
                     'message_id': message_id,
                     'platform': platform
                 })
-
+
+                # Save to database and markdown
                 await self.llm_agent.save_job_data(refined_data, company_name)
                 await self._cache_llm_result(job_url, refined_data)
                 await self._mark_job_seen(job_id)
-
+
                 response_time = time.time() - start_time
                 self.engine.report_outcome("success", url=final_url, response_time=response_time)
                 logger.info(f"✅ Scraped ({platform}): {refined_data['title'][:50]}... (Apply Type: {apply_type})")
@@ -429,9 +408,7 @@ class MultiPlatformJobScraper:
                 logger.warning(f"🟡 LLM failed to refine: {final_url}")
                 await self._add_job_to_redis_cache(final_url, job_id, "llm_failure")
                 self.engine.report_outcome("llm_failure", url=final_url)
-
             return success
-
         except asyncio.TimeoutError:
             logger.error(f"⏰ Timeout processing job ({platform}): {job_url}")
             await self._add_job_to_redis_cache(job_url, job_id, "timeout")
@@ -442,7 +419,7 @@ class MultiPlatformJobScraper:
             if "NoneType" in error_msg or "disconnected" in error_msg or "Browser" in error_msg:
                 logger.warning("Browser connection lost. Forcing reinitialization.")
                 await self.close_browser()
-
+
             # 🔍 Distinguish job-not-found vs other errors
             if "page.goto: net::ERR_ABORTED" in error_msg or "page.goto: net::ERR_FAILED" in error_msg:
                 logger.error(f"❌ Job no longer exists (404/network error): {job_url}")
@@ -457,7 +434,7 @@ class MultiPlatformJobScraper:
                 error_type = "llm_failure"
             else:
                 error_type = "scraping_error"
-
+
             logger.error(f"💥 Error processing job ({platform}) {job_url}: {error_msg}")
             await self._add_job_to_redis_cache(job_url, job_id, error_type)
             self.engine.report_outcome(error_type, url=job_url)
@@ -468,7 +445,6 @@ class MultiPlatformJobScraper:
                     await context.close()
                 except Exception:
                     pass
-
 # Global metrics
 METRICS = {
     "processed": 0,
@@ -477,18 +453,14 @@ METRICS = {
     "skipped": 0,
     "start_time": time.time()
 }
-
-
 async def process_message_async(scraper: MultiPlatformJobScraper, ch, method, properties, body):
     try:
         job_data = json.loads(body)
         job_link = job_data['job_link']
         company_name = job_data['company_name']
         message_id = properties.message_id or f"msg_{int(time.time()*1000)}"
-
         logger.info(f"📥 Processing job: {job_link} (ID: {message_id})")
         success = await scraper.scrape_job(job_link, company_name, message_id)
-
         METRICS["processed"] += 1
         if success:
             METRICS["success"] += 1
@@ -502,14 +474,10 @@ async def process_message_async(scraper: MultiPlatformJobScraper, ch, method, pr
         METRICS["failed"] += 1
     finally:
         ch.basic_ack(delivery_tag=method.delivery_tag)
-
-
 def callback_wrapper(scraper: MultiPlatformJobScraper):
     def callback(ch, method, properties, body):
         asyncio.run(process_message_async(scraper, ch, method, properties, body))
     return callback
-
-
 def start_consumer():
     engine = FingerprintScrapingEngine(
         seed="multiplatform_scraper",
@@ -517,32 +485,27 @@ def start_consumer():
         num_variations=10
    )
     scraper = MultiPlatformJobScraper(engine)
-
     connection = None
     for attempt in range(5):
         try:
             parameters = create_ssl_connection_parameters()
-
+
             if RABBITMQ_SSL_ENABLED:
                 logger.info(f"Connecting to RabbitMQ over SSL at {RABBITMQ_HOST}:{RABBITMQ_PORT}")
             else:
                 logger.info(f"Connecting to RabbitMQ at {RABBITMQ_HOST}:{RABBITMQ_PORT}")
-
             connection = pika.BlockingConnection(parameters)
             break
         except Exception as e:
             logger.error(f"RabbitMQ connection attempt {attempt + 1} failed: {e}")
             time.sleep(2 ** attempt)
-
     if not connection:
         logger.error("Failed to connect to RabbitMQ after retries")
         return
-
     channel = connection.channel()
     channel.queue_declare(queue='job_queue', durable=True)
     channel.basic_qos(prefetch_count=1)
     channel.basic_consume(queue='job_queue', on_message_callback=callback_wrapper(scraper))
-
     logger.info('Waiting for messages (Ashby, Lever, Greenhouse). To exit press CTRL+C')
     try:
         channel.start_consuming()
@@ -551,7 +514,5 @@ def start_consumer():
         channel.stop_consuming()
         connection.close()
         asyncio.run(scraper.close_browser())
-
-
 if __name__ == "__main__":
     start_consumer()
\ No newline at end of file