from openai import OpenAI
from typing import Dict, Any, Optional
import asyncio
import psycopg2
import os
from datetime import datetime
import json
import re
from bs4 import BeautifulSoup
from dotenv import load_dotenv

# Load environment variables from .env
load_dotenv()


class LLMJobRefiner:
    """Turn scraped job pages into structured records using the DeepSeek chat API.

    The refiner cleans the page HTML, asks the LLM for a JSON record, validates
    the answer, and can persist the record to PostgreSQL and to a markdown log.
    """

    # Character limit applied to cleaned page text before it is put in the prompt.
    _MAX_LLM_CHARS = 10000
    # Character limit applied to the raw HTML when cleaning fails.
    _MAX_FALLBACK_CHARS = 100000
    # Values that make a required field count as missing.
    _INVALID_REQUIRED_VALUES = ("N/A", "", "Unknown", "Company", "Job")
    # Values that make a content field count as "not extracted".
    _EMPTY_CONTENT_VALUES = ("Not provided", "N/A", "")
    # Substrings whose presence in the page suggests real job content exists.
    _JOB_INDICATORS = (
        'responsibilit', 'duties', 'require', 'qualifi', 'skill',
        'experienc', 'educat', 'degree', 'bachelor', 'master',
    )

    def __init__(self):
        """Read configuration from the environment and initialize the database.

        Raises:
            ValueError: if DEEPSEEK_API_KEY, DB_URL, DB_USERNAME or DB_PASSWORD
                is missing.
            Exception: whatever ``_init_db`` re-raises when the database cannot
                be initialized.
        """
        deepseek_api_key = os.getenv("DEEPSEEK_API_KEY")
        if not deepseek_api_key:
            raise ValueError("DEEPSEEK_API_KEY not found in .env file.")

        # Database credentials from .env
        self.db_url = os.getenv("DB_URL")
        self.db_username = os.getenv("DB_USERNAME")
        self.db_password = os.getenv("DB_PASSWORD")
        self.db_host = os.getenv("DB_HOST")
        self.db_port = os.getenv("DB_PORT")

        if not self.db_url or not self.db_username or not self.db_password:
            raise ValueError("Database credentials not found in .env file.")

        # DeepSeek uses OpenAI-compatible API
        self.client = OpenAI(
            api_key=deepseek_api_key,
            base_url="https://api.deepseek.com/v1"
        )
        self.model = "deepseek-chat"
        self._init_db()

    def _connect(self):
        """Open a new psycopg2 connection using the stored credentials."""
        return psycopg2.connect(
            host=self.db_host,
            port=self.db_port,
            database="postgres",
            user=self.db_username,
            password=self.db_password
        )

    def _init_db(self):
        """Initialize PostgreSQL database connection and create table.

        Creates the ``jobs`` table, (re)creates the unique constraint on
        ``job_id`` and creates the lookup indexes. The connection is always
        closed, including when a statement fails.

        Raises:
            Exception: any error from connecting or executing is printed and
                re-raised.
        """
        conn = None
        try:
            # The original had separate "supabase" and default branches that
            # built the exact same connection, so a single call is equivalent.
            conn = self._connect()
            with conn.cursor() as cursor:
                cursor.execute('''
                    CREATE TABLE IF NOT EXISTS jobs (
                        id SERIAL PRIMARY KEY,
                        title TEXT,
                        company_name TEXT,
                        location TEXT,
                        description TEXT,
                        requirements TEXT,
                        qualifications TEXT,
                        salary_range TEXT,
                        nature_of_work TEXT,
                        job_id TEXT UNIQUE,
                        url TEXT,
                        category TEXT,
                        scraped_at TIMESTAMP,
                        posted_date TEXT,
                        created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
                    )
                ''')

                # Ensure the uniqueness constraint exists
                cursor.execute('''
                    ALTER TABLE jobs DROP CONSTRAINT IF EXISTS jobs_job_id_key;
                    ALTER TABLE jobs ADD CONSTRAINT jobs_job_id_key UNIQUE (job_id);
                ''')

                cursor.execute('CREATE INDEX IF NOT EXISTS idx_job_id ON jobs(job_id)')
                cursor.execute('CREATE INDEX IF NOT EXISTS idx_category ON jobs(category)')
                cursor.execute('CREATE INDEX IF NOT EXISTS idx_posted_date ON jobs(posted_date)')

            conn.commit()
            print("✅ PostgreSQL database initialized successfully")
        except Exception as e:
            print(f"❌ Database initialization error: {e}")
            raise
        finally:
            # Close on every path so a failed statement does not leak the connection.
            if conn is not None:
                conn.close()

    def _clean_html_for_llm(self, html_content: str) -> str:
        """Clean HTML to make it more readable for LLM while preserving structure.

        Args:
            html_content: raw page HTML; ``None`` or an empty string yields "".

        Returns:
            Whitespace-normalized page text with script/style/nav/footer/header
            elements removed, truncated to ``_MAX_LLM_CHARS`` characters plus
            "..." when longer. If cleaning fails, the raw content truncated to
            ``_MAX_FALLBACK_CHARS`` characters.
        """
        if not html_content:
            # Nothing to parse; also keeps a None page from crashing the
            # fallback slice below.
            return ""

        try:
            soup = BeautifulSoup(html_content, 'html.parser')

            # Remove script and style elements
            for tag in soup(["script", "style", "nav", "footer", "header"]):
                tag.decompose()

            # Extract text but keep some structure
            text = soup.get_text(separator=' ', strip=True)

            # Clean up whitespace
            text = re.sub(r'\s+', ' ', text)

            # Limit length for LLM context
            if len(text) > self._MAX_LLM_CHARS:
                text = text[:self._MAX_LLM_CHARS] + "..."

            return text
        except Exception as e:
            print(f"HTML cleaning error: {e}")
            # Fallback to raw content if cleaning fails
            return html_content[:self._MAX_FALLBACK_CHARS]

    def _generate_content_sync(self, prompt: str) -> str:
        """Synchronous call to DeepSeek API.

        Returns:
            The message content of the first choice, or "" when the content is
            empty or the API call raises (the error is printed).
        """
        try:
            response = self.client.chat.completions.create(
                model=self.model,
                messages=[{"role": "user", "content": prompt}],
                temperature=0.2,
                max_tokens=2048,
                stream=False
            )
            return response.choices[0].message.content or ""
        except Exception as e:
            print(f"DeepSeek API error: {e}")
            return ""

    def _build_prompt(self, cleaned_content: str, job_id: Any, url: Any) -> str:
        """Build the extraction prompt sent to the LLM for one job page."""
        # "requirements" is listed in the response template because the field
        # rules describe it and both persistence methods read it.
        return f"""
        You are an expert job posting parser. Extract information EXACTLY as it appears in the text.
        DO NOT summarize, paraphrase, or invent.

        CRITICAL INSTRUCTIONS:
        - The job is from AMAZON. Look for these exact section headings:
          - "## Basic Qualifications" → extract as "qualifications"
          - "## Preferred Qualifications" → include this in "qualifications" too
          - "## Description" or "About the Role" or "Key job responsibilities" → extract as "description"
          - "You Will:" or "Job responsibilities" → include in "description"
          - Requirements are often embedded in qualifications or description

        FIELD RULES:
        - description: MUST include ALL role details, responsibilities, and overview. Never "Not provided" if any job description exists.
        - qualifications: MUST include ALL content from "Basic Qualifications" and "Preferred Qualifications" sections. Combine them.
        - requirements: If no separate "requirements" section, extract required skills/experience from qualifications/description.
        - For Amazon jobs, company_name = "Amazon".

        REQUIRED FIELDS (must have valid values, never "N/A"):
        - title, company_name, job_id, url

        OPTIONAL FIELDS (can be "Not provided"):
        - location, salary_range, nature_of_work

        Page Content:
        {cleaned_content}

        Response format (ONLY return this JSON):
        {{
            "title": "...",
            "company_name": "...",
            "location": "...",
            "description": "...",
            "requirements": "...",
            "qualifications": "...",
            "salary_range": "...",
            "nature_of_work": "...",
            "job_id": "{job_id}",
            "url": "{url}"
        }}
        """

    async def refine_job_data(self, raw_data: Dict[str, Any], target_field: str) -> Optional[Dict[str, Any]]:
        """Extract a structured job record from a scraped page via the LLM.

        Args:
            raw_data: scraped record; the keys read here are ``page_content``,
                ``job_id``, ``url`` and ``posted_date``.
            target_field: accepted for interface compatibility; not used by
                this method.

        Returns:
            The parsed record with ``posted_date`` added, or ``None`` when the
            LLM response cannot be parsed, a required field is missing or a
            placeholder, a content field is empty although the page appears to
            contain job content, or any error occurs during refinement.
        """
        page_content = raw_data.get('page_content', '')
        cleaned_content = self._clean_html_for_llm(page_content)
        job_id = raw_data.get('job_id', 'unknown')
        url = raw_data.get('url', 'N/A')
        posted_date = raw_data.get('posted_date', datetime.now().strftime("%m/%d/%y"))

        prompt = self._build_prompt(cleaned_content, job_id, url)

        try:
            # The API client is blocking, so run it in the default executor.
            loop = asyncio.get_running_loop()
            response_text = await loop.run_in_executor(
                None, self._generate_content_sync, prompt
            )
            refined_data = self._parse_llm_response(response_text)

            if not refined_data:
                return None

            # Validate required fields. Values are compared as text so a
            # non-string JSON value (e.g. a numeric job_id) is judged on its
            # content instead of failing on a missing .strip().
            for field in ('title', 'company_name', 'job_id', 'url'):
                value = refined_data.get(field)
                if not value or str(value).strip() in self._INVALID_REQUIRED_VALUES:
                    return None

            # CRITICAL: Validate content fields - check if they SHOULD exist.
            # Simple heuristic: if the page contains job-related keywords,
            # content fields should NOT be "Not provided".
            page_text = cleaned_content.lower()
            has_job_content = any(
                indicator in page_text for indicator in self._JOB_INDICATORS
            )

            if has_job_content:
                for field in ('description', 'qualifications'):
                    # `or ""` covers both a missing key and a JSON null.
                    value = str(refined_data.get(field) or "").strip()
                    if value in self._EMPTY_CONTENT_VALUES:
                        # LLM failed to extract existing content
                        print(f" ⚠ LLM returned '{value}' for {field} but job content appears present")
                        return None

            # Add the posted_date to the refined data
            refined_data['posted_date'] = posted_date

            return refined_data
        except Exception as e:
            print(f"LLM refinement failed: {str(e)}")
            return None

    def _parse_llm_response(self, response_text: str) -> Optional[Dict[str, Any]]:
        """Extract the JSON object from an LLM reply.

        A fenced ``` / ```json block is preferred; otherwise the span from the
        first "{" to the last "}" is used.

        Returns:
            The decoded object, or ``None`` when no JSON object is found, it
            does not decode, or it decodes to something other than a dict.
        """
        if not response_text:
            return None

        fenced = re.search(r'```(?:json)?\s*({.*?})\s*```', response_text, re.DOTALL)
        if fenced:
            candidate = fenced.group(1)
        else:
            bare = re.search(r'\{.*\}', response_text, re.DOTALL)
            if not bare:
                return None
            # The bare pattern has no capture group; pick the group from the
            # pattern that actually matched rather than from the presence of
            # backticks in the text.
            candidate = bare.group(0)

        try:
            parsed = json.loads(candidate)
        except json.JSONDecodeError:
            return None

        # Callers use dict methods on the result, so reject lists/scalars.
        return parsed if isinstance(parsed, dict) else None

    async def save_job_data(self, job_data: Dict[str, Any], keyword: str):
        """Persist a job record to the database, then append it to the markdown log."""
        await self._save_to_db(job_data)
        await self._save_to_markdown(job_data, keyword)

    async def _save_to_db(self, job_data: Dict[str, Any]):
        """Save job data to PostgreSQL database with job_id uniqueness.

        Rows whose ``job_id`` already exists are skipped by the
        ``ON CONFLICT`` clause. Errors are printed and not re-raised. The
        connection is closed on every path.
        """
        conn = None
        try:
            conn = self._connect()
            with conn.cursor() as cursor:
                cursor.execute('''
                    INSERT INTO jobs
                    (title, company_name, location, description, requirements,
                     qualifications, salary_range, nature_of_work, job_id, url, category, scraped_at, posted_date)
                    VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
                    ON CONFLICT (job_id) DO NOTHING
                ''', (
                    job_data.get("title", "N/A"),
                    job_data.get("company_name", "N/A"),
                    job_data.get("location", "N/A"),
                    job_data.get("description", "N/A"),
                    job_data.get("requirements", "N/A"),
                    job_data.get("qualifications", "N/A"),
                    job_data.get("salary_range", "N/A"),
                    job_data.get("nature_of_work", "N/A"),
                    job_data.get("job_id", "N/A"),
                    job_data.get("url", "N/A"),
                    job_data.get("category", "N/A"),
                    job_data.get("scraped_at"),
                    job_data.get("posted_date", "N/A")
                ))
                # rowcount is 0 when the conflict clause skipped the insert.
                inserted = cursor.rowcount != 0

            conn.commit()

            if inserted:
                print(f" 💾 Saved job to category '{job_data.get('category', 'N/A')}' with job_id: {job_data.get('job_id', 'N/A')}")
            else:
                print(f" ↩ Skipped duplicate job_id: {job_data.get('job_id', 'N/A')}")
        except Exception as e:
            print(f"❌ Database save error: {e}")
        finally:
            if conn is not None:
                conn.close()

    async def _save_to_markdown(self, job_data: Dict[str, Any], keyword: str):
        """Append the job record to ``linkedin_jobs/linkedin_jobs_scraped.md``.

        A title header with the current timestamp is written first when the
        file is missing or empty.
        """
        os.makedirs("linkedin_jobs", exist_ok=True)
        filepath = os.path.join("linkedin_jobs", "linkedin_jobs_scraped.md")
        write_header = not os.path.exists(filepath) or os.path.getsize(filepath) == 0

        entry = [
            f"## Job: {job_data.get('title', 'N/A')}\n\n",
            f"- *Keyword*: {keyword}\n",
            f"- *Company*: {job_data.get('company_name', 'N/A')}\n",
            f"- *Location*: {job_data.get('location', 'N/A')}\n",
            f"- *Nature of Work*: {job_data.get('nature_of_work', 'N/A')}\n",
            f"- *Salary Range*: {job_data.get('salary_range', 'N/A')}\n",
            f"- *Job ID*: {job_data.get('job_id', 'N/A')}\n",
            f"- *Posted Date*: {job_data.get('posted_date', 'N/A')}\n",
            f"- *Category*: {job_data.get('category', 'N/A')}\n",
            f"- *Scraped At*: {job_data.get('scraped_at', 'N/A')}\n",
            f"- *URL*: <{job_data.get('url', 'N/A')}>\n\n",
            f"### Description\n\n{job_data.get('description', 'N/A')}\n\n",
            f"### Requirements\n\n{job_data.get('requirements', 'N/A')}\n\n",
            f"### Qualifications\n\n{job_data.get('qualifications', 'N/A')}\n\n",
            "---\n\n",
        ]

        with open(filepath, "a", encoding="utf-8") as f:
            if write_header:
                f.write(f"# LinkedIn Jobs - {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n\n")
            f.writelines(entry)