diff --git a/config.py b/config.py index 19f18ad..a352318 100644 --- a/config.py +++ b/config.py @@ -8,9 +8,9 @@ from dotenv import load_dotenv load_dotenv() # LLM Agent Configuration -GEMINI_API_KEY = os.getenv("GEMINI_API_KEY") +GEMINI_API_KEY = os.getenv("XAI_API_KEY") if not GEMINI_API_KEY: - raise ValueError("GEMINI_API_KEY environment variable not set in .env file") + raise ValueError("XAI_API_KEY environment variable not set in .env file") def load_spoof_config(): diff --git a/fetcher.py b/fetcher.py new file mode 100644 index 0000000..0bacc3a --- /dev/null +++ b/fetcher.py @@ -0,0 +1,125 @@ +import asyncio +import random +import time +from playwright.async_api import Page, BrowserContext, Browser, TimeoutError as PlaywrightTimeoutError +from typing import Optional +from scraping_engine import FingerprintScrapingEngine + + +class StealthyFetcher: + def __init__(self, engine: FingerprintScrapingEngine, browser: Browser, context: BrowserContext): + self.engine = engine + self.browser = browser + self.context = context + self.max_retries = 5 + self.base_delay = 5 + + async def fetch_url(self, url: str, wait_for_selector: Optional[str] = None) -> Optional[Page]: + """ + Fetch a URL using stealth techniques, handling Cloudflare and other protections intelligently. + """ + for attempt in range(self.max_retries): + try: + print(f"Attempt {attempt + 1} to fetch {url}") + page = await self.context.new_page() + + await page.goto(url, wait_until='load', timeout=60000) + + if wait_for_selector: + try: + await page.wait_for_selector(wait_for_selector, timeout=10000) + except PlaywrightTimeoutError: + print(f"Selector {wait_for_selector} not found immediately, continuing...") + + await self._apply_human_behavior(page) + + protection_type = await self._detect_protection(page) + if protection_type: + print(f"đŸ›Ąī¸ Protection detected: {protection_type}") + content_accessible = await self._is_content_accessible(page, wait_for_selector) + if not content_accessible: + print("🔒 Content not accessible due to protection.") + handled = False + if protection_type == "cloudflare": + handled = await self._handle_cloudflare(page) + elif protection_type == "captcha": + handled = await self._handle_captcha(page) + if not handled: + print("❌ Failed to handle protection.") + await page.close() + await asyncio.sleep(self.base_delay * (2 ** attempt)) + continue + else: + print("✅ Protection present but content is accessible — proceeding.") + + print(f"✅ Successfully fetched {url}") + return page + + except Exception as e: + print(f"Attempt {attempt + 1} failed for {url}: {str(e)}") + if 'page' in locals(): + await page.close() + await asyncio.sleep(self.base_delay * (2 ** attempt)) + + print(f"❌ Failed to fetch {url} after {self.max_retries} attempts.") + return None + + async def _apply_human_behavior(self, page: Page): + await self.engine._human_like_scroll(page) + await asyncio.sleep(random.uniform(1, 3)) + await self.engine._simulate_human_interaction(page) + await asyncio.sleep(random.uniform(1, 2)) + + async def _detect_protection(self, page: Page) -> Optional[str]: + content = (await page.content()).lower() + if ( + "#cf-chl" in content + or "checking your browser" in content + or "just a moment" in content + or "cloudflare" in content + or "ddos protection" in content + or "turnstile" in content + ): + return "cloudflare" + elif "captcha" in content or "robot" in content or "verify you're human" in content: + return "captcha" + return None + + async def _is_content_accessible(self, page: Page, 
wait_for_selector: Optional[str] = None) -> bool: + if wait_for_selector: + try: + await page.wait_for_selector(wait_for_selector, timeout=5000) + return True + except PlaywrightTimeoutError: + pass + try: + body_text = await page.eval_on_selector("body", "el => el.innerText.toLowerCase()") + return len(body_text.strip()) > 200 + except: + return False + + async def _handle_captcha(self, page: Page) -> bool: + print("đŸĻž Using 'avoid' strategy for captcha — skipping page.") + return False + + async def _handle_cloudflare(self, page: Page) -> bool: + max_wait_time = 60 + start_time = time.time() + + while time.time() - start_time < max_wait_time: + if not await self._detect_protection(page): + print("â˜ī¸ Cloudflare challenge resolved.") + return True + + print("â˜ī¸ Cloudflare active, waiting...") + await self._apply_human_behavior(page) + wait_time = min(10, 2 + random.uniform(1, 3) + (time.time() - start_time) * 0.1) + await asyncio.sleep(wait_time) + + if (time.time() - start_time) > 15 and (time.time() - start_time) % 20 < 2: + print("🔄 Reloading page during Cloudflare wait...") + await page.reload(wait_until='load', timeout=30000) + + print("⏰ Timeout waiting for Cloudflare resolution.") + return False + \ No newline at end of file diff --git a/job_scraper2.py b/job_scraper2.py index 600f2cb..808ceb6 100644 --- a/job_scraper2.py +++ b/job_scraper2.py @@ -1,14 +1,13 @@ - import asyncio import random import sqlite3 import os -from datetime import datetime -from typing import Optional, Dict, List -from playwright.async_api import async_playwright +from typing import Optional, Dict +from playwright.async_api import async_playwright, TimeoutError as PlaywrightTimeoutError from browserforge.injectors.playwright import AsyncNewContext from llm_agent import LLMJobRefiner import re +from fetcher import StealthyFetcher class LinkedInJobScraper: @@ -17,12 +16,12 @@ class LinkedInJobScraper: engine, db_path: str = "linkedin_jobs.db", human_speed: float = 1.0, - target_field: str = "all" + user_request: str = "Extract all standard job details" ): self.engine = engine self.db_path = db_path self.human_speed = human_speed - self.target_field = target_field + self.user_request = user_request self._init_db() self.llm_agent = LLMJobRefiner() @@ -61,7 +60,6 @@ class LinkedInJobScraper: return False async def _login(self, page, credentials: Dict) -> bool: - """Human-realistic LinkedIn login""" print("🔐 Navigating to LinkedIn login page...") await page.goto("https://www.linkedin.com/login", timeout=60000) await asyncio.sleep(random.uniform(2.0, 3.5) * self.human_speed) @@ -107,45 +105,27 @@ class LinkedInJobScraper: return False async def _extract_all_page_content(self, page) -> str: - """Extract all content from the job page""" await asyncio.sleep(2 * self.human_speed) - - # Human-like scrolling to load all content await self.engine._human_like_scroll(page) await asyncio.sleep(2 * self.human_speed) - - # Get the full page content page_content = await page.content() return page_content def _calculate_keyword_match(self, title: str, keywords: str) -> float: - """Calculate percentage of keywords matched in title""" if not title or not keywords: return 0.0 - title_lower = title.lower() keyword_list = [kw.strip().lower() for kw in keywords.split()] - - matches = 0 - for keyword in keyword_list: - if keyword in title_lower: - matches += 1 - + matches = sum(1 for kw in keyword_list if kw in title_lower) return matches / len(keyword_list) if keyword_list else 0.0 def _extract_location_from_keywords(self, 
search_keywords: str) -> str: - """Extract location from search keywords if present""" location_match = re.search(r'location:\s*([^,]+)', search_keywords, re.IGNORECASE) - if location_match: - return location_match.group(1).strip().lower() - return "" + return location_match.group(1).strip().lower() if location_match else "" async def _scrape_jobs_from_current_page(self, page, search_keywords: str, seen_job_ids, all_job_links): - """Scrape job links from the current page that match keywords and location""" current_links = await page.query_selector_all("a[href*='/jobs/view/']") new_jobs = 0 - - # Extract location from search keywords location_from_keywords = self._extract_location_from_keywords(search_keywords) for link in current_links: @@ -155,22 +135,18 @@ class LinkedInJobScraper: job_id = href.split("/view/")[-1].split("/")[0] if "/view/" in href else href if job_id and job_id not in seen_job_ids: - # Check if job title matches keywords (at least 70% match) title_element = await link.query_selector("span.job-title, h3, .job-card-title") if title_element: title = await title_element.inner_text() match_percentage = self._calculate_keyword_match(title, search_keywords) - - # Check if location matches (if specified in keywords) location_match = True if location_from_keywords: - # Try to get location from the job card location_element = await link.query_selector("span.job-location, .job-card-location, .location") if location_element: location_text = await location_element.inner_text() location_match = location_from_keywords in location_text.lower() - if match_percentage >= 0.7 and location_match: # At least 70% match and location matches + if match_percentage >= 0.7 and location_match: seen_job_ids.add(job_id) all_job_links.append((href, title)) new_jobs += 1 @@ -179,28 +155,22 @@ class LinkedInJobScraper: elif not location_match: print(f" âš ī¸ Skipping job due to location mismatch: {title[:50]}... 
(expected: {location_from_keywords})") else: - # If no title element, still add to check later seen_job_ids.add(job_id) all_job_links.append((href, "Unknown Title")) new_jobs += 1 return new_jobs async def _handle_pagination(self, page, search_keywords: str, seen_job_ids, all_job_links): - """Handle pagination by going through pages""" current_page = 1 while True: print(f"📄 Processing page {current_page}") - - # Collect job links on current page new_jobs = await self._scrape_jobs_from_current_page(page, search_keywords, seen_job_ids, all_job_links) print(f" ➕ Found {new_jobs} new job(s) on page {current_page} (total: {len(all_job_links)})") - # Try to go to next page next_btn = await page.query_selector("button[aria-label='Next']") if next_btn and await next_btn.is_enabled(): await self._human_click(page, next_btn) await asyncio.sleep(random.uniform(4.0, 6.0) * self.human_speed) - # Wait for URL to change or new content try: await page.wait_for_function("() => window.location.href.includes('start=')", timeout=60000) except: @@ -211,7 +181,6 @@ class LinkedInJobScraper: break async def _handle_infinite_scroll(self, page, search_keywords: str, seen_job_ids, all_job_links): - """Handle infinite scroll to load more jobs""" last_height = await page.evaluate("document.body.scrollHeight") no_new_jobs_count = 0 max_no_new = 3 @@ -221,7 +190,6 @@ class LinkedInJobScraper: await asyncio.sleep(random.uniform(3.0, 5.0) * self.human_speed) new_jobs_found = await self._scrape_jobs_from_current_page(page, search_keywords, seen_job_ids, all_job_links) - print(f" ➕ Found {new_jobs_found} new job(s) (total: {len(all_job_links)})") new_height = await page.evaluate("document.body.scrollHeight") @@ -241,19 +209,15 @@ class LinkedInJobScraper: max_pages: int = 1, credentials: Optional[Dict] = None ): - # Parse location from keywords if present location_match = re.search(r'location:\s*([^,]+)', search_keywords, re.IGNORECASE) location = location_match.group(1).strip() if location_match else "" - - # Remove location part from keywords for search clean_keywords = re.sub(r'location:\s*[^,]+', '', search_keywords, flags=re.IGNORECASE).strip() encoded_keywords = clean_keywords.replace(" ", "%20") - - # Build search URL with location if specified + search_url = f"https://www.linkedin.com/jobs/search/?keywords={encoded_keywords}" if location: search_url += f"&location={location.replace(' ', '%20')}" - + profile = self.engine._select_profile() renderer = random.choice(self.engine.common_renderers[self.engine.os]) vendor = random.choice(self.engine.common_vendors) @@ -261,11 +225,11 @@ class LinkedInJobScraper: async with async_playwright() as pw: browser = await pw.chromium.launch( - headless= False, + headless=False, args=['--disable-blink-features=AutomationControlled'] ) context = await AsyncNewContext(browser, fingerprint=profile) - + await context.add_init_script(f""" Object.defineProperty(navigator, 'hardwareConcurrency', {{ get: () => {profile.navigator.hardwareConcurrency} }}); Object.defineProperty(navigator, 'deviceMemory', {{ get: () => {profile.navigator.deviceMemory} }}); @@ -275,6 +239,9 @@ class LinkedInJobScraper: page = await context.new_page() + # Create a temporary fetcher for protection checks on main page + temp_fetcher = StealthyFetcher(self.engine, browser, context) + session_loaded = await self.engine.load_session(context) login_successful = False @@ -301,118 +268,111 @@ class LinkedInJobScraper: elif not credentials: print("â„šī¸ No credentials — proceeding as guest.") login_successful = True - 
else: - pass await page.wait_for_load_state("load", timeout=60000) print("✅ Post-login page fully loaded. Starting search...") - if await self.engine._detect_cloudflare(page): - print("â˜ī¸ Cloudflare detected on initial load.") - if not await self.engine._handle_cloudflare(page): - print("❌ Cloudflare could not be resolved.") - await browser.close() - self.engine.report_outcome("cloudflare") - return + # >>> PROTECTION CHECK USING FETCHER LOGIC <<< + protection_type = await temp_fetcher._detect_protection(page) + if protection_type: + print(f"đŸ›Ąī¸ Protection detected on initial page: {protection_type}") + content_accessible = await temp_fetcher._is_content_accessible(page) + if not content_accessible: + print("🔒 Content not accessible.") + handled = False + if protection_type == "cloudflare": + handled = await self.engine._handle_cloudflare(page) + elif protection_type == "captcha": + handled = False + if not handled: + await browser.close() + self.engine.report_outcome("protection_block") + return + else: + print("✅ Protection present but content accessible — proceeding.") print(f"🔍 Searching for: {search_keywords}") await page.goto(search_url, wait_until='load', timeout=60000) await asyncio.sleep(random.uniform(4.0, 6.0) * self.human_speed) - if await self.engine._detect_cloudflare(page): - print("â˜ī¸ Cloudflare detected on search page.") - if not await self.engine._handle_cloudflare(page): - await browser.close() - self.engine.report_outcome("cloudflare") - return + # >>> PROTECTION CHECK ON SEARCH PAGE <<< + protection_type = await temp_fetcher._detect_protection(page) + if protection_type: + print(f"đŸ›Ąī¸ Protection detected on search page: {protection_type}") + content_accessible = await temp_fetcher._is_content_accessible(page) + if not content_accessible: + print("🔒 Content not accessible.") + handled = False + if protection_type == "cloudflare": + handled = await self.engine._handle_cloudflare(page) + elif protection_type == "captcha": + handled = False + if not handled: + await browser.close() + self.engine.report_outcome("protection_block") + return + else: + print("✅ Protection present but content accessible — proceeding.") all_job_links = [] seen_job_ids = set() - # First, scrape the initial page print("🔄 Collecting initial job links...") initial_jobs = await self._scrape_jobs_from_current_page(page, search_keywords, seen_job_ids, all_job_links) print(f" ➕ Found {initial_jobs} initial job(s) (total: {len(all_job_links)})") - # Loop until no new jobs are found iteration = 1 while True: print(f"🔄 Iteration {iteration}: Checking for new jobs...") - - # First try infinite scroll + prev_job_count = len(all_job_links) await self._handle_infinite_scroll(page, search_keywords, seen_job_ids, all_job_links) new_jobs_count = len(all_job_links) - prev_job_count - + if new_jobs_count > 0: print(f" ➕ Found {new_jobs_count} new jobs via infinite scroll") iteration += 1 - continue # Continue with infinite scroll if new jobs found - - # If no new jobs via scroll, check for pagination + continue + pagination_exists = await page.query_selector("button[aria-label='Next']") - + if pagination_exists: print("â­ī¸ Pagination detected. 
Processing pages...") await self._handle_pagination(page, search_keywords, seen_job_ids, all_job_links) iteration += 1 - continue # Continue with pagination if new jobs found + continue else: - # If no pagination and no new jobs from scroll, check by refreshing print("🔄 Refreshing page to check for new results...") await page.reload(wait_until='load') await asyncio.sleep(random.uniform(3.0, 5.0) * self.human_speed) - - # Check for new jobs after refresh + new_jobs_after_refresh = await self._scrape_jobs_from_current_page(page, search_keywords, seen_job_ids, all_job_links) if new_jobs_after_refresh > 0: print(f" ➕ Found {new_jobs_after_refresh} new job(s) after refresh") iteration += 1 - continue # Continue if new jobs found after refresh + continue else: print("🔚 No new jobs found after refresh. Stopping.") break - - # Limit iterations to prevent infinite loops + if iteration > 10: print("🔄 Maximum iterations reached. Stopping.") break print(f"✅ Collected {len(all_job_links)} unique job links.") - # Process all collected job links scraped_count = 0 for idx, (href, title) in enumerate(all_job_links): try: full_url = href if href.startswith("http") else f"https://www.linkedin.com{href}" print(f" → Opening job {idx+1}/{len(all_job_links)}: {full_url}") - await page.goto(full_url, wait_until='load', timeout=60000) - await asyncio.sleep(3 * self.human_speed) - is_cloudflare = await self.engine._detect_cloudflare(page) - page_content = await page.content() - has_captcha_text = "captcha" in page_content.lower() - captcha_present = is_cloudflare or has_captcha_text - - title_element = await page.query_selector("h1.t-24") - job_data_accessible = title_element is not None - - if captcha_present: - if job_data_accessible: - print(" âš ī¸ CAPTCHA detected, but job data is accessible. Proceeding in stealth mode...") - await self.engine._avoid_captcha(page) - else: - print(" âš ī¸ CAPTCHA detected and job data blocked. Attempting recovery...") - if not await self.engine._solve_captcha_fallback(page): - print(" ❌ CAPTCHA recovery failed. Skipping job.") - continue - title_element = await page.query_selector("h1.t-24") - if not title_element: - print(" ❌ Job data still unavailable after CAPTCHA handling. 
Skipping.") - continue - - if not captcha_present: - await self.engine._avoid_captcha(page) + fetcher = StealthyFetcher(self.engine, browser, context) + job_page = await fetcher.fetch_url(full_url, wait_for_selector="h1.t-24") + if not job_page: + print(f" ❌ Failed to fetch job page {full_url} after retries.") + self.engine.report_outcome("fetch_failure", url=full_url) + continue apply_btn = None apply_selectors = [ @@ -422,19 +382,19 @@ class LinkedInJobScraper: "button:has-text('Easy Apply')" ] for selector in apply_selectors: - apply_btn = await page.query_selector(selector) + apply_btn = await job_page.query_selector(selector) if apply_btn: break page_data = None - final_url = full_url + final_url = job_page.url if apply_btn: print(" → Clicking 'Apply' / 'Easy Apply' button...") - + page_waiter = asyncio.create_task(context.wait_for_event("page")) - await self._human_click(page, apply_btn, wait_after=False) - + await self._human_click(job_page, apply_btn, wait_after=False) + external_page = None try: external_page = await asyncio.wait_for(page_waiter, timeout=5.0) @@ -443,68 +403,66 @@ class LinkedInJobScraper: await asyncio.sleep(2 * self.human_speed) await self.engine._human_like_scroll(external_page) await asyncio.sleep(2 * self.human_speed) - + page_data = await self._extract_all_page_content(external_page) final_url = external_page.url - + if not external_page.is_closed(): await external_page.close() - + except asyncio.TimeoutError: - print(" đŸ–Ĩī¸ No external tab — scraping LinkedIn job page.") - await page.wait_for_timeout(2000) + print(" đŸ–Ĩī¸ No external tab — scraping LinkedIn job page directly.") + await job_page.wait_for_timeout(2000) try: - await page.wait_for_selector("div.jobs-apply-button--fixed, div.jobs-easy-apply-modal", timeout=8000) - except: + await job_page.wait_for_selector("div.jobs-apply-button--fixed, div.jobs-easy-apply-modal", timeout=8000) + except PlaywrightTimeoutError: pass - await self.engine._human_like_scroll(page) + await self.engine._human_like_scroll(job_page) await asyncio.sleep(2 * self.human_speed) - page_data = await self._extract_all_page_content(page) - final_url = page.url + page_data = await self._extract_all_page_content(job_page) else: print(" âš ī¸ No 'Apply' button found — scraping job details directly.") - await self.engine._human_like_scroll(page) + await self.engine._human_like_scroll(job_page) await asyncio.sleep(2 * self.human_speed) - page_data = await self._extract_all_page_content(page) - final_url = page.url + page_data = await self._extract_all_page_content(job_page) - # Extract job ID from URL job_id = final_url.split("/")[-2] if "/jobs/view/" in final_url else "unknown" - - # Prepare raw data for LLM processing + raw_data = { "page_content": page_data, - "url": final_url, - "job_id": job_id + "url": job_page.url, + "job_id": job_page.url.split("/")[-2] if "/jobs/view/" in job_page.url else "unknown" } - - # Send raw data to LLM agent for refinement - refined_data = await self.llm_agent.refine_job_data(raw_data, search_keywords) - - # Only save if LLM successfully extracted meaningful data + + refined_data = await self.llm_agent.refine_job_data(raw_data, self.user_request) + if refined_data and refined_data.get("title", "N/A") != "N/A": - # Save refined data to markdown and database through LLM agent await self.llm_agent.save_job_data(refined_data, search_keywords) - scraped_count += 1 print(f" ✅ Scraped and refined: {refined_data['title'][:50]}...") + self.engine.report_outcome("success", url=raw_data["url"]) else: 
print(f" 🟡 Could not extract meaningful data from: {final_url}") + self.engine.report_outcome("llm_failure", url=raw_data["url"]) + + await job_page.close() except Exception as e: print(f" âš ī¸ Failed on job {idx+1}: {str(e)[:100]}") + if 'job_page' in locals() and job_page: + await job_page.close() continue - + finally: print(" â†Šī¸ Returning to LinkedIn search results...") await page.goto(search_url, timeout=60000) await asyncio.sleep(4 * self.human_speed) await browser.close() - + if scraped_count > 0: self.engine.report_outcome("success") - print(f"✅ Completed! Processed {scraped_count} jobs for '{search_keywords}'.") + print(f"✅ Completed! Processed {scraped_count} jobs for '{search_keywords}' based on request '{self.user_request}'.") else: self.engine.report_outcome("captcha") - print("âš ī¸ No jobs processed successfully.") \ No newline at end of file + print("âš ī¸ No jobs processed successfully.") diff --git a/linkedin_main.py b/linkedin_main.py index 04e5b02..6986fc4 100644 --- a/linkedin_main.py +++ b/linkedin_main.py @@ -8,16 +8,17 @@ import asyncio # Load environment variables load_dotenv() + async def main(): engine = FingerprintScrapingEngine( - seed="job_scraping_engine", + seed="job_scraping_123", target_os="windows", db_path="job_listings.db", markdown_path="job_listings.md" ) # Initialize scraper with target field - scraper = LinkedInJobScraper(engine, human_speed=1.6, target_field="Web designer") + scraper = LinkedInJobScraper(engine, human_speed=1.6, user_request="Extract title, company, location, description, requirements, qualifications, nature of job(remote, onsite, hybrid) and salary") await scraper.scrape_jobs( search_keywords="Web Designer location:New York", diff --git a/llm_agent.py b/llm_agent.py index f91ef39..818fc0c 100644 --- a/llm_agent.py +++ b/llm_agent.py @@ -1,166 +1,170 @@ -import google.generativeai as genai -from typing import Dict, Any + +from openai import OpenAI +from typing import Dict, Any, Optional import asyncio import sqlite3 import os from datetime import datetime -from config import GEMINI_API_KEY +import json +import re +from dotenv import load_dotenv + +# ✅ Actually load .env +load_dotenv() class LLMJobRefiner: def __init__(self): - genai.configure(api_key=GEMINI_API_KEY) - self.model = genai.GenerativeModel('gemini-latest-flash') - - async def refine_job_data(self, raw_data: Dict[str, Any], target_field: str) -> Dict[str, Any]: - """ - Refine raw job data using Gemini LLM based on target field - """ - prompt = f""" - You are a job data extraction assistant. Extract the following fields from the job posting: - - title - - company_name - - location - - description - - requirements - - qualifications - - salary_range - - nature_of_work (remote, onsite, or hybrid) - - job_id - - Target Field: {target_field} - Raw Page Content: - {raw_data.get('page_content', '')[:6000]} # Limit content size - - Instructions: - 1. Extract only the information relevant to the target field: {target_field} - 2. Clean up any formatting issues in the description - 3. Standardize location format (city, state/country) - 4. Extract salary range if mentioned in description - 5. Determine nature of work (remote, onsite, or hybrid) from work arrangements - 6. Ensure all fields are properly formatted - 7. If a field cannot be found, use "N/A" - 8. 
Return the refined data in JSON format - - Response format (only return the JSON): - {{ - "title": "...", - "company_name": "...", - "location": "...", - "description": "...", - "requirements": "...", - "qualifications": "...", - "salary_range": "...", - "nature_of_work": "...", - "job_id": "{raw_data.get('job_id', 'unknown')}", - "url": "{raw_data.get('url', 'N/A')}" - }} - """ + xai_api_key = os.getenv("XAI_API_KEY") + if not xai_api_key: + raise ValueError("XAI_API_KEY not found in environment variables.") + self.client = OpenAI(api_key=xai_api_key, base_url="https://api.x.ai/v1") + self.model = "grok-4-latest" + self.extraction_schema_cache = {} + + def generate_content(self, prompt: str, system_message: str = "You are a helpful assistant.") -> str: + """Synchronous method to call Grok via xAI API.""" try: - response = await asyncio.get_event_loop().run_in_executor( - None, - lambda: self.model.generate_content(prompt) + response = self.client.chat.completions.create( + model=self.model, + messages=[ + {"role": "system", "content": system_message}, + {"role": "user", "content": prompt} + ], + temperature=0.2, + max_tokens=2048, + stream=False ) - - # Parse the response and return refined data - refined_data = self._parse_llm_response(response.text) - - # If parsing fails, return None + return response.choices[0].message.content or "" + except Exception as e: + print(f"Error in Grok API call: {e}") + return "" + + async def refine_job_data(self, raw_data: Dict[str, Any], user_request: str) -> Optional[Dict[str, Any]]: + page_content = raw_data.get('page_content', '') + if not page_content: + return None + + schema_key = user_request.lower().strip() + extraction_schema = self.extraction_schema_cache.get(schema_key) + if not extraction_schema: + extraction_schema = await self._generate_extraction_schema(user_request) + if extraction_schema: + self.extraction_schema_cache[schema_key] = extraction_schema + else: + extraction_schema = self._get_default_schema() + + prompt = f""" + You are a highly skilled web data extraction assistant. Your task is to analyze the raw HTML content of a job posting page and extract specific information requested by the user. + The user's request is: "{user_request}" + The raw HTML content of the page is provided below (limited in size). The content might be noisy or unstructured. + Your goal is to: + 1. Analyze the HTML structure to identify relevant sections. + 2. Extract the requested information accurately. + 3. Clean up formatting issues. + 4. If a field cannot be found, use "N/A". + 5. Return ONLY the extracted data in a JSON object based on the following schema: + {json.dumps(extraction_schema, indent=2)} + Raw Page Content (HTML): + {page_content[:6000]} + + Respond with the JSON object containing the extracted data. 
+ """ + + try: + # ✅ Use self (current instance), NOT a new LLMJobRefiner() + response_text = await asyncio.get_event_loop().run_in_executor( + None, + lambda: self.generate_content(prompt) + ) + refined_data = self._parse_llm_response(response_text) if not refined_data: return None - + + refined_data['job_id'] = raw_data.get('job_id', 'unknown') + refined_data['url'] = raw_data.get('url', 'N/A') return refined_data - except Exception as e: print(f"LLM refinement failed: {str(e)}") return None - - def _parse_llm_response(self, response_text: str) -> Dict[str, Any]: + + async def _generate_extraction_schema(self, user_request: str) -> Optional[Dict[str, str]]: + schema_prompt = f""" + Based on the user's request: "{user_request}", generate a JSON schema for the data they want to extract from a job posting. + The schema should be a dictionary where keys are field names (snake_case) and values are short descriptions. + Include standard fields like title, company_name, location, description, etc., if relevant. + Respond with only the JSON schema. """ - Parse the LLM response to extract refined job data - """ - import json - import re - - # Extract JSON from response (handle markdown code blocks) - json_match = re.search(r'```(?:json)?\s*({.*?})\s*```', response_text, re.DOTALL) - if json_match: - json_str = json_match.group(1) - else: - # If no code block, try to find JSON directly - json_match = re.search(r'\{.*\}', response_text, re.DOTALL) - if json_match: - json_str = json_match.group(0) - else: - return None - try: + # ✅ Use self.generate_content, NOT self.model.generate_content + schema_text = await asyncio.get_event_loop().run_in_executor( + None, + lambda: self.generate_content(schema_prompt) + ) + json_match = re.search(r'```(?:json)?\s*({.*?})\s*```', schema_text, re.DOTALL) + if not json_match: + json_match = re.search(r'\{.*\}', schema_text, re.DOTALL) + if not json_match: + return None + + json_str = json_match.group(1) if '```' in schema_text else json_match.group(0) return json.loads(json_str) + except Exception as e: + print(f"Schema generation failed: {str(e)}") + return None + + def _get_default_schema(self) -> Dict[str, str]: + return { + "title": "The job title", + "company_name": "The name of the company", + "location": "The location of the job", + "description": "The full job description", + "requirements": "List of job requirements", + "qualifications": "List of required qualifications", + "salary_range": "The salary range mentioned", + "nature_of_work": "Remote, onsite, or hybrid" + } + + def _parse_llm_response(self, response_text: str) -> Optional[Dict[str, Any]]: + json_match = re.search(r'```(?:json)?\s*({.*?})\s*```', response_text, re.DOTALL) + if not json_match: + json_match = re.search(r'\{.*\}', response_text, re.DOTALL) + if not json_match: + return None + + try: + return json.loads(json_match.group(1) if '```' in response_text else json_match.group(0)) except json.JSONDecodeError: return None - + async def save_job_data(self, job_data: Dict[str, Any], keyword: str): - """ - Save job data to both markdown and database - """ - # Save to database await self._save_to_db(job_data) - - # Save to markdown await self._save_to_markdown(job_data, keyword) - + async def _save_to_db(self, job_data: Dict[str, Any]): - """ - Save job data to database - """ db_path = "linkedin_jobs.db" - os.makedirs(os.path.dirname(db_path) if os.path.dirname(db_path) else ".", exist_ok=True) - + os.makedirs(os.path.dirname(db_path) or ".", exist_ok=True) with sqlite3.connect(db_path) as 
conn: cursor = conn.cursor() - cursor.execute(''' - INSERT OR IGNORE INTO jobs - (title, company_name, location, description, requirements, - qualifications, salary_range, nature_of_work, job_id, url) - VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?) - ''', ( - job_data.get("title", "N/A"), - job_data.get("company_name", "N/A"), - job_data.get("location", "N/A"), - job_data.get("description", "N/A"), - job_data.get("requirements", "N/A"), - job_data.get("qualifications", "N/A"), - job_data.get("salary_range", "N/A"), - job_data.get("nature_of_work", "N/A"), - job_data.get("job_id", "N/A"), - job_data.get("url", "N/A") - )) + fields = list(job_data.keys()) + placeholders = ', '.join(['?' for _ in fields]) + columns = ', '.join([f'"{col}"' for col in fields]) # Escape column names + cursor.execute(f"CREATE TABLE IF NOT EXISTS jobs ({columns})") + cursor.execute(f'INSERT INTO jobs ({columns}) VALUES ({placeholders})', + [job_data.get(field, 'N/A') for field in fields]) conn.commit() - + async def _save_to_markdown(self, job_data: Dict[str, Any], keyword: str): - """ - Save job data to markdown file - """ os.makedirs("linkedin_jobs", exist_ok=True) - - # Create a single markdown file for all jobs - filename = "linkedin_jobs_scraped.md" - filepath = os.path.join("linkedin_jobs", filename) - + filepath = os.path.join("linkedin_jobs", "linkedin_jobs_scraped.md") + write_header = not os.path.exists(filepath) or os.path.getsize(filepath) == 0 + with open(filepath, "a", encoding="utf-8") as f: - # Only write header if file is empty - if os.path.getsize(filepath) == 0: + if write_header: f.write(f"# LinkedIn Jobs - {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n\n") - f.write(f"## Job: {job_data.get('title', 'N/A')}\n\n") f.write(f"- **Keyword**: {keyword}\n") - f.write(f"- **Company**: {job_data.get('company_name', 'N/A')}\n") - f.write(f"- **Location**: {job_data.get('location', 'N/A')}\n") - f.write(f"- **Nature of Work**: {job_data.get('nature_of_work', 'N/A')}\n") - f.write(f"- **Salary Range**: {job_data.get('salary_range', 'N/A')}\n") - f.write(f"- **Job ID**: {job_data.get('job_id', 'N/A')}\n") - f.write(f"- **URL**: <{job_data.get('url', 'N/A')}>\n\n") - f.write(f"### Description\n\n{job_data.get('description', 'N/A')}\n\n") - f.write(f"### Requirements\n\n{job_data.get('requirements', 'N/A')}\n\n") - f.write(f"### Qualifications\n\n{job_data.get('qualifications', 'N/A')}\n\n") - f.write("---\n\n") \ No newline at end of file + for key, value in job_data.items(): + if key != 'title': + f.write(f"- **{key.replace('_', ' ').title()}**: {value}\n") + f.write("\n---\n\n") diff --git a/scraping_engine.py b/scraping_engine.py index 3ace021..87e89bd 100644 --- a/scraping_engine.py +++ b/scraping_engine.py @@ -6,10 +6,12 @@ import hashlib import random import os import json -from typing import List, Optional, Dict +from playwright.async_api import Page +from typing import List, Optional, Dict, Any from browserforge.fingerprints import FingerprintGenerator from dotenv import load_dotenv from config import load_spoof_config +import time # Load environment variables load_dotenv() @@ -45,24 +47,36 @@ class FingerprintScrapingEngine: browser=('chrome',), os=(self.os,) ) - + self.num_variations = num_variations - + # Load spoof config spoof_config = load_spoof_config() self.common_renderers = spoof_config["renderers"] self.common_vendors = spoof_config["vendors"] - # Feedback system self.feedback_file = f"feedback_{seed}.json" + + # Feedback system self.feedback = self._load_feedback() # ← NEW: Session persistence 
paths self.session_dir = "browser_sessions" os.makedirs(self.session_dir, exist_ok=True) - self.session_path = os.path.join(self.session_dir, f"{seed}_session.json") + self.session_path = os.path.join( + self.session_dir, f"{seed}_session.json") - def _load_feedback(self): + self.optimization_params = { + "base_delay": 2.0, + "max_concurrent_requests": 4, + "request_timeout": 60000, + "retry_attempts": 3, + "captcha_handling_strategy": "avoid", # or "solve_fallback" + "cloudflare_wait_strategy": "smart_wait", # or "aggressive_reload" + } + self._update_params_from_feedback() + + def _load_feedback(self) -> Dict[str, Any]: if os.path.exists(self.feedback_file): try: with open(self.feedback_file, "r") as f: @@ -70,6 +84,8 @@ class FingerprintScrapingEngine: data.setdefault("success_rate", 1.0) data.setdefault("captcha_count", 0) data.setdefault("cloudflare_count", 0) + data.setdefault("avg_response_time", 10.0) # New metric + data.setdefault("failed_domains", {}) # New metrice return data except: pass @@ -79,16 +95,69 @@ class FingerprintScrapingEngine: with open(self.feedback_file, "w") as f: json.dump(self.feedback, f) - def report_outcome(self, outcome: str): + def report_outcome(self, outcome: str, url: Optional[str] = None, response_time: Optional[float] = None): if outcome == "success": - self.feedback["success_rate"] = min(1.0, self.feedback["success_rate"] + 0.1) + self.feedback["success_rate"] = min( + 1.0, self.feedback["success_rate"] + 0.05) # Smaller increment else: - self.feedback["success_rate"] = max(0.1, self.feedback["success_rate"] - 0.2) - if outcome == "captcha": - self.feedback["captcha_count"] += 1 - elif outcome == "cloudflare": - self.feedback["cloudflare_count"] += 1 + self.feedback["success_rate"] = max( + 0.05, self.feedback["success_rate"] - 0.1) # Smaller decrement + + if outcome == "captcha": + self.feedback["captcha_count"] += 1 + # Adapt strategy if many captchas + self.optimization_params["captcha_handling_strategy"] = "solve_fallback" + elif outcome == "cloudflare": + self.feedback["cloudflare_count"] += 1 + # Adjust wait strategy based on frequency + if self.feedback["cloudflare_count"] > 5: + self.optimization_params["cloudflare_wait_strategy"] = "aggressive_reload" + + # Track domain-specific failures + if url and outcome != "success": + domain = url.split("//")[1].split("/")[0] + if domain not in self.feedback["failed_domains"]: + self.feedback["failed_domains"][domain] = 0 + self.feedback["failed_domains"][domain] += 1 + + # Update average response time + if response_time: + prev_avg = self.feedback.get("avg_response_time", 10.0) + # Simple moving average + self.feedback["avg_response_time"] = ( + prev_avg * 0.9) + (response_time * 0.1) + self.save_feedback() + self._update_params_from_feedback() # Update params based on new feedback + + def _update_params_from_feedback(self): + """Adjust optimization parameters based on feedback.""" + sr = self.feedback["success_rate"] + cc = self.feedback["captcha_count"] + cf = self.feedback["cloudflare_count"] + avg_rt = self.feedback.get("avg_response_time", 10.0) + + # Adjust base delay based on success rate and avg response time + if sr < 0.6: + self.optimization_params["base_delay"] = max( + 5.0, self.optimization_params["base_delay"] * 1.2) + elif sr > 0.8: + self.optimization_params["base_delay"] = min( + 3.0, self.optimization_params["base_delay"] * 0.9) + + # Reduce concurrency if many captchas/cloudflares + if cc > 3 or cf > 3: + self.optimization_params["max_concurrent_requests"] = max( + 2, 
self.optimization_params["max_concurrent_requests"] - 2) + else: + # Reset to default + self.optimization_params["max_concurrent_requests"] = 4 + + # Increase timeout if avg response time is high + if avg_rt > 20: + self.optimization_params["request_timeout"] = 90000 # 90 seconds + + print(f"Optimization Params Updated: {self.optimization_params}") # ← NEW: Save browser context (cookies + localStorage) async def save_session(self, context): @@ -129,7 +198,8 @@ class FingerprintScrapingEngine: if self.feedback["success_rate"] < 0.5: concurrency_options = [8, 4] memory_options = [8] - profile.navigator.hardwareConcurrency = random.choice(concurrency_options) + profile.navigator.hardwareConcurrency = random.choice( + concurrency_options) profile.navigator.deviceMemory = random.choice(memory_options) return profile @@ -244,23 +314,6 @@ class FingerprintScrapingEngine: await asyncio.sleep(random.uniform(0.2, 1.0)) except: pass - - async def _detect_cloudflare(self, page) -> bool: - content = await page.content() - return ( - "#cf-chl" in content or - "checking your browser" in content.lower() or - "just a moment" in content.lower() - ) - - async def _handle_cloudflare(self, page, max_retries: int = 3): - for i in range(max_retries): - if not await self._detect_cloudflare(page): - return True - print(f"â˜ī¸ Cloudflare detected - waiting... (attempt {i+1})") - await asyncio.sleep(8 + random.uniform(2, 5)) - await page.wait_for_load_state("load", timeout=60000) - return False async def _avoid_captcha(self, page) -> bool: await asyncio.sleep(2 + random.random() * 3) @@ -268,7 +321,7 @@ class FingerprintScrapingEngine: await self._simulate_human_interaction(page) await asyncio.sleep(3 + random.random() * 2) return True - + async def _solve_captcha_fallback(self, page) -> bool: await asyncio.sleep(15 + random.random() * 10) captcha_content = await page.content() @@ -283,3 +336,42 @@ class FingerprintScrapingEngine: return True return False + + async def _detect_cloudflare(self, page: Page) -> bool: + """Detect Cloudflare challenges.""" + content = await page.content() + return ( + "#cf-chl" in content + or "checking your browser" in content.lower() + or "just a moment" in content.lower() + or "turnstile" in content.lower() # Check for Cloudflare Turnstile + ) + + async def _handle_cloudflare(self, page: Page) -> bool: + """ + Handle Cloudflare challenges, including Turnstile if present. + This is a simplified approach; real-world handling might require more sophisticated logic or external solvers. + """ + max_wait_time = 60 # Total time to wait for Cloudflare to resolve + start_time = time.time() + + while time.time() - start_time < max_wait_time: + if not await self._detect_cloudflare(page): + print("Cloudflare challenge resolved.") + return True + + print("Cloudflare active, waiting...") + # Simulate more human-like behavior while waiting + await self._simulate_human_interaction(page) + # Wait for a random period, increasing slightly each time + wait_time = min(10, 2 + random.uniform(1, 3) + + (time.time() - start_time) * 0.1) + await asyncio.sleep(wait_time) + + # Reload occasionally to trigger potential client-side checks + if (time.time() - start_time) > 15 and (time.time() - start_time) % 20 < 2: + print("Reloading page during Cloudflare wait...") + await page.reload(wait_until='load', timeout=30000) + + print("Timeout waiting for Cloudflare resolution.") + return False
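
A minimal usage sketch for the new fetcher.StealthyFetcher, wired the same way job_scraper2.py builds its browser and fingerprint context (the job URL below is hypothetical). With the defaults in this patch (base_delay = 5, max_retries = 5), failed attempts back off at roughly 5, 10, 20, 40 and 80 seconds before fetch_url gives up and returns None.

import asyncio
from playwright.async_api import async_playwright
from browserforge.injectors.playwright import AsyncNewContext
from scraping_engine import FingerprintScrapingEngine
from fetcher import StealthyFetcher


async def main():
    # Same engine configuration as linkedin_main.py in this patch.
    engine = FingerprintScrapingEngine(
        seed="job_scraping_123",
        target_os="windows",
        db_path="job_listings.db",
        markdown_path="job_listings.md",
    )
    profile = engine._select_profile()  # fingerprint profile, as in job_scraper2.py

    async with async_playwright() as pw:
        browser = await pw.chromium.launch(
            headless=False,
            args=["--disable-blink-features=AutomationControlled"],
        )
        context = await AsyncNewContext(browser, fingerprint=profile)
        fetcher = StealthyFetcher(engine, browser, context)

        # fetch_url returns an open Page on success, or None after max_retries.
        page = await fetcher.fetch_url(
            "https://www.linkedin.com/jobs/view/123456/",  # hypothetical URL
            wait_for_selector="h1.t-24",
        )
        if page is not None:
            print(await page.title())
            await page.close()
        await browser.close()


if __name__ == "__main__":
    asyncio.run(main())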
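
The search URL in scrape_jobs is built with str.replace(" ", "%20"), which leaves characters such as commas, ampersands or non-ASCII letters unencoded. A sketch of the same keyword/location parsing using urllib.parse.quote instead (build_search_url is an illustrative helper, not part of the patch):

import re
from urllib.parse import quote


def build_search_url(search_keywords: str) -> str:
    # Same "location:<value>" convention as _extract_location_from_keywords.
    location_match = re.search(r"location:\s*([^,]+)", search_keywords, re.IGNORECASE)
    location = location_match.group(1).strip() if location_match else ""
    clean_keywords = re.sub(r"location:\s*[^,]+", "", search_keywords, flags=re.IGNORECASE).strip()

    # quote() percent-encodes spaces and other reserved characters.
    url = f"https://www.linkedin.com/jobs/search/?keywords={quote(clean_keywords)}"
    if location:
        url += f"&location={quote(location)}"
    return url


print(build_search_url("Web Designer location:New York"))
# https://www.linkedin.com/jobs/search/?keywords=Web%20Designer&location=New%20York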
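
The dynamic jobs schema in LLMJobRefiner._save_to_db creates the table from whichever fields the first refined record happens to contain; a later run with a different user_request can then issue an INSERT whose column list no longer matches the existing table and fail. A schema-tolerant variant is sketched below (save_job_row is an illustrative helper, not part of the patch) which adds any missing columns before inserting.

import sqlite3
from typing import Any, Dict


def save_job_row(db_path: str, job_data: Dict[str, Any]) -> None:
    fields = list(job_data.keys())
    quoted = ", ".join(f'"{f}"' for f in fields)
    with sqlite3.connect(db_path) as conn:
        cur = conn.cursor()
        # Create the table from the first record's fields if it does not exist yet.
        cur.execute(f"CREATE TABLE IF NOT EXISTS jobs ({quoted})")
        # Add columns introduced by a later, different extraction schema.
        existing = {row[1] for row in cur.execute("PRAGMA table_info(jobs)")}
        for field in fields:
            if field not in existing:
                cur.execute(f'ALTER TABLE jobs ADD COLUMN "{field}"')
        placeholders = ", ".join("?" for _ in fields)
        cur.execute(
            f"INSERT INTO jobs ({quoted}) VALUES ({placeholders})",
            [job_data.get(f, "N/A") for f in fields],
        )
        conn.commit()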