From 160efadbfb1ad06cd955a0ab8389d7e853722f03 Mon Sep 17 00:00:00 2001 From: Ofure Ikheloa Date: Fri, 5 Dec 2025 17:00:43 +0100 Subject: [PATCH] modifications to work with postgre and use llm to extract and refine data --- fetcher.py | 8 +- job_scraper2.py | 92 +++++++++---------- linkedin_main.py | 57 ++++++++++-- llm_agent.py | 214 ++++++++++++++++++++++++++++++++++++--------- scraping_engine.py | 6 +- 5 files changed, 275 insertions(+), 102 deletions(-) diff --git a/fetcher.py b/fetcher.py index 0bacc3a..5418be3 100644 --- a/fetcher.py +++ b/fetcher.py @@ -23,11 +23,11 @@ class StealthyFetcher: print(f"Attempt {attempt + 1} to fetch {url}") page = await self.context.new_page() - await page.goto(url, wait_until='load', timeout=60000) + await page.goto(url, wait_until='load', timeout=120000) if wait_for_selector: try: - await page.wait_for_selector(wait_for_selector, timeout=10000) + await page.wait_for_selector(wait_for_selector, timeout=40000) except PlaywrightTimeoutError: print(f"Selector {wait_for_selector} not found immediately, continuing...") @@ -88,7 +88,7 @@ class StealthyFetcher: async def _is_content_accessible(self, page: Page, wait_for_selector: Optional[str] = None) -> bool: if wait_for_selector: try: - await page.wait_for_selector(wait_for_selector, timeout=5000) + await page.wait_for_selector(wait_for_selector, timeout=40000) return True except PlaywrightTimeoutError: pass @@ -118,7 +118,7 @@ class StealthyFetcher: if (time.time() - start_time) > 15 and (time.time() - start_time) % 20 < 2: print("šŸ”„ Reloading page during Cloudflare wait...") - await page.reload(wait_until='load', timeout=30000) + await page.reload(wait_until='load', timeout=120000) print("ā° Timeout waiting for Cloudflare resolution.") return False diff --git a/job_scraper2.py b/job_scraper2.py index 808ceb6..e7158f5 100644 --- a/job_scraper2.py +++ b/job_scraper2.py @@ -1,13 +1,12 @@ import asyncio import random -import sqlite3 -import os from typing import Optional, Dict from playwright.async_api import async_playwright, TimeoutError as PlaywrightTimeoutError from browserforge.injectors.playwright import AsyncNewContext from llm_agent import LLMJobRefiner import re from fetcher import StealthyFetcher +from datetime import datetime class LinkedInJobScraper: @@ -26,25 +25,8 @@ class LinkedInJobScraper: self.llm_agent = LLMJobRefiner() def _init_db(self): - os.makedirs(os.path.dirname(self.db_path) if os.path.dirname(self.db_path) else ".", exist_ok=True) - with sqlite3.connect(self.db_path) as conn: - cursor = conn.cursor() - cursor.execute(''' - CREATE TABLE IF NOT EXISTS jobs ( - id INTEGER PRIMARY KEY AUTOINCREMENT, - title TEXT, - company_name TEXT, - location TEXT, - description TEXT, - requirements TEXT, - qualifications TEXT, - salary_range TEXT, - nature_of_work TEXT, - job_id TEXT, - url TEXT UNIQUE - ) - ''') - conn.commit() + # This method is kept for backward compatibility but LLMJobRefiner handles PostgreSQL now + pass async def _human_click(self, page, element, wait_after: bool = True): if not element: @@ -61,7 +43,7 @@ class LinkedInJobScraper: async def _login(self, page, credentials: Dict) -> bool: print("šŸ” Navigating to LinkedIn login page...") - await page.goto("https://www.linkedin.com/login", timeout=60000) + await page.goto("https://www.linkedin.com/login", timeout=120000) await asyncio.sleep(random.uniform(2.0, 3.5) * self.human_speed) email_field = await page.query_selector('input[name="session_key"]') @@ -104,7 +86,11 @@ class LinkedInJobScraper: print("āŒ Login may have 
failed.") return False - async def _extract_all_page_content(self, page) -> str: + async def _extract_page_content_for_llm(self, page) -> str: + """ + Extract raw page content as HTML/text for LLM processing + The LLM will handle all extraction logic, not specific selectors + """ await asyncio.sleep(2 * self.human_speed) await self.engine._human_like_scroll(page) await asyncio.sleep(2 * self.human_speed) @@ -172,7 +158,7 @@ class LinkedInJobScraper: await self._human_click(page, next_btn) await asyncio.sleep(random.uniform(4.0, 6.0) * self.human_speed) try: - await page.wait_for_function("() => window.location.href.includes('start=')", timeout=60000) + await page.wait_for_function("() => window.location.href.includes('start=')", timeout=120000) except: pass current_page += 1 @@ -247,7 +233,7 @@ class LinkedInJobScraper: if session_loaded: print("šŸ” Using saved session — verifying login...") - await page.goto("https://www.linkedin.com/feed/", timeout=60000) + await page.goto("https://www.linkedin.com/feed/", timeout=120000) if "feed" in page.url and "login" not in page.url: print("āœ… Session still valid.") login_successful = True @@ -269,7 +255,7 @@ class LinkedInJobScraper: print("ā„¹ļø No credentials — proceeding as guest.") login_successful = True - await page.wait_for_load_state("load", timeout=60000) + await page.wait_for_load_state("load", timeout=120000) print("āœ… Post-login page fully loaded. Starting search...") # >>> PROTECTION CHECK USING FETCHER LOGIC <<< @@ -292,7 +278,7 @@ class LinkedInJobScraper: print("āœ… Protection present but content accessible — proceeding.") print(f"šŸ” Searching for: {search_keywords}") - await page.goto(search_url, wait_until='load', timeout=60000) + await page.goto(search_url, wait_until='load', timeout=120000) await asyncio.sleep(random.uniform(4.0, 6.0) * self.human_speed) # >>> PROTECTION CHECK ON SEARCH PAGE <<< @@ -322,7 +308,7 @@ class LinkedInJobScraper: print(f" āž• Found {initial_jobs} initial job(s) (total: {len(all_job_links)})") iteration = 1 - while True: + while True and iteration >= 5: print(f"šŸ”„ Iteration {iteration}: Checking for new jobs...") prev_job_count = len(all_job_links) @@ -355,10 +341,6 @@ class LinkedInJobScraper: print("šŸ”š No new jobs found after refresh. Stopping.") break - if iteration > 10: - print("šŸ”„ Maximum iterations reached. 
Stopping.") - break - print(f"āœ… Collected {len(all_job_links)} unique job links.") scraped_count = 0 @@ -386,8 +368,9 @@ class LinkedInJobScraper: if apply_btn: break - page_data = None - final_url = job_page.url + final_url = full_url + external_url = None + page_content = None if apply_btn: print(" → Clicking 'Apply' / 'Easy Apply' button...") @@ -399,44 +382,61 @@ class LinkedInJobScraper: try: external_page = await asyncio.wait_for(page_waiter, timeout=5.0) print(" 🌐 External job site opened in new tab.") - await external_page.wait_for_load_state("load", timeout=60000) + await external_page.wait_for_load_state("load", timeout=120000) await asyncio.sleep(2 * self.human_speed) await self.engine._human_like_scroll(external_page) await asyncio.sleep(2 * self.human_speed) - page_data = await self._extract_all_page_content(external_page) - final_url = external_page.url + # Extract raw content from external page for LLM processing + external_url = external_page.url + final_url = external_url + page_content = await self._extract_page_content_for_llm(external_page) if not external_page.is_closed(): await external_page.close() except asyncio.TimeoutError: print(" šŸ–„ļø No external tab — scraping LinkedIn job page directly.") - await job_page.wait_for_timeout(2000) + await job_page.wait_for_timeout(60000) try: - await job_page.wait_for_selector("div.jobs-apply-button--fixed, div.jobs-easy-apply-modal", timeout=8000) + await job_page.wait_for_selector("div.jobs-apply-button--fixed, div.jobs-easy-apply-modal", timeout=80000) except PlaywrightTimeoutError: pass await self.engine._human_like_scroll(job_page) await asyncio.sleep(2 * self.human_speed) - page_data = await self._extract_all_page_content(job_page) + page_content = await self._extract_page_content_for_llm(job_page) else: print(" āš ļø No 'Apply' button found — scraping job details directly.") await self.engine._human_like_scroll(job_page) await asyncio.sleep(2 * self.human_speed) - page_data = await self._extract_all_page_content(job_page) + page_content = await self._extract_page_content_for_llm(job_page) - job_id = final_url.split("/")[-2] if "/jobs/view/" in final_url else "unknown" + job_id = full_url.split("/")[-2] if "/jobs/view/" in full_url else "unknown" raw_data = { - "page_content": page_data, - "url": job_page.url, - "job_id": job_page.url.split("/")[-2] if "/jobs/view/" in job_page.url else "unknown" + "page_content": page_content, + "url": final_url, + "job_id": job_id, + "search_keywords": search_keywords } + # LLM agent is now fully responsible for extraction and validation refined_data = await self.llm_agent.refine_job_data(raw_data, self.user_request) if refined_data and refined_data.get("title", "N/A") != "N/A": + # Ensure compulsory fields are present (fallback if LLM missed them) + compulsory_fields = ['company_name', 'job_id', 'url'] + for field in compulsory_fields: + if not refined_data.get(field) or refined_data[field] in ["N/A", "", "Unknown"]: + if field == 'job_id': + refined_data[field] = job_id + elif field == 'url': + refined_data[field] = final_url + elif field == 'company_name': + refined_data[field] = "Unknown Company" + + refined_data['scraped_at'] = datetime.now().isoformat() + refined_data['category'] = clean_keywords await self.llm_agent.save_job_data(refined_data, search_keywords) scraped_count += 1 print(f" āœ… Scraped and refined: {refined_data['title'][:50]}...") @@ -455,7 +455,7 @@ class LinkedInJobScraper: finally: print(" ā†©ļø Returning to LinkedIn search results...") - await 
page.goto(search_url, timeout=60000) + await page.goto(search_url, timeout=120000) await asyncio.sleep(4 * self.human_speed) await browser.close() diff --git a/linkedin_main.py b/linkedin_main.py index c97f031..59bb061 100644 --- a/linkedin_main.py +++ b/linkedin_main.py @@ -4,6 +4,8 @@ from job_scraper2 import LinkedInJobScraper import os from dotenv import load_dotenv import asyncio +import random +import time # Load environment variables load_dotenv() @@ -11,7 +13,7 @@ load_dotenv() async def main(): engine = FingerprintScrapingEngine( - seed="job_scraping_123", + seed="job_scraping_12", target_os="windows", db_path="job_listings.db", markdown_path="job_listings.md" @@ -20,13 +22,50 @@ async def main(): # Initialize scraper with target field scraper = LinkedInJobScraper(engine, human_speed=1.6, user_request="Extract title, company, location, description, requirements, qualifications, nature of job(remote, onsite, hybrid) and salary") - await scraper.scrape_jobs( - search_keywords="Lecturer location:New York", - credentials={ - "email": os.getenv("SCRAPING_USERNAME"), - "password": os.getenv("SCRAPING_PASSWORD") - } - ) + # List of job titles to cycle through + job_titles = [ + "Software Engineer", + "Data Scientist", + "Product Manager", + "UX Designer", + "DevOps Engineer", + "Machine Learning Engineer", + "Frontend Developer", + "Backend Developer", + "Full Stack Developer", + "Data Analyst" + ] + + fixed_location = "New York" + + # Keep cycling through all job titles + while True: + # Shuffle job titles to randomize order + random.shuffle(job_titles) + + for job_title in job_titles: + search_keywords = f"{job_title} location:{fixed_location}" + + print(f"\n{'='*60}") + print(f"Starting scrape for: {search_keywords}") + print(f"{'='*60}") + + await scraper.scrape_jobs( + search_keywords=search_keywords, + credentials={ + "email": os.getenv("SCRAPING_USERNAME"), + "password": os.getenv("SCRAPING_PASSWORD") + } + ) + + print(f"\nāœ… Completed scraping for: {job_title}") + print(f"ā³ Waiting 2 minutes before next job title...") + + # Wait 2 minutes before next job title + time.sleep(120) + + print(f"\nāœ… Completed full cycle of all job titles") + print(f"šŸ”„ Starting new cycle...") if __name__ == "__main__": - asyncio.run(main()) \ No newline at end of file + asyncio.run(main()) diff --git a/llm_agent.py b/llm_agent.py index e59b3eb..f7473e8 100644 --- a/llm_agent.py +++ b/llm_agent.py @@ -1,28 +1,125 @@ + from openai import OpenAI from typing import Dict, Any import asyncio -import sqlite3 +import psycopg2 import os from datetime import datetime import json import re +from bs4 import BeautifulSoup from dotenv import load_dotenv # Load environment variables from .env load_dotenv() + class LLMJobRefiner: def __init__(self): deepseek_api_key = os.getenv("DEEPSEEK_API_KEY") if not deepseek_api_key: raise ValueError("DEEPSEEK_API_KEY not found in .env file.") + # Database credentials from .env + self.db_url = os.getenv("DB_URL") + self.db_username = os.getenv("DB_USERNAME") + self.db_password = os.getenv("DB_PASSWORD") + self.db_host = os.getenv("DB_HOST") + self.db_port = os.getenv("DB_PORT") + + if not self.db_url or not self.db_username or not self.db_password: + raise ValueError("Database credentials not found in .env file.") + # DeepSeek uses OpenAI-compatible API self.client = OpenAI( api_key=deepseek_api_key, base_url="https://api.deepseek.com/v1" ) - self.model = "deepseek-chat" # or "deepseek-coder" if preferred + self.model = "deepseek-chat" + self._init_db() + + def 
_init_db(self): + """Initialize PostgreSQL database connection and create table""" + try: + self.db_url = os.getenv("DB_URL") + if self.db_url and "supabase.com" in self.db_url: + conn = psycopg2.connect( + host=self.db_host, + port=self.db_port, + database="postgres", + user=self.db_username, + password=self.db_password + ) + else: + conn = psycopg2.connect( + host=self.db_host, + port=self.db_port, + database="postgres", + user=self.db_username, + password=self.db_password + ) + cursor = conn.cursor() + + cursor.execute(''' + CREATE TABLE IF NOT EXISTS jobs ( + id SERIAL PRIMARY KEY, + title TEXT, + company_name TEXT, + location TEXT, + description TEXT, + requirements TEXT, + qualifications TEXT, + salary_range TEXT, + nature_of_work TEXT, + job_id TEXT UNIQUE, + url TEXT, + category TEXT, + scraped_at TIMESTAMP, + created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP + ) + ''') + + # Ensure the uniqueness constraint exists + cursor.execute(''' + ALTER TABLE jobs DROP CONSTRAINT IF EXISTS jobs_job_id_key; + ALTER TABLE jobs ADD CONSTRAINT jobs_job_id_key UNIQUE (job_id); + ''') + + cursor.execute('CREATE INDEX IF NOT EXISTS idx_job_id ON jobs(job_id)') + cursor.execute('CREATE INDEX IF NOT EXISTS idx_category ON jobs(category)') + + conn.commit() + cursor.close() + conn.close() + print("āœ… PostgreSQL database initialized successfully") + except Exception as e: + print(f"āŒ Database initialization error: {e}") + raise + + def _clean_html_for_llm(self, html_content: str) -> str: + """Clean HTML to make it more readable for LLM while preserving structure""" + try: + soup = BeautifulSoup(html_content, 'html.parser') + + # Remove script and style elements + for script in soup(["script", "style", "nav", "footer", "header"]): + script.decompose() + + # Extract text but keep some structure + text = soup.get_text(separator=' ', strip=True) + + # Clean up whitespace + text = re.sub(r'\s+', ' ', text) + + # Limit length for LLM context + if len(text) > 10000: + text = text[:10000] + "..." + + return text + except Exception as e: + print(f"HTML cleaning error: {e}") + # Fallback to raw content if cleaning fails + return html_content[:100000] if len(html_content) > 100000 else html_content def _generate_content_sync(self, prompt: str) -> str: """Synchronous call to DeepSeek API""" @@ -40,33 +137,47 @@ class LLMJobRefiner: return "" async def refine_job_data(self, raw_data: Dict[str, Any], target_field: str) -> Dict[str, Any]: + # Clean the raw HTML content for better LLM processing + page_content = raw_data.get('page_content', '') + cleaned_content = self._clean_html_for_llm(page_content) + + # Get job_id and url from raw data + job_id = raw_data.get('job_id', 'unknown') + url = raw_data.get('url', 'N/A') + prompt = f""" - You are a job data extraction assistant. 
Extract the following fields from the job posting: - - title - - company_name + You are a job posting data extractor with two modes: + + PRIMARY MODE (PREFERRED): + - Extract EXACT text as it appears on the page for all fields + - DO NOT summarize, paraphrase, or interpret + - Copy verbatim content including original wording and formatting + + FALLBACK MODE (ONLY IF FIELD IS MISSING): + - If a field is NOT explicitly stated anywhere in the content, you MAY infer it using clear contextual clues + - Inference rules: + * company_name: Look for patterns like "at [Company]", "Join [Company]", "[Company] is hiring" + * nature_of_work: Look for "remote", "onsite", "hybrid", "work from home", "office-based" + * location: Extract city/state/country mentions near job title or details + * title: Use the largest/primary heading if no explicit "job title" label exists + + REQUIRED FIELDS (must always have a value): + - title: Exact job title or best inference + - company_name: Exact company name or best inference + - job_id: Use provided: {job_id} + - url: Use provided: {url} + + OPTIONAL FIELDS (use exact text or "N/A" if not present and not inferable): - location - - description + - description - requirements - qualifications - salary_range - - nature_of_work (remote, onsite, or hybrid) - - job_id + - nature_of_work - Target Field: {target_field} - Raw Page Content: - {raw_data.get('page_content', '')} - - Instructions: - 1. Extract only the information relevant to the target field: {target_field} - 2. Clean up any formatting issues in the description - 3. Standardize location format (city, state/country) - 4. Extract salary range if mentioned - 5. Determine nature of work from work arrangements - 6. Ensure all fields are properly formatted - 7. If a field cannot be found, use "N/A" - 8. 
Return ONLY the refined data in JSON format - - Response format (only return the JSON): + Page Content: + {cleaned_content} + Response format (ONLY return this JSON): {{ "title": "...", "company_name": "...", @@ -76,8 +187,8 @@ class LLMJobRefiner: "qualifications": "...", "salary_range": "...", "nature_of_work": "...", - "job_id": "{raw_data.get('job_id', 'unknown')}", - "url": "{raw_data.get('url', 'N/A')}" + "job_id": "{job_id}", + "url": "{url}" }} """ @@ -87,7 +198,17 @@ class LLMJobRefiner: lambda: self._generate_content_sync(prompt) ) refined_data = self._parse_llm_response(response_text) - return refined_data if refined_data else None + + # Final validation - ensure required fields are present and meaningful + if refined_data: + required_fields = ['title', 'company_name', 'job_id', 'url'] + for field in required_fields: + if not refined_data.get(field) or refined_data[field] in ["N/A", "", "Unknown", "Company", "Job"]: + return None # LLM failed to extract properly + + return refined_data + return None + except Exception as e: print(f"LLM refinement failed: {str(e)}") return None @@ -109,22 +230,23 @@ class LLMJobRefiner: await self._save_to_markdown(job_data, keyword) async def _save_to_db(self, job_data: Dict[str, Any]): - db_path = "linkedin_jobs.db" - os.makedirs(os.path.dirname(db_path) or ".", exist_ok=True) - with sqlite3.connect(db_path) as conn: + """Save job data to PostgreSQL database with job_id uniqueness""" + try: + conn = psycopg2.connect( + host=self.db_host, + port=self.db_port, + database="postgres", + user=self.db_username, + password=self.db_password + ) cursor = conn.cursor() + cursor.execute(''' - CREATE TABLE IF NOT EXISTS jobs ( - title TEXT, company_name TEXT, location TEXT, description TEXT, - requirements TEXT, qualifications TEXT, salary_range TEXT, - nature_of_work TEXT, job_id TEXT, url TEXT - ) - ''') - cursor.execute(''' - INSERT OR IGNORE INTO jobs + INSERT INTO jobs (title, company_name, location, description, requirements, - qualifications, salary_range, nature_of_work, job_id, url) - VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?) 
+ qualifications, salary_range, nature_of_work, job_id, url, category, scraped_at) + VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s) + ON CONFLICT (job_id) DO NOTHING ''', ( job_data.get("title", "N/A"), job_data.get("company_name", "N/A"), @@ -135,9 +257,19 @@ class LLMJobRefiner: job_data.get("salary_range", "N/A"), job_data.get("nature_of_work", "N/A"), job_data.get("job_id", "N/A"), - job_data.get("url", "N/A") + job_data.get("url", "N/A"), + job_data.get("category", "N/A"), + job_data.get("scraped_at") )) + conn.commit() + cursor.close() + conn.close() + + print(f" šŸ’¾ Saved job to category '{job_data.get('category', 'N/A')}' with job_id: {job_data.get('job_id', 'N/A')}") + + except Exception as e: + print(f"āŒ Database save error: {e}") async def _save_to_markdown(self, job_data: Dict[str, Any], keyword: str): os.makedirs("linkedin_jobs", exist_ok=True) @@ -154,6 +286,8 @@ class LLMJobRefiner: f.write(f"- **Nature of Work**: {job_data.get('nature_of_work', 'N/A')}\n") f.write(f"- **Salary Range**: {job_data.get('salary_range', 'N/A')}\n") f.write(f"- **Job ID**: {job_data.get('job_id', 'N/A')}\n") + f.write(f"- **Category**: {job_data.get('category', 'N/A')}\n") + f.write(f"- **Scraped At**: {job_data.get('scraped_at', 'N/A')}\n") f.write(f"- **URL**: <{job_data.get('url', 'N/A')}>\n\n") f.write(f"### Description\n\n{job_data.get('description', 'N/A')}\n\n") f.write(f"### Requirements\n\n{job_data.get('requirements', 'N/A')}\n\n") diff --git a/scraping_engine.py b/scraping_engine.py index 87e89bd..31451b2 100644 --- a/scraping_engine.py +++ b/scraping_engine.py @@ -69,7 +69,7 @@ class FingerprintScrapingEngine: self.optimization_params = { "base_delay": 2.0, "max_concurrent_requests": 4, - "request_timeout": 60000, + "request_timeout": 120000, "retry_attempts": 3, "captcha_handling_strategy": "avoid", # or "solve_fallback" "cloudflare_wait_strategy": "smart_wait", # or "aggressive_reload" @@ -155,7 +155,7 @@ class FingerprintScrapingEngine: # Increase timeout if avg response time is high if avg_rt > 20: - self.optimization_params["request_timeout"] = 90000 # 90 seconds + self.optimization_params["request_timeout"] = 150000 # 90 seconds print(f"Optimization Params Updated: {self.optimization_params}") @@ -371,7 +371,7 @@ class FingerprintScrapingEngine: # Reload occasionally to trigger potential client-side checks if (time.time() - start_time) > 15 and (time.time() - start_time) % 20 < 2: print("Reloading page during Cloudflare wait...") - await page.reload(wait_until='load', timeout=30000) + await page.reload(wait_until='load', timeout=80000) print("Timeout waiting for Cloudflare resolution.") return False
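Below the diff, a few illustrative sketches of the patterns this patch relies on; any helper name that does not appear in the diff is a placeholder.

The fetcher.py hunks raise navigation timeouts to 120 s and selector waits to 40 s while keeping a missing selector non-fatal. A minimal sketch of that navigate-then-wait pattern, assuming Playwright's async API as used elsewhere in the repo:

    from typing import Optional
    from playwright.async_api import Page, TimeoutError as PlaywrightTimeoutError

    async def goto_and_wait(page: Page, url: str, wait_for_selector: Optional[str] = None) -> None:
        """Navigate with the patch's longer timeouts; a missing selector is logged, not fatal."""
        await page.goto(url, wait_until="load", timeout=120_000)
        if wait_for_selector:
            try:
                await page.wait_for_selector(wait_for_selector, timeout=40_000)
            except PlaywrightTimeoutError:
                # Same behaviour as the patched fetch(): warn and continue.
                print(f"Selector {wait_for_selector} not found immediately, continuing...")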
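In job_scraper2.py, the hunk that rewrites the link-collection loop changes `while True:` to `while True and iteration >= 5:` and drops the old 10-iteration cap; since `iteration` starts at 1, that condition is false on the first check, so the refresh loop as written never executes. A sketch of a bounded refresh loop, assuming the intent was to cap the passes at 5; `get_links` is a hypothetical stand-in for the scraper's own link-extraction step:

    import asyncio
    import random

    MAX_REFRESH_PASSES = 5  # assumed cap, replacing the removed 10-iteration limit

    async def collect_job_links(page, get_links, human_speed: float = 1.6) -> set:
        """Collect unique job URLs, refreshing the results a bounded number of times."""
        all_job_links = set(await get_links(page))
        iteration = 1
        while iteration <= MAX_REFRESH_PASSES:
            prev_count = len(all_job_links)
            await page.reload(wait_until="load", timeout=120_000)
            await asyncio.sleep(random.uniform(4.0, 6.0) * human_speed)
            all_job_links.update(await get_links(page))
            if len(all_job_links) == prev_count:
                break  # no new jobs surfaced after a refresh
            iteration += 1
        return all_job_links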
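The new loop in linkedin_main.py calls `time.sleep(120)` inside `async def main()`, which blocks the event loop for the whole two-minute pause. A sketch of the same cycle using `await asyncio.sleep(120)` instead, with the job titles and credentials exactly as in the patch (`scraper` constructed as above):

    import asyncio
    import os
    import random

    async def run_cycles(scraper, job_titles, fixed_location="New York"):
        """Cycle through job titles indefinitely, pausing 2 minutes between searches."""
        while True:
            random.shuffle(job_titles)  # randomize order each full cycle
            for job_title in job_titles:
                search_keywords = f"{job_title} location:{fixed_location}"
                await scraper.scrape_jobs(
                    search_keywords=search_keywords,
                    credentials={
                        "email": os.getenv("SCRAPING_USERNAME"),
                        "password": os.getenv("SCRAPING_PASSWORD"),
                    },
                )
                # Non-blocking pause so other tasks on the loop keep running.
                await asyncio.sleep(120)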
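In llm_agent.py, `_clean_html_for_llm` strips script/style/nav/footer/header elements, collapses whitespace, and truncates to 10,000 characters before prompting, while its exception fallback truncates the raw markup at 100,000 characters. A condensed sketch of the same idea with a single length limit:

    import re
    from bs4 import BeautifulSoup

    MAX_LLM_CHARS = 10_000  # assumed single limit for both the clean and fallback paths

    def clean_html_for_llm(html: str) -> str:
        """Reduce a job page to plain text small enough for one LLM prompt."""
        try:
            soup = BeautifulSoup(html, "html.parser")
            for tag in soup(["script", "style", "nav", "footer", "header"]):
                tag.decompose()  # drop non-content elements
            text = re.sub(r"\s+", " ", soup.get_text(separator=" ", strip=True))
        except Exception:
            text = html  # fall back to the raw markup
        return text[:MAX_LLM_CHARS] + ("..." if len(text) > MAX_LLM_CHARS else "")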
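`refine_job_data` builds one prompt per job and hands it to the blocking `_generate_content_sync` through `run_in_executor`, so the OpenAI-compatible DeepSeek call does not stall the event loop. A sketch of that pattern with the endpoint and model named in the patch; request options such as temperature are omitted:

    import asyncio
    import os
    from openai import OpenAI

    client = OpenAI(
        api_key=os.getenv("DEEPSEEK_API_KEY"),
        base_url="https://api.deepseek.com/v1",  # DeepSeek's OpenAI-compatible endpoint
    )

    def generate_sync(prompt: str) -> str:
        """Blocking chat-completion call; meant to run in a thread pool."""
        resp = client.chat.completions.create(
            model="deepseek-chat",
            messages=[{"role": "user", "content": prompt}],
        )
        return resp.choices[0].message.content or ""

    async def generate(prompt: str) -> str:
        """Run the blocking call off the event loop, as the patched agent does."""
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(None, lambda: generate_sync(prompt))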
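The agent then passes the reply to `self._parse_llm_response`, which is not shown in the diff. One way such a parser might recover the JSON object even when the model wraps it in prose or a code fence (a hypothetical helper, not the repo's implementation):

    import json
    import re
    from typing import Optional

    def parse_llm_response(response_text: str) -> Optional[dict]:
        """Extract the first JSON object from an LLM reply, or None if unparsable."""
        match = re.search(r"\{.*\}", response_text, re.DOTALL)
        if not match:
            return None
        try:
            return json.loads(match.group(0))
        except json.JSONDecodeError:
            return None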
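On the storage side, `_save_to_db` opens a fresh psycopg2 connection per job, hard-codes `database="postgres"` in both branches of the Supabase check (the two branches are currently identical), and relies on `ON CONFLICT (job_id) DO NOTHING` for deduplication. Note also that the job_scraper2.py hunk assigns `refined_data['category'] = clean_keywords`, a name not defined anywhere in the patch; the nearby variable is `search_keywords`. A condensed sketch of the save path under the same environment variables, with error handling trimmed:

    import os
    import psycopg2

    def save_job(job: dict) -> None:
        """Insert one refined job row; rows with an existing job_id are skipped."""
        conn = psycopg2.connect(
            host=os.getenv("DB_HOST"),
            port=os.getenv("DB_PORT"),
            database="postgres",  # as in the patch; both connection branches use this
            user=os.getenv("DB_USERNAME"),
            password=os.getenv("DB_PASSWORD"),
        )
        try:
            with conn, conn.cursor() as cur:  # `with conn` commits on success
                cur.execute(
                    """
                    INSERT INTO jobs
                        (title, company_name, location, description, requirements,
                         qualifications, salary_range, nature_of_work, job_id, url,
                         category, scraped_at)
                    VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
                    ON CONFLICT (job_id) DO NOTHING
                    """,
                    (
                        job.get("title", "N/A"), job.get("company_name", "N/A"),
                        job.get("location", "N/A"), job.get("description", "N/A"),
                        job.get("requirements", "N/A"), job.get("qualifications", "N/A"),
                        job.get("salary_range", "N/A"), job.get("nature_of_work", "N/A"),
                        job.get("job_id", "N/A"), job.get("url", "N/A"),
                        job.get("category", "N/A"), job.get("scraped_at"),
                    ),
                )
        finally:
            conn.close()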