Delete job_scraper.py

Ofure 2025-12-05 16:44:23 +00:00
parent 06f9820c38
commit 7e80801f89


@@ -1,491 +0,0 @@
import asyncio
import random
import sqlite3
import os
from datetime import datetime
from typing import Optional, Dict, List
from playwright.async_api import async_playwright
from browserforge.injectors.playwright import AsyncNewContext
class LinkedInJobScraper:
    def __init__(
        self,
        engine,
        db_path: str = "linkedin_jobs.db",
        human_speed: float = 1.0
    ):
        self.engine = engine
        self.db_path = db_path
        self.human_speed = human_speed
        self._init_db()
    def _init_db(self):
        os.makedirs(os.path.dirname(self.db_path) or ".", exist_ok=True)
        with sqlite3.connect(self.db_path) as conn:
            cursor = conn.cursor()
            cursor.execute('''
                CREATE TABLE IF NOT EXISTS jobs (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    keyword TEXT,
                    title TEXT,
                    company TEXT,
                    location TEXT,
                    salary TEXT,
                    description TEXT,
                    url TEXT UNIQUE,
                    workplace_type TEXT,
                    scraped_at DATETIME DEFAULT CURRENT_TIMESTAMP
                )
            ''')
            conn.commit()
    async def _human_click(self, page, element, wait_after: bool = True):
        """Scroll an element into view and click it with human-like pauses."""
        if not element:
            return False
        await element.scroll_into_view_if_needed()
        await asyncio.sleep(random.uniform(0.3, 0.8) * self.human_speed)
        try:
            await element.click()
            if wait_after:
                await asyncio.sleep(random.uniform(2, 4) * self.human_speed)
            return True
        except Exception:
            return False
    async def _login(self, page, credentials: Dict) -> bool:
        """Human-realistic LinkedIn login"""
        print("🔐 Navigating to LinkedIn login page...")
        await page.goto("https://www.linkedin.com/login", timeout=60000)
        await asyncio.sleep(random.uniform(2.0, 3.5) * self.human_speed)
        email_field = await page.query_selector('input[name="session_key"]')
        if not email_field:
            print("❌ Email field not found.")
            return False
        print("✍️ Typing username...")
        await email_field.click()
        await asyncio.sleep(random.uniform(0.4, 0.9) * self.human_speed)
        for char in credentials["email"]:
            await page.keyboard.type(char)
            await asyncio.sleep(random.uniform(0.06, 0.14) * self.human_speed)
        await asyncio.sleep(random.uniform(1.0, 1.8) * self.human_speed)
        password_field = await page.query_selector('input[name="session_password"]')
        if not password_field:
            print("❌ Password field not found.")
            return False
        print("🔒 Typing password...")
        await password_field.click()
        await asyncio.sleep(random.uniform(0.3, 0.7) * self.human_speed)
        for char in credentials["password"]:
            await page.keyboard.type(char)
            await asyncio.sleep(random.uniform(0.08, 0.16) * self.human_speed)
        await asyncio.sleep(random.uniform(0.8, 1.5) * self.human_speed)
        print("✅ Submitting login form...")
        await page.keyboard.press("Enter")
        # Poll the URL for up to 15 seconds to confirm the redirect away from the login page
        for _ in range(15):
            current_url = page.url
            if "/feed" in current_url or "/jobs" in current_url:
                if "login" not in current_url:
                    print("✅ Login successful!")
                    await asyncio.sleep(random.uniform(2.0, 3.0) * self.human_speed)
                    return True
            await asyncio.sleep(1)
        print("❌ Login may have failed.")
        return False
    async def _extract_job_details(self, page) -> Dict:
        """Extract from ANY job page: LinkedIn Easy Apply OR external site"""
        await asyncio.sleep(2 * self.human_speed)

        async def get_text(selector: str) -> str:
            try:
                el = await page.query_selector(selector)
                if el:
                    text = await el.inner_text()
                    return text.strip() if text else "N/A"
            except Exception:
                pass
            return "N/A"

        title = await get_text("h1.t-24")
        if title == "N/A":
            title = await get_text("h1, h2")
        company = await get_text("a.app-aware-link[href*='/company/']")
        if company == "N/A":
            company = await get_text("div.org, .company, [class*='company']")
        location = await get_text("span[class*='location']")
        if location == "N/A":
            location = await get_text(".location, [class*='location']")
        description = await get_text("div[class*='description__text']")
        if description == "N/A":
            description = await get_text(".job-desc, .description, main, body")
        workplace = await get_text("span.job-workplace-type") or "N/A"
        salary = await get_text("span.salary") or "N/A"
        return {
            "title": title,
            "company": company,
            "location": location,
            "workplace_type": workplace,
            "salary": salary,
            "description": description,
            "url": page.url
        }
    async def _save_to_markdown(self, job_data: Dict, keyword: str, verified: bool = True):
        """Save to appropriate folder using job ID to avoid duplication"""
        folder = "linkedin_jobs" if verified else "linkedin_jobs_unverified"
        os.makedirs(folder, exist_ok=True)
        # Extract job ID from URL for LinkedIn jobs
        url = job_data.get("url", "")
        if "/jobs/view/" in url:
            try:
                job_id = url.split("/view/")[1].split("/")[0]
            except IndexError:
                job_id = "unknown"
        else:
            # For external jobs, use a hash of the URL (first 12 chars)
            import hashlib
            job_id = hashlib.md5(url.encode()).hexdigest()[:12]
        clean_keyword = keyword.replace(" ", "_")
        filename = f"linkedin_{clean_keyword}_job_{job_id}.md"
        filepath = os.path.join(folder, filename)
        # Only save if file doesn't already exist (idempotent)
        if os.path.exists(filepath):
            print(f" 📝 Skipping duplicate Markdown file: {filename}")
            return
        with open(filepath, "w", encoding="utf-8") as f:
            f.write(f"# {job_data['title']}\n\n")
            f.write(f"- **Company**: {job_data['company']}\n")
            f.write(f"- **Location**: {job_data['location']}\n")
            f.write(f"- **Workplace**: {job_data['workplace_type']}\n")
            f.write(f"- **Salary**: {job_data['salary']}\n")
            f.write(f"- **URL**: <{url}>\n\n")
            f.write(f"## Description\n\n{job_data['description']}\n")
    async def _save_to_db(self, job_data: Dict, keyword: str):
        """Insert the job into SQLite; duplicates are ignored via the UNIQUE url column."""
        with sqlite3.connect(self.db_path) as conn:
            cursor = conn.cursor()
            cursor.execute('''
                INSERT OR IGNORE INTO jobs
                (keyword, title, company, location, salary, description, url, workplace_type)
                VALUES (?, ?, ?, ?, ?, ?, ?, ?)
            ''', (
                keyword,
                job_data["title"],
                job_data["company"],
                job_data["location"],
                job_data["salary"],
                job_data["description"],
                job_data["url"],
                job_data["workplace_type"]
            ))
            conn.commit()
    async def scrape_jobs(
        self,
        search_keywords: Optional[str],
        max_pages: int = 1,
        credentials: Optional[Dict] = None
    ):
        encoded_keywords = search_keywords.replace(" ", "%20")
        search_url = f"https://www.linkedin.com/jobs/search/?keywords={encoded_keywords}"
        profile = self.engine._select_profile()
        renderer = random.choice(self.engine.common_renderers[self.engine.os])
        vendor = random.choice(self.engine.common_vendors)
        spoof_script = self.engine._get_spoof_script(renderer, vendor)
        async with async_playwright() as pw:
            browser = await pw.chromium.launch(
                headless=False,
                args=['--disable-blink-features=AutomationControlled']
            )
            context = await AsyncNewContext(browser, fingerprint=profile)
            await context.add_init_script(f"""
                Object.defineProperty(navigator, 'hardwareConcurrency', {{ get: () => {profile.navigator.hardwareConcurrency} }});
                Object.defineProperty(navigator, 'deviceMemory', {{ get: () => {profile.navigator.deviceMemory} }});
                Object.defineProperty(navigator, 'platform', {{ get: () => '{profile.navigator.platform}' }});
            """)
            await context.add_init_script(spoof_script)
            page = await context.new_page()
            session_loaded = await self.engine.load_session(context)
            login_successful = False
            if session_loaded:
                print("🔁 Using saved session — verifying login...")
                await page.goto("https://www.linkedin.com/feed/", timeout=80000)
                if "feed" in page.url and "login" not in page.url:
                    print("✅ Session still valid.")
                    login_successful = True
                else:
                    print("⚠️ Saved session expired — re-authenticating.")
                    session_loaded = False
            if not session_loaded and credentials:
                print("🔐 Performing fresh login...")
                login_successful = await self._login(page, credentials)
                if login_successful:
                    await self.engine.save_session(context)
                else:
                    print("❌ Login failed. Exiting.")
                    await browser.close()
                    self.engine.report_outcome("block")
                    return
            elif not credentials:
                print(" No credentials — proceeding as guest.")
                login_successful = True
            await page.wait_for_load_state("load", timeout=60000)
            print("✅ Post-login page fully loaded. Starting search...")
            if await self.engine._detect_cloudflare(page):
                print("☁️ Cloudflare detected on initial load.")
                if not await self.engine._handle_cloudflare(page):
                    print("❌ Cloudflare could not be resolved.")
                    await browser.close()
                    self.engine.report_outcome("cloudflare")
                    return
print(f"🔍 Searching for: {search_keywords}")
await page.goto(search_url, wait_until='load', timeout=80000)
await asyncio.sleep(random.uniform(4.0, 6.0) * self.human_speed)
if await self.engine._detect_cloudflare(page):
print("☁️ Cloudflare detected on search page.")
if not await self.engine._handle_cloudflare(page):
await browser.close()
self.engine.report_outcome("cloudflare")
return
scraped_count = 0
all_job_links = []
seen_job_ids = set()
# ← NEW: Scroll once to reveal pagination (if any)
print("🔄 Scrolling to bottom to reveal pagination controls...")
await page.evaluate("window.scrollTo(0, document.body.scrollHeight)")
await asyncio.sleep(random.uniform(3.0, 5.0) * self.human_speed)
# Check if pagination exists
pagination_exists = await page.query_selector("button[aria-label='Next']")
            if pagination_exists:
                print("⏭️ Pagination detected. Using page navigation.")
                current_page = 1
                while current_page <= max_pages:
                    print(f"📄 Processing page {current_page}/{max_pages}")
                    # Collect job links on the current page
                    current_links = await page.query_selector_all("a[href*='/jobs/view/']")
                    new_jobs = 0
                    for link in current_links:
                        href = await link.get_attribute("href")
                        if href:
                            job_id = href.split("/view/")[-1].split("/")[0] if "/view/" in href else href
                            if job_id and job_id not in seen_job_ids:
                                seen_job_ids.add(job_id)
                                all_job_links.append(href)
                                new_jobs += 1
                    print(f" Found {new_jobs} new job(s) on page {current_page} (total: {len(all_job_links)})")
                    # Try to go to the next page
                    if current_page < max_pages:
                        next_btn = await page.query_selector("button[aria-label='Next']")
                        if next_btn and await next_btn.is_enabled():
                            await self._human_click(page, next_btn)
                            await asyncio.sleep(random.uniform(4.0, 6.0) * self.human_speed)
                            # Wait for the URL to change or new content to load
                            try:
                                await page.wait_for_function("() => window.location.href.includes('start=')", timeout=60000)
                            except Exception:
                                pass
                        else:
                            print("🔚 'Next' button not available — stopping pagination.")
                            break
                    current_page += 1
            else:
                print("🔄 No pagination found. Falling back to infinite scroll...")
                last_height = await page.evaluate("document.body.scrollHeight")
                no_new_jobs_count = 0
                max_no_new = 3
                while no_new_jobs_count < max_no_new:
                    await page.evaluate("window.scrollTo(0, document.body.scrollHeight)")
                    await asyncio.sleep(random.uniform(3.0, 5.0) * self.human_speed)
                    current_links = await page.query_selector_all("a[href*='/jobs/view/']")
                    new_jobs_found = 0
                    for link in current_links:
                        href = await link.get_attribute("href")
                        if href:
                            job_id = href.split("/view/")[-1].split("/")[0] if "/view/" in href else href
                            if job_id and job_id not in seen_job_ids:
                                seen_job_ids.add(job_id)
                                all_job_links.append(href)
                                new_jobs_found += 1
                    print(f" Found {new_jobs_found} new job(s) (total: {len(all_job_links)})")
                    new_height = await page.evaluate("document.body.scrollHeight")
                    if new_height == last_height:
                        no_new_jobs_count += 1
                    else:
                        no_new_jobs_count = 0
                        last_height = new_height
                    if new_jobs_found == 0 and no_new_jobs_count >= 1:
                        print("🔚 No new jobs loaded. Stopping scroll.")
                        break
            print(f"✅ Collected {len(all_job_links)} unique job links.")
            # Visit each collected job link and extract its details
            scraped_count = 0
            for idx, href in enumerate(all_job_links):
                try:
                    full_url = href if href.startswith("http") else f"https://www.linkedin.com{href}"
                    print(f" → Opening job {idx+1}/{len(all_job_links)}: {full_url}")
                    await page.goto(full_url, wait_until='load', timeout=60000)
                    await asyncio.sleep(3 * self.human_speed)
                    is_cloudflare = await self.engine._detect_cloudflare(page)
                    page_content = await page.content()
                    has_captcha_text = "captcha" in page_content.lower()
                    captcha_present = is_cloudflare or has_captcha_text
                    title_element = await page.query_selector("h1.t-24")
                    job_data_accessible = title_element is not None
                    if captcha_present:
                        if job_data_accessible:
                            print(" ⚠️ CAPTCHA detected, but job data is accessible. Proceeding in stealth mode...")
                            await self.engine._avoid_captcha(page)
                        else:
                            print(" ⚠️ CAPTCHA detected and job data blocked. Attempting recovery...")
                            if not await self.engine._solve_captcha_fallback(page):
                                print(" ❌ CAPTCHA recovery failed. Skipping job.")
                                continue
                            title_element = await page.query_selector("h1.t-24")
                            if not title_element:
                                print(" ❌ Job data still unavailable after CAPTCHA handling. Skipping.")
                                continue
                    if not captcha_present:
                        await self.engine._avoid_captcha(page)
                    apply_btn = None
                    apply_selectors = [
                        "button[aria-label*='Apply']",
                        "button:has-text('Apply')",
                        "a:has-text('Apply')",
                        "button:has-text('Easy Apply')"
                    ]
                    for selector in apply_selectors:
                        apply_btn = await page.query_selector(selector)
                        if apply_btn:
                            break
                    job_data = None
                    final_url = full_url
                    if apply_btn:
                        print(" → Clicking 'Apply' / 'Easy Apply' button...")
                        page_waiter = asyncio.create_task(context.wait_for_event("page"))
                        await self._human_click(page, apply_btn, wait_after=False)
                        external_page = None
                        try:
                            external_page = await asyncio.wait_for(page_waiter, timeout=5.0)
                            print(" 🌐 External job site opened in new tab.")
                            await external_page.wait_for_load_state("load", timeout=60000)
                            await asyncio.sleep(2 * self.human_speed)
                            await self.engine._human_like_scroll(external_page)
                            await asyncio.sleep(2 * self.human_speed)
                            job_data = await self._extract_job_details(external_page)
                            final_url = external_page.url
                            if not external_page.is_closed():
                                await external_page.close()
                        except asyncio.TimeoutError:
                            print(" 🖥️ No external tab — scraping LinkedIn job page.")
                            await page.wait_for_timeout(2000)
                            try:
                                await page.wait_for_selector("div.jobs-apply-button--fixed, div.jobs-easy-apply-modal", timeout=8000)
                            except Exception:
                                pass
                            await self.engine._human_like_scroll(page)
                            await asyncio.sleep(2 * self.human_speed)
                            job_data = await self._extract_job_details(page)
                            final_url = page.url
                    else:
                        print(" ⚠️ No 'Apply' button found — scraping job details directly.")
                        await self.engine._human_like_scroll(page)
                        await asyncio.sleep(2 * self.human_speed)
                        job_data = await self._extract_job_details(page)
                        final_url = page.url
job_data["url"] = final_url
if job_data["title"] == "N/A" and "linkedin.com" in final_url:
job_id = final_url.split("/")[-2] if "/jobs/view/" in final_url else "unknown"
job_data["title"] = f"Easy Apply Job - ID {job_id}"
is_meaningful = (
job_data["title"] != "N/A" or
job_data["company"] != "N/A" or
(job_data["description"] != "N/A" and len(job_data["description"]) > 20)
)
if is_meaningful:
await self._save_to_db(job_data, search_keywords)
await self._save_to_markdown(job_data, search_keywords, verified=True)
scraped_count += 1
print(f" ✅ Scraped (verified): {job_data['title'][:50]}...")
else:
await self._save_to_markdown(job_data, search_keywords, verified=False)
print(f" 🟡 Scraped (unverified): {final_url} — low-quality data")
except Exception as e:
print(f" ⚠️ Failed on job {idx+1}: {str(e)[:100]}")
continue
finally:
print(" ↩️ Returning to LinkedIn search results...")
await page.goto(search_url, timeout=60000)
await asyncio.sleep(4 * self.human_speed)
await browser.close()
if scraped_count > 0:
self.engine.report_outcome("success")
print(f"✅ Completed! Saved {scraped_count} verified + additional unverified jobs for '{search_keywords}'.")
else:
self.engine.report_outcome("captcha")
print("⚠️ No verified jobs scraped — check 'linkedin_jobs_unverified' for raw outputs.")