modifications to work with PostgreSQL and use an LLM to extract and refine data

This commit is contained in:
Ofure Ikheloa 2025-12-05 17:00:43 +01:00
parent 4f78a845ae
commit 160efadbfb
5 changed files with 275 additions and 102 deletions

View File

@@ -23,11 +23,11 @@ class StealthyFetcher:
print(f"Attempt {attempt + 1} to fetch {url}")
page = await self.context.new_page()
await page.goto(url, wait_until='load', timeout=60000)
await page.goto(url, wait_until='load', timeout=120000)
if wait_for_selector:
try:
await page.wait_for_selector(wait_for_selector, timeout=10000)
await page.wait_for_selector(wait_for_selector, timeout=40000)
except PlaywrightTimeoutError:
print(f"Selector {wait_for_selector} not found immediately, continuing...")
@@ -88,7 +88,7 @@ class StealthyFetcher:
async def _is_content_accessible(self, page: Page, wait_for_selector: Optional[str] = None) -> bool:
if wait_for_selector:
try:
await page.wait_for_selector(wait_for_selector, timeout=5000)
await page.wait_for_selector(wait_for_selector, timeout=40000)
return True
except PlaywrightTimeoutError:
pass
@@ -118,7 +118,7 @@ class StealthyFetcher:
if (time.time() - start_time) > 15 and (time.time() - start_time) % 20 < 2:
print("🔄 Reloading page during Cloudflare wait...")
await page.reload(wait_until='load', timeout=30000)
await page.reload(wait_until='load', timeout=120000)
print("⏰ Timeout waiting for Cloudflare resolution.")
return False
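For context, the reload above fires inside a Cloudflare-wait loop that is largely outside this hunk; a rough sketch of that loop, with the overall wait budget (max_wait) assumed rather than taken from the diff:

    start_time = time.time()
    while time.time() - start_time < max_wait:  # max_wait is assumed, e.g. 120 s
        if await self._is_content_accessible(page):
            return True  # challenge cleared, content reachable
        elapsed = time.time() - start_time
        # reload roughly every 20 s once the first 15 s have passed
        if elapsed > 15 and elapsed % 20 < 2:
            print("🔄 Reloading page during Cloudflare wait...")
            await page.reload(wait_until='load', timeout=120000)
        await asyncio.sleep(2)
    print("⏰ Timeout waiting for Cloudflare resolution.")
    return False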

View File

@@ -1,13 +1,12 @@
import asyncio
import random
import sqlite3
import os
from typing import Optional, Dict
from playwright.async_api import async_playwright, TimeoutError as PlaywrightTimeoutError
from browserforge.injectors.playwright import AsyncNewContext
from llm_agent import LLMJobRefiner
import re
from fetcher import StealthyFetcher
from datetime import datetime
class LinkedInJobScraper:
@@ -26,25 +25,8 @@ class LinkedInJobScraper:
self.llm_agent = LLMJobRefiner()
def _init_db(self):
os.makedirs(os.path.dirname(self.db_path) if os.path.dirname(self.db_path) else ".", exist_ok=True)
with sqlite3.connect(self.db_path) as conn:
cursor = conn.cursor()
cursor.execute('''
CREATE TABLE IF NOT EXISTS jobs (
id INTEGER PRIMARY KEY AUTOINCREMENT,
title TEXT,
company_name TEXT,
location TEXT,
description TEXT,
requirements TEXT,
qualifications TEXT,
salary_range TEXT,
nature_of_work TEXT,
job_id TEXT,
url TEXT UNIQUE
)
''')
conn.commit()
# This method is kept for backward compatibility but LLMJobRefiner handles PostgreSQL now
pass
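Since persistence now lives entirely in LLMJobRefiner, the scraper only hands records over to it; a minimal sketch of the new call path, using names that appear later in this diff:

    # scraper side: no direct SQLite/PostgreSQL access anymore
    refined = await self.llm_agent.refine_job_data(raw_data, self.user_request)
    if refined:
        await self.llm_agent.save_job_data(refined, search_keywords)  # PostgreSQL + markdown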
async def _human_click(self, page, element, wait_after: bool = True):
if not element:
@@ -61,7 +43,7 @@ class LinkedInJobScraper:
async def _login(self, page, credentials: Dict) -> bool:
print("🔐 Navigating to LinkedIn login page...")
await page.goto("https://www.linkedin.com/login", timeout=60000)
await page.goto("https://www.linkedin.com/login", timeout=120000)
await asyncio.sleep(random.uniform(2.0, 3.5) * self.human_speed)
email_field = await page.query_selector('input[name="session_key"]')
@@ -104,7 +86,11 @@ class LinkedInJobScraper:
print("❌ Login may have failed.")
return False
async def _extract_all_page_content(self, page) -> str:
async def _extract_page_content_for_llm(self, page) -> str:
"""
Extract raw page content as HTML/text for LLM processing.
The LLM handles all extraction logic instead of relying on specific selectors.
"""
await asyncio.sleep(2 * self.human_speed)
await self.engine._human_like_scroll(page)
await asyncio.sleep(2 * self.human_speed)
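The remainder of this method falls outside the hunk; a minimal sketch of the idea, assuming the rendered HTML is returned via Playwright's page.content() and cleaned later by LLMJobRefiner._clean_html_for_llm:

    # illustrative only — the actual return statement is not shown in this diff
    html = await page.content()  # full rendered page, no per-site selectors
    return html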
@@ -172,7 +158,7 @@ class LinkedInJobScraper:
await self._human_click(page, next_btn)
await asyncio.sleep(random.uniform(4.0, 6.0) * self.human_speed)
try:
await page.wait_for_function("() => window.location.href.includes('start=')", timeout=60000)
await page.wait_for_function("() => window.location.href.includes('start=')", timeout=120000)
except:
pass
current_page += 1
@@ -247,7 +233,7 @@ class LinkedInJobScraper:
if session_loaded:
print("🔁 Using saved session — verifying login...")
await page.goto("https://www.linkedin.com/feed/", timeout=60000)
await page.goto("https://www.linkedin.com/feed/", timeout=120000)
if "feed" in page.url and "login" not in page.url:
print("✅ Session still valid.")
login_successful = True
@@ -269,7 +255,7 @@ class LinkedInJobScraper:
print(" No credentials — proceeding as guest.")
login_successful = True
await page.wait_for_load_state("load", timeout=60000)
await page.wait_for_load_state("load", timeout=120000)
print("✅ Post-login page fully loaded. Starting search...")
# >>> PROTECTION CHECK USING FETCHER LOGIC <<<
@@ -292,7 +278,7 @@ class LinkedInJobScraper:
print("✅ Protection present but content accessible — proceeding.")
print(f"🔍 Searching for: {search_keywords}")
await page.goto(search_url, wait_until='load', timeout=60000)
await page.goto(search_url, wait_until='load', timeout=120000)
await asyncio.sleep(random.uniform(4.0, 6.0) * self.human_speed)
# >>> PROTECTION CHECK ON SEARCH PAGE <<<
@@ -322,7 +308,7 @@ class LinkedInJobScraper:
print(f" Found {initial_jobs} initial job(s) (total: {len(all_job_links)})")
iteration = 1
while True:
while iteration <= 5:  # cap the refresh loop at 5 passes
print(f"🔄 Iteration {iteration}: Checking for new jobs...")
prev_job_count = len(all_job_links)
@@ -355,10 +341,6 @@ class LinkedInJobScraper:
print("🔚 No new jobs found after refresh. Stopping.")
break
if iteration > 10:
print("🔄 Maximum iterations reached. Stopping.")
break
print(f"✅ Collected {len(all_job_links)} unique job links.")
scraped_count = 0
@@ -386,8 +368,9 @@ class LinkedInJobScraper:
if apply_btn:
break
page_data = None
final_url = job_page.url
final_url = full_url
external_url = None
page_content = None
if apply_btn:
print(" → Clicking 'Apply' / 'Easy Apply' button...")
@@ -399,44 +382,61 @@ class LinkedInJobScraper:
try:
external_page = await asyncio.wait_for(page_waiter, timeout=5.0)
print(" 🌐 External job site opened in new tab.")
await external_page.wait_for_load_state("load", timeout=60000)
await external_page.wait_for_load_state("load", timeout=120000)
await asyncio.sleep(2 * self.human_speed)
await self.engine._human_like_scroll(external_page)
await asyncio.sleep(2 * self.human_speed)
page_data = await self._extract_all_page_content(external_page)
final_url = external_page.url
# Extract raw content from external page for LLM processing
external_url = external_page.url
final_url = external_url
page_content = await self._extract_page_content_for_llm(external_page)
if not external_page.is_closed():
await external_page.close()
except asyncio.TimeoutError:
print(" 🖥️ No external tab — scraping LinkedIn job page directly.")
await job_page.wait_for_timeout(2000)
await job_page.wait_for_timeout(60000)
try:
await job_page.wait_for_selector("div.jobs-apply-button--fixed, div.jobs-easy-apply-modal", timeout=8000)
await job_page.wait_for_selector("div.jobs-apply-button--fixed, div.jobs-easy-apply-modal", timeout=80000)
except PlaywrightTimeoutError:
pass
await self.engine._human_like_scroll(job_page)
await asyncio.sleep(2 * self.human_speed)
page_data = await self._extract_all_page_content(job_page)
page_content = await self._extract_page_content_for_llm(job_page)
else:
print(" ⚠️ No 'Apply' button found — scraping job details directly.")
await self.engine._human_like_scroll(job_page)
await asyncio.sleep(2 * self.human_speed)
page_data = await self._extract_all_page_content(job_page)
page_content = await self._extract_page_content_for_llm(job_page)
job_id = final_url.split("/")[-2] if "/jobs/view/" in final_url else "unknown"
job_id = full_url.split("/")[-2] if "/jobs/view/" in full_url else "unknown"
raw_data = {
"page_content": page_data,
"url": job_page.url,
"job_id": job_page.url.split("/")[-2] if "/jobs/view/" in job_page.url else "unknown"
"page_content": page_content,
"url": final_url,
"job_id": job_id,
"search_keywords": search_keywords
}
# LLM agent is now fully responsible for extraction and validation
refined_data = await self.llm_agent.refine_job_data(raw_data, self.user_request)
if refined_data and refined_data.get("title", "N/A") != "N/A":
# Ensure compulsory fields are present (fallback if LLM missed them)
compulsory_fields = ['company_name', 'job_id', 'url']
for field in compulsory_fields:
if not refined_data.get(field) or refined_data[field] in ["N/A", "", "Unknown"]:
if field == 'job_id':
refined_data[field] = job_id
elif field == 'url':
refined_data[field] = final_url
elif field == 'company_name':
refined_data[field] = "Unknown Company"
refined_data['scraped_at'] = datetime.now().isoformat()
refined_data['category'] = clean_keywords
await self.llm_agent.save_job_data(refined_data, search_keywords)
scraped_count += 1
print(f" ✅ Scraped and refined: {refined_data['title'][:50]}...")
@@ -455,7 +455,7 @@ class LinkedInJobScraper:
finally:
print(" ↩️ Returning to LinkedIn search results...")
await page.goto(search_url, timeout=60000)
await page.goto(search_url, timeout=120000)
await asyncio.sleep(4 * self.human_speed)
await browser.close()

View File

@@ -4,6 +4,8 @@ from job_scraper2 import LinkedInJobScraper
import os
from dotenv import load_dotenv
import asyncio
import random
import time
# Load environment variables
load_dotenv()
@@ -11,7 +13,7 @@ load_dotenv()
async def main():
engine = FingerprintScrapingEngine(
seed="job_scraping_123",
seed="job_scraping_12",
target_os="windows",
db_path="job_listings.db",
markdown_path="job_listings.md"
@@ -20,13 +22,50 @@ async def main():
# Initialize scraper with target field
scraper = LinkedInJobScraper(engine, human_speed=1.6, user_request="Extract title, company, location, description, requirements, qualifications, nature of job(remote, onsite, hybrid) and salary")
# List of job titles to cycle through
job_titles = [
"Software Engineer",
"Data Scientist",
"Product Manager",
"UX Designer",
"DevOps Engineer",
"Machine Learning Engineer",
"Frontend Developer",
"Backend Developer",
"Full Stack Developer",
"Data Analyst"
]
fixed_location = "New York"
# Keep cycling through all job titles
while True:
# Shuffle job titles to randomize order
random.shuffle(job_titles)
for job_title in job_titles:
search_keywords = f"{job_title} location:{fixed_location}"
print(f"\n{'='*60}")
print(f"Starting scrape for: {search_keywords}")
print(f"{'='*60}")
await scraper.scrape_jobs(
search_keywords="Lecturer location:New York",
search_keywords=search_keywords,
credentials={
"email": os.getenv("SCRAPING_USERNAME"),
"password": os.getenv("SCRAPING_PASSWORD")
}
)
print(f"\n✅ Completed scraping for: {job_title}")
print(f"⏳ Waiting 2 minutes before next job title...")
# Wait 2 minutes before next job title
await asyncio.sleep(120)
print(f"\n✅ Completed full cycle of all job titles")
print(f"🔄 Starting new cycle...")
if __name__ == "__main__":
asyncio.run(main())

View File

@@ -1,28 +1,125 @@
from openai import OpenAI
from typing import Dict, Any
import asyncio
import sqlite3
import psycopg2
import os
from datetime import datetime
import json
import re
from bs4 import BeautifulSoup
from dotenv import load_dotenv
# Load environment variables from .env
load_dotenv()
class LLMJobRefiner:
def __init__(self):
deepseek_api_key = os.getenv("DEEPSEEK_API_KEY")
if not deepseek_api_key:
raise ValueError("DEEPSEEK_API_KEY not found in .env file.")
# Database credentials from .env
self.db_url = os.getenv("DB_URL")
self.db_username = os.getenv("DB_USERNAME")
self.db_password = os.getenv("DB_PASSWORD")
self.db_host = os.getenv("DB_HOST")
self.db_port = os.getenv("DB_PORT")
if not self.db_url or not self.db_username or not self.db_password:
raise ValueError("Database credentials not found in .env file.")
# DeepSeek uses OpenAI-compatible API
self.client = OpenAI(
api_key=deepseek_api_key,
base_url="https://api.deepseek.com/v1"
)
self.model = "deepseek-chat" # or "deepseek-coder" if preferred
self.model = "deepseek-chat"
self._init_db()
def _init_db(self):
"""Initialize PostgreSQL database connection and create table"""
try:
# Connection parameters come from the .env values loaded in __init__
conn = psycopg2.connect(
host=self.db_host,
port=self.db_port,
database="postgres",
user=self.db_username,
password=self.db_password
)
cursor = conn.cursor()
cursor.execute('''
CREATE TABLE IF NOT EXISTS jobs (
id SERIAL PRIMARY KEY,
title TEXT,
company_name TEXT,
location TEXT,
description TEXT,
requirements TEXT,
qualifications TEXT,
salary_range TEXT,
nature_of_work TEXT,
job_id TEXT UNIQUE,
url TEXT,
category TEXT,
scraped_at TIMESTAMP,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
''')
# Ensure the uniqueness constraint exists
cursor.execute('''
ALTER TABLE jobs DROP CONSTRAINT IF EXISTS jobs_job_id_key;
ALTER TABLE jobs ADD CONSTRAINT jobs_job_id_key UNIQUE (job_id);
''')
cursor.execute('CREATE INDEX IF NOT EXISTS idx_job_id ON jobs(job_id)')
cursor.execute('CREATE INDEX IF NOT EXISTS idx_category ON jobs(category)')
conn.commit()
cursor.close()
conn.close()
print("✅ PostgreSQL database initialized successfully")
except Exception as e:
print(f"❌ Database initialization error: {e}")
raise
def _clean_html_for_llm(self, html_content: str) -> str:
"""Clean HTML to make it more readable for LLM while preserving structure"""
try:
soup = BeautifulSoup(html_content, 'html.parser')
# Remove script and style elements
for script in soup(["script", "style", "nav", "footer", "header"]):
script.decompose()
# Extract text but keep some structure
text = soup.get_text(separator=' ', strip=True)
# Clean up whitespace
text = re.sub(r'\s+', ' ', text)
# Limit length for LLM context
if len(text) > 10000:
text = text[:10000] + "..."
return text
except Exception as e:
print(f"HTML cleaning error: {e}")
# Fallback to raw content if cleaning fails
return html_content[:100000] if len(html_content) > 100000 else html_content
def _generate_content_sync(self, prompt: str) -> str:
"""Synchronous call to DeepSeek API"""
@@ -40,33 +137,47 @@ class LLMJobRefiner:
return ""
async def refine_job_data(self, raw_data: Dict[str, Any], target_field: str) -> Dict[str, Any]:
# Clean the raw HTML content for better LLM processing
page_content = raw_data.get('page_content', '')
cleaned_content = self._clean_html_for_llm(page_content)
# Get job_id and url from raw data
job_id = raw_data.get('job_id', 'unknown')
url = raw_data.get('url', 'N/A')
prompt = f"""
You are a job data extraction assistant. Extract the following fields from the job posting:
- title
- company_name
You are a job posting data extractor with two modes:
PRIMARY MODE (PREFERRED):
- Extract EXACT text as it appears on the page for all fields
- DO NOT summarize, paraphrase, or interpret
- Copy verbatim content including original wording and formatting
FALLBACK MODE (ONLY IF FIELD IS MISSING):
- If a field is NOT explicitly stated anywhere in the content, you MAY infer it using clear contextual clues
- Inference rules:
* company_name: Look for patterns like "at [Company]", "Join [Company]", "[Company] is hiring"
* nature_of_work: Look for "remote", "onsite", "hybrid", "work from home", "office-based"
* location: Extract city/state/country mentions near job title or details
* title: Use the largest/primary heading if no explicit "job title" label exists
REQUIRED FIELDS (must always have a value):
- title: Exact job title or best inference
- company_name: Exact company name or best inference
- job_id: Use provided: {job_id}
- url: Use provided: {url}
OPTIONAL FIELDS (use exact text or "N/A" if not present and not inferable):
- location
- description
- requirements
- qualifications
- salary_range
- nature_of_work (remote, onsite, or hybrid)
- job_id
- nature_of_work
Target Field: {target_field}
Raw Page Content:
{raw_data.get('page_content', '')}
Instructions:
1. Extract only the information relevant to the target field: {target_field}
2. Clean up any formatting issues in the description
3. Standardize location format (city, state/country)
4. Extract salary range if mentioned
5. Determine nature of work from work arrangements
6. Ensure all fields are properly formatted
7. If a field cannot be found, use "N/A"
8. Return ONLY the refined data in JSON format
Response format (only return the JSON):
Page Content:
{cleaned_content}
Response format (ONLY return this JSON):
{{
"title": "...",
"company_name": "...",
@@ -76,8 +187,8 @@ class LLMJobRefiner:
"qualifications": "...",
"salary_range": "...",
"nature_of_work": "...",
"job_id": "{raw_data.get('job_id', 'unknown')}",
"url": "{raw_data.get('url', 'N/A')}"
"job_id": "{job_id}",
"url": "{url}"
}}
"""
@@ -87,7 +198,17 @@ class LLMJobRefiner:
lambda: self._generate_content_sync(prompt)
)
refined_data = self._parse_llm_response(response_text)
return refined_data if refined_data else None
# Final validation - ensure required fields are present and meaningful
if refined_data:
required_fields = ['title', 'company_name', 'job_id', 'url']
for field in required_fields:
if not refined_data.get(field) or refined_data[field] in ["N/A", "", "Unknown", "Company", "Job"]:
return None # LLM failed to extract properly
return refined_data
return None
except Exception as e:
print(f"LLM refinement failed: {str(e)}")
return None
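_parse_llm_response is called above but not included in this diff; a hypothetical sketch of what it might do, using the json and re imports already added at the top of the file:

    def _parse_llm_response(self, response_text: str):
        # grab the first {...} block in the reply and parse it as JSON
        try:
            match = re.search(r"\{.*\}", response_text, re.DOTALL)
            return json.loads(match.group(0)) if match else None
        except json.JSONDecodeError:
            return None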
@@ -109,22 +230,23 @@ class LLMJobRefiner:
await self._save_to_markdown(job_data, keyword)
async def _save_to_db(self, job_data: Dict[str, Any]):
db_path = "linkedin_jobs.db"
os.makedirs(os.path.dirname(db_path) or ".", exist_ok=True)
with sqlite3.connect(db_path) as conn:
cursor = conn.cursor()
cursor.execute('''
CREATE TABLE IF NOT EXISTS jobs (
title TEXT, company_name TEXT, location TEXT, description TEXT,
requirements TEXT, qualifications TEXT, salary_range TEXT,
nature_of_work TEXT, job_id TEXT, url TEXT
"""Save job data to PostgreSQL database with job_id uniqueness"""
try:
conn = psycopg2.connect(
host=self.db_host,
port=self.db_port,
database="postgres",
user=self.db_username,
password=self.db_password
)
''')
cursor = conn.cursor()
cursor.execute('''
INSERT OR IGNORE INTO jobs
INSERT INTO jobs
(title, company_name, location, description, requirements,
qualifications, salary_range, nature_of_work, job_id, url)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
qualifications, salary_range, nature_of_work, job_id, url, category, scraped_at)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
ON CONFLICT (job_id) DO NOTHING
''', (
job_data.get("title", "N/A"),
job_data.get("company_name", "N/A"),
@@ -135,9 +257,19 @@ class LLMJobRefiner:
job_data.get("salary_range", "N/A"),
job_data.get("nature_of_work", "N/A"),
job_data.get("job_id", "N/A"),
job_data.get("url", "N/A")
job_data.get("url", "N/A"),
job_data.get("category", "N/A"),
job_data.get("scraped_at")
))
conn.commit()
cursor.close()
conn.close()
print(f" 💾 Saved job to category '{job_data.get('category', 'N/A')}' with job_id: {job_data.get('job_id', 'N/A')}")
except Exception as e:
print(f"❌ Database save error: {e}")
async def _save_to_markdown(self, job_data: Dict[str, Any], keyword: str):
os.makedirs("linkedin_jobs", exist_ok=True)
@@ -154,6 +286,8 @@ class LLMJobRefiner:
f.write(f"- **Nature of Work**: {job_data.get('nature_of_work', 'N/A')}\n")
f.write(f"- **Salary Range**: {job_data.get('salary_range', 'N/A')}\n")
f.write(f"- **Job ID**: {job_data.get('job_id', 'N/A')}\n")
f.write(f"- **Category**: {job_data.get('category', 'N/A')}\n")
f.write(f"- **Scraped At**: {job_data.get('scraped_at', 'N/A')}\n")
f.write(f"- **URL**: <{job_data.get('url', 'N/A')}>\n\n")
f.write(f"### Description\n\n{job_data.get('description', 'N/A')}\n\n")
f.write(f"### Requirements\n\n{job_data.get('requirements', 'N/A')}\n\n")

View File

@@ -69,7 +69,7 @@ class FingerprintScrapingEngine:
self.optimization_params = {
"base_delay": 2.0,
"max_concurrent_requests": 4,
"request_timeout": 60000,
"request_timeout": 120000,
"retry_attempts": 3,
"captcha_handling_strategy": "avoid", # or "solve_fallback"
"cloudflare_wait_strategy": "smart_wait", # or "aggressive_reload"
@@ -155,7 +155,7 @@ class FingerprintScrapingEngine:
# Increase timeout if avg response time is high
if avg_rt > 20:
self.optimization_params["request_timeout"] = 90000 # 90 seconds
self.optimization_params["request_timeout"] = 150000 # 150 seconds
print(f"Optimization Params Updated: {self.optimization_params}")
@@ -371,7 +371,7 @@ class FingerprintScrapingEngine:
# Reload occasionally to trigger potential client-side checks
if (time.time() - start_time) > 15 and (time.time() - start_time) % 20 < 2:
print("Reloading page during Cloudflare wait...")
await page.reload(wait_until='load', timeout=30000)
await page.reload(wait_until='load', timeout=80000)
print("Timeout waiting for Cloudflare resolution.")
return False