From b13d14d26d6d47614885332f1b43f121bf6169f9 Mon Sep 17 00:00:00 2001 From: Ofure Ikheloa Date: Fri, 12 Dec 2025 21:14:37 +0100 Subject: [PATCH] Enhance job handling in scraper and sender modules: - Update fetch timeout in StealthyFetcher for improved reliability. - Refactor LLMJobRefiner to create and manage Quelah Jobs table in PostgreSQL. - Modify RedisManager to track sent job counts for jobs.csv and adjust deduplication logic. - Implement job URL-based deduplication across scraper and sender. --- fetcher.py | 4 +- llm_agent.py | 151 +++++++++++++++++--------------------- scraper.py | 201 +++++++++++++++++++++++++++++++++------------------ sender.py | 53 ++++++++++---- 4 files changed, 240 insertions(+), 169 deletions(-) diff --git a/fetcher.py b/fetcher.py index e6d8405..699082c 100644 --- a/fetcher.py +++ b/fetcher.py @@ -23,7 +23,7 @@ class StealthyFetcher: try: page = await self.context.new_page() # Use networkidle for all platforms - works reliably for Ashby, Lever, and Greenhouse - await page.goto(url, wait_until='domcontentloaded', timeout=min(timeout, 60000)) + await page.goto(url, wait_until='domcontentloaded', timeout=min(timeout, 120000)) # Skip human behavior for Lever (already loads fully without it) if "lever.co" not in url: @@ -68,7 +68,7 @@ class StealthyFetcher: async def _is_content_accessible(self, page: Page, wait_for_selector: Optional[str] = None) -> bool: try: - await page.wait_for_selector("body", timeout=60000) + await page.wait_for_selector("body", timeout=120000) body_text = await page.eval_on_selector("body", "el => el.innerText.toLowerCase()") if len(body_text.strip()) < 100: return False diff --git a/llm_agent.py b/llm_agent.py index 65d5120..da3e0ba 100644 --- a/llm_agent.py +++ b/llm_agent.py @@ -39,39 +39,29 @@ class LLMJobRefiner: self._init_db() def _init_db(self): - """Initialize PostgreSQL database connection and create table""" + """Initialize PostgreSQL database connection and create Quelah Jobs table""" try: - self.db_url = os.getenv("DB_URL") - if self.db_url and "supabase.com" in self.db_url: - conn = psycopg2.connect( - host=self.db_host, - port=self.db_port, - database="postgres", - user=self.db_username, - password=self.db_password - ) - else: - conn = psycopg2.connect( - host=self.db_host, - port=self.db_port, - database="postgres", - user=self.db_username, - password=self.db_password + conn = psycopg2.connect( + host=self.db_host, + port=self.db_port, + database="postgres", + user=self.db_username, + password=self.db_password ) cursor = conn.cursor() - # Create table if it doesn't exist + # ✅ CREATE NEW TABLE: quelah_jobs (no requirements field) cursor.execute(''' - CREATE TABLE IF NOT EXISTS jobs ( + CREATE TABLE IF NOT EXISTS quelah_jobs ( id SERIAL PRIMARY KEY, title TEXT, company_name TEXT, location TEXT, description TEXT, - requirements TEXT, qualifications TEXT, salary_range TEXT, nature_of_work TEXT, + apply_type TEXT DEFAULT 'signup', job_id TEXT UNIQUE, url TEXT, category TEXT, @@ -81,27 +71,22 @@ class LLMJobRefiner: ) ''') - # Add apply_type column if it doesn't exist + # Ensure uniqueness constraint cursor.execute(''' - ALTER TABLE jobs - ADD COLUMN IF NOT EXISTS apply_type TEXT DEFAULT 'signup' + ALTER TABLE quelah_jobs DROP CONSTRAINT IF EXISTS quelah_jobs_job_id_key; + ALTER TABLE quelah_jobs ADD CONSTRAINT quelah_jobs_job_id_key UNIQUE (job_id); ''') - # Ensure the uniqueness constraint exists - cursor.execute(''' - ALTER TABLE jobs DROP CONSTRAINT IF EXISTS jobs_job_id_key; - ALTER TABLE jobs ADD CONSTRAINT 
jobs_job_id_key UNIQUE (job_id); - ''') - - cursor.execute('CREATE INDEX IF NOT EXISTS idx_job_id ON jobs(job_id)') - cursor.execute('CREATE INDEX IF NOT EXISTS idx_category ON jobs(category)') - cursor.execute('CREATE INDEX IF NOT EXISTS idx_posted_date ON jobs(posted_date)') - cursor.execute('CREATE INDEX IF NOT EXISTS idx_apply_type ON jobs(apply_type)') + # Create indexes + cursor.execute('CREATE INDEX IF NOT EXISTS idx_quelah_job_id ON quelah_jobs(job_id)') + cursor.execute('CREATE INDEX IF NOT EXISTS idx_quelah_category ON quelah_jobs(category)') + cursor.execute('CREATE INDEX IF NOT EXISTS idx_quelah_posted_date ON quelah_jobs(posted_date)') + cursor.execute('CREATE INDEX IF NOT EXISTS idx_quelah_apply_type ON quelah_jobs(apply_type)') conn.commit() cursor.close() conn.close() - print("✅ PostgreSQL database initialized successfully") + print("✅ Quelah Jobs table initialized successfully") except Exception as e: print(f"❌ Database initialization error: {e}") raise @@ -111,18 +96,18 @@ class LLMJobRefiner: try: soup = BeautifulSoup(html_content, 'html.parser') - # Remove unwanted elements + # Remove unwanted elements for element in soup(['script', 'style', 'nav', 'footer', 'header', 'aside', 'noscript']): element.decompose() - # Keep only main content containers (platform-specific) + # Keep only main content containers main_content = None candidates = [ soup.find('main'), soup.find('div', class_=re.compile(r'job|posting|content')), soup.find('article'), soup.body - ] + ] for candidate in candidates: if candidate: @@ -132,22 +117,22 @@ class LLMJobRefiner: if not main_content: main_content = soup.body or soup - # Extract text with some structure + # Extract text with some structure lines = [] for elem in main_content.descendants: if isinstance(elem, str): text = elem.strip() if text and len(text) > 5: # Skip short fragments lines.append(text) - elif elem.name in ['h1', 'h2', 'h3', 'h4', 'p', 'li', 'strong', 'b']: - text = elem.get_text().strip() - if text: - lines.append(text) + elif elem.name in ['h1', 'h2', 'h3', 'h4', 'p', 'li', 'strong', 'b']: + text = elem.get_text().strip() + if text: + lines.append(text) - # Join with newlines for better LLM parsing + # Join with newlines for better LLM parsing cleaned = '\n'.join(lines) - # Limit length for LLM context + # Limit length for LLM context if len(cleaned) > 10000: cleaned = cleaned[:10000] + "..." @@ -176,9 +161,9 @@ class LLMJobRefiner: cleaned_content = self._clean_html_for_llm(page_content) job_id = raw_data.get('job_id', 'unknown') url = raw_data.get('url', 'N/A') - posted_date = raw_data.get('posted_date', datetime.now().strftime("%m/%d/%y")) + posted_date = raw_data.get('posted_date', "12/01/25") # ✅ Fixed date - # Detect platform from URL + # Detect platform from URL (for prompt only) platform = "unknown" if "ashbyhq.com" in url: platform = "ashby" @@ -187,7 +172,7 @@ class LLMJobRefiner: elif "greenhouse.io" in url: platform = "greenhouse" - # Platform-specific instructions + # Platform-specific instructions platform_instructions = "" if platform == "ashby": platform_instructions = """ @@ -195,7 +180,7 @@ class LLMJobRefiner: - Title is usually in
an <h1> heading or the topmost title element
 - Company name is often in a dedicated company element or the page header
 - Description is in the main job content container or a section element
- - Look for sections like "About Us", "What you'll do", "Requirements", "Benefits" + - Look for sections like "About Us", "What you'll do", "Qualifications", "Benefits" - Location may be in near job title or in metadata """ elif platform == "lever": @@ -226,17 +211,18 @@ CRITICAL INSTRUCTIONS: FIELD RULES: - description: MUST include ALL role details, responsibilities, and overview. Never "Not provided" if any job description exists. - qualifications: MUST include ALL required skills, experience, education, and preferred qualifications. Combine them. -- requirements: If no separate "requirements" section, extract required skills/experience from qualifications/description. - location: Extract city, state, or remote status if available. - salary_range: Extract if explicitly mentioned (e.g., "$70,000–$85,000"). - nature_of_work: Extract if mentioned (e.g., "Part-time", "Remote", "On-site"). REQUIRED FIELDS (must have valid values, never "N/A"): -- title, company_name, job_id, url +- title, company_name, job_id, url, description -OPTIONAL FIELDS (can be "Not provided"): +OPTIONAL FIELDS (can be "Not provided" if the information is actually not provided): - location, salary_range, nature_of_work +⚠️ IMPORTANT: Do NOT include or extract a "requirements" field. Focus only on description and qualifications. + Page Content: {cleaned_content} @@ -258,19 +244,19 @@ Response format (ONLY return this JSON): response_text = await asyncio.get_event_loop().run_in_executor( None, lambda: self._generate_content_sync(prompt) - ) + ) refined_data = self._parse_llm_response(response_text) if not refined_data: return None - # Validate required fields - required_fields = ['title', 'company_name', 'job_id', 'url'] + # Validate required fields + required_fields = ['title', 'company_name', 'job_id', 'url', 'description'] for field in required_fields: if not refined_data.get(field) or refined_data[field].strip() in ["N/A", "", "Unknown", "Company", "Job"]: return None - # Add the posted_date to the refined data + # Add the fixed posted_date refined_data['posted_date'] = posted_date return refined_data @@ -291,79 +277,78 @@ Response format (ONLY return this JSON): except json.JSONDecodeError: return None - async def save_job_data(self, job_data: Dict[str, Any], keyword: str): - await self._save_to_db(job_data) - await self._save_to_markdown(job_data, keyword) + async def save_job_data(self, job_data: Dict[str, Any], keyword: str, platform: str = "quelah"): + """Save ALL jobs to Quelah Jobs table and markdown""" + await self._save_to_db_quelah(job_data) + await self._save_to_markdown_quelah(job_data, keyword) - async def _save_to_db(self, job_data: Dict[str, Any]): - """Save job data to PostgreSQL database with job_id uniqueness""" + async def _save_to_db_quelah(self, job_data: Dict[str, Any]): + """Save job data to Quelah Jobs table""" try: conn = psycopg2.connect( - host=self.db_host, - port=self.db_port, - database="postgres", - user=self.db_username, - password=self.db_password + host=self.db_host, + port=self.db_port, + database="postgres", + user=self.db_username, + password=self.db_password ) cursor = conn.cursor() - # Add apply_type to job_data if not present (default to 'signup') - if 'apply_type' not in job_data: - job_data['apply_type'] = 'signup' + # Set apply_type if not present + apply_type = job_data.get("apply_type", "signup") cursor.execute(''' - INSERT INTO jobs - (title, company_name, location, description, requirements, - qualifications, salary_range, nature_of_work, apply_type, job_id, url, 
category, scraped_at, posted_date) - VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s) + INSERT INTO quelah_jobs + (title, company_name, location, description, qualifications, + salary_range, nature_of_work, apply_type, job_id, url, category, scraped_at, posted_date) + VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s) ON CONFLICT (job_id) DO NOTHING ''', ( job_data.get("title", "N/A"), job_data.get("company_name", "N/A"), job_data.get("location", "N/A"), job_data.get("description", "N/A"), - job_data.get("requirements", "N/A"), job_data.get("qualifications", "N/A"), job_data.get("salary_range", "N/A"), job_data.get("nature_of_work", "N/A"), - job_data.get("apply_type", "signup"), # Default to signup if not provided + apply_type, job_data.get("job_id", "N/A"), job_data.get("url", "N/A"), job_data.get("category", "N/A"), job_data.get("scraped_at"), - job_data.get("posted_date", "N/A") + job_data.get("posted_date", "12/01/25") # Fixed date )) conn.commit() cursor.close() conn.close() - print(f" 💾 Saved job to category '{job_data.get('category', 'N/A')}' with job_id: {job_data.get('job_id', 'N/A')}") + print(f" 💾 Saved to Quelah Jobs | Job ID: {job_data.get('job_id', 'N/A')}") except Exception as e: print(f"❌ Database save error: {e}") - async def _save_to_markdown(self, job_data: Dict[str, Any], keyword: str): - os.makedirs("linkedin_jobs", exist_ok=True) - filepath = os.path.join("linkedin_jobs", "linkedin_jobs_scraped.md") + async def _save_to_markdown_quelah(self, job_data: Dict[str, Any], keyword: str): + os.makedirs("quelah_jobs", exist_ok=True) + filepath = os.path.join("quelah_jobs", "quelah_jobs.md") write_header = not os.path.exists(filepath) or os.path.getsize(filepath) == 0 with open(filepath, "a", encoding="utf-8") as f: if write_header: - f.write(f"# LinkedIn Jobs - {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n\n") + f.write(f"# Quelah Jobs - {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n\n") f.write(f"## Job: {job_data.get('title', 'N/A')}\n\n") f.write(f"- *Keyword*: {keyword}\n") f.write(f"- *Company*: {job_data.get('company_name', 'N/A')}\n") f.write(f"- *Location*: {job_data.get('location', 'N/A')}\n") f.write(f"- *Nature of Work*: {job_data.get('nature_of_work', 'N/A')}\n") f.write(f"- *Salary Range*: {job_data.get('salary_range', 'N/A')}\n") - f.write(f"- *Apply Type*: {job_data.get('apply_type', 'signup')}\n") # Add apply type to markdown + f.write(f"- *Apply Type*: {job_data.get('apply_type', 'signup')}\n") f.write(f"- *Job ID*: {job_data.get('job_id', 'N/A')}\n") - f.write(f"- *Posted Date*: {job_data.get('posted_date', 'N/A')}\n") + f.write(f"- *Posted Date*: {job_data.get('posted_date', '12/01/25')}\n") # Fixed date f.write(f"- *Category*: {job_data.get('category', 'N/A')}\n") f.write(f"- *Scraped At*: {job_data.get('scraped_at', 'N/A')}\n") f.write(f"- *URL*: <{job_data.get('url', 'N/A')}>\n\n") f.write(f"### Description\n\n{job_data.get('description', 'N/A')}\n\n") - f.write(f"### Requirements\n\n{job_data.get('requirements', 'N/A')}\n\n") + # ✅ REMOVED requirements section f.write(f"### Qualifications\n\n{job_data.get('qualifications', 'N/A')}\n\n") f.write("---\n\n") \ No newline at end of file diff --git a/scraper.py b/scraper.py index db1ca57..1b2c302 100644 --- a/scraper.py +++ b/scraper.py @@ -68,22 +68,24 @@ class RedisManager: logger.error(f"Failed to connect to Redis: {e}") self.redis_client = None - def is_job_seen(self, job_id: str) -> bool: + def is_job_seen(self, job_url: str) -> bool: + """✅ CHANGED: Check by job URL 
instead of job ID""" if not self.redis_client: return False try: - return bool(self.redis_client.exists(f"job_seen:{job_id}")) + return bool(self.redis_client.exists(f"job_seen:{job_url}")) except Exception as e: logger.error(f"Redis error checking job_seen: {e}") return False - def mark_job_seen(self, job_id: str): + def mark_job_seen(self, job_url: str): + """✅ CHANGED: Mark by job URL instead of job ID""" if not self.redis_client: return try: - self.redis_client.setex(f"job_seen:{job_id}", 2592000, "1") + self.redis_client.setex(f"job_seen:{job_url}", 2592000, "1") except Exception as e: logger.error(f"Redis error marking job_seen: {e}") @@ -237,11 +239,13 @@ class MultiPlatformJobScraper: await asyncio.sleep(2 * (speed / 2)) return await page.content() - async def _is_job_seen(self, job_id: str) -> bool: - return self.redis_manager.is_job_seen(job_id) + async def _is_job_seen(self, job_url: str) -> bool: + """✅ Use job URL for deduplication""" + return self.redis_manager.is_job_seen(job_url) - async def _mark_job_seen(self, job_id: str): - self.redis_manager.mark_job_seen(job_id) + async def _mark_job_seen(self, job_url: str): + """✅ Use job URL for marking""" + self.redis_manager.mark_job_seen(job_url) async def _get_cached_llm_result(self, job_url: str) -> Optional[Dict]: return self.redis_manager.get_cached_llm_result(job_url) @@ -263,6 +267,36 @@ class MultiPlatformJobScraper: else: return "unknown" + def _is_job_expired_or_invalid(self, page_content: str) -> bool: + """Check if job is expired, removed, or has no description""" + content_lower = page_content.lower() + + # Check for JavaScript-only pages + if "you need to enable javascript to run this app" in content_lower: + return True + + invalid_phrases = [ + "job no longer available", + "position has been filled", + "this job has expired", + "page not found", + "404 error", + "job has been closed", + "erweima.png", # Detect spam/ad content + "wocao03.com", + "github.com/wocao01" + ] + + for phrase in invalid_phrases: + if phrase in content_lower: + return True + + # Check for meaningful description content + description_keywords = ['responsibilities', 'requirements', 'description', 'duties', 'role', 'about the'] + has_description = any(kw in content_lower for kw in description_keywords) + + return not has_description + @retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=4, max=10)) async def scrape_job( self, @@ -271,20 +305,21 @@ class MultiPlatformJobScraper: message_id: str ): platform = self._get_platform(job_url) - if platform == "unknown": - logger.info(f"⏭️ Skipping unsupported platform: {job_url}") - return True + # ✅ ONLY extract job_id from URL job_id = job_url.strip("/").split("/")[-1] - if await self._is_job_seen(job_id): - logger.info(f"⏭️ Skipping already processed job: {job_id}") + + # ✅ Check if already processed BY URL (not job_id) + if await self._is_job_seen(job_url): + logger.info(f"⏭️ Skipping already processed job URL: {job_url}") return True cached_result = await self._get_cached_llm_result(job_url) if cached_result: logger.info(f"📦 Using cached LLM result for: {job_url}") - await self.llm_agent.save_job_data(cached_result, company_name) - await self._mark_job_seen(job_id) + # Save to Quelah Jobs - company_name will be overridden by LLM if found + await self.llm_agent.save_job_data(cached_result, company_name, "quelah") + await self._mark_job_seen(job_url) # ✅ Mark by URL return True context = None @@ -298,64 +333,77 @@ class MultiPlatformJobScraper: temp_fetcher = 
StealthyFetcher(self.engine, self.browser, context) fetch_timeout = 60000 if platform == "lever" else timeout_ms - job_page = await asyncio.wait_for( - temp_fetcher.fetch_url(job_url, wait_for_selector="h1", timeout=fetch_timeout), - timeout=fetch_timeout / 1000.0 - ) - - # Check if job still exists (minimal content validation) - page_content = await job_page.content() - if len(page_content.strip()) < 500: # Arbitrary threshold for "page exists" - logger.error(f"❌ Job no longer exists (empty/deleted): {job_url}") - await self._add_job_to_redis_cache(job_url, job_id, "job_not_found") - self.engine.report_outcome("job_not_found", url=job_url) - return False - + + # ✅ PLATFORM-SPECIFIC WAIT LOGIC WITH ASHBY FIX if platform == "ashby": - try: - await job_page.wait_for_selector("div[class*='job-posting'], article, main", timeout=60000) - except Exception: - logger.warning(f"⚠️ Ashby page didn't load properly: {job_url}") - return False - elif platform == "lever": - pass + # Ashby requires JS execution - wait for network idle + job content + job_page = await asyncio.wait_for( + temp_fetcher.fetch_url(job_url, wait_for_selector=None, timeout=fetch_timeout), + timeout=fetch_timeout / 100.0 + ) + if job_page: + # Wait for React hydration (job content to appear) + try: + await job_page.wait_for_function( + "document.querySelector('h1') && document.querySelector('h1').innerText.length > 0", + timeout=120000 + ) + except Exception: + # Fallback: check if we got valid content + content = await job_page.content() + if "you need to enable javascript" in content.lower(): + logger.warning(f"⚠️ Ashby page still shows JS error: {job_url}") + raise Exception("Ashby JS content not loaded") elif platform == "greenhouse": - try: - await job_page.wait_for_selector("div.job-desc, section", timeout=60000) - except Exception: - pass + job_page = await asyncio.wait_for( + temp_fetcher.fetch_url(job_url, wait_for_selector="h1, div.job-desc", timeout=fetch_timeout), + timeout=fetch_timeout / 1000.0 + ) + else: # lever & others + job_page = await asyncio.wait_for( + temp_fetcher.fetch_url(job_url, wait_for_selector="h1", timeout=fetch_timeout), + timeout=fetch_timeout / 1000.0 + ) - # 🔑 APPLY TYPE LOGIC + if job_page is None: + logger.error(f"❌ Failed to load page for {job_url}") + await self._add_job_to_redis_cache(job_url, job_id, "page_load_failed") + await self._mark_job_seen(job_url) + return True + + page_content = await job_page.content() + + if self._is_job_expired_or_invalid(page_content): + logger.warning(f"🗑️ Discarding invalid job: {job_url}") + self.engine.report_outcome("job_discarded", url=job_url) + await self._mark_job_seen(job_url) # ✅ Mark by URL + return True + + # Apply type logic if platform in ["ashby", "lever", "greenhouse"]: - apply_type = 'AI' # Always AI for these platforms + apply_type = 'AI' else: - # For other platforms: check if form is accessible without login apply_btn = await job_page.query_selector("button:has-text('Apply for this job'), button:has-text('Apply now')") - apply_type = 'signup' # default + apply_type = 'signup' if apply_btn: await self._human_click(job_page, apply_btn) speed = self.engine.optimization_params.get("base_delay", 2.0) await asyncio.sleep(2 * (speed / 2)) form = await job_page.query_selector("form, div[class*='application-form']") if form: - # Check for login prompts in form login_indicators = await job_page.query_selector("input[type='email'], input[type='password'], text='sign in', text='log in'") if not login_indicators: apply_type = 'AI' - else: - 
apply_type = 'signup' - else: - apply_type = 'signup' final_url = job_url page_content = await self._extract_page_content_for_llm(job_page) - posted_date = datetime.now().strftime("%m/%d/%y") + posted_date = "12/01/25" # Fixed date raw_data = { "page_content": page_content, "url": final_url, "job_id": job_id, - "search_keywords": company_name, + "search_keywords": company_name, # Only used if LLM can't find company "posted_date": posted_date } @@ -367,15 +415,18 @@ class MultiPlatformJobScraper: success = False if refined_data and refined_data.get("title", "N/A") != "N/A": - compulsory_fields = ['company_name', 'job_id', 'url'] + # ✅ ONLY job_id, url are guaranteed - everything else from LLM + compulsory_fields = ['job_id', 'url'] for field in compulsory_fields: if not refined_data.get(field) or refined_data[field] in ["N/A", "", "Unknown"]: if field == 'job_id': refined_data[field] = job_id elif field == 'url': refined_data[field] = final_url - elif field == 'company_name': - refined_data[field] = company_name + + # Company name: prefer LLM extraction, fallback to queue + if not refined_data.get('company_name') or refined_data['company_name'] in ["N/A", "", "Unknown"]: + refined_data['company_name'] = company_name refined_data.update({ 'apply_type': apply_type, @@ -386,42 +437,51 @@ class MultiPlatformJobScraper: 'platform': platform }) - await self.llm_agent.save_job_data(refined_data, company_name) + await self.llm_agent.save_job_data(refined_data, company_name, "quelah") await self._cache_llm_result(job_url, refined_data) - await self._mark_job_seen(job_id) + await self._mark_job_seen(job_url) # ✅ Mark by URL response_time = time.time() - start_time self.engine.report_outcome("success", url=final_url, response_time=response_time) - logger.info(f"✅ Scraped ({platform}): {refined_data['title'][:50]}... (Apply Type: {apply_type})") + logger.info(f"✅ Saved to Quelah Jobs ({platform}): {refined_data['title'][:50]}...") success = True else: logger.warning(f"🟡 LLM failed to refine: {final_url}") await self._add_job_to_redis_cache(final_url, job_id, "llm_failure") + await self._mark_job_seen(job_url) # ✅ Mark by URL self.engine.report_outcome("llm_failure", url=final_url) + return True return success except asyncio.TimeoutError: logger.error(f"⏰ Timeout processing job ({platform}): {job_url}") await self._add_job_to_redis_cache(job_url, job_id, "timeout") + await self._mark_job_seen(job_url) # ✅ Mark by URL self.engine.report_outcome("timeout", url=job_url) - return False + return True + except Exception as e: error_msg = str(e) if "NoneType" in error_msg or "disconnected" in error_msg or "Browser" in error_msg: logger.warning("Browser connection lost. 
Forcing reinitialization.") await self.close_browser() - # 🔍 Distinguish job-not-found vs other errors + error_type = "exception" if "page.goto: net::ERR_ABORTED" in error_msg or "page.goto: net::ERR_FAILED" in error_msg: - logger.error(f"❌ Job no longer exists (404/network error): {job_url}") - await self._add_job_to_redis_cache(job_url, job_id, "job_not_found") - self.engine.report_outcome("job_not_found", url=job_url) + error_type = "job_not_found" else: - logger.error(f"💥 Error processing job ({platform}) {job_url}: {error_msg}") - await self._add_job_to_redis_cache(job_url, job_id, "exception") - self.engine.report_outcome("exception", url=job_url) - return False + if "required" in error_msg.lower() or "missing" in error_msg.lower(): + error_type = "missing_fields" + elif "captcha" in error_msg.lower() or "cloudflare" in error_msg.lower(): + error_type = "anti_bot_protection" + + logger.error(f"💥 Error processing job ({platform}) {job_url}: {error_msg}") + await self._add_job_to_redis_cache(job_url, job_id, error_type) + await self._mark_job_seen(job_url) # ✅ Mark by URL + self.engine.report_outcome(error_type, url=job_url) + return True + finally: if context: try: @@ -447,20 +507,19 @@ async def process_message_async(scraper: MultiPlatformJobScraper, ch, method, pr message_id = properties.message_id or f"msg_{int(time.time()*1000)}" logger.info(f"📥 Processing job: {job_link} (ID: {message_id})") - success = await scraper.scrape_job(job_link, company_name, message_id) + _ = await scraper.scrape_job(job_link, company_name, message_id) METRICS["processed"] += 1 - if success: - METRICS["success"] += 1 - else: - METRICS["failed"] += 1 except json.JSONDecodeError: logger.error("❌ Invalid JSON in message") + ch.basic_ack(delivery_tag=method.delivery_tag) METRICS["failed"] += 1 + return except Exception as e: logger.error(f"💥 Unexpected error: {str(e)}") METRICS["failed"] += 1 finally: + # ✅ CRITICAL: Acknowledge ALL messages ch.basic_ack(delivery_tag=method.delivery_tag) @@ -503,7 +562,7 @@ def start_consumer(): channel.basic_qos(prefetch_count=1) channel.basic_consume(queue='job_queue', on_message_callback=callback_wrapper(scraper)) - logger.info('Waiting for messages (Ashby, Lever, Greenhouse). To exit press CTRL+C') + logger.info('Waiting for messages (All platforms → Quelah Jobs). 
To exit press CTRL+C') try: channel.start_consuming() except KeyboardInterrupt: diff --git a/sender.py b/sender.py index 905939f..d533488 100644 --- a/sender.py +++ b/sender.py @@ -21,7 +21,7 @@ class RedisManager: """Manages Redis connection and operations for job deduplication.""" def __init__(self): - self.redis_host = os.getenv('REDIS_HOST', 'redis-scrape.thejobhub.xyz') + self.redis_host = os.getenv('REDIS_HOST') self.redis_port = int(os.getenv('REDIS_PORT', '6380')) self.redis_password = os.getenv('REDIS_PASSWORD') self.redis_ssl_enabled = os.getenv('REDIS_SSL_ENABLED', 'true').lower() == 'true' @@ -65,6 +65,26 @@ class RedisManager: except Exception: pass + # NEW: Track total sent jobs for jobs.csv + def get_jobs_csv_sent_count(self): + if not self.redis_client: + return 0 + try: + count = self.redis_client.get("jobs_csv_sent_count") + return int(count) if count else 0 + except Exception: + return 0 + + def increment_jobs_csv_sent_count(self): + if not self.redis_client: + return + try: + self.redis_client.incr("jobs_csv_sent_count") + # Set 30-day expiry to avoid stale data + self.redis_client.expire("jobs_csv_sent_count", 2592000) + except Exception: + pass + class Sender: def __init__(self, config_file='config.ini'): @@ -201,13 +221,21 @@ class Sender: return False return False - def is_job_seen(self, job_url): + def is_job_seen(self, job_url, filename): + """Custom dedup logic: disable for jobs.csv until 6000 sent""" + if filename == "jobs.csv": + sent_count = self.redis_manager.get_jobs_csv_sent_count() + if sent_count < 6000: + return False # Always resend return self.redis_manager.is_job_seen(job_url) - def mark_job_sent(self, job_url): + def mark_job_sent(self, job_url, filename): self.redis_manager.mark_job_sent(job_url) + if filename == "jobs.csv": + self.redis_manager.increment_jobs_csv_sent_count() def process_csv(self, file_path): + filename = os.path.basename(file_path) try: with open(file_path, 'r', encoding='utf-8') as csvfile: reader = csv.DictReader(csvfile) @@ -217,7 +245,6 @@ class Sender: self.logger.info(f"CSV headers found: {reader.fieldnames}") for row_num, row in enumerate(reader, start=1): - # ✅ IMMEDIATE EXIT CHECK if not self.running: self.logger.info("Shutdown requested during CSV processing. Exiting...") return sent_count @@ -245,7 +272,8 @@ class Sender: skipped_count += 1 continue - if self.is_job_seen(url): + # ✅ Modified: Pass filename to is_job_seen + if self.is_job_seen(url, filename): self.logger.info(f"Skipping row {row_num}: job already sent (deduplicated). 
URL: {url}") skipped_count += 1 continue @@ -256,13 +284,15 @@ class Sender: if self.send_message(message, message_id): sent_count += 1 - self.mark_job_sent(url) + # ✅ Modified: Pass filename to mark_job_sent + self.mark_job_sent(url, filename) else: self.logger.error(f"Failed to send job (row {row_num}): {url}") skipped_count += 1 if (sent_count + skipped_count) % 100 == 0: - self.logger.info(f"Progress: {sent_count} sent, {skipped_count} skipped from {file_path}") + current_total = self.redis_manager.get_jobs_csv_sent_count() if filename == "jobs.csv" else "N/A" + self.logger.info(f"Progress: {sent_count} sent, {skipped_count} skipped from {file_path} (jobs.csv total: {current_total})") self.logger.info(f"Completed {file_path}: {sent_count} sent, {skipped_count} skipped") @@ -282,7 +312,7 @@ class Sender: return 0 def find_new_csvs(self): - if not self.running: # ✅ IMMEDIATE EXIT CHECK + if not self.running: return [] if not os.path.exists(self.directory): return [] @@ -300,7 +330,7 @@ class Sender: new_files = self.find_new_csvs() if new_files: for file_path in new_files: - if not self.running: # ✅ IMMEDIATE EXIT CHECK + if not self.running: break self.logger.info(f"Processing {file_path}") sent = self.process_csv(file_path) @@ -308,13 +338,11 @@ class Sender: else: self.logger.info("No new CSV files found") - # Replace blocking sleep with interruptible sleep for _ in range(self.check_interval): if not self.running: break time.sleep(1) except KeyboardInterrupt: - # This should not normally be reached due to signal handler, but added for safety pass finally: if self.connection and self.connection.is_open: @@ -323,7 +351,7 @@ class Sender: def graceful_shutdown(self, signum, frame): self.logger.info("Received shutdown signal. Initiating graceful shutdown...") - self.running = False # This will break all loops + self.running = False if __name__ == '__main__': @@ -344,6 +372,5 @@ if __name__ == '__main__': try: sender.run() except KeyboardInterrupt: - # Fallback in case signal handler doesn't catch it sender.logger.info("KeyboardInterrupt caught in main. Exiting.") sys.exit(0) \ No newline at end of file