From 38ef08c734cdee20dce2cba6dd056783dafc02f6 Mon Sep 17 00:00:00 2001 From: Ofure Ikheloa Date: Wed, 24 Dec 2025 11:57:26 +0100 Subject: [PATCH] Add LLMJobRefiner class and update requirements for job data extraction --- llm_agent.py | 303 +++++++++++++++++++++++++++++++++++++++++++++++ requirements.txt | 8 ++ 2 files changed, 311 insertions(+) create mode 100644 llm_agent.py create mode 100644 requirements.txt diff --git a/llm_agent.py b/llm_agent.py new file mode 100644 index 0000000..2d72299 --- /dev/null +++ b/llm_agent.py @@ -0,0 +1,303 @@ + +from openai import OpenAI +from typing import Dict, Any +import asyncio +import psycopg2 +import os +from datetime import datetime +import json +import re +from bs4 import BeautifulSoup +from dotenv import load_dotenv + +# Load environment variables from .env +load_dotenv() + + +class LLMJobRefiner: + def __init__(self): + deepseek_api_key = os.getenv("DEEPSEEK_API_KEY") + if not deepseek_api_key: + raise ValueError("DEEPSEEK_API_KEY not found in .env file.") + + # Database credentials from .env + self.db_url = os.getenv("DB_URL") + self.db_username = os.getenv("DB_USERNAME") + self.db_password = os.getenv("DB_PASSWORD") + self.db_host = os.getenv("DB_HOST") + self.db_port = os.getenv("DB_PORT") + + if not self.db_url or not self.db_username or not self.db_password: + raise ValueError("Database credentials not found in .env file.") + + # DeepSeek uses OpenAI-compatible API + self.client = OpenAI( + api_key=deepseek_api_key, + base_url="https://api.deepseek.com/v1" + ) + self.model = "deepseek-chat" + self._init_db() + + def _init_db(self): + """Initialize PostgreSQL database connection and create table""" + try: + self.db_url = os.getenv("DB_URL") + if self.db_url and "supabase.com" in self.db_url: + conn = psycopg2.connect( + host=self.db_host, + port=self.db_port, + database="postgres", + user=self.db_username, + password=self.db_password + ) + else: + conn = psycopg2.connect( + host=self.db_host, + port=self.db_port, + database="postgres", + user=self.db_username, + password=self.db_password + ) + cursor = conn.cursor() + + cursor.execute(''' + CREATE TABLE IF NOT EXISTS crypto_jobs ( + id SERIAL PRIMARY KEY, + title TEXT, + company_name TEXT, + location TEXT, + description TEXT, + requirements TEXT, + qualifications TEXT, + salary_range TEXT, + nature_of_work TEXT, + job_id TEXT UNIQUE, + url TEXT, + category TEXT, + scraped_at TIMESTAMP, + posted_date TEXT, + created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP + ) +''') + + # Ensure the uniqueness constraint exists + cursor.execute(''' + ALTER TABLE crypto_jobs DROP CONSTRAINT IF EXISTS crypto_jobs_job_id_key; + ALTER TABLE crypto_jobs ADD CONSTRAINT crypto_jobs_job_id_key UNIQUE (job_id); +''') + cursor.execute('CREATE INDEX IF NOT EXISTS idx_job_id ON crypto_jobs(job_id)') + cursor.execute('CREATE INDEX IF NOT EXISTS idx_category ON crypto_jobs(category)') + cursor.execute('CREATE INDEX IF NOT EXISTS idx_posted_date ON crypto_jobs(posted_date)') + + conn.commit() + cursor.close() + conn.close() + print("✅ PostgreSQL database initialized successfully") + except Exception as e: + print(f"❌ Database initialization error: {e}") + raise + + def _clean_html_for_llm(self, html_content: str) -> str: + """Clean HTML to make it more readable for LLM while preserving structure""" + try: + soup = BeautifulSoup(html_content, 'html.parser') + + # Remove script and style elements + for script in soup(["script", "style", "nav", "footer", "header"]): + script.decompose() + + # Extract text but keep some structure + text = soup.get_text(separator=' ', strip=True) + + # Clean up whitespace + text = re.sub(r'\s+', ' ', text) + + # Limit length for LLM context + if len(text) > 10000: + text = text[:10000] + "..." + + return text + except Exception as e: + print(f"HTML cleaning error: {e}") + # Fallback to raw content if cleaning fails + return html_content[:100000] if len(html_content) > 100000 else html_content + + def _generate_content_sync(self, prompt: str) -> str: + """Synchronous call to DeepSeek API""" + try: + response = self.client.chat.completions.create( + model=self.model, + messages=[{"role": "user", "content": prompt}], + temperature=0.2, + max_tokens=2048, + stream=False + ) + return response.choices[0].message.content or "" + except Exception as e: + print(f"DeepSeek API error: {e}") + return "" + + async def refine_job_data(self, raw_data: Dict[str, Any], target_field: str) -> Dict[str, Any]: + page_content = raw_data.get('page_content', '') + cleaned_content = self._clean_html_for_llm(page_content) + job_id = raw_data.get('job_id', 'unknown') + url = raw_data.get('url', 'N/A') + posted_date = raw_data.get('posted_date', datetime.now().strftime("%m/%d/%y")) + + prompt = f""" + You are a job posting data extractor. + + EXTRACT EXACT TEXT - DO NOT SUMMARIZE, PARAPHRASE, OR INVENT. + + For these critical fields, follow these rules: + - description: Extract ALL job description text. If ANY job details exist (duties, responsibilities, overview), include them. Only use "Not provided" if absolutely no description exists. + - requirements: Extract ALL requirements text. If ANY requirements exist (skills, experience, education needed), include them. Only use "Not provided" if none exist. + - qualifications: Extract ALL qualifications text. If ANY qualifications exist, include them. Only use "Not provided" if none exist. + + REQUIRED FIELDS (must have valid values, never "N/A"): + - title, company_name, job_id, url + + OPTIONAL FIELDS (can be "Not provided"): + - location, salary_range, nature_of_work + + Page Content: + {cleaned_content} + + Response format (ONLY return this JSON): + {{ + "title": "...", + "company_name": "...", + "location": "...", + "description": "...", + "requirements": "...", + "qualifications": "...", + "salary_range": "...", + "nature_of_work": "...", + "job_id": "{job_id}", + "url": "{url}" + }} + """ + + try: + response_text = await asyncio.get_event_loop().run_in_executor( + None, + lambda: self._generate_content_sync(prompt) + ) + refined_data = self._parse_llm_response(response_text) + + if not refined_data: + return None + + # Validate required fields + required_fields = ['title', 'company_name', 'job_id', 'url'] + for field in required_fields: + if not refined_data.get(field) or refined_data[field].strip() in ["N/A", "", "Unknown", "Company", "Job"]: + return None + + # CRITICAL: Validate content fields - check if they SHOULD exist + content_fields = ['description', 'requirements', 'qualifications'] + cleaned_original = cleaned_content.lower() + + # Simple heuristic: if page contains job-related keywords, content fields should NOT be "Not provided" + job_indicators = ['responsibilit', 'duties', 'require', 'qualifi', 'skill', 'experienc', 'educat', 'degree', 'bachelor', 'master'] + has_job_content = any(indicator in cleaned_original for indicator in job_indicators) + + if has_job_content: + for field in content_fields: + value = refined_data.get(field, "").strip() + if value in ["Not provided", "N/A", ""]: + # LLM failed to extract existing content + print(f" ⚠️ LLM returned '{value}' for {field} but job content appears present") + return None + + # Add the posted_date to the refined data + refined_data['posted_date'] = posted_date + + return refined_data + + except Exception as e: + print(f"LLM refinement failed: {str(e)}") + return None + + def _parse_llm_response(self, response_text: str) -> Dict[str, Any]: + json_match = re.search(r'```(?:json)?\s*({.*?})\s*```', response_text, re.DOTALL) + if not json_match: + json_match = re.search(r'\{.*\}', response_text, re.DOTALL) + if not json_match: + return None + + try: + return json.loads(json_match.group(1) if '```' in response_text else json_match.group(0)) + except json.JSONDecodeError: + return None + + async def save_job_data(self, job_data: Dict[str, Any], keyword: str): + await self._save_to_db(job_data) + await self._save_to_markdown(job_data, keyword) + + async def _save_to_db(self, job_data: Dict[str, Any]): + """Save job data to PostgreSQL database with job_id uniqueness""" + try: + conn = psycopg2.connect( + host=self.db_host, + port=self.db_port, + database="postgres", + user=self.db_username, + password=self.db_password + ) + cursor = conn.cursor() + + cursor.execute(''' + INSERT INTO crypto_jobs + (title, company_name, location, description, requirements, + qualifications, salary_range, nature_of_work, job_id, url, category, scraped_at, posted_date) + VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s) + ON CONFLICT (job_id) DO NOTHING +''', ( + job_data.get("title", "N/A"), + job_data.get("company_name", "N/A"), + job_data.get("location", "N/A"), + job_data.get("description", "N/A"), + job_data.get("requirements", "N/A"), + job_data.get("qualifications", "N/A"), + job_data.get("salary_range", "N/A"), + job_data.get("nature_of_work", "N/A"), + job_data.get("job_id", "N/A"), + job_data.get("url", "N/A"), + job_data.get("category", "N/A"), + job_data.get("scraped_at"), + job_data.get("posted_date", "N/A") + )) + + conn.commit() + cursor.close() + conn.close() + + print(f" 💾 Saved job to category '{job_data.get('category', 'N/A')}' with job_id: {job_data.get('job_id', 'N/A')}") + + except Exception as e: + print(f"❌ Database save error: {e}") + + async def _save_to_markdown(self, job_data: Dict[str, Any], keyword: str): + os.makedirs("linkedin_jobs", exist_ok=True) + filepath = os.path.join("linkedin_jobs", "linkedin_jobs_scraped.md") + write_header = not os.path.exists(filepath) or os.path.getsize(filepath) == 0 + + with open(filepath, "a", encoding="utf-8") as f: + if write_header: + f.write(f"# LinkedIn Jobs - {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n\n") + f.write(f"## Job: {job_data.get('title', 'N/A')}\n\n") + f.write(f"- **Keyword**: {keyword}\n") + f.write(f"- **Company**: {job_data.get('company_name', 'N/A')}\n") + f.write(f"- **Location**: {job_data.get('location', 'N/A')}\n") + f.write(f"- **Nature of Work**: {job_data.get('nature_of_work', 'N/A')}\n") + f.write(f"- **Salary Range**: {job_data.get('salary_range', 'N/A')}\n") + f.write(f"- **Job ID**: {job_data.get('job_id', 'N/A')}\n") + f.write(f"- **Posted Date**: {job_data.get('posted_date', 'N/A')}\n") + f.write(f"- **Category**: {job_data.get('category', 'N/A')}\n") + f.write(f"- **Scraped At**: {job_data.get('scraped_at', 'N/A')}\n") + f.write(f"- **URL**: <{job_data.get('url', 'N/A')}>\n\n") + f.write(f"### Description\n\n{job_data.get('description', 'N/A')}\n\n") + f.write(f"### Requirements\n\n{job_data.get('requirements', 'N/A')}\n\n") + f.write(f"### Qualifications\n\n{job_data.get('qualifications', 'N/A')}\n\n") + f.write("---\n\n") \ No newline at end of file diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..1e2d98c --- /dev/null +++ b/requirements.txt @@ -0,0 +1,8 @@ + +playwright==1.48.0 +browserforge==0.1.8 +openai==1.40.0 +psycopg2-binary==2.9.9 +beautifulsoup4==4.12.3 +python-dotenv==1.0.1 +redis==5.0.8 \ No newline at end of file