# Web_scraping_project/llm_agent.py
#
# 295 lines
# 12 KiB
# Python

import asyncio
import json
import os
import re
from datetime import datetime
from typing import Any, Dict, Optional

import psycopg2
from bs4 import BeautifulSoup
from dotenv import load_dotenv
from openai import OpenAI
# Load environment variables from .env at import time so the os.getenv()
# lookups in LLMJobRefiner below can see the API key and database settings.
load_dotenv()
class LLMJobRefiner:
    """Turn scraped job pages into structured records and persist them.

    The page HTML is reduced to plain text, sent to the DeepSeek chat API
    (through the OpenAI-compatible client) with an extraction prompt, and the
    JSON answer is validated. Accepted records can then be stored in the
    PostgreSQL ``jobs`` table and appended to a Markdown report.
    """

    # Maximum number of characters of cleaned page text sent to the model.
    _MAX_CLEAN_CHARS = 10000
    # Maximum number of characters returned when HTML cleaning itself fails.
    _MAX_RAW_FALLBACK_CHARS = 100000
    # Fields that must carry a meaningful value for a record to be accepted.
    _REQUIRED_FIELDS = ('title', 'company_name', 'job_id', 'url')
    # Values treated as "the model did not really extract anything".
    # Kept as a tuple (not a set) so unhashable values can still be compared.
    _PLACEHOLDER_VALUES = ("N/A", "", "Unknown", "Company", "Job")

    def __init__(self):
        """Read configuration from the environment and prepare API + database.

        Raises:
            ValueError: if ``DEEPSEEK_API_KEY`` is missing, or if any of
                ``DB_URL``, ``DB_USERNAME`` or ``DB_PASSWORD`` is missing.
            Exception: whatever ``_init_db`` re-raises when the database
                cannot be initialised.
        """
        deepseek_api_key = os.getenv("DEEPSEEK_API_KEY")
        if not deepseek_api_key:
            raise ValueError("DEEPSEEK_API_KEY not found in .env file.")

        # Database credentials from .env
        # NOTE(review): DB_URL is required here, but connections are built
        # from DB_HOST / DB_PORT, which are not validated - confirm intent.
        self.db_url = os.getenv("DB_URL")
        self.db_username = os.getenv("DB_USERNAME")
        self.db_password = os.getenv("DB_PASSWORD")
        self.db_host = os.getenv("DB_HOST")
        self.db_port = os.getenv("DB_PORT")
        if not self.db_url or not self.db_username or not self.db_password:
            raise ValueError("Database credentials not found in .env file.")

        # DeepSeek uses OpenAI-compatible API
        self.client = OpenAI(
            api_key=deepseek_api_key,
            base_url="https://api.deepseek.com/v1"
        )
        self.model = "deepseek-chat"
        self._init_db()

    def _connect(self):
        """Open a new psycopg2 connection to the ``postgres`` database.

        Single place for the connection parameters so schema setup and
        inserts cannot drift apart. The caller owns (and must close) the
        returned connection.
        """
        return psycopg2.connect(
            host=self.db_host,
            port=self.db_port,
            database="postgres",
            user=self.db_username,
            password=self.db_password
        )

    def _init_db(self):
        """Initialize PostgreSQL database connection and create table.

        Creates the ``jobs`` table, its unique constraint on ``job_id`` and
        two lookup indexes if they do not exist yet.

        Raises:
            Exception: any connection/SQL error is printed and re-raised.
        """
        conn = None
        try:
            conn = self._connect()
            with conn.cursor() as cursor:
                cursor.execute('''
                CREATE TABLE IF NOT EXISTS jobs (
                id SERIAL PRIMARY KEY,
                title TEXT,
                company_name TEXT,
                location TEXT,
                description TEXT,
                requirements TEXT,
                qualifications TEXT,
                salary_range TEXT,
                nature_of_work TEXT,
                job_id TEXT UNIQUE,
                url TEXT,
                category TEXT,
                scraped_at TIMESTAMP,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
                )
                ''')
                # Ensure the uniqueness constraint exists
                # NOTE(review): this drops and re-creates the constraint on
                # every start-up even though CREATE TABLE above already
                # declares job_id UNIQUE - consider guarding it.
                cursor.execute('''
                ALTER TABLE jobs DROP CONSTRAINT IF EXISTS jobs_job_id_key;
                ALTER TABLE jobs ADD CONSTRAINT jobs_job_id_key UNIQUE (job_id);
                ''')
                cursor.execute('CREATE INDEX IF NOT EXISTS idx_job_id ON jobs(job_id)')
                cursor.execute('CREATE INDEX IF NOT EXISTS idx_category ON jobs(category)')
            conn.commit()
            print("✅ PostgreSQL database initialized successfully")
        except Exception as e:
            print(f"❌ Database initialization error: {e}")
            raise
        finally:
            # Always release the connection, including when a statement fails.
            if conn is not None:
                conn.close()

    def _clean_html_for_llm(self, html_content: str) -> str:
        """Clean HTML to make it more readable for LLM while preserving structure.

        Removes script/style/nav/footer/header elements, flattens the rest to
        whitespace-normalised text and truncates it to ``_MAX_CLEAN_CHARS``
        characters (plus ``"..."``).

        Returns:
            The cleaned text; ``""`` for empty or ``None`` input; the
            truncated raw content if cleaning raises.
        """
        if not html_content:
            # Nothing to parse; also keeps None from blowing up in the
            # fallback slice below.
            return ""
        try:
            soup = BeautifulSoup(html_content, 'html.parser')
            # Remove script and style elements
            for tag in soup(["script", "style", "nav", "footer", "header"]):
                tag.decompose()
            # Extract text but keep some structure
            text = soup.get_text(separator=' ', strip=True)
            # Clean up whitespace
            text = re.sub(r'\s+', ' ', text)
            # Limit length for LLM context
            if len(text) > self._MAX_CLEAN_CHARS:
                text = text[:self._MAX_CLEAN_CHARS] + "..."
            return text
        except Exception as e:
            print(f"HTML cleaning error: {e}")
            # Fallback to raw content if cleaning fails (slicing is a no-op
            # for content shorter than the limit).
            return html_content[:self._MAX_RAW_FALLBACK_CHARS]

    def _generate_content_sync(self, prompt: str) -> str:
        """Synchronous call to DeepSeek API.

        Returns:
            The text of the first choice, or ``""`` when the API call fails
            or the message has no content.
        """
        try:
            response = self.client.chat.completions.create(
                model=self.model,
                messages=[{"role": "user", "content": prompt}],
                temperature=0.2,
                max_tokens=2048,
                stream=False
            )
            return response.choices[0].message.content or ""
        except Exception as e:
            print(f"DeepSeek API error: {e}")
            return ""

    def _build_prompt(self, cleaned_content: str, job_id: Any, url: Any) -> str:
        """Return the extraction prompt for one job page.

        The template lines are intentionally flush-left so that the text sent
        to the model carries no indentation from this source file.
        """
        return f"""
You are a job posting data extractor with two modes:
PRIMARY MODE (PREFERRED):
- Extract EXACT text as it appears on the page for all fields
- DO NOT summarize, paraphrase, or interpret
- Copy verbatim content including original wording and formatting
FALLBACK MODE (ONLY IF FIELD IS MISSING):
- If a field is NOT explicitly stated anywhere in the content, you MAY infer it using clear contextual clues
- Inference rules:
* company_name: Look for patterns like "at [Company]", "Join [Company]", "[Company] is hiring"
* nature_of_work: Look for "remote", "onsite", "hybrid", "work from home", "office-based"
* location: Extract city/state/country mentions near job title or details
* title: Use the largest/primary heading if no explicit "job title" label exists
REQUIRED FIELDS (must always have a value):
- title: Exact job title or best inference
- company_name: Exact company name or best inference
- job_id: Use provided: {job_id}
- url: Use provided: {url}
OPTIONAL FIELDS (use exact text or "N/A" if not present and not inferable):
- location
- description
- requirements
- qualifications
- salary_range
- nature_of_work
Page Content:
{cleaned_content}
Response format (ONLY return this JSON):
{{
"title": "...",
"company_name": "...",
"location": "...",
"description": "...",
"requirements": "...",
"qualifications": "...",
"salary_range": "...",
"nature_of_work": "...",
"job_id": "{job_id}",
"url": "{url}"
}}
"""

    async def refine_job_data(self, raw_data: Dict[str, Any], target_field: str) -> Optional[Dict[str, Any]]:
        """Extract structured job fields from a scraped page via the LLM.

        Args:
            raw_data: scraped record; ``page_content``, ``job_id`` and ``url``
                are read from it (each has a default if absent).
            target_field: accepted for interface compatibility; not used by
                this method.

        Returns:
            The parsed dict when every required field (title, company_name,
            job_id, url) has a non-placeholder value, otherwise ``None``.
        """
        # Clean the raw HTML content for better LLM processing
        page_content = raw_data.get('page_content', '')
        cleaned_content = self._clean_html_for_llm(page_content)
        # Get job_id and url from raw data
        job_id = raw_data.get('job_id', 'unknown')
        url = raw_data.get('url', 'N/A')
        prompt = self._build_prompt(cleaned_content, job_id, url)

        try:
            # The API client is blocking, so run it in the default executor
            # to keep the event loop responsive.
            loop = asyncio.get_running_loop()
            response_text = await loop.run_in_executor(
                None, self._generate_content_sync, prompt
            )
            refined_data = self._parse_llm_response(response_text)
            if not refined_data:
                return None
            # Final validation - ensure required fields are present and meaningful
            for field in self._REQUIRED_FIELDS:
                value = refined_data.get(field)
                if not value or value in self._PLACEHOLDER_VALUES:
                    return None  # LLM failed to extract properly
            return refined_data
        except Exception as e:
            print(f"LLM refinement failed: {str(e)}")
            return None

    def _parse_llm_response(self, response_text: str) -> Optional[Dict[str, Any]]:
        """Extract the JSON object from the model's reply.

        A fenced code block (three backticks, optionally tagged ``json``) is
        tried first, then the widest ``{...}`` span in the text. Falling
        through to the second candidate matters when the reply contains
        backticks but no well-formed fence (e.g. the closing fence is
        missing).

        Returns:
            The decoded dict, or ``None`` if no candidate decodes to a dict.
        """
        if not response_text:
            return None

        candidates = []
        fenced = re.search(r'```(?:json)?\s*({.*?})\s*```', response_text, re.DOTALL)
        if fenced:
            candidates.append(fenced.group(1))
        bare = re.search(r'\{.*\}', response_text, re.DOTALL)
        if bare:
            candidates.append(bare.group(0))

        for candidate in candidates:
            try:
                parsed = json.loads(candidate)
            except json.JSONDecodeError:
                continue
            if isinstance(parsed, dict):
                return parsed
        return None

    async def save_job_data(self, job_data: Dict[str, Any], keyword: str):
        """Persist one job: first to the database, then to the Markdown report."""
        await self._save_to_db(job_data)
        await self._save_to_markdown(job_data, keyword)

    @staticmethod
    def _to_text(value: Any) -> Any:
        """Coerce a model-produced value into something a TEXT column accepts.

        Strings and ``None`` pass through unchanged. Lists/tuples become one
        item per line and dicts become JSON, because psycopg2 would otherwise
        send a list as an SQL array and cannot adapt a plain dict at all.
        Anything else is converted with ``str()``.
        """
        if value is None or isinstance(value, str):
            return value
        if isinstance(value, (list, tuple)):
            return "\n".join(
                str(LLMJobRefiner._to_text(item)) for item in value if item is not None
            )
        if isinstance(value, dict):
            return json.dumps(value, ensure_ascii=False)
        return str(value)

    def _save_to_db_sync(self, job_data: Dict[str, Any]) -> None:
        """Insert one job row (blocking); errors are printed, not raised."""
        text_columns = (
            "title", "company_name", "location", "description", "requirements",
            "qualifications", "salary_range", "nature_of_work", "job_id", "url",
            "category",
        )
        params = tuple(
            self._to_text(job_data.get(column, "N/A")) for column in text_columns
        ) + (job_data.get("scraped_at"),)

        conn = None
        try:
            conn = self._connect()
            with conn.cursor() as cursor:
                cursor.execute('''
                INSERT INTO jobs
                (title, company_name, location, description, requirements,
                qualifications, salary_range, nature_of_work, job_id, url, category, scraped_at)
                VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
                ON CONFLICT (job_id) DO NOTHING
                ''', params)
                # ON CONFLICT DO NOTHING reports zero affected rows when the
                # job_id was already stored.
                skipped = cursor.rowcount == 0
            conn.commit()
            if skipped:
                print(f" ⏭️ Job already stored, skipped duplicate job_id: {job_data.get('job_id', 'N/A')}")
            else:
                print(f" 💾 Saved job to category '{job_data.get('category', 'N/A')}' with job_id: {job_data.get('job_id', 'N/A')}")
        except Exception as e:
            # Best-effort persistence: report and carry on with the next job.
            print(f"❌ Database save error: {e}")
        finally:
            if conn is not None:
                conn.close()

    async def _save_to_db(self, job_data: Dict[str, Any]):
        """Save job data to PostgreSQL database with job_id uniqueness.

        The blocking psycopg2 work runs in the default executor so the event
        loop is not stalled while connecting and inserting. Database errors
        are printed and swallowed.
        """
        loop = asyncio.get_running_loop()
        await loop.run_in_executor(None, self._save_to_db_sync, job_data)

    async def _save_to_markdown(self, job_data: Dict[str, Any], keyword: str):
        """Append one job entry to ``linkedin_jobs/linkedin_jobs_scraped.md``.

        Creates the directory if needed and writes a dated title line first
        when the file is new or empty.
        """
        os.makedirs("linkedin_jobs", exist_ok=True)
        filepath = os.path.join("linkedin_jobs", "linkedin_jobs_scraped.md")
        write_header = not os.path.exists(filepath) or os.path.getsize(filepath) == 0

        parts = []
        if write_header:
            parts.append(f"# LinkedIn Jobs - {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n\n")
        parts.extend([
            f"## Job: {job_data.get('title', 'N/A')}\n\n",
            f"- **Keyword**: {keyword}\n",
            f"- **Company**: {job_data.get('company_name', 'N/A')}\n",
            f"- **Location**: {job_data.get('location', 'N/A')}\n",
            f"- **Nature of Work**: {job_data.get('nature_of_work', 'N/A')}\n",
            f"- **Salary Range**: {job_data.get('salary_range', 'N/A')}\n",
            f"- **Job ID**: {job_data.get('job_id', 'N/A')}\n",
            f"- **Category**: {job_data.get('category', 'N/A')}\n",
            f"- **Scraped At**: {job_data.get('scraped_at', 'N/A')}\n",
            f"- **URL**: <{job_data.get('url', 'N/A')}>\n\n",
            f"### Description\n\n{job_data.get('description', 'N/A')}\n\n",
            f"### Requirements\n\n{job_data.get('requirements', 'N/A')}\n\n",
            f"### Qualifications\n\n{job_data.get('qualifications', 'N/A')}\n\n",
            "---\n\n",
        ])
        # One write per entry instead of a dozen small ones.
        with open(filepath, "a", encoding="utf-8") as f:
            f.write("".join(parts))