From 2c5b42b7bd703d83cb8096c9555124baf91d59a0 Mon Sep 17 00:00:00 2001
From: Ofure Ikheloa
Date: Mon, 15 Dec 2025 09:08:27 +0100
Subject: [PATCH] Refactor job tracking to use job ID instead of job URL in
 RedisManager methods

---
 scraper.py | 242 ++++++++++++++++++++++++-----------------------------
 1 file changed, 111 insertions(+), 131 deletions(-)

diff --git a/scraper.py b/scraper.py
index 1b2c302..e5ef051 100644
--- a/scraper.py
+++ b/scraper.py
@@ -68,24 +68,22 @@ class RedisManager:
             logger.error(f"Failed to connect to Redis: {e}")
             self.redis_client = None

-    def is_job_seen(self, job_url: str) -> bool:
-        """✅ CHANGED: Check by job URL instead of job ID"""
+    def is_job_seen(self, job_id: str) -> bool:
         if not self.redis_client:
             return False
         try:
-            return bool(self.redis_client.exists(f"job_seen:{job_url}"))
+            return bool(self.redis_client.exists(f"job_seen:{job_id}"))
         except Exception as e:
             logger.error(f"Redis error checking job_seen: {e}")
             return False

-    def mark_job_seen(self, job_url: str):
-        """✅ CHANGED: Mark by job URL instead of job ID"""
+    def mark_job_seen(self, job_id: str):
         if not self.redis_client:
             return
         try:
-            self.redis_client.setex(f"job_seen:{job_url}", 2592000, "1")
+            self.redis_client.setex(f"job_seen:{job_id}", 2592000, "1")
         except Exception as e:
             logger.error(f"Redis error marking job_seen: {e}")
@@ -239,13 +237,11 @@ class MultiPlatformJobScraper:
             await asyncio.sleep(2 * (speed / 2))
         return await page.content()

-    async def _is_job_seen(self, job_url: str) -> bool:
-        """✅ Use job URL for deduplication"""
-        return self.redis_manager.is_job_seen(job_url)
+    async def _is_job_seen(self, job_id: str) -> bool:
+        return self.redis_manager.is_job_seen(job_id)

-    async def _mark_job_seen(self, job_url: str):
-        """✅ Use job URL for marking"""
-        self.redis_manager.mark_job_seen(job_url)
+    async def _mark_job_seen(self, job_id: str):
+        self.redis_manager.mark_job_seen(job_id)

     async def _get_cached_llm_result(self, job_url: str) -> Optional[Dict]:
         return self.redis_manager.get_cached_llm_result(job_url)
@@ -267,36 +263,6 @@ class MultiPlatformJobScraper:
         else:
             return "unknown"

-    def _is_job_expired_or_invalid(self, page_content: str) -> bool:
-        """Check if job is expired, removed, or has no description"""
-        content_lower = page_content.lower()
-
-        # Check for JavaScript-only pages
-        if "you need to enable javascript to run this app" in content_lower:
-            return True
-
-        invalid_phrases = [
-            "job no longer available",
-            "position has been filled",
-            "this job has expired",
-            "page not found",
-            "404 error",
-            "job has been closed",
-            "erweima.png",  # Detect spam/ad content
-            "wocao03.com",
-            "github.com/wocao01"
-        ]
-
-        for phrase in invalid_phrases:
-            if phrase in content_lower:
-                return True
-
-        # Check for meaningful description content
-        description_keywords = ['responsibilities', 'requirements', 'description', 'duties', 'role', 'about the']
-        has_description = any(kw in content_lower for kw in description_keywords)
-
-        return not has_description
-
     @retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=4, max=10))
     async def scrape_job(
         self,
@@ -305,21 +271,20 @@
         job_url: str,
         company_name: str,
         message_id: str
     ):
         platform = self._get_platform(job_url)
+        if platform == "unknown":
+            logger.info(f"⏭️ Skipping unsupported platform: {job_url}")
+            return True

-        # ✅ ONLY extract job_id from URL
         job_id = job_url.strip("/").split("/")[-1]
-
-        # ✅ Check if already processed BY URL (not job_id)
-        if await self._is_job_seen(job_url):
-            logger.info(f"⏭️ Skipping already processed job URL: {job_url}")
+        if await self._is_job_seen(job_id):
+            logger.info(f"⏭️ Skipping already processed job: {job_id}")
             return True

         cached_result = await self._get_cached_llm_result(job_url)
         if cached_result:
             logger.info(f"📦 Using cached LLM result for: {job_url}")
-            # Save to Quelah Jobs - company_name will be overridden by LLM if found
-            await self.llm_agent.save_job_data(cached_result, company_name, "quelah")
-            await self._mark_job_seen(job_url)  # ✅ Mark by URL
+            await self.llm_agent.save_job_data(cached_result, company_name)
+            await self._mark_job_seen(job_id)
             return True

         context = None
@@ -333,77 +298,86 @@
             temp_fetcher = StealthyFetcher(self.engine, self.browser, context)

             fetch_timeout = 60000 if platform == "lever" else timeout_ms
-
-            # ✅ PLATFORM-SPECIFIC WAIT LOGIC WITH ASHBY FIX
-            if platform == "ashby":
-                # Ashby requires JS execution - wait for network idle + job content
-                job_page = await asyncio.wait_for(
-                    temp_fetcher.fetch_url(job_url, wait_for_selector=None, timeout=fetch_timeout),
-                    timeout=fetch_timeout / 100.0
-                )
-                if job_page:
-                    # Wait for React hydration (job content to appear)
-                    try:
-                        await job_page.wait_for_function(
-                            "document.querySelector('h1') && document.querySelector('h1').innerText.length > 0",
-                            timeout=120000
-                        )
-                    except Exception:
-                        # Fallback: check if we got valid content
-                        content = await job_page.content()
-                        if "you need to enable javascript" in content.lower():
-                            logger.warning(f"⚠️ Ashby page still shows JS error: {job_url}")
-                            raise Exception("Ashby JS content not loaded")
-            elif platform == "greenhouse":
-                job_page = await asyncio.wait_for(
-                    temp_fetcher.fetch_url(job_url, wait_for_selector="h1, div.job-desc", timeout=fetch_timeout),
-                    timeout=fetch_timeout / 1000.0
-                )
-            else:  # lever & others
-                job_page = await asyncio.wait_for(
-                    temp_fetcher.fetch_url(job_url, wait_for_selector="h1", timeout=fetch_timeout),
-                    timeout=fetch_timeout / 1000.0
-                )
-
-            if job_page is None:
-                logger.error(f"❌ Failed to load page for {job_url}")
-                await self._add_job_to_redis_cache(job_url, job_id, "page_load_failed")
-                await self._mark_job_seen(job_url)
-                return True
+            job_page = await asyncio.wait_for(
+                temp_fetcher.fetch_url(job_url, wait_for_selector="h1", timeout=fetch_timeout),
+                timeout=fetch_timeout / 1000.0
+            )

+            # Check if job still exists (minimal content validation)
             page_content = await job_page.content()
-
-            if self._is_job_expired_or_invalid(page_content):
-                logger.warning(f"🗑️ Discarding invalid job: {job_url}")
-                self.engine.report_outcome("job_discarded", url=job_url)
-                await self._mark_job_seen(job_url)  # ✅ Mark by URL
-                return True
+            if len(page_content.strip()) < 500:  # Arbitrary threshold for "page exists"
+                logger.error(f"❌ Job no longer exists (empty/deleted): {job_url}")
+                await self._add_job_to_redis_cache(job_url, job_id, "job_not_found")
+                self.engine.report_outcome("job_not_found", url=job_url)
+                return False

-            # Apply type logic
+            if platform == "ashby":
+                try:
+                    await job_page.wait_for_selector("div[class*='job-posting'], article, main", timeout=60000)
+                except Exception:
+                    logger.warning(f"⚠️ Ashby page didn't load properly: {job_url}")
+                    return False
+            elif platform == "lever":
+                pass
+            elif platform == "greenhouse":
+                try:
+                    await job_page.wait_for_selector("div.job-desc, section", timeout=60000)
+                except Exception:
+                    pass
+
+            # Extract page content for initial validation
+            page_content = await self._extract_page_content_for_llm(job_page)
+
+            # Check for job expiration or unavailability indicators
+            page_text_lower = page_content.lower()
+            job_unavailable_indicators = [
+                "job no longer available",
+                "position has been filled",
+                "this job has expired",
+                "job posting has expired",
+                "no longer accepting applications",
+                "position is closed",
+                "job is no longer active",
+                "this position is no longer open"
+            ]
+
+            if any(indicator in page_text_lower for indicator in job_unavailable_indicators):
+                logger.error(f"❌ Job no longer available/expired: {job_url}")
+                await self._add_job_to_redis_cache(job_url, job_id, "job_not_found")
+                self.engine.report_outcome("job_not_found", url=job_url)
+                return False
+
+            # 🔑 APPLY TYPE LOGIC
             if platform in ["ashby", "lever", "greenhouse"]:
-                apply_type = 'AI'
+                apply_type = 'AI'  # Always AI for these platforms
             else:
+                # For other platforms: check if form is accessible without login
                 apply_btn = await job_page.query_selector("button:has-text('Apply for this job'), button:has-text('Apply now')")
-                apply_type = 'signup'
+                apply_type = 'signup'  # default
                 if apply_btn:
                     await self._human_click(job_page, apply_btn)
                     speed = self.engine.optimization_params.get("base_delay", 2.0)
                     await asyncio.sleep(2 * (speed / 2))
                     form = await job_page.query_selector("form, div[class*='application-form']")
                     if form:
+                        # Check for login prompts in form
                         login_indicators = await job_page.query_selector("input[type='email'], input[type='password'], text='sign in', text='log in'")
                         if not login_indicators:
                             apply_type = 'AI'
+                        else:
+                            apply_type = 'signup'
+                    else:
+                        apply_type = 'signup'

             final_url = job_url
-            page_content = await self._extract_page_content_for_llm(job_page)
-            posted_date = "12/01/25"  # Fixed date
+            # Hardcode posted_date to Dec 1st 2025
+            posted_date = "12/01/25"

             raw_data = {
                 "page_content": page_content,
                 "url": final_url,
                 "job_id": job_id,
-                "search_keywords": company_name,  # Only used if LLM can't find company
+                "search_keywords": company_name,
                 "posted_date": posted_date
             }
@@ -415,18 +389,23 @@

             success = False
             if refined_data and refined_data.get("title", "N/A") != "N/A":
-                # ✅ ONLY job_id, url are guaranteed - everything else from LLM
-                compulsory_fields = ['job_id', 'url']
+                # Check if description is missing or empty
+                description = refined_data.get("description", "").strip()
+                if not description or description in ["N/A", "Unknown", ""]:
+                    logger.error(f"❌ Job discarded - missing description: {final_url}")
+                    await self._add_job_to_redis_cache(final_url, job_id, "job_not_found")
+                    self.engine.report_outcome("job_not_found", url=final_url)
+                    return False
+
+                compulsory_fields = ['company_name', 'job_id', 'url']
                 for field in compulsory_fields:
                     if not refined_data.get(field) or refined_data[field] in ["N/A", "", "Unknown"]:
                         if field == 'job_id':
                             refined_data[field] = job_id
                         elif field == 'url':
                             refined_data[field] = final_url
-
-                # Company name: prefer LLM extraction, fallback to queue
-                if not refined_data.get('company_name') or refined_data['company_name'] in ["N/A", "", "Unknown"]:
-                    refined_data['company_name'] = company_name
+                        elif field == 'company_name':
+                            refined_data[field] = company_name

                 refined_data.update({
                     'apply_type': apply_type,
@@ -437,51 +416,51 @@
                     'platform': platform
                 })

-                await self.llm_agent.save_job_data(refined_data, company_name, "quelah")
+                await self.llm_agent.save_job_data(refined_data, company_name)
                 await self._cache_llm_result(job_url, refined_data)
-                await self._mark_job_seen(job_url)  # ✅ Mark by URL
+                await self._mark_job_seen(job_id)

                 response_time = time.time() - start_time
                 self.engine.report_outcome("success", url=final_url, response_time=response_time)
-                logger.info(f"✅ Saved to Quelah Jobs ({platform}): {refined_data['title'][:50]}...")
+                logger.info(f"✅ Scraped ({platform}): {refined_data['title'][:50]}... (Apply Type: {apply_type})")
                 success = True
             else:
                 logger.warning(f"🟡 LLM failed to refine: {final_url}")
                 await self._add_job_to_redis_cache(final_url, job_id, "llm_failure")
-                await self._mark_job_seen(job_url)  # ✅ Mark by URL
                 self.engine.report_outcome("llm_failure", url=final_url)
-                return True

             return success

         except asyncio.TimeoutError:
             logger.error(f"⏰ Timeout processing job ({platform}): {job_url}")
             await self._add_job_to_redis_cache(job_url, job_id, "timeout")
-            await self._mark_job_seen(job_url)  # ✅ Mark by URL
             self.engine.report_outcome("timeout", url=job_url)
-            return True
-
+            return False
         except Exception as e:
             error_msg = str(e)
             if "NoneType" in error_msg or "disconnected" in error_msg or "Browser" in error_msg:
                 logger.warning("Browser connection lost. Forcing reinitialization.")
                 await self.close_browser()

-            error_type = "exception"
+            # 🔍 Distinguish job-not-found vs other errors
             if "page.goto: net::ERR_ABORTED" in error_msg or "page.goto: net::ERR_FAILED" in error_msg:
-                error_type = "job_not_found"
+                logger.error(f"❌ Job no longer exists (404/network error): {job_url}")
+                await self._add_job_to_redis_cache(job_url, job_id, "job_not_found")
+                self.engine.report_outcome("job_not_found", url=job_url)
             else:
-                if "required" in error_msg.lower() or "missing" in error_msg.lower():
-                    error_type = "missing_fields"
-                elif "captcha" in error_msg.lower() or "cloudflare" in error_msg.lower():
-                    error_type = "anti_bot_protection"
-
-            logger.error(f"💥 Error processing job ({platform}) {job_url}: {error_msg}")
-            await self._add_job_to_redis_cache(job_url, job_id, error_type)
-            await self._mark_job_seen(job_url)  # ✅ Mark by URL
-            self.engine.report_outcome(error_type, url=job_url)
-            return True
-
+                # Categorize other errors
+                error_type = "exception"
+                if "timeout" in error_msg.lower():
+                    error_type = "timeout"
+                elif "llm" in error_msg.lower() or "refine" in error_msg.lower():
+                    error_type = "llm_failure"
+                else:
+                    error_type = "scraping_error"
+
+                logger.error(f"💥 Error processing job ({platform}) {job_url}: {error_msg}")
+                await self._add_job_to_redis_cache(job_url, job_id, error_type)
+                self.engine.report_outcome(error_type, url=job_url)
+            return False
         finally:
             if context:
                 try:
@@ -507,19 +486,20 @@ async def process_message_async(scraper: MultiPlatformJobScraper, ch, method, pr
         message_id = properties.message_id or f"msg_{int(time.time()*1000)}"

         logger.info(f"📥 Processing job: {job_link} (ID: {message_id})")
-        _ = await scraper.scrape_job(job_link, company_name, message_id)
+        success = await scraper.scrape_job(job_link, company_name, message_id)
         METRICS["processed"] += 1
+        if success:
+            METRICS["success"] += 1
+        else:
+            METRICS["failed"] += 1

     except json.JSONDecodeError:
         logger.error("❌ Invalid JSON in message")
-        ch.basic_ack(delivery_tag=method.delivery_tag)
         METRICS["failed"] += 1
-        return
     except Exception as e:
         logger.error(f"💥 Unexpected error: {str(e)}")
         METRICS["failed"] += 1
     finally:
-        # ✅ CRITICAL: Acknowledge ALL messages
         ch.basic_ack(delivery_tag=method.delivery_tag)
@@ -562,7 +542,7 @@ def start_consumer():
     channel.basic_qos(prefetch_count=1)
     channel.basic_consume(queue='job_queue', on_message_callback=callback_wrapper(scraper))

-    logger.info('Waiting for messages (All platforms → Quelah Jobs). To exit press CTRL+C')
+    logger.info('Waiting for messages (Ashby, Lever, Greenhouse). To exit press CTRL+C')
     try:
         channel.start_consuming()
     except KeyboardInterrupt: