refactor: update timeout values in job scraper classes; feat: add spoof config for renderers and vendors; build: update pycache files for config and modules
166 lines
6.3 KiB
Python
166 lines
6.3 KiB
Python
import asyncio
import json
import os
import re
import sqlite3
from datetime import datetime
from typing import Any, Dict, Optional

import google.generativeai as genai

from config import GEMINI_API_KEY
|
|
|
|
class LLMJobRefiner:
    """Refine scraped job postings with a Gemini model and persist the results.

    ``refine_job_data`` sends the raw page content to the model and parses the
    JSON object it returns; ``save_job_data`` writes a refined record to a
    SQLite database and appends it to a markdown file.
    """

    # NOTE(review): confirm this model id is accepted by the Gemini API.
    MODEL_NAME = 'gemini-latest-flash'
    # Maximum number of page-content characters forwarded to the model.
    MAX_CONTENT_CHARS = 6000
    DB_PATH = "linkedin_jobs.db"
    OUTPUT_DIR = "linkedin_jobs"
    MARKDOWN_FILENAME = "linkedin_jobs_scraped.md"

    # Columns written to the ``jobs`` table, in insert order.
    _JOB_COLUMNS = (
        "title", "company_name", "location", "description", "requirements",
        "qualifications", "salary_range", "nature_of_work", "job_id", "url",
    )
    _INSERT_SQL = (
        f"INSERT OR IGNORE INTO jobs ({', '.join(_JOB_COLUMNS)}) "
        f"VALUES ({', '.join('?' * len(_JOB_COLUMNS))})"
    )
    # JSON object wrapped in a markdown code fence (optionally tagged ``json``).
    _FENCED_JSON = re.compile(r'```(?:json)?\s*({.*?})\s*```', re.DOTALL)

    def __init__(self):
        """Configure the Gemini client with the project API key and create the model."""
        genai.configure(api_key=GEMINI_API_KEY)
        self.model = genai.GenerativeModel(self.MODEL_NAME)

    async def refine_job_data(self, raw_data: Dict[str, Any], target_field: str) -> Optional[Dict[str, Any]]:
        """Refine raw job data using the Gemini model.

        Args:
            raw_data: Scraped record; the keys ``page_content``, ``job_id`` and
                ``url`` are read when present.
            target_field: Field of interest, interpolated into the prompt.

        Returns:
            The parsed JSON object from the model, or ``None`` when the call
            fails, the response holds no parsable JSON object, or the object
            is empty.
        """
        prompt = self._build_prompt(raw_data, target_field)

        try:
            # generate_content is a blocking call, so run it in the default
            # executor to keep the event loop responsive.
            loop = asyncio.get_running_loop()
            response = await loop.run_in_executor(
                None,
                lambda: self.model.generate_content(prompt)
            )
            refined_data = self._parse_llm_response(response.text)
        except Exception as e:
            # Best-effort boundary: any failure is reported and mapped to None.
            print(f"LLM refinement failed: {e}")
            return None

        return refined_data or None

    def _build_prompt(self, raw_data: Dict[str, Any], target_field: str) -> str:
        """Build the extraction prompt for one scraped posting."""
        # Tolerate a present-but-None or non-string page_content.
        page_content = raw_data.get('page_content') or ''
        if not isinstance(page_content, str):
            page_content = str(page_content)
        page_content = page_content[:self.MAX_CONTENT_CHARS]

        # json.dumps quotes and escapes the values so that a quote or backslash
        # inside them cannot break the JSON example shown to the model.
        job_id = json.dumps(str(raw_data.get('job_id', 'unknown')), ensure_ascii=False)
        url = json.dumps(str(raw_data.get('url', 'N/A')), ensure_ascii=False)

        return f"""
        You are a job data extraction assistant. Extract the following fields from the job posting:
        - title
        - company_name
        - location
        - description
        - requirements
        - qualifications
        - salary_range
        - nature_of_work (remote, onsite, or hybrid)
        - job_id

        Target Field: {target_field}
        Raw Page Content:
        {page_content}

        Instructions:
        1. Extract only the information relevant to the target field: {target_field}
        2. Clean up any formatting issues in the description
        3. Standardize location format (city, state/country)
        4. Extract salary range if mentioned in description
        5. Determine nature of work (remote, onsite, or hybrid) from work arrangements
        6. Ensure all fields are properly formatted
        7. If a field cannot be found, use "N/A"
        8. Return the refined data in JSON format

        Response format (only return the JSON):
        {{
            "title": "...",
            "company_name": "...",
            "location": "...",
            "description": "...",
            "requirements": "...",
            "qualifications": "...",
            "salary_range": "...",
            "nature_of_work": "...",
            "job_id": {job_id},
            "url": {url}
        }}
        """

    def _parse_llm_response(self, response_text: str) -> Optional[Dict[str, Any]]:
        """Extract the first JSON object from the model's reply.

        A fenced markdown code block is preferred; otherwise the text is
        decoded starting at its first ``{``. Text following the object is
        ignored.

        Returns:
            The decoded object, or ``None`` when the text is empty, has no
            ``{``, or does not decode to a JSON object.
        """
        if not response_text or not isinstance(response_text, str):
            return None

        fenced = self._FENCED_JSON.search(response_text)
        candidate = fenced.group(1) if fenced else response_text

        start = candidate.find("{")
        if start == -1:
            return None

        try:
            # raw_decode stops at the end of the first complete value, so
            # trailing commentary (even with braces) does not break parsing.
            parsed, _ = json.JSONDecoder().raw_decode(candidate, start)
        except json.JSONDecodeError:
            return None
        return parsed if isinstance(parsed, dict) else None

    @staticmethod
    def _normalize_field(value: Any) -> Any:
        """Map a parsed JSON value to something storable as a single field.

        ``None`` becomes ``"N/A"``, strings and numbers pass through, lists
        are joined one item per line, and anything else is JSON-encoded.
        """
        if value is None:
            return "N/A"
        if isinstance(value, (str, int, float)):
            return value
        if isinstance(value, (list, tuple)):
            return "\n".join(str(item) for item in value)
        return json.dumps(value, ensure_ascii=False, default=str)

    async def save_job_data(self, job_data: Dict[str, Any], keyword: str):
        """Save job data to the database, then append it to the markdown file."""
        await self._save_to_db(job_data)
        await self._save_to_markdown(job_data, keyword)

    async def _save_to_db(self, job_data: Dict[str, Any]):
        """Insert one job record into the ``jobs`` table of the SQLite database.

        Uses ``INSERT OR IGNORE``; missing or ``None`` fields are stored as
        ``"N/A"``.

        NOTE(review): the ``jobs`` table is not created here — presumably
        another module sets up the schema; confirm.
        """
        db_dir = os.path.dirname(self.DB_PATH)
        if db_dir:
            os.makedirs(db_dir, exist_ok=True)

        row = tuple(self._normalize_field(job_data.get(column)) for column in self._JOB_COLUMNS)

        conn = sqlite3.connect(self.DB_PATH)
        try:
            # The connection's context manager commits or rolls back the
            # transaction but does not close the connection.
            with conn:
                conn.execute(self._INSERT_SQL, row)
        finally:
            conn.close()

    async def _save_to_markdown(self, job_data: Dict[str, Any], keyword: str):
        """Append one job record to the shared markdown file.

        The output directory is created if needed, and the file header is
        written only when the file is empty.
        """
        os.makedirs(self.OUTPUT_DIR, exist_ok=True)
        filepath = os.path.join(self.OUTPUT_DIR, self.MARKDOWN_FILENAME)

        def field(name: str) -> Any:
            return self._normalize_field(job_data.get(name))

        entry = "".join((
            f"## Job: {field('title')}\n\n",
            f"- **Keyword**: {keyword}\n",
            f"- **Company**: {field('company_name')}\n",
            f"- **Location**: {field('location')}\n",
            f"- **Nature of Work**: {field('nature_of_work')}\n",
            f"- **Salary Range**: {field('salary_range')}\n",
            f"- **Job ID**: {field('job_id')}\n",
            f"- **URL**: <{field('url')}>\n\n",
            f"### Description\n\n{field('description')}\n\n",
            f"### Requirements\n\n{field('requirements')}\n\n",
            f"### Qualifications\n\n{field('qualifications')}\n\n",
            "---\n\n",
        ))

        with open(filepath, "a", encoding="utf-8") as f:
            # Opening in append mode creates the file, so a size of zero here
            # means nothing has been written to it yet.
            if os.path.getsize(filepath) == 0:
                f.write(f"# LinkedIn Jobs - {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n\n")
            f.write(entry)