refactor(llm_agent): switch from XAI to DeepSeek API and simplify job refinement

- Replace XAI/Grok integration with DeepSeek's OpenAI-compatible API
- Remove schema generation and caching logic
- Simplify prompt structure and response parsing
- Standardize database schema and markdown output format
- Update config to use DEEPSEEK_API_KEY instead of XAI_API_KEY
- Change default search keyword in linkedin_main.py
Ofure Ikheloa 2025-12-01 10:25:37 +01:00
parent d7d92ba8bb
commit 4f78a845ae
3 changed files with 94 additions and 103 deletions
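
Note on the API switch: DeepSeek exposes an OpenAI-compatible endpoint, so the stock openai Python client works unchanged once the base URL and key are swapped. A minimal sketch of the call pattern this commit adopts, using only the endpoint, model name, and environment variable that appear in the diff below:

    import os
    from openai import OpenAI

    # Endpoint and model name as they appear in the diff below.
    client = OpenAI(
        api_key=os.getenv("DEEPSEEK_API_KEY"),
        base_url="https://api.deepseek.com/v1",
    )
    response = client.chat.completions.create(
        model="deepseek-chat",
        messages=[{"role": "user", "content": "ping"}],
        temperature=0.2,
        stream=False,
    )
    print(response.choices[0].message.content or "")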

View File

@@ -8,9 +8,9 @@ from dotenv import load_dotenv
 load_dotenv()
 
 # LLM Agent Configuration
-GEMINI_API_KEY = os.getenv("XAI_API_KEY")
-if not GEMINI_API_KEY:
-    raise ValueError("XAI_API_KEY environment variable not set in .env file")
+DEEPSEEK_API_KEY = os.getenv("DEEPSEEK_API_KEY")
+if not DEEPSEEK_API_KEY:
+    raise ValueError("DEEPSEEK_API_KEY environment variable not set in .env file")
 
 def load_spoof_config():

View File

@@ -21,7 +21,7 @@ async def main():
     scraper = LinkedInJobScraper(engine, human_speed=1.6, user_request="Extract title, company, location, description, requirements, qualifications, nature of job(remote, onsite, hybrid) and salary")
     await scraper.scrape_jobs(
-        search_keywords="Web Designer location:New York",
+        search_keywords="Lecturer location:New York",
         credentials={
             "email": os.getenv("SCRAPING_USERNAME"),
             "password": os.getenv("SCRAPING_PASSWORD")

View File

@@ -1,6 +1,5 @@
 from openai import OpenAI
-from typing import Dict, Any, Optional
+from typing import Dict, Any
 import asyncio
 import sqlite3
 import os
@@ -9,123 +8,91 @@ import json
 import re
 from dotenv import load_dotenv
 
-# ✅ Actually load .env
+# Load environment variables from .env
 load_dotenv()
 
 class LLMJobRefiner:
     def __init__(self):
-        xai_api_key = os.getenv("XAI_API_KEY")
-        if not xai_api_key:
-            raise ValueError("XAI_API_KEY not found in environment variables.")
+        deepseek_api_key = os.getenv("DEEPSEEK_API_KEY")
+        if not deepseek_api_key:
+            raise ValueError("DEEPSEEK_API_KEY not found in .env file.")
 
-        self.client = OpenAI(api_key=xai_api_key, base_url="https://api.x.ai/v1")
-        self.model = "grok-4-latest"
-        self.extraction_schema_cache = {}
+        # DeepSeek uses OpenAI-compatible API
+        self.client = OpenAI(
+            api_key=deepseek_api_key,
+            base_url="https://api.deepseek.com/v1"
+        )
+        self.model = "deepseek-chat"  # or "deepseek-coder" if preferred
 
-    def generate_content(self, prompt: str, system_message: str = "You are a helpful assistant.") -> str:
-        """Synchronous method to call Grok via xAI API."""
+    def _generate_content_sync(self, prompt: str) -> str:
+        """Synchronous call to DeepSeek API"""
         try:
             response = self.client.chat.completions.create(
                 model=self.model,
-                messages=[
-                    {"role": "system", "content": system_message},
-                    {"role": "user", "content": prompt}
-                ],
+                messages=[{"role": "user", "content": prompt}],
                 temperature=0.2,
                 max_tokens=2048,
                 stream=False
             )
             return response.choices[0].message.content or ""
         except Exception as e:
-            print(f"Error in Grok API call: {e}")
+            print(f"DeepSeek API error: {e}")
             return ""
-    async def refine_job_data(self, raw_data: Dict[str, Any], user_request: str) -> Optional[Dict[str, Any]]:
-        page_content = raw_data.get('page_content', '')
-        if not page_content:
-            return None
-
-        schema_key = user_request.lower().strip()
-        extraction_schema = self.extraction_schema_cache.get(schema_key)
-        if not extraction_schema:
-            extraction_schema = await self._generate_extraction_schema(user_request)
-            if extraction_schema:
-                self.extraction_schema_cache[schema_key] = extraction_schema
-            else:
-                extraction_schema = self._get_default_schema()
-
+    async def refine_job_data(self, raw_data: Dict[str, Any], target_field: str) -> Dict[str, Any]:
         prompt = f"""
-You are a highly skilled web data extraction assistant. Your task is to analyze the raw HTML content of a job posting page and extract specific information requested by the user.
-
-The user's request is: "{user_request}"
-
-The raw HTML content of the page is provided below (limited in size). The content might be noisy or unstructured.
-
-Your goal is to:
-1. Analyze the HTML structure to identify relevant sections.
-2. Extract the requested information accurately.
-3. Clean up formatting issues.
-4. If a field cannot be found, use "N/A".
-5. Return ONLY the extracted data in a JSON object based on the following schema:
-{json.dumps(extraction_schema, indent=2)}
-
-Raw Page Content (HTML):
-{page_content[:6000]}
-
-Respond with the JSON object containing the extracted data.
+You are a job data extraction assistant. Extract the following fields from the job posting:
+- title
+- company_name
+- location
+- description
+- requirements
+- qualifications
+- salary_range
+- nature_of_work (remote, onsite, or hybrid)
+- job_id
+
+Target Field: {target_field}
+
+Raw Page Content:
+{raw_data.get('page_content', '')}
+
+Instructions:
+1. Extract only the information relevant to the target field: {target_field}
+2. Clean up any formatting issues in the description
+3. Standardize location format (city, state/country)
+4. Extract salary range if mentioned
+5. Determine nature of work from work arrangements
+6. Ensure all fields are properly formatted
+7. If a field cannot be found, use "N/A"
+8. Return ONLY the refined data in JSON format
+
+Response format (only return the JSON):
+{{
+    "title": "...",
+    "company_name": "...",
+    "location": "...",
+    "description": "...",
+    "requirements": "...",
+    "qualifications": "...",
+    "salary_range": "...",
+    "nature_of_work": "...",
+    "job_id": "{raw_data.get('job_id', 'unknown')}",
+    "url": "{raw_data.get('url', 'N/A')}"
+}}
 """
         try:
-            # ✅ Use self (current instance), NOT a new LLMJobRefiner()
             response_text = await asyncio.get_event_loop().run_in_executor(
                 None,
-                lambda: self.generate_content(prompt)
+                lambda: self._generate_content_sync(prompt)
             )
             refined_data = self._parse_llm_response(response_text)
-            if not refined_data:
-                return None
-
-            refined_data['job_id'] = raw_data.get('job_id', 'unknown')
-            refined_data['url'] = raw_data.get('url', 'N/A')
-            return refined_data
+            return refined_data if refined_data else None
         except Exception as e:
             print(f"LLM refinement failed: {str(e)}")
             return None
-    async def _generate_extraction_schema(self, user_request: str) -> Optional[Dict[str, str]]:
-        schema_prompt = f"""
-Based on the user's request: "{user_request}", generate a JSON schema for the data they want to extract from a job posting.
-The schema should be a dictionary where keys are field names (snake_case) and values are short descriptions.
-Include standard fields like title, company_name, location, description, etc., if relevant.
-Respond with only the JSON schema.
-"""
-        try:
-            # ✅ Use self.generate_content, NOT self.model.generate_content
-            schema_text = await asyncio.get_event_loop().run_in_executor(
-                None,
-                lambda: self.generate_content(schema_prompt)
-            )
-            json_match = re.search(r'```(?:json)?\s*({.*?})\s*```', schema_text, re.DOTALL)
-            if not json_match:
-                json_match = re.search(r'\{.*\}', schema_text, re.DOTALL)
-            if not json_match:
-                return None
-            json_str = json_match.group(1) if '```' in schema_text else json_match.group(0)
-            return json.loads(json_str)
-        except Exception as e:
-            print(f"Schema generation failed: {str(e)}")
-            return None
-
-    def _get_default_schema(self) -> Dict[str, str]:
-        return {
-            "title": "The job title",
-            "company_name": "The name of the company",
-            "location": "The location of the job",
-            "description": "The full job description",
-            "requirements": "List of job requirements",
-            "qualifications": "List of required qualifications",
-            "salary_range": "The salary range mentioned",
-            "nature_of_work": "Remote, onsite, or hybrid"
-        }
-    def _parse_llm_response(self, response_text: str) -> Optional[Dict[str, Any]]:
+    def _parse_llm_response(self, response_text: str) -> Dict[str, Any]:
         json_match = re.search(r'```(?:json)?\s*({.*?})\s*```', response_text, re.DOTALL)
         if not json_match:
             json_match = re.search(r'\{.*\}', response_text, re.DOTALL)
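
_parse_llm_response accepts either a fenced ```json reply or a bare top-level object. A standalone sketch of the same two-stage match; the extract_json helper name is made up for illustration:

    import json
    import re

    def extract_json(text: str):
        # Fenced ```json block first, bare {...} object as a fallback,
        # mirroring the two regexes in _parse_llm_response.
        match = re.search(r'```(?:json)?\s*({.*?})\s*```', text, re.DOTALL)
        if not match:
            match = re.search(r'\{.*\}', text, re.DOTALL)
        if not match:
            return None
        return json.loads(match.group(1) if '```' in text else match.group(0))

    print(extract_json('```json\n{"title": "Lecturer"}\n```'))   # fenced reply
    print(extract_json('noise {"title": "Lecturer"} noise'))     # bare object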
@@ -146,12 +113,30 @@ class LLMJobRefiner:
         os.makedirs(os.path.dirname(db_path) or ".", exist_ok=True)
         with sqlite3.connect(db_path) as conn:
             cursor = conn.cursor()
-            fields = list(job_data.keys())
-            placeholders = ', '.join(['?' for _ in fields])
-            columns = ', '.join([f'"{col}"' for col in fields])  # Escape column names
-            cursor.execute(f"CREATE TABLE IF NOT EXISTS jobs ({columns})")
-            cursor.execute(f'INSERT INTO jobs ({columns}) VALUES ({placeholders})',
-                           [job_data.get(field, 'N/A') for field in fields])
+            cursor.execute('''
+                CREATE TABLE IF NOT EXISTS jobs (
+                    title TEXT, company_name TEXT, location TEXT, description TEXT,
+                    requirements TEXT, qualifications TEXT, salary_range TEXT,
+                    nature_of_work TEXT, job_id TEXT, url TEXT
+                )
+            ''')
+            cursor.execute('''
+                INSERT OR IGNORE INTO jobs
+                (title, company_name, location, description, requirements,
+                 qualifications, salary_range, nature_of_work, job_id, url)
+                VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
+            ''', (
+                job_data.get("title", "N/A"),
+                job_data.get("company_name", "N/A"),
+                job_data.get("location", "N/A"),
+                job_data.get("description", "N/A"),
+                job_data.get("requirements", "N/A"),
+                job_data.get("qualifications", "N/A"),
+                job_data.get("salary_range", "N/A"),
+                job_data.get("nature_of_work", "N/A"),
+                job_data.get("job_id", "N/A"),
+                job_data.get("url", "N/A")
+            ))
             conn.commit()
 
     async def _save_to_markdown(self, job_data: Dict[str, Any], keyword: str):
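
One caveat on the new fixed schema: INSERT OR IGNORE only skips duplicates when a uniqueness constraint exists, and the CREATE TABLE above declares none, so re-running a scrape will insert repeated rows unless job_id is made UNIQUE. Reading the table back is straightforward; a sketch, with the database path assumed (db_path is a parameter of the save method):

    import sqlite3

    with sqlite3.connect("jobs.db") as conn:  # path assumed
        rows = conn.execute(
            "SELECT title, company_name, salary_range FROM jobs "
            "WHERE nature_of_work = ?",
            ("remote",),
        ).fetchall()
        for title, company, salary in rows:
            print(f"{title} @ {company}: {salary}")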
@@ -164,7 +149,13 @@ class LLMJobRefiner:
             f.write(f"# LinkedIn Jobs - {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n\n")
             f.write(f"## Job: {job_data.get('title', 'N/A')}\n\n")
             f.write(f"- **Keyword**: {keyword}\n")
-            for key, value in job_data.items():
-                if key != 'title':
-                    f.write(f"- **{key.replace('_', ' ').title()}**: {value}\n")
-            f.write("\n---\n\n")
+            f.write(f"- **Company**: {job_data.get('company_name', 'N/A')}\n")
+            f.write(f"- **Location**: {job_data.get('location', 'N/A')}\n")
+            f.write(f"- **Nature of Work**: {job_data.get('nature_of_work', 'N/A')}\n")
+            f.write(f"- **Salary Range**: {job_data.get('salary_range', 'N/A')}\n")
+            f.write(f"- **Job ID**: {job_data.get('job_id', 'N/A')}\n")
+            f.write(f"- **URL**: <{job_data.get('url', 'N/A')}>\n\n")
+            f.write(f"### Description\n\n{job_data.get('description', 'N/A')}\n\n")
+            f.write(f"### Requirements\n\n{job_data.get('requirements', 'N/A')}\n\n")
+            f.write(f"### Qualifications\n\n{job_data.get('qualifications', 'N/A')}\n\n")
+            f.write("---\n\n")