From 06f8e8b086b2ba6827c895aa05257e92402bcdb4 Mon Sep 17 00:00:00 2001 From: Ofure Ikheloa Date: Tue, 30 Dec 2025 12:19:18 +0100 Subject: [PATCH] Increase timeout for selector waits and refine job extraction logic in LLMJobRefiner and CryptoJobScraper --- fetcher.py | 4 +- llm_agent.py | 218 +++++++++++++-------------- main.py | 20 +-- scraper.py | 407 ++++++++++++++++++++++++++++++++++++++------------- 4 files changed, 425 insertions(+), 224 deletions(-) diff --git a/fetcher.py b/fetcher.py index 5418be3..fc0652d 100644 --- a/fetcher.py +++ b/fetcher.py @@ -27,7 +27,7 @@ class StealthyFetcher: if wait_for_selector: try: - await page.wait_for_selector(wait_for_selector, timeout=40000) + await page.wait_for_selector(wait_for_selector, timeout=120000) except PlaywrightTimeoutError: print(f"Selector {wait_for_selector} not found immediately, continuing...") @@ -88,7 +88,7 @@ class StealthyFetcher: async def _is_content_accessible(self, page: Page, wait_for_selector: Optional[str] = None) -> bool: if wait_for_selector: try: - await page.wait_for_selector(wait_for_selector, timeout=40000) + await page.wait_for_selector(wait_for_selector, timeout=120000) return True except PlaywrightTimeoutError: pass diff --git a/llm_agent.py b/llm_agent.py index 2d72299..7d47da2 100644 --- a/llm_agent.py +++ b/llm_agent.py @@ -21,13 +21,12 @@ class LLMJobRefiner: raise ValueError("DEEPSEEK_API_KEY not found in .env file.") # Database credentials from .env - self.db_url = os.getenv("DB_URL") self.db_username = os.getenv("DB_USERNAME") self.db_password = os.getenv("DB_PASSWORD") self.db_host = os.getenv("DB_HOST") self.db_port = os.getenv("DB_PORT") - if not self.db_url or not self.db_username or not self.db_password: + if not self.db_username or not self.db_password: raise ValueError("Database credentials not found in .env file.") # DeepSeek uses OpenAI-compatible API @@ -41,22 +40,12 @@ class LLMJobRefiner: def _init_db(self): """Initialize PostgreSQL database connection and create table""" try: - self.db_url = os.getenv("DB_URL") - if self.db_url and "supabase.com" in self.db_url: - conn = psycopg2.connect( - host=self.db_host, - port=self.db_port, - database="postgres", - user=self.db_username, - password=self.db_password - ) - else: - conn = psycopg2.connect( - host=self.db_host, - port=self.db_port, - database="postgres", - user=self.db_username, - password=self.db_password + conn = psycopg2.connect( + host=self.db_host, + port=self.db_port, + database="postgres", + user=self.db_username, + password=self.db_password ) cursor = conn.cursor() @@ -113,8 +102,8 @@ class LLMJobRefiner: text = re.sub(r'\s+', ' ', text) # Limit length for LLM context - if len(text) > 10000: - text = text[:10000] + "..." + if len(text) > 100000: + text = text[:100000] + "..." return text except Exception as e: @@ -128,7 +117,7 @@ class LLMJobRefiner: response = self.client.chat.completions.create( model=self.model, messages=[{"role": "user", "content": prompt}], - temperature=0.2, + temperature=0.1, max_tokens=2048, stream=False ) @@ -145,38 +134,52 @@ class LLMJobRefiner: posted_date = raw_data.get('posted_date', datetime.now().strftime("%m/%d/%y")) prompt = f""" - You are a job posting data extractor. - - EXTRACT EXACT TEXT - DO NOT SUMMARIZE, PARAPHRASE, OR INVENT. - - For these critical fields, follow these rules: - - description: Extract ALL job description text. If ANY job details exist (duties, responsibilities, overview), include them. Only use "Not provided" if absolutely no description exists. 
- - requirements: Extract ALL requirements text. If ANY requirements exist (skills, experience, education needed), include them. Only use "Not provided" if none exist. - - qualifications: Extract ALL qualifications text. If ANY qualifications exist, include them. Only use "Not provided" if none exist. - - REQUIRED FIELDS (must have valid values, never "N/A"): - - title, company_name, job_id, url - - OPTIONAL FIELDS (can be "Not provided"): - - location, salary_range, nature_of_work - - Page Content: - {cleaned_content} - - Response format (ONLY return this JSON): - {{ - "title": "...", - "company_name": "...", - "location": "...", - "description": "...", - "requirements": "...", - "qualifications": "...", - "salary_range": "...", - "nature_of_work": "...", - "job_id": "{job_id}", - "url": "{url}" - }} - """ +You are an expert job posting data extractor. Your task is to extract AND infer fields from the provided job posting. + +### CORE RULES: +1. **NEVER invent, summarize, or paraphrase** — extract **exact wording** when available. +2. **For critical fields (title, company_name, job_id, url, description):** + - These MUST be present and meaningful. + - If not explicitly stated, **infer from context** (e.g., page title, headings, "About Us", etc.). + - **NEVER return "Not provided" or "N/A" for these fields.** +3. **For optional fields (location, salary_range, etc.):** + - Extract exact text if present. + - If absent but inferable (e.g., "Remote (US)", "Full-time"), **infer it**. + - Only return "Not provided" if truly absent and non-inferable. + +### FIELD DEFINITIONS: +- **title**: The job title. Look in
<h1>
, page title, or bold headings. +- **company_name**: Company name. Look in logo alt text, footer, "About [X]", or page title. +- **description**: Main job overview, responsibilities, or duties. Combine relevant paragraphs if needed. **Must not be empty.** +- **requirements**: Required skills, experience, or qualifications. +- **qualifications**: Educational or certification requirements. +- **location**: Office location or remote policy. +- **salary_range**: Exact compensation info. +- **nature_of_work**: Employment type (Full-time, Contract, Internship, etc.). + +### OUTPUT FORMAT: +Return ONLY a valid JSON object with these keys: +{{ + "title": "...", + "company_name": "...", + "location": "...", + "description": "...", + "requirements": "...", + "qualifications": "...", + "salary_range": "...", + "nature_of_work": "...", + "job_id": "{job_id}", + "url": "{url}" +}} + +- **Critical fields must NEVER be "Not provided", "N/A", empty, or generic** (e.g., "Company", "Job Title"). +- **Optional fields may be "Not provided" ONLY if truly absent.** +- **Do not include markdown, explanations, or extra text.** +- **Use double quotes for JSON.** + +Page Content: +{cleaned_content} +""" try: response_text = await asyncio.get_event_loop().run_in_executor( @@ -188,31 +191,23 @@ class LLMJobRefiner: if not refined_data: return None - # Validate required fields - required_fields = ['title', 'company_name', 'job_id', 'url'] - for field in required_fields: - if not refined_data.get(field) or refined_data[field].strip() in ["N/A", "", "Unknown", "Company", "Job"]: - return None + # Validate critical fields — reject if missing or placeholder + critical_fields = ['title', 'company_name', 'job_id', 'url', 'description'] + for field in critical_fields: + value = refined_data.get(field, "").strip() + if not value or value.lower() in ["n/a", "not provided", "unknown", "company", "job", "title", ""]: + print(f" ❌ Critical field '{field}' is invalid: '{value}'") + return None # This job will NOT be saved — as per requirement - # CRITICAL: Validate content fields - check if they SHOULD exist - content_fields = ['description', 'requirements', 'qualifications'] - cleaned_original = cleaned_content.lower() + # Optional fields: allow "Not provided", but ensure they're strings + optional_fields = ['location', 'requirements', 'qualifications', 'salary_range', 'nature_of_work'] + for field in optional_fields: + if field not in refined_data: + refined_data[field] = "Not provided" + elif not isinstance(refined_data[field], str): + refined_data[field] = str(refined_data[field]) - # Simple heuristic: if page contains job-related keywords, content fields should NOT be "Not provided" - job_indicators = ['responsibilit', 'duties', 'require', 'qualifi', 'skill', 'experienc', 'educat', 'degree', 'bachelor', 'master'] - has_job_content = any(indicator in cleaned_original for indicator in job_indicators) - - if has_job_content: - for field in content_fields: - value = refined_data.get(field, "").strip() - if value in ["Not provided", "N/A", ""]: - # LLM failed to extract existing content - print(f" ⚠️ LLM returned '{value}' for {field} but job content appears present") - return None - - # Add the posted_date to the refined data refined_data['posted_date'] = posted_date - return refined_data except Exception as e: @@ -220,15 +215,22 @@ class LLMJobRefiner: return None def _parse_llm_response(self, response_text: str) -> Dict[str, Any]: + # Try to extract JSON from markdown code block json_match = 
re.search(r'```(?:json)?\s*({.*?})\s*```', response_text, re.DOTALL) if not json_match: - json_match = re.search(r'\{.*\}', response_text, re.DOTALL) + # Try to find raw JSON object + json_match = re.search(r'\{[^{}]*\{[^{}]*\}[^{}]*\}|\{.*\}', response_text, re.DOTALL) if not json_match: return None try: - return json.loads(json_match.group(1) if '```' in response_text else json_match.group(0)) - except json.JSONDecodeError: + json_str = json_match.group(1) if '```' in response_text else json_match.group(0) + # Clean common issues + json_str = re.sub(r'\s+', ' ', json_str) + json_str = re.sub(r',\s*([\]}\)])', r'\1', json_str) # Remove trailing commas + return json.loads(json_str) + except json.JSONDecodeError as e: + print(f"JSON parsing error: {e}") return None async def save_job_data(self, job_data: Dict[str, Any], keyword: str): @@ -239,11 +241,11 @@ class LLMJobRefiner: """Save job data to PostgreSQL database with job_id uniqueness""" try: conn = psycopg2.connect( - host=self.db_host, - port=self.db_port, - database="postgres", - user=self.db_username, - password=self.db_password + host=self.db_host, + port=self.db_port, + database="postgres", + user=self.db_username, + password=self.db_password ) cursor = conn.cursor() @@ -254,50 +256,50 @@ class LLMJobRefiner: VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s) ON CONFLICT (job_id) DO NOTHING ''', ( - job_data.get("title", "N/A"), - job_data.get("company_name", "N/A"), - job_data.get("location", "N/A"), - job_data.get("description", "N/A"), - job_data.get("requirements", "N/A"), - job_data.get("qualifications", "N/A"), - job_data.get("salary_range", "N/A"), - job_data.get("nature_of_work", "N/A"), - job_data.get("job_id", "N/A"), + job_data.get("title", "Not provided"), + job_data.get("company_name", "Not provided"), + job_data.get("location", "Not provided"), + job_data.get("description", "Not provided"), + job_data.get("requirements", "Not provided"), + job_data.get("qualifications", "Not provided"), + job_data.get("salary_range", "Not provided"), + job_data.get("nature_of_work", "Not provided"), + job_data.get("job_id", "unknown"), job_data.get("url", "N/A"), - job_data.get("category", "N/A"), + job_data.get("category", "all"), job_data.get("scraped_at"), - job_data.get("posted_date", "N/A") + job_data.get("posted_date", datetime.now().strftime("%m/%d/%y")) )) conn.commit() cursor.close() conn.close() - print(f" 💾 Saved job to category '{job_data.get('category', 'N/A')}' with job_id: {job_data.get('job_id', 'N/A')}") + print(f" 💾 Saved job to category '{job_data.get('category', 'all')}' with job_id: {job_data.get('job_id', 'unknown')}") except Exception as e: print(f"❌ Database save error: {e}") async def _save_to_markdown(self, job_data: Dict[str, Any], keyword: str): - os.makedirs("linkedin_jobs", exist_ok=True) - filepath = os.path.join("linkedin_jobs", "linkedin_jobs_scraped.md") + os.makedirs("crypto_jobs", exist_ok=True) + filepath = os.path.join("crypto_jobs", "crypto_jobs_scraped.md") write_header = not os.path.exists(filepath) or os.path.getsize(filepath) == 0 with open(filepath, "a", encoding="utf-8") as f: if write_header: - f.write(f"# LinkedIn Jobs - {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n\n") - f.write(f"## Job: {job_data.get('title', 'N/A')}\n\n") + f.write(f"# Crypto Jobs - {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n\n") + f.write(f"## Job: {job_data.get('title', 'Not provided')}\n\n") f.write(f"- **Keyword**: {keyword}\n") - f.write(f"- **Company**: {job_data.get('company_name', 'N/A')}\n") 
- f.write(f"- **Location**: {job_data.get('location', 'N/A')}\n") - f.write(f"- **Nature of Work**: {job_data.get('nature_of_work', 'N/A')}\n") - f.write(f"- **Salary Range**: {job_data.get('salary_range', 'N/A')}\n") - f.write(f"- **Job ID**: {job_data.get('job_id', 'N/A')}\n") + f.write(f"- **Company**: {job_data.get('company_name', 'Not provided')}\n") + f.write(f"- **Location**: {job_data.get('location', 'Not provided')}\n") + f.write(f"- **Nature of Work**: {job_data.get('nature_of_work', 'Not provided')}\n") + f.write(f"- **Salary Range**: {job_data.get('salary_range', 'Not provided')}\n") + f.write(f"- **Job ID**: {job_data.get('job_id', 'unknown')}\n") f.write(f"- **Posted Date**: {job_data.get('posted_date', 'N/A')}\n") - f.write(f"- **Category**: {job_data.get('category', 'N/A')}\n") + f.write(f"- **Category**: {job_data.get('category', 'all')}\n") f.write(f"- **Scraped At**: {job_data.get('scraped_at', 'N/A')}\n") f.write(f"- **URL**: <{job_data.get('url', 'N/A')}>\n\n") - f.write(f"### Description\n\n{job_data.get('description', 'N/A')}\n\n") - f.write(f"### Requirements\n\n{job_data.get('requirements', 'N/A')}\n\n") - f.write(f"### Qualifications\n\n{job_data.get('qualifications', 'N/A')}\n\n") + f.write(f"### Description\n\n{job_data.get('description', 'Not provided')}\n\n") + f.write(f"### Requirements\n\n{job_data.get('requirements', 'Not provided')}\n\n") + f.write(f"### Qualifications\n\n{job_data.get('qualifications', 'Not provided')}\n\n") f.write("---\n\n") \ No newline at end of file diff --git a/main.py b/main.py index bb7c637..b99d1fe 100644 --- a/main.py +++ b/main.py @@ -1,3 +1,4 @@ + from scraping_engine import FingerprintScrapingEngine from scraper import CryptoJobScraper # Updated class name import os @@ -20,16 +21,15 @@ async def main(): scraper = CryptoJobScraper(engine, human_speed=1.3, user_request="Extract title, company, location, description, requirements, qualifications, nature of work, and salary") job_titles = [ - "Blockchain Engineer", - "Smart Contract Developer", - "DeFi Analyst", - "Web3 Developer", - "Crypto Researcher", - "Solidity Developer", - "Protocol Engineer", - "Tokenomics Specialist", - "Zero-Knowledge Proof Engineer", - "Crypto Compliance Officer" + "Customer Support", + "Design", + "Engineering", + "Finance", + "Marketing", + "Operations", + "Product", + "Sales" + ] while True: diff --git a/scraper.py b/scraper.py index 10bad5a..54510a2 100644 --- a/scraper.py +++ b/scraper.py @@ -6,10 +6,11 @@ from playwright.async_api import async_playwright, TimeoutError as PlaywrightTim from browserforge.injectors.playwright import AsyncNewContext from llm_agent import LLMJobRefiner import re -from fetcher import StealthyFetcher from datetime import datetime import json import redis +from urllib.parse import urlparse +import hashlib class CryptoJobScraper: @@ -25,7 +26,29 @@ class CryptoJobScraper: self.human_speed = human_speed self.user_request = user_request self.llm_agent = LLMJobRefiner() - self.redis_client = redis.Redis(host='localhost', port=6379, db=0, decode_responses=True) + self.redis_client = redis.Redis(host='=localhost', port=6379, db=0, decode_responses=True) + + self.FORBIDDEN_ATS_DOMAINS = [ + 'ashby', 'ashbyhq', + 'greenhouse', 'boards.greenhouse.io', + 'gem', 'gem.com', + 'rippling', + 'myworkday', 'myworkdayjobs', + 'smartrecruiters', + 'workable', + 'lever', 'jobs.lever.co', + ] + + self.INVALID_CONTENT_PHRASES = [ + "invalid job url", + "cookie consent", + "privacy policy", + "not a valid job", + "job not found", + "page not 
found", + "The requested job post could not be found. It may have been removed." + "this page does not contain a job description" + ] async def _human_click(self, page, element, wait_after: bool = True): if not element: @@ -55,60 +78,127 @@ class CryptoJobScraper: matches = sum(1 for kw in keyword_list if kw in title_lower) return matches / len(keyword_list) if keyword_list else 0.0 - async def _scrape_jobs_from_current_page(self, page, search_keywords: str, seen_job_ids, all_job_links): - current_links = await page.query_selector_all("a[href*='/job/']") - new_jobs = 0 - - for link in current_links: - href = await link.get_attribute("href") - if not href or not href.startswith("http"): - href = "https://cryptocurrencyjobs.co" + href - job_id = href.split("/")[-1] if href.endswith("/") else href.split("/")[-1] - - if job_id and job_id not in seen_job_ids: - title_element = await link.query_selector("h3, .job-title") - title = (await title_element.inner_text()) if title_element else "Unknown Title" + async def _extract_job_title_from_card(self, card) -> str: + try: + title_selectors = [ + 'h3', 'h2', 'h4', + 'strong', 'span' + ] + for selector in title_selectors: + title_element = await card.query_selector(selector) + if title_element: + title_text = await title_element.inner_text() + if title_text and len(title_text.strip()) > 3: + return title_text.strip() + + card_text = await card.inner_text() + lines = [line.strip() for line in card_text.split('\n') if line.strip()] + if lines: + for line in lines: + if len(line) > 5 and not any(skip in line.lower() for skip in ['today', 'featured', 'yesterday', 'company', 'location']): + return line + return "Unknown Title" + except: + return "Unknown Title" + + async def _collect_job_elements_from_page(self, page, search_keywords: str, seen_slugs): + job_cards = [] + job_found = False + + await asyncio.sleep(3 * self.human_speed) + + try: + await page.wait_for_selector('a[href^="/"][href*="-"]', timeout=60000) + candidates = await page.query_selector_all('a[href^="/"][href*="-"]') + + for link in candidates: + href = await link.get_attribute("href") or "" + href = href.rstrip('/') + if not href or len(href.split('/')) != 3: + continue + if '-' not in href.split('/')[-1]: + continue + slug = href.split('/')[-1] + if len(slug) < 8 or slug.startswith(('login', 'signup', 'about', 'terms')): + continue + + full_url = "https://cryptocurrencyjobs.co" + href if not href.startswith('http') else href + if slug in seen_slugs: + continue + + title = await self._extract_job_title_from_card(link) + if not title or title == "Unknown Title": + title = slug.replace('-', ' ').title() + match_percentage = self._calculate_keyword_match(title, search_keywords) - - if match_percentage >= 0.5: # Lower threshold than LinkedIn - seen_job_ids.add(job_id) - all_job_links.append((href, title)) - new_jobs += 1 - else: - print(f" ⚠️ Skipping job due to low keyword match: {title[:50]}... 
(match: {match_percentage:.2%})") - return new_jobs + if match_percentage >= 0.4 or not search_keywords.strip(): + seen_slugs.add(slug) + job_cards.append((full_url, title, link)) + job_found = True - async def _handle_pagination(self, page, search_keywords: str, seen_job_ids, all_job_links): - current_page = 1 - while True: - print(f"📄 Processing page {current_page}") - new_jobs = await self._scrape_jobs_from_current_page(page, search_keywords, seen_job_ids, all_job_links) - print(f" ➕ Found {new_jobs} new job(s) (total: {len(all_job_links)})") + print(f" ✅ Collected {len(job_cards)} valid job links after filtering ({len(candidates)} raw candidates).") - next_btn = await page.query_selector('a[rel="next"]') - if next_btn: - next_url = await next_btn.get_attribute("href") - if next_url and not next_url.startswith("http"): - next_url = "https://cryptocurrencyjobs.co" + next_url - await page.goto(next_url, timeout=120000) - await asyncio.sleep(random.uniform(3.0, 5.0) * self.human_speed) - current_page += 1 - else: - print("🔚 No 'Next' page — stopping pagination.") + except Exception as e: + print(f" ⚠️ Error collecting job cards: {e}") + + if not job_found: + print(" ❌ No valid job listings passed filters.") + + return job_cards + + async def _handle_pagination_and_collect_all(self, page, search_keywords: str, seen_slugs): + all_job_elements = [] + scroll_attempt = 0 + max_scrolls = 40 + prev_count = 0 + + while scroll_attempt < max_scrolls: + print(f" Scroll attempt {scroll_attempt + 1} | Current total jobs: {len(all_job_elements)}") + + page_elements = await self._collect_job_elements_from_page(page, search_keywords, seen_slugs) + all_job_elements.extend(page_elements) + + current_count = len(all_job_elements) + + if current_count == prev_count and scroll_attempt > 3: + print(" 🔚 No new jobs after several scrolls → assuming end of list.") break - async def _extract_job_posted_date(self, page) -> str: + prev_count = current_count + + await page.evaluate("window.scrollTo(0, document.body.scrollHeight)") + await asyncio.sleep(random.uniform(2.5, 5.5) * self.human_speed) + + try: + load_more = await page.query_selector( + 'button:has-text("Load more"), button:has-text("More"), div[role="button"]:has-text("Load"), a:has-text("Load more")' + ) + if load_more: + print(" Found 'Load more' button → clicking...") + await self._human_click(page, load_more) + await asyncio.sleep(random.uniform(3.0, 6.0) * self.human_speed) + except: + pass + + scroll_attempt += 1 + + print(f" Finished scrolling → collected {len(all_job_elements)} unique job links.") + return all_job_elements + + async def _extract_job_posted_date_from_card(self, card) -> str: try: - date_element = await page.query_selector(".job-posted-date, .job-date, time") - if date_element: - date_text = await date_element.inner_text() - if "Today" in date_text: - return datetime.now().strftime("%m/%d/%y") - elif "Yesterday" in date_text: - yesterday = datetime.now().replace(day=datetime.now().day - 1) - return yesterday.strftime("%m/%d/%y") - else: - return datetime.now().strftime("%m/%d/%y") + card_text = await card.inner_text() + if "Today" in card_text: + return datetime.now().strftime("%m/%d/%y") + elif "Yesterday" in card_text: + from datetime import timedelta + return (datetime.now() - timedelta(days=1)).strftime("%m/%d/%y") + else: + match = re.search(r'(\d+)d', card_text) + if match: + days = int(match.group(1)) + from datetime import timedelta + return (datetime.now() - timedelta(days=days)).strftime("%m/%d/%y") except: pass return 
datetime.now().strftime("%m/%d/%y") @@ -126,15 +216,62 @@ class CryptoJobScraper: except Exception as e: print(f" ❌ Failed to add job to Redis cache: {str(e)}") + async def _is_forbidden_ats_url(self, url: str) -> bool: + url_lower = url.lower() + return any(domain in url_lower for domain in self.FORBIDDEN_ATS_DOMAINS) + + async def _is_invalid_job_page(self, page_content: str) -> bool: + content_lower = page_content.lower() + return any(phrase in content_lower for phrase in self.INVALID_CONTENT_PHRASES) + + def _extract_job_id_from_url(self, url: str) -> Optional[str]: + """ + Extract job ID from URL. Returns ID if it contains at least one digit. + Otherwise, returns None (but does NOT mean skip!). + """ + try: + parsed = urlparse(url) + path_parts = [p for p in parsed.path.split('/') if p] + if not path_parts: + return None + + candidate = path_parts[-1] + candidate = re.split(r'[?#]', candidate)[0] + candidate = re.sub(r'\.html?$', '', candidate) + + if not candidate or not any(c.isdigit() for c in candidate): + return None + + # Avoid title-like strings (with spaces or long words + no structure) + if re.search(r'[A-Za-z]{6,}\s', candidate): + return None + + return candidate + except: + return None + async def scrape_jobs( self, search_keywords: Optional[str], max_pages: int = 1, credentials: Optional[Dict] = None ): - # cryptocurrencyjobs.co uses URL params differently - encoded_keywords = search_keywords.replace(" ", "%20") - search_url = f"https://cryptocurrencyjobs.co/?q={encoded_keywords}" + query = "" + location = "" + if search_keywords and search_keywords.strip(): + parts = search_keywords.split(',', 1) + query = parts[0].strip() + if len(parts) > 1: + location = parts[1].strip() + + clean_query = query.replace(' ', '+') + clean_location = location.replace(' ', '+') + + search_url = "https://cryptocurrencyjobs.co/" + if clean_query: + search_url += f"?query={clean_query}" + if clean_location: + search_url += f"&location={clean_location}" profile = self.engine._select_profile() renderer = random.choice(self.engine.common_renderers[self.engine.os]) @@ -156,46 +293,107 @@ class CryptoJobScraper: await context.add_init_script(spoof_script) page = await context.new_page() - - # Fetch main search page - print(f"🔍 Searching for: {search_keywords}") - await page.goto(search_url, wait_until='load', timeout=120000) + print(f"🔍 Searching for: {search_keywords or 'all jobs'}") + print(f" 🔗 URL: {search_url}") + await page.goto(search_url, wait_until='networkidle', timeout=120000) await asyncio.sleep(random.uniform(3.0, 5.0) * self.human_speed) - all_job_links = [] - seen_job_ids = set() + try: + await page.wait_for_selector("a[href^='/'][href*='-']", timeout=10000) + except: + print(" ⚠️ No job links found initially, waiting longer...") + await asyncio.sleep(5 * self.human_speed) - print("🔄 Collecting job links from search results...") - await self._scrape_jobs_from_current_page(page, search_keywords, seen_job_ids, all_job_links) - await self._handle_pagination(page, search_keywords, seen_job_ids, all_job_links) - - print(f"✅ Collected {len(all_job_links)} unique job links.") + seen_slugs = set() + all_job_elements = await self._handle_pagination_and_collect_all(page, search_keywords, seen_slugs) + print(f"✅ Collected {len(all_job_elements)} unique job links.") scraped_count = 0 - for idx, (href, title) in enumerate(all_job_links): + for idx, (href, title, job_element) in enumerate(all_job_elements): + job_detail_page = None + apply_page = None + skip_job = False + final_scrape_url = None 
try: - full_url = href - print(f" → Opening job {idx+1}/{len(all_job_links)}: {full_url}") + print(f" → Processing job {idx+1}/{len(all_job_elements)}: {title}") - fetcher = StealthyFetcher(self.engine, browser, context) - job_page = await fetcher.fetch_url(full_url, wait_for_selector="h1") - if not job_page: - print(f" ❌ Failed to fetch job page {full_url}") - await self._add_job_to_redis_cache(full_url, full_url.split("/")[-1], "fetch_failure") - self.engine.report_outcome("fetch_failure", url=full_url) + posted_date = await self._extract_job_posted_date_from_card(job_element) + + job_detail_page = await context.new_page() + await job_detail_page.goto(href, wait_until='networkidle', timeout=60000) + await asyncio.sleep(2 * self.human_speed) + + # Check for invalid content + page_content = await job_detail_page.content() + if await self._is_invalid_job_page(page_content): + print(" 🚫 Page contains invalid content → skipping.") + await job_detail_page.close() continue - posted_date = await self._extract_job_posted_date(job_page) + # Try to click apply + apply_clicked = False + apply_selectors = [ + 'a[href*="apply"], a:text("Apply"), a:text("Apply Now"), a:text("Apply here")', + 'button:text("Apply"), button:has-text("Apply")', + '[data-testid="apply-button"], [aria-label*="apply"], [role="button"]:has-text("Apply")', + 'a.btn-apply, .apply-button, .apply-link, a:has-text("Apply")', + 'a[rel="noopener"]:has-text("Apply")', + ] - await self.engine._human_like_scroll(job_page) - await asyncio.sleep(2 * self.human_speed) - page_content = await self._extract_page_content_for_llm(job_page) + for sel in apply_selectors: + apply_elem = await job_detail_page.query_selector(sel) + if apply_elem: + print(f" 🔗 Found Apply element with selector: {sel}") + await self._human_click(job_detail_page, apply_elem, wait_after=True) + apply_clicked = True + break - job_id = full_url.split("/")[-1] if full_url.split("/")[-1] else "unknown" + apply_page = job_detail_page + + if apply_clicked: + await asyncio.sleep(random.uniform(3.0, 6.0) * self.human_speed) + pages = context.pages + new_pages = [p for p in pages if p != job_detail_page and p.url != "about:blank"] + + if new_pages: + candidate_page = new_pages[-1] + new_url = candidate_page.url.strip() + print(f" New tab opened: {new_url}") + + if new_url and await self._is_forbidden_ats_url(new_url): + print(" 🚫 New URL is a forbidden ATS → skipping job.") + if candidate_page != job_detail_page: + await candidate_page.close() + await job_detail_page.close() + skip_job = True + else: + apply_page = candidate_page + else: + print(" No new tab → using original page.") + + if skip_job: + continue + + final_scrape_url = apply_page.url + + # Re-check invalid content on final page + page_content = await self._extract_page_content_for_llm(apply_page) + if await self._is_invalid_job_page(page_content): + print(" 🚫 Final page contains invalid content → skipping.") + if apply_page != job_detail_page: + await apply_page.close() + await job_detail_page.close() + continue + + # Extract job ID — but do NOT fail if missing + job_id = self._extract_job_id_from_url(final_scrape_url) + if not job_id: + # Fallback: hash the URL to create a stable, unique ID + job_id = "job_" + hashlib.md5(final_scrape_url.encode()).hexdigest()[:12] raw_data = { "page_content": page_content, - "url": full_url, + "url": final_scrape_url, "job_id": job_id, "search_keywords": search_keywords, "posted_date": posted_date @@ -210,44 +408,45 @@ class CryptoJobScraper: if field == 'job_id': 
refined_data[field] = job_id elif field == 'url': - refined_data[field] = full_url + refined_data[field] = final_scrape_url elif field == 'company_name': refined_data[field] = "Unknown Company" - - refined_data['scraped_at'] = datetime.now().isoformat() - refined_data['category'] = search_keywords - refined_data['posted_date'] = posted_date - await self.llm_agent.save_job_data(refined_data, search_keywords) - scraped_count += 1 - print(f" ✅ Scraped and refined: {refined_data['title'][:50]}...") - self.engine.report_outcome("success", url=raw_data["url"]) - else: - print(f" 🟡 Could not extract meaningful data from: {full_url}") - await self._add_job_to_redis_cache(full_url, job_id, "llm_failure") - self.engine.report_outcome("llm_failure", url=raw_data["url"]) - await job_page.close() + refined_data['scraped_at'] = datetime.now().isoformat() + refined_data['category'] = search_keywords or "all" + refined_data['posted_date'] = posted_date + await self.llm_agent.save_job_data(refined_data, search_keywords or "all") + scraped_count += 1 + print(f" ✅ Scraped: {refined_data['title'][:50]}... (Job ID: {job_id})") + self.engine.report_outcome("success", url=final_scrape_url) + else: + print(f" 🟡 Could not extract meaningful data from: {final_scrape_url}") + await self._add_job_to_redis_cache(final_scrape_url, job_id, "llm_failure") + self.engine.report_outcome("llm_failure", url=final_scrape_url) + + if apply_page != job_detail_page and not apply_page.is_closed(): + await apply_page.close() + if job_detail_page and not job_detail_page.is_closed(): + await job_detail_page.close() except Exception as e: error_msg = str(e)[:100] print(f" ⚠️ Failed on job {idx+1}: {error_msg}") - job_id = full_url.split("/")[-1] if 'full_url' in locals() else "unknown" - job_url = full_url if 'full_url' in locals() else "unknown" - await self._add_job_to_redis_cache(job_url, job_id, f"exception: {error_msg}") - if 'job_page' in locals() and job_page: - await job_page.close() + job_id_for_log = "unknown" + if 'final_scrape_url' in locals() and final_scrape_url: + job_id_for_log = "job_" + hashlib.md5(final_scrape_url.encode()).hexdigest()[:12] + await self._add_job_to_redis_cache(href, job_id_for_log, f"exception: {error_msg}") + if job_detail_page and not job_detail_page.is_closed(): + await job_detail_page.close() + if 'apply_page' in locals() and apply_page and apply_page != job_detail_page and not apply_page.is_closed(): + await apply_page.close() continue - finally: - print(" ↩️ Returning to search results...") - await page.goto(search_url, timeout=120000) - await asyncio.sleep(4 * self.human_speed) - await browser.close() if scraped_count > 0: self.engine.report_outcome("success") - print(f"✅ Completed! Processed {scraped_count} jobs for '{search_keywords}' based on request '{self.user_request}'.") + print(f"✅ Completed! Processed {scraped_count} jobs for '{search_keywords or 'all jobs'}'.") else: self.engine.report_outcome("scraping_error") print("⚠️ No jobs processed successfully.")
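
A minimal, standalone sketch of the job-ID fallback introduced in scraper.py (the helper name and example URLs below are illustrative and the logic is simplified; the patch splits it between _extract_job_id_from_url and an inline hashlib fallback inside scrape_jobs):

import hashlib
import re
from urllib.parse import urlparse


def extract_job_id(url: str) -> str:
    # Prefer the last path segment when it contains at least one digit;
    # otherwise fall back to a short, stable hash of the full URL so the
    # ON CONFLICT (job_id) dedup in the database still applies.
    parsed = urlparse(url)
    parts = [p for p in parsed.path.split("/") if p]
    candidate = re.sub(r"\.html?$", "", parts[-1]) if parts else ""
    if candidate and any(c.isdigit() for c in candidate):
        return candidate
    return "job_" + hashlib.md5(url.encode()).hexdigest()[:12]


# extract_job_id("https://cryptocurrencyjobs.co/engineering/acme-senior-engineer-12345")
#   -> "acme-senior-engineer-12345"
# extract_job_id("https://cryptocurrencyjobs.co/engineering/acme-senior-engineer")
#   -> "job_" + first 12 hex chars of md5(url)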