Web_scraping_project/job_scraper.py
Ofure Ikheloa 458e914d71 feat(scraping): enhance job scraping with session persistence and feedback system
- Add config module for spoof data management
- Implement session persistence to reuse authenticated sessions
- Add feedback system to track success rates and adjust fingerprinting
- Improve job link collection with pagination and scroll detection
- Separate verified/unverified job listings into different folders
- Enhance error handling for CAPTCHA and Cloudflare challenges
2025-11-21 16:51:26 +01:00

492 lines
22 KiB
Python
Raw Permalink Blame History

This file contains invisible Unicode characters

This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import asyncio
import random
import sqlite3
import os
from datetime import datetime
from typing import Optional, Dict
from playwright.async_api import async_playwright
from browserforge.injectors.playwright import AsyncNewContext
class LinkedInJobScraper:
def __init__(
self,
engine,
db_path: str = "linkedin_jobs.db",
human_speed: float = 1.0
):
self.engine = engine
self.db_path = db_path
self.human_speed = human_speed
self._init_db()
def _init_db(self):
os.makedirs(os.path.dirname(self.db_path) if os.path.dirname(self.db_path) else ".", exist_ok=True)
with sqlite3.connect(self.db_path) as conn:
cursor = conn.cursor()
cursor.execute('''
CREATE TABLE IF NOT EXISTS jobs (
id INTEGER PRIMARY KEY AUTOINCREMENT,
keyword TEXT,
title TEXT,
company TEXT,
location TEXT,
salary TEXT,
description TEXT,
url TEXT UNIQUE,
workplace_type TEXT,
scraped_at DATETIME DEFAULT CURRENT_TIMESTAMP
)
''')
conn.commit()
async def _human_click(self, page, element, wait_after: bool = True):
if not element:
return False
await element.scroll_into_view_if_needed()
await asyncio.sleep(random.uniform(0.3, 0.8) * self.human_speed)
try:
await element.click()
if wait_after:
await asyncio.sleep(random.uniform(2, 4) * self.human_speed)
return True
except:
return False
async def _login(self, page, credentials: Dict) -> bool:
"""Human-realistic LinkedIn login"""
print("🔐 Navigating to LinkedIn login page...")
await page.goto("https://www.linkedin.com/login", timeout=60000)
await asyncio.sleep(random.uniform(2.0, 3.5) * self.human_speed)
email_field = await page.query_selector('input[name="session_key"]')
if not email_field:
print("❌ Email field not found.")
return False
print("✍️ Typing username...")
await email_field.click()
await asyncio.sleep(random.uniform(0.4, 0.9) * self.human_speed)
for char in credentials["email"]:
await page.keyboard.type(char)
await asyncio.sleep(random.uniform(0.06, 0.14) * self.human_speed)
await asyncio.sleep(random.uniform(1.0, 1.8) * self.human_speed)
password_field = await page.query_selector('input[name="session_password"]')
if not password_field:
print("❌ Password field not found.")
return False
print("🔒 Typing password...")
await password_field.click()
await asyncio.sleep(random.uniform(0.3, 0.7) * self.human_speed)
for char in credentials["password"]:
await page.keyboard.type(char)
await asyncio.sleep(random.uniform(0.08, 0.16) * self.human_speed)
await asyncio.sleep(random.uniform(0.8, 1.5) * self.human_speed)
print("✅ Submitting login form...")
await page.keyboard.press("Enter")
for _ in range(15):
current_url = page.url
if "/feed" in current_url or "/jobs" in current_url:
if "login" not in current_url:
print("✅ Login successful!")
await asyncio.sleep(random.uniform(2.0, 3.0) * self.human_speed)
return True
await asyncio.sleep(1)
print("❌ Login may have failed.")
return False
async def _extract_job_details(self, page) -> Dict:
"""Extract from ANY job page: LinkedIn Easy Apply OR external site"""
await asyncio.sleep(2 * self.human_speed)
async def get_text(selector: str) -> str:
try:
el = await page.query_selector(selector)
if el:
text = await el.inner_text()
return text.strip() if text else "N/A"
except:
pass
return "N/A"
title = await get_text("h1.t-24")
if title == "N/A":
title = await get_text("h1, h2")
company = await get_text("a.app-aware-link[href*='/company/']")
if company == "N/A":
company = await get_text("div.org, .company, [class*='company']")
location = await get_text("span[class*='location']")
if location == "N/A":
location = await get_text(".location, [class*='location']")
description = await get_text("div[class*='description__text']")
if description == "N/A":
description = await get_text(".job-desc, .description, main, body")
workplace = await get_text("span.job-workplace-type") or "N/A"
salary = await get_text("span.salary") or "N/A"
return {
"title": title,
"company": company,
"location": location,
"workplace_type": workplace,
"salary": salary,
"description": description,
"url": page.url
}
async def _save_to_markdown(self, job_data: Dict, keyword: str, verified: bool=True):
"""Save to appropriate folder using job ID to avoid duplication"""
folder = "linkedin_jobs" if verified else "linkedin_jobs_unverified"
os.makedirs(folder, exist_ok=True)
# Extract job ID from URL for LinkedIn jobs
url = job_data.get("url", "")
if "/jobs/view/" in url:
try:
job_id = url.split("/view/")[1].split("/")[0]
except:
job_id = "unknown"
else:
# For external jobs, use a hash of the URL (first 12 chars)
import hashlib
job_id = hashlib.md5(url.encode()).hexdigest()[:12]
clean_keyword = keyword.replace(" ", "_")
filename = f"linkedin_{clean_keyword}_job_{job_id}.md"
filepath = os.path.join(folder, filename)
# Only save if file doesn't already exist (idempotent)
if os.path.exists(filepath):
print(f" 📝 Skipping duplicate Markdown file: {filename}")
return
with open(filepath, "w", encoding="utf-8") as f:
f.write(f"# {job_data['title']}\n\n")
f.write(f"- **Company**: {job_data['company']}\n")
f.write(f"- **Location**: {job_data['location']}\n")
f.write(f"- **Workplace**: {job_data['workplace_type']}\n")
f.write(f"- **Salary**: {job_data['salary']}\n")
f.write(f"- **URL**: <{url}>\n\n")
f.write(f"## Description\n\n{job_data['description']}\n")
async def _save_to_db(self, job_data: Dict, keyword: str):
with sqlite3.connect(self.db_path) as conn:
cursor = conn.cursor()
cursor.execute('''
INSERT OR IGNORE INTO jobs
(keyword, title, company, location, salary, description, url, workplace_type)
VALUES (?, ?, ?, ?, ?, ?, ?, ?)
''', (
keyword,
job_data["title"],
job_data["company"],
job_data["location"],
job_data["salary"],
job_data["description"],
job_data["url"],
job_data["workplace_type"]
))
conn.commit()
async def scrape_jobs(
self,
search_keywords: str,
max_pages: int = 1,
credentials: Optional[Dict] = None
):
encoded_keywords = search_keywords.replace(" ", "%20")
search_url = f"https://www.linkedin.com/jobs/search/?keywords={encoded_keywords}"
profile = self.engine._select_profile()
renderer = random.choice(self.engine.common_renderers[self.engine.os])
vendor = random.choice(self.engine.common_vendors)
spoof_script = self.engine._get_spoof_script(renderer, vendor)
async with async_playwright() as pw:
browser = await pw.chromium.launch(
headless=False,
args=['--disable-blink-features=AutomationControlled']
)
context = await AsyncNewContext(browser, fingerprint=profile)
await context.add_init_script(f"""
Object.defineProperty(navigator, 'hardwareConcurrency', {{ get: () => {profile.navigator.hardwareConcurrency} }});
Object.defineProperty(navigator, 'deviceMemory', {{ get: () => {profile.navigator.deviceMemory} }});
Object.defineProperty(navigator, 'platform', {{ get: () => '{profile.navigator.platform}' }});
""")
await context.add_init_script(spoof_script)
page = await context.new_page()
session_loaded = await self.engine.load_session(context)
login_successful = False
if session_loaded:
print("🔁 Using saved session — verifying login...")
await page.goto("https://www.linkedin.com/feed/", timeout=60000)
if "feed" in page.url and "login" not in page.url:
print("✅ Session still valid.")
login_successful = True
else:
print("⚠️ Saved session expired — re-authenticating.")
session_loaded = False
if not session_loaded and credentials:
print("🔐 Performing fresh login...")
login_successful = await self._login(page, credentials)
if login_successful:
await self.engine.save_session(context)
else:
print("❌ Login failed. Exiting.")
await browser.close()
self.engine.report_outcome("block")
return
elif not credentials:
print(" No credentials — proceeding as guest.")
login_successful = True
else:
pass
await page.wait_for_load_state("load", timeout=60000)
print("✅ Post-login page fully loaded. Starting search...")
if await self.engine._detect_cloudflare(page):
print("☁️ Cloudflare detected on initial load.")
if not await self.engine._handle_cloudflare(page):
print("❌ Cloudflare could not be resolved.")
await browser.close()
self.engine.report_outcome("cloudflare")
return
print(f"🔍 Searching for: {search_keywords}")
await page.goto(search_url, wait_until='load', timeout=60000)
await asyncio.sleep(random.uniform(4.0, 6.0) * self.human_speed)
if await self.engine._detect_cloudflare(page):
print("☁️ Cloudflare detected on search page.")
if not await self.engine._handle_cloudflare(page):
await browser.close()
self.engine.report_outcome("cloudflare")
return
scraped_count = 0
all_job_links = []
seen_job_ids = set()
# ← NEW: Scroll once to reveal pagination (if any)
print("🔄 Scrolling to bottom to reveal pagination controls...")
await page.evaluate("window.scrollTo(0, document.body.scrollHeight)")
await asyncio.sleep(random.uniform(3.0, 5.0) * self.human_speed)
# Check if pagination exists
pagination_exists = await page.query_selector("button[aria-label='Next']")
if pagination_exists:
print("⏭️ Pagination detected. Using page navigation.")
current_page = 1
while current_page <= max_pages:
print(f"📄 Processing page {current_page}/{max_pages}")
# Collect job links on current page
current_links = await page.query_selector_all("a[href*='/jobs/view/']")
new_jobs = 0
for link in current_links:
href = await link.get_attribute("href")
if href:
job_id = href.split("/view/")[-1].split("/")[0] if "/view/" in href else href
if job_id and job_id not in seen_job_ids:
seen_job_ids.add(job_id)
all_job_links.append(href)
new_jobs += 1
print(f" Found {new_jobs} new job(s) on page {current_page} (total: {len(all_job_links)})")
# Try to go to next page
if current_page < max_pages:
next_btn = await page.query_selector("button[aria-label='Next']")
if next_btn and await next_btn.is_enabled():
await self._human_click(page, next_btn)
await asyncio.sleep(random.uniform(4.0, 6.0) * self.human_speed)
# Wait for URL to change or new content
try:
await page.wait_for_function("() => window.location.href.includes('start=')", timeout=30000)
except:
pass
else:
print("🔚 'Next' button not available — stopping pagination.")
break
current_page += 1
else:
print("🔄 No pagination found. Falling back to infinite scroll...")
last_height = await page.evaluate("document.body.scrollHeight")
no_new_jobs_count = 0
max_no_new = 3
while no_new_jobs_count < max_no_new:
await page.evaluate("window.scrollTo(0, document.body.scrollHeight)")
await asyncio.sleep(random.uniform(3.0, 5.0) * self.human_speed)
current_links = await page.query_selector_all("a[href*='/jobs/view/']")
new_jobs_found = 0
for link in current_links:
href = await link.get_attribute("href")
if href:
job_id = href.split("/view/")[-1].split("/")[0] if "/view/" in href else href
if job_id and job_id not in seen_job_ids:
seen_job_ids.add(job_id)
all_job_links.append(href)
new_jobs_found += 1
print(f" Found {new_jobs_found} new job(s) (total: {len(all_job_links)})")
new_height = await page.evaluate("document.body.scrollHeight")
if new_height == last_height:
no_new_jobs_count += 1
else:
no_new_jobs_count = 0
last_height = new_height
if new_jobs_found == 0 and no_new_jobs_count >= 1:
print("🔚 No new jobs loaded. Stopping scroll.")
break
print(f"✅ Collected {len(all_job_links)} unique job links.")
# ← Rest of job processing loop unchanged
scraped_count = 0
for idx, href in enumerate(all_job_links):
try:
full_url = href if href.startswith("http") else f"https://www.linkedin.com{href}"
print(f" → Opening job {idx+1}/{len(all_job_links)}: {full_url}")
await page.goto(full_url, wait_until='load', timeout=60000)
await asyncio.sleep(3 * self.human_speed)
is_cloudflare = await self.engine._detect_cloudflare(page)
page_content = await page.content()
has_captcha_text = "captcha" in page_content.lower()
captcha_present = is_cloudflare or has_captcha_text
title_element = await page.query_selector("h1.t-24")
job_data_accessible = title_element is not None
if captcha_present:
if job_data_accessible:
print(" ⚠️ CAPTCHA detected, but job data is accessible. Proceeding in stealth mode...")
await self.engine._avoid_captcha(page)
else:
print(" ⚠️ CAPTCHA detected and job data blocked. Attempting recovery...")
if not await self.engine._solve_captcha_fallback(page):
print(" ❌ CAPTCHA recovery failed. Skipping job.")
continue
title_element = await page.query_selector("h1.t-24")
if not title_element:
print(" ❌ Job data still unavailable after CAPTCHA handling. Skipping.")
continue
if not captcha_present:
await self.engine._avoid_captcha(page)
apply_btn = None
apply_selectors = [
"button[aria-label*='Apply']",
"button:has-text('Apply')",
"a:has-text('Apply')",
"button:has-text('Easy Apply')"
]
for selector in apply_selectors:
apply_btn = await page.query_selector(selector)
if apply_btn:
break
job_data = None
final_url = full_url
if apply_btn:
print(" → Clicking 'Apply' / 'Easy Apply' button...")
page_waiter = asyncio.create_task(context.wait_for_event("page"))
await self._human_click(page, apply_btn, wait_after=False)
external_page = None
try:
external_page = await asyncio.wait_for(page_waiter, timeout=5.0)
print(" 🌐 External job site opened in new tab.")
await external_page.wait_for_load_state("load", timeout=30000)
await asyncio.sleep(2 * self.human_speed)
await self.engine._human_like_scroll(external_page)
await asyncio.sleep(2 * self.human_speed)
job_data = await self._extract_job_details(external_page)
final_url = external_page.url
if not external_page.is_closed():
await external_page.close()
except asyncio.TimeoutError:
print(" 🖥️ No external tab — scraping LinkedIn job page.")
await page.wait_for_timeout(2000)
try:
await page.wait_for_selector("div.jobs-apply-button--fixed, div.jobs-easy-apply-modal", timeout=8000)
except:
pass
await self.engine._human_like_scroll(page)
await asyncio.sleep(2 * self.human_speed)
job_data = await self._extract_job_details(page)
final_url = page.url
else:
print(" ⚠️ No 'Apply' button found — scraping job details directly.")
await self.engine._human_like_scroll(page)
await asyncio.sleep(2 * self.human_speed)
job_data = await self._extract_job_details(page)
final_url = page.url
job_data["url"] = final_url
if job_data["title"] == "N/A" and "linkedin.com" in final_url:
job_id = final_url.split("/")[-2] if "/jobs/view/" in final_url else "unknown"
job_data["title"] = f"Easy Apply Job - ID {job_id}"
is_meaningful = (
job_data["title"] != "N/A" or
job_data["company"] != "N/A" or
(job_data["description"] != "N/A" and len(job_data["description"]) > 20)
)
if is_meaningful:
await self._save_to_db(job_data, search_keywords)
await self._save_to_markdown(job_data, search_keywords, verified=True)
scraped_count += 1
print(f" ✅ Scraped (verified): {job_data['title'][:50]}...")
else:
await self._save_to_markdown(job_data, search_keywords, verified=False)
print(f" 🟡 Scraped (unverified): {final_url} — low-quality data")
except Exception as e:
print(f" ⚠️ Failed on job {idx+1}: {str(e)[:100]}")
continue
finally:
print(" ↩️ Returning to LinkedIn search results...")
await page.goto(search_url, timeout=60000)
await asyncio.sleep(4 * self.human_speed)
await browser.close()
if scraped_count > 0:
self.engine.report_outcome("success")
print(f"✅ Completed! Saved {scraped_count} verified + additional unverified jobs for '{search_keywords}'.")
else:
self.engine.report_outcome("captcha")
print("⚠️ No verified jobs scraped — check 'linkedin_jobs_unverified' for raw outputs.")