import asyncio
import random
import re
import json
import hashlib
from typing import Optional, Dict
from datetime import datetime, timedelta
from urllib.parse import urlparse

import redis
from playwright.async_api import async_playwright, TimeoutError as PlaywrightTimeoutError
from browserforge.injectors.playwright import AsyncNewContext

from llm_agent import LLMJobRefiner


class CryptoJobScraper:
    def __init__(
        self,
        engine,
        db_path: str = "crypto_jobs.db",
        human_speed: float = 1.0,
        user_request: str = "Extract all standard job details"
    ):
        self.engine = engine
        self.db_path = db_path
        self.human_speed = human_speed
        self.user_request = user_request
        self.llm_agent = LLMJobRefiner()
        self.redis_client = redis.Redis(host='localhost', port=6379, db=0, decode_responses=True)

        # ATS providers we refuse to follow from an Apply link.
        self.FORBIDDEN_ATS_DOMAINS = [
            'ashby', 'ashbyhq',
            'greenhouse', 'boards.greenhouse.io',
            'gem', 'gem.com',
            'rippling',
            'myworkday', 'myworkdayjobs',
            'smartrecruiters',
            'workable',
            'lever', 'jobs.lever.co',
        ]

        # Phrases that mark a page as not being a real job posting.
        # Kept lowercase because they are matched against lowercased page content.
        self.INVALID_CONTENT_PHRASES = [
            "invalid job url",
            "cookie consent",
            "privacy policy",
            "not a valid job",
            "job not found",
            "page not found",
            "the requested job post could not be found. it may have been removed.",
            "this page does not contain a job description",
        ]

    async def _human_click(self, page, element, wait_after: bool = True):
        if not element:
            return False
        await element.scroll_into_view_if_needed()
        await asyncio.sleep(random.uniform(0.3, 0.8) * self.human_speed)
        try:
            await element.click()
            if wait_after:
                await asyncio.sleep(random.uniform(2, 4) * self.human_speed)
            return True
        except Exception:
            return False

    async def _extract_page_content_for_llm(self, page) -> str:
        await asyncio.sleep(2 * self.human_speed)
        await self.engine._human_like_scroll(page)
        await asyncio.sleep(2 * self.human_speed)
        page_content = await page.content()
        return page_content

    def _calculate_keyword_match(self, title: str, keywords: str) -> float:
        if not title or not keywords:
            return 0.0
        title_lower = title.lower()
        keyword_list = [kw.strip().lower() for kw in keywords.split()]
        matches = sum(1 for kw in keyword_list if kw in title_lower)
        return matches / len(keyword_list) if keyword_list else 0.0

    async def _extract_job_title_from_card(self, card) -> str:
        try:
            title_selectors = ['h3', 'h2', 'h4', 'strong', 'span']
            for selector in title_selectors:
                title_element = await card.query_selector(selector)
                if title_element:
                    title_text = await title_element.inner_text()
                    if title_text and len(title_text.strip()) > 3:
                        return title_text.strip()
            card_text = await card.inner_text()
            lines = [line.strip() for line in card_text.split('\n') if line.strip()]
            if lines:
                for line in lines:
                    if len(line) > 5 and not any(
                        skip in line.lower()
                        for skip in ['today', 'featured', 'yesterday', 'company', 'location']
                    ):
                        return line
            return "Unknown Title"
        except Exception:
            return "Unknown Title"

    async def _collect_job_elements_from_page(self, page, search_keywords: str, seen_slugs):
        job_cards = []
        job_found = False
        await asyncio.sleep(3 * self.human_speed)
        try:
            await page.wait_for_selector('a[href^="/"][href*="-"]', timeout=60000)
            candidates = await page.query_selector_all('a[href^="/"][href*="-"]')
            for link in candidates:
                href = await link.get_attribute("href") or ""
                href = href.rstrip('/')
                if not href or len(href.split('/')) != 3:
                    continue
                if '-' not in href.split('/')[-1]:
                    continue
                slug = href.split('/')[-1]
                if len(slug) < 8 or slug.startswith(('login', 'signup', 'about', 'terms')):
                    continue
                full_url = "https://cryptocurrencyjobs.co" + href if not href.startswith('http') else href
                if slug in seen_slugs:
                    continue
                title = await self._extract_job_title_from_card(link)
                if not title or title == "Unknown Title":
                    title = slug.replace('-', ' ').title()
                match_percentage = self._calculate_keyword_match(title, search_keywords)
                # Treat empty/None keywords as "match everything".
                if match_percentage >= 0.4 or not (search_keywords or "").strip():
                    seen_slugs.add(slug)
                    job_cards.append((full_url, title, link))
                    job_found = True
            print(f" ✅ Collected {len(job_cards)} valid job links after filtering ({len(candidates)} raw candidates).")
        except Exception as e:
            print(f" ⚠️ Error collecting job cards: {e}")
        if not job_found:
            print(" ❌ No valid job listings passed filters.")
        return job_cards

    async def _handle_pagination_and_collect_all(self, page, search_keywords: str, seen_slugs):
        all_job_elements = []
        scroll_attempt = 0
        max_scrolls = 40
        prev_count = 0
        while scroll_attempt < max_scrolls:
            print(f" Scroll attempt {scroll_attempt + 1} | Current total jobs: {len(all_job_elements)}")
            page_elements = await self._collect_job_elements_from_page(page, search_keywords, seen_slugs)
            all_job_elements.extend(page_elements)
            current_count = len(all_job_elements)
            if current_count == prev_count and scroll_attempt > 3:
                print(" 🔚 No new jobs after several scrolls → assuming end of list.")
                break
            prev_count = current_count
            await page.evaluate("window.scrollTo(0, document.body.scrollHeight)")
            await asyncio.sleep(random.uniform(2.5, 5.5) * self.human_speed)
            try:
                load_more = await page.query_selector(
                    'button:has-text("Load more"), button:has-text("More"), '
                    'div[role="button"]:has-text("Load"), a:has-text("Load more")'
                )
                if load_more:
                    print(" Found 'Load more' button → clicking...")
                    await self._human_click(page, load_more)
                    await asyncio.sleep(random.uniform(3.0, 6.0) * self.human_speed)
            except Exception:
                pass
            scroll_attempt += 1
        print(f" Finished scrolling → collected {len(all_job_elements)} unique job links.")
        return all_job_elements

    async def _extract_job_posted_date_from_card(self, card) -> str:
        try:
            card_text = await card.inner_text()
            if "Today" in card_text:
                return datetime.now().strftime("%m/%d/%y")
            elif "Yesterday" in card_text:
                return (datetime.now() - timedelta(days=1)).strftime("%m/%d/%y")
            else:
                match = re.search(r'(\d+)d', card_text)
                if match:
                    days = int(match.group(1))
                    return (datetime.now() - timedelta(days=days)).strftime("%m/%d/%y")
        except Exception:
            pass
        return datetime.now().strftime("%m/%d/%y")

    async def _add_job_to_redis_cache(self, job_url: str, job_id: str, error_type: str):
        try:
            job_data = {
                "job_url": job_url,
                "job_id": job_id,
                "error_type": error_type,
                "timestamp": datetime.now().isoformat()
            }
            self.redis_client.hset("failed_crypto_jobs", job_id, json.dumps(job_data))
            print(f" 📦 Added failed job to Redis cache: {job_id} (Error: {error_type})")
        except Exception as e:
            print(f" ❌ Failed to add job to Redis cache: {str(e)}")

    async def _is_forbidden_ats_url(self, url: str) -> bool:
        url_lower = url.lower()
        return any(domain in url_lower for domain in self.FORBIDDEN_ATS_DOMAINS)

    async def _is_invalid_job_page(self, page_content: str) -> bool:
        content_lower = page_content.lower()
        return any(phrase in content_lower for phrase in self.INVALID_CONTENT_PHRASES)

    def _extract_job_id_from_url(self, url: str) -> Optional[str]:
        """
        Extract job ID from URL. Returns ID if it contains at least one digit.
        Otherwise, returns None (but does NOT mean skip!).
""" try: parsed = urlparse(url) path_parts = [p for p in parsed.path.split('/') if p] if not path_parts: return None candidate = path_parts[-1] candidate = re.split(r'[?#]', candidate)[0] candidate = re.sub(r'\.html?$', '', candidate) if not candidate or not any(c.isdigit() for c in candidate): return None # Avoid title-like strings (with spaces or long words + no structure) if re.search(r'[A-Za-z]{6,}\s', candidate): return None return candidate except: return None async def scrape_jobs( self, search_keywords: Optional[str], max_pages: int = 1, credentials: Optional[Dict] = None ): query = "" location = "" if search_keywords and search_keywords.strip(): parts = search_keywords.split(',', 1) query = parts[0].strip() if len(parts) > 1: location = parts[1].strip() clean_query = query.replace(' ', '+') clean_location = location.replace(' ', '+') search_url = "https://cryptocurrencyjobs.co/" if clean_query: search_url += f"?query={clean_query}" if clean_location: search_url += f"&location={clean_location}" profile = self.engine._select_profile() renderer = random.choice(self.engine.common_renderers[self.engine.os]) vendor = random.choice(self.engine.common_vendors) spoof_script = self.engine._get_spoof_script(renderer, vendor) async with async_playwright() as pw: browser = await pw.chromium.launch( headless=False, args=['--disable-blink-features=AutomationControlled'] ) context = await AsyncNewContext(browser, fingerprint=profile) await context.add_init_script(f""" Object.defineProperty(navigator, 'hardwareConcurrency', {{ get: () => {profile.navigator.hardwareConcurrency} }}); Object.defineProperty(navigator, 'deviceMemory', {{ get: () => {profile.navigator.deviceMemory} }}); Object.defineProperty(navigator, 'platform', {{ get: () => '{profile.navigator.platform}' }}); """) await context.add_init_script(spoof_script) page = await context.new_page() print(f"🔍 Searching for: {search_keywords or 'all jobs'}") print(f" 🔗 URL: {search_url}") await page.goto(search_url, wait_until='networkidle', timeout=120000) await asyncio.sleep(random.uniform(3.0, 5.0) * self.human_speed) try: await page.wait_for_selector("a[href^='/'][href*='-']", timeout=10000) except: print(" ⚠️ No job links found initially, waiting longer...") await asyncio.sleep(5 * self.human_speed) seen_slugs = set() all_job_elements = await self._handle_pagination_and_collect_all(page, search_keywords, seen_slugs) print(f"✅ Collected {len(all_job_elements)} unique job links.") scraped_count = 0 for idx, (href, title, job_element) in enumerate(all_job_elements): job_detail_page = None apply_page = None skip_job = False final_scrape_url = None try: print(f" → Processing job {idx+1}/{len(all_job_elements)}: {title}") posted_date = await self._extract_job_posted_date_from_card(job_element) job_detail_page = await context.new_page() await job_detail_page.goto(href, wait_until='networkidle', timeout=60000) await asyncio.sleep(2 * self.human_speed) # Check for invalid content page_content = await job_detail_page.content() if await self._is_invalid_job_page(page_content): print(" 🚫 Page contains invalid content → skipping.") await job_detail_page.close() continue # Try to click apply apply_clicked = False apply_selectors = [ 'a[href*="apply"], a:text("Apply"), a:text("Apply Now"), a:text("Apply here")', 'button:text("Apply"), button:has-text("Apply")', '[data-testid="apply-button"], [aria-label*="apply"], [role="button"]:has-text("Apply")', 'a.btn-apply, .apply-button, .apply-link, a:has-text("Apply")', 'a[rel="noopener"]:has-text("Apply")', ] 
                    for sel in apply_selectors:
                        apply_elem = await job_detail_page.query_selector(sel)
                        if apply_elem:
                            print(f" 🔗 Found Apply element with selector: {sel}")
                            await self._human_click(job_detail_page, apply_elem, wait_after=True)
                            apply_clicked = True
                            break

                    apply_page = job_detail_page
                    if apply_clicked:
                        await asyncio.sleep(random.uniform(3.0, 6.0) * self.human_speed)
                        pages = context.pages
                        new_pages = [p for p in pages if p != job_detail_page and p.url != "about:blank"]
                        if new_pages:
                            candidate_page = new_pages[-1]
                            new_url = candidate_page.url.strip()
                            print(f" New tab opened: {new_url}")
                            if new_url and await self._is_forbidden_ats_url(new_url):
                                print(" 🚫 New URL is a forbidden ATS → skipping job.")
                                if candidate_page != job_detail_page:
                                    await candidate_page.close()
                                await job_detail_page.close()
                                skip_job = True
                            else:
                                apply_page = candidate_page
                        else:
                            print(" No new tab → using original page.")

                    if skip_job:
                        continue

                    final_scrape_url = apply_page.url

                    # Re-check invalid content on final page
                    page_content = await self._extract_page_content_for_llm(apply_page)
                    if await self._is_invalid_job_page(page_content):
                        print(" 🚫 Final page contains invalid content → skipping.")
                        if apply_page != job_detail_page:
                            await apply_page.close()
                        await job_detail_page.close()
                        continue

                    # Extract job ID - but do NOT fail if missing
                    job_id = self._extract_job_id_from_url(final_scrape_url)
                    if not job_id:
                        # Fallback: hash the URL to create a stable, unique ID
                        job_id = "job_" + hashlib.md5(final_scrape_url.encode()).hexdigest()[:12]

                    raw_data = {
                        "page_content": page_content,
                        "url": final_scrape_url,
                        "job_id": job_id,
                        "search_keywords": search_keywords,
                        "posted_date": posted_date
                    }

                    refined_data = await self.llm_agent.refine_job_data(raw_data, self.user_request)

                    if refined_data and refined_data.get("title", "N/A") != "N/A":
                        compulsory_fields = ['company_name', 'job_id', 'url']
                        for field in compulsory_fields:
                            if not refined_data.get(field) or refined_data[field] in ["N/A", "", "Unknown"]:
                                if field == 'job_id':
                                    refined_data[field] = job_id
                                elif field == 'url':
                                    refined_data[field] = final_scrape_url
                                elif field == 'company_name':
                                    refined_data[field] = "Unknown Company"

                        refined_data['scraped_at'] = datetime.now().isoformat()
                        refined_data['category'] = search_keywords or "all"
                        refined_data['posted_date'] = posted_date

                        await self.llm_agent.save_job_data(refined_data, search_keywords or "all")
                        scraped_count += 1
                        print(f" ✅ Scraped: {refined_data['title'][:50]}... (Job ID: {job_id})")
                        self.engine.report_outcome("success", url=final_scrape_url)
                    else:
                        print(f" 🟡 Could not extract meaningful data from: {final_scrape_url}")
                        await self._add_job_to_redis_cache(final_scrape_url, job_id, "llm_failure")
                        self.engine.report_outcome("llm_failure", url=final_scrape_url)

                    if apply_page != job_detail_page and not apply_page.is_closed():
                        await apply_page.close()
                    if job_detail_page and not job_detail_page.is_closed():
                        await job_detail_page.close()

                except Exception as e:
                    error_msg = str(e)[:100]
                    print(f" ⚠️ Failed on job {idx+1}: {error_msg}")
                    job_id_for_log = "unknown"
                    if final_scrape_url:
                        job_id_for_log = "job_" + hashlib.md5(final_scrape_url.encode()).hexdigest()[:12]
                    await self._add_job_to_redis_cache(href, job_id_for_log, f"exception: {error_msg}")
                    if job_detail_page and not job_detail_page.is_closed():
                        await job_detail_page.close()
                    if apply_page and apply_page != job_detail_page and not apply_page.is_closed():
                        await apply_page.close()
                    continue

            await browser.close()

            if scraped_count > 0:
                self.engine.report_outcome("success")
                print(f"✅ Completed! Processed {scraped_count} jobs for '{search_keywords or 'all jobs'}'.")
            else:
                self.engine.report_outcome("scraping_error")
                print("⚠️ No jobs processed successfully.")
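

# --- Usage sketch (illustrative only, not part of the original module) ---
# A minimal example of how this scraper might be driven. It assumes an engine
# object that exposes _select_profile(), common_renderers, common_vendors, os,
# _get_spoof_script(), _human_like_scroll(), and report_outcome(), as used above;
# the StealthEngine name and the search string below are hypothetical placeholders.
#
# async def main():
#     engine = StealthEngine()  # hypothetical engine implementation
#     scraper = CryptoJobScraper(engine, human_speed=1.2)
#     await scraper.scrape_jobs("solidity engineer, remote")
#
# if __name__ == "__main__":
#     asyncio.run(main())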