import asyncio
import random
from typing import Optional, Dict
from playwright.async_api import async_playwright, TimeoutError as PlaywrightTimeoutError
from browserforge.injectors.playwright import AsyncNewContext
from llm_agent import LLMJobRefiner
from fetcher import StealthyFetcher
from datetime import datetime, timedelta
from urllib.parse import quote
import json
import redis


class CryptoJobScraper:
    def __init__(
        self,
        engine,
        db_path: str = "crypto_jobs.db",
        human_speed: float = 1.0,
        user_request: str = "Extract all standard job details"
    ):
        self.engine = engine
        self.db_path = db_path
        self.human_speed = human_speed
        self.user_request = user_request
        self.llm_agent = LLMJobRefiner()
        self.redis_client = redis.Redis(host='localhost', port=6379, db=0, decode_responses=True)

    async def _human_click(self, page, element, wait_after: bool = True):
        # Click with a randomized delay to mimic human interaction.
        if not element:
            return False
        await element.scroll_into_view_if_needed()
        await asyncio.sleep(random.uniform(0.3, 0.8) * self.human_speed)
        try:
            await element.click()
            if wait_after:
                await asyncio.sleep(random.uniform(2, 4) * self.human_speed)
            return True
        except Exception:
            return False

    async def _extract_page_content_for_llm(self, page) -> str:
        # Scroll the page like a human, then hand the full HTML to the LLM refiner.
        await asyncio.sleep(2 * self.human_speed)
        await self.engine._human_like_scroll(page)
        await asyncio.sleep(2 * self.human_speed)
        page_content = await page.content()
        return page_content

    def _calculate_keyword_match(self, title: str, keywords: str) -> float:
        # Fraction of search keywords that appear in the job title.
        if not title or not keywords:
            return 0.0
        title_lower = title.lower()
        keyword_list = [kw.strip().lower() for kw in keywords.split()]
        matches = sum(1 for kw in keyword_list if kw in title_lower)
        return matches / len(keyword_list) if keyword_list else 0.0

    async def _scrape_jobs_from_current_page(self, page, search_keywords: str, seen_job_ids, all_job_links):
        # Collect job links on the current results page, deduplicating by job id.
        current_links = await page.query_selector_all("a[href*='/job/']")
        new_jobs = 0
        for link in current_links:
            href = await link.get_attribute("href")
            if not href:
                continue
            if not href.startswith("http"):
                href = "https://cryptocurrencyjobs.co" + href
            job_id = href.rstrip("/").split("/")[-1]
            if job_id and job_id not in seen_job_ids:
                title_element = await link.query_selector("h3, .job-title")
                title = (await title_element.inner_text()) if title_element else "Unknown Title"
                match_percentage = self._calculate_keyword_match(title, search_keywords)
                if match_percentage >= 0.5:  # Lower threshold than LinkedIn
                    seen_job_ids.add(job_id)
                    all_job_links.append((href, title))
                    new_jobs += 1
                else:
                    print(f" ⚠ī¸ Skipping job due to low keyword match: {title[:50]}... (match: {match_percentage:.2%})")
        return new_jobs

    async def _handle_pagination(self, page, search_keywords: str, seen_job_ids, all_job_links, max_pages: int = 1):
        # Follow rel="next" links until there are no more pages or max_pages is reached.
        current_page = 1
        while True:
            print(f"📄 Processing page {current_page}")
            new_jobs = await self._scrape_jobs_from_current_page(page, search_keywords, seen_job_ids, all_job_links)
            print(f" ➕ Found {new_jobs} new job(s) (total: {len(all_job_links)})")

            if current_page >= max_pages:
                print(f"🔚 Reached max_pages ({max_pages}) — stopping pagination.")
                break

            next_btn = await page.query_selector('a[rel="next"]')
            if next_btn:
                next_url = await next_btn.get_attribute("href")
                if next_url and not next_url.startswith("http"):
                    next_url = "https://cryptocurrencyjobs.co" + next_url
                await page.goto(next_url, timeout=120000)
                await asyncio.sleep(random.uniform(3.0, 5.0) * self.human_speed)
                current_page += 1
            else:
                print("🔚 No 'Next' page — stopping pagination.")
                break

    async def _extract_job_posted_date(self, page) -> str:
        try:
            date_element = await page.query_selector(".job-posted-date, .job-date, time")
            if date_element:
                date_text = await date_element.inner_text()
                if "Today" in date_text:
                    return datetime.now().strftime("%m/%d/%y")
                elif "Yesterday" in date_text:
                    yesterday = datetime.now() - timedelta(days=1)
                    return yesterday.strftime("%m/%d/%y")
                else:
                    return datetime.now().strftime("%m/%d/%y")
        except Exception:
            pass
        return datetime.now().strftime("%m/%d/%y")

    async def _add_job_to_redis_cache(self, job_url: str, job_id: str, error_type: str):
        # Record failed jobs in a Redis hash so they can be retried later.
        try:
            job_data = {
                "job_url": job_url,
                "job_id": job_id,
                "error_type": error_type,
                "timestamp": datetime.now().isoformat()
            }
            self.redis_client.hset("failed_crypto_jobs", job_id, json.dumps(job_data))
            print(f" đŸ“Ļ Added failed job to Redis cache: {job_id} (Error: {error_type})")
        except Exception as e:
            print(f" ❌ Failed to add job to Redis cache: {str(e)}")

    async def scrape_jobs(
        self,
        search_keywords: Optional[str],
        max_pages: int = 1,
        credentials: Optional[Dict] = None
    ):
        # cryptocurrencyjobs.co uses URL params differently
        encoded_keywords = quote(search_keywords or "")
        search_url = f"https://cryptocurrencyjobs.co/?q={encoded_keywords}"

        profile = self.engine._select_profile()
        renderer = random.choice(self.engine.common_renderers[self.engine.os])
        vendor = random.choice(self.engine.common_vendors)
        spoof_script = self.engine._get_spoof_script(renderer, vendor)

        async with async_playwright() as pw:
            browser = await pw.chromium.launch(
                headless=False,
                args=['--disable-blink-features=AutomationControlled']
            )
            context = await AsyncNewContext(browser, fingerprint=profile)
            # Inject fingerprint overrides before any page script runs
            await context.add_init_script(f"""
                Object.defineProperty(navigator, 'hardwareConcurrency', {{ get: () => {profile.navigator.hardwareConcurrency} }});
                Object.defineProperty(navigator, 'deviceMemory', {{ get: () => {profile.navigator.deviceMemory} }});
                Object.defineProperty(navigator, 'platform', {{ get: () => '{profile.navigator.platform}' }});
            """)
            await context.add_init_script(spoof_script)
            page = await context.new_page()

            # Fetch main search page
            print(f"🔍 Searching for: {search_keywords}")
            await page.goto(search_url, wait_until='load', timeout=120000)
            await asyncio.sleep(random.uniform(3.0, 5.0) * self.human_speed)

            all_job_links = []
            seen_job_ids = set()

            print("🔄 Collecting job links from search results...")
            await self._scrape_jobs_from_current_page(page, search_keywords, seen_job_ids, all_job_links)
            await self._handle_pagination(page, search_keywords, seen_job_ids, all_job_links, max_pages)

            print(f"✅ Collected {len(all_job_links)} unique job links.")

            scraped_count = 0
            for idx, (href, title) in enumerate(all_job_links):
                try:
                    full_url = href
                    print(f" → Opening job {idx+1}/{len(all_job_links)}: {full_url}")

                    fetcher = StealthyFetcher(self.engine, browser, context)
                    job_page = await fetcher.fetch_url(full_url, wait_for_selector="h1")

                    if not job_page:
                        print(f" ❌ Failed to fetch job page {full_url}")
                        await self._add_job_to_redis_cache(full_url, full_url.rstrip("/").split("/")[-1], "fetch_failure")
                        self.engine.report_outcome("fetch_failure", url=full_url)
                        continue

                    posted_date = await self._extract_job_posted_date(job_page)

                    await self.engine._human_like_scroll(job_page)
                    await asyncio.sleep(2 * self.human_speed)

                    page_content = await self._extract_page_content_for_llm(job_page)

                    job_id = full_url.rstrip("/").split("/")[-1] or "unknown"

                    raw_data = {
                        "page_content": page_content,
                        "url": full_url,
                        "job_id": job_id,
                        "search_keywords": search_keywords,
                        "posted_date": posted_date
                    }

                    refined_data = await self.llm_agent.refine_job_data(raw_data, self.user_request)

                    if refined_data and refined_data.get("title", "N/A") != "N/A":
                        # Backfill compulsory fields the LLM failed to extract
                        compulsory_fields = ['company_name', 'job_id', 'url']
                        for field in compulsory_fields:
                            if not refined_data.get(field) or refined_data[field] in ["N/A", "", "Unknown"]:
                                if field == 'job_id':
                                    refined_data[field] = job_id
                                elif field == 'url':
                                    refined_data[field] = full_url
                                elif field == 'company_name':
                                    refined_data[field] = "Unknown Company"

                        refined_data['scraped_at'] = datetime.now().isoformat()
                        refined_data['category'] = search_keywords
                        refined_data['posted_date'] = posted_date

                        await self.llm_agent.save_job_data(refined_data, search_keywords)
                        scraped_count += 1
                        print(f" ✅ Scraped and refined: {refined_data['title'][:50]}...")
                        self.engine.report_outcome("success", url=raw_data["url"])
                    else:
                        print(f" 🟡 Could not extract meaningful data from: {full_url}")
                        await self._add_job_to_redis_cache(full_url, job_id, "llm_failure")
                        self.engine.report_outcome("llm_failure", url=raw_data["url"])

                    await job_page.close()

                except Exception as e:
                    error_msg = str(e)[:100]
                    print(f" ⚠ī¸ Failed on job {idx+1}: {error_msg}")
                    job_id = full_url.rstrip("/").split("/")[-1] if 'full_url' in locals() else "unknown"
                    job_url = full_url if 'full_url' in locals() else "unknown"
                    await self._add_job_to_redis_cache(job_url, job_id, f"exception: {error_msg}")
                    if 'job_page' in locals() and job_page:
                        await job_page.close()
                    continue
                finally:
                    print(" â†Šī¸ Returning to search results...")
                    await page.goto(search_url, timeout=120000)
                    await asyncio.sleep(4 * self.human_speed)

            await browser.close()

            if scraped_count > 0:
                self.engine.report_outcome("success")
                print(f"✅ Completed! Processed {scraped_count} jobs for '{search_keywords}' based on request '{self.user_request}'.")
            else:
                self.engine.report_outcome("scraping_error")
                print("⚠ī¸ No jobs processed successfully.")
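

# Example usage — a minimal sketch only. The `engine` object is assumed to expose the
# attributes and methods the scraper relies on above (_select_profile, common_renderers,
# common_vendors, os, _get_spoof_script, _human_like_scroll, report_outcome). The
# `stealth_engine` module and `StealthEngine` class names are hypothetical placeholders
# for whatever engine implementation you pair with CryptoJobScraper.
if __name__ == "__main__":
    from stealth_engine import StealthEngine  # hypothetical import

    async def main():
        engine = StealthEngine()
        scraper = CryptoJobScraper(
            engine,
            human_speed=1.2,
            user_request="Extract title, company, location and salary"
        )
        await scraper.scrape_jobs(search_keywords="smart contract engineer", max_pages=2)

    asyncio.run(main())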